English 中文 Deutsch 日本語
preview
Разработка инструментария для анализа Price Action (Часть 35): Обучение и развертывание прогнозных моделей

Разработка инструментария для анализа Price Action (Часть 35): Обучение и развертывание прогнозных моделей

MetaTrader 5Торговые системы |
324 1
Christian Benjamin
Christian Benjamin

Введение

В предыдущей статье мы построили надежный пайплайн для потоковой передачи исторических данных из скрипта MQL5 в Python и их сохранения на диске. В той статье мы намеренно остановились на уровне приема данных: показали, что рыночные бары можно получать, сериализовать и затем повторно считывать с диска, но до обучения модели не дошли.

Здесь мы продолжим ровно с того места, на котором остановились. Теперь мы выйдем за рамки простого хранения данных и покажем, как:

  • обучать прогностические модели на принимаемых данных;
  • упаковывать и кэшировать эти модели по символам;
  • разворачивать их через облегченный REST API, к которому советник MQL5 может обращаться в реальном времени.

Для этого мы объединяем сильные стороны Python-экосистемы машинного обучения со скоростью исполнения MetaTrader 5. Советник взаимодействует с рынком, а сервис на Python отвечает за создание признаков, инференс (получение предсказаний) модели и, при необходимости, периодическое переобучение.

"Обучаемая модель" в этом контексте – это любой алгоритм, внутренние параметры которого можно оптимизировать по данным. Классические методы (через scikit-learn), такие как градиентный бустинг или метод опорных векторов, подходят для табличных наборов признаков, тогда как фреймворки глубокого обучения (TensorFlow, PyTorch) поддерживают более сложные архитектуры, когда это необходимо. Благодаря обширному набору библиотек, понятному синтаксису и активному сообществу Python становится естественным выбором для этого этапа пайплайна.

В таблице ниже кратко показано, за что отвечает каждый компонент готовой системы:

Компонент Роль в процессе
Советник MQL5 Собирает бары в реальном времени и состояние счета; отправляет запросы на расчет признаков; исполняет торговые сигналы, возвращенные API.
Python-скрипт загрузки данных Получает исторические чанки из MetaTrader 5, очищает и сохраняет их в формате Parquet.
Модуль создания признаков
Преобразует исходные данные OHLC в технические и статистические признаки.
Модуль обучения Обучает или обновляет модели по символам; сериализует их с помощью joblib.
REST-сервисы на Flask Обрабатывает /predict, /upload_history и т.д.; управляет кэшем моделей в памяти, чтобы обеспечивать ответ за миллисекунды.

Статья построена следующим образом:

Приступим.


Краткий обзор пайплайна приема данных

Как было показано в предыдущей статье, наш скрипт приема истории упрощает весь процесс передачи данных из MetaTrader 5 в Python: сначала он получает нужные исторические бары через CopyRates, затем раскладывает временные метки, максимумы, минимумы и цены закрытия по массивам, а после этого собирает каждую порцию в JSON-пакет с помощью BuildJSON. Чтобы не выйти за пределы ограничений WebRequest по размеру в MetaTrader 5, скрипт автоматически разбивает данные на чанки приемлемого размера, при необходимости уменьшая их объем вдвое вплоть до заданного минимума, и отправляет каждую порцию на наш Python-эндпоинт через PostChunk - с повторными попытками и контролем таймаута. По ходу работы он пишет в лог вкладки "Эксперты" каждый шаг и каждую ошибку, корректно завершает работу при сбое и подтверждает завершение после полной загрузки данных, создавая надежную основу для нашего пайплайна обнаружения всплесков.

Рассмотрим диаграмму ниже и разберем функции в скрипте MQL5.

На стороне Python пайплайн приема данных состоит из четырех ключевых компонентов: HTTP Receiver (upload_history) разбирает каждый JSON-чанк, отправленный из скрипта MQL5, извлекая символы, временные метки и ценовые данные; модуль обогащения признаков Feature Enricher (prophet_delta при поддержке compile_prophet) поддерживает кэшированную модель Prophet для каждого символа, чтобы на лету генерировать дельты прогноза; модуль расчета признаков Feature Calculator (внутри upload_history) вычисляет набор технических метрик – величину всплеска, MACD, RSI, ATR, полосы индикатора Envelopes, наклон по Калману и т.д. – и присваивает метку "BUY/SELL/WAIT"; наконец, модуль сохранения данных Data Persister (append_rows) записывает эти обогащенные и размеченные строки в training_set.csv, создавая файл и заголовок, если их еще нет.

Каждый раз, когда скрипт MQL5 отправляет чанк исторических баров, обработчик upload_history на стороне Python вычисляет все технические признаки и метки, а затем вызывает append_rows, чтобы записать эти данные в training_set.csv, создавая файл и заголовок, если их еще нет. С каждой новой загрузкой формируется полный набор данных с временными метками, готовый для обучения модели. Именно training_set.csv мы и будем использовать для обучения нашей модели обнаружения всплесков.


Реализация на MQL5 и Python

В этой статье мы переходим от простого скрипта к полноценному советнику, чтобы обеспечить непрерывный мониторинг и связь в реальном времени с Python-бэкендом – то, с чем отдельный скрипт не справляется достаточно эффективно. Советник Spike Detector в MetaTrader 5 работает в клиент-серверной архитектуре: он выступает клиентом, а сервер Flask на Python – бэкендом. Советник постоянно отслеживает формирование новых свечей. Через заданные интервалы он собирает заданное количество исторических свечей (данные OHLCV и временные метки), сериализует их в JSON и отправляет на Python-сервер с помощью HTTP POST-запроса.

Python-бэкенд, который обычно содержит либо модель машинного обучения, либо логику на основе правил, анализирует входящие рыночные данные и возвращает сигнал: BUY, SELL, CLOSE или WAIT. Получив этот ответ, советник интерпретирует сигнал и действует согласно настройкам пользователя: рисует стрелки на графике, открывает сделки или закрывает существующие позиции. Такой цикл обратной связи позволяет MetaTrader в реальном времени расширять свои встроенные возможности за счет внешней аналитики, фактически объединяя движок исполнения MetaTrader 5 с вычислительной мощностью Python.

Реализация на MQL5

