MQL5 中的 WebSocket 协议
我们之前已经研究过 WebSocket 协议的理论基础。完整的规范非常广泛,要详细介绍其实现方式需要占用大量篇幅。因此,我们提出了现成类的一般结构及其编程接口。所有文件都位于 MQL5/Include/MQL5Book/ws/ 目录中。
- wsinterfaces.mqh 所有接口、常量和类型的一般抽象描述。
- wstransport.mqh MqlWebSocketTransport 类,可基于 MQL5 套接字函数实现 IWebSocketTransport 低级网络数据传输接口;
- wsframe.mqh WebSocketFrame 和 WebSocketFrameHixie 类,可实现 IWebSocketFrame 接口,该接口隐藏了分别为 Hybi 和 Hixie 协议生成(编码和解码)帧的算法;
- wsmessage.mqh WebSocketMessage 和 WebSocketMessageHixie 类,它们实现 IWebSocketMessage 接口,分别用于规范化基于 Hybi 和 Hixie 协议从帧构建消息的过程;
- wsprotocol.mqh 继承自 IWebSocketConnection 的 WebSocketConnectionHybi、WebSocketConnectionHixie 类;这些类根据规范对帧、消息、问候和断开的形成进行协调管理,为此使用了上述接口;
- wsclient.mqh WebSocket 客户端的现成实现;WebSocketClient 模板类,支持 IWebSocketObserver 接口(用于事件处理)并期望 WebSocketConnectionHybi 或 WebSocketConnectionHixie 作为参数化类型;
- wstools.mqh WsTools 名称空间中有用的实用工具。
这些头文件都将作为 #include 指令的依赖项自动包含在我们未来的 mqporj 项目中。

