MQL5 中的 WebSocket 协议

我们之前已经研究过 WebSocket 协议的理论基础。完整的规范非常广泛,要详细介绍其实现方式需要占用大量篇幅。因此,我们提出了现成类的一般结构及其编程接口。所有文件都位于 MQL5/Include/MQL5Book/ws/ 目录中。

  • wsinterfaces.mqh – 所有接口、常量和类型的一般抽象描述。
  • wstransport.mqh – MqlWebSocketTransport 类,可基于 MQL5 套接字函数实现 IWebSocketTransport 低级网络数据传输接口;
  • wsframe.mqh – WebSocketFrame 和 WebSocketFrameHixie 类,可实现 IWebSocketFrame 接口,该接口隐藏了分别为 Hybi 和 Hixie 协议生成(编码和解码)帧的算法;
  • wsmessage.mqh – WebSocketMessage 和 WebSocketMessageHixie 类,它们实现 IWebSocketMessage 接口,分别用于规范化基于 Hybi 和 Hixie 协议从帧构建消息的过程;
  • wsprotocol.mqh – 继承自 IWebSocketConnection 的 WebSocketConnectionHybi、WebSocketConnectionHixie 类;这些类根据规范对帧、消息、问候和断开的形成进行协调管理,为此使用了上述接口;
  • wsclient.mqh – WebSocket 客户端的现成实现;WebSocketClient 模板类,支持 IWebSocketObserver 接口(用于事件处理)并期望 WebSocketConnectionHybi 或 WebSocketConnectionHixie 作为参数化类型;
  • wstools.mqh – WsTools 名称空间中有用的实用工具。

这些头文件都将作为 #include 指令的依赖项自动包含在我们未来的 mqporj 项目中。

MQL5 中的 WebSocket 类图

MQL5 中的 WebSocket 类图

底层网络接口 IWebSocketTransport 有以下方法。

