English Deutsch 日本語
preview
Разработка MQTT-клиента для MetaTrader 5: методология TDD (Часть 5)

Разработка MQTT-клиента для MetaTrader 5: методология TDD (Часть 5)

MetaTrader 5Интеграция | 30 апреля 2024, 13:37
559 0
Jocimar Lopes
Jocimar Lopes

"Преждевременная оптимизация — корень всех зол" (Дональд Кнут)

Введение

MQTT — это протокол обмена сообщениями публикации и подписки. Таким образом, мы можем ожидать, что его ядром являются пакеты PUBLISH (публикация) и SUBSCRIBE (подписка). Все остальные типы пакетов являются вспомогательными.

Помимо возможности создавать пакеты PUBLISH, мы также должны иметь возможность их читать, поскольку сообщения, которые наш Клиент будет получать от других Клиентов, также являются пакетами PUBLISH. Это связано с тем, что протокол доставки симметричен.

"Пакет PUBLISH отправляется от клиента на сервер или от сервера к клиенту для транспортировки сообщения приложения (Application Message)".

Пакеты PUBLISH имеют другой фиксированный заголовок с флагами публикации (Publish Flags) и изменяемый заголовок с обязательным названием темы, закодированным как строка UTF-8, и обязательным идентификатором пакета (если QoS > 0). Кроме того, со временем он может использовать почти все свойства (в том числе и пользовательские), представленные в MQTT 5.0, включая свойства, связанные с режимом взаимодействия "Запрос/Ответ" (Request/Response).

В статье мы рассмотрим структуру заголовков, а также тестирование и реализацию флагов публикации, имен тем и идентификаторов пакетов. 

В последующих описаниях мы используем слова НЕОБХОДИМО/ДОЛЖНО (MUST) и МОЖЕТ/МОЖНО (MAY) так, как они используются в Стандарте OASIS, который, в свою очередь, использует их, как описано в документе IETF RFC 2119.

Также, если не указано иное, все цитаты взяты из Стандарта OASIS.


Структура фиксированного заголовка пакета MQTT 5.0 PUBLISH

Фиксированный заголовок пакета PUBLISH имеет ту же двухбайтовую базовую структуру, что и все другие типы пакетов управления. Первый байт предназначен для передачи типа пакета. Второй байт — это хост оставшейся длины (Remaining Length) пакета, закодированный как переменное целое число (Variable Byte Integer).

Но в то время как все другие типы пакетов имеют первые четыре бита первого байта в состоянии ЗАРЕЗЕРВИРОВАНО (RESERVED), пакет PUBLISH использует эти четыре бита для кодирования трех функций: RETAIN, QoS Level и DUP.

Пакет управления MQTT Флаги фиксированного заголовка Bit 3 Bit 2 Bit 1 Bit 0
CONNECT Зарезервировано 0 0 0 0
CONNACK Зарезервировано
0 0 0 0
PUBLISH Используется в MQTT v5.0 DUP QoS 2 QoS 1 RETAIN
PUBACK Зарезервировано
0 0 0 0
PUBREC Зарезервировано
0 0 0 0
PUBREL Зарезервировано
0 0 1 0
PUBCOMP Зарезервировано
0 0 0 0
SUBSCRIBE Зарезервировано
0 0 1 0
SUBACK Зарезервировано 0 0 0 0
UNSUBSCRIBE Зарезервировано
0 0 1 0
UNSUBACK Зарезервировано
0 0 0 0
PINGREQ Зарезервировано
0 0 0 0
PINGRESP Зарезервировано
0 0 0 0
DISCONNECT Зарезервировано
0 0 0 0
AUTH Зарезервировано
0 0 0 0

Таблица 1. Воспроизведение битов флагов таблицы 2-3 из Стандарта MQTT 5.0 Oasis

"Если бит флага помечен как "Зарезервирован", он находится в запасе для будущего использования и ДОЛЖЕН быть установлен в указанное значение".

Из-за этой фиксированной разницы в заголовках между пакетами PUBLISH и всеми остальными управляющими пакетами функцию, которую мы использовали для генерации фиксированных заголовков, здесь использовать нельзя.

//+------------------------------------------------------------------+
//|                     SetFixedHeader                               |
//+------------------------------------------------------------------+
void SetFixedHeader(ENUM_PKT_TYPE pkt_type, uchar& buf[], uchar& dest_buf[])
  {
   dest_buf[0] = (uchar)pkt_type << 4;
   dest_buf[1] = EncodeVariableByteInteger(buf);
  }

Как видите, параметры функции имеют только тип пакета и ссылки на два массива, один из которых является источником, а другой — местом назначения массива с фиксированным заголовком. Затем первая строка берет целочисленное значение типа пакета из Enum и сдвигает его влево на четыре бита, присваивая результат побитовой операции первому байту массива фиксированного заголовка (dest_buf[0]). Эта побитовая операция гарантирует, что первые четыре бита останутся неназначенными, или "зарезервированными", согласно Стандарту.

