communication via named pipe stops / freezes after some time - Need help!

 

Dear all,

I have set up a C++ named pipe server and a client on MT5 based on example on the documentations on this site. Also I have set the client on MT5 terminal send a message to the server  every second. This works fine.

Problems is that it stops functioning after sending about 15 messages or something. I tried different coding for the client. But no change in the behavior. Is this because some limit imposed by MT5? Can you pls help me to correct this?

(Note:- I created a client with C++. It keeps communicating with the same named pipe server to which client on MT5 connected previously. )

Thanks in advance

 
Correct what...you have not provided any useful information, log or code.
 
Alain Verleyen #:
Correct what...you have not provided any useful information, log or code.

Attached below  are codes I have used for client and server and the header file.

While sending messages to the sever client has not generated any error( errorcode =0).

Server stops getting messages after few messages as shown below. 

Please help me on this!

Messages received at server end.


Client Code

#include    <CNamedPipes.mqh>

CNamedPipe  pipe;
MqlTick     last_tick;
string      outMessage, inMessage;
string      PipeName="mynamedpipe";

//+------------------------------------------------------------------+
//| Expert initialization function                                   |
//+------------------------------------------------------------------+
int OnInit()
  {
      EventSetMillisecondTimer(500);
      
      //----Create Named Pipe----------------------------------------
      bool     pipeCreated=false;
      bool     pipeOpened=false;
      
      while(true)
        {
            pipeCreated=pipe.Open(PipeName);
            if(pipeCreated) break;
            Sleep(500);
        }
         
      if(pipeCreated || pipeOpened)return(INIT_SUCCEEDED);
      else return (INIT_FAILED);
  }
  
 void OnTick()
   {
      ResetLastError();
      pipe.WriteANSI("Signal:" + rand());
      Alert(GetLastError());
      Sleep(500);
   }
//+------------------------------------------------------------------+
//| Expert deinitialization function                                 |
//+------------------------------------------------------------------+
void OnDeinit(const int reason)
  {
      pipe.Close();
      EventKillTimer();
  }


Server code!

#include <windows.h> 
#include <stdio.h> 
#include <tchar.h>
#include <strsafe.h>

#define BUFSIZE 512
 
DWORD WINAPI InstanceThread(LPVOID); 
VOID GetAnswerToRequest(LPTSTR, LPTSTR, LPDWORD); 
 
int _tmain(VOID) 
{ 
   BOOL   fConnected = FALSE; 
   DWORD  dwThreadId = 0; 
   HANDLE hPipe = INVALID_HANDLE_VALUE, hThread = NULL; 
   LPCTSTR lpszPipename = TEXT("\\\\.\\pipe\\mynamedpipe"); 
 
// The main loop creates an instance of the named pipe and 
// then waits for a client to connect to it. When the client 
// connects, a thread is created to handle communications 
// with that client, and this loop is free to wait for the
// next client connect request. It is an infinite loop.
 
   for (;;) 
   { 
      _tprintf( TEXT("\nPipe Server: Main thread awaiting client connection on %s\n"), lpszPipename);
      hPipe = CreateNamedPipe( 
          lpszPipename,             // pipe name 
          PIPE_ACCESS_DUPLEX,       // read/write access 
          PIPE_TYPE_MESSAGE |       // message type pipe 
          PIPE_READMODE_MESSAGE |   // message-read mode 
          PIPE_WAIT,                // blocking mode 
          PIPE_UNLIMITED_INSTANCES, // max. instances  
          BUFSIZE,                  // output buffer size 
          BUFSIZE,                  // input buffer size 
          0,                        // client time-out 
          NULL);                    // default security attribute 

      if (hPipe == INVALID_HANDLE_VALUE) 
      {
          _tprintf(TEXT("CreateNamedPipe failed, GLE=%d.\n"), GetLastError()); 
          return -1;
      }
 
      // Wait for the client to connect; if it succeeds, 
      // the function returns a nonzero value. If the function
      // returns zero, GetLastError returns ERROR_PIPE_CONNECTED. 
 
      fConnected = ConnectNamedPipe(hPipe, NULL) ? 
         TRUE : (GetLastError() == ERROR_PIPE_CONNECTED); 
 
      if (fConnected) 
      { 
         printf("Client connected, creating a processing thread.\n"); 
      
         // Create a thread for this client. 
         hThread = CreateThread( 
            NULL,              // no security attribute 
            0,                 // default stack size 
            InstanceThread,    // thread proc
            (LPVOID) hPipe,    // thread parameter 
            0,                 // not suspended 
            &dwThreadId);      // returns thread ID 

         if (hThread == NULL) 
         {
            _tprintf(TEXT("CreateThread failed, GLE=%d.\n"), GetLastError()); 
            return -1;
         }
         else CloseHandle(hThread); 
       } 
      else 
        // The client could not connect, so close the pipe. 
         CloseHandle(hPipe); 
   } 

   return 0; 
} 
 
