English Deutsch 日本語
preview
Статистический арбитраж с использованием коинтегрированных акций (Часть 4): Обновление параметров модели в реальном времени

Статистический арбитраж с использованием коинтегрированных акций (Часть 4): Обновление параметров модели в реальном времени

MetaTrader 5Торговые системы |
75 0
Jocimar Lopes
Jocimar Lopes

Введение

Пришло время начать обновлять нашу модель коинтеграции в режиме реального времени. Точнее говоря, пришло время начать корректировать относительный вес каждой акции в нашем торговом портфеле с учетом последних расчётов, пока советник работает. До сих пор мы использовали одинаковые веса портфеля в наших объяснениях и тестах на исторических данных. Они служили средством упрощения, облегчающим понимание процесса, а также смысла и принципов работы лежащих в их основе статистических тестов. Однако в реальной жизни эти веса в портфеле меняются практически при поступлении каждого нового значения, то есть, в нашем случае, при закрытии каждого нового бара.

Но сначала немного предыстории — краткий обзор, чтобы убедиться, что мы все на одной волне.

Мы разрабатываем своего рода «упрощенную» платформу для статистического арбитража — конвейер статистического арбитража, предназначенный для обычного розничного трейдера, использующего обычный ноутбук и имеющего стандартную пропускную способность сети. В этой серии статей мы приводим и описываем статистические тесты, используемые для поиска активов с высокой степенью корреляции для парной торговли, коинтегрированных акций для формирования портфеля, а также тесты, применяемые для оценки стационарности их относительных спредов, что необходимо для стратегий возврата к среднему значению, подобных нашей.

В предыдущей статье этой серии (часть 3) мы описали настройку базы данных и привели некоторые замечания относительно наших проектных решений. Эта статья является прямым продолжением предыдущей. Как только наша база данных будет настроена и будет регулярно обновляться, мы сможем отделить анализ данных от нашей торговой деятельности.

Теперь посмотрим:

  • как мы модифицировали наши скрипты на Python с учетом полученного опыта, при этом все тесты теперь обрабатываются за один проход, что упрощает выбор активов; 

  • как мы полностью переработали код нашего советника, чтобы он мог использовать актуальную базу данных во время работы, и как мы перенесли почти все функции в заголовочный файл MQL5, чтобы упростить управление и поддержку кода, оставив в основном файле только собственные обработчики событий; 

  • И, наконец, мы рассмотрим внесенные нами изменения в базу данных, чтобы она могла выступать в качестве интерфейса между системой анализа данных в реальном времени и средой онлайн-трейдинга.

Давайте начнём с последнего пункта, пункта 3, чтобы при описании остальных двух пунктов всё стало понятно.


База данных как единый источник истины

В предыдущей статье мы начали с простого определения того, какой ответ мы хотим получить от нашей базы данных для нашего советника, чтобы обновлять модель в режиме реального времени. Это типичный «чит-код», который часто используется при проектировании по принципу «снизу вверх». Мы выясняем, в чём заключается насущная потребность, и приступаем к её удовлетворению. Это позволяет избежать излишней сложности конструкции и упрощает корректировку курса. Вот соответствующий отрывок из предыдущей статьи.

В конечном итоге наша база данных должна дать ответ на один очень простой вопрос: чем нам сейчас торговать, чтобы получить максимально возможную прибыль?

Ссылаясь на предыдущую статью из этой серии, в которой мы определяли направления и объемы наших ордеров на основе весов в портфеле акций, один из возможных ответов мог бы выглядеть примерно так:

символы веса таймфрейм
«MU», «NVDA», «MPWR», «MCHP» 2,699439, 1,000000, -1,877447, -2,505294 D1 (суточный таймфрейм)

Таблица 1. Пример предполагаемого ответа на запрос для обновления модели в режиме реального времени (предыдущая статья)

Чтобы получить такой ответ, мы добавили в схему нашей базы данных новую таблицу — таблицу «strategy». Она содержит четыре поля типа TEXT
  • Название ПЕРВИЧНЫЙ КЛЮЧ
  • Символы
  • Веса
  • Сроки
-- strategy definition

CREATE TABLE strategy (
        name TEXT CHECK(LENGTH(name) <= 20) NOT NULL,
        symbols TEXT CHECK(LENGTH(symbols) <= 255) NOT NULL,
        weights TEXT CHECK(LENGTH(weights) <= 255),
        timeframe TEXT CHECK (
        timeframe IN (
            'M1',
            'M2',
            'M3',
            'M4',
            'M5',
            'M6',
            'M10',
            'M12',
            'M15',
            'M20',
            'M30',
            'H1',
            'H2',
            'H3',
            'H4',
            'H6',
            'H8',
            'H12',
            'D1',
            'W1',
            'MN1'
        )
    ),CONSTRAINT strategy_pk PRIMARY KEY (name)
) WITHOUT ROWID, STRICT;
Мы определили эту таблицу как таблицу WITHOUT ROWID. В SQLite таблицами rowid являются все обычные таблицы, за исключением виртуальных таблиц и таблиц с ограничением INTEGER PRIMARY KEY. В таблицах ROWID первичные ключи не являются настоящими первичными ключами.
«Ограничение PRIMARY KEY для таблицы с ROWID (если это не настоящий первичный ключ или INTEGER PRIMARY KEY) по сути является тем же самым, что и ограничение UNIQUE. «Поскольку это не является настоящим первичным ключом, значения в столбцах PRIMARY KEY могут быть равны NULL, что противоречит всем стандартам SQL» (выделено нами).