Вторая строка вызывает функцию, которая вычисляет оставшуюся длину пакета, присваивая значение второму байту массива фиксированного заголовка (dest_buf[1]), закодированному как переменное целое число.

Но эта функция не предоставляет никаких средств для установки флагов публикации.

Рис. 1. Фиксированный заголовок пакета PUBLISH MQTT 5.0 RETAIN, флаг QoS Level и DUP

Рис. 1. Фиксированный заголовок пакета PUBLISH MQTT 5.0 RETAIN, флаг QoS Level и DUP

Таким образом, мы добавили переключатель (Switch) для размещения пакетов PUBLISH и последний параметр для получения флагов публикации. Мы могли бы перегрузить функцию для получения флагов публикации, слегка изменив ее тело для реализации особенностей пакетов PUBLISH. Но это идеальный вариант использования Switch, поскольку у нас есть только одно исключение (PUBLISH), а во всех остальных случаях по умолчанию используется предыдущая реализация.

Последний параметр по умолчанию равен нулю, что означает, что его можно игнорировать при настройке всех фиксированных заголовков пакета. dest_buf изменится, только если установлены какие-либо флаги публикации.

//+------------------------------------------------------------------+
//|                     SetFixedHeader                               |
//+------------------------------------------------------------------+
void SetFixedHeader(ENUM_PKT_TYPE pkt_type,
                    uchar& buf[], uchar& dest_buf[], uchar publish_flags = 0)
  {
   switch(pkt_type)
     {
      case PUBLISH:
         dest_buf[0] = (uchar)pkt_type << 4;
         dest_buf[0] |= publish_flags;
         dest_buf[1] = EncodeVariableByteInteger(buf);
         break;
      default:
         dest_buf[0] = (uchar)pkt_type << 4;
         dest_buf[1] = EncodeVariableByteInteger(buf);
         break;
     }
  }

Как вы можете видеть, буфер назначения, содержащий фиксированный заголовок, модифицируется посредством побитовой операции ИЛИ (OR) в сочетании с присвоением его первому байту. Мы широко использовали этот шаблон для переключения флагов подключения, а теперь используем тот же шаблон для переключения флагов публикации.

Например, флаг RETAIN устанавливается/сбрасывается с помощью следующего кода. 

//+------------------------------------------------------------------+
//|               CPktPublish::SetRetain                             |
//+------------------------------------------------------------------+
void CPktPublish::SetRetain(const bool retain)
  {
   retain ? m_publish_flags |= RETAIN_FLAG : m_publish_flags &= ~RETAIN_FLAG;
   SetFixedHeader(PUBLISH, m_buf, ByteArray, m_publish_flags);
  }

Флаг уровня QoS_1 (лишенный аналогичной функциональной сигнатуры).

QoS_1 ? m_publish_flags |= QoS_1_FLAG : m_publish_flags &= ~QoS_1_FLAG;   
SetFixedHeader(PUBLISH, m_buf, ByteArray, m_publish_flags);

Флаг уровня QoS_2.

QoS_2 ? m_publish_flags |= QoS_2_FLAG : m_publish_flags &= ~QoS_2_FLAG;
SetFixedHeader(PUBLISH, m_buf, ByteArray, m_publish_flags);

Флаг DUP.

dup ? m_publish_flags |= DUP_FLAG : m_publish_flags &= ~DUP_FLAG;
SetFixedHeader(PUBLISH, m_buf, ByteArray, m_publish_flags);

Значения флагов (маски флагов) представляют собой константы, определенные в Enum как значения степени числа два в соответствии с положением соответствующего бита в переключаемом байте.

//+------------------------------------------------------------------+
//|             PUBLISH - FIXED HEADER - PUBLISH FLAGS               |
//+------------------------------------------------------------------+
enum ENUM_PUBLISH_FLAGS
  {
   RETAIN_FLAG  	= 0x01,
   QoS_1_FLAG           = 0x02,
   QoS_2_FLAG           = 0x04,
   DUP_FLAG             = 0x08
  };

Таким образом, флаги имеют следующие двоичные значения и позиции в байте.

RETAIN

Decimal 1 0 0 0 0 0 0 0 1

QoS 1

Decimal 2 0 0 0 0 0 0 1 0

QoS 2

Decimal 4 0 0 0 0 0 1 0 0

DUP

Decimal 8 0 0 0 0 1 0 0 0

Десятичное значение пакета PUBLISH равно 3.

Decimal 3 0 0 0 0 0 0 1 1

Мы сместили значение типа пакета на четыре бита влево (dest_buf[0] = (uchar)pkt_type << 4).

Decimal 48 0 0 1 1 0 0 0 0

Когда мы применяем побитовую операцию ИЛИ ( dest_buf[0] |= publish_flags; ) к двоичному представлению значения типа пакета и флагов, мы, по сути, объединяем биты. Таким образом, двоичное представление смещенного влево значения пакета PUBLISH с установленным флагом DUP становится следующим.