DWORD WINAPI InstanceThread(LPVOID lpvParam)
// This routine is a thread processing function to read from and reply to a client
// via the open pipe connection passed from the main loop. Note this allows
// the main loop to continue executing, potentially creating more threads of
// of this procedure to run concurrently, depending on the number of incoming
// client connections.
{ 
   HANDLE hHeap      = GetProcessHeap();
   TCHAR* pchRequest = (TCHAR*)HeapAlloc(hHeap, 0, BUFSIZE*sizeof(TCHAR));
   TCHAR* pchReply   = (TCHAR*)HeapAlloc(hHeap, 0, BUFSIZE*sizeof(TCHAR));

   DWORD cbBytesRead = 0, cbReplyBytes = 0, cbWritten = 0; 
   BOOL fSuccess = FALSE;
   HANDLE hPipe  = NULL;

   // Do some extra error checking since the app will keep running even if this
   // thread fails.

   if (lpvParam == NULL)
   {
       printf( "\nERROR - Pipe Server Failure:\n");
       printf( "   InstanceThread got an unexpected NULL value in lpvParam.\n");
       printf( "   InstanceThread exitting.\n");
       if (pchReply != NULL) HeapFree(hHeap, 0, pchReply);
       if (pchRequest != NULL) HeapFree(hHeap, 0, pchRequest);
       return (DWORD)-1;
   }

   if (pchRequest == NULL)
   {
       printf( "\nERROR - Pipe Server Failure:\n");
       printf( "   InstanceThread got an unexpected NULL heap allocation.\n");
       printf( "   InstanceThread exitting.\n");
       if (pchReply != NULL) HeapFree(hHeap, 0, pchReply);
       return (DWORD)-1;
   }

   if (pchReply == NULL)
   {
       printf( "\nERROR - Pipe Server Failure:\n");
       printf( "   InstanceThread got an unexpected NULL heap allocation.\n");
       printf( "   InstanceThread exitting.\n");
       if (pchRequest != NULL) HeapFree(hHeap, 0, pchRequest);
       return (DWORD)-1;
   }

   // Print verbose messages. In production code, this should be for debugging only.
   printf("InstanceThread created, receiving and processing messages.\n");

// The thread's parameter is a handle to a pipe object instance. 
 
   hPipe = (HANDLE) lpvParam; 

// Loop until done reading
   while (1) 
   { 
   // Read client requests from the pipe. This simplistic code only allows messages
   // up to BUFSIZE characters in length.
      fSuccess = ReadFile( 
         hPipe,        // handle to pipe 
         pchRequest,    // buffer to receive data 
         BUFSIZE*sizeof(TCHAR), // size of buffer 
         &cbBytesRead, // number of bytes read 
         NULL);        // not overlapped I/O 

      if (!fSuccess || cbBytesRead == 0)
      {   
          if (GetLastError() == ERROR_BROKEN_PIPE)
          {
              _tprintf(TEXT("InstanceThread: client disconnected.\n")); 
          }
          else
          {
              _tprintf(TEXT("InstanceThread ReadFile failed, GLE=%d.\n"), GetLastError()); 
          }
          break;
      }

   // Process the incoming message.
      GetAnswerToRequest(pchRequest, pchReply, &cbReplyBytes); 
 
   // Write the reply to the pipe. 
      fSuccess = WriteFile( 
         hPipe,        // handle to pipe 
         pchReply,     // buffer to write from 
         cbReplyBytes, // number of bytes to write 
         &cbWritten,   // number of bytes written 
         NULL);        // not overlapped I/O 

      if (!fSuccess || cbReplyBytes != cbWritten)
      {   
          _tprintf(TEXT("InstanceThread WriteFile failed, GLE=%d.\n"), GetLastError()); 
          break;
      }
  }

// Flush the pipe to allow the client to read the pipe's contents 
// before disconnecting. Then disconnect the pipe, and close the 
// handle to this pipe instance. 
 
   FlushFileBuffers(hPipe); 
   DisconnectNamedPipe(hPipe); 
   CloseHandle(hPipe); 

   HeapFree(hHeap, 0, pchRequest);
   HeapFree(hHeap, 0, pchReply);

   printf("InstanceThread exiting.\n");
   return 1;
}