Метаданные скрипта и строгий режим

В самом верху файла MQL5 объявляются свойства метаданных – такие как #property copyright, #property link и #property version, – чтобы встроить сведения об авторстве и версии прямо в скомпилированный советник. Включение #property strict задает максимально строгие проверки на этапе компиляции, помогает заранее выявлять синтаксические ошибки и ошибки типов и гарантирует, что код соответствует лучшим практикам.

#property copyright "Copyright 2025, MetaQuotes Ltd."
#property link      "https://www.mql5.com/ru/users/lynnchris"
#property version   "1.0"
#property strict

Подключение торговой библиотеки

Подключив <Trade\Trade.mqh> и создав объект CTrade, вы получаете доступ к встроенному API MetaTrader для управления торговлей. Эта встроенная библиотека предоставляет методы для работы с рыночными и стоп-ордерами, закрытия позиций и других ключевых операций, поэтому сделки можно открывать, изменять и закрывать программно в ответ на сигналы сервера.

#include <Trade\Trade.mqh>
static CTrade trade;

Определение входных параметров

Все настраиваемые пользователем параметры – от URL REST-эндпоинта (InpServerURL) и количества отправляемых баров (InpBufferBars) до параметров разметки графика и флагов исполнения сделок – заранее объявляются через input. У каждого параметра есть встроенный комментарий с пояснением его назначения, поэтому советник остается самодокументируемым, а трейдер может тонко настраивать его поведение прямо в интерфейсе MetaTrader 5, не трогая код.

// REST endpoint & polling
input string InpServerURL      = "http://127.0.0.1:5000/analyze";
input int    InpBufferBars     = 200;
input int    MinSecsBetweenReq = 10;

// Visual & trading options
input color  ColorBuy          = clrLime;
input color  ColorSell         = clrRed;
input bool   DrawSLTPLines     = true;
input bool   EnableTrading     = true;
input double FixedLots         = 0.10;

// Debug & retry controls
input int    MaxRetry          = 3;
input bool   DebugPrintJSON    = true;
input bool   DebugPrintReply   = true;

Глобальные переменные состояния

Используется несколько глобальных переменных: lastBarTime и lastReqTime – для ограничения частоты запросов, retryCount – для логики повторных HTTP-запросов, а digits и tickSize – для точного форматирования цен. Строка objPrefix, сформированная на основе ID текущего графика, назначает всем графическим объектам этого советника (стрелкам и линиям) уникальный префикс, чтобы их можно было однозначно идентифицировать и удалить.

datetime lastBarTime = 0;
datetime lastReqTime = 0;
int      retryCount  = 0;
int      _digits;
double   tickSize;
string   objPrefix;

Инициализация в OnInit

Когда советник запускается, один раз вызывается функция OnInit(): функция проверяет входные параметры (например, что запрошено как минимум два бара), кэширует свойства символа (SYMBOL_DIGITS и SYMBOL_POINT) и формирует уникальный префикс объектов. Стартовое сообщение выводит в лог количество баров для отправки и URL целевого сервера, подтверждая, что советник готов начать цикл опроса.

int OnInit()
{
   if(InpBufferBars < 2)
      return INIT_FAILED;

   _digits   = (int)SymbolInfoInteger(_Symbol, SYMBOL_DIGITS);
   tickSize  = SymbolInfoDouble(_Symbol, SYMBOL_POINT);
   objPrefix = StringFormat("SpikeEA_%I64d_", ChartID());

   PrintFormat("[SpikeEA] Initialized: posting %d bars → %s",
               InpBufferBars, InpServerURL);
   return INIT_SUCCEEDED;
}

Очистка в OnDeinit

При удалении советника или завершении работы функция OnDeinit() проходит по всем графическим объектам в обратном порядке и удаляет те, чьи имена начинаются с objPrefix. Это гарантирует, что после отключения советника на графике не останутся случайные стрелки или линии SL/TP, а рабочее пространство останется чистым.

void OnDeinit(const int reason)
{
   for(int i = ObjectsTotal(0) - 1; i >= 0; --i)
   {
      string name = ObjectName(0, i);
      if(StringFind(name, objPrefix) == 0)
         ObjectDelete(0, name);
   }
}

Опрос и формирование полезной нагрузки в OnTick

На каждом тике советник проверяет, сформировался ли новый бар (если включен PollOnNewBarOnly), и следит за тем, чтобы с момента последнего запроса прошло не меньше MinSecsBetweenReq. Затем он получает последние InpBufferBars через CopyRates, разворачивает массив в режим series и вызывает BuildJSON(), чтобы сериализовать цены закрытия и временные метки в JSON-пакет. Если включена отладка, необработанный JSON выводится в лог вкладки "Эксперты" перед отправкой.

void OnTick()
{
   datetime barTime = iTime(_Symbol, _Period, 0);
   if(barTime == lastBarTime) return;
   lastBarTime = barTime;

   if(TimeCurrent() - lastReqTime < MinSecsBetweenReq) return;

   MqlRates rates[];
   if(CopyRates(_Symbol, _Period, 0, InpBufferBars, rates) != InpBufferBars)
      return;
   ArraySetAsSeries(rates, true);

   string payload = BuildJSON(rates);
   if(DebugPrintJSON) PrintFormat("[SpikeEA] >>> %s", payload);

   SServerMsg msg;
   if(CallServer(payload, msg))
      ActOnSignal(msg);

   lastReqTime = TimeCurrent();
}

Формирование JSON в BuildJSON

Вспомогательная функция BuildJSON() принимает массив MqlRates и формирует компактную JSON-строку с именем символа, массивом цен закрытия (с правильным количеством знаков после запятой) и параллельным массивом временных меток в формате UNIX. Экранирование строки нужно для обработки специальных символов в имени инструмента, чтобы на выходе получался корректный JSON.

string BuildJSON(const MqlRates &r[])
{
   string j = StringFormat("{\"symbol\":\"%s\",\"prices\":[", _Symbol);
   for(int i = 0; i < InpBufferBars; i++)
      j += DoubleToString(r[i].close, _digits) + (i+1<InpBufferBars?",":"");
   j += "],\"timestamps\":[";
   for(int i = 0; i < InpBufferBars; i++)
      j += IntegerToString(r[i].time) + (i+1<InpBufferBars?",":"");
   j += "]}";
   return j;
}