Decimal 56 0 0 1 1 1 0 0 0

Если установлены флаги RETAIN и QoS 2, биты первого байта фиксированного заголовка будут выглядеть следующим образом.

Decimal 53 0 0 1 1 0 1 0 1

И наоборот, побитовая операция И (AND) между значением типа пакета и его дополнением (~) двоичного представления флагов снимает флаг ( m_publish_flags &= ~RETAIN_FLAG ).

Итак, если бы байт был установлен с QoS 1 без DUP или RETAIN, это выглядело бы так.

Decimal 50 0 0 1 1 0 0 1 0

Дополнением к флагу QoS 1, приведенному выше, является значение всех его перевернутых битов.

Флаг QoS_1 0 0 1 0
Флаг ~QoS_1 1 1 0 1

Поскольку любое значение И ноль равно нулю, мы фактически снимаем флаг.

Теперь обратите внимание, что двоичное значение байта, очевидно, меняется по мере установки флагов. Если все флаги не установлены, он равен десятичному значению 48 после сдвига влево десятичного значения 3 на четыре бита. Когда мы устанавливаем флаг RETAIN, он имеет десятичное значение 49. Значение становится 51 при RETAIN и QoS 1. И так далее.

Эти десятичные значения — это те значения, которые мы ищем при изучении всех возможных комбинаций установки/снятия флагов в наших тестах.

//+------------------------------------------------------------------+
//|              TEST_SetFixedHeader_DUP_QoS2_RETAIN                 |
//+------------------------------------------------------------------+
bool TEST_SetFixedHeader_DUP_QoS2_RETAIN()
  {
   Print(__FUNCTION__);
//--- Arrange
   static uchar expected[] = {61, 0};
   uchar buf[] = {};
//--- Act
   CPktPublish *cut = new CPktPublish(buf);
   cut.SetDup(true);
   cut.SetQoS_2(true);
   cut.SetRetain(true);
   uchar result[];
   ArrayCopy(result, cut.ByteArray);
//--- Assert
   bool isTrue = AssertEqual(expected, result);
//--- cleanup
   delete cut;
   ZeroMemory(result);
   return isTrue;
  }

Эти несколько наивные тесты (а также другие, более сложные), написанные до реализации, позволяют нам сосредоточиться на поставленной задаче, а также работают как "сеть безопасности", когда нам нужно изменить или реорганизовать код. Многие из них вы найдете в прикрепленных файлах. 

Запустив тесты, вы должны увидеть примерно следующее.

Рис. 2. Фиксированный заголовок тестового вывода MQTT 5.0 PUBLISH

Рис. 2. Фиксированный заголовок тестового вывода MQTT 5.0 PUBLISH

Если цикл публикации/подписки является ядром протокола, эти три функции (RETAIN, DUP и QoS) являются ядром рабочего поведения протокола. Возможно, они окажут большое влияние на управление состоянием сессии. Итак, давайте немного выйдем за рамки строгой спецификации протокола и попытаемся понять их семантику.

RETAIN

Как мы видели в первой части этой серии, шаблон публикации/подписки привязан к определенному названию темы: клиент публикует сообщение с именем темы или подписывается на имя темы, и все клиенты получают сообщения, опубликованные под именем темы, на которую они подписались. 

При публикации мы можем использовать флаг RETAIN, установленный на 1 (true), чтобы указать серверу сохранить сообщение и доставить его как "сохраненное сообщение" новым подписчикам. Всегда существует только одно сохраненное сообщение, и мы устанавливаем RETAIN равным 1 для хранения/замены существующих сохраненных сообщений. Мы отправляем полезную нагрузку нулевого байта с этим флагом, установленным в 1, чтобы очистить сохраненные сообщения. Мы сбрасываем его на 0, чтобы указать серверу ничего не делать с сохраненными сообщениями под этим именем темы - не сохранять, не заменять и не очищать.

При подписке на название темы мы получаем сохраненное сообщение. В случае общих подписок (Shared Subscriptions) сохраненное сообщение будет отправлено только одному из клиентов общего фильтра тем (Topic Filter). Мы углубимся в общие подписки при работе с пакетами SUBSCRIBE.

Эта функция работает совместно с флагами Retain Available (сохранение доступно) и Retain Not Supported (сохранение не поддерживается) в пакетах CONNACK, отправляемых с сервера. 

Срок действия сохраненных сообщений истекает, как и любое другое сообщение, в соответствии с интервалом истечения срока действия сообщения, установленным в параметре PUBLISH или в свойствах Will полезной нагрузки CONNECT.

Мы должны принять во внимание, что RETAIN — это функция динамического брокера. Это означает, что она может измениться с "доступно" на "не поддерживается" и наоборот в пределах одного сеанса.

QoS Level

Мы уже говорили об уровне QoS (QoS Level) во вступительной статье этой серии, перечисляя некоторые проектные решения, принятые создателями протокола.