VOID GetAnswerToRequest( LPTSTR pchRequest, 
                         LPTSTR pchReply, 
                         LPDWORD pchBytes )
// This routine is a simple function to print the client request to the console
// and populate the reply buffer with a default data string. This is where you
// would put the actual client request processing code that runs in the context
// of an instance thread. Keep in mind the main thread will continue to wait for
// and receive other client connections while the instance thread is working.
{
    _tprintf( TEXT("Client Request String:\"%s\"\n"), pchRequest );

    // Check the outgoing message to make sure it's not too long for the buffer.
    if (FAILED(StringCchCopy( pchReply, BUFSIZE, TEXT("default answer from server") )))
    {
        *pchBytes = 0;
        pchReply[0] = 0;
        printf("StringCchCopy failed, no outgoing message.\n");
        return;
    }
    *pchBytes = (lstrlen(pchReply)+1)*sizeof(TCHAR);
}


Header code

//+------------------------------------------------------------------+
//|                                                  CNamedPipes.mqh |
//|                                      Copyright 2010, Investeo.pl |
//|                                                http:/Investeo.pl |
//+------------------------------------------------------------------+
#property copyright "Copyright 2010, Investeo.pl"
#property link      "http:/Investeo.pl"
//+------------------------------------------------------------------+
//| defines                                                          |
//+------------------------------------------------------------------+

enum ENUM_PIPE_ACCESS
  {
   PIPE_ACCESS_INBOUND=1,
   PIPE_ACCESS_OUTBOUND=2,
   PIPE_ACCESS_DUPLEX=3,
  };
//+------------------------------------------------------------------+
//|                                                                  |
//+------------------------------------------------------------------+
enum ENUM_PIPE_MODE
  {
   PIPE_TYPE_RW_BYTE=0,
   PIPE_TYPE_READ_MESSAGE=2,
   PIPE_TYPE_WRITE_MESSAGE=4,
  };

#define PIPE_WAIT 0
#define PIPE_NOWAIT 1

#define ERROR_PIPE_CONNECTED 535
#define ERROR_BROKEN_PIPE 109

#define INVALID_HANDLE_VALUE -1
#define GENERIC_READ  0x80000000
#define GENERIC_WRITE  0x40000000
#define OPEN_EXISTING  3
#define PIPE_UNLIMITED_INSTANCES 255
#define MQLTICK_SIZE 40
#define PIPE_BUFFER_SIZE 8192
#define STR_SIZE 255