Связь с сервером в CallServer

Функция CallServer() преобразует JSON-строку в буфер uchar[], а затем через WebRequest() выполняет HTTP POST на InpServerURL. Если запросы завершаются неудачно, функция обрабатывает таймауты и коды состояния, отличные от 200, повторяя попытку до MaxRetry раз и выводя ошибки. При успехе функция получает необработанный текстовый ответ, при необходимости пишет его в лог и передает в ParseJSONLite() для разбора.

bool CallServer(const string &payload, SServerMsg &out)
{
   uchar body[];
   int len = StringToCharArray(payload, body, 0, WHOLE_ARRAY, CP_UTF8);
   ArrayResize(body, len);

   string hdr = "Content-Type: application/json\r\n";
   uchar reply[]; string resp_hdr;
   int status = WebRequest("POST", InpServerURL, hdr,
                           InpTimeoutMs, body, reply, resp_hdr);

   if(status <= 0)
   {
      PrintFormat("WebRequest error %d (retry %d/%d)",
                  GetLastError(), retryCount+1, MaxRetry);
      ResetLastError();
      if(++retryCount >= MaxRetry) retryCount = 0;
      return false;
   }
   retryCount = 0;

   string resp = CharArrayToString(reply);
   if(DebugPrintReply)
      PrintFormat("[SpikeEA] <<< HTTP %d – %s", status, resp);
   if(status != 200) return false;

   return ParseJSONLite(resp, out);
}

Облегченный разбор JSON в ParseJSONLite

Вместо полноценной JSON-библиотеки ParseJSONLite() использует простой поиск по строке через StringFind, чтобы находить ключевые фрагменты вроде "signal":"BUY" и числовые ключи "conf":, "sl": и "tp":.

bool ParseJSONLite(const string &txt, SServerMsg &o)
{
   o.code = SIG_WAIT; o.conf = o.sl = o.tp = 0.0;

   if(StringFind(txt, "\"signal\":\"BUY\"")   >= 0) o.code = SIG_BUY;
   if(StringFind(txt, "\"signal\":\"SELL\"")  >= 0) o.code = SIG_SELL;
   if(StringFind(txt, "\"signal\":\"CLOSE\"") >= 0) o.code = SIG_CLOSE;

   // extract numeric values
   ParseJSONDouble(txt, "\"conf\":", o.conf);
   ParseJSONDouble(txt, "\"sl\":",   o.sl);
   ParseJSONDouble(txt, "\"tp\":",   o.tp);

   return true;
}
Функция извлекает эти подстроки и преобразует их в структуру SServerMsg, задавая код сигнала советника, значение уверенности, а также уровни стоп-лосса и тейк-профита.
void ParseJSONDouble(const string &txt, const string &key, double &out)
{
   int p = StringFind(txt, key);
   if(p >= 0)
      out = StringToDouble(StringSubstr(txt, p + StringLen(key)));
}

Обработка сигналов в ActOnSignal

Когда поступает новый сигнал, ActOnSignal() сначала удаляет все предыдущие стрелки и линии, имена которых соответствуют objPrefix. Затем функция рисует новую стрелку по текущей цене Bid, выбирая код иконки, цвет и размер в зависимости от типа сигнала, и при необходимости добавляет горизонтальные линии SL и TP с метками. Наконец, если включена реальная торговля, функция использует торговый объект для открытия или закрытия позиций в соответствии с сигналом: Buy(), Sell() или PositionClose().

void ActOnSignal(const SServerMsg &m)
{
   static ESignal last = SIG_WAIT;
   if(m.code == SIG_WAIT || m.code == last) return;
   last = m.code;

   // remove old objects
   for(int i=ObjectsTotal(0)-1;i>=0;--i)
      if(StringFind(ObjectName(0,i),objPrefix)==0)
         ObjectDelete(0,ObjectName(0,i));

   // draw arrow
   int    arrow = (m.code==SIG_BUY ? 233 : m.code==SIG_SELL ? 234 : 158);
   color  clr   = (m.code==SIG_BUY ? ColorBuy : m.code==SIG_SELL ? ColorSell : ColorClose);
   string id    = objPrefix + "Arr_" + TimeToString(TimeCurrent(),TIME_SECONDS);
   double y     = SymbolInfoDouble(_Symbol, SYMBOL_BID);
   if(ObjectCreate(0,id,OBJ_ARROW,0,TimeCurrent(),y))
   {
      ObjectSetInteger(0,id,OBJPROP_ARROWCODE,arrow);
      ObjectSetInteger(0,id,OBJPROP_COLOR,clr);
      ObjectSetInteger(0,id,OBJPROP_WIDTH,ArrowSize);
      PlaySound("alert.wav");
   }

   // draw SL/TP lines
   if(DrawSLTPLines && m.sl>0)
      ObjectCreate(0,objPrefix+"SL_"+id,OBJ_HLINE,0,0,m.sl);
   if(DrawSLTPLines && m.tp>0)
      ObjectCreate(0,objPrefix+"TP_"+id,OBJ_HLINE,0,0,m.tp);

   // execute trade
   if(EnableTrading)
   {
      bool hasPos = PositionSelect(_Symbol);
      if(m.code==SIG_BUY  && !hasPos) trade.Buy(FixedLots,_Symbol,0,m.sl,m.tp);
      if(m.code==SIG_SELL && !hasPos) trade.Sell(FixedLots,_Symbol,0,m.sl,m.tp);
      if(m.code==SIG_CLOSE&&  hasPos) trade.PositionClose(_Symbol,SlippagePoints);
   }
}

Компиляция и развертывание

В завершение вставьте код советника в MetaEditor, сохраните его в разделе "Советники" и нажмите F7. После того как появится "0 errors, 0 warnings", вернитесь в MetaTrader 5, найдите советник в окне "Навигатор", перетащите его на график и настройте параметры во всплывающем окне. После этого вкладки "Эксперты" и "Журнал" будут показывать логи JSON POST-запросов, разобранные сигналы, созданные графические объекты и все торговые операции в реальном времени.

Реализация на Python

Заголовок файла и требования

