MQLのソケットの使用およびシグナルプロバイダになる方法
--- | 14 9月, 2016
簡単な説明
現代の情報社会はソケットの存在なしに成り立つでしょうか?1982年に登場し現在までほぼ変わることなく、ソケットは私達の為に毎秒動いています。これは私達が暮らすマトリックスの神経終末ネットワークのベースです。
あなたが朝、MetaTraderをオンにすると、すぐにソケットが作成されサーバーに接続します。あなたがブラウザを開くと、数十のソケットの接続が形成され、ウェブからの情報を届けたり、メールの送信や正確な時間のシグナル、分散コンピューティングのギガバイトの為に消えます。
それでは、少し理論を読んでみましょう。WikiやMSDNを見てみてください。関連記事には、構造体や関数に必要不可欠なものが全て書かれており、クライアントやサーバーの作成例も紹介されています。
この記事ではこれらの知識のMQLへの応用を紹介します。
1. 関数と構造体のWinAPIからのポート
WinAPIがC言語の為に書かれたものであることは秘密ではありません。MQL言語は、事実上その親戚と言えます(動作スタイルでも、その精神でも)。主要MQLプログラムで使用するこれらのWinAPI関数の為のmqhファイルを作成しましょう。私達が行うのは、必要に応じたポートです。
TCPクライアントの為のいくつかの関数が必要になります。
- WSAStartup()ーライブラリを初期化する。
- socket()ーソケットを作成する。
- ioctlsocket()ーデータを待っている間にフリーズしないように、非ブロックモードに切り替える。
- connect()ーサーバに接続する。
- recvー聞く、もしくはsend()ープログラムの終了または接続の中断までデータを送信する。
- 動作後にソケットを閉じーclosesocket()、ライブラリを非初期化するーWSACleanup()。
TCPサーバの為に、特定のポートに接続しソケットを接続待機モードに変えることを除いて、同様の関数が必要になります。以下の事を実行します。
- WSAStartup()ーライブラリの初期化。
- socket()ーソケットを作成する。
- ioctlsocket()ー非ブロックモードに変える。
- bind()ーポートに接続する。
- listen()ー接続待機モードに変える。
- 作成成功後、accept()ー聞く。
- クライアントの接続を作成し、これらを使ってプログラムの終了もしくは接続の中断までrecv()/send()モードで作業する。
- 動作後にサーバと接続されたクライアントのリスニングソケットを閉じーclosesocket()、ライブラリを非初期化するーWSACleanup()。
UDPソケットの場合の手順はもう少し短くなります(実質、クライアントとサーバの『握手がない』)。UDPクライアント:
- WSAStartup()ーライブラリの初期化。
- socket()ーソケットを作成する。
- ioctlsocket()ーデータ待機中にフリーズしないように、非ブロックモードに切り替える。
- データの送信ーsendto()/データの受信ーrecvfrom()。
- 動作後に、ソケットを閉じーclosesocket()、ライブラリを非初期化するーWSACleanup()。
UDPサーバではbind関数のみ追加されます。
- WSAStartup()ーライブラリの初期化。
- socket()ーソケットを作成する。
- ioctlsocket()ー非ブロックモードに変える。
- bind()ーポートに接続する。
- recvfrom()ー受信/sendto()ー送信。
- 動作後にサーバと接続されたクライアントのリスニングソケットを閉じーclosesocket()、ライブラリを非初期化するーWSACleanup()。
お分かりのように、手順はそんなに難しいものではありませんが、各関数の呼び出しの為に構造体を記入する必要があります。
a) WSAStartup()
これをMSDNで見てみましょう:
WINAPI:
int WSAAPI WSAStartup(_In_ WORD wVersionRequested, _Out_ LPWSADATA lpWSAData);
_In_、_Out_ — これはパラメータの適用範囲を指定する空の定義です。WSAAPI – これはパラメータの引き渡しのルールを記述しますが、私達はこれも空にすることができます。
資料から分かる通り、最初のパラメータの必要なバージョンを指定する為のMAKEWORDマクロや、LPWSADATA構造体へのポインタを指定する必要があります。マクロは難しいものではありません。これをヘッダファイルからコピーしましょう。
#define MAKEWORD(a, b) ((WORD)(((BYTE)(((DWORD_PTR)(a)) & 0xff)) | ((WORD)((BYTE)(((DWORD_PTR)(b)) & 0xff))) << 8))また全データタイプをMQLで容易に書くことができます。
#define BYTE uchar #define WORD ushort #define DWORD int #define DWORD_PTR ulongMSDNからWSADATA構造をコピーします。これらは上記ですでに定義されているので、データタイプのほとんどの名前は読み取りやすくする為に残します。
struct WSAData { WORD wVersion; WORD wHighVersion; char szDescription[WSADESCRIPTION_LEN+1]; char szSystemStatus[WSASYS_STATUS_LEN+1]; ushort iMaxSockets; ushort iMaxUdpDg; char lpVendorInfo[]; }最後のパラメータlpVendorInfoがMQLでは配列として記載されていることにご注意ください(C言語ではこれはchar*へのポインタ)。同様に定義する配列サイズの定数を記述します。最後に、構造体へのポインタを次のように記述します。
#define LPWSADATA char&
なぜこのようにするのか?答えは簡単です。任意の構造体は、限定されたメモリのピース以外の何物でもありません。これは任意の形式で表現することができ、例えば、同じパラメータを他の構造体の形式で、または同じ配列サイズで表現することができます。私は配列の形式での表現を使用するので、全関数のchar&タイプは、配列のアドレスとなり、そのサイズは必要とする構造体のサイズに対応します。MQLでの関数の宣言は次のようになります。
MQL:
int WSAStartup(WORD wVersionRequested, LPWSADATA lpWSAData[]);
関数の呼び出しとWSAData構造体への結果の取得はこのようになります。char wsaData[]; // 将来の構造体のバイトの配列 ArrayResize(wsaData, sizeof(WSAData)); // そのサイズを構造体のサイズに変更します WSAStartup(MAKEWORD(2,2), wsaData); // 関数を呼び出します
データはwsaDataバイト配列に送信され、キャストを使用することでそこから簡単に情報を収集することができます。
この部分が貴方にとって難しすぎないことを願います。まだ最初の関数ですし、これから沢山のことをしなければいけません。しかし、これで基本原理は理解できたと思うので、ここからは簡単かつ面白くなると思います。
b) socket()
WINAPI: SOCKET WSAAPI socket(_In_ int af, _In_ int type, _In_ int protocol);
同じように行い、MSDNからデータをコピーします。
IPv4の為にTCPソケットを使用する為、すぐにこの関数のパラメータの定数を設定します。
#define SOCKET uint #define INVALID_SOCKET (SOCKET)(~0) #define SOCKET_ERROR (-1) #define NO_ERROR 0 #define AF_INET 2 // internetwork: UDP, TCP, etc. #define SOCK_STREAM 1 #define IPPROTO_TCP 6
c) ioctlsocket()
MQL: int ioctlsocket(SOCKET s, int cmd, int &argp);
ここでの最後の引数はポインタからアドレスに置き換えられたものです。
d) connect()
WINAPI: int connect(_In_ SOCKET s, _In_ const struct sockaddr *name, _In_ int namelen);
ここにはsockaddr構造体の引き渡しというちょっとした困難がありますが、私達はすでに基本原理を理解しています。構造体をバイト配列に置き換え、これらをWinAPI関数でのデータの転送に使用します。
構造体を変更を加えることなく、MSDNから引用します。
struct sockaddr { ushort sa_family; // Address family. char sa_data[14]; // Up to 14 bytes of direct address. };そこへのポインタは、配列のアドレスを介して行います。
#define LPSOCKADDR char&MSDNの例ではsockaddr_in構造体が使用されます。これはサイズは同様のものですが、パラメータは異なる方法で記述されています。
struct sockaddr_in { short sin_family; ushort sin_port; struct in_addr sin_addr; char sin_zero[8]; };sin_addrの為のデータはunionであり、この表現の1つは8バイトの整数です。
struct in_addr { ulong s_addr; };MQLでの関数の記述は次の通りです。
MQL: int connect(SOCKET s, LPSOCKADDR name[], int namelen);
これで、クライアントソケットの作成への準備は整いました。あとは、データの受信/送信関数だけです。
プロトタイプは次のようになります。
WINAPI: int send(_In_ SOCKET s, _In_ const char* buf, _In_ int len, _In_ int flags); int recv(_In_ SOCKET s, _Out_ char* buf, _In_ int len, _In_ int flags); MQL: int send(SOCKET s, char& buf[], int len, int flags); int recv(SOCKET s, char& buf[], int len, int flags);二つ目のパラメータがchar*ポインタからchar& []配列に置き換えられたのが分かります
f) UPDの為のrecvfrom()とsendto()
MQLでのプロトタイプは次のようになります。<
WINAPI: int recvfrom(_In_ SOCKET s, _Out_ char* buf, _In_ int len, _In_ int flags, _Out_ struct sockaddr *from, _Inout_opt_ int *fromlen); int sendto(_In_ SOCKET s, _In_ const char* buf, _In_ int len, _In_ int flags, _In_ const struct sockaddr *to, _In_ int tolen); MQL: int recvfrom(SOCKET s,char &buf[],int len,int flags,LPSOCKADDR from[],int &fromlen); int sendto(SOCKET s,const char &buf[],int len,int flags,LPSOCKADDR to[],int tolen);
最後にくるのは、動作後にハンドルを閉じクリアする二つの重要な関数です。
g) closesocket()とWSACleanup()
MQL: int closesocket(SOCKET s); int WSACleanup();
ポートされたWinAPI関数の結果ファイルは次のようになります。
#define BYTE uchar #define WORD ushort #define DWORD int #define DWORD_PTR ulong #define SOCKET uint #define MAKEWORD(a, b) ((WORD)(((BYTE)(((DWORD_PTR)(a)) & 0xff)) | ((WORD)((BYTE)(((DWORD_PTR)(b)) & 0xff))) << 8)) #define WSADESCRIPTION_LEN 256 #define WSASYS_STATUS_LEN 128 #define INVALID_SOCKET (SOCKET)(~0) #define SOCKET_ERROR (-1) #define NO_ERROR 0 #define SOMAXCONN 128 #define AF_INET 2 // internetwork: UDP, TCP, etc. #define SOCK_STREAM 1 #define IPPROTO_TCP 6 #define SD_RECEIVE 0x00 #define SD_SEND 0x01 #define SD_BOTH 0x02 #define IOCPARM_MASK 0x7f /* parameters must be < 128 bytes */ #define IOC_IN 0x80000000 /* copy in parameters */ #define _IOW(x,y,t) (IOC_IN|(((int)sizeof(t)&IOCPARM_MASK)<<16)|((x)<<8)|(y)) #define FIONBIO _IOW('f', 126, int) /* set/clear non-blocking i/o */ //------------------------------------------------------------------ struct WSAData struct WSAData { WORD wVersion; WORD wHighVersion; char szDescription[WSADESCRIPTION_LEN+1]; char szSystemStatus[WSASYS_STATUS_LEN+1]; ushort iMaxSockets; ushort iMaxUdpDg; char lpVendorInfo[]; }; #define LPWSADATA char& //------------------------------------------------------------------ struct sockaddr_in struct sockaddr_in { ushort sin_family; ushort sin_port; ulong sin_addr; //struct in_addr { ulong s_addr; }; char sin_zero[8]; }; //------------------------------------------------------------------ struct sockaddr struct sockaddr { ushort sa_family; // Address family. char sa_data[14]; // Up to 14 bytes of direct address. }; #define LPSOCKADDR char& struct ref_sockaddr { char ref[2+14]; }; //------------------------------------------------------------------ import Ws2_32.dll #import "Ws2_32.dll" int WSAStartup(WORD wVersionRequested,LPWSADATA lpWSAData[]); int WSACleanup(); int WSAGetLastError(); ushort htons(ushort hostshort); ulong inet_addr(char& cp[]); string inet_ntop(int Family,ulong &pAddr,char &pStringBuf[],uint StringBufSize); ushort ntohs(ushort netshort); SOCKET socket(int af,int type,int protocol); int ioctlsocket(SOCKET s,int cmd,int &argp); int shutdown(SOCKET s,int how); int closesocket(SOCKET s); // サーバ関数 int bind(SOCKET s,LPSOCKADDR name[],int namelen); int listen(SOCKET s,int backlog); SOCKET accept(SOCKET s,LPSOCKADDR addr[],int &addrlen); // クライアント関数 int connect(SOCKET s,LPSOCKADDR name[],int namelen); int send(SOCKET s,char &buf[],int len,int flags); int recv(SOCKET s,char &buf[],int len,int flags); #import
2. クライアントとサーバの作成
次の検証の為にどんな形式でソケットを使用するかを考え、クラスのない関数を伴う動作をデモンストレーションすることにしました。第一に、これによってここでは線形で分岐していないプログラミングとは何かということを早く理解することができます。第二に、これによって任意のOOPイデオロギーや自分のニーズに合わせて関数のリファクタリングを行うことができます。経験から言えることは、プログラマーは全てがどのように動作するかを理解する為に、単純なクラスに移動します。
重要!全ての検証において、サーバコードの強制終了時にバインドされたポートが自動的に開放されないことを忘れないでください。これは二回目のソケット作成とbindの呼び出しの試みで「Address already in use」エラーを引き起こします。この問題を解決するには、ソケットでSO_REUSEADDRオプションを使用するか、ターミナルを再起動(最も簡単な方法)する必要があります。貴方のOSで作成したソケットを追跡する為のTCPViewerなどのモニタリングのユーティリティを使用してください。
またサーバがNATまたはクライアント/サーバの陰に隠れておらず、このポートがOSまたはルータでブロックされていない場合に、クライアントはサーバへ接続することができることを理解する必要があります。
最初に一つのコンピュータでクライアントやサーバの検証を行うことができます。しかし、多くのクライアントを完全に使用する為に、最低でも、真っ新な外部IPアドレスと使用する外部ポートを持つVPSでサーバを起動する必要があります。
例1.クライアントへチャートのレイアウトを送信する
簡単な相互作用から始めましょう。サーバからのtplファイルの単発の送信です。
今回のケースでは、接続時に一つのデータだけを取得し、その後で接続を中断する必要があるので、クライアントの側でsend/recvサイクルをサポートする必要はありません。データの送信後すぐに接続は中断されます。
つまり、サーバはクライアントサーバへの接続時にSendを行い、ソケットを終了し、クライアントはこの時Recvを行い、同様にソケットを終了します。勿論、より面白いケースでは、チャート変更の持続的な中継、つまり瞬間的なクライアントとサーバのチャートの同期を行うことができます。これは自分の教え子にチャートをオンラインで見せる、トレーディングの第一人者の為に有益なものでした。しかし今日では、これは異なるウェビナーソフトやスカイプで、画面を通じたビデオストリームの配信で行うことができます。したがって、このテーマはフォーラムでの議題として残しておきます。
誰に、またどんな状況においてこのコード例は有益なのでしょうか?例えば、貴方が毎日、毎時、または毎分、チャート上に自分のインディケータまたはグラフィックオブジェクトを配置しているとします。さらに、別個のチャート上でクライアントの接続を聞き、それらに必要なシンボルと期間の現在のtplを与えるエキスパートサーバが稼働しているとします。
満足しているクライアントは、これからは貴方からのトレードシグナルと目的を通知されることになります。彼らはチャート上で適用されるtplサーバから結びつけるスクリプトを定期的に起動すれば十分になります。
それでは、サーバから始めましょう。全てはエキスパートアドバイザの『流れ』の関数の役割を果たすOnTimerイベントで動作します。1秒に1回サーバの主要なブロックをチェックします(クライアントのリスニング→その人へのデータの送信→接続の終了)。また、サーバのソケット自体の活動もチェックし、中断の場合にはサーバソケットの再作成が行われます。
残念ながら、保存されたtplのテンプレートはファイルサンドボックスから使用できません。そのため、これをProfiles\Templatesフォルダから取り出すには、もう一度WinAPIを使用する必要があります。この部分の詳細は書きません。フルリストは以下のようになります。
//+------------------------------------------------------------------+ //| TplServer | //| programming & development - Alexey Sergeev | //+------------------------------------------------------------------+ #property copyright "© 2006-2016 Alexey Sergeev" #property link "profy.mql@gmail.com" #property version "1.00" #include "SocketLib.mqh" input string Host="0.0.0.0"; input ushort Port=8080; uchar tpl[]; int iCnt=0; string exname=""; SOCKET server=INVALID_SOCKET; //------------------------------------------------------------------ OnInit int OnInit() { EventSetTimer(1); exname=MQLInfoString(MQL_PROGRAM_NAME)+".ex5"; return 0; } //------------------------------------------------------------------ OnDeinit void OnDeinit(const int reason) { EventKillTimer(); CloseClean(); } //------------------------------------------------------------------ OnChartEvent void OnChartEvent(const int id,const long &lparam,const double &dparam,const string &sparam) { if(iCnt==0) // ファイル作成の制限ー1秒に1回を超えない { Print("Create TPL"); uchar buf[]; CreateTpl(buf); uchar smb[]; StringToCharArray(Symbol(),smb); ArrayResize(smb,10); uchar tf[]; StringToCharArray(IntegerToString(Period()),tf); ArrayResize(tf,10); // 送信データを作成します ArrayCopy(tpl,smb, ArraySize(tpl)); // シンボル名を追加しました ArrayCopy(tpl, tf, ArraySize(tpl)); // 期間の値を追加しました ArrayCopy(tpl,buf, ArraySize(tpl)); // テンプレート自体を追加しました } iCnt++; } //------------------------------------------------------------------ OnTimer void OnTimer() { iCnt=0; // テンプレートの作成カウンタをリセットしました if(server==INVALID_SOCKET) StartServer(Host,Port); else { // サイクル内で全クライアントを取得し、各人に現在のチャートテンプレートを送信します SOCKET client=INVALID_SOCKET; do { client=AcceptClient(); // Accept a client socket if(client==INVALID_SOCKET) return; int slen=ArraySize(tpl); int res=send(client,tpl,slen,0); if(res==SOCKET_ERROR) Print("-Send failed error: "+WSAErrorDescript(WSAGetLastError())); else printf("Sent %d bytes of %d",res,slen); if(shutdown(client,SD_BOTH)==SOCKET_ERROR) Print("-Shutdown failed error: "+WSAErrorDescript(WSAGetLastError())); closesocket(client); } while(client!=INVALID_SOCKET); } } //------------------------------------------------------------------ StartServer void StartServer(string addr,ushort port) { // ライブラリを初期化します char wsaData[]; ArrayResize(wsaData,sizeof(WSAData)); int res=WSAStartup(MAKEWORD(2,2), wsaData); if(res!=0) { Print("-WSAStartup failed error: "+string(res)); return; } // ソケットを作成します server=socket(AF_INET,SOCK_STREAM,IPPROTO_TCP); if(server==INVALID_SOCKET) { Print("-Create failed error: "+WSAErrorDescript(WSAGetLastError())); CloseClean(); return; } // アドレスとポートへバインドします Print("try bind..."+addr+":"+string(port)); char ch[]; StringToCharArray(addr,ch); sockaddr_in addrin; addrin.sin_family=AF_INET; addrin.sin_addr=inet_addr(ch); addrin.sin_port=htons(port); ref_sockaddr ref=(ref_sockaddr)addrin; if(bind(server,ref.ref,sizeof(addrin))==SOCKET_ERROR) { int err=WSAGetLastError(); if(err!=WSAEISCONN) { Print("-Connect failed error: "+WSAErrorDescript(err)+". Cleanup socket"); CloseClean(); return; } } // 非ブロックモードにします int non_block=1; res=ioctlsocket(server,(int)FIONBIO,non_block); if(res!=NO_ERROR) { Print("ioctlsocket failed error: "+string(res)); CloseClean(); return; } // ポートを聞き、クライアントの接続を許可します if(listen(server,SOMAXCONN)==SOCKET_ERROR) { Print("Listen failed with error: ",WSAErrorDescript(WSAGetLastError())); CloseClean(); return; } Print("start server ok"); } //------------------------------------------------------------------ Accept SOCKET AcceptClient() // Accept a client socket { if(server==INVALID_SOCKET) return INVALID_SOCKET; ref_sockaddr ch; int len=sizeof(ref_sockaddr); SOCKET new_sock=accept(server,ch.ref,len); //sockaddr_in aclient=(sockaddr_in)ch; 接続に関する詳細情報を取得する必要がある場合には、構造体に変換します if(new_sock==INVALID_SOCKET) { int err=WSAGetLastError(); if(err==WSAEWOULDBLOCK) Comment("\nWAITING CLIENT ("+string(TimeCurrent())+")"); else { Print("Accept failed with error: ",WSAErrorDescript(err)); CloseClean(); return INVALID_SOCKET; } } return new_sock; } //------------------------------------------------------------------ CloseClean void CloseClean() // close socket { if(server!=INVALID_SOCKET) { closesocket(server); server=INVALID_SOCKET; } WSACleanup(); Print("stop server"); } //------------------------------------------------------------------ #import "kernel32.dll" int CreateFileW(string lpFileName,uint dwDesiredAccess,uint dwShareMode,uint lpSecurityAttributes,uint dwCreationDisposition,uint dwFlagsAndAttributes,int hTemplateFile); bool ReadFile(int h,ushort &lpBuffer[],uint nNumberOfBytesToRead,uint &lpNumberOfBytesRead,int lpOverlapped=0); uint SetFilePointer(int h,int lDistanceToMove,int,uint dwMoveMethod); bool CloseHandle(int h); uint GetFileSize(int h,int); #import #define FILE_BEGIN 0 #define OPEN_EXISTING 3 #define GENERIC_READ 0x80000000 #define FILE_ATTRIBUTE_NORMAL 0x00000080 #define FILE_SHARE_READ_ 0x00000001 //------------------------------------------------------------------ LoadTpl bool CreateTpl(uchar &abuf[]) { string path=TerminalInfoString(TERMINAL_PATH); string name="tcpsend.tpl"; // テンプレートを作成しました ChartSaveTemplate(0,name); // 配列内にテンプレートを読み込みます path+="\\Profiles\\Templates\\"+name; int h=CreateFileW(path, GENERIC_READ, FILE_SHARE_READ_, 0, OPEN_EXISTING, FILE_ATTRIBUTE_NORMAL, 0); if(h==INVALID_HANDLE) return false; uint sz=GetFileSize(h,NULL); ushort rbuf[]; ArrayResize(rbuf,sz); ArrayInitialize(rbuf,0); SetFilePointer(h,0,NULL,FILE_BEGIN); // двинули в начало int r; ReadFile(h,rbuf,sz,r,NULL); CloseHandle(h); // テンプレートからエキスパートアドバイザ名を削除しました string a=ShortArrayToString(rbuf); ArrayResize(rbuf,0); StringReplace(a,exname," "); StringToShortArray(a,rbuf); // (Unicodeを保存しつつ)バイト配列へファイルをコピーしました sz=ArraySize(rbuf); ArrayResize(abuf,sz*2); for(uint i=0; i<sz;++i) { abuf[2*i]=(uchar)rbuf[i]; abuf[2*i+1]=(uchar)(rbuf[i]>>8); } return true; }
クライアントコードはもう少し簡単です。すでにこれは単発のファイルの取得と定義しているので、アクティブなソケットを伴い常に動くエキスパートアドバイザは私達には必要ありません。
クライアントはスクリプトの形で実装されています。全てはOnStartイベントで行われます。
//+------------------------------------------------------------------+ //| TplClient | //| programming & development - Alexey Sergeev | //+------------------------------------------------------------------+ #property copyright "© 2006-2016 Alexey Sergeev" #property link "profy.mql@gmail.com" #property version "1.00" #include "..\Experts\SocketLib.mqh" input string Host="127.0.0.1"; input ushort Port=8080; SOCKET client=INVALID_SOCKET; //------------------------------------------------------------------ OnStart void OnStart() { // ライブラリを初期化します char wsaData[]; ArrayResize(wsaData,sizeof(WSAData)); int res=WSAStartup(MAKEWORD(2,2), wsaData); if(res!=0) { Print("-WSAStartup failed error: "+string(res)); return; } // ソケットを作成します client=socket(AF_INET,SOCK_STREAM,IPPROTO_TCP); if(client==INVALID_SOCKET) { Print("-Create failed error: "+WSAErrorDescript(WSAGetLastError())); CloseClean(); return; } // サーバに接続します char ch[]; StringToCharArray(Host,ch); sockaddr_in addrin; addrin.sin_family=AF_INET; addrin.sin_addr=inet_addr(ch); addrin.sin_port=htons(Port); ref_sockaddr ref=(ref_sockaddr)addrin; res=connect(client,ref.ref,sizeof(addrin)); if(res==SOCKET_ERROR) { int err=WSAGetLastError(); if(err!=WSAEISCONN) { Print("-Connect failed error: "+WSAErrorDescript(err)); CloseClean(); return; } } // 非ブロックモードにします int non_block=1; res=ioctlsocket(client,(int)FIONBIO,non_block); if(res!=NO_ERROR) { Print("ioctlsocket failed error: "+string(res)); CloseClean(); return; } Print("connect OK"); // データを受信します uchar rdata[]; char rbuf[512]; int rlen=512; int rall=0; bool bNext=false; while(true) { res=recv(client,rbuf,rlen,0); if(res<0) { int err=WSAGetLastError(); if(err!=WSAEWOULDBLOCK) { Print("-Receive failed error: "+string(err)+" "+WSAErrorDescript(err)); CloseClean(); return; } } else if(res==0 && rall==0) { Print("-Receive. connection closed"); break; } else if(res>0) { rall+=res; ArrayCopy(rdata,rbuf,ArraySize(rdata),0,res); } if(res>=0 && res<rlen) break; } // ソケットを終了しました CloseClean(); printf("receive %d bytes",ArraySize(rdata)); // ファイルからシンボルと期間を取得しました string smb=CharArrayToString(rdata,0,10); string tf=CharArrayToString(rdata,10,10); // テンプレートファイルを保存しました int h=FileOpen("tcprecv.tpl", FILE_WRITE|FILE_SHARE_WRITE|FILE_BIN); if(h<=0) return; FileWriteArray(h,rdata,20); FileClose(h); // チャート上に適用しました ChartSetSymbolPeriod(0,smb,(ENUM_TIMEFRAMES)StringToInteger(tf)); ChartApplyTemplate(0,"\\Files\\tcprecv.tpl"); } //------------------------------------------------------------------ CloseClean void CloseClean() // close socket { if(client!=INVALID_SOCKET) { if(shutdown(client,SD_BOTH)==SOCKET_ERROR) Print("-Shutdown failed error: "+WSAErrorDescript(WSAGetLastError())); closesocket(client); client=INVALID_SOCKET; } WSACleanup(); Print("connect closed"); }
これらのコードの動作のデモンストレーション:
注意深い読者は、クライアントソケットはWebRequest関数のMQLの呼び出しで置き換えることができるということに気付いたと思います。これを行うには、サーバにHTTPヘッダーの文字列をいくつか追加し、クライアントターミナルの設定で許可するウェブアドレスを追加します。貴方自身でこれらを検証することができます。
重要!いくつかのケースでは、ターミナルの特別な動作が確認されています。WSACleanup 関数の呼び出し時に、MetaTraderは自分の接続を終了します。
もし検証中にこのような問題が起こった場合、コード内でWSAStartupとWSACleanupをコメントしてください。
例2. シンボルによる取引の同期
この例では、サーバは情報の送信時に接続を中断しません。クライアントの接続はしっかりと維持されます。また、サーバ上の任意の取引の変化のデータも、こちらでクライアントソケットを介して送信されます。次に、新しいデータを受信するクライアントは、ここで自分のポジションを受信したサーバのポジションと同期します。
前の例のサーバとクライアントのコードをベースにします。ポジションを使用する関数を追加します。
サーバから始めましょう。
//+------------------------------------------------------------------+ //| SignalServer | //| programming & development - Alexey Sergeev | //+------------------------------------------------------------------+ #property copyright "© 2006-2016 Alexey Sergeev" #property link "profy.mql@gmail.com" #property version "1.00" #include "SocketLib.mqh" input string Host="0.0.0.0"; input ushort Port=8081; bool bChangeTrades; uchar data[]; SOCKET server=INVALID_SOCKET; SOCKET conns[]; //------------------------------------------------------------------ OnInit int OnInit() { OnTrade(); EventSetTimer(1); return 0; } //------------------------------------------------------------------ OnDeinit void OnDeinit(const int reason) { EventKillTimer(); CloseClean(); } //------------------------------------------------------------------ OnTrade void OnTrade() { double lot=GetSymbolLot(Symbol()); StringToCharArray("<<"+Symbol()+"|"+DoubleToString(lot,2)+">>",data); // 文字列をバイト配列に変換します bChangeTrades=true; } //------------------------------------------------------------------ OnTimer void OnTimer() { if(server==INVALID_SOCKET) StartServer(Host,Port); else { AcceptClients(); // 保留中のクライアントを追加しました if(bChangeTrades) { Print("send new posinfo to clients"); Send(); bChangeTrades=false; } } } //------------------------------------------------------------------ StartServer void StartServer(string addr,ushort port) { // ライブラリを初期化します char wsaData[]; ArrayResize(wsaData,sizeof(WSAData)); int res=WSAStartup(MAKEWORD(2,2), wsaData); if(res!=0) { Print("-WSAStartup failed error: "+string(res)); return; } // ソケットを作成します server=socket(AF_INET,SOCK_STREAM,IPPROTO_TCP); if(server==INVALID_SOCKET) { Print("-Create failed error: "+WSAErrorDescript(WSAGetLastError())); CloseClean(); return; } // アドレスとポートへバインドします Print("try bind..."+addr+":"+string(port)); char ch[]; StringToCharArray(addr,ch); sockaddr_in addrin; addrin.sin_family=AF_INET; addrin.sin_addr=inet_addr(ch); addrin.sin_port=htons(port); ref_sockaddr ref=(ref_sockaddr)addrin; if(bind(server,ref.ref,sizeof(addrin))==SOCKET_ERROR) { int err=WSAGetLastError(); if(err!=WSAEISCONN) { Print("-Connect failed error: "+WSAErrorDescript(err)+". Cleanup socket"); CloseClean(); return; } } // 非ブロックモードにします int non_block=1; res=ioctlsocket(server,(int)FIONBIO,non_block); if(res!=NO_ERROR) { Print("ioctlsocket failed error: "+string(res)); CloseClean(); return; } // ポートを聞き、クライアントの接続を許可します if(listen(server,SOMAXCONN)==SOCKET_ERROR) { Print("Listen failed with error: ",WSAErrorDescript(WSAGetLastError())); CloseClean(); return; } Print("start server ok"); } //------------------------------------------------------------------ Accept void AcceptClients() // Accept a client socket { if(server==INVALID_SOCKET) return; // 全ての待機中のクライアントを追加します SOCKET client=INVALID_SOCKET; do { ref_sockaddr ch; int len=sizeof(ref_sockaddr); client=accept(server,ch.ref,len); if(client==INVALID_SOCKET) { int err=WSAGetLastError(); if(err==WSAEWOULDBLOCK) Comment("\nWAITING CLIENT ("+string(TimeCurrent())+")"); else { Print("Accept failed with error: ",WSAErrorDescript(err)); CloseClean(); } return; } // 非ブロックモードにします int non_block=1; int res=ioctlsocket(client, (int)FIONBIO, non_block); if(res!=NO_ERROR) { Print("ioctlsocket failed error: "+string(res)); continue; } // クライアントソケットを配列に追加します int n=ArraySize(conns); ArrayResize(conns,n+1); conns[n]=client; bChangeTrades=true; // ポジションに関する情報を送信する必要があるというフラグを立てます // クライアントについての情報を表示しました char ipstr[23]={0}; sockaddr_in aclient=(sockaddr_in)ch; //接続に関する詳細情報を取得するには構造体に変換します inet_ntop(aclient.sin_family,aclient.sin_addr,ipstr,sizeof(ipstr)); // アドレスを認知しました printf("Accept new client %s : %d",CharArrayToString(ipstr),ntohs(aclient.sin_port)); } while(client!=INVALID_SOCKET); } //------------------------------------------------------------------ SendClient void Send() { int len=ArraySize(data); for(int i=ArraySize(conns)-1; i>=0; --i) // クライアントに情報を送信します { if(conns[i]==INVALID_SOCKET) continue; // 終了をスキップ int res=send(conns[i],data,len,0); // 送信します if(res==SOCKET_ERROR) { Print("-Send failed error: "+WSAErrorDescript(WSAGetLastError())+". close socket"); Close(conns[i]); } } } //------------------------------------------------------------------ CloseClean void CloseClean() // 動作を終了しクリアします { printf("Shutdown server and %d connections",ArraySize(conns)); if(server!=INVALID_SOCKET) { closesocket(server); server=INVALID_SOCKET; } // サーバを終了します for(int i=ArraySize(conns)-1; i>=0; --i) Close(conns[i]); // クライアントを閉じます ArrayResize(conns,0); WSACleanup(); } //------------------------------------------------------------------ Close void Close(SOCKET &asock) // 1つのソケットを閉じます { if(asock==INVALID_SOCKET) return; if(shutdown(asock,SD_BOTH)==SOCKET_ERROR) Print("-Shutdown failed error: "+WSAErrorDescript(WSAGetLastError())); closesocket(asock); asock=INVALID_SOCKET; } //------------------------------------------------------------------ GetSymbolLot double GetSymbolLot(string smb) { double slot=0; int n=PositionsTotal(); for(int i=0; i<n;++i) { PositionSelectByTicket(PositionGetTicket(i)); if(PositionGetString(POSITION_SYMBOL)!=smb) continue; // サーバがある現在のシンボルのポジションをフィルタリングします double lot=PositionGetDouble(POSITION_VOLUME); // ボリュームを取得しました if(PositionGetInteger(POSITION_TYPE)==POSITION_TYPE_SELL) lot=-lot; // 方向を考慮しました slot+=lot; // 合計を追加しました } return slot; }
1秒に1回サーバの主要なブロックをチェックします(クライアントの接続とそれの全体的な配列への追加→全員に新しいデータを送信)。また、サーバのソケット自体の活動もチェックし、中断の場合にはサーバソケットの再作成が行われます。
クライアントにエキスパートアドバイザが動作しているシンボル名とそのポジション数量を送信します。
取引操作ごとにシンボルと数量をメッセージ形式で送信します。
<<GBPUSD|0.25>>
<<GBPUSD|0.00>>
送信は各取引イベント時および新しいクライアントの接続時に行われます。
クライアントコードは、常に接続をサポートする必要があるので、すでにエキスパートアドバイザとして実行されています。クライアントはサーバから新しいデータを受信し、それを既存のものに追加します。次に、クライアントは始めの兆候<< とメッセージの終わり>>を探し、それを解析し、指定されたシンボルで自分の数量をサーバのものまで調整します。
//+------------------------------------------------------------------+ //| SignalClient | //| programming & development - Alexey Sergeev | //+------------------------------------------------------------------+ #property copyright "© 2006-2016 Alexey Sergeev" #property link "profy.mql@gmail.com" #property version "1.00" #include "SocketLib.mqh" #include <Trade\Trade.mqh> input string Host="127.0.0.1"; input ushort Port=8081; SOCKET client=INVALID_SOCKET; // クライアントソケット string msg=""; // メッセージ受信のキュー //------------------------------------------------------------------ OnInit int OnInit() { if(AccountInfoInteger(ACCOUNT_MARGIN_MODE)==ACCOUNT_MARGIN_MODE_RETAIL_HEDGING) { Alert("Client work only with Netting accounts"); return INIT_FAILED; } EventSetTimer(1); return INIT_SUCCEEDED; } //------------------------------------------------------------------ OnInit void OnDeinit(const int reason) { EventKillTimer(); CloseClean(); } //------------------------------------------------------------------ OnInit void OnTimer() { if(client==INVALID_SOCKET) StartClient(Host,Port); else { uchar data[]; if(Receive(data)>0) // データを受信 { msg+=CharArrayToString(data); // 何か受信した場合、総文字列に追加します printf("received msg from server: %s",msg); } CheckMessage(); } } //------------------------------------------------------------------ CloseClean void StartClient(string addr,ushort port) { // ライブラリを初期化します int res=0; char wsaData[]; ArrayResize(wsaData, sizeof(WSAData)); res=WSAStartup(MAKEWORD(2,2), wsaData); if (res!=0) { Print("-WSAStartup failed error: "+string(res)); return; } // ソケットを作成します client=socket(AF_INET,SOCK_STREAM,IPPROTO_TCP); if(client==INVALID_SOCKET) { Print("-Create failed error: "+WSAErrorDescript(WSAGetLastError())); CloseClean(); return; } // サーバに接続します char ch[]; StringToCharArray(addr,ch); sockaddr_in addrin; addrin.sin_family=AF_INET; addrin.sin_addr=inet_addr(ch); addrin.sin_port=htons(port); ref_sockaddr ref=(ref_sockaddr)addrin; res=connect(client,ref.ref,sizeof(addrin)); if(res==SOCKET_ERROR) { int err=WSAGetLastError(); if(err!=WSAEISCONN) { Print("-Connect failed error: "+WSAErrorDescript(err)); CloseClean(); return; } } // 非ブロックモードにします int non_block=1; res=ioctlsocket(client,(int)FIONBIO,non_block); if(res!=NO_ERROR) { Print("ioctlsocket failed error: "+string(res)); CloseClean(); return; } Print("connect OK"); } //------------------------------------------------------------------ Receive int Receive(uchar &rdata[]) // Receive until the peer closes the connection { if(client==INVALID_SOCKET) return 0; // もしソケットがまだ開いていない場合 char rbuf[512]; int rlen=512; int r=0,res=0; do { res=recv(client,rbuf,rlen,0); if(res<0) { int err=WSAGetLastError(); if(err!=WSAEWOULDBLOCK) { Print("-Receive failed error: "+string(err)+" "+WSAErrorDescript(err)); CloseClean(); return -1; } break; } if(res==0 && r==0) { Print("-Receive. connection closed"); CloseClean(); return -1; } r+=res; ArrayCopy(rdata,rbuf,ArraySize(rdata),0,res); } while(res>0 && res>=rlen); return r; } //------------------------------------------------------------------ CloseClean void CloseClean() // close socket { if(client!=INVALID_SOCKET) { if(shutdown(client,SD_BOTH)==SOCKET_ERROR) Print("-Shutdown failed error: "+WSAErrorDescript(WSAGetLastError())); closesocket(client); client=INVALID_SOCKET; } WSACleanup(); Print("close socket"); } //------------------------------------------------------------------ CheckMessage void CheckMessage() { string pos; while(FindNextPos(pos)) { printf("server position: %s",pos); }; // サーバの最新の変更を取得しました if(StringLen(pos)<=0) return; // メッセージからデータを取得しました string res[]; if(StringSplit(pos,'|',res)!=2) { printf("-wrong pos info: %s",pos); return; } string smb=res[0]; double lot=NormalizeDouble(StringToDouble(res[1]),2); // 数量を同期します if(!SyncSymbolLot(smb,lot)) msg="<<"+pos+">>"+msg; // エラーの場合、メッセージを『流れ』の始めに返します } //------------------------------------------------------------------ SyncSymbolLot bool SyncSymbolLot(string smb,double nlot) { // サーバとクライアントの数量を同期します CTrade trade; double clot=GetSymbolLot(smb); // シンボルの現在のロットを取得しました if(clot==nlot) { Print("nothing change"); return true; } // 数量が等しい場合、何も行いません // 初めにサーバ上にポジションがない特殊なケースをチェックしました if(nlot==0 && clot!=0) { Print("full close position"); return trade.PositionClose(smb); } // サーバ上にポジションがある場合、クライアント側で変更します double dif=NormalizeDouble(nlot-clot,2); // 差に応じて買いまたは売りを行います if(dif>0) { Print("add Buy position"); return trade.Buy(dif,smb); } else { Print("add Sell position"); return trade.Sell(-dif,smb); } } //------------------------------------------------------------------ FindNextPos bool FindNextPos(string &pos) { int b=StringFind(msg, "<<"); if(b<0) return false; // メッセージの最初がない int e=StringFind(msg, ">>"); if(e<0) return false; // メッセージの終わりがない pos=StringSubstr(msg,b+2,e-b-2); // 情報ブロックを取得しました msg=StringSubstr(msg,e+2); // これをメッセージから削除しました return true; } //------------------------------------------------------------------ GetSymbolLot double GetSymbolLot(string smb) { double slot=0; int n=PositionsTotal(); for(int i=0; i<n;++i) { PositionSelectByTicket(PositionGetTicket(i)); if(PositionGetString(POSITION_SYMBOL)!=smb) continue; // サーバがある現在のシンボルのポジションをフィルタリングします double lot=PositionGetDouble(POSITION_VOLUME); // ボリュームを取得しました if(PositionGetInteger(POSITION_TYPE)==POSITION_TYPE_SELL) lot=-lot; // 方向を考慮しました slot+=lot; // 合計を追加しました } return NormalizeDouble(slot,2); }
そして最後に、サーバとクライアントの動作をペアでデモンストレーションします。
例3. ティックコレクター
この例はUDPソケットをデモンストレーションします。ここではサーバはクライアントからシンボルのデータを待ちます。
クライアントについての情報を保存したり、クライアントの接続を待つ必要がないため、サーバコードはとても単純なものです。ソケットでのデータのチェックは、ミリ秒タイマーを使用して少し速めます。
input string Host="0.0.0.0"; input ushort Port=8082; SOCKET server=INVALID_SOCKET; //------------------------------------------------------------------ OnInit int OnInit() { EventSetMillisecondTimer(300); return 0; } //------------------------------------------------------------------ OnDeinit void OnDeinit(const int reason) { EventKillTimer(); CloseClean(); } //------------------------------------------------------------------ OnTimer void OnTimer() { if(server!=INVALID_SOCKET) { char buf[1024]={0}; ref_sockaddr ref={0}; int len=ArraySize(ref.ref); int res=recvfrom(server,buf,1024,0,ref.ref,len); if (res>=0) // データを受信し表示します Print("receive tick from client: ", CharArrayToString(buf)); else { int err=WSAGetLastError(); if(err!=WSAEWOULDBLOCK) { Print("-receive failed error: "+WSAErrorDescript(err)+". Cleanup socket"); CloseClean(); return; } } } else // そうでない場合サーバを起動します { // ライブラリを初期化します char wsaData[]; ArrayResize(wsaData,sizeof(WSAData)); int res=WSAStartup(MAKEWORD(2,2), wsaData); if(res!=0) { Print("-WSAStartup failed error: "+string(res)); return; } // ソケットを作成します server=socket(AF_INET,SOCK_DGRAM,IPPROTO_UDP); if(server==INVALID_SOCKET) { Print("-Create failed error: "+WSAErrorDescript(WSAGetLastError())); CloseClean(); return; } // アドレスとポートへバインドします Print("try bind..."+Host+":"+string(Port)); char ch[]; StringToCharArray(Host,ch); sockaddr_in addrin; addrin.sin_family=AF_INET; addrin.sin_addr=inet_addr(ch); addrin.sin_port=htons(Port); ref_sockaddr ref=(ref_sockaddr)addrin; if(bind(server,ref.ref,sizeof(addrin))==SOCKET_ERROR) { int err=WSAGetLastError(); if(err!=WSAEISCONN) { Print("-Connect failed error: "+WSAErrorDescript(err)+". Cleanup socket"); CloseClean(); return; } } // 非ブロックモードにします int non_block=1; res=ioctlsocket(server,(int)FIONBIO,non_block); if(res!=NO_ERROR) { Print("ioctlsocket failed error: "+string(res)); CloseClean(); return; } Print("start server ok"); } } //------------------------------------------------------------------ CloseClean void CloseClean() // 動作を終了しクリアします { printf("Shutdown server"); if(server!=INVALID_SOCKET) { closesocket(server); server=INVALID_SOCKET; } // サーバを終了します WSACleanup(); }
クライアントコードも単純なものです。全ての動作はティック受信のイベントで起こります。
input string Host="127.0.0.1"; input ushort Port=8082; SOCKET client=INVALID_SOCKET; // クライアントソケット ref_sockaddr srvaddr={0}; // サーバへの接続の為の構造 //------------------------------------------------------------------ OnInit int OnInit() { // サーバの為に構造を記入します char ch[]; StringToCharArray(Host,ch); sockaddr_in addrin; addrin.sin_family=AF_INET; addrin.sin_addr=inet_addr(ch); addrin.sin_port=htons(Port); srvaddr=(ref_sockaddr)addrin; OnTick(); // すぐにソケットを作成します return INIT_SUCCEEDED; } //------------------------------------------------------------------ OnDeinit void OnDeinit(const int reason) { CloseClean(); } //------------------------------------------------------------------ OnTick void OnTick() { if(client!=INVALID_SOCKET) // ソケットがすでに作成されている場合、送信します { uchar data[]; StringToCharArray(Symbol()+" "+DoubleToString(SymbolInfoDouble(Symbol(),SYMBOL_BID),Digits()),data); if(sendto(client,data,ArraySize(data),0,srvaddr.ref,ArraySize(srvaddr.ref))==SOCKET_ERROR) { int err=WSAGetLastError(); if(err!=WSAEWOULDBLOCK) { Print("-Send failed error: "+WSAErrorDescript(err)); CloseClean(); } } else Print("send "+Symbol()+" tick to server"); } else // クライアントソケットを作成します { int res=0; char wsaData[]; ArrayResize(wsaData,sizeof(WSAData)); res=WSAStartup(MAKEWORD(2,2),wsaData); if(res!=0) { Print("-WSAStartup failed error: "+string(res)); return; } // ソケットを作成します client=socket(AF_INET,SOCK_DGRAM,IPPROTO_UDP); if(client==INVALID_SOCKET) { Print("-Create failed error: "+WSAErrorDescript(WSAGetLastError())); CloseClean(); return; } // 非ブロックモードにします int non_block=1; res=ioctlsocket(client,(int)FIONBIO,non_block); if(res!=NO_ERROR) { Print("ioctlsocket failed error: "+string(res)); CloseClean(); return; } Print("create socket OK"); } } //------------------------------------------------------------------ CloseClean void CloseClean() // close socket { if(client!=INVALID_SOCKET) { closesocket(client); client=INVALID_SOCKET; } WSACleanup(); Print("close socket"); }
ここで、最終的な動作をデモンストレーションします。
3. これからのサーバ強化の方法
任意のクライアントに情報を送信するサーバのこれらの例は最適でないことがわかります。貴方はきっと貴方の情報へのアクセスを制限したくなった場合には、少なくとも必須要件に以下のものを含める必要があります。
- クライアント認証(ログイン/パスワード)
- パスワード推測からの保護(禁止/ログインまたはIPのブロック)
また、サーバの全ての動作は一つのスレッド内(一つのエキスパートアドバイザのタイマーで)で実行されることに気付いたと思います。これは膨大な接続や情報量の時に重要になります。したがって、サーバの最適化をする為には、少なくともクライアント接続との相互作用が起こるエキスパートアドバイザのプール(それぞれ自分のタイマーを持つ)を追加する必要があります。これはある程度サーバをマルチスレッドにします。
MQLでこれを行うかを決めるのは貴方です。それを行う為の他の方法があって、そちらの方がより便利かもしれません。しかし、MQLは取引口座や相場への直接のアクセスという利点があります。これはサードパーティのDLLを使用しないMQLコードの開放性という紛れもない事実です。
まとめ
他にはどのようにMetaTraderにソケットを適用できるでしょうか?記事の執筆前、私にはいくつかの検証アイディアがありました。
- 市場のセンチメント指標(接続しているクライアントが自分のポジションの数量を送信し、全クライアントから受信した総数量として応答を取得する)
- または、例えば、サーバからのインディケータの計算をクライアントへ送信する(購読で)
- または反対に、クライアントが難しい計算の手伝いを行う(テスターエージェントネットワーク)
- サーバをクライアント間のデータ交換の為の『プロキシ』にする
いろんなバージョンを考えることができます。貴方にもこういったアイディアが浮かびましたら、記事のコメント欄に書いてください。それが面白いものなら、一緒に実装することもできます。
貴方に成功と利益がもたらされますように!