Таким образом, определив таблицу как таблицу WITHOUT ROWID, мы предотвращаем вставку значений NULL и, следовательно, не допускаем нарушения реляционной целостности. Недостатком является то, что этот пункт специфичен для SQLite, а это означает, что наша схема перестает быть полностью переносимой в другие СУБД. Чтобы использовать эту схему в других СУБД, нам придётся её адаптировать. Но я считаю, что преимущества, связанные с тем, что в поле первичного ключа не вставляются значения NULL, оправдывают связанные с этим затраты.

Как вы, возможно, заметили, «symbols» и «weights» — это массивы. Однако в SQLite нет типа данных ARRAY. В связи с этим ограничением мы будем вставлять эти данные в виде текста, но с форматированием в виде массива JSON. Эта мера облегчит нам работу в будущем, если мы перейдем на другую СУБД, поддерживающую тип данных ARRAY. 

название символы веса таймфрейм
Nasdaq_NVDA_Coint «["MU", "NVDA", "MPWR", "MCHP"]» «[“2,699439”, “1,000000”, “-1,877447”, “-2,505294”]» D1

Таблица 2. Данные о символах и весах вставляются в таблицу «strategy» в виде текста типа TEXT, отформатированного в виде массивов JSON

Для их извлечения мы будем использовать функцию json_extract() в SQLite. 

Итак, схема нашей базы данных, обновленная до версии statarb-0.2, теперь включает дополнительную таблицу «strategy», а её диаграмма «сущность-связь» выглядит следующим образом:

Рисунок 1 — Диаграмма «сущность-связь» (ERD) базы данных statarb-0.2

Рисунок 1. Диаграмма «сущность-связь» (ERD) базы данных statarb-0.2

СОВЕТ — При разработке с использованием SQLite я обычно НЕ удаляю старые версии файлов базы данных, а также старые версии файлов схемы. Поскольку речь идет об одном файле, его легко скопировать, переименовать копию в новую версию и хранить обе версии в системе контроля версий (VCS). Эта практика обеспечивает нам простой «путь отката» на всякий случай.

Рисунок 2 — Навигатор Metaeditor, отображающий различные версии файлов SQLite (баз данных) и соответствующие файлы схем

Рисунок 2. Навигатор Metaeditor, отображающий различные версии файлов SQLite (баз данных) и соответствующие файлы схем

Как видно из приведенной выше схемы ERD, наша новая таблица «strategy» пока не связана ни с одной другой таблицей. Ситуация изменится, как только мы начнём проводить бэктестинг/торговать с помощью этой новой версии советника, когда он будет подключен к таблице «trade». Поскольку мы работаем по принципу «снизу вверх», мы будем вносить изменения в нашу базу данных только тогда, когда в этом возникнет необходимость. На данный момент таблица «strategy» представляет собой не более чем интерфейс — средство связи между нашей системой анализа данных и торговыми платформами.

База данных будет служить интерфейсом между нашей системой анализа данных и нашим торговым советником, а также между нашей системой отбора — то есть применением результатов анализа данных — и торговым советником. Процесс будет выглядеть примерно так:

Рисунок 3 — Блок-схема, иллюстрирующая циклический характер нашего конвейера статистического арбитража

Рисунок 3. Блок-схема иллюстрирует циклический характер нашего конвейера статистического арбитража


Сторона Python

Мы перешли от отдельных задач, выполняемых в Jupyter, к единому скрипту на Python, который запускает комплексный конвейер статистического арбитража для коинтегрированных акций. Этот скрипт будет:

Проверьте, существует ли база данных SQLite.

def CheckForDb(db_path: str | Path) -> bool:
    """Return True if the SQLite database file exists on disk."""
    exists = Path(db_path).is_file()
    logger.info("Database exists: %s (%s)", exists, db_path)
    return exists

Если база данных не найдена, будет создана новая. Это удобно, если вы не следите за этой серией и просто хотите поэкспериментировать с конвейером статистического арбитража. Всё, что вам нужно сделать, — это предоставить схему базы данных (прилагается ниже).

def CreateDbIfMissing(db_path: str | Path, schema_path: Optional[str | Path] = None) -> None:
    """
    If the database does not exist and a schema is provided, create it from SQL.
    Otherwise, create an empty file (SQLite creates DB on first connection).
    """
    db_path = Path(db_path)
    if db_path.exists():
        logger.info("DB already present: %s", db_path)
        return
    logger.info("DB missing; creating: %s", db_path)

    with sqlite3.connect(db_path) as conn:
        if schema_path:
            schema_sql = Path(schema_path).read_text(encoding="utf-8")
            conn.executescript(schema_sql)
            logger.info("Applied schema from: %s", schema_path)

Подключитесь к терминалу MetaTrader 5.

def InitMT5(login: Optional[int] = None, password: Optional[str] = None,
            server: Optional[str] = None, path: Optional[str] = None) -> None:

    """Initialize connection to the running MetaTrader 5 terminal."""

    if mt5 is None:
        raise RuntimeError("MetaTrader5 package not available.")

    # if not mt5.initialize(path=path, login=login, password=password, server=server):
    if not mt5.initialize():
        raise RuntimeError(f"MT5 initialize() failed: {mt5.last_error()}")
    logger.info("MetaTrader5 initialized.")

Загрузите рыночные данные из MetaTrader 5.