В самом верху engine.py размещаются строка Unix shebang (#!/usr/bin/env python3) и описательный блок комментариев, который кратко перечисляет возможности бэкенда: прием векторизованной истории, нормализацию CSV, кэширование Prophet, обучение, тестирование на исторических данных и режимы интерфейса командной строки (CLI), а также команду pip install для всех необходимых зависимостей. Этот заголовок не только сразу показывает, что делает скрипт, но и дает разработчику точный список библиотек, необходимых для быстрого запуска системы.

#!/usr/bin/env python3
# engine.py – Boom/Crash/Vol-75 ML back-end
# • vectorised /upload_history
# • /upload_spike_csv
# • Prophet cache (1h)
# • robust CSV writer
# • train() drops bad rows
# • SL/TP with ATR or fallback
# • backtest defaults to 30 days
# • CLI: collect · history · train · backtest · serve · info
#
# REQS: pip install numpy pandas ta prophet cmdstanpy pykalman \
#            scikit-learn flask MetaTrader5 joblib pytz

Параметры, настраиваемые пользователем

Сразу после заголовка определяются константы для параметров входа в терминал (TERM_PATH, LOGIN, PASSWORD, SERVER), а также массив SYMBOLS, который будет обрабатывать система. Также задаются параметры, управляющие окном просмотра для разметки (LOOKAHEAD, THRESH_LABEL), интервалами опроса (STEP_SECONDS), порогами открытия и закрытия сделок (THR_BC_OPEN, THR_O_OPEN, THR_O_CLOSE) и множителями для стоп-лосса и тейк-профита на основе ATR (ATR_PERIOD, SL_MULT, TP_MULT, ATR_FALLBACK_P). Благодаря тому, что все эти значения собраны в одном месте, пользователь может быстро настроить параметры риска стратегии, окна данных и список символов, не погружаясь в логику кода.

TERM_PATH  = r""
LOGIN      = 123456
PASSWORD   = "passwd"
SERVER     = "DemoServer"

SYMBOLS = [
    "Boom 900 Index",  "Crash 1000 Index",
    "Volatility 75 (1s) Index"
]

LOOKAHEAD    = 10       # minutes
THRESH_LABEL = 0.0015   # 0.15 %
STEP_SECONDS = 60       # live collect interval

ATR_PERIOD     = 14
SL_MULT        = 1.0
TP_MULT        = 2.0
ATR_FALLBACK_P = 0.002

Пути к файлам и заголовок CSV

Затем задаются константы файловой системы: BASE_DIR – корневая папка анализа, CSV_FILE – путь к агрегированному обучающему набору данных, MODEL_DIR – каталог для артефактов моделей по символам, а GLOBAL_PKL – файл общей модели. Также определяется CSV_HEADER – фиксированный список имен столбцов, который гарантирует, что каждая записанная строка будет содержать одни и те же 12 полей. Этот раздел задает единую структуру хранения данных и обеспечивает согласованность CSV-файлов, что критически важно для последующего обучения и анализа без лишних сбоев.

BASE_DIR   = r"C:\Analysis EA"
CSV_FILE   = rf"{BASE_DIR}\training_set.csv"
MODEL_DIR  = rf"{BASE_DIR}\models"
GLOBAL_PKL = rf"{MODEL_DIR}\_global.pkl"

CSV_HEADER = [
    "timestamp","symbol","price","spike_mag","macd","rsi",
    "atr","slope","env_low","env_up","delta","label"
]

Импорт и настройка логирования

Мы импортируем стандартные библиотеки (os, sys, time, threading и т.д.), пакеты для анализа данных (numpy, pandas, ta, joblib), модули Prophet и фильтра Калмана, Flask для нашего API и Python-обертку MetaTrader 5. Предупреждения подавляются, чтобы не засорять вывод, а логирование настраивается так, чтобы выводить временные метки, уровни логов и сообщения в удобочитаемом формате. Наконец, мы убеждаемся, что каталог моделей существует, и меняем рабочий каталог на BASE_DIR, чтобы все операции с относительными путями выполнялись из одного известного места.

import os,sys,time,logging,warnings,argparse,threading,io
import datetime as dt
from pathlib import Path
import numpy as np, pandas as pd, ta, joblib, pytz
from flask import Flask, request, jsonify, abort
from prophet import Prophet
from pykalman import KalmanFilter
import MetaTrader5 as mt5

warnings.filterwarnings("ignore")
logging.basicConfig(level=logging.INFO,
                    format="%(asctime)s %(levelname)-7s %(message)s",
                    datefmt="%H:%M:%S")
Path(MODEL_DIR).mkdir(parents=True, exist_ok=True)
os.chdir(BASE_DIR)

Вспомогательные функции инициализации MetaTrader 5

Функция init_mt5() безопасно инициализирует соединение с MetaTrader 5: сначала пробует вызов по умолчанию, а при необходимости выполняет вход по учетным данным; при сбое выполняется корректный выход с сообщением об ошибке. Функция ensure_symbol(sym) – это простая обертка над mt5.symbol_select, которая гарантирует, что каждый инструмент активирован перед запросами данных. Блокировка (_mt5_lock) защищает все многопоточные вызовы к MetaTrader 5 и сохраняет потокобезопасность, когда сервер запускает фоновые задачи.

_mt5_lock = threading.Lock()
def init_mt5():
    if mt5.initialize(): return
    if not mt5.initialize(path=TERM_PATH, login=LOGIN,
                          password=PASSWORD, server=SERVER):
        sys.exit(f"MT5 init failed {mt5.last_error()}")

def ensure_symbol(sym):
    return mt5.symbol_select(sym, True)

Кэш Prophet и дельта прогноза

Чтобы не перекомпилировать модель Prophet при каждом запросе, мы поддерживаем потокобезопасный словарь PROP, который сопоставляет символам либо None (компиляция ожидается), либо кортеж (model, timestamp). Функция _compile_prophet(df, sym) обучает новую модель Prophet на исторических данных и фиксирует время. Функция prophet_delta(prices, times, sym) проверяет кэш: если записи нет или она устарела (старше одного часа), запускается фоновая компиляция; если модель уже доступна, функция строит прогноз на одну секунду вперед и возвращает предсказанную дельту. Такая архитектура обеспечивает отзывчивую работу прогнозирования и не блокирует входящие запросы.

_PROP_LOCK = threading.Lock()
_PROP = {}  # sym -> (model, timestamp) or None

def _compile_prophet(df, sym):
    mdl = Prophet(daily_seasonality=False, weekly_seasonality=False)
    mdl.fit(df)
    with _PROP_LOCK:
        _PROP[sym] = (mdl, time.time())

def prophet_delta(prices, times, sym):
    if len(prices) < 20: return 0.0
    with _PROP_LOCK:
        entry = _PROP.get(sym)
        if entry is None:
            _PROP[sym] = None
            df = pd.DataFrame({"ds": pd.to_datetime(times, unit='s'),
                               "y": prices})
            threading.Thread(target=_compile_prophet, args=(df, sym), daemon=True).start()
            return 0.0
        mdl, ts = entry
    if time.time() - ts > 3600:
        with _PROP_LOCK: _PROP[sym] = None
        return 0.0
    fut = mdl.make_future_dataframe(periods=1, freq='s')
    return float(mdl.predict(fut).iloc[-1]["yhat"] - prices[-1])

Вспомогательные функции для признаков

Мы определяем набор небольших функций – z_spike, macd_div, rsi_val, combo_spike и других, – которые вычисляют отдельные технические сигналы, такие как всплески по z-score, дивергенция MACD, RSI и комбинированный показатель всплеска. Каждая вспомогательная функция перед расчетом проверяет, достаточно ли истории, и, если данных не хватает, возвращает значение по умолчанию. За счет такого разделения основная логика приема данных остается чистой, а модульное тестирование каждого индикатора упрощается.

def z_spike(prices, win=20):
    if len(prices) < win: return False, 0.0
    r = np.diff(prices[-win:])
    z = (r[-1] - r.mean())/(r.std()+1e-6)
    return abs(z) > 2.5, float(z)

def macd_div(prices):
    if len(prices) < 35: return 0.0
    return float(ta.trend.macd_diff(pd.Series(prices)).iloc[-1])

def rsi_val(prices, l=14):
    if len(prices) < l+1: return 50.0
    return float(ta.momentum.rsi(pd.Series(prices), l).iloc[-1])

def combo_spike(prices):
    _, z = z_spike(prices)
    m = macd_div(prices)
    v = prices[-1] - prices[-4] if len(prices) >= 4 else 0.0
    s = abs(z) + abs(m) + abs(v)/(np.std(prices[-20:])+1e-6)
    return s > 3.0, s

Вспомогательная функция добавления в CSV и gen_row

Функция append_rows(rows) принимает список списков из 12 элементов и записывает их в training_set.csv, создавая файл с заголовками при первой записи и затем дописывая новые строки. Функция gen_row(i, closes, times, sym, highs=None, lows=None) формирует одну обучающую строку: вычисляет признаки по истории цены до индекса i (включая ATR и полосы индикатора Envelopes, если переданы массивы highs/lows), вызывает prophet_delta для прогноза и присваивает метку "BUY/SELL/WAIT" на основе будущего движения цены. Отделив генерацию строк от приема данных, мы можем повторно использовать gen_row и при работе в реальном времени, и при импорте истории.

def append_rows(rows):
    if not rows: return
    pd.DataFrame(rows, columns=CSV_HEADER)\
      .to_csv(CSV_FILE, mode="a", index=False,
              header=not Path(CSV_FILE).exists())

def gen_row(i, closes, times, sym, highs=None, lows=None):
    if i < LOOKAHEAD or i+LOOKAHEAD >= len(closes): return None
    seq = closes[:i]
    _, mag = combo_spike(seq)
    atr = ta.volatility.average_true_range(pd.Series(highs[:i+1]),
                                           pd.Series(lows[:i+1]),
                                           pd.Series(seq)).iloc[-1] if highs else 0.0
    row = [
        times[i], sym, closes[i], mag,
        macd_div(seq), rsi_val(seq),
        atr, 0.0, 0.0, 0.0,
        prophet_delta(seq, times[:i], sym)
    ]
    ch = (closes[i+LOOKAHEAD] - closes[i]) / closes[i]
    row.append("BUY" if ch > THRESH_LABEL else "SELL" if ch < -THRESH_LABEL else "WAIT")
    return row

Цикл сбора данных в реальном времени

В collect_loop() мы сначала убеждаемся, что CSV-файл существует, а затем входим в бесконечный цикл, который для каждого символа запрашивает последние LOOKAHEAD+1 баров через mt5.copy_rates_from_pos, пропускает дубликаты по временной метке и вызывает gen_row, чтобы сформировать и добавить новое размеченное наблюдение. Пауза длительностью STEP_SECONDS задает контролируемую частоту опроса. Этот цикл данных в реальном времени непрерывно пополняет обучающий набор новыми наблюдениями, пока пользователь его не прервет.

def collect_loop():
    if not Path(CSV_FILE).exists(): append_rows([])
    last = {}
    print("Collecting… CTRL-C to stop")
    init_mt5()
    while True:
        for sym in SYMBOLS:
            if not ensure_symbol(sym): continue
            bars = mt5.copy_rates_from_pos(sym, mt5.TIMEFRAME_M1, 0, LOOKAHEAD+1)
            if bars is None or len(bars) < LOOKAHEAD+1: continue
            if last.get(sym) == bars[-1]['time']: continue
            last[sym] = bars[-1]['time']
            closes = bars['close'].tolist()
            times  = bars['time'].tolist()
            row = gen_row(len(closes)-LOOKAHEAD-1, closes, times, sym)
            if row: append_rows([row])
        time.sleep(STEP_SECONDS)

Импорт истории из MetaTrader 5 и файла

Функции history_from_mt5(sym, start, end) и history_from_file(sym, path) позволяют дозаполнять CSV либо из сохраненной истории MetaTrader 5, либо из локального файла. Обе функции проходят по каждому бару с временной меткой, вызывают gen_row для генерации признаков и метки, собирают строки в чанки (например, по 5,000 за раз) и дописывают их через append_rows. Обертка history_cli(args) разбирает аргументы командной строки (--days, --from, --to или --file), чтобы автоматизировать прием полного набора данных для указанных символов и диапазонов дат.

def history_from_mt5(sym, start, end):
    init_mt5()
    r = mt5.copy_rates_range(sym, mt5.TIMEFRAME_M1,
                              start.replace(tzinfo=UTC),
                              end.replace(tzinfo=UTC))
    if r is None or len(r)==0: return
    closes, times = r['close'].tolist(), r['time'].tolist()
    highs, lows   = r['high'].tolist(), r['low'].tolist()
    rows = [gen_row(i, closes, times, sym, highs, lows)
            for i in range(len(closes)-LOOKAHEAD) if gen_row(i, closes, times, sym, highs, lows)]
    append_rows([rw for rw in rows if rw])
    print(sym, "imported", len(rows), "rows")

Обучение моделей

Функция train_models() читает training_set.csv, приводит столбцы признаков к числовому виду (отбрасывая некорректные строки), а затем проходит по подмножеству каждого символа: если там есть как минимум 400 строк, она строит пайплайн scikit-learn (стандартизация + градиентный бустинг), обучает его на размеченных данных и сохраняет модель в формате .pkl. Также функция обучает и сохраняет глобальную модель по всем символам. В результате получается каталог классификаторов, готовых к работе.

def build_pipe(X, y):
    pipe = Pipeline([
        ("sc", StandardScaler()),
        ("gb", GradientBoostingClassifier(n_estimators=400,
                                          learning_rate=0.05,
                                          max_depth=3,
                                          random_state=42))
    ])
    return pipe.fit(X, y)

def train_models():
    df = pd.read_csv(CSV_FILE)
    df = df.dropna(subset=FEATURES)
    for sym in SYMBOLS:
        d = df[df.symbol == sym]
        if len(d) < 400: continue
        model = build_pipe(d[FEATURES], d.label.map({"WAIT":0,"BUY":1,"SELL":2}))
        joblib.dump(model, Path(MODEL_DIR)/f"{sym.replace(' ','_')}.pkl")
    global_model = build_pipe(df[FEATURES], df.label.map({"WAIT":0,"BUY":1,"SELL":2}))
    joblib.dump(global_model, GLOBAL_PKL)

Эндпоинты Flask-сервера

Мы запускаем приложение Flask с тремя основными маршрутами:

/upload_history разбирает JSON-чанки баров, вычисляет те же признаки, что и gen_row, размечает каждую строку и вызывает append_rows.

/upload_spike_csv принимает необработанные логи советника (либо CSV-текст, либо JSON-массивы), преобразует их в наш формат из 12 столбцов и дописывает.

/analyze загружает подходящую модель через load_model(), вычисляет признаки в реальном времени по отправленным ценам и временным меткам, оценивает вероятности классов, применяет пороги открытия и закрытия и возвращает JSON-объект с сигналом, уровнем уверенности, уровнями стоп-лосса / тейк-профита и силой позиции.

app = Flask(__name__)
app.config["MAX_CONTENT_LENGTH"] = 32*1024*1024

@app.route("/upload_history", methods=["POST"])
def upload_history():
    j = request.get_json(force=True)
    close, ts = np.array(j["close"]), np.array(j["time"],dtype=int)
    high = np.array(j.get("high", close))
    low  = np.array(j.get("low", close))
    df = pd.DataFrame({"timestamp": ts, "price": close})
    # compute features as in gen_row…
    append_rows(df.assign(symbol=j["symbol"]).values.tolist())
    return jsonify(status="ok", rows_written=len(df))

@app.route("/upload_spike_csv", methods=["POST"])
def upload_spike_csv():
    j = request.get_json(force=True)
    df_ea = pd.read_csv(io.StringIO(j.get("csv","")), sep=",")
    # map EA columns → CSV_HEADER
    append_rows(mapped_rows)
    return jsonify(status="ok", rows_written=len(mapped_rows))

@app.route("/analyze", methods=["POST"])
def api_analyze():
    j = request.get_json(force=True)
    mdl = load_model(j["symbol"])
    feats = [...]  # compute from j["prices"], j["timestamps"]
    proba = mdl.predict_proba([feats])[0]
    signal = decide_open(proba[1], proba[2], j["symbol"])
    # build sl, tp, manage _trades…
    return jsonify(signal=signal, sl=sl, tp=tp, strength=max(proba))

Эти эндпоинты обеспечивают прием данных, дозаполнение истории и принятие решений в реальном времени для советника MQL5.

Утилиты для тестирования на исторических данных и получения информации

Функция backtest_one(sym, df) повторно использует офлайн-вспомогательные функции для расчета признаков и логику инференса модели, чтобы симулировать сделки на историческом DataFrame df, фиксируя P&L, когда срабатывают условия стоп-лосса, тейк-профита или досрочного закрытия. Функция backtest_cli(args) агрегирует результаты по всем символам и выводит сводный P&L. Функция info() просто показывает количество строк в CSV, распределение меток и число признаков у каждой модели – это удобно для быстрой проверки состояния данных.

def backtest_one(sym, df):
    mdl = load_model(sym)
    for i in range(len(df)):
        feats = [...]  # offline feature calcs
        pr = mdl.predict_proba([feats])[0]
        # open/close logic identical to /analyze
    return trades

def info():
    df = pd.read_csv(CSV_FILE)
    print("Rows:", len(df), "Labels:", df.label.value_counts())
    for pkl in Path(MODEL_DIR).glob("*.pkl"):
        mdl = joblib.load(pkl)
        print(pkl.name, "features", mdl.named_steps["sc"].n_features_in_)

Интерфейс командной строки

Наконец, блок if __name__ == "__main__": определяет интерфейс командной строки на argparse с шестью подкомандами - collect, history, train, backtest, serve и info, каждая из которых вызывает соответствующую функцию. Такой подход дает единый скрипт, в котором можно, например, запустить python engine.py history --days 180, чтобы дозагрузить данные за шесть месяцев, или python engine.py serve, чтобы поднять API в реальном времени для вашего советника.

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    subs = parser.add_subparsers(dest="mode", required=True)
    subs.add_parser("collect")
    subs.add_parser("history")
    subs.add_parser("train")
    subs.add_parser("backtest")
    subs.add_parser("serve")
    subs.add_parser("info")
    args = parser.parse_args()
    if args.mode == "collect": init_mt5(); collect_loop()
    elif args.mode == "history": history_cli(args)
    elif args.mode == "train": train_models()
    elif args.mode == "backtest": backtest_cli(args)
    elif args.mode == "serve": init_mt5(); app.run("0.0.0.0", 5000, threaded=True)
    elif args.mode == "info": info()


Обучение модели в Python

Когда в нашем CSV уже накоплен достаточный объем исторических данных (за счет процедуры приема истории из MQL5 и Python-приемника), следующим этапом становится обучение модели. Убедитесь, что данные приняты для всех символов, заданных в наших скриптах MQL5 и Python. Когда прием данных будет завершен, можно переходить к следующему:
  • обучению моделей машинного обучения;
  • тестированию их работы на истории;
  • развертыванию полученных моделей для инференса в реальном времени.

На этом этапе мы обучаем классификатор градиентного бустинга для каждого символа (и одну глобальную модель), чтобы предсказывать, будет ли по окончании окна просмотра выдан сигнал BUY, SELL или WAIT. Градиентный бустинг последовательно строит ансамбль деревьев решений, где каждое новое дерево исправляет ошибки предыдущих, – это делает метод устойчивым к шумным финансовым данным и помогает улавливать нелинейные паттерны в наборе признаков. Мы оборачиваем его в пайплайн scikit-learn с StandardScaler, чтобы нормализовать признаки перед обучением.

# 3) TRAIN MODELS
def build_pipe(X, y):
    """
    Construct and fit a pipeline: StandardScaler → GradientBoostingClassifier.
    """
    pipe = Pipeline([
        ("sc", StandardScaler()),
        ("gb", GradientBoostingClassifier(
            n_estimators=400,      # number of boosting rounds
            learning_rate=0.05,    # shrinkage factor per tree
            max_depth=3,           # depth of each tree
            random_state=42        # reproducibility
        ))
    ])
    pipe.fit(X, y)
    return pipe

def train_models():
    """
    Load the CSV, clean it, train per-symbol and global Gradient Boosting models, and save to disk.
    """
    if not Path(CSV_FILE).exists():
        sys.exit("No training_set.csv")

    # Read and sanitize
    df = pd.read_csv(CSV_FILE)
    if "symbol" not in df.columns:
        sys.exit("CSV missing 'symbol' column")

    # Ensure numeric features
    for col in FEATURES:
        df[col] = pd.to_numeric(df[col], errors="coerce")
    bad = df[FEATURES].isna().any(axis=1).sum()
    if bad:
        print(f"Discarding {bad} malformed rows")
        df = df.dropna(subset=FEATURES)

    # Train a Gradient Boosting model for each symbol
    for sym in SYMBOLS:
        d = df[df.symbol == sym]
        if len(d) < 400:
            print("Skip", sym, "(few rows)")
            continue
        model = build_pipe(
            d[FEATURES],
            d.label.map({"WAIT": 0, "BUY": 1, "SELL": 2})
        )
        joblib.dump(model, Path(MODEL_DIR) / f"{sym.replace(' ', '_')}.pkl")
        print("model", sym, "saved")

    # Train and save a global Gradient Boosting model
    global_model = build_pipe(
        df[FEATURES],
        df.label.map({"WAIT": 0, "BUY": 1, "SELL": 2})
    )
    joblib.dump(global_model, GLOBAL_PKL)
    print("global model saved")
Запустите обучение командой:
python engine.py train
После запуска процедуры обучения мы получили в консоли следующий вывод:
C:\Users\hp\Pictures\Saved Pictures\Analysis EA>python engine.py train
Discarding 1152650 malformed rows
model Boom 900 Index saved
model Boom 1000 Index saved
model Boom 500 Index saved
model Crash 500 Index saved
model Boom 300 Index saved
....................................
....................................
All models saved

После того как обучение завершится и все модели будут сохранены, а это из-за объема данных может занять некоторое время, можно переходить к следующим шагам. Можно либо провести тестирование новых моделей на исторических данных, либо сразу перейти к развертыванию. В моем случае я сразу перешел к развертыванию модели, о чем и расскажу в следующем разделе.


Развертывание модели и инференс в реальном времени

Нажмите Ctrl+C, чтобы завершить процесс обучения. Затем запустите сервер инференса в реальном времени командой:
python engine.py serve

Эта команда разворачивает обученные модели и начинает выдавать торговые сигналы в реальном времени.

В MetaTrader 5 прикрепите советник к каждому символу, для которого была обучена модель. Затем в меню Сервис → Настройки → Советники терминала MetaTrader 5 включите Разрешить WebRequest для следующих URL и добавьте адрес вашего сервера в белый список.


Код состояния HTTP 200 означает "OK" – запрос был получен, понят и успешно обработан.

Во время тестов на работающем сервере каждый экземпляр советника успешно обращался к Python-бэкенду (HTTP 200) и возвращал торговую рекомендацию менее чем за 50 мс. Вот что показали логи:

Индекс Crash 1000 (M1)

В 00:31:59.717 модель показала вероятность BUY на уровне 0% и вероятность SELL на уровне 2,6%, а суммарный уровень уверенности (силы) составил всего 3%. Поскольку ни один порог не был превышен, советник корректно выбрал сигнал WAIT.

Индекс Boom 1000 (M1)

Всего через 37 мс, в 00:31:59.754, модель этого символа показала вероятность BUY на уровне 99,4%, а SELL – 0%. Такой высокий уровень уверенности сразу привел к сигналу "OPEN BUY". 

Эти логи подтверждают, что наш пайплайн развертывания работает от начала до конца.

2025.07.30 00:31:59.717 Spike DETECTOR (Crash 1000 Index,M1)    [SpikeEA] <<< HTTP 200 – {"Pbuy":0.0,"Psell":0.026,"scale_in":null
,"side":"NONE","signal":"WAIT","strength":0.03
2025.07.30 00:31:59.754 Spike DETECTOR (Boom 1000 Index,M1)     [SpikeEA] <<< HTTP 200 – {"Pbuy":0.994,"Psell":0.0,"scale_in":null
,"side":"BUY","signal":"OPEN"
, "strength":0.99

Вот более ранний тестовый запуск системы. Время от времени советник подает сигнал на вход – такие инструкции "OPEN" появляются в логах MetaTrader 5, хотя стрелка на графике не отображается. Появление визуальной метки зависит от силы сигнала.

Логи MetaTrader 5

2025.07.25 19:55:01.445 Spike DETECTOR (Boom 1000 Index,M1)     [SpikeEA] <<< HTTP 200 – {"Pbuy":0.999,"Psell":0.0,"scale_in":null
,"side":"BUY","signal":"OPEN","strength":1.0

График MetaTrader 5


Заключение

Объединив MQL5 и Python, мы получили мощную и гибкую торговую систему, которая использует сильные стороны обеих сред. На стороне MQL5 наш советник собирает величину всплесков, дивергенцию MACD, RSI, ATR, наклоны по фильтру Калмана и дельты Prophet, а затем сразу передает эти метрики в Python. На стороне Python основную работу по обучению модели и выдаче сигналов в реальном времени берет на себя единственный скрипт engine.py (с командами collect, history, train, backtest и serve). В нашей конфигурации мы опирались на советник MQL5 как на источник всех необходимых данных признаков, поэтому нам нужно было запускать только:

python engine.py train
python engine.py serve

Команды collect и history мы пропускаем, потому что наш советник уже сам поддерживает и предоставляет полный набор данных.

Что в итоге? Вскоре после запуска serve наши модели градиентного бустинга начинают возвращать в MetaTrader 5 сигналы BUY/SELL/WAIT в реальном времени – менее чем за 50 мс на бар, и логика ордеров советника уже может на них реагировать. Независимо от того, только начинаете ли вы работать с богатой библиотекой документации и примеров сообщества на сайте MQL5 или уже являетесь опытным квантом и хотите подключать новые генераторы признаков и алгоритмы, этот сквозной пайплайн можно легко масштабировать на различные символы и стратегии.

Спасибо сообществу MQL5, а также порталу mql5.com за множество примеров кода и полезных обсуждений – именно эти ресурсы сделали интеграцию простой и понятной. Я призываю всех идти дальше: настраивать гиперпараметры, добавлять новые индикаторы или даже упаковывать Python-сервер в контейнер для продакшена. И главное – делитесь своими результатами с сообществом, чтобы мы вместе продолжали развивать алгоритмическую торговлю на основе данных.




   
Chart Projector
Analytical Comment
Analytics Master
Analytics Forecaster 
Volatility Navigator
Mean Reversion Signal Reaper
Signal Pulse 
Metrics Board 
External Flow
VWAP
Heikin Ashi   FibVWAP  
RSI DIVERGENCE
Parabolic Stop and Reverse (PSAR) 
Скрипт Quarters Drawer
Intrusion Detector
TrendLoom Tool  Quarters Board 
ZigZag Analyzer  Correlation Pathfinder  Market Structure Flip Detector Tool
Correlation Dashboard   Currency Strength Meter 
PAQ Analysis Tool 
Dual EMA Fractal Breaker
Pin bar, Engulfing and RSI divergence
Liquidity Sweep Opening Range Breakout Tool Boom and Crash Interceptor CCI Zer-Line EA
Candlestick Recognition Candlestick Detection using TA-Lib Candle Range Tool MetaTrader 5 Data Ingestor Model Training and Deployment  

Перевод с английского произведен MetaQuotes Ltd.
Оригинальная статья: https://www.mql5.com/en/articles/18985

Прикрепленные файлы |
Spike_DETECTOR.mq5 (20.63 KB)
engine.py (23.34 KB)
Последние комментарии | Перейти к обсуждению на форуме трейдеров (1)
Helga Gustana Argita
Helga Gustana Argita | 12 авг. 2025 в 13:56
это потрясающе
но почему я не вижу объектов, созданных при наличии сигнала, даже не совершая сделки
Алгоритм оптимизации грифов — Buzzard Optimization Algorithm (BUZOA) Алгоритм оптимизации грифов — Buzzard Optimization Algorithm (BUZOA)
BUZOA — популяционный метаэвристический алгоритм, в котором каждый агент на каждой итерации случайно выбирает одну из трёх тактик охоты: узкий поиск вокруг личного рекорда, классический PSO-шаг к лидеру стаи или полную телепортацию в случайную точку пространства. В статье разбирается реализация алгоритма на MQL5, показывается найденная в оригинальной формулировке ошибка знака коэффициента и приводятся результаты бенчмарка на стандартном тестовом стенде.
Нейросети в трейдинге: Поиск устойчивых закономерностей в разнородных рыночных данных (Окончание) Нейросети в трейдинге: Поиск устойчивых закономерностей в разнородных рыночных данных (Окончание)
В статье представлена адаптация фреймворка INFNet в единый вычислительный конвейер для задач анализа финансовых временных рядов. Описана архитектура верхнеуровневого объекта, объединяющего последовательные, контекстные и сценарные потоки данных. Проведено тестирование на исторических данных EURUSD с оценкой устойчивости модели.
Dynamic Swing Architecture: Распознавание структуры рынка — от свингов до автоматического исполнения сделок Dynamic Swing Architecture: Распознавание структуры рынка — от свингов до автоматического исполнения сделок
В этой статье представлена полностью автоматизированная система на MQL5, предназначенная для точного определения свингов рынка и торговли ими. В отличие от традиционных индикаторов колебаний с фиксированным баром, эта система динамично адаптируется к меняющейся структуре цен, обнаруживая серию свинг-хай и свинг-лоу в режиме реального времени, чтобы улавливать возможности направления по мере их формирования.
Создание самооптимизирующихся советников на MQL5 (Часть 17): Ансамблевый интеллект Создание самооптимизирующихся советников на MQL5 (Часть 17): Ансамблевый интеллект
Все стратегии алгоритмической торговли сложны в настройке и обслуживании, независимо от их сложности — эта проблема актуальна как для новичков, так и для экспертов. В данной статье представлен коллективный интеллект, в которой модели с учителем и человеческая интуиция взаимодействуют друг с другом, чтобы преодолеть свои общие ограничения. Совместив стратегию на основе канала скользящих средних с моделью регрессии Риджа на тех же индикаторах, мы добиваемся централизованного управления, более быстрой самокорректировки и прибыльности систем, которые в противном случае были бы убыточными.