//+------------------------------------------------------------------+
//| DLL imports                                                      |
//+------------------------------------------------------------------+
#import "kernel32.dll"
int CreateNamedPipeW(string pipeName,int openMode,int pipeMode,int maxInstances,int outBufferSize,int inBufferSize,int defaultTimeOut,int security);
int WaitNamedPipeW(string lpNamedPipeName,int nTimeOut);
bool ConnectNamedPipe(int pipeHandle,int overlapped);
bool DisconnectNamedPipe(int pipeHandle);
int CreateFileW(string name,int desiredAccess,int SharedMode,int security,int creation,int flags,int templateFile);
int WriteFile(int fileHandle,short &buffer[],int bytes,int &numOfBytes,int overlapped);
int WriteFile(int fileHandle,char &buffer[],int bytes,int &numOfBytes,int overlapped);
int WriteFile(int fileHandle,MqlTick &outgoing,int bytes,int &numOfBytes,int overlapped);
int WriteFile(int fileHandle,int &var,int bytes,int &numOfBytes,int overlapped);
int ReadFile(int fileHandle,short &buffer[],int bytes,int &numOfBytes,int overlapped);
int ReadFile(int fileHandle,char &buffer[],int bytes,int &numOfBytes,int overlapped);
int ReadFile(int fileHandle,MqlTick &incoming,int bytes,int &numOfBytes,int overlapped);
int ReadFile(int fileHandle,int &incoming,int bytes,int &numOfBytes,int overlapped);
int CloseHandle(int fileHandle);
int GetLastError(void);
int FlushFileBuffers(int pipeHandle);
#import
//+------------------------------------------------------------------+
//|                                                                  |
//+------------------------------------------------------------------+
class CNamedPipe
  {
private:
   int               hPipe; // pipe handle
   string            pipeNumber;
   string            pipeNamePrefix;
   int               BufferSize;

protected:

public:
                     CNamedPipe();
                    ~CNamedPipe();

   bool              Create(string pipeName);
   bool              Connect();
   bool              Disconnect();
   bool              Open(string pipeName);
   int               Close();
   void              Flush();
   int               WriteUnicode(string message);
   string            ReadUnicode();
   int               WriteANSI(string message);
   string            ReadANSI();
   bool              ReadTick();
   bool              WriteTick(MqlTick &outgoing);
   string            GetPipeName();

   MqlTick           incoming;
  };
//+------------------------------------------------------------------+
//| CNamedPipe constructor
//+------------------------------------------------------------------+
CNamedPipe::CNamedPipe(void)
  {
   pipeNamePrefix="\\\\.\\pipe\\";
   BufferSize=PIPE_BUFFER_SIZE;
   hPipe=INVALID_HANDLE_VALUE;
   int err=kernel32::GetLastError();
  }
//+------------------------------------------------------------------+
//| CNamedPipe destructor
//+------------------------------------------------------------------+
CNamedPipe::~CNamedPipe(void)
  {
   if(hPipe!=INVALID_HANDLE_VALUE)
      CloseHandle(hPipe);

  }
//+------------------------------------------------------------------+
/// Create() : try to create a new instance of Named Pipe
/// \param account - source terminal account number  
/// \return true - if created, false otherwise
//+------------------------------------------------------------------+
bool CNamedPipe::Create(string pipeName)
  {
//   if(account==0)
//      pipeNumber=IntegerToString(AccountInfoInteger(ACCOUNT_LOGIN));
//   else
//      pipeNumber=IntegerToString(account);
//
   string fullPipeName=pipeNamePrefix+pipeName;

   hPipe=CreateNamedPipeW(fullPipeName,
                          (int)GENERIC_READ|GENERIC_WRITE|(ENUM_PIPE_ACCESS)PIPE_ACCESS_DUPLEX,
                          (ENUM_PIPE_MODE)PIPE_TYPE_RW_BYTE,PIPE_UNLIMITED_INSTANCES,
                          BufferSize*sizeof(ushort),BufferSize*sizeof(ushort),0,NULL);

   if(hPipe==INVALID_HANDLE_VALUE) return false;
   else
      return true;

  }