def DownloadMarketData(symbols: List[str], timeframes: List[str], bars: int = 5000
                       ) -> Dict[Tuple[str, str], pd.DataFrame]:
    """
    Download OHLCV data from MT5 for each (symbol, timeframe).
    Returns dict with keys (symbol, timeframe) -> DataFrame columns:
    ['tstamp','timeframe','open','high','low','close','tick_volume','real_volume','spread']
    """
    if mt5 is None:
        raise RuntimeError("MetaTrader5 package not available.")
    out: Dict[Tuple[str, str], pd.DataFrame] = {}

    for sym in symbols:
        for tf in timeframes:
            tf_const = _mt5_timeframe_constant(tf)
            rates = mt5.copy_rates_from_pos(sym, tf_const, 0, bars)

            if rates is None or len(rates) == 0:
                logger.warning("No data for %s %s", sym, tf)
                continue

            df = pd.DataFrame(rates)

            # Rename & select
            df = df.rename(columns={
                "time": "tstamp",
                "open": "price_open",
                "high": "price_high",
                "low": "price_low",
                "close": "price_close",
                "real_volume": "real_volume",
            })

            df["timeframe"] = tf

            cols = ["tstamp","timeframe","price_open","price_high","price_low","price_close",
                    "tick_volume","real_volume","spread"]
            df = df[cols].copy()
            out[(sym, tf)] = df
            logger.info("Downloaded %d bars for %s %s", len(df), sym, tf)
    return out

Сохраните эти рыночные данные в базе данных SQLite.

def StoreMarketData(db_path: str | Path, data: Dict[Tuple[str, str], pd.DataFrame]) -> None:

    """Upsert market data into SQLite according to the provided schema."""
    with sqlite3.connect(db_path) as conn:
        for (sym, tf), df in data.items():
            symbol_id = _ensure_symbol(conn, sym)

            df = df.copy()
            df["symbol_id"] = symbol_id

            # Use executemany for performance
            sql = """
            INSERT OR REPLACE INTO market_data
            (tstamp, timeframe, price_open, price_high, price_low, price_close,
             tick_volume, real_volume, spread, symbol_id)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """

            rows = list(df[["tstamp","timeframe","price_open","price_high","price_low","price_close",
                            "tick_volume","real_volume","spread","symbol_id"]].itertuples(index=False, name=None))
            conn.executemany(sql, rows)
            conn.commit()

            logger.info("Stored %d rows for %s %s", len(rows), sym, tf)

Проверьте коэффициент корреляции Пирсона для выбранных символов.

def RunCorrelationTest(prices: pd.DataFrame, plot: bool = True) -> Tuple[pd.DataFrame, Tuple[str, str], float]:

    """Compute Pearson correlation matrix and return the most correlated pair (off-diagonal)."""
    corr = prices.corr(method="pearson")

    # Mask diagonal, find max
    corr_values = corr.where(~np.eye(len(corr), dtype=bool))
    idxmax = np.unravel_index(np.nanargmax(corr_values.values), corr_values.shape)
    symbols = corr.columns.tolist()
    pair = (symbols[idxmax[0]], symbols[idxmax[1]])
    value = corr_values.values[idxmax]

    logger.info("Most correlated pair: %s ~ %s (r=%.4f)", pair[0], pair[1], value)

    if plot:
        plt.figure(figsize=(10, 7))
        sns.heatmap(corr, annot=False, vmin=-1, vmax=1, cmap="coolwarm", square=True)
        plt.title("Pearson Correlation Heatmap")
        plt.tight_layout()
        plt.show()

    print(f"Most correlated pair: {pair[0]} ~ {pair[1]} (r={value:.4f})")

    return corr, pair, float(value)

Рассчитайте коэффициент коинтеграции Энгл-Грейнджера для выбранных символов и укажите пару с наибольшей корреляцией.

def RunEngleGrangerTest(prices: pd.DataFrame, plot: bool = True
                        ) -> Tuple[Tuple[str, str], float, float, pd.Series]:
    """
    Loop over all symbol pairs, run statsmodels.coint (Engle-Granger). Return:
        best_pair, best_pvalue, hedge_ratio (y_on_x), spread series (y - beta*x)
    """
    best_pair, best_p, best_beta, best_spread = None, 1.0, np.nan, None
    cols = list(prices.columns)
    for y_sym, x_sym in itertools.combinations(cols, 2):
        y = prices[y_sym].dropna()
        x = prices[x_sym].dropna()
        join = pd.concat([y, x], axis=1).dropna()

        if len(join) < 50:
            continue

        score, pvalue, _ = coint(join.iloc[:,0], join.iloc[:,1])

        if pvalue < best_p:
            beta = _ols_hedge_ratio(join.iloc[:,0], join.iloc[:,1])
            spread = join.iloc[:,0] - beta * join.iloc[:,1]
            best_pair, best_p, best_beta, best_spread = (y_sym, x_sym), pvalue, beta, spread

    if best_pair is None:
        raise ValueError("No cointegrated pairs found (Engle-Granger) with sufficient data.")

    print(f"Most cointegrated pair (Engle-Granger): {best_pair[0]} ~ {best_pair[1]} (p={best_p:.6f}, beta={best_beta:.4f})")

    if plot:
        fig, ax = plt.subplots(figsize=(10, 5))
        ax.plot(prices.index, prices[best_pair[0]], label=best_pair[0])
        ax.plot(prices.index, prices[best_pair[1]]*best_beta, label=f"{best_pair[1]} * beta")
        ax.set_title(f"Best EG Pair Overlay (beta={best_beta:.4f})")
        ax.legend()
        fig.tight_layout()
        plt.show()


      fig2, ax2 = plt.subplots(figsize=(10, 4)) ax2.plot(best_spread.index, best_spread.values, label="Spread") ax2.axhline(0, line) ax2.set_title("Spread (y - beta*x)")

        ax2.legend() fig2.tight_layout()

        plt.show()

    return best_pair, float(best_p), float(best_beta), best_spread

