
一个使用命名管道在 MetaTrader 5 客户端之间进行通信的无 DLL 解决方案
简介
有时,我想知道在 MetaTrader 5 客户端之间进行通信的几种可能方式。我的目标是使用价格变动指标并且在其中一个客户端上显示来自不同报价者的价格变动。
自然的解决方案是使用硬盘中的单独文件。一个客户端将数据写入到文件,另一个客户端会读取该文件。此方法尽管与发送单一消息有关,但是似乎不是针对流式报价的最有效方式。
之后,我偶然遇到了 Alexander 所写的一篇关于如何使用 WCF 服务将报价导出到 .NET 应用程序的好文章,在我即将到此为止的时候,又遇到 Sergeev 所写的另一篇文章。
两篇文章都接近我想要的,但是我需要一个能够被不同客户端(一个担当服务器,另一个担当客户端)使用的无 DLL 解决方案。在搜索 Web 的时候,我找到一条建议使用命名管道进行通信的说明,我仔细阅读了针对使用管道进行进程间通信的 MSDN 规范。
我发现命名管道支持在同一台计算机上或通过内部网在不同的计算机上进行的通信,因此我决定采用这种方法。
本文介绍了命名管道通信并说明了设计 CNamedPipes 类的过程。它还包含在 MetaTrader 5 客户端之间测试价格变动指标流以及整体系统吞吐能力。
1. 使用命名管道进行进程间通信
当我们考虑一个典型的管道时,我们会想到一种用于传输介质的缸体。这也是一个用于表示在操作系统中进行进程间通信的术语。您可以简单地想象一条连接两个进程的管道,在我们的例子中是交换数据的 MetaTrader 5 客户端。
管道可以是匿名的,也可以是有名称的。两者之间有两个主要的不同之处:第一个是匿名管道不能在网络中使用,第二个是两个进程必须是相关的。即一个进程必须是父进程,另一个进程必须是子进程。命名管道没有这个限制。
为了使用管道进行通信,服务器进程必须采用已知名称构建一条管道。管道名称是一个字符串,并且必须采取 \\servername\pipe\pipename 的形式。如果管道在同一台计算机上使用,则可以忽略 servername(服务器名),并用一个点来代替: \\.\pipe\pipename。
试图连接到管道的客户端必须知道其名称。为了区分客户端,我使用 \\.\pipe\mt[account_number] 的命名约定,但是命名约定可以任意改变。
2. 实施 CNamedPipes 类
我将以简短说明创建和连接命名管道的低级机制来开始。在 Windows 操作系统中,所有处理管道的函数都可以通过 kernel32.dll 库来获得。在服务器端创建命名管道实例的函数是 CreateNamedPipe()。
在创建管道之后,服务器调用 ConnectNamedPipe() 函数,等待客户端连接。如果连接成功,则 ConnectNamedPipe() 返回一个不等于 0 的整数。但是有可能客户端在调用 CreateNamedPipe() 之后以及调用 ConnectNamedPipe() 之前成功连接。在这种情况下,ConnectNamedPipe() 返回 0,并且 GetLastError() 返回错误 535 (0X217):ERROR_PIPE_CONNECTED。
写入管道和从管道读取是通过与文件存取相同的函数实现的:
BOOL WINAPI ReadFile( __in HANDLE hFile, __out LPVOID lpBuffer, __in DWORD nNumberOfBytesToRead, __out_opt LPDWORD lpNumberOfBytesRead, __inout_opt LPOVERLAPPED lpOverlapped ); BOOL WINAPI WriteFile( __in HANDLE hFile, __in LPCVOID lpBuffer, __in DWORD nNumberOfBytesToWrite, __out_opt LPDWORD lpNumberOfBytesWritten, __inout_opt LPOVERLAPPED lpOverlapped );
学习命名管道之后,我设计了 CNamedPipes 类来隐藏底层低级结构。
现在,足以将 CNamedPipes.mqh 文件放入客户端相应的 (/include) 文件夹中,并将其包含在源代码中,声明一个 CNamedPipe 对象。
我设计的类具有几个基本方法来处理命名管道:
Create()、Connect()、Disconnect()、Open()、Close()、WriteUnicode()、ReadUnicode()、WriteANSI()、ReadANSI()、WriteTick()、ReadTick()
可以依据其他要求进一步扩展类。
Create() 方法试图用指定名称创建管道。为了简化客户端之间的连接,输入参数 'account' 是要使用管道的客户端的帐号。
如果未输入帐户名,则方法尝试使用当前客户端的帐号打开一条管道。如果管道创建成功,Create() 函数返回 true。
//+------------------------------------------------------------------ /// Create() : 尝试创建命名管道的实例 /// \参数 account - 源终端帐号 /// \返回 true - 如果创建成功, false 否则 //+------------------------------------------------------------------ bool CNamedPipe::Create(int account=0) { if(account==0) pipeNumber=IntegerToString(AccountInfoInteger(ACCOUNT_LOGIN)); else pipeNumber=IntegerToString(account); string fullPipeName=pipeNamePrefix+pipeNumber; hPipe=CreateNamedPipeW(fullPipeName, (int)GENERIC_READ|GENERIC_WRITE|(ENUM_PIPE_ACCESS)PIPE_ACCESS_DUPLEX, (ENUM_PIPE_MODE)PIPE_TYPE_RW_BYTE,PIPE_UNLIMITED_INSTANCES, BufferSize*sizeof(ushort),BufferSize*sizeof(ushort),0,NULL); if(hPipe==INVALID_HANDLE_VALUE) return false; else return true; }
Connect() 方法等待客户端连接到管道。如果客户端成功连接到管道,它返回 true。
//+------------------------------------------------------------------ /// Connect() : 等待客户端连接管道 /// \返回 true - 如果连接, false 否则. //+------------------------------------------------------------------ bool CNamedPipe::Connect(void) { if(ConnectNamedPipe(hPipe,NULL)==false) return(kernel32::GetLastError()==ERROR_PIPE_CONNECTED); else return true; }
Disconnect() 方法从管道断开服务器。
//+------------------------------------------------------------------ /// Disconnect(): 从通道断开 /// \返回 true - 如果断开, false 否则 //+------------------------------------------------------------------ bool CNamedPipe::Disconnect(void) { return DisconnectNamedPipe(hPipe); }
Open() 方法应由客户端使用,它尝试打开先前创建的管道。如果管道成功打开,则返回 true。如果出于某些原因它未能在 5 秒钟内连接到创建的管道或者打开管道失败,则返回 false。
//+------------------------------------------------------------------ /// Open() : 试图打开之前创建的管道 /// \参数 account - 源终端帐号 /// \返回 true - 如果成功, false 否则. //+------------------------------------------------------------------ bool CNamedPipe::Open(int account=0) { if(account==0) pipeName=IntegerToString(AccountInfoInteger(ACCOUNT_LOGIN)); else pipeName=IntegerToString(account); string fullPipeName=pipeNamePrefix+pipeName; if(hPipe==INVALID_HANDLE_VALUE) { if(WaitNamedPipeW(fullPipeName,5000)==0) { Print("管道 "+fullPipeName+" 不可用..."); return false; } hPipe=CreateFileW(fullPipeName,GENERIC_READ|GENERIC_WRITE,0,NULL,OPEN_EXISTING,0,NULL); if(hPipe==INVALID_HANDLE_VALUE) { Print("管道打开失败"); return false; } } return true; }
Close() 方法关闭管道句柄。
//+------------------------------------------------------------------ /// Close() : 关闭管道句柄 /// \返回 0 如果成功, 非-0 否则 //+------------------------------------------------------------------ int CNamedPipe::Close(void) { return CloseHandle(hPipe); }
接下来的六种方法用于读写管道。前两对处理 Unicode 和 ANSI 格式的字符串,都能用于在客户端之间发送命令或消息。
MQL5 中的字符串变量存储在一个包含 Unicode 的对象中,因此自然的方式是提供 Unicode 方法,但是因为 MQL5 提供 UnicodeToANSI 方法,因此我也实施 ANSI 字符串通信。最后两种方法处理通过命名管道发送和接收 MqlTick 对象。
WriteUnicode() 方法写入由 Unicode 字符构成的消息。因为每个字符由两个字节构成,它作为 ushort 数组发送到管道。
//+------------------------------------------------------------------ /// WriteUnicode() : 写 Unicode 字符串至管道 /// \参数 message - 发送的字符串 /// \返回写入管道的字节数 //+------------------------------------------------------------------ int CNamedPipe::WriteUnicode(string message) { int ushortsToWrite, bytesWritten; ushort UNICODEarray[]; ushortsToWrite = StringToShortArray(message, UNICODEarray); WriteFile(hPipe,ushortsToWrite,sizeof(int),bytesWritten,0); WriteFile(hPipe,UNICODEarray,ushortsToWrite*sizeof(ushort),bytesWritten,0); return bytesWritten; }
ReadUnicode() 方法接收 ushort 数组,然后返回一个字符串对象。
//+------------------------------------------------------------------ /// ReadUnicode(): 从管道读 Unicode 字符串 /// \返回 Unicode 字符串 (MQL5 字符串) //+------------------------------------------------------------------ string CNamedPipe::ReadUnicode(void) { string ret; ushort UNICODEarray[STR_SIZE*sizeof(uint)]; int bytesRead, ushortsToRead; ReadFile(hPipe,ushortsToRead,sizeof(int),bytesRead,0); ReadFile(hPipe,UNICODEarray,ushortsToRead*sizeof(ushort),bytesRead,0); if(bytesRead!=0) ret = ShortArrayToString(UNICODEarray); return ret; }
WriteANSI() 方法将 ANSI uchar 数组写入管道。
//+------------------------------------------------------------------ /// WriteANSI() : 写 ANSI 字符串至管道 /// \参数 message - 发送的字符串 /// \返回写入管道的字节数 //+------------------------------------------------------------------ int CNamedPipe::WriteANSI(string message) { int bytesToWrite, bytesWritten; uchar ANSIarray[]; bytesToWrite = StringToCharArray(message, ANSIarray); WriteFile(hPipe,bytesToWrite,sizeof(int),bytesWritten,0); WriteFile(hPipe,ANSIarray,bytesToWrite,bytesWritten,0); return bytesWritten; }
ReadANSI() 方法从管道读取 uchar 数组,然后返回一个字符串对象。
//+------------------------------------------------------------------ /// ReadANSI(): 从管道读 ANSI 字符串 /// \返回 Unicode 字符串 (MQL5 字符串) //+------------------------------------------------------------------ string CNamedPipe::ReadANSI(void) { string ret; uchar ANSIarray[STR_SIZE]; int bytesRead, bytesToRead; ReadFile(hPipe,bytesToRead,sizeof(int),bytesRead,0); ReadFile(hPipe,ANSIarray,bytesToRead,bytesRead,0); if(bytesRead!=0) ret = CharArrayToString(ANSIarray); return ret; }
WriteTick() 方法将一个 MqlTick 对象写入管道。
//+------------------------------------------------------------------ /// WriteTick() : 写 MqlTick 至管道 /// \参数 发送的 MqlTick /// \返回 true 如果即时价写成功, false 否则 //+------------------------------------------------------------------ int CNamedPipe::WriteTick(MqlTick &outgoing) { int bytesWritten; WriteFile(hPipe,outgoing,MQLTICK_SIZE,bytesWritten,0); return bytesWritten; }
ReadTick() 方法从管道读取一个 MqlTick 对象。如果管道是空的,则返回 0,否则返回 MqlTick 对象的字节数。
//+------------------------------------------------------------------ /// ReadTick() : 从管道读 MqlTick /// \返回 true 如果即时价读成功, false 否则 //+------------------------------------------------------------------ int CNamedPipe::ReadTick(MqlTick &incoming) { int bytesRead; ReadFile(hPipe,incoming,MQLTICK_SIZE,bytesRead,NULL); return bytesRead; } //+------------------------------------------------------------------
知道处理命名管道的基本方法之后,我们可以用两个 MQL 程序开始:接收报价的简单脚本和发送报价的指标。
3. 接收报价的服务器脚本
示例服务器初始化命名管道并等待客户端连接。在客户端断开连接之后,它将显示该客户端总共收到多少价格变动,并且等待新的客户端连接。如果客户端断开连接,并且服务器发现全局变量 'gvar0',则它退出。如果变量 'gvar0' 不存在,则可以通过右击一个图表并选择 Expert List(EA 交易列表)选项来手动停止服务器。
//+------------------------------------------------------------------ //| NamedPipeServer.mq5 | //| Copyright 2010, Investeo.pl | //| http:/Investeo.pl | //+------------------------------------------------------------------ #property copyright "Copyright 2010, Investeo.pl" #property link "http:/Investeo.pl" #property version "1.00" #include <CNamedPipes.mqh> CNamedPipe pipe; //+------------------------------------------------------------------+ //| 交易程序初始函数 | //+------------------------------------------------------------------+ void OnStart() { bool tickReceived; int i=0; if(pipe.Create()==true) while (GlobalVariableCheck("gvar0")==false) { Print("等待客户端连接."); if (pipe.Connect()==true) Print("管道已连接"); while(true) { do { tickReceived=pipe.ReadTick(); if(tickReceived==false) { if(GetError()==ERROR_BROKEN_PIPE) { Print("客户端从管道断开 "+pipe.Name()); pipe.Disconnect(); break; } } else i++; Print(IntegerToString(i) + "即时价收到."); } while(tickReceived==true); if (i>0) { Print(IntegerToString(i) + "即时价收到."); i=0; }; if(GlobalVariableCheck("gvar0")==true || (GetError()==ERROR_BROKEN_PIPE)) break; } } pipe.Close(); }
4. 发送报价的简单指标
发送报价的指标在 OnInit() 方法中打开一条管道,然后在每次触发 OnCalculate() 方法时发送一个 MqlTick://+------------------------------------------------------------------+ //| SendTickPipeIndicator.mq5 | //| Copyright 2010, Investeo.pl | //| http:/Investeo.pl | //+------------------------------------------------------------------+ #property copyright "Copyright 2010, Investeo.pl" #property link "http:/Investeo.pl" #property version "1.00" #property indicator_chart_window #include <CNamedPipes.mqh> CNamedPipe pipe; int ctx; //+------------------------------------------------------------------+ //| 自定义指标初始化函数 | //+------------------------------------------------------------------+ int OnInit() { while (!pipe.Open(AccountInfoInteger(ACCOUNT_LOGIN))) { Print("管道未创建, 5 秒内重试..."); if (GlobalVariableCheck("gvar1")==true) break; } ctx = 0; return(0); } //+------------------------------------------------------------------+ //| 自定义指标迭代函数 | //+------------------------------------------------------------------+ int OnCalculate(const int rates_total, const int prev_calculated, const datetime& time[], const double& open[], const double& high[], const double& low[], const double& close[], const long& tick_volume[], const long& volume[], const int& spread[]) { ctx++; MqlTick outgoing; SymbolInfoTick(Symbol(), outgoing); pipe.WriteTick(outgoing); Print(IntegerToString(ctx)+" 即时价通过 SendTickPipeClick 发送至服务器."); return(rates_total); } //+------------------------------------------------------------------
5. 在单一客户端中运行来自多个提供程序的价格变动指标
情形变得更加复杂,因为我希望在单独的价格变动指标中显示传入的报价。通过实施在触发 EventChartCustom() 方法时向价格变动指标传送实时价格变动的管道服务器,我实现了此目的。
卖价和买价作为用分号分隔的字符串发送,例如 '1.20223;120225'。相应的指标在 OnChartEvent() 内处理自定义事件,并且显示价格变动图。
//+------------------------------------------------------------------+ //| NamedPipeServerBroadcaster.mq5 | //| Copyright 2010, Investeo.pl | //| http:/Investeo.pl | //+------------------------------------------------------------------+ #property copyright "Copyright 2010, Investeo.pl" #property link "http:/Investeo.pl" #property version "1.00" #property script_show_inputs #include <CNamedPipes.mqh> input int account = 0; CNamedPipe pipe; //+------------------------------------------------------------------+ //| 交易程序初始函数 | //+------------------------------------------------------------------+ void OnStart() { bool tickReceived; int i=0; if(pipe.Create(account)==true) while(GlobalVariableCheck("gvar0")==false) { if(pipe.Connect()==true) Print("管道已连接"); i=0; while(true) { do { tickReceived=pipe.ReadTick(); if(tickReceived==false) { if(kernel32::GetLastError()==ERROR_BROKEN_PIPE) { Print("客户端从管道断开 "+pipe.GetPipeName()); pipe.Disconnect(); break; } } else { i++; Print(IntegerToString(i)+" 服务器收到即时价."); string bidask=DoubleToString(pipe.incoming.bid)+";"+DoubleToString(pipe.incoming.ask); long currChart=ChartFirst(); int chart=0; while(chart<100) { EventChartCustom(currChart,6666,0,(double)account,bidask); currChart=ChartNext(currChart); if(currChart==0) break; // 到达图表清单结尾 chart++; } if(GlobalVariableCheck("gvar0")==true || (kernel32::GetLastError()==ERROR_BROKEN_PIPE)) break; } } while(tickReceived==true); if(i>0) { Print(IntegerToString(i)+"即时价收到."); i=0; }; if(GlobalVariableCheck("gvar0")==true || (kernel32::GetLastError()==ERROR_BROKEN_PIPE)) break; Sleep(100); } } pipe.Close(); }
为了显示价格变动,我选择了放在 MQLmagazine 中的价格变动指标,但是代替 OnCalculate() 方法,我在 OnChartEvent() 内实施处理,并添加了其他指令。只有在参数 dparam 等于管道编号并且事件 id 等于 CHARTEVENT_CUSTOM+6666 时才接受报价以供处理:
void OnChartEvent(const int id, const long &lparam, const double &dparam, const string &sparam) { if (dparam==(double)incomingPipe) if(id>CHARTEVENT_CUSTOM) { if(id==CHARTEVENT_CUSTOM+6666) { // 处理进入的即时价 } } else { // 处理用户时间 } }
在下面的屏幕截图中有三个价格变动指标。
其中两个显示通过管道收到的价格变动,第三个指标不使用管道,其运行旨在检查价格变动是否遗失。
图 1 通过命名管道收到的报价
请找到附带的截屏视频,该视频含有有关我如何运行指标的评论:
图 2 说明指标设置的截屏视频
6. 测试系统吞吐能力
因为管道使用共享内存,通信非常快速。我在两个 MetaTrader 5 客户端之间用一行进行了发送 100 000 和 1000 000 个价格变动的测试。发送脚本使用 WriteTick() 函数,测量时间跨度使用 GetTickCount() 函数:
Print("发送..."); uint start = GetTickCount(); for (int i=0;i<100000;i++) pipe.WriteTick(outgoing); uint stop = GetTickCount(); Print("发送时间" + IntegerToString(stop-start) + " [ms]"); pipe.Close();
服务器读取传入的报价。时间跨度从第一个传入报价到客户端断开连接测量:
//+------------------------------------------------------------------+ //| SpeedTestPipeServer.mq5 | //| Copyright 2010, Investeo.pl | //| http:/Investeo.pl | //+------------------------------------------------------------------+ #property copyright "Copyright 2010, Investeo.pl" #property link "http:/Investeo.pl" #property version "1.00" #property script_show_inputs #include <CNamedPipes.mqh> input int account=0; bool tickReceived; uint start,stop; CNamedPipe pipe; //+------------------------------------------------------------------+ //| 交易程序初始函数 | //+------------------------------------------------------------------+ void OnStart() { int i=0; if(pipe.Create(account)==true) if(pipe.Connect()==true) Print("管道已连接"); do { tickReceived=pipe.ReadTick(); if(i==0) start=GetTickCount(); if(tickReceived==false) { if(kernel32::GetLastError()==ERROR_BROKEN_PIPE) { Print("客户端从管道断开 "+pipe.GetPipeName()); pipe.Disconnect(); break; } } else i++; } while(tickReceived==true); stop=GetTickCount(); if(i>0) { Print(IntegerToString(i)+" ticks received."); i=0; }; pipe.Close(); Print("服务器: 接收时间 "+IntegerToString(stop-start)+" [ms]"); } //+------------------------------------------------------------------
10 次样本运行的结果如下所示:
运行 |
报价数量 |
发送时间 [ms] |
接收时间 [ms] |
---|---|---|---|
1 |
100000 |
624 |
624 |
2 | 100000 | 702 | 702 |
3 | 100000 | 687 | 687 |
4 | 100000 | 592 | 608 |
5 | 100000 | 624 | 624 |
6 | 1000000 | 5616 | 5616 |
7 | 1000000 | 5788 | 5788 |
8 | 1000000 | 5928 | 5913 |
9 |
1000000 | 5772 | 5756 |
10 |
1000000 | 5710 | 5710 |
表 1 吞吐速度测量
在运行 Windows Vista,采用 2.0GHz T4200 CPU 和 3GB RAM 的笔记本电脑上,发送 1000 000 次报价的平均速度为 170 000 次价格变动/秒。
总结
我介绍了一种使用命名管道在 MetaTrader 5 客户端之间进行通信的方法。该方法足以在客户端之间发送实时报价。
可以依据其他要求进一步扩展 CNamedPipes 类,例如可以在两个独立的帐户上对冲。请找到附带的 CNamedPipe 类源代码,以及采用 chm 格式的说明文档,以及我为撰写本文而实施的其他源代码。
本文由MetaQuotes Ltd译自英文
原文地址: https://www.mql5.com/en/articles/115
注意: MetaQuotes Ltd.将保留所有关于这些材料的权利。全部或部分复制或者转载这些材料将被禁止。
This article was written by a user of the site and reflects their personal views. MetaQuotes Ltd is not responsible for the accuracy of the information presented, nor for any consequences resulting from the use of the solutions, strategies or recommendations described.




在调用 writeTick、readTick......等方法时,我还遇到了访问违规错误。请查看附件中已修复的 CNamedPipes 文件。
我还在 CNamedPipe 类中添加了 WriteDouble 方法 :
附注:要在客户端读/写(例如从另一个 Metatrader 终端),我只是使用了标准的 MQL5 FilePipe 库。如何使用它们的示例可在此处下载:
https://www.mql5.com/zh/articles/503