interface IWebSocketTransport
{
   int write(const uchar &data[]); // write the array of bytes to the network
   int read(uchar &buffer[]);      // read data from network into byte array
   bool isConnected(voidconst;   // check for connection
   bool isReadable(voidconst;    // check for the ability to read from the network
   bool isWritable(voidconst;    // check for the possibility of writing to the network
   int getHandle(voidconst;      // system socket descriptor
   void close(void);               // close connection
};

从这些方法的名称中不难猜出将使用哪些 MQL5 API 套接字函数来构建它们。但如果需要,也可以通过他们自己的方式实现这个接口,例如,通过一个 DLL。

实现此接口的 MqlWebSocketTransport 类需要在创建实例时建立网络连接的协议、主机名和端口号。此外,你可以指定超时值。

帧类型在 WS_FRAME_OPCODE 枚举中收集。

enum WS_FRAME_OPCODE
{
   WS_DEFAULT = 0,
   WS_CONTINUATION_FRAME = 0x00,
   WS_TEXT_FRAME = 0x01,
   WS_BINARY_FRAME = 0x02,
   WS_CLOSE_FRAME = 0x08,
   WS_PING_FRAME = 0x09,
   WS_PONG_FRAME = 0x0A
};

用于处理框架的接口包含与框架实例相关的静态和常规方法。静态方法充当工厂,由发送方创建所需类型的帧 (create) 和传入帧 (decode)。

class IWebSocketFrame
{
public:
   class StaticCreator
   {
   public:
      virtual IWebSocketFrame *decode(uchar &data[], IWebSocketFrame *head = NULL) = 0;
      virtual IWebSocketFrame *create(WS_FRAME_OPCODE typeconst string data = NULL,
         const bool deflate = false) = 0;
      virtual IWebSocketFrame *create(WS_FRAME_OPCODE typeconst uchar &data[],
         const bool deflate = false) = 0;
   };
   ...

由于存在 Creator 模板和返回该模板的 getCreator 方法的实例(假设返回 "singleton"),则工厂方法在子类中是强制存在的。

protected:
   template<typename P>
   class Creatorpublic StaticCreator
   {
   public:
     // decode received binary data in IWebSocketFrame
     // (in case of continuation, previous frame in 'head')
      virtual IWebSocketFrame *decode(uchar &data[],
         IWebSocketFrame *head = NULLoverride
      {
         return P::decode(datahead);
      }
      // create a frame of the desired type (text/closing/other) with optional text
      virtual IWebSocketFrame *create(WS_FRAME_OPCODE typeconst string data = NULL,
         const bool deflate = falseoverride
      {
         return P::create(typedatadeflate);
      };
      // create a frame of the desired type (binary/text/closure/other) with data
      virtual IWebSocketFrame *create(WS_FRAME_OPCODE typeconst uchar &data[],
         const bool deflate = falseoverride
      {
         return P::create(typedatadeflate);
      };
   };
public:
   // require a Creator instance
   virtual IWebSocketFrame::StaticCreator *getCreator() = 0;
   ...

接口的其余方法提供了对帧中数据的所有必要操作(编码/解码、接收数据和各种标志)。

   // encode the "clean" contents of the frame into data for transmission over the network
   virtual int encode(uchar &encoded[]) = 0;
   
   // get data as text
   virtual string getData() = 0;
   
   // get data as bytes, return size
   virtual int getData(uchar &buf[]) = 0;
   
   // return frame type (opcode)
   virtual WS_FRAME_OPCODE getType() = 0;
  
   // check if the frame is a control frame or with data:
   // control frames are processed inside classes
   virtual bool isControlFrame()
   {
      return (getType() >= WS_CLOSE_FRAME);
   }
   
   virtual bool isReady() { return true; }
   virtual bool isFinal() { return true; }
   virtual bool isMasked() { return false; }
   virtual bool isCompressed() { return false; }
};

IWebSocketMessage 接口包含用于执行类似操作的方法,但是是在消息层级。

class IWebSocketMessage
{
public:
   // get an array of frames that make up this message
   virtual void getFrames(IWebSocketFrame *&frames[]) = 0;
   
   // set text as message content
   virtual bool setString(const string &data) = 0;
  
   // return message content as text
   virtual string getString() = 0;
  
   // set binary data as message content
   virtual bool setData(const uchar &data[]) = 0;
   
   // return the contents of the message in "raw" binary form
   virtual bool getData(uchar &data[]) = 0;
  
   // sign of completeness of the message (all frames received)
   virtual bool isFinalised() = 0;
  
   // add a frame to the message
   virtual bool takeFrame(IWebSocketFrame *frame) = 0;
};

考虑到帧和消息的接口,定义了 WebSocket 连接 IWebSocketConnection

interface IWebSocketConnection
{
   // open a connection with the specified URL and its parts,
   // and optional custom headers
   bool handshake(const string urlconst string hostconst string origin,
      const string custom = NULL);
   
   // low-level read frames from the server
   int readFrame(IWebSocketFrame *&frames[]);
   
   // low-level send frame (e.g. close or ping)
   bool sendFrame(IWebSocketFrame *frame);
   
   // low-level message sending
   bool sendMessage(IWebSocketMessage *msg);
   
   // custom check for new messages (event generation)
   int checkMessages();
   
   // custom text submission
   bool sendString(const string msg);
   
   // custom posting of binary data
   bool sendData(const uchar &data[]);
   
   // close the connection
   bool disconnect(void);
};

通过 IWebSocketObserver 接口方法接收关于断开连接和新消息的通知。

interface IWebSocketObserver
{
  void onConnected();
  void onDisconnect();
  void onMessage(IWebSocketMessage *msg);
};

特别是,WebSocketClient 类是这个接口的继承者,默认情况下,它只是将信息输出到日志。类构造函数需要一个地址来连接到 wswss 协议。

template<typename T>
class WebSocketClientpublic IWebSocketObserver
{
protected:
   IWebSocketMessage *messages[];
   
   string scheme;
   string host;
   string port;
   string origin;
   string url;
   int timeOut;
   ...
public:
   WebSocketClient(const string address)
   {
      string parts[];
      URL::parse(addressparts);
   
      url = address;
      timeOut = 5000;
  
      scheme = parts[URL_SCHEME];
      if(scheme != "ws" && scheme != "wss")
      {
        Print("WebSocket invalid url scheme: "scheme);
        scheme = "ws";
      }
  
      host = parts[URL_HOST];
      port = parts[URL_PORT];
  
      origin = (scheme == "wss" ? "https://" : "http://") + host;
   }
   ...
  
   void onDisconnect() override
   {
      Print(" > Disconnected "url);
   }
  
   void onConnected() override
   {
      Print(" > Connected "url);
   }
  
   void onMessage(IWebSocketMessage *msgoverride
   {
      // NB: message can be binary, print it just for notification
      Print(" > Message "url" " , msg.getString());
      WsTools::push(messagesmsg);
   }
   ...
};

WebSocketClient 类将所有消息对象收集到一个数组中并负责删除这些对象(如果 MQL 程序未执行删除)。

连接是在 open 方法中创建的。

template<typename T>
class WebSocketClientpublic IWebSocketObserver
{
protected:
   IWebSocketTransport *socket;
   IWebSocketConnection *connection;
   ...
public:
   ...
   bool open(const string custom_headers = NULL)
   {
      uint _port = (uint)StringToInteger(port);
      if(_port == 0)
      {
         if(scheme == "ws"_port = 80;
         else _port = 443;
      }
  
      socket = MqlWebSocketTransport::create(schemehost_porttimeOut);
      if(!socket || !socket.isConnected())
      {
         return false;
      }
  
      connection = new T(&thissocket);
      return connection.handshake(urlhostorigincustom_headers);
   }
   ...

文本和二进制数据的重载 send 方法提供了发送数据的最便捷的方式。

   bool send(const string str)
   {
      return connection ? connection.sendString(str) : false;
   }
    
   bool send(const uchar &data[])
   {
      return connection ? connection.sendData(data) : false;
   }

若要检查新的传入消息,可以调用 checkMessages 方法。根据其 blocking 参数,该方法在循环中等待消息直至超时,或者在没有消息时立即返回。消息将转到 IWebSocketObserver::onMessage 处理程序。

   void checkMessages(const bool blocking = true)
   {
      if(connection == NULLreturn;
      
      uint stop = GetTickCount() + (blocking ? timeOut : 1);
      while(ArraySize(messages) == 0 && GetTickCount() < stop && isConnected())
      {
         // all frames are collected into the appropriate messages, and they become
         // available through event notifications IWebSocketObserver::onMessage,
         // however, control frames have already been internally processed and removed by now
         if(!connection.checkMessages()) // while no messages, let's make micro-pause
         {
            Sleep(100);
         }
      }
   }

另一种接收消息的方法是在 readMessage 方法中实现的:它返回一个指向调用代码的消息的指针(也就是说,onMessage 应用处理程序不是必要的)。之后,MQL 程序负责释放对象。

   IWebSocketMessage *readMessage(const bool blocking = true)
   {
      if(ArraySize(messages) == 0checkMessages(blocking);
      
      if(ArraySize(messages) > 0)
      {
         IWebSocketMessage *top = messages[0];
         ArrayRemove(messages01);
         return top;
      }
      return NULL;
   }

该类还允许你更改超时、检查连接并关闭连接。

   void setTimeOut(const int ms)
   {
      timeOut = fabs(ms);
   }
   
   bool isConnected() const
   {
      return socket && socket.isConnected();
   }
   
   void close()
   {
      if(isConnected())
      {
         if(connection)
         {
            connection.disconnect(); // this will close socket after server acknowledge
            delete connection;
            connection = NULL;
         }
         if(socket)
         {
            delete socket;
            socket = NULL;
         }
      }
   }
};

上述类库允许为回显和聊天服务创建客户端应用程序。