Проведите тесты на стационарность (ADF и KPSS) для разницы Энгл-Грейнджера.

def RunADFTest(series: pd.Series) -> Tuple[float, float, dict]:
    """Augmented Dickey-Fuller test on a univariate series."""
    series = pd.Series(series).dropna()
    adf_stat, pvalue, usedlag, nobs, crit, icbest = adfuller(series, autolag="AIC")

    print(f"ADF: stat={adf_stat:.4f}, p={pvalue:.6f}, lags={usedlag}, nobs={nobs}, crit={crit}")

    return float(adf_stat), float(pvalue), crit
def RunKPSSTest(series: pd.Series, regression: str = "c") -> Tuple[float, float, dict]:

    """KPSS stationarity test on a univariate series (regression='c' or 'ct')."""
    series = pd.Series(series).dropna()
    stat, pvalue, lags, crit = kpss(series, regression=regression, nlags="auto")

    print(f"KPSS({regression}): stat={stat:.4f}, p={pvalue:.6f}, lags={lags}, crit={crit}")

    return float(stat), float(pvalue), crit

Проверить коинтеграцию по Йохансену (с весами портфеля, взятыми из первого собственного вектора при ранге >= 1).

def RunJohansenTest(prices: pd.DataFrame, det_order: int = 0, k_ar_diff: int = 1, plot: bool = True
                    ) -> Tuple[int, np.ndarray, np.ndarray, Optional[pd.Series]]:
    """
    Johansen cointegration test on a set of symbols (>=2).
    Returns: (rank, eigenvectors (beta), eigenvalues, spread_from_first_vector or None)
    - Significant rank determined by trace statistics vs 95% critical values.
    - First eigenvector provides portfolio weights (normalized s.t. min(|w|)=1 or w[-1]=1).
    """

    data = prices.dropna().values

    if data.shape[1] < 2:
        raise ValueError("Johansen requires at least two series.")

    joh = coint_johansen(data, det_order, k_ar_diff)

    # Significant rank by trace statistic > crit value (95% = column index 1)
    trace_stats = joh.lr1  # trace stats
    crit_vals = joh.cvt[:, 1]  # 95%
    rank = int(np.sum(trace_stats > crit_vals))
    print("Johansen trace stats:", trace_stats)

    print("Johansen 95% crit vals:", crit_vals)

    print(f"Johansen inferred rank: {rank}")

    weights = None
    spread = None

    if rank >= 1:

        # Beta columns are cointegrating vectors; take the first (associated with largest eigenvalue)
        beta = joh.evec  # shape (k, k)
        eigvals = joh.eig  # shape (k,)

        # Sort by eigenvalue desc to be explicit
        order = np.argsort(eigvals)[::-1]
        beta_sorted = beta[:, order]
        eig_sorted = eigvals[order]
        first_vec = beta_sorted[:, 0]

        # Normalize such that the weight of the last symbol is 1 (common in practice)
        if first_vec[-1] == 0:
            norm = np.max(np.abs(first_vec))
        else:
            norm = first_vec[-1]

        weights = first_vec / norm

        print("First Johansen eigenvector (portfolio weights):")

        for sym, w in zip(prices.columns, weights):
            print(f"  {sym:>12s}: {w:+.6f}")

        # Spread = P * weights
        spread = pd.Series(prices.values @ weights, index=prices.index, name="JohansenSpread")

        if plot:
            fig, ax = plt.subplots(figsize=(10, 4))
            ax.plot(spread.index, spread.values, label="Johansen Spread")
            ax.axhline(0, linestyle="--")
            ax.set_title("Johansen Spread (1st eigenvector)")
            ax.legend()
            fig.tight_layout()

            plt.show()

        return rank, beta_sorted, eig_sorted, spread, weights

    return rank, joh.evec, joh.eig, weights, None

Если какой-либо ранг в тесте коинтеграции Йохансена окажется СИГНИФИКАТИВНЫМ, скрипт также запустит тесты ADF и KPSS для спреда Йохансена и, наконец, сохранит параметры стратегии (символы, веса и таймфрейм) в таблице «strategy», как упоминалось в предыдущем разделе.

def StoreJohansenStrategy(db_path: str | Path, name: str, symbols: List[str],
                          weights: np.ndarray, timeframe: str) -> None:
    """
    Store Johansen eigenvector (portfolio weights) and symbols into the strategy table.
    Symbols and weights are stored as JSON arrays (text).
    """

    import json

    with sqlite3.connect(db_path) as conn:
        cur = conn.cursor()
        cur.execute("""
            INSERT OR REPLACE INTO strategy (name, symbols, weights, timeframe)
            VALUES (?, ?, ?, ?)
        """, (
            name,
            json.dumps(symbols),
            json.dumps([float(w) for w in weights]),
            timeframe
        ))
        conn.commit()

    logger.info("Stored Johansen strategy '%s' with symbols=%s timeframe=%s",
                name, symbols, timeframe)

Для удобства программа будет приостанавливать выполнение и строить таймфрейм на каждом соответствующем участке (например, при разбросах), чтобы вы могли легко определить, стоит ли выбранные символы подвергнуть более тщательному анализу, и в случае необходимости изменить порядок их в списке (для теста Энгл-Грейнджера) или временной интервал. Этот скрипт служит инструментом для ручного скрининга до того, как на следующем этапе у нас появится полностью автоматизированный конвейер. Каждая задача реализована в виде отдельной функции, поэтому их можно легко импортировать в другие скрипты.