"Несмотря на то, что протокол должен был отличаться надежностью, быстротой и дешевизной из-за технологических ограничений и высоких сетевых затрат, от него требовалось обеспечить качественную доставку данных с непрерывным отображением данных о сеансе (session awareness), что позволяло бы ему справляться с ненадежным или даже прерывистым Интернет-соединением".

В контексте флагов подключения мы рассматривали таблицу с определением каждого уровня QoS.

Значение QoS Bit 2 Bit 1 Описание
0 0 0 В основном в момент доставки
1 0 1 По меньшей мере в момент доставки
2 1 0 Точно в момент доставки
- 1 1 Зарезервировано – нельзя использовать

Таблица 2. Воспроизведение таблицы 3-9 определений QoS из Стандарта MQTT 5.0 Oasis

Описывая использование уровней QoS и других функций, мы использовали термины "сервер" и "брокер" для обозначения службы, которая будет распространять наши сообщения. Но согласно Стандарту,

"Протокол доставки симметричный, в описании ниже как Клиент, так и Сервер могут выступать в роли отправителя или получателя. Протокол доставки касается исключительно доставки сообщения приложения от одного отправителя одному получателю. Когда Сервер доставляет Сообщение Приложения более чем одному Клиенту, каждый Клиент обрабатывается независимо. Уровень QoS, используемый для доставки исходящего сообщения приложения клиенту, может отличаться от уровня входящего сообщения приложения." (выделено мной)

Итак, использование терминов "сервер" и "брокер" в том смысле, в котором мы использовали его до сих пор, оправдано, поскольку мы говорим с точки зрения Клиента в широком смысле, но мы должны помнить об этой симметрии в протоколе доставки.

Уровень QoS по умолчанию равен 0. Это означает, что если мы не установим этот флаг, мы сообщим серверу, что 0 (ноль) — это максимальный уровень QoS, который мы готовы принять. Любой брокер, соответствующий требованиям, принимает этот уровень. Это публикация по принципу "выстрелил и забыл" (fire and forget), при которой отправитель признает, что при доставке могут произойти как потеря, так и дублирование.

Рис. 3. MQTT 5.0 - блок-схема клиент-сервер QoS уровня 0

Рис. 3. MQTT 5.0 - блок-схема клиент-сервер QoS уровня 0

Уровень QoS 1 допускает возможность дублирования при доставке, но не допускает потерь. Сервер подтверждает сообщение PUBACK.

Рис. 4. MQTT 5.0 - блок-схема клиент-сервер QoS уровня 1

Рис. 4. MQTT 5.0 - блок-схема клиент-сервер QoS уровня 1

Уровень QoS 2 не требует потерь или дублирования. На этом уровне участвуют четыре пакета. Сервер распознает начало доставки с помощью PUBREC. Затем клиент запросит выпуск этого конкретного идентификатора пакета с помощью PUBREL, и, наконец, сервер уведомит о завершении доставки с помощью PUBCOMP.

Рис. 5. MQTT 5.0 - блок-схема клиент-сервер QoS уровня 2

Рис. 5. MQTT 5.0 - блок-схема клиент-сервер QoS уровня 2

Аналогия взята из предыдущей статьи, в которой мы говорили о флагах подключения:

"Этот уровень [QoS 2] можно представить как отправку заказной посылки. Почтовая система выдает вам квитанцию, когда вы передаете посылку им в руки, признавая, что на данный момент они несут ответственность за доставку ее по нужному адресу. И когда это происходит, когда они доставляют посылку, они отправляют вам подписанную квитанцию от получателя, подтверждающую доставку посылки".

Качество обслуживания (Quality of Service) может потребоваться для Will-сообщения, для подписки (включая общие подписки) или для конкретного сообщения. 

Will-сообщение Подписка Сообщение
CONNECT Will QoS Варианты подписки SUBSCRIBE Флаг уровня PUBLISH QoS

Таблица 3. Пакеты MQTT 5.0 и флаги, для которых можно установить уровень QoS

Внимательный читатель, возможно, заметил, что QoS 1 и QoS 2 включают в себя то или иное состояние сеанса (Session state). О состоянии Session и соответствующем уровне сохранения (persistence layer) мы поговорим в отдельной статье.

DUP

Если флаг DUP установлен, это означает, что мы вновь пытаемся отправить предыдущий неудачный пакет PUBLISH. Он ДОЛЖЕН быть установлен на 0 (ноль) для всех сообщений QoS 0. Дублирование относится к самому пакету, а не к сообщению.


Переменный заголовок пакета MQTT 5.0 PUBLISH: имя темы, идентификатор пакета и свойства.

Переменный заголовок пакета MQTT 5.0 PUBLISH ДОЛЖЕН иметь имя темы, а если QoS больше 0 (нуля), он также ДОЛЖЕН иметь идентификатор пакета. За этими двумя полями обычно следует набор свойств и полезные данные, но пакет PUBLISH без свойств и полезных данных нулевой длины является действительным пакетом. Другими словами, самый простой действительный пакет PUBLISH — это пакет с фиксированным заголовком с QoS 0, без флагов DUP и RETAIN, а также с переменным заголовком, содержащим только имя темы.