//+------------------------------------------------------------------+
/// Connect() : wait for a client to connect to a pipe               
/// \return true - if connected, false otherwise.
//+------------------------------------------------------------------+
bool CNamedPipe::Connect(void)
  {
   if(ConnectNamedPipe(hPipe,NULL)==false)
      return(kernel32::GetLastError()==ERROR_PIPE_CONNECTED);
   else return true;
  }
//+------------------------------------------------------------------+
/// Disconnect(): disconnect from a pipe
/// \return true - if disconnected, false otherwise    
//+------------------------------------------------------------------+
bool CNamedPipe::Disconnect(void)
  {
   return DisconnectNamedPipe(hPipe);
  }
//+------------------------------------------------------------------+
/// Open() : try to open previously created pipe
/// \param account - source terminal account number
/// \return true - if successfull, false otherwise
//+------------------------------------------------------------------+
bool CNamedPipe::Open(string pipeName)
  {
//   if(account==0)
//      pipeNumber=IntegerToString(AccountInfoInteger(ACCOUNT_LOGIN));
//   else
//      pipeNumber=IntegerToString(account);
//
//   string fullPipeName=pipeNamePrefix+pipeNumber;
   string fullPipeName=pipeNamePrefix+pipeName;

   if(hPipe==INVALID_HANDLE_VALUE)
     {
      if(WaitNamedPipeW(fullPipeName,5000)==0)
        {
         Print("Pipe "+fullPipeName+" busy.");
         return false;
        }

      hPipe=CreateFileW(fullPipeName,(int)GENERIC_READ|GENERIC_WRITE,0,NULL,OPEN_EXISTING,0,NULL);
      if(hPipe==INVALID_HANDLE_VALUE)
        {
         Print("Pipe open failed");
         return false;
        }

     }
   return true;
  }
//+------------------------------------------------------------------+
/// Close() : close pipe handle
/// \return 0 if successfull, non-zero otherwise  
//+------------------------------------------------------------------+
int CNamedPipe::Close(void)
  {
   return CloseHandle(hPipe);
  }
  
//+------------------------------------------------------------------+
/// WriteUnicode() : write unicode string to a pipe
/// \param message - string to send
/// \return number of bytes written to a pipe                                                                  |
//+------------------------------------------------------------------+
int CNamedPipe::WriteUnicode(string message)
  {
   int ushortsToWrite, bytesWritten;
   ushort UNICODEarray[];
   ushortsToWrite = StringToShortArray(message, UNICODEarray);
   WriteFile(hPipe,ushortsToWrite,sizeof(int),bytesWritten,0);
   WriteFile(hPipe,UNICODEarray,ushortsToWrite*sizeof(ushort),bytesWritten,0);
   return bytesWritten;
  }
//+------------------------------------------------------------------+
/// ReadUnicode(): read unicode string from a pipe
/// \return unicode string (MQL5 string)
//+------------------------------------------------------------------+
string CNamedPipe::ReadUnicode(void)
  {
   string ret;
   ushort UNICODEarray[STR_SIZE*sizeof(uint)];
   int bytesRead, ushortsToRead;
 
   ReadFile(hPipe,ushortsToRead,sizeof(int),bytesRead,0);
   ReadFile(hPipe,UNICODEarray,ushortsToRead*sizeof(ushort),bytesRead,0);
   if(bytesRead!=0)
      ret = ShortArrayToString(UNICODEarray);
   
   return ret;
  }
//+------------------------------------------------------------------+
/// WriteANSI() : write ANSI string to a pipe
/// \param message - string to send
/// \return number of bytes written to a pipe                                                                  |
//+------------------------------------------------------------------+
int CNamedPipe::WriteANSI(string message)
  {
   int bytesToWrite, bytesWritten;
   uchar ANSIarray[];
   bytesToWrite = StringToCharArray(message, ANSIarray);
   WriteFile(hPipe,bytesToWrite,sizeof(int),bytesWritten,0);
   WriteFile(hPipe,ANSIarray,bytesToWrite,bytesWritten,0);
   return bytesWritten;
  }
