English Deutsch
preview
MetaTrader 5用のMQTTクライアントの開発:TDDアプローチ(第6回)

MetaTrader 5用のMQTTクライアントの開発:TDDアプローチ(第6回)

MetaTrader 5統合 | 21 5月 2024, 09:45
77 0
Jocimar Lopes
Jocimar Lopes

 

「楽観主義はプログラミングの職業病であり、フィードバックはその治療です。」(ケント・ベック)

はじめに

テスト駆動開発手法には多くの利点がありますが、大きな欠点が1つあります。その利点のひとつは、明確に定義されたユニットと、適切に命名された変数を書くことで、高いテストカバレッジを達成し、ドメインをより深く理解し、過剰なエンジニアリングを避け、目の前のタスクに集中し続けることができることです。大きな欠点は、目の前のタスクに焦点を絞ったことの直接的な結果です。つまり、プロジェクト全体の複雑さに怯えるのを避けるために、開発者として、一度にできる限り小さな課題を1 つだけ解決し続けます。天才が複雑さを解決することで複雑さを取り除く人だとすれば、TDD開発者は複雑さを意図的に無視する人です。 

そうです。目隠しをした馬や、ニンジンを追いかけるロバのようにです。

しかし、私たちが無視したからといって、複雑さが消えるわけではありません。それはそこに留まり、私たちがそれに直面するのを待っています。葉をよく見るために森を無視することで、技術的負債を残し続けます。冗長な関数、重複したメンバー、役に立たないテスト、不必要なクラス、読めないコード、到達できないコードなどを放置し続けるのです。開発中に蓄積されたこの技術的負債は、生産性に悪影響を及ぼす可能性があります。それが、リファクタリングがTDDの実践に不可欠な要素である理由です。下図は、TDD実践の典型的なステップを示しています。

TDD実践の典型的なステップ:レッド、グリーン、リファクタリング