Название темы

Поскольку все взаимодействия между клиентами и сервером – и, соответственно, все взаимодействия между пользователями/устройствами – в протоколе обмена сообщениями публикации/подписки вращаются вокруг публикации в теме и подписки на тему, мы можем сказать, что поле "Имя темы" заслуживает здесь особого внимания. Во многих сервисах реального времени вместо названия темы мы встретим термин "канал". Это имеет смысл, поскольку название темы представляет собой информационный канал, на который подписаны клиенты.

Имя темы — это строка в кодировке UTF-8, организованная в иерархическую древовидную структуру. Косая черта ( / U+002F ) используется в качестве разделителя на уровне темы. 

broker1/account12345/EURUSD

Имя чувствительно к регистру. То есть ниже перед нами две разные темы.

  • broker1/account12345/EURUSD
  • broker1/account12345/eurusd

Эти разделители уровней имеют значение только в том случае, если в клиентской подписке присутствует любой из подстановочных знаков фильтра тем (см. ниже). Ограничений на количество уровней нет, кроме ограничения самой строки UTF-8. В конечном итоге имя темы может быть заменено псевдонимом темы (Topic Alias).

"Псевдоним темы — это целочисленное значение, которое используется для идентификации темы вместо использования имени темы. Это уменьшает размер пакета PUBLISH и полезно, когда имена тем длинные и одни и те же имена тем повторно используются в сетевом соединении".

Идентификатор пакета

Идентификатор пакета — это двухбайтовое целочисленное поле, необходимое для пакетов PUBLISH с QoS > 0. Он используется во всех пакетах, непосредственно участвующих в цикле публикации/подписки для управления состоянием сеанса. Идентификатор пакета НЕ ДОЛЖЕН использоваться в PUBLISH с QoS 0.

Он используется для соединения PUBLISH с соответствующими ACK.

Помните, что, поскольку протокол доставки симметричен, при использовании QoS 1 наш Клиент может получить PUBLISH от сервера с тем же идентификатором пакета до получения PUBACK, связанного с предыдущим отправленным PUBLISH.

"Клиент может отправить пакет PUBLISH с идентификатором пакета 0x1234, а затем получить другой пакет PUBLISH с идентификатором пакета 0x1234 от своего сервера, прежде чем он получит PUBACK для отправленного им пакета PUBLISH".

Стоит отметить, что идентификатор пакета также используется для соединения соответствующих подтверждений ACK в пакетах SUBSCRIBE и UNSUBSCRIBE.


Как мы пишем название(я) темы?

Название темы — это первое поле в заголовке переменной. Оно закодировано как строка UTF-8 с некоторыми недопустимыми кодовыми точками Unicode, и здесь есть одна загвоздка. Пожалуйста, взгляните на эти три утверждения с некоторыми требованиями для кодирования строки UTF-8 для MQTT 5.0.

"[…] символьные данные НЕ ДОЛЖНЫ включать кодировки кодовых точек между U+D800 и U+DFFF. Если клиент или сервер получает пакет управления MQTT, содержащий неверно сформированный код, это неправильно сформированный (Malformed) пакет".

"Строка в кодировке UTF-8 НЕ ДОЛЖНА включать кодировку нулевого символа U+0000. Если получатель (клиент или сервер) получает пакет управления MQTT, содержащий U+0000, это неправильно сформированный пакет".

"Данные НЕ ДОЛЖНЫ включать кодировки кодовых точек Unicode [Unicode], перечисленных ниже. Если получатель (клиент или сервер) получает пакет управления MQTT, содержащий любую из них, он МОЖЕТ обрабатывать пакет как неправильно сформированный. Ниже представлены недопустимые кодовые точки Unicode.

Управляющие символы U+0001..U+001F

Управляющие символы U+007F..U+009F

Кодовые точки, определенные в спецификации [Unicode], не являются символами"

Как вы можете видеть, и первый, и второй операторы выше являются строгими (НЕ ДОЛЖНЫ), что означает, что любая совместимая реализация будет проверять наличие запрещенных кодовых точек, тогда как третий оператор является рекомендацией (НЕ ДОЛЖЕН), что означает, что реализация может не проверять наличие запрещенных кодовых точек и при этом считаться соответствующей.

Поскольку неправильно сформированный пакет является причиной ОТКЛЮЧЕНИЯ (DISCONNECT), если мы разрешим эти кодовые точки в нашем Клиенте, а наш брокер решит не рассматривать их как неправильно сформированный пакет,мы можем вызвать отключение других клиентов, которые обеспечивают соблюдение рекомендации. Таким образом, несмотря на то, что исключение управляющих и несимвольных элементов Юникода является лишь рекомендацией, мы не разрешаем их использование в нашей реализации.

На данный момент наша функция для кодирования строк в UTF-8 выглядит следующим образом:

//+------------------------------------------------------------------+
//|                    Encode UTF-8 String                           |
//+------------------------------------------------------------------+
void EncodeUTF8String(string str, ushort& dest_buf[])
  {
   uint str_len = StringLen(str);
// check for disallowed Unicode code points
   uint iter_pos = 0;
   while(iter_pos < str_len)
     {
      Print("Checking disallowed code points");
      ushort code_point = StringGetCharacter(str, iter_pos);
      if(IsDisallowedCodePoint(code_point))
        {
         printf("Found disallowed code point at position %d", iter_pos);
         ZeroMemory(dest_buf);
         return;
        }
      printf("Iter position %d", iter_pos);
      iter_pos++;
     }
   if(str_len == 0)
     {
      Print("Cleaning buffer: string empty");
      ZeroMemory(dest_buf);
      return;
     }
// we have no disallowed code points and the string is not empty: encode it.
   printf("Encoding %d bytes ", str_len);
   ArrayResize(dest_buf, str_len + 2);
   dest_buf[0] = (char)str_len >> 8; // MSB
   dest_buf[1] = (char)str_len % 256; // LSB
   ushort char_array[];
   StringToShortArray(str, char_array, 0, str_len);// to Unicode
   ArrayCopy(dest_buf, char_array, 2);
   ZeroMemory(char_array);
  }

Если строка, переданная в эту функцию, имеет недопустимый код, мы регистрируем ее позицию в строке, передаем буфер назначения в ZeroMemory и немедленно возвращаемся. Поскольку имя темы имеет минимальную требуемую длину 1, если строка пуста, мы делаем то же самое: записываем в журнал, очищаем буфер и возвращаемся.

Кстати, обратите внимание, что мы используем StringToShortArray для преобразования строки в массив Unicode. Если бы мы конвертировали его в массив ASCII, мы бы использовали StringToCharArray. Подробное объяснение и многое другое вы можете найти вкниге, недавно добавленной в документацию, или в этой всеобъемлющей статье о строках MQL5.

Также обратите внимание, что в этом же вызове StringToShortArray мы используем длину строки в качестве последнего параметра вместо функции по умолчанию. Это потому, что нам не нужен нулевой символ (0x00) в нашем массиве, и, согласно документации функции: 

"Значение по умолчанию минус 1, что означает копирование до конца массива или до конечного 0. Конечный 0 также будет скопирован в массив получателей",

в то время как возвращаемое значение StringLen является

"Количеством символов в строке без конечного нуля".

Функция проверки недопустимых кодовых точек выглядит довольно привычно.

//+------------------------------------------------------------------+
//|              IsDisallowedCodePoint                               |
//|   https://unicode.org/faq/utf_bom.html#utf16-2                   |
//+------------------------------------------------------------------+
bool IsDisallowedCodePoint(ushort code_point)
  {
   if((code_point >= 0xD800 && code_point <= 0xDFFF) // Surrogates
      || (code_point > 0x00 && code_point <= 0x1F) // C0 - Control Characters
      || (code_point >= 0x7F && code_point <= 0x9F) // C0 - Control Characters
      || (code_point == 0xFFF0 || code_point == 0xFFFF)) // Specials - non-characters
     {
      return true;
     }
   return false;
  };

Помимо запрещенных кодовых точек, нам также необходимо проверить наличие двух подстановочных знаков, которые используются в фильтрах тем подписок, но запрещены в названии темы: знак плюса ('+' U+002B) и знак номера ('#' U+0023).

Функция проверки недопустимых кодовых точек будет широко использоваться для кодирования любой строки, поэтому она размещается в заголовке MQTT.mqh, а функция проверки наличия подстановочных знаков специфична для имени темы, поэтому она является частью нашего класса CPktPublish.

//+------------------------------------------------------------------+
//|            CPktPublish::HasWildcardChar                          |
//+------------------------------------------------------------------+
bool CPktPublish::HasWildcardChar(const string str)
  {
   if(StringFind(str, "#") > -1 || StringFind(str, "+") > -1)
     {
      printf("Wildcard char not allowed in Topic Names");
      return true;
     }
   return false;
  }

Встроенная функция StringFind возвращает начальную позицию соответствующей подстроки и -1, если соответствующая подстрока не найдена. Поэтому мы просто проверяем любое значение выше -1. Затем мы вызываем его из основной функции.

//+------------------------------------------------------------------+
//|            CPktPublish::SetTopicName                             |
//+------------------------------------------------------------------+
void CPktPublish::SetTopicName(const string topic_name)
  {
   if(HasWildcardChar(topic_name) || StringLen(topic_name) == 0)
     {
      ArrayFree(ByteArray);
      return;
     }
   ushort encoded_string[];
   EncodeUTF8String(topic_name, encoded_string);
   ArrayCopy(ByteArray, encoded_string, 2);
   ByteArray[1] = EncodeVariableByteInteger(encoded_string);
  }

На этом этапе, если подстановочный знак найден, мы делаем ту же "обработку ошибок", что и раньше: записываем информацию, очищаем буфер и немедленно возвращаемся. Позже мы сможем улучшить ее, например, выдавая оповещения.

Последняя строка функции присваивает оставшуюся длину пакета второму байту нашего фиксированного заголовка, используя алгоритм, предложенный Стандартом. Я писал об этом в первой статье серии.

Наши тесты также имеют ту же структуру.

//+------------------------------------------------------------------+
//|           TEST_SetTopicName_WildcardChar_NumberSign              |
//+------------------------------------------------------------------+
bool TEST_SetTopicName_WildcardChar_NumberSign()
  {
   Print(__FUNCTION__);
//--- Arrange
   static uchar expected[] = {};
   uchar payload[] = {};
//--- Act
   CPktPublish *cut = new CPktPublish(payload);
   cut.SetTopicName("a#");
   uchar result[];
   ArrayCopy(result, cut.ByteArray);
//--- Assert
   bool isTrue = AssertEqual(expected, result);
//--- cleanup
   delete cut;
   ZeroMemory(result);
   return isTrue;
  }

Если вы запустите тесты, вы должны увидеть что-то вроде этого:

Рис. 6. MQTT 5.0 - Название темы вывода теста PUBLISH

Рис. 6. MQTT 5.0 - Название темы вывода теста PUBLISH


Как мы пишем идентификатор(ы) пакета?

Идентификатор пакета НЕ предназначен для назначения пользователем. Вместо этого он ДОЛЖЕН быть назначен Клиентом любому пакету PUBLISH, где уровень QoS > 0, и НЕ ДОЛЖЕН назначаться в противном случае. Другими словами, каждый раз, когда мы создаем пакет PUBLISH с QoS 1 или QoS 2, мы должны установить его идентификатор пакета. 

Мы можем провести необходимый тест прямо сейчас. Все, что нам нужно, — это создать экземпляр пакета и установить для него необходимое имя темы и QoS равный 1 или 2. Результирующий массив байтов пакета должен иметь идентификатор пакета.

//+------------------------------------------------------------------+
//|            TEST_SetPacketID_QoS2_TopicName1Char                  |
//+------------------------------------------------------------------+
bool TEST_SetPacketID_QoS2_TopicName5Char()
  {
   Print(__FUNCTION__);
// Arrange
   uchar payload[] = {};
   uchar result[]; // expected {52, 9, 0, 1, 'a', 'b', 'c', 'd', 'e', pktID MSB, pktID LSB}
// Act
   CPktPublish *cut = new CPktPublish(payload);
// FIX: if we call SetQoS first this test breaks
   cut.SetTopicName("abcde");
   cut.SetQoS_2(true);
   ArrayCopy(result, cut.ByteArray);
// Assert
   ArrayPrint(result);
   bool is_true = result[9] > 0 || result[10] > 0;
// cleanup
   delete cut;
   ZeroMemory(result);
   return is_true;
  }

Обратите внимание, что мы не можем проверить значение сгенерированного идентификатора пакета, поскольку это (псевдо) случайное сгенерированное число, как вы можете видеть ниже в реализации заглушки. Вместо этого мы проверяем его наличие. Также обратите внимание, что нам нужно внести исправление (FIX). Порядок вызова функций SetTopicName и SetQoS_X неожиданным образом влияет на результирующий массив байтов. Не рекомендуется иметь зависимость порядка вызова между функциями. Это было бы ошибкой, но, как говорится, ошибка — это не написанный тест. Итак, мы напишем тест на отсутствие этой зависимости от порядка вызова в следующей итерации. На данный момент нас интересует только прохождение этого теста.

Конечно, тест даже не скомпилируется, пока у нас не будет реализации функции для установки идентификаторов пакетов. Поскольку идентификатор(ы) пакета требуются в нескольких управляющих пакетах, функция для его записи НЕ должна быть членом класса CPktPublish. Заголовок MQTT.mqh кажется более подходящим файлом для его размещения.

//+------------------------------------------------------------------+
//|            SetPacketID                                           |
//+------------------------------------------------------------------+
void SetPacketID(uchar& buf[], int start_idx)
  {
// MathRand - Before the first call of the function, it's necessary to call
// MathSrand to set the generator of pseudorandom numbers to the initial state.
   MathSrand((int)TimeLocal());
   int packet_id = MathRand();
   if(ArrayResize(buf, buf.Size() + 2) < 0)
     {
      printf("ERROR: failed to resize array at %s", __FUNCTION__);
      return;
     }
   buf[start_idx] = (uchar)packet_id >> 8; // MSB
   buf[start_idx + 1] = (uchar)packet_id % 256; //LSB
  }

Мы используем встроенную функцию MathRand для генерации идентификаторов пакетов. Для этого необходимо предварительно вызвать MathSrand. Мы должны передать этой функции "начальное число" (seed) для генератора случайных чисел. Мы выбрали TimeLocal в качестве начального значения, следуя рекомендации о генерации псевдослучайных чисел в MQL5 в книге, недавно добавленной в документацию.

Чтобы установить идентификатор пакета, мы изменяем размер исходного массива байтов, чтобы освободить место для идентификатора пакета (двухбайтовое целое число), и устанавливаем значения старшего и младшего байта, начиная с позиции, переданной в качестве аргумента (start_idx). Последний шаг — вызвать функцию из нашего класса CPktPublish для методов SetQoS_1 и SetQoS_2.

//+------------------------------------------------------------------+
//|            CPktPublish::SetQoS_2                                 |
//+------------------------------------------------------------------+
void CPktPublish::SetQoS_2(const bool QoS_2)
  {
   QoS_2 ? m_publish_flags |= QoS_2_FLAG : m_publish_flags &= ~QoS_2_FLAG;
   SetFixedHeader(PUBLISH, m_buf, ByteArray, m_publish_flags);
   SetPacketID(ByteArray, ByteArray.Size());
  }

Запустив тесты, включенные в прикрепленные файлы, вы должны увидеть что-то вроде этого (часть строк для краткости вырезана):

Рис. 7. MQTT 5.0 - Идентификатор тестового выходного пакета PUBLISH

Рис. 7. MQTT 5.0 - Идентификатор тестового выходного пакета PUBLISH

Заключение

Поскольку пакеты PUBLISH лежат в основе протокола, их реализация немного более сложна: они имеют разные фиксированные заголовки, им требуется переменный заголовок с именем темы, закодированным как UTF-8 и защищенным от некоторых недопустимых кодовых точек, им требуется идентификатор пакета, если QoS > 0, и они могут использовать почти все пользовательские свойства, доступные в MQTT 5.0.

В этой статье мы рассказали, как мы создаем действительные заголовки PUBLISH с флагами публикации, названием темы и идентификатором пакета. В следующей статье этой серии мы рассмотрим создание свойств.

В качестве примечания о последних изменениях: если вы следите за разработкой этого MQTT-клиента, вы, возможно, заметили, что я изменил несколько сигнатур функций, имена переменных, уровни доступа к полям, тестовые приспособления и т. д. Некоторые из этих изменений ожидаемы при любой разработке программного обеспечения, но большинство из них связано с тем, что мы используем подход TDD и стремимся максимально придерживаться этой методологии. Мы можем ожидать больших изменений, прежде чем получим первый результат.

Как вы знаете, ни один разработчик в одиночку не знает всего, что необходимо для разработки такого Клиента для нашей базы кодов. TDD очень помогает в деле постепенной реализации обширного функционала. Если вы можете помочь, оставьте сообщение в чате сообщества или в комментариях ниже. Любая помощь приветствуется. Спасибо!

Перевод с английского произведен MetaQuotes Ltd.
Оригинальная статья: https://www.mql5.com/en/articles/13998

Прикрепленные файлы |
Алгоритмическая торговля с MetaTrader 5 и R для начинающих Алгоритмическая торговля с MetaTrader 5 и R для начинающих
В статье мы объединим финансовый анализ с алгоритмической торговлей, а также посмотрим, как можно подружить R и MetaTrader 5. Эта статья — руководство по объединению аналитической гибкости R с огромными торговыми возможностями MetaTrader 5.
Нейросети — это просто (Часть 88): Полносвязный Энкодер временных рядов (TiDE) Нейросети — это просто (Часть 88): Полносвязный Энкодер временных рядов (TiDE)
Желание получить наиболее точные прогнозы толкает исследователей к усложнению моделей прогнозирование. Что в свою очередь ведет к увеличению затрат на обучение и обслуживание модели. Но всегда ли это оправдано? В данной статье я предлагаю Вам познакомиться с алгоритмом, который использует простоту и скорость линейных моделей и демонстрирует результаты на уровне лучших с более сложной архитектурой.
Критерий однородности Смирнова как индикатор нестационарности временного ряда Критерий однородности Смирнова как индикатор нестационарности временного ряда
В статье рассматривается один из самых известных непараметрических критериев однородности — критерий Смирнова. Анализируются как модельные данные, так и реальные котировки. Приводится пример построения индикатора нестационарности (iSmirnovDistance).
Как разработать агент обучения с подкреплением на MQL5 с интеграцией RestAPI (Часть 3): Создание автоматических ходов и тестовых скриптов на MQL5 Как разработать агент обучения с подкреплением на MQL5 с интеграцией RestAPI (Часть 3): Создание автоматических ходов и тестовых скриптов на MQL5
В этой статье рассматривается реализация автоматических ходов в игре "Крестики-нолики" на языке Python, интегрированная с функциями MQL5 и модульными тестами. Цель - улучшить интерактивность игры и обеспечить надежность системы с помощью тестирования на MQL5. Изложение охватывает разработку игровой логики, интеграцию и практическое тестирование, а завершается созданием динамической игровой среды и надежной интегрированной системы.