ВНИМАНИЕ: Этот скрипт является точкой входа в нашу систему. Сразу после того, как новый список символов будет здесь определен, его необходимо добавить в службу обновления базы данных. Это обеспечит плавное обновление вашей базы данных, даже если ваш советник по каким-либо причинам не работает. На данный момент этот шаг необходимо выполнять вручную.

Чтобы запустить скрипт, вызовите его из командной строки с параметрами. Например:

PS C:\Users\your\path\to\JupyterNotebooks\StatArb\coint> python py/stat_arb_pipeline_mt5.py --db $env:STATARB_DB_PATH --schema sql/schema-0.2.sql --symbols "MU,NVDA,MPWR,MCHP" --timeframes "D1" --bars 1000 --run

Как видите, мы используем переменные среды для параметра пути к базе данных. Если всё прошло успешно, вы должны увидеть примерно следующее, если, согласно тесту Йохансена, коинтеграции нет.

03.09.2025 15:35:24,588 [INFO] База данных существует: True (C:\Users\your\path\to\MQL5\Files\StatArb\statarb-0.2.db)

03.09.2025 15:35:24,589 [INFO] Используется существующая база данных по адресу C:\Users\your\path\to\MQL5\Files\StatArb\statarb-0.2.db.

03.09.2025 15:35:24,605 [INFO] MetaTrader 5 инициализирован.

03.09.2025 15:35:24,613 [ИНФО] Загружено 1000 баров для MU D1  

03.09.2025 15:35:24,618 [ИНФО] Загружено 1000 баров для NVDA D1

03.09.2025 15:35:24,625 [ИНФО] Загружено 1000 баров для MPWR D1

03.09.2025 15:35:24,633 [ИНФО] Загружено 1000 баров для MCHP D1

03.09.2025 15:35:24,997 [INFO] Сохранено 1000 строк для MU D1

03.09.2025 15:35:25,261 [ИНФО] Сохранено 1000 строк для NVDA D1

03.09.2025 15:35:25,514 [INFO] Сохранено 1000 строк для MPWR D1

03.09.2025 15:35:25,870 [INFO] Сохранено 1000 строк для MCHP D1

03.09.2025 15:35:25,871 [INFO] MetaTrader 5 остановлен.

03.09.2025 15:35:25,961 [INFO] ReadMarketData: Таймфрейм D1, 1000 строк, 4 инструмента

03.09.2025 15:35:25,964 [INFO] Пара с наибольшей корреляцией: MPWR ~ NVDA (r = 0,8335)

Пара с наибольшей корреляцией: MPWR ~ NVDA (r = 0,8335)

Наиболее коинтегрированная пара (по Энглу-Грейнджеру): MPWR ~ MU (p = 0,010558, бета = 5,4940)


ADF и KPSS для спреда Энгла—Грейнджера:

ADF: stat=-3.8778, p=0.002204, lags=16, nobs=983, crit={'1%': np.float64(-3.4370198458812156), '5%': np.float64(-2.864484708707697), '10%': np.float64(-2.568337912084273)}

C:\Users\your\path\to\JupyterNotebooks\StatArb\coint\py\stat_arb_pipeline_mt5.py:321: Предупреждение об интерполяции: Тестовая статистика выходит за пределы диапазона значений p, доступных в таблице. Фактическое значение p меньше, чем возвращаемое значение p.

stat, pvalue, lags, crit = kpss(series, regression=regression, nlags="auto")

KPSS(c): stat=0,9279, p=0,010000, lags=19, crit={'10%': 0,347, «5 %»: 0,463, «2,5 %»: 0,574, «1 %»: 0,739}

Тест Йохансена для всех предоставленных символов:

Статистика траектории Йохансена: [34.26486877 18.91156161  8.04865462  0.11018312]

Йохансен, 95-процентные доверительные интервалы: [47,8545 29,7961 15,4943  3,8415]

Ранг, выведенный Йохансеном: 0

Как уже упоминалось выше, при наличии ЗНАЧИМОГО ранга Йохансена (>=1), то есть когда между выбранными символами имеется хотя бы один вектор коинтеграции, скрипт сохраняет значения первого собственного вектора (веса портфеля) в базе данных в «таблице стратегий». Кроме того, он проведет тесты ADF и KPSS для данного спреда.

PS C:\Users\your\path\to\JupyterNotebooks\StatArb\coint> python py/stat_arb_pipeline_mt5.py --db $env:STATARB_DB_PATH --schema sql/schema-0.2.sql --symbols "MU,NVDA,MPWR,MCHP" --timeframes "H4" --bars 90 --run   

В этом случае в окне Windows PowerShell или терминала должно появиться примерно следующее:

03.09.2025 15:50:43,168 [INFO] База данных существует: True (C:\Users\your\path\to\MQL5\Files\StatArb\statarb-0.2.db)

03.09.2025 15:50:43,169 [INFO] Используется существующая база данных по адресу C:\Users\your\path\to\Files\StatArb\statarb-0.2.db.

03.09.2025 15:50:43,188 [ИНФО] MetaTrader 5 инициализирован.

03.09.2025 15:50:44,297 [ИНФО] Загружено 90 баров для MU H4

03.09.2025 15:50:44,357 [ИНФО] Загружено 90 баров для NVDA H4

03.09.2025 15:50:46,322 [ИНФО] Загружено 90 баров для MPWR H4

03.09.2025 15:50:46,732 [ИНФО] Загружено 90 баров для MCHP H4