//+------------------------------------------------------------------+
/// ReadANSI(): read ANSI string from a pipe
/// \return unicode string (MQL5 string)
//+------------------------------------------------------------------+
string CNamedPipe::ReadANSI(void)
  {
   string ret;
   uchar ANSIarray[STR_SIZE];
   int bytesRead, bytesToRead;
 
   ReadFile(hPipe,bytesToRead,sizeof(int),bytesRead,0);
   ReadFile(hPipe,ANSIarray,bytesToRead,bytesRead,0);
   if(bytesRead!=0)
      ret = CharArrayToString(ANSIarray);
   
   return ret;
  }
//+------------------------------------------------------------------+
/// WriteTick() : write MqlTick to a pipe
/// \param outgoing - MqlTick to send
/// \return true if tick was written correctly, false otherwise
//+------------------------------------------------------------------+
bool CNamedPipe::WriteTick(MqlTick &outgoing)
  {
   int bytesWritten;

   WriteFile(hPipe,outgoing,MQLTICK_SIZE,bytesWritten,0);

   return(bytesWritten==MQLTICK_SIZE);
  }
//+------------------------------------------------------------------+
/// ReadTick() : read MqlTick from a pipe
/// \return true if tick was read correctly, false otherwise
//+------------------------------------------------------------------+
bool CNamedPipe::ReadTick()
  {
   int bytesRead;

   ReadFile(hPipe,incoming,MQLTICK_SIZE,bytesRead,NULL);

   return(bytesRead==MQLTICK_SIZE);
  }
//+------------------------------------------------------------------+

//+------------------------------------------------------------------+
/// GetPipeName() : return pipe name
/// \return string containing pipe name
//+------------------------------------------------------------------+
string CNamedPipe::GetPipeName(void)
  {
   return pipeNumber;
  }
//+------------------------------------------------------------------+
/// Flush() : flush pipe buffer
//+------------------------------------------------------------------+
void CNamedPipe::Flush(void)
  {
   FlushFileBuffers(hPipe);
  }
//+------------------------------------------------------------------+
 
chandima #:

Attached below  are codes I have used for client and server and the header file.

While sending messages to the sever client has not generated any error( errorcode =0).

Server stops getting messages after few messages as shown below. 

Please help me on this!


Try to debug on the server side, breakpoints or OutputDebugString. I see various issues, time-out, broken output printing, but am not going to test it myself.

 
Jose Alberto Pupo Pascoa #:

Try to debug on the server side, breakpoints or OutputDebugString. I see various issues, but am not going to test it myself.

Hi, 

Thanks a lot for your time on this.

I set up a C++ client and got it connected to the same server to see how the server behaves. The concerned server kept on communicating without a problem. This made me to think that something is wrong with the client on MT5 terminal.

Do you think I still need to check on the server code?

Kind regards.

 
chandima #:

Hi, 

Thanks a lot for your time on this.

I set up a C++ client and got it connected to the same server to see how the server behaves. The concerned server kept on communicating without a problem. This made me to think that something is wrong with the client on MT5 terminal.

Do you think I still need to check on the server code?

Kind regards.

Yes, I think you should. Or remove the Sleep(500) from the client and see if it improves. As is the pipe times out in 50 miliseconds.
 
What about a writing and reading normal files to/from a RAM-Disk using normal file functions of Windows "kernel32.dll"? I think that's easier to handle and almost as fast as the other ways due to the fact that the latency is caused mainly by waiting and not by the speed of the system.
 
Carl Schreiber #:
What about a writing and reading normal files to/from a RAM-Disk using normal file functions of Windows "kernel32.dll"? I think that's easier to handle and almost as fast as the other ways due to the fact that the latency is caused mainly by waiting and not by the speed of the system.
Ok! I'll try that as well! Thanks for supporting me!!
Reason: