
为 Metatrader 5 开发 MQTT 客户端:TDD 方法 - 第 6 部分
“乐观是编程的职业危害;反馈才是解决办法。”(Kent Beck)
概述
测试驱动的开发方法提供了许多好处,但有一个主要缺点。它的好处包括帮助我们编写定义明确的单元和命名良好的变量,这可以实现高测试覆盖率,更好地理解领域知识,避免过度工程,并集中精力于手头的任务。而主要的缺点是这种对手头任务的狭隘关注的直接后果,也就是说,为了避免被项目的整体复杂性吓到,作为开发人员,我们每次只解决最小的挑战,并且每次只解决一个挑战。如果天才是通过解决复杂性来消除复杂性的人,那么 TDD 开发人员就是故意忽略复杂性的人。
是的,你明白了:就好比我们是戴着眼罩的马,就像那头跟着胡萝卜的驴子。
但复杂性并不会因为我们忽略它而消失,它就在那里,等待我们去面对它。由于我们忽略森林,只顾看树叶,我们就会不断欠下技术债务。您要知道,我们在不断留下多余的函数、重复的成员、无用的测试、不必要的类、不可读和无法访问的代码。而在开发过程中积累的技术债务可能会损害我们的生产力。这就是重构成为 TDD 实践不可或缺的一部分的原因。下图展示了 TDD 实践的典型步骤。
图 01 - TDD 实践的典型步骤:红色、绿色、重构(来源:IBM 开发者)
在以下章节中,我们将描述如何重构我们以前编写的类并对一些改进进行讨论。我们展示了在这些改进之后我们如何构建我们的 PUBLISH 数据包以及如何为我们的数据包构建类得出可行的蓝图。遵循新模式的第一个类是 PUBACK 类。由于 PUBACK 数据包是 QoS 1 的 PUBLISH 数据包的对应部分,因此我们需要开始处理会话状态(Session State)管理。我们的客户端需要某种持久层来保存和更新状态。
持久层超出了 OASIS 标准的范围。它是用于特定应用程序的。它可以是本地文件系统中的一个简单文件,也可以是云中完全分布式的高可用性数据库系统。对于我们的目的来说,类似于在 Windows 上本地运行或通过 WSL 运行的 PostgreSQL 服务器这样的数据库就足够了。但是,由于我们在 MQL 和 SQLite 之间已有本机集成,因此这种单文件、无服务器 RDBMS 是显而易见的选择。SQLite 轻量、可扩展、值得信赖,并且无需服务器维护。我们甚至可以拥有一个仅在内存中的数据库,这对于测试和调试非常方便。
但此时我们不会实现持久层,因为我们选择在处理会话状态管理之前对数据包的写入和读取进行充分测试。在进入持久层之前,我们需要确保正确编码和解码 MQTT 协议使用的是不同数据类型。为了实现这一目标,我们正在编写大量的单元测试,并且很快我们将针对本地运行的真实代理(来自 Eclipse 基金会的开源 mosquitto 代理)进行小型功能测试。
因此,为了测试我们的 PUBLISH/PUBACK 交互,我们将使用一个假数据库、一组函数来生成我们测试所需的受控数据,就像一种装置。我们在下面描述 CPuback 类时会对其进行介绍。
在后面的描述中,我们使用的术语 MUST 和 MAY 是 OASIS 标准使用的,而 OASIS 标准使用的术语 MUST 和 MAY 又是 IETF RFC 2119 中描述的。
此外,除非另有说明,所有引文均来自 OASIS 标准。
我们如何构建 PUBLISH 数据包
在重写 CPublish 类的过程中,我们删除了一些类成员。我们还将固定报头/可变报头构建合并到一步构建器中。这些更改要在其他控制数据包类中重复进行。
目前,我们的 CPublish 类具有以下成员和方法。
//+------------------------------------------------------------------+ //| Publish.mqh | //| ********* WORK IN PROGRESS ********** | //| **** PART OF ARTICLE https://www.mql5.com/en/articles/14391 **** | //+------------------------------------------------------------------+ #include "IControlPacket.mqh" //+------------------------------------------------------------------+ //| PUBLISH VARIABLE HEADER | //+------------------------------------------------------------------+ /* The Variable Header of the PUBLISH Packet contains the following fields in the order: Topic Name, Packet Identifier, and Properties. */ //+------------------------------------------------------------------+ //| Class CPublish. | //| Purpose: Class of MQTT Publish Control Packets. | //| Implements IControlPacket | //+------------------------------------------------------------------+ class CPublish : public IControlPacket { private: bool IsControlPacket() {return true;} bool HasWildcardChar(const string str); protected: uchar m_pubflags; uint m_remlen; uchar m_topname[]; uchar m_props[]; uint m_payload[]; public: CPublish(); ~CPublish(); //--- methods for setting Publish flags void SetRetain(const bool retain); void SetQoS_1(const bool QoS_1); void SetQoS_2(const bool QoS_2); void SetDup(const bool dup); //--- method for setting Topic Name void SetTopicName(const string topic_name); //--- methods for setting Properties void SetPayloadFormatIndicator(PAYLOAD_FORMAT_INDICATOR format); void SetMessageExpiryInterval(uint msg_expiry_interval); void SetTopicAlias(ushort topic_alias); void SetResponseTopic(const string response_topic); void SetCorrelationData(uchar &binary_data[]); void SetUserProperty(const string key, const string val); void SetSubscriptionIdentifier(uint subscript_id); void SetContentType(const string content_type); //--- method for setting the payload void SetPayload(const string payload); //--- method for building the final packet void Build(uchar &result[]); };
除了简化之外,现在设置发布标志、主题名称和属性的过程都是独立的,这意味着它们每一个都可以按任何顺序设置,只要 Build() 方法是最后调用的。
该测试使行为正式化。它测试设置了两个标志 RETAIN 和 QoS1 以及所需的主题名称的类构造函数。
bool TEST_Ctor_Retain_QoS1_TopicName1Char() { Print(__FUNCTION__); CPublish *cut = new CPublish(); uchar expected[] = {51, 6, 0, 1, 'a', 0, 1, 0}; // QoS > 0 require packet ID uchar result[]; cut.SetTopicName("a"); cut.SetRetain(true); cut.SetQoS_1(true); cut.Build(result); bool isTrue = AssertEqual(expected, result); delete(cut); ZeroMemory(result); return isTrue; }
现在,可以按任何顺序调用方法 SetTopicName()、SetRetain() 和 SetQos1(),并且生成的数据包仍然有效。如上所述,此行为正在所有控制包类中重复进行,并且我们对每个发布标志的组合都进行了测试。请参阅附件以获取所有测试。
PUBLISH 数据包固定报头
PUBLISH 数据包固定报头与当前版本协议中的所有其他 MQTT 5.0 控制数据包不同。它们有三个标志不保留供将来使用:RETAIN、QoS 和 DUP 标志。在本系列的前一篇文章(第 5 部分)中,您可以看到 有关这些 PUBLISH 标志的详细说明。
图 02 - MQTT 5.0 PUBLISH 数据包固定报头 RETAIN、QoS 级别和 DUP 标志
我们使用相同的模式来切换任何发布标志,但是现在,重构之后,我们不再在每个发布标志中调用 SetFixedHeader()。首先,我们将切换定义为布尔值,并将其作为参数传递给函数。
void CPktPublish::SetRetain(const bool retain) { retain ? m_pubflags |= RETAIN_FLAG : m_pubflags &= ~RETAIN_FLAG; }
然后我们检查布尔值是 true 还是 false。
void CPktPublish::SetQoS_1(const bool QoS_1) { QoS_1 ? m_pubflags |= QoS_1_FLAG : m_pubflags &= ~QoS_1_FLAG; }
如果布尔值为 true,我们将标志值和 uchar 成员(一个字节)之间执行按位或赋值以设置该标志。
void CPktPublish::SetQoS_2(const bool QoS_2) { QoS_2 ? m_pubflags |= QoS_2_FLAG : m_pubflags &= ~QoS_2_FLAG; }
如果布尔值为 false,我们将标志值和相同的 uchar 成员之间执行按位与赋值以取消设置该标志。
void CPktPublish::SetDup(const bool dup) { dup ? m_pubflags |= DUP_FLAG : m_pubflags &= ~DUP_FLAG; }
这样,m_pubflags 变量就会保存配置数据包时设置/未设置的所有标志。稍后,当调用 Build() 方法时,我们再次执行按位或(OR)赋值,这次是在 m_pubflags 和数据包的第一个字节(字节 0)之间。
pkt[0] |= m_pubflags;
PUBLISH 数据包可变报头
PUBLISH 数据包的可变报头按顺序包含以下字段:主题名称、数据包标识符和属性。
主题名称
由于发布者和订阅者之间的所有关系都与发布的主题名称相关,因此该字段在 PUBLISH 数据包中是必需的,并且不能包含通配符。设置此字段时,我们有两个保护条件,针对通配符和长度为零的字符串,如果任何条件为 true,则立即返回并记录错误。
void CPktPublish::SetTopicName(const string topic_name) { if(HasWildcardChar(topic_name) || StringLen(topic_name) == 0) { ArrayFree(m_topname); return; } EncodeUTF8String(topic_name, m_topname); }
如果没有满足任何保护条件,我们将字符串编码为 UTF-8,并将字符数组存储在 m_topname 受保护成员中,以便在调用 Build() 时进一步包含在最终数据包中。
数据包标识符
数据包标识符不是由用户设置的,并且对于 QoS 0 来说不是必需的。相反,如果所需的 QoS > 0,则会在 Build() 方法上自动设置。
// QoS > 0 requires packet ID if((m_pubflags & 0x06) != 0) { SetPacketID(pkt, pkt.Size()); }在构建最终的数据包时,我们通过按位与(AND)二进制值 0110(0x06)来检查 m_pubflags 成员。如果结果不等于零,我们就知道该数据包具有 QoS_1 或 QoS_2,然后我们就设置数据包标识符。
SetPacketID 函数使用 TimeLocal() 生成一个伪随机整数来生成初始状态。为了让测试更简单,我们定义了一个布尔变量 TEST。当此变量为 true 时,函数将值 1 设置为数据包 ID。
//+------------------------------------------------------------------+ //| SetPacketID | //+------------------------------------------------------------------+ #define TEST true void SetPacketID(uchar& buf[], int start_idx) { // MathRand - Before the first call of the function, it's necessary to call // MathSrand to set the generator of pseudorandom numbers to the initial state. MathSrand((int)TimeLocal()); int packet_id = MathRand(); if(ArrayResize(buf, buf.Size() + 2) < 0) { printf("ERROR: failed to resize array at %s", __FUNCTION__); return; } buf[start_idx] = (uchar)packet_id >> 8; // MSB buf[start_idx + 1] = (uchar)(packet_id % 256) & 0xff; //LSB //--- if testing, set packet ID to 1 if(TEST) { Print("WARN: SetPacketID TEST true fixed ID = 1"); buf[start_idx] = 0; // MSB buf[start_idx + 1] = 1; //LSB } }
正如您所看到的,我们还设置了警告,以防万一。
属性
在本系列文章的第 4 部分中,我们详细介绍了属性是什么以及它们作为 MQTT 5.0 扩展机制的一部分所起的作用。在这里我们将描述如何实现它们,特别关注不同的数据类型编码。
MQTT 5.0 控制包中有六种类型的数据表示用于对属性值进行编码:
- 单字节整数,即 8 位无符号整数
- 双字节整数,它们是按 大端序排列的 16 位无符号整数,也称为网络序
- 四字节整数,也是按大端序排列的 32 位无符号整数
- 可变字节整数,使用最多四个字节的最小数量来表示 0 到 268,435,455 之间的值
- 长度在 0 到 65,535 之间的二进制数据
- UTF-8 编码的字符串,也可用于对用户属性中的键值对进行编码
下表显示了可用的 PUBLISH 属性及其各自的数据表示。
属性 | 数据表示 |
---|---|
Payload Format Indicator | 一个字节整数 |
Message Expiry Interval | 四字节整数 |
Topic Alias | 双字节整数 |
Response Topic | UTF-8 编码字符串 |
Correlation Data | 二进制数据 |
User Property | UTF-8 编码字符串对 |
Subscription Identifier | 可变字节整数 |
Content Type | UTF-8 编码字符串 |
表 01 - MQTT 5.0 中的 PUBLISH 属性及其对应的数据表示
我们的属性标识符包含在我们的 Defines.mqh 头文件中。
//+------------------------------------------------------------------+ //| PROPERTIES | //+------------------------------------------------------------------+ /* The last field in the Variable Header of the CONNECT, CONNACK, PUBLISH, PUBACK, PUBREC, PUBREL, PUBCOMP, SUBSCRIBE, SUBACK, UNSUBSCRIBE, UNSUBACK, DISCONNECT, and AUTH packet is a set of Properties. In the CONNECT packet there is also an optional set of Properties in the Will Properties field with the Payload */ #define MQTT_PROP_IDENTIFIER_PAYLOAD_FORMAT_INDICATOR 0x01 // (1) Byte #define MQTT_PROP_IDENTIFIER_MESSAGE_EXPIRY_INTERVAL 0x02 // (2) Four Byte Integer #define MQTT_PROP_IDENTIFIER_CONTENT_TYPE 0x03 // (3) UTF-8 Encoded String #define MQTT_PROP_IDENTIFIER_RESPONSE_TOPIC 0x08 // (8) UTF-8 Encoded String #define MQTT_PROP_IDENTIFIER_CORRELATION_DATA 0x09 // (9) Binary Data #define MQTT_PROP_IDENTIFIER_SUBSCRIPTION_IDENTIFIER 0x0B // (11) Variable Byte Integer #define MQTT_PROP_IDENTIFIER_SESSION_EXPIRY_INTERVAL 0x11 // (17) Four Byte Integer . . .
Payload Format Indicator
Payload Format Indicator(有效载荷标识)可以是 0 或 1 值,分别表示原始字节或 UTF-8 编码字符串。如果不存在,则假定为 0(原始字节)。
虽然可以直接在 m_props 成员数组上设置此字段,但我们选择使用辅助本地缓冲区作为中介,以与其它大多数需要进行某种操作才能复制到最终属性数组的属性保持一致。
void CPktPublish::SetPayloadFormatIndicator(PAYLOAD_FORMAT_INDICATOR format) { uchar aux[2]; aux[0] = MQTT_PROP_IDENTIFIER_PAYLOAD_FORMAT_INDICATOR; aux[1] = (uchar)format; ArrayCopy(m_props, aux, m_props.Size()); }
虽然此属性只有两个可能的值,但为了易读性,我们选择为它们分配一个符号值。
enum PAYLOAD_FORMAT_INDICATOR { RAW_BYTES = 0x00, UTF8 = 0x01 };
这个符号值的使用使得方法调用对于库最终用户来说更加明确。
cut.SetPayloadFormatIndicator(RAW_BYTES);
cut.SetPayloadFormatIndicator(UTF8);
Message Expiry Interval
Message Expiry Interval(消息过期间隔)表示为四字节整数。需要记住的是,这种表示形式与可变字节整数的表示形式不同。虽然后者将使用表示值所需的最小字节数,但前者将始终使用整个四个字节表示。
void CPktPublish::SetMessageExpiryInterval(uint msg_expiry_interval) { uchar aux[4]; aux[0] = MQTT_PROP_IDENTIFIER_MESSAGE_EXPIRY_INTERVAL; ArrayCopy(m_props, aux, m_props.Size(), 0, 1); EncodeFourByteInteger(msg_expiry_interval, aux); ArrayCopy(m_props, aux, m_props.Size()); }
我们对四字节整数进行编码的函数遵循一种众所周知的右移两次幂的模式,以确保所需的大端序(或网络序)。
void EncodeFourByteInteger(uint val, uchar &dest_buf[]) { ArrayResize(dest_buf, 4); dest_buf[0] = (uchar)(val >> 24) & 0xff; dest_buf[1] = (uchar)(val >> 16) & 0xff; dest_buf[2] = (uchar)(val >> 8) & 0xff; dest_buf[3] = (uchar)val & 0xff; }
Topic Alias
Topic Alias(主题别名)属性可用于减少数据包大小。它仅限于每个网络连接并且是 MQTT 会话状态的一部分。因此,我们设置主题别名的函数现在可以被视为框架,在处理 Session State(会话状态)的时候必须完成。
void CPktPublish::SetTopicAlias(ushort topic_alias) { uchar aux[2]; aux[0] = MQTT_PROP_IDENTIFIER_TOPIC_ALIAS; ArrayCopy(m_props, aux, m_props.Size(), 0, 1); EncodeTwoByteInteger(topic_alias, aux); ArrayCopy(m_props, aux, m_props.Size()); }
我们对两字节整数进行编码的函数遵循与我们对四字节整数编码时相同的众所周知的模式,即两次右移的幂,以确保所需的大端序。
void EncodeTwoByteInteger(uint val, uchar &dest_buf[]) { ArrayResize(dest_buf, 2); dest_buf[0] = (uchar)(val >> 8) & 0xff; dest_buf[1] = (uchar)val & 0xff; }
Response Topic
Response Topic(响应主题)属性不是发布/订阅模式的一部分。相反,它是 MQTT 上请求/响应交互的一部分。如您所见,我们的函数使用两个辅助缓冲区,一个用于承载属性标识符,另一个缓冲区用于承载编码的 UTF-8 字符串。其他 UTF-8 编码的字符串也会发生同样的情况,因为我们的字符串编码器函数没有第三个参数来寻址目标缓冲区起始索引。这个问题可能会在下一版本中通过重载来解决。
void CPktPublish::SetResponseTopic(const string response_topic) { uchar aux[1]; aux[0] = MQTT_PROP_IDENTIFIER_RESPONSE_TOPIC; ArrayCopy(m_props, aux, m_props.Size()); uchar buf[]; EncodeUTF8String(response_topic, buf); ArrayCopy(m_props, buf, m_props.Size()); }
Correlation Data
Correlation Data(相关数据)属性也是 MQTT 上请求/响应交互的一部分,而不是发布/订阅模式的一部分。由于其值是二进制数据,我们的函数在设置属性标识符后只是将作为参数传递的数据复制到 m_props 字节数组中。
void CPktPublish::SetCorrelationData(uchar &binary_data[]) { uchar aux[1]; aux[0] = MQTT_PROP_IDENTIFIER_CORRELATION_DATA; ArrayCopy(m_props, aux, m_props.Size()); ArrayCopy(m_props, binary_data, m_props.Size()); }
User Property
User Property(用户属性)是最灵活的 MQTT 5.0 属性,因为它可用于传输具有特定于应用程序的语义的 UTF-8 编码的键:成对的值。
“非规范性评论
此属性旨在提供一种传输应用层名称值标签的方法,其含义和解释只有负责发送和接收它们的应用程序才知道。”
我们的函数使用三个辅助缓冲区来编码此属性,因为目前我们的 UTF-8 字符串编码器没有第三个参数来寻址目标缓冲区起始索引。这可能会在下一版本中通过重载解决。(请参阅上面的响应主题属性。)
void CPktPublish::SetUserProperty(const string key, const string val) { uchar aux[1]; aux[0] = MQTT_PROP_IDENTIFIER_USER_PROPERTY; ArrayCopy(m_props, aux, m_props.Size()); uchar key_buf[]; EncodeUTF8String(key, key_buf); ArrayCopy(m_props, key_buf, m_props.Size()); uchar val_buf[]; EncodeUTF8String(val, val_buf); ArrayCopy(m_props, val_buf, m_props.Size()); }
Subscription Identifier
我们设置 Subscription Identifier(订阅标识符)属性的函数开始检查传递的参数是否介于 1 和 268,435,455 之间,这是该属性可接受的值。如果不是,我们会打印/记录错误消息并立即返回。
void CPktPublish::SetSubscriptionIdentifier(uint subscript_id) { if(subscript_id < 1 || subscript_id > 0xfffffff) { printf("Error: " + __FUNCTION__ + "Subscription Identifier must be between 1 and 268,435,455"); return; } uchar aux[1]; aux[0] = MQTT_PROP_IDENTIFIER_SUBSCRIPTION_IDENTIFIER; ArrayCopy(m_props, aux, m_props.Size()); uchar buf[]; EncodeVariableByteInteger(subscript_id, buf); ArrayCopy(m_props, buf, m_props.Size()); }
Content Type
Content Type(内容类型)属性的值由应用程序定义。“MQTT 不对字符串进行任何验证,只是确保它是一个有效的 UTF-8 编码字符串。”
void CPktPublish::SetContentType(const string content_type) { uchar aux[1]; aux[0] = MQTT_PROP_IDENTIFIER_CONTENT_TYPE; ArrayCopy(m_props, aux, m_props.Size()); uchar buf[]; EncodeUTF8String(content_type, buf); ArrayCopy(m_props, buf, m_props.Size()); };
Payload
PUBLISH 变量头中的最后一个字段是正确表示的 Payload(有效载荷)。零长度的有效载荷也是有效的。我们的函数只不过是 UTF-8 字符串编码器的包装器,遵循使用辅助缓冲区进一步复制到 m_payload 成员的相同模式。
void CPktPublish::SetPayload(const string payload) { uchar aux[]; EncodeUTF8String(payload, aux); ArrayCopy(m_payload, aux, m_props.Size()); }
最终的 Build 方法
Build() 方法的目的是将固定报头、主题名称、数据包标识符、属性和有效负载合并到最终的数据包中,同时将属性长度和数据包剩余长度编码为可变字节整数。
我们首先检查强制性主题名称是否存在。如果它的长度为零,我们将打印/记录错误并立即返回。
void CPktPublish::Build(uchar &pkt[]) { if(m_topname.Size() == 0) { printf("Error: " + __FUNCTION__ + " topic name is mandatory"); return; } ArrayResize(pkt, 2);
然后我们使用控制包类型和相应的 PUBLISH 标志设置固定报头的第一个字节。
// pkt type with publish flags pkt[0] = (uchar)PUBLISH << 4; pkt[0] |= m_pubflags;
然后,我们将 m_topname 数组复制到最终数据包,并且如果 QoS > 0,则设置/复制数据包标识符。
// topic name ArrayCopy(pkt, m_topname, pkt.Size()); // QoS > 0 require packet ID if((m_pubflags & 0x06) != 0) { SetPacketID(pkt, pkt.Size()); }
接下来,我们将 Property(ies) Length(属性长度)编码为可变字节整数。
// properties length uchar buf[]; EncodeVariableByteInteger(m_props.Size(), buf); ArrayCopy(pkt, buf, pkt.Size());
我们将属性和有效负载从它们的类成员复制到最终的数据包数组。
// properties ArrayCopy(pkt, m_props, pkt.Size()); // payload ArrayCopy(pkt, m_payload, pkt.Size());
最后,我们将数据包的 Remaining Length(剩余长度)编码为可变字节整数。
// remaining length m_remlen += pkt.Size() - 2; uchar aux[]; EncodeVariableByteInteger(m_remlen, aux); ArrayCopy(pkt, aux, 1); }
PUBACK 控制数据包
正如我们在实现 CPublish 类时看到的,任何具有 QoS 1 的 PUBLISH 数据包都需要非零数据包标识符(Packet Identifier)。该数据包 ID 将在相应的 PUBACK 数据包中返回。正是这个 ID 使我们的客户端知道先前发送的 PUBLISH 数据包是否已送达,或者是否存在错误。无论是成功送达还是失败,PUBACK 都是我们用来更新会话状态的触发器。我们将根据 Reason Code(s)(原因代码)更新 Session State(会话状态) 。
PUBACK 数据包将返回九个原因代码之一。
SUCCESS(成功)- 消息一切正常。它已被接受并且正在发布。这里的“Success”意味着接收者已经接受了该消息的所有权。这是唯一可以隐式的原因代码,也就是说,它是唯一可以省略的原因代码。仅具有数据包 ID 的 PUBACK 必须被解释为成功的 QoS 1 传送。
“发送 PUBACK 数据包的客户端或服务器必须使用其中一个 PUBACK 原因代码 [MQTT-3.4.2-1]。如果原因代码为 0x00(成功)且没有属性,则可以省略原因代码和属性长度。”
NO MATCHING SUBSCRIBERS(没有匹配的订阅者) – 消息一切正常。它已被接受并且正在发布,但没有人订阅其主题名称。此原因代码仅由代理发送并且是可选的,这意味着代理可以发送此原因代码而不是成功。
UNSPECIFIED ERROR(未指定错误) - 消息被拒绝,但发布者不想透露原因,或者没有其他更具体的原因代码适合描述原因。
IMPLEMENTATION SPECIFIC ERROR(实现特定错误) - 消息一切正常,但发布者不想发布它。标准没有提供有关此原因代码语义的更多细节,但我们可以推断不发布的原因不在协议范围内,这意味着它是特定于应用程序的。
NOT AUTHORIZED(未经授权)- 不言自明。
TOPIC NAME INVALID(主题名称无效)- 消息的所有内容都正常,包括主题名称,它是一个格式正确、编码良好的 UTF-8 字符串。但是发布者(无论是客户端还是代理)不接受这个主题名称。再次,我们可以推断不发布的原因是特定于应用程序的。
PACKET IDENTIFIER IN USE(数据包标识符正在使用中) - 消息一切正常,但客户端和代理之间的会话状态可能不匹配,因为我们在 PUBLISH 中发送的数据包 ID 已被使用。
QUOTA EXCEEDED(超出配额) - 不言自明。再次强调,拒绝的原因不属于协议范围。它是用于特定应用程序的。
PAYLOAD FORMAT INVALID(有效负载格式无效)- 消息一切正常,但我们在发布中发送的有效负载格式指示器属性与实际的有效负载格式不同。
除了原因代码之外,PUBACK 数据包可能还包含原因字符串和用户属性。
原因字符串是人类可读的 UTF-8 编码字符串,旨在帮助诊断。它不是为了被接收者解析的。相反,它的目的是携带可以记录、打印、附加到报告等的附加信息。值得注意的是,如果原因字符串的包含导致数据包大小超出连接时(CONNECT 数据包)指定的最大数据包大小,任何兼容的服务器或客户端都不会发送原因字符串。
PUBACK 还可以具有任意数量的键:值对,这些键:值对被编码为用户属性。这些对可用于提供有关错误的附加信息,并且也是特定于应用程序的。也就是说,协议没有定义它们的语义。
我们的客户端“必须将 PUBLISH 数据包视为“unacknowledged(未确认)”,直到它从接收方收到相应的 PUBACK 数据包。”
CPuback 类
我们的 CPuback 类遵循与 CPublish 类相同的蓝图。它还实现了 IControlPacket 接口,该接口作为对象层次结构的框架根。
PUBACK 数据包作为对 QoS 1 的 PUBLISH 数据包的响应发送。它的两字节固定报头仅在第一个字节上有控制包标识符,在第二个字节上有数据包的剩余长度。在此版本的协议中,其位标志全部设置为 RESERVED(保留)。
图 03 -MQTT-5.0-PUBACK 数据包固定报头的结构
PUBACK 数据包的可变报头按顺序包含以下字段:正在确认的 PUBLISH 数据包中的数据包标识符、PUBACK 原因代码、属性长度和属性。”
图 04 -MQTT-5.0-PUBACK 数据包的变量报头结构
到目前为止,我们只是将客户作为 发送者来处理;从现在开始,我们还需要考虑 接收者的角色。这是因为
“传输协议是对称的,[...] 客户端和服务器都可以充当发送方或接收方的角色。”
我们需要编写一个测试,用于获取被确认数据包的标识符
- 从代理收到 PUBACK 时发送的返回数据包中
- 或者从我们的持久化系统发送 PUBACK
如果没有对应的 PUBACK,QoS 为 1 的 PUBLISH 数据包就没有意义,而 PUBACK 又需要某种持久性来存储其对应 PUBLISH 数据包的数据包 ID。但是,尽管我们已经知道在某些时候我们需要使用真正的数据库作为持久层,但目前我们还不需要它。为了测试和开发我们的功能,我们需要的是一个像数据库一样的东西,当查询时,它返回等待确认的 PUBLISH 数据包的标识符。为了避免意外,让我们创建一个名为 GetPendingPublishIDs(ushort &result[]) 的函数,并将其保存在名为 DB.mqh 的文件中。
void GetPendingPublishIDs(ushort &result[]) { ArrayResize(result, 3); result[0] = 1; result[1] = 255; // one byte result[2] = 65535; // two bytes }
有了我们的“持久层”,我们就可以专注于手头的任务:编写一个函数,当传递代理发送的 PUBACK 字节数组(数据包)时,获取正在确认的 PUBLISH 的标识符,并将其与存储在我们持久层上的待处理的 PUBLISH ID 进行检查。如果 ID 匹配则返回“True”。稍后,在实现协议操作行为时,我们会从真实的存储中释放这个匹配的ID。
有了上述 PUBACK 可变报头结构,我们现在需要做的就是读取前两个字节来获取被确认的数据包的 ID。
ushort CPuback::GetPacketID(uchar &pkt[]) { return (pkt[0] * 256) + pkt[1]; }
让我们记住,数据包标识符按大端(或网络)序编码为两字节整数,其中最高有效字节(MSB)首先显示。为了对其进行编码,我们使用了左移位运算(<<)。为了对其进行解码,我们 将最高有效字节的值乘以 256,并添加最低有效字节。
上面的函数目前已经足够了。稍后,当在开放网络中针对真实代理进行测试时,我们可能需要处理字节顺序问题,但我们现在不会对它们进行测试。让我们继续朝着我们有吸引力的目标前进,完成手头的任务。
bool CPuback::IsPendingPkt(uchar &pkt[]) { ushort pending_ids[]; GetPendingPublishIDs(pending_ids); ushort packet_id = GetPacketID(pkt); for(uint i = 0; i < pending_ids.Size(); i++) { if(pending_ids[i] == packet_id) { return true; } } return false; }
上述函数是接收一个字节数组作为参数的,该字节数组是PUBACK数据包的可变报头。然后,它将来自我们的存储/数据库的尚未确认的数据包标识符数组存储在本地变量(pending_ids)中。最后,它读取代理发送的字节数组上的数据包 ID,并将其与待处理 ID 数组进行比较。如果数据包在数组中,我们的函数将返回“True”,然后我们可以释放 ID。
相同的逻辑将允许我们发布用于使用 QoS 2 的 PUBLISH 的 PUBREC、PUBREL 和 PUBCOMP 数据包标识符。此外,稍后我们将用真实的数据库替换我们虚拟的文件中单函数“持久层”,但该函数的主要逻辑将保留。此时,另一个开发人员可能正在处理持久层,而我们以完全独立的方式开发我们的数据包类。
我们还需要能够从 PUBACK 可变报头中读取原因代码。由于该字段具有固定的位置和大小,我们需要做的就是读取特定的字节。
uchar CPuback::GetReasonCode(uchar &pkt[]) { return pkt[2]; }
因为我们仅在客户端的接收端进行工作 - 即尚未发送 PUBACK - 上面的函数足以进行我们的下一个功能测试。现在,处理真正的代理。
结论
持续重构是 TDD 实践的一部分。它的目的不仅在于实现完整的功能,而且还要实现整洁的代码:单一的责任单元和函数(这里是类和方法)、可读的标识符(类、方法和变量名),并避免冗余(“不要重复自己”)。这是一个过程,而不是一次性完成的任务。因此,我们已经确信,我们将不断重构,直到我们拥有功能齐全的 MQTT 5.0 客户端。
现在我们准备开始针对真实的 MQTT 代理编写我们的第一个功能测试,以查看我们的 CONNECT、CONNACK、PUBLISH 和 PUBACK 数据包是否按预期工作。
PUBACK 数据包是 QoS 1 的 PUBLISH 数据包的对应部分。QoS 2 的 PUBLISH 数据包将需要 PUBREC、PUBCOMP 和 PUBREL 数据包作为其对应数据包。它们是我们下一篇文章的主题。
如果您对 MQL5 有深入的了解并且可以为这个开源 MQTT 客户端的开发做出贡献,请在下面的评论中或我们的社区聊天中留言。
本文由MetaQuotes Ltd译自英文
原文地址: https://www.mql5.com/en/articles/14391