03.09.2025 15:50:49,148 [ИНФО] Сохранено 90 строк для MU H4

03.09.2025 15:50:49,160 [ИНФО] Сохранено 90 строк для NVDA H4

03.09.2025 15:50:49,171 [ИНФО] Сохранено 90 строк для MPWR H4

03.09.2025 15:50:49,276 [INFO] Сохранено 90 строк для MCHP H4

03.09.2025 15:50:49,277 [ИНФО] MetaTrader 5 остановлен.

03.09.2025 15:50:49,617 [ИНФО] ReadMarketData: График H4, 90 строк, 4 символа

03.09.2025 15:50:49,622 [ИНФО] Пара с наибольшей корреляцией: MPWR ~ NVDA (r = 0,6115)

Пара с наибольшей корреляцией: MPWR ~ NVDA (r = 0,6115)

Наиболее коинтегрированная пара (по Энглу-Грейнджеру): MU ~ NVDA (p = 0,072055, β = −0,2391)


ADF и KPSS для спреда Энгла—Грейнджера:

ADF: stat=-3.1693, p=0.021831, lags=0, nobs=89, crit={'1%': np.float64(-3.506057133647011), '5%': np.float64(-2.8946066061911946), '10%': np.float64(-2.5844100201994697)}

C:\Users\your\path\to\JupyterNotebooks\StatArb\coint\py\stat_arb_pipeline_mt5.py:321: Предупреждение об интерполяции: Тестовая статистика выходит за пределы диапазона значений p, доступных в таблице. Фактическое значение p превышает возвращаемое значение p.

stat, pvalue, lags, crit = kpss(series, regression=regression, nlags="auto")

KPSS(c): stat=0,1793, p=0,100000, lags=5, crit={'10%': 0,347, «5 %»: 0,463, «2,5 %»: 0,574, «1 %»: 0,739}


Тест Йохансена для всех предоставленных символов:

Статистика траектории Йохансена: [63.23695088 32.35462044 16.63904745  5.4295422 ]

Йохансен, 95-процентные доверительные интервалы: [47,8545 29,7961 15,4943  3,8415]

Ранг, выведенный Йохансеном: 4

Первый собственный вектор Йохансена (веса портфеля):

          MCHP: +0,273652

          MPWR: -0,148092

            MU: +2,256498

          NVDA: +1,000000

ADF и KPSS для спреда Энгла—Грейнджера:

ADF: stat=-4.1427, p=0.000823, lags=1, nobs=88, crit={'1%': np.float64(-3.506944401824286), '5%': np.float64(-2.894989819214876), '10%': np.float64(-2.584614550619835)}

KPSS(c): stat=0,4009, p=0,076784, lags=4, crit={'10%': 0,347, «5 %»: 0,463, «2,5 %»: 0,574, «1 %»: 0,739}

03.09.2025 15:50:50,051 

[ИНФО] Сохранена стратегия Йохансена «Nasdaq_NVDA_Coint» с символами=['MCHP', 'MPWR', 'MU', 'NVDA'] и таймфреймом=H4


А как насчет упомянутых выше предупреждений об интерполяции?

Эти предупреждения об интерполяции связаны с тестом стационарности KPSS для разницы Энгл-Грейнджера. Они сообщают нам, что мы получили экстремальные результаты, выходящие за пределы таблицы значений теста. Эти предупреждения просто сообщают нам, что наш разброс Энгл-Грейнджера составляет

  • сильно стационарен (значение p больше, чем возвращаемое значение p) 
  • или сильно нестационарны (значение p меньше, чем возвращаемое значение p)

Пока мы занимаемся обновлением параметров нашей модели в режиме реального времени, эти предупреждения можно смело игнорировать. Мы рассмотрим их подробнее на следующем этапе, когда займёмся скринингом наших активов.


Сторона MQL5

В основном файле мы оставили только обработчики событий EA. Все остальные функции были перенесены в заголовочный файл Nasdaq_NVDA_Coint.mqh.

Раньше параметры стратегии были жестко заданными. Теперь это глобальные переменные. 

// Global variables
string symbols[] = {}; // Asset symbols
double weights[] = {}; // Portfolio weights
ENUM_TIMEFRAMES timeframe = PERIOD_CURRENT; // Strategy cointegrated timeframe

Эти глобальные переменные проверяются непосредственно в обработчике события OnInit().

//+------------------------------------------------------------------+
//| Expert initialization function                                   |
//+------------------------------------------------------------------+

int OnInit()
  {
   ResetLastError();

// Check if all symbols are available
   for(int i = 0; i < ArraySize(symbols); i++)
     {
      if(!SymbolSelect(symbols[i], true))
        {
         Print("Error: Symbol ", symbols[i], " not found!");
         return(INIT_FAILED);
        }
     }
// Initialize spread buffer
   ArrayResize(spreadBuffer, InpLookbackPeriod);

// Set a timer for spread, mean, and stdev calculations
   EventSetTimer(InpUpdateFreq * 60); // min one minute

// Load strategy parameters from database
   if(!LoadStrategyFromDB(InpDbFilename,
                                InpStrategyName,
                                symbols,
                                weights,
                                timeframe))
     {
      // Handle error - maybe use default values
      printf("Error at " + __FUNCTION__ + " %s ",
             getUninitReasonText(GetLastError()));
      return INIT_FAILED;
     }
   return(INIT_SUCCEEDED);
  }

Функция LoadStrategyFromDB() находится в заголовочном файле.

//+------------------------------------------------------------------+
//|          Load strategy parameter from database                   |
//+------------------------------------------------------------------+

bool LoadStrategyFromDB(string db_name,
                        string strat_name,
                        string &symbols_arr[],
                        double &weights_arr[],
                        ENUM_TIMEFRAMES &tf)
  {

// Open the database (Metatrader's integrated SQLite)
   int db = DatabaseOpen(db_name, DATABASE_OPEN_READONLY);
   if(db == INVALID_HANDLE)
     {
      Print("Failed to open database: %s", GetLastError());
      return false;
     }
// Prepare the SQL query with json_extract
   string query = StringFormat(
                     "SELECT "
                     "json_extract(symbols, '$') as symbols_json, "
                     "json_extract(weights, '$') as weights_json, "
                     "timeframe "
                     "FROM strategy WHERE name = '%s'",
                     strat_name
                  );

// Execute the query
   int result = DatabasePrepare(db, query);

   if(result <= 0)
     {
      Print("Failed to prepare query: ", GetLastError());
      DatabaseClose(db);
      return false;
     }

// Read the results
   if(DatabaseRead(result))
     {
      // Get symbols_arr JSON array and parse it
      string symbolsJson;
      DatabaseColumnText(result, 0, symbolsJson);
      ParseJsonArray(symbolsJson, symbols_arr);

      // Get weights_arr JSON array and parse it
      string weightsJson;
      DatabaseColumnText(result, 1, weightsJson);
      ParseJsonDoubleArray(weightsJson, weights_arr);

      // Get tf string and convert to ENUM_TIMEFRAMES
      string timeframeStr;
      DatabaseColumnText(result, 2, timeframeStr);
      tf = StringToTimeframe(timeframeStr);

      Print("Successfully loaded strategy: ", strat_name);

      Print("Symbols JSON: ", symbolsJson);
      Print("Weights JSON: ", weightsJson);
      Print("Timeframe: ", timeframeStr);
     }
   else
     {
      Print("Strategy not found: ", strat_name);

      DatabaseFinalize(result);
      DatabaseClose(db);
      return false;
     }
// Clean up

   DatabaseFinalize(result);
   DatabaseClose(db);
   return true;
  }

Как уже упоминалось выше, мы будем считывать массивы символов и весов портфеля с помощью функции json_extract() библиотеки SQLite. Он имеет следующую сигнатуру:

json_extract(json, path)

Где:

  • «json» — это текстовый столбец или строка, содержащая данные в формате JSON
  • «path» — это выражение JSON-пути, которое указывает местоположение извлекаемого значения в JSON. Обычно она начинается с символа $, обозначающего корень JSON-документа, за которым следует точечная нотация для ключей объектов ($.key) или квадратная нотация для индексов массивов ($[index]). Именно эту последнюю форму мы и используем здесь для чтения данных массива.

Эти две вспомогательные функции анализируют массивы JSON. Эти функции обрабатывают формат JSON, удаляя скобки и кавычки, а затем разбивая текст по запятым.

ParseJsonArray()

анализирует массивы строк JSON, такие как ["MU", "NVDA", "MPWR", "MCHP"]

// Helper function to parse JSON array of strings
void ParseJsonArray(string json, string &array[])
  {
// Remove brackets and quotes from JSON array
   string cleaned = StringSubstr(json, 1, StringLen(json) - 2); // Remove [ and ]
   StringReplace(cleaned, "\"", ""); // Remove quotes

// Split by commas
   StringSplit(cleaned, ',', array);

// Trim whitespace from each element
   for(int i = 0; i < ArraySize(array); i++)
     {
      array[i] = StringTrim(array[i]);
     }
  }

ParseJsonDoubleArray()

анализирует массивы чисел в формате JSON, например [2.699439, 1.000000, -1.877447, -2.505294]

// Helper function to parse JSON array of doubles
void ParseJsonDoubleArray(string json, double &array[])
  {
// Remove brackets from JSON array
   string cleaned = StringSubstr(json, 1, StringLen(json) - 2); // Remove [ and ]

// Split by commas
   string stringArray[];
   int count = StringSplit(cleaned, ',', stringArray);

// Convert to double array
   ArrayResize(array, count);

   for(int i = 0; i < count; i++)
     {
      array[i] = StringToDouble(StringTrim(stringArray[i]));
     }
  }

Добавлено

StringTrim()

функция для удаления всех пробелов, которые могут присутствовать в значениях JSON.

// Helper function to trim whitespace
string StringTrim(string str)
  {

// Trim leading whitespace
   while(StringLen(str) > 0 && str[0] == ' ')
     {
      str = StringSubstr(str, 1);
     }

// Trim trailing whitespace
   while(StringLen(str) > 0 && str[StringLen(str) - 1] == ' ')
     {
      str = StringSubstr(str, 0, StringLen(str) - 1);
     }

   return str;
  }

В MQL5 Codebase имеются библиотеки JSON. Если вы все чаще используете объекты JSON, возможно, стоит обратить на них внимание и оценить их преимущества. В этом простом случае, на мой взгляд, преимущества недостаточны, чтобы оправдать использование внешней зависимости.

Вспомогательная функция

StringToTimeframe()

преобразует строку, обозначающую временной интервал базы данных, в соответствующее значение ENUM_TIMEFRAMES языка MQL5.

// Helper function to convert tf string to ENUM_TIMEFRAMES
ENUM_TIMEFRAMES StringToTimeframe(string tfStr)
  {
   if(tfStr == "M1")
      return PERIOD_M1;
   if(tfStr == "M2")
      return PERIOD_M2;
.
.
.
   if(tfStr == "MN1")
      return PERIOD_MN1;
   return PERIOD_CURRENT; // Default if not found
  }

Помимо скрининга параметров стратегии в функции OnInit(), вызываемой при запуске или перезапуске советника, мы также вызываем эту же функцию с регулярным интервалом в обработчике событий OnTimer(). Этот интервал времени является параметром, задаваемым пользователем.

// Set a timer for spread, mean, and stdev calculations
// and strategy parameters update (check DB)
   EventSetTimer(InpUpdateFreq * 60); // min one minute

void OnTimer(void)
  {
   ResetLastError();
// Wrapper around LoadStrategyFromDB: for clarity
   if(!UpdateModelParams(InpDbFilename,
                         InpStrategyName,
                         symbols,
                         weights,
                         timeframe))
     {
      printf("%s failed: %s", __FUNCTION__, GetLastError());
     }

Функция UpdateModelParams() представляет собой просто оболочку для функции LoadStrategyFromDB(). 

//+------------------------------------------------------------------+
//|          Wrapper to load strategy from db                        |
//+------------------------------------------------------------------+

// Load strategy parameters from database
bool UpdateModelParams(string db_name,
                       string strat_name,
                       string &symbols_arr[],
                       double &weights_arr[],
                       ENUM_TIMEFRAMES tf)
  {
   return LoadStrategyFromDB(db_name, strat_name, symbols_arr, weights_arr, tf);
  }

Это позволяет более четко понять назначение функции.


Заключение

В сбалансированной корзине коинтегрированных акций, используемой в стратегиях возврата к среднему значению, веса портфеля должны постоянно обновляться. В этой статье мы увидели, что у нас всегда есть возможность получить актуальные параметры стратегии, включая сами тикеры корзины.

Мы рассказали о том, как можно использовать встроенную в MetaTrader 5 базу данных SQLite в качестве своего рода интерфейса между нашей системой анализа данных и торговой средой, сохраняя при этом разделение этих двух процессов.

Чтобы облегчить задачу тем читателям, кто, возможно, не следит за этой серией статей, мы предоставляем единый скрипт на Python, содержащий полный конвейер статистического арбитража для коинтегрированных акций. Скрипт за один проход выполняет тесты на корреляцию, коинтеграцию и стационарность, а также подготавливает базу данных с актуальными параметрами стратегии, готовую к использованию советником.


Ссылки

Дэниел П. Паломар (2025). Оптимизация портфеля: Теория и практика. Издательство Кембриджского университета.

Название файла Описание
StatArb/db-setup.mq5 Скрипт MQL5 для создания и инициализации базы данных SQLite путем чтения файла схемы schema-0.2.sql.
StatArb/db-update-statarb-0.2.mq5 Служба MQL5 для обновления базы данных SQLite данными о последних свечах с ценой закрытия.
StatArb/Nasdaq_NVDA_Coint.mql5 Этот файл содержит пример советника.
StatArb/Nasdaq_NVDA_Coint.mqh Этот файл содержит пример файла заголовка советника.
StatArb/schema-0.2.sql Файл схемы SQL (DDL) для инициализации базы данных (создание таблиц, полей и ограничений).
stat_arb_pipeline_mt5.py Этот скрипт на Python запускает полный конвейер статистического анализа для коинтегрированных акций на основе теста корреляции Пирсона, теста коинтеграции Йохансена, а также тестов стационарности ADF и KPSS.

Перевод с английского произведен MetaQuotes Ltd.
Оригинальная статья: https://www.mql5.com/en/articles/19428

Прикрепленные файлы |
От начального до среднего уровня: Указатель на функцию От начального до среднего уровня: Указатель на функцию
Вы, вероятно, уже слышали о указателях, когда речь заходит о программировании. А вы знали, что мы можем использовать данные такого типа здесь, в MQL5? Это, конечно, должно быть сделано так, чтобы мы не теряли контроль и не вызывали странного поведения программы во время её выполнения. Тем не менее, поскольку это ресурс очень специфического назначения и ориентированный на определенные виды деятельности, редко можно услышать, чтобы кто-то обсуждал, что такое указатель и как его использовать в MQL5.
Торговые инструменты MQL5 (Часть 21): Добавление темы в стиле киберпанк в графики регрессии Торговые инструменты MQL5 (Часть 21): Добавление темы в стиле киберпанк в графики регрессии
В этой статье мы улучшаем инструмент построения графиков регрессии в MQL5, добавляя режим темы киберпанка с неоновым свечением, анимацией и голографическими рамками для иммерсивной визуализации. Мы интегрируем переключение тем, динамические фоны со звездами, светящимися контурами и неоновыми точками / линиями, сохраняя при этом совместимость со стандартным режимом. Эта двухтематическая система придает парному анализу футуристическую эстетику, поддерживая обновления и взаимодействия в режиме реального времени для получения полезных торговых выводов.
Особенности написания экспертов Особенности написания экспертов
Написание и тестирование экспертов в торговой системе MetaTrader 4.
Разработка инструментария для анализа Price Action (Часть 30): Советник CCI Zero Line Разработка инструментария для анализа Price Action (Часть 30): Советник CCI Zero Line
Будущее – за автоматизацией анализа движения цен. В этой статье мы используем индикатор Dual CCI (Commodity Channel Index – индекс товарного канала), стратегию пересечения нулевой линии (Zero Line Crossover), EMA и анализ движения цены, чтобы разработать инструмент, который генерирует торговые сигналы и задает уровни стоп-лосса и тейк-профита с помощью ATR. Прочитайте эту статью, чтобы узнать наш подход к разработке советника CCI Zero Line.