図01:TDD実践の典型的なステップ:レッド、グリーン、リファクタリング(出典:IBM Developer

以下のセクションでは、以前書いたクラスをどのようにリファクタリングしたかを説明し、いくつかの改善点についてコメントします。これらの改良を経て、私たちがどのようにPUBLISHパケットを構築しているのか、そしてどのようにしてパケット構築クラスの実行可能な設計図にたどり着いたのかを紹介します。新しいパターンに従った最初のクラスはPUBACKクラスです。PUBACKパケットはQoS 1のPUBLISHパケットと対になるものなので、Session Stateの管理を始める必要があります。私たちのクライアントは、状態を保持し更新するために、ある種の永続化層を持つ必要があります。 

永続化層はOASIS標準の範囲外です。これはアプリケーション固有のものです。ローカルファイルシステム内の単純なファイルかもしれないし、クラウド上の完全に分散された高可用性データベースシステムかもしれません。ここでの目的のためには、Windows上でローカルに、あるいはWSL経由で動作するPostgreSQLサーバーのようなデータベースで十分です。ただし、ここではMQLとSQLiteをネイティブに統合しているので、このシングルファイル、ノーサーバーのRDBMSが当然の選択です。SQLiteは軽量かつスケーラブルで信頼性が高く、サーバーのメンテナンスが不要です。オンメモリだけのデータベースも持てるので、テストやデバッグにはかなり便利です。 

しかし、この時点では、永続化層を実装するつもりはありません。なぜなら、Session Stateの管理を扱う前に、パケットの書き込みと読み込みを十分にテストすることにしたからです。永続化層に進む前に、MQTTプロトコルで使用されるさまざまなデータタイプを正しくエンコードおよびデコードしていることを確認する必要があります。この目標を達成するために広範なユニットテストを書き、まもなく実際の証券会社(Eclipse Foundationからのオープンソースmosquitto証券会社)に対してローカルで動作している小さな機能テストを開始する予定です。

PUBLISH/PUBBACKインタラクションをテストするために、偽のデータベース、テストに必要な制御されたデータを生成する関数のコレクション、一種のフィクスチャを使用することにします。CPubackクラスについて説明するときに紹介します。

以下の説明では、OASIS標準で使用されている必要[MUST]と可能[MAY]という用語を使用します。OASIS標準は、IETF RFC 2119で説明されているようにこれらの用語を使用します。

特に断りのない限り、引用はすべてOASIS標準からのものです。


PUBLISHパケットの構築の仕方

CPublishクラスを書き直す過程で、いくつかのクラスメンバーを削除しました。また、固定ヘッダーと可変ヘッダーのビルドをワンステップビルダーに統合しました。これらの変更は、他のコントロールパケットクラスでも同様です。

現在、CPublishクラスには以下のメンバとメソッドがあります。

//+------------------------------------------------------------------+
//|                                                      Publish.mqh |
//|            ********* WORK IN PROGRESS **********                 |
//| **** PART OF ARTICLE https://www.mql5.com/ja/articles/14391 **** |
//+------------------------------------------------------------------+
#include "IControlPacket.mqh"
//+------------------------------------------------------------------+
//|        PUBLISH VARIABLE HEADER                                   |
//+------------------------------------------------------------------+
/*
The Variable Header of the PUBLISH Packet contains the following fields in the order: Topic Name,
Packet Identifier, and Properties.
*/
//+------------------------------------------------------------------+
//| Class CPublish.                                                  |
//| Purpose: Class of MQTT Publish Control Packets.                  |
//|          Implements IControlPacket                               |
//+------------------------------------------------------------------+
class CPublish : public IControlPacket
  {
private:
   bool              IsControlPacket() {return true;}
   bool              HasWildcardChar(const string str);
protected:
   uchar             m_pubflags;
   uint              m_remlen;
   uchar             m_topname[];
   uchar             m_props[];
   uint              m_payload[];
public:
                     CPublish();
                    ~CPublish();
   //--- methods for setting Publish flags
   void              SetRetain(const bool retain);
   void              SetQoS_1(const bool QoS_1);
   void              SetQoS_2(const bool QoS_2);
   void              SetDup(const bool dup);
   //--- method for setting Topic Name
   void              SetTopicName(const string topic_name);
   //--- methods for setting Properties
   void              SetPayloadFormatIndicator(PAYLOAD_FORMAT_INDICATOR format);
   void              SetMessageExpiryInterval(uint msg_expiry_interval);
   void              SetTopicAlias(ushort topic_alias);
   void              SetResponseTopic(const string response_topic);
   void              SetCorrelationData(uchar &binary_data[]);
   void              SetUserProperty(const string key, const string val);
   void              SetSubscriptionIdentifier(uint subscript_id);
   void              SetContentType(const string content_type);
   //--- method for setting the payload
   void              SetPayload(const string payload);
   //--- method for building the final packet
   void              Build(uchar &result[]);
  };

単純化されただけでなく、パブリッシュフラグ、トピック名、プロパティを設定するプロセスはすべて独立しています。つまり、Build()メソッドが最後に呼び出されるのであれば、それぞれをどのような順番で設定しても構いません。
このテストは、その動作を形式化したもので、RETAINとQoS1という2つのフラグを設定したクラスコンストラクタと、必須のTopic Nameをテストします。
bool TEST_Ctor_Retain_QoS1_TopicName1Char()
  {
   Print(__FUNCTION__);
   CPublish *cut = new CPublish();
   uchar expected[] = {51, 6, 0, 1, 'a', 0, 1, 0}; // QoS > 0 require packet ID
   uchar result[];
   cut.SetTopicName("a");
   cut.SetRetain(true);
   cut.SetQoS_1(true);
   cut.Build(result);
   bool isTrue = AssertEqual(expected, result);
   delete(cut);
   ZeroMemory(result);
   return isTrue;
  }

これで、SetTopicName()、SetRetain()、SetQos1()の各メソッドは、どのような順番で呼び出しても、 結果のパケットは有効なままとなります。前述したように、この動作はすべてのコントロールパケットクラスで再現されており、パブリッシュフラグのすべての組み合わせに対してテストをおこなっています。添付ファイルをご覧ください。

PUBLISHパケットの固定ヘッダー

PUBLISHパケットの固定ヘッダーは、現在のバージョンのプロトコルの他のすべてのMQTT 5.0コントロールパケットとは異なります。将来のために予約されていないフラグが3つあります。RETAIN、QoS、DUPフラグです。本連載の第5回(前回の記事)では、これらのPUBLISHフラグについて詳しく書いています。

MQTT 5.0のPUBLISHパケット固定ヘッダーRETAIN、QoSレベル、DUPフラグ

図02:MQTT 5.0のPUBLISHパケット固定ヘッダーRETAIN、QoSレベル、DUPフラグ

パブリッシュフラグのいずれかを切り替えるために同じパターンを使用していますが、リファクタリングの結果、各パブリッシュフラグでSetFixedHeader()を呼び出さなくなりました。まず、トグルをブール値として定義し、関数の引数として渡します。

void CPktPublish::SetRetain(const bool retain)
  {
   retain ? m_pubflags |= RETAIN_FLAG : m_pubflags &= ~RETAIN_FLAG;
  }

次に、ブール値がtrueかfalseかを確認します。

void CPktPublish::SetQoS_1(const bool QoS_1)
  {
   QoS_1 ? m_pubflags |= QoS_1_FLAG : m_pubflags &= ~QoS_1_FLAG;
  }

ブール値がtrueの場合、フラグ値とucharメンバ(1バイト)の間でビットごとのOR代入をおこない、フラグを設定します。

void CPktPublish::SetQoS_2(const bool QoS_2)
  {
   QoS_2 ? m_pubflags |= QoS_2_FLAG : m_pubflags &= ~QoS_2_FLAG;
  }

ブール値がfalseの場合、フラグ値と同じucharメンバの間でビットごとのAND代入をおこない、フラグを解除します。

void CPktPublish::SetDup(const bool dup)
  {
   dup ? m_pubflags |= DUP_FLAG : m_pubflags &= ~DUP_FLAG;
  }

こうすることで、m_pubflags変数は、パケット設定中に設定/解除されたすべてのフラグを保持します。その後、Build()メソッドが呼ばれると、今度はm_pubflagsとパケットの最初のバイト(バイト0)との間で、再びビットごとのOR代入を実行します。

pkt[0] |= m_pubflags;


PUBLISHパケットの可変ヘッダー

PUBLISHパケットの可変ヘッダーには、Topic Name、Packet Identifier、Propertiesのフィールドが順番に含まれます。

Topic Name

パブリッシャとサブスクライバーの間のすべての関係は、パブリケーションのTopic Nameに関連付けられるので、このフィールドはPUBLISHパケットでは必須であり、ワイルドカード文字を含めることはできません。このフィールドを設定するとき、ワイルドカード文字と長さゼロの文字列の2つのガード条件があり、これらの条件のいずれかがtrueであれば即座に戻り、エラーをログに記録します。

void CPktPublish::SetTopicName(const string topic_name)
  {
   if(HasWildcardChar(topic_name) || StringLen(topic_name) == 0)
     {
      ArrayFree(m_topname);
      return;
     }
   EncodeUTF8String(topic_name, m_topname);
  }

どのガード条件も満たされない場合、文字列をUTF-8としてエンコードし、そのchar配列をm_topname protectedメンバーに格納し、さらにBuild()が呼ばれたときに最終パケットに含まれるようにします。

Packet Identifier

Packet Identifierはユーザーによって設定されず、QoS 0には必要ありません。代わりに、必要なQoSが0を超える場合は、Build()メソッドで自動的に設定されます。 

// QoS > 0 requires packet ID
   if((m_pubflags & 0x06) != 0)
     {
      SetPacketID(pkt, pkt.Size());
     }
最終的なパケットを構築する際、m_pubflagsメンバーを0110 (0x06)のバイナリ値とのビットANDで確認します。結果が0でなければ、そのパケットはQoS_1またはQoS_2であると判断し、Packet Identifierを設定します。
SetPacketID関数は、TimeLocal()を使用して擬似乱数整数を生成し、初期状態を生成します。テストを楽にするために、ブール変数TESTを定義しました。この変数がtrueのとき、この関数はパケットIDとして1の値を設定します。
//+------------------------------------------------------------------+
//|            SetPacketID                                           |
//+------------------------------------------------------------------+
#define TEST true

void SetPacketID(uchar& buf[], int start_idx)
  {
// MathRand - Before the first call of the function, it's necessary to call
// MathSrand to set the generator of pseudorandom numbers to the initial state.
   MathSrand((int)TimeLocal());
   int packet_id = MathRand();
   if(ArrayResize(buf, buf.Size() + 2) < 0)
     {
      printf("ERROR: failed to resize array at %s", __FUNCTION__);
      return;
     }
   buf[start_idx] = (uchar)packet_id >> 8; // MSB
   buf[start_idx + 1] = (uchar)(packet_id % 256) & 0xff; //LSB
//--- if testing, set packet ID to 1
   if(TEST)
     {
      Print("WARN: SetPacketID TEST true fixed ID = 1");
      buf[start_idx] = 0; // MSB
      buf[start_idx + 1] = 1; //LSB
     }
  }

ご覧のように、万が一に備えてWARNも設定しています。

Properties

本連載の第4回では、プロパティとは何か、そしてMQTT 5.0拡張メカニズムの一部としての役割について詳しく見てきました。ここでは、データ型エンコーディングの違いに特に注意しながら、どのように実装しているかを説明します。

MQTT 5.0コントロールパケットには、プロパティ値のエンコードに使用される6種類のデータ表現があります。

  1. 1バイト整数(8ビットの符号なし整数)
  2. ビッグエンディアン(ネットワーク順とも呼ばれる)(16ビットの符号なし整数)
  3. 4バイト整数(32ビットの符号なし整数で、ビッグエンディアン順)
  4. 可変バイト整数(0~268,435,455の値を表すために、最大で4バイトまでの最小バイトを使用)
  5. バイナリデータ(長さ0~65,535)
  6. UTF-8でエンコードされた文字列(ユーザープロパティでキーと値のペアをエンコードするためにも使用)

以下の表に、利用可能なPUBLISHプロパティと、それぞれのデータ表現を示します。

プロパティ データ表現
Payload Format Indicator 1バイト整数
Message Expiry Interval 4バイト整数
Topic Alias 2バイト整数 
Response Topic UTF-8エンコード文字列
Correlation Data  バイナリデータ 
User Property  UTF-8エンコードされた文字列ペア
Subscription Identifier  変数バイト整数
Content Type UTF-8エンコード文字列

表01:MQTT 5.0のPUBLISHプロパティとそれぞれのデータ表現

プロパティ識別子はDefines.mqhヘッダーに含まれています。

//+------------------------------------------------------------------+
//|              PROPERTIES                                          |
//+------------------------------------------------------------------+
/*
The last field in the Variable Header of the CONNECT, CONNACK, PUBLISH, PUBACK, PUBREC,
PUBREL, PUBCOMP, SUBSCRIBE, SUBACK, UNSUBSCRIBE, UNSUBACK, DISCONNECT, and
AUTH packet is a set of Properties. In the CONNECT packet there is also an optional set of Properties in
the Will Properties field with the Payload
*/
#define MQTT_PROP_IDENTIFIER_PAYLOAD_FORMAT_INDICATOR          0x01 // (1) Byte                  
#define MQTT_PROP_IDENTIFIER_MESSAGE_EXPIRY_INTERVAL           0x02 // (2) Four Byte Integer     
#define MQTT_PROP_IDENTIFIER_CONTENT_TYPE                      0x03 // (3) UTF-8 Encoded String  
#define MQTT_PROP_IDENTIFIER_RESPONSE_TOPIC                    0x08 // (8) UTF-8 Encoded String  
#define MQTT_PROP_IDENTIFIER_CORRELATION_DATA                  0x09 // (9) Binary Data           
#define MQTT_PROP_IDENTIFIER_SUBSCRIPTION_IDENTIFIER           0x0B // (11) Variable Byte Integer
#define MQTT_PROP_IDENTIFIER_SESSION_EXPIRY_INTERVAL           0x11 // (17) Four Byte Integer   
.
.
. 

Payload Format Indicator

Payload Format Indicatorは0または1の値で、それぞれ生バイトまたはUTF-8エンコードされた文字列を意味します。存在しない場合は0(生バイト)とみなされます。

このフィールドは、m_propsメンバー配列に直接設定することもできますが、最終的なプロパティ配列にコピーされる前に何らかの操作を必要とするプロパティの大部分と整合性を取るために、補助的なローカルバッファを仲介として使用することを選択しました。

void CPktPublish::SetPayloadFormatIndicator(PAYLOAD_FORMAT_INDICATOR format)
  {
   uchar aux[2];
   aux[0] = MQTT_PROP_IDENTIFIER_PAYLOAD_FORMAT_INDICATOR;
   aux[1] = (uchar)format;
   ArrayCopy(m_props, aux, m_props.Size());
  }

このプロパティには2つの値しかありませんが、読みやすさのために記号値を割り当てることにし,ました。

enum PAYLOAD_FORMAT_INDICATOR
  {
   RAW_BYTES   = 0x00,
   UTF8        = 0x01
  };

記号値を使用することで、ライブラリのエンドユーザーにとってメソッド呼び出しが明示的になります。

cut.SetPayloadFormatIndicator(RAW_BYTES);
cut.SetPayloadFormatIndicator(UTF8);

Message Expiry Interval

Message Expiry Intervalは4バイトの整数で表されます。この表現は、可変バイト整数の表現とは異なることを覚えておく価値があります。後者が値を表現するのに必要な最小限のバイト数を使用するのに対し、前者は常に4バイト全体を使用して表現されます。

void CPktPublish::SetMessageExpiryInterval(uint msg_expiry_interval)
  {
   uchar aux[4];
   aux[0] = MQTT_PROP_IDENTIFIER_MESSAGE_EXPIRY_INTERVAL;
   ArrayCopy(m_props, aux, m_props.Size(), 0, 1);
   EncodeFourByteInteger(msg_expiry_interval, aux);
   ArrayCopy(m_props, aux, m_props.Size());
  }

4バイト整数を符号化する関数は、ビッグエンディアン順(またはネットワーク順)を確保するために、2の冪乗右シフトというよく知られたパターンに従っています。

void EncodeFourByteInteger(uint val, uchar &dest_buf[])
  {
   ArrayResize(dest_buf, 4);
   dest_buf[0] = (uchar)(val >> 24) & 0xff;
   dest_buf[1] = (uchar)(val >> 16) & 0xff;
   dest_buf[2] = (uchar)(val >> 8) & 0xff;
   dest_buf[3] = (uchar)val & 0xff;
  }

Topic Alias

Topic Aliasプロパティは、パケットサイズを小さくするために使用できます。これは各ネットワーク接続に限定され、MQTTセッション状態の一部です。つまり、Topic Aliasを設定する関数は、今のところはスタブと見なすことができます。Session Stateを扱うときには、必ず完成させなければなりません。

void CPktPublish::SetTopicAlias(ushort topic_alias)
  {
   uchar aux[2];
   aux[0] = MQTT_PROP_IDENTIFIER_TOPIC_ALIAS;
   ArrayCopy(m_props, aux, m_props.Size(), 0, 1);
   EncodeTwoByteInteger(topic_alias, aux);
   ArrayCopy(m_props, aux, m_props.Size());
  }

2バイト整数をエンコードする私たちの関数は、4バイト整数をエンコードするのに使用したのと同じ、よく知られたパターンに従っています。必要なビッグエンディアン順を確保するために、2の冪乗右シフトをおこないます。

void EncodeTwoByteInteger(uint val, uchar &dest_buf[])
  {
   ArrayResize(dest_buf, 2);
   dest_buf[0] = (uchar)(val >> 8) & 0xff;
   dest_buf[1] = (uchar)val & 0xff;
  }

Response Topic

Response Topicプロパティは、パブリッシュ/サブスクライブパターンの一部ではありません。その代わりに、MQTTを介したリクエスト/レスポンス相互作用の一部となります。ご覧のように、この関数は2つの補助バッファを使用しています。1つはプロパティ識別子をホストするバッファで、もう1つはエンコードされたUTF-8文字列をホストするバッファです。文字列エンコーダー関数には、宛先バッファの開始インデックスを指定する第3のパラメータがないため、他のUTF-8エンコード文字列でも同じことが起こります。これは、次のバージョンではオーバーロードで解決されるかもしれません。

void CPktPublish::SetResponseTopic(const string response_topic)
  {
   uchar aux[1];
   aux[0] = MQTT_PROP_IDENTIFIER_RESPONSE_TOPIC;
   ArrayCopy(m_props, aux, m_props.Size());
   uchar buf[];
   EncodeUTF8String(response_topic, buf);
   ArrayCopy(m_props, buf, m_props.Size());
  }

Correlation Data

Correlation Dataプロパティもまた、MQTTを介したリクエスト/レスポンス相互作用の一部であり、パブリッシュ/サブスクライブパターンの一部ではありません。その値はバイナリデータなので、この関数は、プロパティ識別子を設定した後、引数として渡されたデータをm_propsバイト配列にコピーしているだけです。

void CPktPublish::SetCorrelationData(uchar &binary_data[])
  {
   uchar aux[1];
   aux[0] = MQTT_PROP_IDENTIFIER_CORRELATION_DATA;
   ArrayCopy(m_props, aux, m_props.Size());
   ArrayCopy(m_props, binary_data, m_props.Size());
  }

User Property

User Propertyは、アプリケーション固有のセマンティクスを持つUTF-8エンコードされたkey:valueペアを送信するために使用できるため、最も柔軟なMQTT 5.0プロパティです。

「Non-normative comment

このプロパティは、意味と解釈が送受信を担当するアプリケーションプログラムによってのみ知られているアプリケーション層の名前値タグを転送する手段を提供することを意図しています。」

私たちの関数は、このプロパティをエンコードするために3つの補助バッファを使用しています。なぜなら、現在のところ、UTF-8文字列エンコーダーには、宛先バッファの開始インデックスを指定する3番目のパラメータがないからです。これは次のバージョンではオーバーロードで解決されるかもしれません。(上記のResponse Topicを参照)

void CPktPublish::SetUserProperty(const string key, const string val)
  {
   uchar aux[1];
   aux[0] = MQTT_PROP_IDENTIFIER_USER_PROPERTY;
   ArrayCopy(m_props, aux, m_props.Size());
   uchar key_buf[];
   EncodeUTF8String(key, key_buf);
   ArrayCopy(m_props, key_buf, m_props.Size());
   uchar val_buf[];
   EncodeUTF8String(val, val_buf);
   ArrayCopy(m_props, val_buf, m_props.Size());
  }

Subscription Identifier

Subscription Identifierプロパティを設定する関数は、渡された引数が1から268,435,455の間であるかどうかを確認します。もしそうでなければ、エラーメッセージを表示し、すぐに戻ります。

void CPktPublish::SetSubscriptionIdentifier(uint subscript_id)
  {
   if(subscript_id < 1 || subscript_id > 0xfffffff)
     {
      printf("Error: " + __FUNCTION__ +  "Subscription Identifier must be between 1 and 268,435,455");
      return;
     }
   uchar aux[1];
   aux[0] = MQTT_PROP_IDENTIFIER_SUBSCRIPTION_IDENTIFIER;
   ArrayCopy(m_props, aux, m_props.Size());
   uchar buf[];
   EncodeVariableByteInteger(subscript_id, buf);
   ArrayCopy(m_props, buf, m_props.Size());
  }

Content Type

Content Typeプロパティの値はアプリケーションによって定義されます。「MQTTは、文字列が有効なUTF-8エンコード文字列であることを確認する以外、文字列の検証をおこないません。」

void CPktPublish::SetContentType(const string content_type)
  {
   uchar aux[1];
   aux[0] = MQTT_PROP_IDENTIFIER_CONTENT_TYPE;
   ArrayCopy(m_props, aux, m_props.Size());
   uchar buf[];
   EncodeUTF8String(content_type, buf);
   ArrayCopy(m_props, buf, m_props.Size());
  };

ペイロード

PUBLISH可変ヘッダーの最後のフィールドは、正しく言えばペイロードです。長さゼロのペイロードは有効です。この関数は、UTF-8文字列エンコーダーのラッパーに過ぎず、補助バッファーを使用してさらにm_payloadメンバーにコピーするという同じパターンを踏襲しています。

void CPktPublish::SetPayload(const string payload)
  {
   uchar aux[];
   EncodeUTF8String(payload, aux);
   ArrayCopy(m_payload, aux, m_props.Size());
  }

最終的なBuildメソッド

Build()メソッドの目的は、プロパティの長さとパケットの残りの長さの両方を変数バイト整数としてエンコードしながら、固定ヘッダー、Topic Name、Packet Identifier、プロパティ、およびペイロードを最終パケットに結合することです。

まず、必須のTopic Nameがあるかどうかを確認します。もしその長さがゼロであれば、エラーを表示/記録し、すぐに戻ります。

void CPktPublish::Build(uchar &pkt[])
  {
   if(m_topname.Size() == 0)
     {
      printf("Error: " + __FUNCTION__ + " topic name is mandatory");
      return;
     }
   ArrayResize(pkt, 2);


そして、固定ヘッダーの最初のバイトに、コントロールパケットのタイプとそれぞれのPUBLISHフラグを設定します。

// pkt type with publish flags
   pkt[0] = (uchar)PUBLISH << 4;
   pkt[0] |= m_pubflags;

次に、m_topname配列を最終パケットにコピーし、QoS > 0であればPacket Identifierをセット/コピーします。

// topic name
   ArrayCopy(pkt, m_topname, pkt.Size());
// QoS > 0 require packet ID
   if((m_pubflags & 0x06) != 0)
     {
      SetPacketID(pkt, pkt.Size());
     }

次に、Property(ies) Lengthを可変バイト整数としてエンコードします。

// properties length
   uchar buf[];
   EncodeVariableByteInteger(m_props.Size(), buf);
   ArrayCopy(pkt, buf, pkt.Size());

プロパティとペイロードを、それぞれのクラスメンバーから最終的なパケット配列にコピーします。

// properties
   ArrayCopy(pkt, m_props, pkt.Size());
// payload
   ArrayCopy(pkt, m_payload, pkt.Size());

最後に、可変バイト整数としてエンコードされたパケットの残り長を設定します。

// remaining length
   m_remlen += pkt.Size() - 2;
   uchar aux[];
   EncodeVariableByteInteger(m_remlen, aux);
   ArrayCopy(pkt, aux, 1);
  }


PUBACKコントロールパケット

CPublishクラスを実装するときに見たように、QoS 1のPUBLISHパケットには、0 以外のPacket Identifierが必要です。このパケットIDは、対応するPUBACKパケットで返されます。このIDによって、クライアントは、以前に送信されたPUBLISHパケットが配送されたのか、それともエラーがあったのかを知ることができます。成功した配信であれ、失敗した配信であれ、PUBACKはセッション状態を更新するために使用されるトリガーです。Reason Codeに基づいてSession Stateを更新します。

PUBACKパケットは9つのReason Codeのうちの1つを戻します。

SUCCESS:メッセージはすべて正常です。受理され、現在パブリッシュが進行中です。ここでの「成功」は、受信者がメッセージの所有権を受け入れたことを意味します。これは暗黙的な、つまり省略可能な唯一のReason Codeです。パケットIDだけのPUBACKは、QoS 1の配送が成功したと解釈されなければなりません[MUST]。

「PUBACKパケットを送信するクライアントまたはサーバーは、PUBACK Reason Codeのいずれかを使用しなければならない[MUST] [MQTT-3.4.2-1]。Reason Codeが0x00(成功)であり、Propertiesが存在しない場合、Reason CodeとProperty Lengthは省略可能です。」

NO MATCHING SUBSCRIBERS:メッセージはすべて正常です。受理され、現在もパブリッシュが進行中ですが、そのTopic Nameには誰も登録してません。このReason Codeは証券会社によってのみ送信され、オプションです。つまり、証券会社はSUCCESSの代わりにこのReason Codeを送ってもよいです[MAY]。

UNSPECIFIED ERROR:メッセージが拒否されましたが、パブリッシャはその理由を明かしたくないか、 他のどのReason Codeもその理由を説明するのに適していません。

IMPLEMENTATION SPECIFIC ERROR:メッセージはすべて正常ですが、パブリッシャがそれをパブリッシュしたくないようです。規格は、このReason Codeのセマンティクスに関する追加的な詳細を提供していませんが、パブリッシュしない理由はプロトコルの範囲外、つまりアプリケーション固有であると推測できます。

NOT AUTHORIZED:説明不要。

TOPIC NAME INVALID:Topic Nameを含め、メッセージはすべて正常です。これは、適切に形式化され、適切にエンコードされた UTF-8文字列です。しかし、クライアントであれ証券会社であれ、パブリッシャはこのTopic Nameを受け入れません。繰り返しになりますが、パブリッシュしない理由はアプリケーション固有のものだと推測できます。

PACKET IDENTIFIER IN USE:メッセージはすべて正常ですが、PUBLISHで送信したパケットIDがすでに使用されているため、クライアントと証券会社間のセッション状態に不一致がある可能性があります。

QUOTA EXCEEDED:説明不要。繰り返しになりますが、拒否の理由はプロトコルの範囲外です。これはアプリケーション固有のものです。

PAYLOAD FORMAT INVALID: メッセージはすべて正常ですが、PUBLISHで送信したPayload Format Indicatorプロパティが実際のペイロードフォーマットと異なっています。

PUBACKパケットは、Reason Codeの他に、Reason StringとUser Propertyを持つことができます。

Reason Stringは、診断に役立つことを目的とした、人間が読めるUTF-8エンコード文字列です。受信者が解析することは意図していません。その代わりに、ログに記録したり、出力したり、レポートに添付したりすることができる追加情報を運ぶことが目的です。Reason Stringが含まれることで、接続時(CONNECTパケット)に指定されたMaximum Packet Sizeを超えてパケットサイズが大きくなる場合、準拠するサーバーまたはクライアントはReason Stringを送信しないことは注目に値します。

PUBACKはまた、User Property(複数可)としてエンコードされた任意の数のkey:valueペアを持つことができます。これらのペアは、エラーに関する追加情報を提供するために使用することができ、アプリケーション固有のものでもあります。つまり、プロトコルはそのセマンティクスを定義していません。 

クライアントは「受信者から対応するPUBACKパケットを受け取るまで、PUBLISHパ ケットをunacknowledgedとして扱わなければなりません[MUST]」。


CPubackクラス

CPubackクラスはCPublishクラスと同じ設計図に従っています。また、オブジェクト階層のスタブルートとなるIControlPacketインターフェイスも実装しています。

PUBACKパケットは、QoS 1のPUBLISHパケットに対する応答として送信されます。その2バイトの固定ヘッダーは、1バイト目にコントロールパケット識別子のみ、2バイト目にパケットの残りの長さを持ちます。このプロトコルのバージョンでは、そのビットフラグはすべてRESERVEDに設定されています。

MQTT-5.0 PUBACKパケットの固定ヘッダーの構造

図03:MQTT-5.0-PUBACKパケットの固定ヘッダーの構造

「PUBACKパケットの可変ヘッダーは、PUBLISHパケットのPacket Identifier、PUBACK Reason Code、Property Length、Propertiesの各フィールドを順番に含みます。」

MQTT-5.0 PUBACKパケットの可変ヘッダーの構造

図04:MQTT-5.0-PUBACKパケットの可変ヘッダーの構造

これまではクライアントを送信者としてのみ扱ってきましましたが、これからは受信者の役割も考慮する必要があります。理由は次です。

「配信プロトコルは対称的で、[...]クライアントとサーバーはそれぞれ送信者または受信者の役割を担うことができます。」

承認されるパケットの識別子を取得する関数のテストを書く必要があります。

  1. PUBACKの受信時に証券会社が送信したリターンパケットから
  2. または、PUBACK送信時に永続化システムから

QoS 1のPUBLISHパケットは、対応するPUBACKなしには意味を持たず、PUBACKは、対応するPUBLISHパケットのパケットIDを保存するために、ある種の永続性を必要とします。ただし、いつかは本物のデータベースを永続化層として使用する必要があることはすでに分かっていますが、現時点ではまだ必要ありません。この関数をテストし開発するために必要なのは、データベースのような働きをするもので、問い合わせたときに、確認待ちのPUBLISHパケットの識別子を返してくれるものです。驚きのないように、GetPendingPublishIDs(ushort &result[])という関数を1つ作り、DB.mqhというファイルに保存しておきます。

void GetPendingPublishIDs(ushort &result[])
  {
   ArrayResize(result, 3);
   result[0] = 1;
   result[1] = 255; // one byte
   result[2] = 65535; // two bytes
  }

「永続化層」を配置すると、目の前のタスクに集中できます。つまり、証券会社から送信されたPUBACKバイト配列(パケット)を渡されたときに、確認されたPUBLISHの識別子を取得し、永続化層に格納されている保留中のPUBLISH IDと照合する関数を書くことです。IDが一致したら「True」を戻します。後日、プロトコル「Operational Behavior」を実装する際に、このマッチングIDをリアルストアから解放します。

上記のPUBACK可変ヘッダー構造を考えると、今必要なのは、最初の2バイトを読んで、確認されるパケットのIDを得ることだけです。

ushort CPuback::GetPacketID(uchar &pkt[])
  {
   return (pkt[0] * 256) + pkt[1];
  }

パケット識別子は、ビッグエンディアン(またはネットワーク)順の2バイト整数としてエンコードされ、最上位バイト(MSB)が最初に表示されることを覚えておきましょう。これを符号化するために、左シフトのビット演算(<<)を使用しました。デコードするには、最上位バイトに256を掛け、最下位バイトを足します。

今のところは上記の関数で十分です。後にオープンネットワークで実際の証券会社とテストする際に、エンディアンの問題に対処しなければならないかもしれませんが、この時点ではテストしません。魅力的なニンジン、目の前の課題に向かって進み続けましょう。

bool CPuback::IsPendingPkt(uchar &pkt[])
{
   ushort pending_ids[];
   GetPendingPublishIDs(pending_ids);
   ushort packet_id = GetPacketID(pkt);
   for(uint i = 0; i < pending_ids.Size(); i++)
     {
      if(pending_ids[i] == packet_id)
        {
         return true;
        }
     }
   return false;
}

上記の関数は、バイト配列を引数として受け取ります。このバイト配列はPUBACKパケットの可変ヘッダーです。そして、ローカル変数pending_idsに、まだ承認されていないストア/データベースのパケット識別子の配列を格納します。最後に、証券会社から送られたバイト配列のパケットIDを読み取り、保留中のIDの配列と比較します。もしパケットが配列の中にあれば、この関数はTrueを返し、IDを解放することができます。

同じロジックで、QoS 2のPUBLISH用のPUBREC、PUBREL、PUBCOMPパケット識別子を解放することができます。また、後で、偽の、ファイルに1つの関数を含む「永続化層」を本物のデータベースに置き換えますが、関数のメインロジックは残ります。この時点で、別の開発者が永続化層の開発に取り組んでいる間に、私たちは完全に独立した方法でパケットクラスを開発することができます。

また、PUBACK可変ヘッダーからReason Code(複数可)を読み取れるようにする必要もあります。このフィールドは位置とサイズが決まっているので、必要なのは特定のバイトを読み取ることだけです。

uchar CPuback::GetReasonCode(uchar &pkt[])
  {
   return pkt[2];
  }


クライアントの受信側だけで作業しているため、つまりPUBACKをまだ送信していないため、次の機能テストは上記の関数で十分です。次は、本物の証券会社を扱います。


結論

継続的なリファクタリングはTDDの実践の一部です。完全に機能するだけでなく、単一責任の単位と関数(ここではクラスとメソッド)、可読識別子(クラス、メソッド、変数名)、冗長性の回避(「同じことを繰り返さない」)というクリーンなコードを実現することも目指しています。それはプロセスであり、ワンステップの作業ではありません。そのため、MQTT 5.0クライアントを完全に機能させるまで、リファクタリングを継続的におこなっていくことは確実です。

CONNECT, CONNACK, PUBLISH, PUBACKパケットが期待通りに動作しているかどうかを確認するために、実際のMQTT 証券会社に対して最初の機能テストを書き始める準備ができました。 

PUBACKパケットは、QoS 1のPUBLISHパケットと対になるものです。QoS 2のPUBLISHパケットは、PUBREC、PUBCOMP、PUBRELパケットを必要とします。次回のテーマは、この2つです。

MQL5に精通していてこのオープンソースのMQTTクライアントの開発に貢献できる方は、以下のコメント欄かコミュニティチャットまでご連絡ください。 


MetaQuotes Ltdにより英語から翻訳されました。
元の記事: https://www.mql5.com/en/articles/14391

添付されたファイル |
MQTT.zip (20.83 KB)
Tests.zip (16.88 KB)
MQL5の高度な変数とデータ型 MQL5の高度な変数とデータ型
変数とデータ型は、MQL5プログラミングだけでなく、どのプログラミング言語でも非常に重要なトピックです。MQL5の変数とデータ型は、単純なものと高度なものに分類できます。単純なものについては前回の記事ですでに述べたので、今回は高度なものを特定し、それについて学ぶことにします。
Pythonを使用した深層学習GRUモデルとEAによるONNX、GRUとLSTMモデルの比較 Pythonを使用した深層学習GRUモデルとEAによるONNX、GRUとLSTMモデルの比較
Pythonを使用してGRU ONNXモデルを作成する深層学習のプロセス全体を説明し、最後に取引用に設計されたエキスパートアドバイザー(EA)の作成と、その後のGRUモデルとLSTNモデルの比較をおこないます。
不一致問題(Disagreement Problem):AIにおける複雑性の説明可能性を深く掘り下げる 不一致問題(Disagreement Problem):AIにおける複雑性の説明可能性を深く掘り下げる
説明可能性という波乱の海を航海しながら、人工知能(AI)の謎の核心に飛び込みましょう。モデルがその内部構造を隠す領域において、私たちの探求は、機械学習の回廊にこだまする「不一致問題」を明らかにします。
プログラミングパラダイムについて(第2部):オブジェクト指向アプローチによるプライスアクションエキスパートアドバイザーの開発 プログラミングパラダイムについて(第2部):オブジェクト指向アプローチによるプライスアクションエキスパートアドバイザーの開発
オブジェクト指向プログラミングのパラダイムとMQL5コードへの応用について学びます。この第2回目の記事では、オブジェクト指向プログラミングの具体的な内容をより深く掘り下げ、実践的な例を通して実体験を提供します。EMA指標とローソク足価格データを使用した、手続き型プライスアクションエキスパートアドバイザー(EA)をオブジェクト指向コードに変換する方法を学びます。