MQL5 中的 WebSocket 类图
底层网络接口 IWebSocketTransport 有以下方法。
interface IWebSocketTransport
{
int write(const uchar &data[]); // write the array of bytes to the network
int read(uchar &buffer[]); // read data from network into byte array
bool isConnected(void) const; // check for connection
bool isReadable(void) const; // check for the ability to read from the network
bool isWritable(void) const; // check for the possibility of writing to the network
int getHandle(void) const; // system socket descriptor
void close(void); // close connection
};
|
从这些方法的名称中不难猜出将使用哪些 MQL5 API 套接字函数来构建它们。但如果需要,也可以通过他们自己的方式实现这个接口,例如,通过一个 DLL。
实现此接口的 MqlWebSocketTransport 类需要在创建实例时建立网络连接的协议、主机名和端口号。此外,你可以指定超时值。
帧类型在 WS_FRAME_OPCODE 枚举中收集。
enum WS_FRAME_OPCODE
{
WS_DEFAULT = 0,
WS_CONTINUATION_FRAME = 0x00,
WS_TEXT_FRAME = 0x01,
WS_BINARY_FRAME = 0x02,
WS_CLOSE_FRAME = 0x08,
WS_PING_FRAME = 0x09,
WS_PONG_FRAME = 0x0A
};
|
用于处理框架的接口包含与框架实例相关的静态和常规方法。静态方法充当工厂,由发送方创建所需类型的帧 (create) 和传入帧 (decode)。
class IWebSocketFrame
{
public:
class StaticCreator
{
public:
virtual IWebSocketFrame *decode(uchar &data[], IWebSocketFrame *head = NULL) = 0;
virtual IWebSocketFrame *create(WS_FRAME_OPCODE type, const string data = NULL,
const bool deflate = false) = 0;
virtual IWebSocketFrame *create(WS_FRAME_OPCODE type, const uchar &data[],
const bool deflate = false) = 0;
};
...
|
由于存在 Creator 模板和返回该模板的 getCreator 方法的实例(假设返回 "singleton"),则工厂方法在子类中是强制存在的。
protected:
template<typename P>
class Creator: public StaticCreator
{
public:
// decode received binary data in IWebSocketFrame
// (in case of continuation, previous frame in 'head')
virtual IWebSocketFrame *decode(uchar &data[],
IWebSocketFrame *head = NULL) override
{
return P::decode(data, head);
}
// create a frame of the desired type (text/closing/other) with optional text
virtual IWebSocketFrame *create(WS_FRAME_OPCODE type, const string data = NULL,
const bool deflate = false) override
{
return P::create(type, data, deflate);
};
// create a frame of the desired type (binary/text/closure/other) with data
virtual IWebSocketFrame *create(WS_FRAME_OPCODE type, const uchar &data[],
const bool deflate = false) override
{
return P::create(type, data, deflate);
};
};
public:
// require a Creator instance
virtual IWebSocketFrame::StaticCreator *getCreator() = 0;
...
|
接口的其余方法提供了对帧中数据的所有必要操作(编码/解码、接收数据和各种标志)。
// encode the "clean" contents of the frame into data for transmission over the network
virtual int encode(uchar &encoded[]) = 0;
// get data as text
virtual string getData() = 0;
// get data as bytes, return size
virtual int getData(uchar &buf[]) = 0;
// return frame type (opcode)
virtual WS_FRAME_OPCODE getType() = 0;
// check if the frame is a control frame or with data:
// control frames are processed inside classes
virtual bool isControlFrame()
{
return (getType() >= WS_CLOSE_FRAME);
}
virtual bool isReady() { return true; }
virtual bool isFinal() { return true; }
virtual bool isMasked() { return false; }
virtual bool isCompressed() { return false; }
};
|
IWebSocketMessage 接口包含用于执行类似操作的方法,但是是在消息层级。
class IWebSocketMessage
{
public:
// get an array of frames that make up this message
virtual void getFrames(IWebSocketFrame *&frames[]) = 0;
// set text as message content
virtual bool setString(const string &data) = 0;
// return message content as text
virtual string getString() = 0;
// set binary data as message content
virtual bool setData(const uchar &data[]) = 0;
// return the contents of the message in "raw" binary form
virtual bool getData(uchar &data[]) = 0;
// sign of completeness of the message (all frames received)
virtual bool isFinalised() = 0;
// add a frame to the message
virtual bool takeFrame(IWebSocketFrame *frame) = 0;
};
|
考虑到帧和消息的接口,定义了 WebSocket 连接 IWebSocketConnection。
interface IWebSocketConnection
{
// open a connection with the specified URL and its parts,
// and optional custom headers
bool handshake(const string url, const string host, const string origin,
const string custom = NULL);
// low-level read frames from the server
int readFrame(IWebSocketFrame *&frames[]);
// low-level send frame (e.g. close or ping)
bool sendFrame(IWebSocketFrame *frame);
// low-level message sending
bool sendMessage(IWebSocketMessage *msg);
// custom check for new messages (event generation)
int checkMessages();
// custom text submission
bool sendString(const string msg);
// custom posting of binary data
bool sendData(const uchar &data[]);
// close the connection
bool disconnect(void);
};
|
通过 IWebSocketObserver 接口方法接收关于断开连接和新消息的通知。
interface IWebSocketObserver
{
void onConnected();
void onDisconnect();
void onMessage(IWebSocketMessage *msg);
};
|
特别是,WebSocketClient 类是这个接口的继承者,默认情况下,它只是将信息输出到日志。类构造函数需要一个地址来连接到 ws 或 wss 协议。
template<typename T>
class WebSocketClient: public IWebSocketObserver
{
protected:
IWebSocketMessage *messages[];
string scheme;
string host;
string port;
string origin;
string url;
int timeOut;
...
public:
WebSocketClient(const string address)
{
string parts[];
URL::parse(address, parts);
url = address;
timeOut = 5000;
scheme = parts[URL_SCHEME];
if(scheme != "ws" && scheme != "wss")
{
Print("WebSocket invalid url scheme: ", scheme);
scheme = "ws";
}
host = parts[URL_HOST];
port = parts[URL_PORT];
origin = (scheme == "wss" ? "https://" : "http://") + host;
}
...
void onDisconnect() override
{
Print(" > Disconnected ", url);
}
void onConnected() override
{
Print(" > Connected ", url);
}
void onMessage(IWebSocketMessage *msg) override
{
// NB: message can be binary, print it just for notification
Print(" > Message ", url, " " , msg.getString());
WsTools::push(messages, msg);
}
...
};
|
WebSocketClient 类将所有消息对象收集到一个数组中并负责删除这些对象(如果 MQL 程序未执行删除)。
连接是在 open 方法中创建的。
template<typename T>
class WebSocketClient: public IWebSocketObserver
{
protected:
IWebSocketTransport *socket;
IWebSocketConnection *connection;
...
public:
...
bool open(const string custom_headers = NULL)
{
uint _port = (uint)StringToInteger(port);
if(_port == 0)
{
if(scheme == "ws") _port = 80;
else _port = 443;
}
socket = MqlWebSocketTransport::create(scheme, host, _port, timeOut);
if(!socket || !socket.isConnected())
{
return false;
}
connection = new T(&this, socket);
return connection.handshake(url, host, origin, custom_headers);
}
...
|
文本和二进制数据的重载 send 方法提供了发送数据的最便捷的方式。
bool send(const string str)
{
return connection ? connection.sendString(str) : false;
}
bool send(const uchar &data[])
{
return connection ? connection.sendData(data) : false;
}
|
若要检查新的传入消息,可以调用 checkMessages 方法。根据其 blocking 参数,该方法在循环中等待消息直至超时,或者在没有消息时立即返回。消息将转到 IWebSocketObserver::onMessage 处理程序。
void checkMessages(const bool blocking = true)
{
if(connection == NULL) return;
uint stop = GetTickCount() + (blocking ? timeOut : 1);
while(ArraySize(messages) == 0 && GetTickCount() < stop && isConnected())
{
// all frames are collected into the appropriate messages, and they become
// available through event notifications IWebSocketObserver::onMessage,
// however, control frames have already been internally processed and removed by now
if(!connection.checkMessages()) // while no messages, let's make micro-pause
{
Sleep(100);
}
}
}
|
另一种接收消息的方法是在 readMessage 方法中实现的:它返回一个指向调用代码的消息的指针(也就是说,onMessage 应用处理程序不是必要的)。之后,MQL 程序负责释放对象。
IWebSocketMessage *readMessage(const bool blocking = true)
{
if(ArraySize(messages) == 0) checkMessages(blocking);
if(ArraySize(messages) > 0)
{
IWebSocketMessage *top = messages[0];
ArrayRemove(messages, 0, 1);
return top;
}
return NULL;
}
|
该类还允许你更改超时、检查连接并关闭连接。
void setTimeOut(const int ms)
{
timeOut = fabs(ms);
}
bool isConnected() const
{
return socket && socket.isConnected();
}
void close()
{
if(isConnected())
{
if(connection)
{
connection.disconnect(); // this will close socket after server acknowledge
delete connection;
connection = NULL;
}
if(socket)
{
delete socket;
socket = NULL;
}
}
}
};
|
上述类库允许为回显和聊天服务创建客户端应用程序。