MetaTrader 5用のMQTTクライアントの開発:TDDアプローチ(第5回)
「早すぎる最適化は諸悪の根源である」ドナルド・クヌース
はじめに
MQTTはpub/subメッセージ共有プロトコルです。したがって、その中核はPUBLISHとSUBSCRIBEパケットにあると予想できます。他のすべてのパケットタイプは、その上に存在します。
クライアントが他のクライアントから受け取るメッセージもPUBLISHパケットであるため、PUBLISHパケットを書けるだけでなく、PUBLISHパケットを読むこともできなければなりません。配信プロトコルが対称的だからです。
「PUBLISHパケットは、Application Messageを伝送するために、クライアントからサーバーへ、あるいはサーバーからクライアントへ送られる。」
PUBLISHパケットは、Publishフラグを持つ固定ヘッダーと、UFT-8文字列としてエンコードされた必須Topic Nameと必須Packet Identifier(QoS>0の場合)を含む可変ヘッダーを持ちます。それに加えて、最終的には、MQTT 5.0で導入された、Request/Response相互作用モードに関連するプロパティを含む、ほとんどすべてのプロパティとユーザープロパティを使用することができます。
この記事では、ヘッダーの構造と、Publishフラグ、Topic Name、Packet Identifierのテストと実装方法について説明します。
以下の説明では、OASIS標準で使用されている必要[MUST]と可能[MAY]という用語を使用します。OASIS標準は、IETF RFC 2119で説明されているようにこれらの用語を使用します。
特に断りのない限り、引用はすべてOASIS標準からのものです。
MQTT 5.0 PUBLISHパケットの固定ヘッダーの構造
PUBLISHパケットの固定ヘッダーは、他のすべてのコントロールパケットタイプと同じ2バイトの基本構造に従っています。最初のバイトは、パケットタイプを伝送するためのものです。2番目のバイトは、可変バイト整数としてエンコードされたパケットの残り長のホストです。
他のすべてのパケットタイプが、最初のバイトの最初の4ビットをRESERVED状態にしているのに対し、PUBLISHパケットはこの4ビットを使用してRETAIN、QoSLevel、DUPの3つの機能をエンコードします。
MQTTコントロールパケット | 固定ヘッダーフラグ | ビット3 | ビット2 | ビット1 | ビット0 |
---|---|---|---|---|---|
CONNECT | Reserved | 0 | 0 | 0 | 0 |
CONNACK | Reserved | 0 | 0 | 0 | 0 |
PUBLISH | MQTTv5.0で使用 | DUP | QoS 2 | QoS 1 | RETAIN |
PUBACK | Reserved | 0 | 0 | 0 | 0 |
PUBREC | Reserved | 0 | 0 | 0 | 0 |
PUBREL | Reserved | 0 | 0 | 1 | 0 |
PUBCOMP | Reserved | 0 | 0 | 0 | 0 |
SUBSCRIBE | Reserved | 0 | 0 | 1 | 0 |
SUBACK | Reserved | 0 | 0 | 0 | 0 |
UNSUBSCRIBE | Reserved | 0 | 0 | 1 | 0 |
UNSUBACK | Reserved | 0 | 0 | 0 | 0 |
PINGREQ | Reserved | 0 | 0 | 0 | 0 |
PINGRESP | Reserved | 0 | 0 | 0 | 0 |
DISCONNECT | Reserved | 0 | 0 | 0 | 0 |
AUTH | Reserved | 0 | 0 | 0 | 0 |
表1:MQTT 5.0 OASIS標準の表2-3フラグビットの再現
「フラグビットがReservedとマークされている場合、それは将来の使用のために予約されており、記載されている値に設定されなければならない[MUST]。」
PUBLISHパケットと他のすべてのコントロールパケットとの間にこのような固定ヘッダーの違いがあるため、固定ヘッダーを生成するために使用してきた関数はここでは使用できません。
//+------------------------------------------------------------------+ //| SetFixedHeader | //+------------------------------------------------------------------+ void SetFixedHeader(ENUM_PKT_TYPE pkt_type, uchar& buf[], uchar& dest_buf[]) { dest_buf[0] = (uchar)pkt_type << 4; dest_buf[1] = EncodeVariableByteInteger(buf); }
おわかりのように、関数パラメータにはパケットタイプと2つの配列への参照のみが含まれており、1つは固定ヘッダー配列の送信元で、もう 1つは宛先です。最初の行は、Enumからパケットタイプの整数値を取り出し、その整数値を4ビット左シフトし、ビット演算の結果を固定ヘッダー配列の最初のバイト(dest_buf[0])に代入します。このビット演算により、最初の4ビットは未割り当て、つまり標準が要求するReservedのままとなります。
2行目は、パケットの残り長を計算する関数を呼び出し、その値を可変バイト整数としてエンコードされた固定ヘッダー配列(dest_buf[1])の2バイト目に代入します。
しかし、この関数はPublishフラグを設定する手段を提供しません。
図1:MQTT 5.0のPUBLISHパケット固定ヘッダーRETAIN、QoSレベル、DUPのフラグ
したがって、PUBLISHパケットを収容するためのSwitchと、PublishFlagsを受け取るための最後のパラメータを追加しました。PUBLISHパケットの特異性を実装するためにその本文を少し修正しながら、Publishフラグを受け取るために関数をオーバーロードすることができました。しかし、1つ(PUBLISH)を例外として他のケースはすべて以前の実装がデフォルトなので、これはSwitchにとって完璧なユースケースです。
つまり、パケットの固定ヘッダーをすべて設定するときには、このパラメータは無視できます。Publishフラグがオンの場合にのみ、dest_bufを変更します。
//+------------------------------------------------------------------+ //| SetFixedHeader | //+------------------------------------------------------------------+ void SetFixedHeader(ENUM_PKT_TYPE pkt_type, uchar& buf[], uchar& dest_buf[], uchar publish_flags = 0) { switch(pkt_type) { case PUBLISH: dest_buf[0] = (uchar)pkt_type << 4; dest_buf[0] |= publish_flags; dest_buf[1] = EncodeVariableByteInteger(buf); break; default: dest_buf[0] = (uchar)pkt_type << 4; dest_buf[1] = EncodeVariableByteInteger(buf); break; } }
見てわかるように、固定ヘッダーをホストする宛先バッファは、その最初のバイトへの割り当てと組み合わせたORビット演算によって変更されます。このパターンはConnectフラグの切り替えに多用してきましたが、今度は同じパターンをPublishフラグの切り替えに使用します。
例えば、RETAINフラグは以下のコードでオン/オフにされます。
//+------------------------------------------------------------------+ //| CPktPublish::SetRetain | //+------------------------------------------------------------------+ void CPktPublish::SetRetain(const bool retain) { retain ? m_publish_flags |= RETAIN_FLAG : m_publish_flags &= ~RETAIN_FLAG; SetFixedHeader(PUBLISH, m_buf, ByteArray, m_publish_flags); }
QoS_1レベルフラグ(類似関数シグネチャを取り除いたもの)
QoS_1 ? m_publish_flags |= QoS_1_FLAG : m_publish_flags &= ~QoS_1_FLAG; SetFixedHeader(PUBLISH, m_buf, ByteArray, m_publish_flags);
QoS_2レベルフラグ
QoS_2 ? m_publish_flags |= QoS_2_FLAG : m_publish_flags &= ~QoS_2_FLAG; SetFixedHeader(PUBLISH, m_buf, ByteArray, m_publish_flags);
DUPフラグ
dup ? m_publish_flags |= DUP_FLAG : m_publish_flags &= ~DUP_FLAG; SetFixedHeader(PUBLISH, m_buf, ByteArray, m_publish_flags);
フラグ(フラグマスク)の値は、トグルされるバイト上の各ビットの位置に応じて、Enumで2の冪乗値として定義された定数です。
//+------------------------------------------------------------------+ //| PUBLISH - FIXED HEADER - PUBLISH FLAGS | //+------------------------------------------------------------------+ enum ENUM_PUBLISH_FLAGS { RETAIN_FLAG = 0x01, QoS_1_FLAG = 0x02, QoS_2_FLAG = 0x04, DUP_FLAG = 0x08 };
したがって、フラグは以下のような2進数値とバイト上の位置を持ちます。
RETAIN
1(10進数) | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 1 |
QoS 1
2(10進数) | 0 | 0 | 0 | 0 | 0 | 0 | 1 | 0 |
QoS 2
4(10進数) | 0 | 0 | 0 | 0 | 0 | 1 | 0 | 0 |
DUP
8(10進数) | 0 | 0 | 0 | 0 | 1 | 0 | 0 | 0 |
PUBLISHパケットの10進数値は3です。
3(10進数) | 0 | 0 | 0 | 0 | 0 | 0 | 1 | 1 |
パケットタイプの値を4ビット左シフトしました(dest_buf[0] = (uchar)pkt_type << 4)。
48(10進数) | 0 | 0 | 1 | 1 | 0 | 0 | 0 | 0 |
パケットタイプ値とフラグのバイナリ表現にORビット演算(dest_buf[0] |= publish_flags;)を適用すると、実質的にビットが結合されます。つまり、DUPフラグがオンにされた左シフトされたPUBLISHパケット値のバイナリ表現は以下のようになります。
56(10進数) | 0 | 0 | 1 | 1 | 1 | 0 | 0 | 0 |
RETAINとQoS 2のフラグが設定された場合、固定ヘッダーの最初のバイトのビットは以下のようになります。
53(10進数) | 0 | 0 | 1 | 1 | 0 | 1 | 0 | 1 |
逆に、パケットタイプ値とフラグのバイナリ表現の1補数(~)とのANDビット演算で、フラグはオフになります(m_publish_flags &= ~RETAIN_FLAG )。
つまり、DUPもRETAINもなくてQoS 1でバイトが設定された場合、次のようになります。
50(10進数) | 0 | 0 | 1 | 1 | 0 | 0 | 1 | 0 |
上記のQoS 1フラグの1の補数は、すべてのビットを反転させた値です。
QoS_1フラグ | 0 | 0 | 1 | 0 |
~QoS_1フラグ | 1 | 1 | 0 | 1 |
0と任意の値のANDは0なので、事実上フラグがオフになります。
フラグを設定すれば、明らかにバイトのバイナリ値が変化します。すべてのフラグがオフの状態では、10進数3の値を4ビット左シフトした48の値を持ちます。RETAINフラグをオンにすると、10進数で49になり、RETAINとQoSが1だと、値は51になる、などです。
これらの10進数値は、テストでフラグの設定/解除のすべての可能な組み合わせを探索する際に求める値です。
//+------------------------------------------------------------------+ //| TEST_SetFixedHeader_DUP_QoS2_RETAIN | //+------------------------------------------------------------------+ bool TEST_SetFixedHeader_DUP_QoS2_RETAIN() { Print(__FUNCTION__); //--- Arrange static uchar expected[] = {61, 0}; uchar buf[] = {}; //--- Act CPktPublish *cut = new CPktPublish(buf); cut.SetDup(true); cut.SetQoS_2(true); cut.SetRetain(true); uchar result[]; ArrayCopy(result, cut.ByteArray); //--- Assert bool isTrue = AssertEqual(expected, result); //--- cleanup delete cut; ZeroMemory(result); return isTrue; }
実装の前に書かれたこのやや素朴なテスト(およびもう少し複雑な他のテスト)は私たちの開発を導くものです。これらは目の前のタスクに集中できるだけでなく、コードの変更やリファクタリングが必要になったときの「セーフティネット」として機能するためです。添付ファイルにはこれらのテストが多数あります。
テストを実行すると、次のようになるはずです。
図2:MQTT 5.0 PUBLISHテスト出力固定ヘッダー
パブリッシュ/サブスクライブサイクルがプロトコルの中核であるならば、これら3つの機能(RETAIN、DUP、QoS)はプロトコルのOperational Behaviorの中核です。それらは間違いなく、Session状態の管理に大きな影響を与えるでしょう。そこで、厳密なプロトコルの仕様を少し超えて、そのセマンティクスを感覚的に理解してみましょう。
RETAIN
本連載の最初の部分で見たように、パブリッシュ/サブスクライブパターンは特定のTopic Nameに結びつきます。クライアントはあるTopic Nameでメッセージをパブリッシュするか、あるTopic Nameをサブスクライブし、すべてのクライアントはサブスクライブしたTopic Nameでパブリッシュされたメッセージを受け取ります。
パブリッシュする際、RETAINフラグを1 (true)に設定することで、サーバーにメッセージを保存し、新しいサブスクライバーに「保持メッセージ」として配信するよう指示することができます。保持メッセージは常に1つだけであり、既存の保持メッセージを保存/置き換えるためにRETAINを1に設定します。このフラグを1に設定して0バイトのペイロードを送信し、保持メッセージをクリーンアップします。このTopic Nameで保持されているメッセージに対して、保存、交換、クリーンアップを何もしないようにサーバーに指示するために、この値を0に設定解除します。
Topic Nameにサブスクライブすると、保持メッセージを受け取ります。Shared Subscriptionでは、保持メッセージは、共有Topic Filterのクライアントの1つだけに送信されます。Shared Subscriptionについては、SUBSCRIBEパケットを扱うときに深く掘り下げます。
この機能は、サーバーから送信されるCONNACKパケットのRetain AvailableおよびRetain Not Supportedのフラグと連動して動作します。
保持メッセージは、PUBLISHまたはCONNECTペイロードのWill Propertiesで設定されたMessage Expiry Intervalに従って、他のメッセージと同様に失効します。
RETAINは動的な証券会社機能であり、同じSessionで「利用可能」から「サポートされていない」に、あるいはその逆に変化する可能性があることを考慮しなければなりません。
QoSレベル
QoSレベルについては、本連載の初回記事で、プロトコルの作成者がおこなった設計上の選択を列挙した際にすでに触れました。
「技術スタックの制限と高価なネットワークコストのため、堅牢で高速かつ安価に設計されているにもかかわらず、信頼性の低い、あるいは断続的なインターネット接続にも対応できる、継続的なセッション認識によるサービス品質のデータ配信を提供する必要がありました。」
Connectフラグの文脈では、各QoSレベルの定義が以下の表に示されています。
QoS値 | ビット2 | ビット1 | 詳細 |
---|---|---|---|
0 | 0 | 0 | 最大1回まで |
1 | 0 | 1 | 少なくとも1回の配信 |
2 | 1 | 0 | きっちり1回の配信 |
- | 1 | 1 | 予約済み:使用不可 |
表2:MQTT 5.0 OASIS標準の表3-9QoS定義の再現
QoSレベルやその他の機能の使用について説明する間、メッセージを配信するサービスを指定するために「サーバー」と「証券会社」という用語を使用してきました。しかし、スタンダードによれば
「配信プロトコルは対称的である。以下の説明では、クライアントとサーバーはそれぞれ送信者または受信者の役割を果たすことができる。配信プロトコルは、単一の送信者から単一の受信者へのアプリケーションメッセージの配信にのみ関係する。サーバーが複数のクライアントにApplication Messageを配信する場合、各クライアントは独立して扱われる。Application Messageをアウトバウンドでクライアントに配信するために使用されるQoSレベルは、インバウンドのApplication MessageのQoSレベルとは異なる可能性がある。」(強調は私たちのものです。)
つまり、これまで私たちが使用してきた意味での「サーバー」と「証券会社」という用語の使用は、広い意味でのクライアントの立場から話しているため正当化されますが、配信プロトコルにおけるこの対称性を覚えておいてください。
デフォルトのQoSレベルは0です。つまり、このフラグを設定しないと、サーバーに0が受け入れ可能な最大QoSレベルであることを通知することになります。コンプライアンスを遵守する証券会社であれば、このレベルを受け入れます。これは「ファイア・アンド・フォーゲット」パブリッシングであり、配信時に紛失と重複の両方が発生する可能性があることを送信者が受け入れるものです。
図3:MQTT 5.0-QoSレベル0クライアントサーバフロー図
QoSレベル1では、配信時に重複が発生することは許容しますが、紛失は許容しません。サーバーはPUBACKでメッセージを確認します。
図4:MQTT 5.0-QoSレベル1クライアントサーバーフロー図
QoSレベル2では、紛失や重複は発生しません。このレベルには4つのパケットがあります。サーバーはPUBRECで配信が始まったことを認識します。その後、クライアントはPUBRELで特定のPacket Identifierの解放を要求し、最後にサーバーはPUBCOMPで配信完了を通知します。
図5:MQTT 5.0-QoSレベル2クライアントサーバーフロー図
次は、前回Connectフラグの話をしたときの例えです。
「この[QoS 2]レベルは、書留小包を送るようなものだと考えることができます。郵便システムは、小包を相手の手に渡すと、当面は正しい住所に届ける責任があることを示す受領書を渡します。そして、小包が配達されると、受取人から小包の配達を認める署名入りの領収書が送られてきます。」
サービス品質はWill Message、Subscription (Shared Subscriptionを含む)、または特定のメッセージに対して要求することができます。
Will Message | Subscription | Message |
---|---|---|
CONNECT Will QoS | SUBSCRIBE Subscriptionオプション | PUBLISH QoS レベルフラグ |
表3:QoSレベルを設定できるMQTT 5.0パケットとフラグ
注意深い読者は、QoS 1とQoS 2の両方が、ある種のSession状態を含むことに気づいたかもしれません。Session状態と対応する永続化層については、この広範なトピックに特化した記事で扱います。
DUP
設定されていると、以前に失敗したPUBLISHパケットの送信を再試行していることを示します。すべてのQoS 0メッセージに対して0に設定されなければなりません[MUST]。重複はパケットそのものを指し、メッセージを指すのではありません。
MQTT 5.0 PUBLISHパケットの可変ヘッダー:Topic Name、Packet Identifier、およびプロパティ
MQTT 5.0PUBLISHパケットの可変ヘッダーはTopic Nameを持たなければならず[MUST]、QoSが0(0)より大きい場合、Packet Identifierも持たなければなりません[MUST]。これら2つのフィールドの後には、通常、プロパティとペイロードのセットが続きますが、プロパティと長さ0のペイロードのないPUBLISHパケットも有効なパケットです。言い換えれば、最も単純で有効なPUBLISHパケットは、QoS 0の固定ヘッダー、DUPフラグとRETAINフラグなし、Topic Nameのみの可変ヘッダーを持つものです。
Topic Name
pub/subメッセージ共有プロトコルにおけるクライアントとサーバー間のすべてのやり取り、ひいてはユーザー/デバイス間のすべてのやり取りは、トピックへのパブリッシュとトピックへのサブスクライブを中心に展開されるので、Topic Nameフィールドはここで特別な注意を払うに値すると言えます。多くのリアルタイムサービスでは、Topic Nameの代わりに「チャンネル」という言葉が使用されています。Topic Nameはクライアントがサブスクライブしている情報チャネルを表すからです。
Topic Nameは、階層ツリー構造で編成されたUTF-8エンコード文字列です。フォワードスラッシュ(「/」U+002F)は、トピックレベルの区切り文字として使用されます。
broker1/account12345/EURUSD
大文字と小文字は区別されます。つまり、以下は2つの異なるトピックです。
- broker1/account12345/EURUSD
- broker1/account12345/eurusd
これらのレベルセパレータは、Topic Filterのワイルドカード文字(下記参照)のいずれかがクライアントサブスクリプションに存在するときにのみ意味を持ちます。UTF-8文字列自体の制限を除けば、レベル数に制限はありません。最終的には、Topic NameはTopic Aliasに置き換えられます。
「Topic Aliasは、Topic Nameを使用する代わりにトピックを識別するために使用される整数値である。これはPUBLISHパケットのサイズを小さくするもので、Topic Nameが長く、ネットワーク接続内で同じTopic Nameが繰り返し使用される場合に役に立つ。」
Packet Identifier
Packet Identifierは、QoS>0のPUBLISHパケットに必要な2バイトの整数フィールドです。これは、Sessionステート管理のために、Pub/Subサイクルに直接関与するすべてのパケットで使用されます。Packet Identifierは、QoS 0のPUBLISHでは使用してはなりません[MUST NOT]。
これは、PUBLISHとそれに関連するACKを接続するために使用されます。
配信プロトコルは対称的であるため、QoS 1を使用する場合、クライアントは、私たちが以前に送信したPUBLISHに関連するPUBACKを受信する前に、同じパケットIDでサーバーからPUBLISHを受信する可能性があることを覚えておいてください。
「クライアントがPacket Identifier 0x1234のPUBLISHパケットを送信し、送信したPUBLISHパケットに対するPUBACKを受信する前に、そのサーバーからPacket Identifier 0x1234の別のPUBLISHパケットを受信することは可能である。」
Topic Nameの書き方
Topic Nameは、可変ヘッダーの最初のフィールドです。これはUTF-8の文字列としてエンコードされ、Unicodeの符号位置がいくつか許可されていません。MQTT 5.0でUTF-8文字列をエンコードするために必要な3つの文をご覧ください。
「[...]文字データはU+D800とU+DFFFの間の符号位置のエンコーディングを含んではならない[MUST NOT]。クライアントまたはサーバーが不正な形式のUTF-8を含むMQTTコントロールパケットを受信した場合は、Malformed Packetである。」
「UTF-8エンコード文字列は、ヌル文字U+0000のエンコードを含んではならない[MUST NOT]。受信者(サーバーまたはクライアント)がU+0000を含むMQTTコントロールパケットを受信した場合は、Malformed Packetである。」
「データは、以下に列挙するUnicode[Unicode]符号位置のエンコーディングを含むべきではない[SHOULD NOT]」。受信者(サーバーまたはクライアント)がこれらのいずれかを含むMQTT Control Packetを受信した場合は、Malformed Packetとして扱ってもよい[MAY]。次は、許可されていないUnicodeの符号位置である。
U+0001...U+001F制御文字
U+007F...U+009F制御文字
Unicode仕様[Unicode]で非文字と定義されている符号位置」
ご覧のように、上記の1つ目と2つ目の文はどちらも厳密[MUST NOT]であり、準拠した実装であれば許可されていない符号位置の存在を確認することを意味します。一方、3つ目の文は推奨[SHOULD NOT]であり、許可されていない符号位置の存在を確認しなくても、準拠した実装とみなされることを意味します。
Malformed Packetは切断の理由となるため、もし私たちがクライアントでこれらの符号位置を許可し、証券会社がそれらをMalformed Packetとして扱わないことを選択した場合、推奨を実施する他のクライアントの切断を引き起こす可能性があります。そのため、Unicodeの制御文字や非文字の除外は推奨に過ぎないにもかかわらず、私たちの実装ではこれらを許可していません。
今のところ、文字列をUTF-8としてエンコードする関数は次のようになります。
//+------------------------------------------------------------------+ //| Encode UTF-8 String | //+------------------------------------------------------------------+ void EncodeUTF8String(string str, ushort& dest_buf[]) { uint str_len = StringLen(str); // check for disallowed Unicode code points uint iter_pos = 0; while(iter_pos < str_len) { Print("Checking disallowed code points"); ushort code_point = StringGetCharacter(str, iter_pos); if(IsDisallowedCodePoint(code_point)) { printf("Found disallowed code point at position %d", iter_pos); ZeroMemory(dest_buf); return; } printf("Iter position %d", iter_pos); iter_pos++; } if(str_len == 0) { Print("Cleaning buffer: string empty"); ZeroMemory(dest_buf); return; } // we have no disallowed code points and the string is not empty: encode it. printf("Encoding %d bytes ", str_len); ArrayResize(dest_buf, str_len + 2); dest_buf[0] = (char)str_len >> 8; // MSB dest_buf[1] = (char)str_len % 256; // LSB ushort char_array[]; StringToShortArray(str, char_array, 0, str_len);// to Unicode ArrayCopy(dest_buf, char_array, 2); ZeroMemory(char_array); }
この関数に渡された文字列が許可されていない符号位置を持っている場合、文字列上のその符号位置の位置を記録し、宛先バッファをZero Memoryに渡し、すぐに戻ります。Topic Nameの長さは最低で1である必要であるため、文字列が空の場合は、同じようにログを記録し、バッファをクリーンアップして戻ります。
ところで、StringToShortArrayを使用して文字列をUnicode配列に変換していることにご注目ください。もしASCII配列に変換するのであれば、StringToCharArrayを使うでしょう。詳細な説明やその他多くのことは、最近ドキュメントに含まれた本や、MQL5の文字列に関する包括的な記事でご覧いただけます。
また、このStringToShortArrayの同じ呼び出しでは、関数のデフォルトではなく、文字列の長さを最後のパラメータとして使用していることにご注意ください。これは、配列にヌル文字(0x00)を入れたくないからです。関数のドキュメントによれば、
「デフォルト値は-1であり、これは配列の最後、つまり端末0までコピーすることを意味します。端末0も受信者の配列にコピーされます」
一方、StringLenの戻り値は
「末尾の0を除いた文字列の記号の数」です。
許可されない符号位置を確認する関数は些細なものです。
//+------------------------------------------------------------------+ //| IsDisallowedCodePoint | //| https://unicode.org/faq/utf_bom.html#utf16-2 | //+------------------------------------------------------------------+ bool IsDisallowedCodePoint(ushort code_point) { if((code_point >= 0xD800 && code_point <= 0xDFFF) // Surrogates || (code_point > 0x00 && code_point <= 0x1F) // C0 - Control Characters || (code_point >= 0x7F && code_point <= 0x9F) // C0 - Control Characters || (code_point == 0xFFF0 || code_point == 0xFFFF)) // Specials - non-characters { return true; } return false; };
許可されない符号位置のほかに、サブスクリプションのTopic Filterでは使用されるがTopic Nameでは禁止されている2つのワイルドカード文字、プラス記号(「+」U+002B)と数字記号(「#」U+0023)も確認する必要があります。
許可されない符号位置を確認する関数は、あらゆる文字列をエンコードするために一般的に使用されるため、MQTT.mqhヘッダーでホストされます。一方、ワイルドカード文字を確認する関数はTopic Nameに特有であるため、CPktPublishクラスの一部です。
//+------------------------------------------------------------------+ //| CPktPublish::HasWildcardChar | //+------------------------------------------------------------------+ bool CPktPublish::HasWildcardChar(const string str) { if(StringFind(str, "#") > -1 || StringFind(str, "+") > -1) { printf("Wildcard char not allowed in Topic Names"); return true; } return false; }
組み込み関数StringFindは、マッチする部分文字列の開始位置を返し、マッチする部分文字列が見つからなければ-1を返します。つまり、-1以上の値があるかどうかを確認するだけです。そして、それをメイン関数から呼び出します。
//+------------------------------------------------------------------+ //| CPktPublish::SetTopicName | //+------------------------------------------------------------------+ void CPktPublish::SetTopicName(const string topic_name) { if(HasWildcardChar(topic_name) || StringLen(topic_name) == 0) { ArrayFree(ByteArray); return; } ushort encoded_string[]; EncodeUTF8String(topic_name, encoded_string); ArrayCopy(ByteArray, encoded_string, 2); ByteArray[1] = EncodeVariableByteInteger(encoded_string); }
この時点で、ワイルドカードが見つかれば、これまでと同じ「エラー処理」をおこないます。情報をログに記録し、バッファをクリアしてすぐに戻ります。後でアラートを上げるなどして、これを改善することができます。
関数の最後の行は、標準が提案するアルゴリズムを使用して、パケットの残り長を固定ヘッダーの2バイト目に割り当てます。本連載の第1回でそれについてコメントしました。
テストもまったく同じ構成になっています。
//+------------------------------------------------------------------+ //| TEST_SetTopicName_WildcardChar_NumberSign | //+------------------------------------------------------------------+ bool TEST_SetTopicName_WildcardChar_NumberSign() { Print(__FUNCTION__); //--- Arrange static uchar expected[] = {}; uchar payload[] = {}; //--- Act CPktPublish *cut = new CPktPublish(payload); cut.SetTopicName("a#"); uchar result[]; ArrayCopy(result, cut.ByteArray); //--- Assert bool isTrue = AssertEqual(expected, result); //--- cleanup delete cut; ZeroMemory(result); return isTrue; }
テストを実行すると、このように表示されるはずです。
図6:MQTT 5.0 PUBLISHテスト出力Topic Name
Packet Identifierの書き方
Packet Identifierは、ユーザーが割り当てるものではありません。その代わりに、QoSレベルが0を超えるPUBLISHパケットには、クライアントが割り当てなければなりません[MUST]。言い換えれば、QoS 1またはQoS 2でPUBLISHパケットを構築するたびに、そのPacket Identifierを設定しなければなりません。
今すぐテストを始めましょう。必要なのは、パケットをインスタンス化し、必要なTopic NameとQoSを1または2に設定することだけです。結果として得られるパケットバイト配列は、パケットIDを持つはずです。
//+------------------------------------------------------------------+ //| TEST_SetPacketID_QoS2_TopicName1Char | //+------------------------------------------------------------------+ bool TEST_SetPacketID_QoS2_TopicName5Char() { Print(__FUNCTION__); // Arrange uchar payload[] = {}; uchar result[]; // expected {52, 9, 0, 1, 'a', 'b', 'c', 'd', 'e', pktID MSB, pktID LSB} // Act CPktPublish *cut = new CPktPublish(payload); // FIX: if we call SetQoS first this test breaks cut.SetTopicName("abcde"); cut.SetQoS_2(true); ArrayCopy(result, cut.ByteArray); // Assert ArrayPrint(result); bool is_true = result[9] > 0 || result[10] > 0; // cleanup delete cut; ZeroMemory(result); return is_true; }
生成されたパケットIDは(擬似的な)乱数であるため、その値をテストすることはできません。その代わりに、その存在をテストします。また、FIXが必要であることにもご注意ください。SetTopicNameとSetQoS_Xを呼び出す関数の順序が、結果のバイト配列に予期せぬ影響を及ぼしています。関数間に呼び出し順の依存関係があるのは良い考えではありません。これはバグでしょうが、諺にもあるように、バグとは書かれていないテストです。そこで、次の反復では、この呼び出し順依存性がないことのテストを書くことにします。今のところは、このテストに合格することだけを考えています。
もちろん、パケットIDを設定する関数の実装ができるまでは、テストはコンパイルすることさらできません。Packet Identifierはいくつかのコントロールパケットで必要とされるため、それを書き込む関数はCPktPublishクラスのメンバーであってはなりません。MQTT.mqhヘッダーは、それをホストするファイルとしてより適しているようです。
//+------------------------------------------------------------------+ //| SetPacketID | //+------------------------------------------------------------------+ void SetPacketID(uchar& buf[], int start_idx) { // MathRand - Before the first call of the function, it's necessary to call // MathSrand to set the generator of pseudorandom numbers to the initial state. MathSrand((int)TimeLocal()); int packet_id = MathRand(); if(ArrayResize(buf, buf.Size() + 2) < 0) { printf("ERROR: failed to resize array at %s", __FUNCTION__); return; } buf[start_idx] = (uchar)packet_id >> 8; // MSB buf[start_idx + 1] = (uchar)packet_id % 256; //LSB }
パケット識別子を生成するために、組み込み関数MathRandを使用しています。MathSrandを事前に呼び出す必要があります。この関数に乱数生成器の「種」を渡さなければなりません。MQL5における擬似乱数生成に関する明確な言及が最近ドキュメントに追加された本にあった推奨に従って、TimeLocalを種として選択します。
パケットIDを設定するために、元のバイト配列をパケットID(2バイトの整数)用のスペースが空くようにリサイズし、引数として渡された位置(start_idx)から始まる最上位バイトと最下位バイトの値を設定します。最後のステップは、SetQoS_1メソッドとSetQoS_2メソッドでCPktPublishクラスの関数を呼び出すことです。
//+------------------------------------------------------------------+ //| CPktPublish::SetQoS_2 | //+------------------------------------------------------------------+ void CPktPublish::SetQoS_2(const bool QoS_2) { QoS_2 ? m_publish_flags |= QoS_2_FLAG : m_publish_flags &= ~QoS_2_FLAG; SetFixedHeader(PUBLISH, m_buf, ByteArray, m_publish_flags); SetPacketID(ByteArray, ByteArray.Size()); }
添付ファイルに含まれるテストを実行すると、次のようなものが表示されるはずです(ここでは簡潔にするために削除)。
図7:MQTT 5.0 PUBLISHテスト出力Packet Identifier
結論
プロトコルの中核であるPUBLISHパケットを実装するのは少し大変です。このパケットは異なる固定ヘッダーを持ち、Topic NameをUTF-8でエンコードし、いくつかの許可されない符号位置から保護された可変ヘッダーを必要とし、QoS>0の場合はパケット識別子を必要とし、MQTT 5.0で利用可能なほぼすべてのプロパティとユーザープロパティを使用することができます。
この記事では、Publishフラグ、Topic Name、Packet Identifierで有効なPUBLISHヘッダーを構築する方法を報告しました。次回は、そのプロパティの書き方について見ていきます。
最後の変更についての余談です。このMQTTクライアントの開発をフォローしている読者は、いくつかの関数シグネチャ、変数名、フィールドアクセスレベル、テストフィクスチャなどが変更されたことにお気づきかもしれません。これらの変更のいくつかは、どのようなソフトウェア開発においても予想されるものですが、そのほとんどは、私たちがTDDアプローチを使用しており、この方法論にできるだけ忠実であろうと努力しているためです。最初の成果物を得るまでには、多くの変化が予想されます。
ご存知のように、このようなコードベースのクライアントを開発するために必要なことをすべて知っている開発者はいません。TDDは、私たちの「巨大なスペック、ベイビーステップ」の旅に大いに役立っていますが、もしお手伝いいただけるのであれば、コミュニティチャットか、以下のコメント欄に書き込んでください。どんな手助けでも大歓迎です。よろしくお願いします。
MetaQuotes Ltdにより英語から翻訳されました。
元の記事: https://www.mql5.com/en/articles/13998
- 無料取引アプリ
- 8千を超えるシグナルをコピー
- 金融ニュースで金融マーケットを探索