
MetaTrader 5用のMQTTクライアントの開発:TDDアプローチ(第6回)
「楽観主義はプログラミングの職業病であり、フィードバックはその治療です。」(ケント・ベック)
はじめに
テスト駆動開発手法には多くの利点がありますが、大きな欠点が1つあります。その利点のひとつは、明確に定義されたユニットと、適切に命名された変数を書くことで、高いテストカバレッジを達成し、ドメインをより深く理解し、過剰なエンジニアリングを避け、目の前のタスクに集中し続けることができることです。大きな欠点は、目の前のタスクに焦点を絞ったことの直接的な結果です。つまり、プロジェクト全体の複雑さに怯えるのを避けるために、開発者として、一度にできる限り小さな課題を1 つだけ解決し続けます。天才が複雑さを解決することで複雑さを取り除く人だとすれば、TDD開発者は複雑さを意図的に無視する人です。
そうです。目隠しをした馬や、ニンジンを追いかけるロバのようにです。
しかし、私たちが無視したからといって、複雑さが消えるわけではありません。それはそこに留まり、私たちがそれに直面するのを待っています。葉をよく見るために森を無視することで、技術的負債を残し続けます。冗長な関数、重複したメンバー、役に立たないテスト、不必要なクラス、読めないコード、到達できないコードなどを放置し続けるのです。開発中に蓄積されたこの技術的負債は、生産性に悪影響を及ぼす可能性があります。それが、リファクタリングがTDDの実践に不可欠な要素である理由です。下図は、TDD実践の典型的なステップを示しています。
図01:TDD実践の典型的なステップ:レッド、グリーン、リファクタリング(出典:IBM Developer)
以下のセクションでは、以前書いたクラスをどのようにリファクタリングしたかを説明し、いくつかの改善点についてコメントします。これらの改良を経て、私たちがどのようにPUBLISHパケットを構築しているのか、そしてどのようにしてパケット構築クラスの実行可能な設計図にたどり着いたのかを紹介します。新しいパターンに従った最初のクラスはPUBACKクラスです。PUBACKパケットはQoS 1のPUBLISHパケットと対になるものなので、Session Stateの管理を始める必要があります。私たちのクライアントは、状態を保持し更新するために、ある種の永続化層を持つ必要があります。
永続化層はOASIS標準の範囲外です。これはアプリケーション固有のものです。ローカルファイルシステム内の単純なファイルかもしれないし、クラウド上の完全に分散された高可用性データベースシステムかもしれません。ここでの目的のためには、Windows上でローカルに、あるいはWSL経由で動作するPostgreSQLサーバーのようなデータベースで十分です。ただし、ここではMQLとSQLiteをネイティブに統合しているので、このシングルファイル、ノーサーバーのRDBMSが当然の選択です。SQLiteは軽量かつスケーラブルで信頼性が高く、サーバーのメンテナンスが不要です。オンメモリだけのデータベースも持てるので、テストやデバッグにはかなり便利です。
しかし、この時点では、永続化層を実装するつもりはありません。なぜなら、Session Stateの管理を扱う前に、パケットの書き込みと読み込みを十分にテストすることにしたからです。永続化層に進む前に、MQTTプロトコルで使用されるさまざまなデータタイプを正しくエンコードおよびデコードしていることを確認する必要があります。この目標を達成するために広範なユニットテストを書き、まもなく実際の証券会社(Eclipse Foundationからのオープンソースmosquitto証券会社)に対してローカルで動作している小さな機能テストを開始する予定です。
PUBLISH/PUBBACKインタラクションをテストするために、偽のデータベース、テストに必要な制御されたデータを生成する関数のコレクション、一種のフィクスチャを使用することにします。CPubackクラスについて説明するときに紹介します。
以下の説明では、OASIS標準で使用されている必要[MUST]と可能[MAY]という用語を使用します。OASIS標準は、IETF RFC 2119で説明されているようにこれらの用語を使用します。
特に断りのない限り、引用はすべてOASIS標準からのものです。
PUBLISHパケットの構築の仕方
CPublishクラスを書き直す過程で、いくつかのクラスメンバーを削除しました。また、固定ヘッダーと可変ヘッダーのビルドをワンステップビルダーに統合しました。これらの変更は、他のコントロールパケットクラスでも同様です。
現在、CPublishクラスには以下のメンバとメソッドがあります。
//+------------------------------------------------------------------+ //| Publish.mqh | //| ********* WORK IN PROGRESS ********** | //| **** PART OF ARTICLE https://www.mql5.com/ja/articles/14391 **** | //+------------------------------------------------------------------+ #include "IControlPacket.mqh" //+------------------------------------------------------------------+ //| PUBLISH VARIABLE HEADER | //+------------------------------------------------------------------+ /* The Variable Header of the PUBLISH Packet contains the following fields in the order: Topic Name, Packet Identifier, and Properties. */ //+------------------------------------------------------------------+ //| Class CPublish. | //| Purpose: Class of MQTT Publish Control Packets. | //| Implements IControlPacket | //+------------------------------------------------------------------+ class CPublish : public IControlPacket { private: bool IsControlPacket() {return true;} bool HasWildcardChar(const string str); protected: uchar m_pubflags; uint m_remlen; uchar m_topname[]; uchar m_props[]; uint m_payload[]; public: CPublish(); ~CPublish(); //--- methods for setting Publish flags void SetRetain(const bool retain); void SetQoS_1(const bool QoS_1); void SetQoS_2(const bool QoS_2); void SetDup(const bool dup); //--- method for setting Topic Name void SetTopicName(const string topic_name); //--- methods for setting Properties void SetPayloadFormatIndicator(PAYLOAD_FORMAT_INDICATOR format); void SetMessageExpiryInterval(uint msg_expiry_interval); void SetTopicAlias(ushort topic_alias); void SetResponseTopic(const string response_topic); void SetCorrelationData(uchar &binary_data[]); void SetUserProperty(const string key, const string val); void SetSubscriptionIdentifier(uint subscript_id); void SetContentType(const string content_type); //--- method for setting the payload void SetPayload(const string payload); //--- method for building the final packet void Build(uchar &result[]); };
単純化されただけでなく、パブリッシュフラグ、トピック名、プロパティを設定するプロセスはすべて独立しています。つまり、Build()メソッドが最後に呼び出されるのであれば、それぞれをどのような順番で設定しても構いません。
このテストは、その動作を形式化したもので、RETAINとQoS1という2つのフラグを設定したクラスコンストラクタと、必須のTopic Nameをテストします。
bool TEST_Ctor_Retain_QoS1_TopicName1Char() { Print(__FUNCTION__); CPublish *cut = new CPublish(); uchar expected[] = {51, 6, 0, 1, 'a', 0, 1, 0}; // QoS > 0 require packet ID uchar result[]; cut.SetTopicName("a"); cut.SetRetain(true); cut.SetQoS_1(true); cut.Build(result); bool isTrue = AssertEqual(expected, result); delete(cut); ZeroMemory(result); return isTrue; }
これで、SetTopicName()、SetRetain()、SetQos1()の各メソッドは、どのような順番で呼び出しても、 結果のパケットは有効なままとなります。前述したように、この動作はすべてのコントロールパケットクラスで再現されており、パブリッシュフラグのすべての組み合わせに対してテストをおこなっています。添付ファイルをご覧ください。
PUBLISHパケットの固定ヘッダー
PUBLISHパケットの固定ヘッダーは、現在のバージョンのプロトコルの他のすべてのMQTT 5.0コントロールパケットとは異なります。将来のために予約されていないフラグが3つあります。RETAIN、QoS、DUPフラグです。本連載の第5回(前回の記事)では、これらのPUBLISHフラグについて詳しく書いています。
図02:MQTT 5.0のPUBLISHパケット固定ヘッダーRETAIN、QoSレベル、DUPフラグ
パブリッシュフラグのいずれかを切り替えるために同じパターンを使用していますが、リファクタリングの結果、各パブリッシュフラグでSetFixedHeader()を呼び出さなくなりました。まず、トグルをブール値として定義し、関数の引数として渡します。
void CPktPublish::SetRetain(const bool retain) { retain ? m_pubflags |= RETAIN_FLAG : m_pubflags &= ~RETAIN_FLAG; }
次に、ブール値がtrueかfalseかを確認します。
void CPktPublish::SetQoS_1(const bool QoS_1) { QoS_1 ? m_pubflags |= QoS_1_FLAG : m_pubflags &= ~QoS_1_FLAG; }
ブール値がtrueの場合、フラグ値とucharメンバ(1バイト)の間でビットごとのOR代入をおこない、フラグを設定します。
void CPktPublish::SetQoS_2(const bool QoS_2) { QoS_2 ? m_pubflags |= QoS_2_FLAG : m_pubflags &= ~QoS_2_FLAG; }
ブール値がfalseの場合、フラグ値と同じucharメンバの間でビットごとのAND代入をおこない、フラグを解除します。
void CPktPublish::SetDup(const bool dup) { dup ? m_pubflags |= DUP_FLAG : m_pubflags &= ~DUP_FLAG; }
こうすることで、m_pubflags変数は、パケット設定中に設定/解除されたすべてのフラグを保持します。その後、Build()メソッドが呼ばれると、今度はm_pubflagsとパケットの最初のバイト(バイト0)との間で、再びビットごとのOR代入を実行します。
pkt[0] |= m_pubflags;
PUBLISHパケットの可変ヘッダー
PUBLISHパケットの可変ヘッダーには、Topic Name、Packet Identifier、Propertiesのフィールドが順番に含まれます。
Topic Name
パブリッシャとサブスクライバーの間のすべての関係は、パブリケーションのTopic Nameに関連付けられるので、このフィールドはPUBLISHパケットでは必須であり、ワイルドカード文字を含めることはできません。このフィールドを設定するとき、ワイルドカード文字と長さゼロの文字列の2つのガード条件があり、これらの条件のいずれかがtrueであれば即座に戻り、エラーをログに記録します。
void CPktPublish::SetTopicName(const string topic_name) { if(HasWildcardChar(topic_name) || StringLen(topic_name) == 0) { ArrayFree(m_topname); return; } EncodeUTF8String(topic_name, m_topname); }
どのガード条件も満たされない場合、文字列をUTF-8としてエンコードし、そのchar配列をm_topname protectedメンバーに格納し、さらにBuild()が呼ばれたときに最終パケットに含まれるようにします。
Packet Identifier
Packet Identifierはユーザーによって設定されず、QoS 0には必要ありません。代わりに、必要なQoSが0を超える場合は、Build()メソッドで自動的に設定されます。
// QoS > 0 requires packet ID if((m_pubflags & 0x06) != 0) { SetPacketID(pkt, pkt.Size()); }最終的なパケットを構築する際、m_pubflagsメンバーを0110 (0x06)のバイナリ値とのビットANDで確認します。結果が0でなければ、そのパケットはQoS_1またはQoS_2であると判断し、Packet Identifierを設定します。
SetPacketID関数は、TimeLocal()を使用して擬似乱数整数を生成し、初期状態を生成します。テストを楽にするために、ブール変数TESTを定義しました。この変数がtrueのとき、この関数はパケットIDとして1の値を設定します。
//+------------------------------------------------------------------+ //| SetPacketID | //+------------------------------------------------------------------+ #define TEST true void SetPacketID(uchar& buf[], int start_idx) { // MathRand - Before the first call of the function, it's necessary to call // MathSrand to set the generator of pseudorandom numbers to the initial state. MathSrand((int)TimeLocal()); int packet_id = MathRand(); if(ArrayResize(buf, buf.Size() + 2) < 0) { printf("ERROR: failed to resize array at %s", __FUNCTION__); return; } buf[start_idx] = (uchar)packet_id >> 8; // MSB buf[start_idx + 1] = (uchar)(packet_id % 256) & 0xff; //LSB //--- if testing, set packet ID to 1 if(TEST) { Print("WARN: SetPacketID TEST true fixed ID = 1"); buf[start_idx] = 0; // MSB buf[start_idx + 1] = 1; //LSB } }
ご覧のように、万が一に備えてWARNも設定しています。
Properties
本連載の第4回では、プロパティとは何か、そしてMQTT 5.0拡張メカニズムの一部としての役割について詳しく見てきました。ここでは、データ型エンコーディングの違いに特に注意しながら、どのように実装しているかを説明します。
MQTT 5.0コントロールパケットには、プロパティ値のエンコードに使用される6種類のデータ表現があります。
- 1バイト整数(8ビットの符号なし整数)
- ビッグエンディアン(ネットワーク順とも呼ばれる)(16ビットの符号なし整数)
- 4バイト整数(32ビットの符号なし整数で、ビッグエンディアン順)
- 可変バイト整数(0~268,435,455の値を表すために、最大で4バイトまでの最小バイトを使用)
- バイナリデータ(長さ0~65,535)
- UTF-8でエンコードされた文字列(ユーザープロパティでキーと値のペアをエンコードするためにも使用)
以下の表に、利用可能なPUBLISHプロパティと、それぞれのデータ表現を示します。
プロパティ | データ表現 |
---|---|
Payload Format Indicator | 1バイト整数 |
Message Expiry Interval | 4バイト整数 |
Topic Alias | 2バイト整数 |
Response Topic | UTF-8エンコード文字列 |
Correlation Data | バイナリデータ |
User Property | UTF-8エンコードされた文字列ペア |
Subscription Identifier | 変数バイト整数 |
Content Type | UTF-8エンコード文字列 |
表01:MQTT 5.0のPUBLISHプロパティとそれぞれのデータ表現
プロパティ識別子はDefines.mqhヘッダーに含まれています。
//+------------------------------------------------------------------+ //| PROPERTIES | //+------------------------------------------------------------------+ /* The last field in the Variable Header of the CONNECT, CONNACK, PUBLISH, PUBACK, PUBREC, PUBREL, PUBCOMP, SUBSCRIBE, SUBACK, UNSUBSCRIBE, UNSUBACK, DISCONNECT, and AUTH packet is a set of Properties. In the CONNECT packet there is also an optional set of Properties in the Will Properties field with the Payload */ #define MQTT_PROP_IDENTIFIER_PAYLOAD_FORMAT_INDICATOR 0x01 // (1) Byte #define MQTT_PROP_IDENTIFIER_MESSAGE_EXPIRY_INTERVAL 0x02 // (2) Four Byte Integer #define MQTT_PROP_IDENTIFIER_CONTENT_TYPE 0x03 // (3) UTF-8 Encoded String #define MQTT_PROP_IDENTIFIER_RESPONSE_TOPIC 0x08 // (8) UTF-8 Encoded String #define MQTT_PROP_IDENTIFIER_CORRELATION_DATA 0x09 // (9) Binary Data #define MQTT_PROP_IDENTIFIER_SUBSCRIPTION_IDENTIFIER 0x0B // (11) Variable Byte Integer #define MQTT_PROP_IDENTIFIER_SESSION_EXPIRY_INTERVAL 0x11 // (17) Four Byte Integer . . .
Payload Format Indicator
Payload Format Indicatorは0または1の値で、それぞれ生バイトまたはUTF-8エンコードされた文字列を意味します。存在しない場合は0(生バイト)とみなされます。
このフィールドは、m_propsメンバー配列に直接設定することもできますが、最終的なプロパティ配列にコピーされる前に何らかの操作を必要とするプロパティの大部分と整合性を取るために、補助的なローカルバッファを仲介として使用することを選択しました。
void CPktPublish::SetPayloadFormatIndicator(PAYLOAD_FORMAT_INDICATOR format) { uchar aux[2]; aux[0] = MQTT_PROP_IDENTIFIER_PAYLOAD_FORMAT_INDICATOR; aux[1] = (uchar)format; ArrayCopy(m_props, aux, m_props.Size()); }
このプロパティには2つの値しかありませんが、読みやすさのために記号値を割り当てることにし,ました。
enum PAYLOAD_FORMAT_INDICATOR { RAW_BYTES = 0x00, UTF8 = 0x01 };
記号値を使用することで、ライブラリのエンドユーザーにとってメソッド呼び出しが明示的になります。
cut.SetPayloadFormatIndicator(RAW_BYTES);
cut.SetPayloadFormatIndicator(UTF8);
Message Expiry Interval
Message Expiry Intervalは4バイトの整数で表されます。この表現は、可変バイト整数の表現とは異なることを覚えておく価値があります。後者が値を表現するのに必要な最小限のバイト数を使用するのに対し、前者は常に4バイト全体を使用して表現されます。
void CPktPublish::SetMessageExpiryInterval(uint msg_expiry_interval) { uchar aux[4]; aux[0] = MQTT_PROP_IDENTIFIER_MESSAGE_EXPIRY_INTERVAL; ArrayCopy(m_props, aux, m_props.Size(), 0, 1); EncodeFourByteInteger(msg_expiry_interval, aux); ArrayCopy(m_props, aux, m_props.Size()); }
4バイト整数を符号化する関数は、ビッグエンディアン順(またはネットワーク順)を確保するために、2の冪乗右シフトというよく知られたパターンに従っています。
void EncodeFourByteInteger(uint val, uchar &dest_buf[]) { ArrayResize(dest_buf, 4); dest_buf[0] = (uchar)(val >> 24) & 0xff; dest_buf[1] = (uchar)(val >> 16) & 0xff; dest_buf[2] = (uchar)(val >> 8) & 0xff; dest_buf[3] = (uchar)val & 0xff; }
Topic Alias
Topic Aliasプロパティは、パケットサイズを小さくするために使用できます。これは各ネットワーク接続に限定され、MQTTセッション状態の一部です。つまり、Topic Aliasを設定する関数は、今のところはスタブと見なすことができます。Session Stateを扱うときには、必ず完成させなければなりません。
void CPktPublish::SetTopicAlias(ushort topic_alias) { uchar aux[2]; aux[0] = MQTT_PROP_IDENTIFIER_TOPIC_ALIAS; ArrayCopy(m_props, aux, m_props.Size(), 0, 1); EncodeTwoByteInteger(topic_alias, aux); ArrayCopy(m_props, aux, m_props.Size()); }
2バイト整数をエンコードする私たちの関数は、4バイト整数をエンコードするのに使用したのと同じ、よく知られたパターンに従っています。必要なビッグエンディアン順を確保するために、2の冪乗右シフトをおこないます。
void EncodeTwoByteInteger(uint val, uchar &dest_buf[]) { ArrayResize(dest_buf, 2); dest_buf[0] = (uchar)(val >> 8) & 0xff; dest_buf[1] = (uchar)val & 0xff; }
Response Topic
Response Topicプロパティは、パブリッシュ/サブスクライブパターンの一部ではありません。その代わりに、MQTTを介したリクエスト/レスポンス相互作用の一部となります。ご覧のように、この関数は2つの補助バッファを使用しています。1つはプロパティ識別子をホストするバッファで、もう1つはエンコードされたUTF-8文字列をホストするバッファです。文字列エンコーダー関数には、宛先バッファの開始インデックスを指定する第3のパラメータがないため、他のUTF-8エンコード文字列でも同じことが起こります。これは、次のバージョンではオーバーロードで解決されるかもしれません。
void CPktPublish::SetResponseTopic(const string response_topic) { uchar aux[1]; aux[0] = MQTT_PROP_IDENTIFIER_RESPONSE_TOPIC; ArrayCopy(m_props, aux, m_props.Size()); uchar buf[]; EncodeUTF8String(response_topic, buf); ArrayCopy(m_props, buf, m_props.Size()); }
Correlation Data
Correlation Dataプロパティもまた、MQTTを介したリクエスト/レスポンス相互作用の一部であり、パブリッシュ/サブスクライブパターンの一部ではありません。その値はバイナリデータなので、この関数は、プロパティ識別子を設定した後、引数として渡されたデータをm_propsバイト配列にコピーしているだけです。
void CPktPublish::SetCorrelationData(uchar &binary_data[]) { uchar aux[1]; aux[0] = MQTT_PROP_IDENTIFIER_CORRELATION_DATA; ArrayCopy(m_props, aux, m_props.Size()); ArrayCopy(m_props, binary_data, m_props.Size()); }
User Property
User Propertyは、アプリケーション固有のセマンティクスを持つUTF-8エンコードされたkey:valueペアを送信するために使用できるため、最も柔軟なMQTT 5.0プロパティです。
「Non-normative comment
このプロパティは、意味と解釈が送受信を担当するアプリケーションプログラムによってのみ知られているアプリケーション層の名前値タグを転送する手段を提供することを意図しています。」
私たちの関数は、このプロパティをエンコードするために3つの補助バッファを使用しています。なぜなら、現在のところ、UTF-8文字列エンコーダーには、宛先バッファの開始インデックスを指定する3番目のパラメータがないからです。これは次のバージョンではオーバーロードで解決されるかもしれません。(上記のResponse Topicを参照)
void CPktPublish::SetUserProperty(const string key, const string val) { uchar aux[1]; aux[0] = MQTT_PROP_IDENTIFIER_USER_PROPERTY; ArrayCopy(m_props, aux, m_props.Size()); uchar key_buf[]; EncodeUTF8String(key, key_buf); ArrayCopy(m_props, key_buf, m_props.Size()); uchar val_buf[]; EncodeUTF8String(val, val_buf); ArrayCopy(m_props, val_buf, m_props.Size()); }
Subscription Identifier
Subscription Identifierプロパティを設定する関数は、渡された引数が1から268,435,455の間であるかどうかを確認します。もしそうでなければ、エラーメッセージを表示し、すぐに戻ります。
void CPktPublish::SetSubscriptionIdentifier(uint subscript_id) { if(subscript_id < 1 || subscript_id > 0xfffffff) { printf("Error: " + __FUNCTION__ + "Subscription Identifier must be between 1 and 268,435,455"); return; } uchar aux[1]; aux[0] = MQTT_PROP_IDENTIFIER_SUBSCRIPTION_IDENTIFIER; ArrayCopy(m_props, aux, m_props.Size()); uchar buf[]; EncodeVariableByteInteger(subscript_id, buf); ArrayCopy(m_props, buf, m_props.Size()); }
Content Type
Content Typeプロパティの値はアプリケーションによって定義されます。「MQTTは、文字列が有効なUTF-8エンコード文字列であることを確認する以外、文字列の検証をおこないません。」
void CPktPublish::SetContentType(const string content_type) { uchar aux[1]; aux[0] = MQTT_PROP_IDENTIFIER_CONTENT_TYPE; ArrayCopy(m_props, aux, m_props.Size()); uchar buf[]; EncodeUTF8String(content_type, buf); ArrayCopy(m_props, buf, m_props.Size()); };
ペイロード
PUBLISH可変ヘッダーの最後のフィールドは、正しく言えばペイロードです。長さゼロのペイロードは有効です。この関数は、UTF-8文字列エンコーダーのラッパーに過ぎず、補助バッファーを使用してさらにm_payloadメンバーにコピーするという同じパターンを踏襲しています。
void CPktPublish::SetPayload(const string payload) { uchar aux[]; EncodeUTF8String(payload, aux); ArrayCopy(m_payload, aux, m_props.Size()); }
最終的なBuildメソッド
Build()メソッドの目的は、プロパティの長さとパケットの残りの長さの両方を変数バイト整数としてエンコードしながら、固定ヘッダー、Topic Name、Packet Identifier、プロパティ、およびペイロードを最終パケットに結合することです。
まず、必須のTopic Nameがあるかどうかを確認します。もしその長さがゼロであれば、エラーを表示/記録し、すぐに戻ります。
void CPktPublish::Build(uchar &pkt[]) { if(m_topname.Size() == 0) { printf("Error: " + __FUNCTION__ + " topic name is mandatory"); return; } ArrayResize(pkt, 2);
そして、固定ヘッダーの最初のバイトに、コントロールパケットのタイプとそれぞれのPUBLISHフラグを設定します。
// pkt type with publish flags pkt[0] = (uchar)PUBLISH << 4; pkt[0] |= m_pubflags;
次に、m_topname配列を最終パケットにコピーし、QoS > 0であればPacket Identifierをセット/コピーします。
// topic name ArrayCopy(pkt, m_topname, pkt.Size()); // QoS > 0 require packet ID if((m_pubflags & 0x06) != 0) { SetPacketID(pkt, pkt.Size()); }
次に、Property(ies) Lengthを可変バイト整数としてエンコードします。
// properties length uchar buf[]; EncodeVariableByteInteger(m_props.Size(), buf); ArrayCopy(pkt, buf, pkt.Size());
プロパティとペイロードを、それぞれのクラスメンバーから最終的なパケット配列にコピーします。
// properties ArrayCopy(pkt, m_props, pkt.Size()); // payload ArrayCopy(pkt, m_payload, pkt.Size());
最後に、可変バイト整数としてエンコードされたパケットの残り長を設定します。
// remaining length m_remlen += pkt.Size() - 2; uchar aux[]; EncodeVariableByteInteger(m_remlen, aux); ArrayCopy(pkt, aux, 1); }
PUBACKコントロールパケット
CPublishクラスを実装するときに見たように、QoS 1のPUBLISHパケットには、0 以外のPacket Identifierが必要です。このパケットIDは、対応するPUBACKパケットで返されます。このIDによって、クライアントは、以前に送信されたPUBLISHパケットが配送されたのか、それともエラーがあったのかを知ることができます。成功した配信であれ、失敗した配信であれ、PUBACKはセッション状態を更新するために使用されるトリガーです。Reason Codeに基づいてSession Stateを更新します。
PUBACKパケットは9つのReason Codeのうちの1つを戻します。
SUCCESS:メッセージはすべて正常です。受理され、現在パブリッシュが進行中です。ここでの「成功」は、受信者がメッセージの所有権を受け入れたことを意味します。これは暗黙的な、つまり省略可能な唯一のReason Codeです。パケットIDだけのPUBACKは、QoS 1の配送が成功したと解釈されなければなりません[MUST]。
「PUBACKパケットを送信するクライアントまたはサーバーは、PUBACK Reason Codeのいずれかを使用しなければならない[MUST] [MQTT-3.4.2-1]。Reason Codeが0x00(成功)であり、Propertiesが存在しない場合、Reason CodeとProperty Lengthは省略可能です。」
NO MATCHING SUBSCRIBERS:メッセージはすべて正常です。受理され、現在もパブリッシュが進行中ですが、そのTopic Nameには誰も登録してません。このReason Codeは証券会社によってのみ送信され、オプションです。つまり、証券会社はSUCCESSの代わりにこのReason Codeを送ってもよいです[MAY]。
UNSPECIFIED ERROR:メッセージが拒否されましたが、パブリッシャはその理由を明かしたくないか、 他のどのReason Codeもその理由を説明するのに適していません。
IMPLEMENTATION SPECIFIC ERROR:メッセージはすべて正常ですが、パブリッシャがそれをパブリッシュしたくないようです。規格は、このReason Codeのセマンティクスに関する追加的な詳細を提供していませんが、パブリッシュしない理由はプロトコルの範囲外、つまりアプリケーション固有であると推測できます。
NOT AUTHORIZED:説明不要。
TOPIC NAME INVALID:Topic Nameを含め、メッセージはすべて正常です。これは、適切に形式化され、適切にエンコードされた UTF-8文字列です。しかし、クライアントであれ証券会社であれ、パブリッシャはこのTopic Nameを受け入れません。繰り返しになりますが、パブリッシュしない理由はアプリケーション固有のものだと推測できます。
PACKET IDENTIFIER IN USE:メッセージはすべて正常ですが、PUBLISHで送信したパケットIDがすでに使用されているため、クライアントと証券会社間のセッション状態に不一致がある可能性があります。
QUOTA EXCEEDED:説明不要。繰り返しになりますが、拒否の理由はプロトコルの範囲外です。これはアプリケーション固有のものです。
PAYLOAD FORMAT INVALID: メッセージはすべて正常ですが、PUBLISHで送信したPayload Format Indicatorプロパティが実際のペイロードフォーマットと異なっています。
PUBACKパケットは、Reason Codeの他に、Reason StringとUser Propertyを持つことができます。
Reason Stringは、診断に役立つことを目的とした、人間が読めるUTF-8エンコード文字列です。受信者が解析することは意図していません。その代わりに、ログに記録したり、出力したり、レポートに添付したりすることができる追加情報を運ぶことが目的です。Reason Stringが含まれることで、接続時(CONNECTパケット)に指定されたMaximum Packet Sizeを超えてパケットサイズが大きくなる場合、準拠するサーバーまたはクライアントはReason Stringを送信しないことは注目に値します。
PUBACKはまた、User Property(複数可)としてエンコードされた任意の数のkey:valueペアを持つことができます。これらのペアは、エラーに関する追加情報を提供するために使用することができ、アプリケーション固有のものでもあります。つまり、プロトコルはそのセマンティクスを定義していません。
クライアントは「受信者から対応するPUBACKパケットを受け取るまで、PUBLISHパ ケットをunacknowledgedとして扱わなければなりません[MUST]」。
CPubackクラス
CPubackクラスはCPublishクラスと同じ設計図に従っています。また、オブジェクト階層のスタブルートとなるIControlPacketインターフェイスも実装しています。
PUBACKパケットは、QoS 1のPUBLISHパケットに対する応答として送信されます。その2バイトの固定ヘッダーは、1バイト目にコントロールパケット識別子のみ、2バイト目にパケットの残りの長さを持ちます。このプロトコルのバージョンでは、そのビットフラグはすべてRESERVEDに設定されています。
図03:MQTT-5.0-PUBACKパケットの固定ヘッダーの構造
「PUBACKパケットの可変ヘッダーは、PUBLISHパケットのPacket Identifier、PUBACK Reason Code、Property Length、Propertiesの各フィールドを順番に含みます。」
図04:MQTT-5.0-PUBACKパケットの可変ヘッダーの構造
これまではクライアントを送信者としてのみ扱ってきましましたが、これからは受信者の役割も考慮する必要があります。理由は次です。
「配信プロトコルは対称的で、[...]クライアントとサーバーはそれぞれ送信者または受信者の役割を担うことができます。」
承認されるパケットの識別子を取得する関数のテストを書く必要があります。
- PUBACKの受信時に証券会社が送信したリターンパケットから
- または、PUBACK送信時に永続化システムから
QoS 1のPUBLISHパケットは、対応するPUBACKなしには意味を持たず、PUBACKは、対応するPUBLISHパケットのパケットIDを保存するために、ある種の永続性を必要とします。ただし、いつかは本物のデータベースを永続化層として使用する必要があることはすでに分かっていますが、現時点ではまだ必要ありません。この関数をテストし開発するために必要なのは、データベースのような働きをするもので、問い合わせたときに、確認待ちのPUBLISHパケットの識別子を返してくれるものです。驚きのないように、GetPendingPublishIDs(ushort &result[])という関数を1つ作り、DB.mqhというファイルに保存しておきます。
void GetPendingPublishIDs(ushort &result[]) { ArrayResize(result, 3); result[0] = 1; result[1] = 255; // one byte result[2] = 65535; // two bytes }
「永続化層」を配置すると、目の前のタスクに集中できます。つまり、証券会社から送信されたPUBACKバイト配列(パケット)を渡されたときに、確認されたPUBLISHの識別子を取得し、永続化層に格納されている保留中のPUBLISH IDと照合する関数を書くことです。IDが一致したら「True」を戻します。後日、プロトコル「Operational Behavior」を実装する際に、このマッチングIDをリアルストアから解放します。
上記のPUBACK可変ヘッダー構造を考えると、今必要なのは、最初の2バイトを読んで、確認されるパケットのIDを得ることだけです。
ushort CPuback::GetPacketID(uchar &pkt[]) { return (pkt[0] * 256) + pkt[1]; }
パケット識別子は、ビッグエンディアン(またはネットワーク)順の2バイト整数としてエンコードされ、最上位バイト(MSB)が最初に表示されることを覚えておきましょう。これを符号化するために、左シフトのビット演算(<<)を使用しました。デコードするには、最上位バイトに256を掛け、最下位バイトを足します。
今のところは上記の関数で十分です。後にオープンネットワークで実際の証券会社とテストする際に、エンディアンの問題に対処しなければならないかもしれませんが、この時点ではテストしません。魅力的なニンジン、目の前の課題に向かって進み続けましょう。
bool CPuback::IsPendingPkt(uchar &pkt[]) { ushort pending_ids[]; GetPendingPublishIDs(pending_ids); ushort packet_id = GetPacketID(pkt); for(uint i = 0; i < pending_ids.Size(); i++) { if(pending_ids[i] == packet_id) { return true; } } return false; }
上記の関数は、バイト配列を引数として受け取ります。このバイト配列はPUBACKパケットの可変ヘッダーです。そして、ローカル変数pending_idsに、まだ承認されていないストア/データベースのパケット識別子の配列を格納します。最後に、証券会社から送られたバイト配列のパケットIDを読み取り、保留中のIDの配列と比較します。もしパケットが配列の中にあれば、この関数はTrueを返し、IDを解放することができます。
同じロジックで、QoS 2のPUBLISH用のPUBREC、PUBREL、PUBCOMPパケット識別子を解放することができます。また、後で、偽の、ファイルに1つの関数を含む「永続化層」を本物のデータベースに置き換えますが、関数のメインロジックは残ります。この時点で、別の開発者が永続化層の開発に取り組んでいる間に、私たちは完全に独立した方法でパケットクラスを開発することができます。
また、PUBACK可変ヘッダーからReason Code(複数可)を読み取れるようにする必要もあります。このフィールドは位置とサイズが決まっているので、必要なのは特定のバイトを読み取ることだけです。
uchar CPuback::GetReasonCode(uchar &pkt[]) { return pkt[2]; }
クライアントの受信側だけで作業しているため、つまりPUBACKをまだ送信していないため、次の機能テストは上記の関数で十分です。次は、本物の証券会社を扱います。
結論
継続的なリファクタリングはTDDの実践の一部です。完全に機能するだけでなく、単一責任の単位と関数(ここではクラスとメソッド)、可読識別子(クラス、メソッド、変数名)、冗長性の回避(「同じことを繰り返さない」)というクリーンなコードを実現することも目指しています。それはプロセスであり、ワンステップの作業ではありません。そのため、MQTT 5.0クライアントを完全に機能させるまで、リファクタリングを継続的におこなっていくことは確実です。
CONNECT, CONNACK, PUBLISH, PUBACKパケットが期待通りに動作しているかどうかを確認するために、実際のMQTT 証券会社に対して最初の機能テストを書き始める準備ができました。
PUBACKパケットは、QoS 1のPUBLISHパケットと対になるものです。QoS 2のPUBLISHパケットは、PUBREC、PUBCOMP、PUBRELパケットを必要とします。次回のテーマは、この2つです。
MQL5に精通していてこのオープンソースのMQTTクライアントの開発に貢献できる方は、以下のコメント欄かコミュニティチャットまでご連絡ください。
MetaQuotes Ltdにより英語から翻訳されました。
元の記事: https://www.mql5.com/en/articles/14391





- 無料取引アプリ
- 8千を超えるシグナルをコピー
- 金融ニュースで金融マーケットを探索