English Deutsch
preview
共和分株式による統計的裁定取引(第4回):リアルタイムモデル更新

共和分株式による統計的裁定取引(第4回):リアルタイムモデル更新

MetaTrader 5トレーディングシステム |
43 0
Jocimar Lopes
Jocimar Lopes

はじめに

今回の目的は、共和分モデルをリアルタイムで更新する仕組みを整えることです。より具体的には、EAが稼働している間も、ポートフォリオを構成する各銘柄の相対的な重みを、より新しい計算結果に基づいて逐次更新する必要があります。これまでの記事では説明を簡潔にするため、説明やバックテストで同じポートフォリオ比率を使い続けました。これは統計検定の意味や流れを理解しやすくするには有効でした。しかし実際の運用では、これらの比率は新しいデータポイントが到着するたび、つまり本記事の場合は新しいバーが確定するたびに更新されるものです。

その前に簡単に文脈を整えておきましょう。

私たちは現在、いわば「一般トレーダー向けの簡易版統計的アービトラージ環境」を構築しています。一般的なノートPCと通常のネットワーク帯域で稼働できるよう設計された、低コストな統計的アービトラージフレームワークです。本連載では、ペアトレード向けの高相関銘柄の抽出、ポートフォリオ構築のための共和分銘柄の抽出、さらに平均回帰戦略に必要なスプレッドの定常性検定について説明してきました。

前回の記事(第3回)ではデータベースの構築と設計上の選択について述べました。本記事はその直接の続きです。データベースが適切に構築され、定期的に更新されるようになれば、データ分析のプロセスと取引活動を分離できるようになります。

これから次の点を確認します。

  • 学んだ教訓を組み込みつつ、すべての検定を一度の実行で処理できるようにしたPythonスクリプトの修正によって資産選定を容易化した方法 

  • 最新のデータベースを稼働中に利用するためのEAコードの全面改修と、可読性および保守性向上のためにほぼすべての関数をMQL5ヘッダファイルへ移し、メインファイルにはネイティブイベントハンドラのみを残す構成への再設計をおこなった方法 

  • リアルタイムのデータ分析とオンライン取引環境の仲介インターフェースとして機能させるために施したデータベース側の改修

3番目から説明を始めます。これを理解しておくことで、残る2点の説明が明確になります。


唯一の正解源としてのデータベース

前回の記事では、モデルをリアルタイム更新するために、データベースからEAに対してどのような答えを返してほしいのか、その定義から話を始めました。これはボトムアップ設計でよく使われる典型的な「チートコード」です。つまり、まず即時に必要となる答えを尋ね、それを実装することによって、過剰な設計を避けることができ、ルート修正も容易になります。以下は前回の記事で登場した該当スニペットです。

結局のところ、私たちのデータベースは「今、どの銘柄を取引すれば最大のリターンを得られるのか」という非常に単純な問いに答えられれば良いのです。

前回の記事で、株式バスケットのポートフォリオ比率に基づいて注文の方向と数量を定義したことを思い返すと、データベースが返す理想的な回答の一例は次のような形式になります。

symbolsweightstimeframe
"MU", "NVDA", "MPWR", "MCHP"2.699439, 1.000000, -1.877447, -2.505294D1 (daily timeframe)

表1:リアルタイムモデル更新のための想定クエリ応答サンプル(架空データ)(前回の記事より)

この種の答えを得るために、データベーススキーマに新しいテーブル「strategy」を追加しました。このテーブルには4つのTEXTフィールドがあります。
  • Name PRIMARY KEY
  • Symbols
  • Weights
  • Timeframe
-- strategy definition

CREATE TABLE strategy (
        name TEXT CHECK(LENGTH(name) <= 20) NOT NULL,
        symbols TEXT CHECK(LENGTH(symbols) <= 255) NOT NULL,
        weights TEXT CHECK(LENGTH(weights) <= 255),
        timeframe TEXT CHECK (
        timeframe IN (
            'M1',
            'M2',
            'M3',
            'M4',
            'M5',
            'M6',
            'M10',
            'M12',
            'M15',
            'M20',
            'M30',
            'H1',
            'H2',
            'H3',
            'H4',
            'H6',
            'H8',
            'H12',
            'D1',
            'W1',
            'MN1'
        )
    ),CONSTRAINT strategy_pk PRIMARY KEY (name)
) WITHOUT ROWID, STRICT;
このテーブルをWITHOUT ROWIDテーブルとして定義しました。SQLiteにおいて、ROWIDテーブルとは、仮想テーブルおよびINTEGER PRIMARY KEY制約を持つテーブルを除く通常のテーブルのことです。ROWIDテーブルでは、PRIMARY KEYは実際の意味での主キーではありません。
「ROWIDテーブルにおけるPRIMARY KEY制約(真の主キーやINTEGER PRIMARY KEYでない限り)は、実質的にUNIQUE制約と同じものである。真の主キーではないため、PRIMARY KEYを構成するカラムにはNULLを許容しており、これはSQL標準すべてに反しています。」(強調は筆者による)

したがって、テーブルをWITHOUT ROWIDとして定義することで、NULLが挿入されることを防ぎ、結果としてリレーショナル整合性が損なわれるのを回避できます。欠点として、この句はSQLite特有であり、私たちのスキーマは他のRDBMSに対して完全な移植性を持たなくなります。他のRDBMSで同じスキーマを使うには適応が必要になります。しかし、主キー項目にNULLが挿入されないという利点は、そのコストに十分見合うと考えています。

すでにお気付きかもしれませんが、symbolsとweightsはどちらも配列です。しかしSQLiteにはARRAYデータ型がありません。この制約のため、これらのデータはTEXTとして挿入しますが、その際JSON配列としてフォーマットします。この措置により、将来ARRAY型を持つ他のRDBMSに移行する場合に作業が容易になります。 

namesymbolsweightstimeframe
Nasdaq_NVDA_Coint‘["MU", "NVDA", "MPWR", "MCHP"]’‘[“2.699439”, “1.000000”, “-1.877447”, “-2.505294”]’D1

表2:symbolsとweightsのデータは、JSON配列としてフォーマットしたTEXTデータ型でstrategyテーブルに挿入される

これらをクエリする際には、SQLiteのjson_extract()関数を使用します。 

このようにして、statarb-0.2へとアップグレードされた私たちのデータベーススキーマには新たにstrategyテーブルが加わり、そのエンティティリレーションシップ図は以下のようになります。

図1:statarb-0.2データベースのエンティティリレーションシップ図(ERD)

図1:statarb-0.2データベースのエンティティリレーションシップ図(ERD)

ヒント:SQLiteで開発を進める際、私はdbファイルの古いバージョンを削除しないようにしています。スキーマファイルの古いバージョンも同様です。単一ファイルで構成されているため、ファイルを複製してコピーに新バージョン名を付け、両方をバージョン管理システム(VCS)に保持するのは容易です。この習慣により、万一の場合に備えた簡易的な「ロールバック経路」が確保できます。

図2:MetaEditorのナビゲータに表示されたSQLiteファイル(データベース)の異なるバージョンと対応するスキーマファイル

図2:MetaEditorのナビゲータに表示されたSQLiteファイル(データベース)の異なるバージョンと対応するスキーマファイル

上のERDが示すように、新しいstrategyテーブルは現段階ではまだ他のどのテーブルとも接続されていません。これは、この新しいEAバージョンを使ってバックテストおよび取引を開始した時点で、tradeテーブルと接続されることで変わります。繰り返しになりますが、ボトムアップ方式で作業しているため、必要が生じた時、そしてその時に限りデータベースを変更します。現状ではstrategyテーブルは単なるインターフェースであり、データ分析環境と取引環境を結ぶ通信手段に過ぎません。

データベースは、データ分析と取引用EAの間のインターフェースとして、またスクリーニング(つまりデータ分析の適用)とEAの間のインターフェースとして機能します。流れは次のようになります。

図3:統計的アービトラージ取引パイプラインの循環的な性質を示すフロー図

図3:統計的アービトラージ取引パイプラインの循環的な性質を示すフロー図


Python側

私たちは、従来タスクごとに個別のJupyter Notebookを使っていたものを、統合されたPythonスクリプトに移行しました。このスクリプトは、共和分株式の統計的アービトラージ取引パイプライン全体を実行します。このスクリプトでは以下の処理をおこないます。

SQLiteデータベースが存在するかどうかを確認します。

def CheckForDb(db_path: str | Path) -> bool:
    """Return True if the SQLite database file exists on disk."""
    exists = Path(db_path).is_file()
    logger.info("Database exists: %s (%s)", exists, db_path)
    return exists

データベースが見つからなければ作成します。本連載を読んでいない方でも、すぐに統計的アービトラージ取引パイプラインを試せるように配慮しています。必要なのは、データベーススキーマを提供することだけです(添付参照)。

def CreateDbIfMissing(db_path: str | Path, schema_path: Optional[str | Path] = None) -> None:
    """
    If the database does not exist and a schema is provided, create it from SQL.
    Otherwise, create an empty file (SQLite creates DB on first connection).
    """
    db_path = Path(db_path)
    if db_path.exists():
        logger.info("DB already present: %s", db_path)
        return
    logger.info("DB missing; creating: %s", db_path)

    with sqlite3.connect(db_path) as conn:
        if schema_path:
            schema_sql = Path(schema_path).read_text(encoding="utf-8")
            conn.executescript(schema_sql)
            logger.info("Applied schema from: %s", schema_path)

MetaTrader 5ターミナルに接続します。

def InitMT5(login: Optional[int] = None, password: Optional[str] = None,
            server: Optional[str] = None, path: Optional[str] = None) -> None:

    """Initialize connection to the running MetaTrader 5 terminal."""

    if mt5 is None:
        raise RuntimeError("MetaTrader5 package not available.")

    # if not mt5.initialize(path=path, login=login, password=password, server=server):
    if not mt5.initialize():
        raise RuntimeError(f"MT5 initialize() failed: {mt5.last_error()}")
    logger.info("MetaTrader5 initialized.")

MetaTrader 5から市場データをダウンロードします。

def DownloadMarketData(symbols: List[str], timeframes: List[str], bars: int = 5000
                       ) -> Dict[Tuple[str, str], pd.DataFrame]:
    """
    Download OHLCV data from MT5 for each (symbol, timeframe).
    Returns dict with keys (symbol, timeframe) -> DataFrame columns:
    ['tstamp','timeframe','open','high','low','close','tick_volume','real_volume','spread']
    """
    if mt5 is None:
        raise RuntimeError("MetaTrader5 package not available.")
    out: Dict[Tuple[str, str], pd.DataFrame] = {}

    for sym in symbols:
        for tf in timeframes:
            tf_const = _mt5_timeframe_constant(tf)
            rates = mt5.copy_rates_from_pos(sym, tf_const, 0, bars)

            if rates is None or len(rates) == 0:
                logger.warning("No data for %s %s", sym, tf)
                continue

            df = pd.DataFrame(rates)

            # Rename & select
            df = df.rename(columns={
                "time": "tstamp",
                "open": "price_open",
                "high": "price_high",
                "low": "price_low",
                "close": "price_close",
                "real_volume": "real_volume",
            })

            df["timeframe"] = tf

            cols = ["tstamp","timeframe","price_open","price_high","price_low","price_close",
                    "tick_volume","real_volume","spread"]
            df = df[cols].copy()
            out[(sym, tf)] = df
            logger.info("Downloaded %d bars for %s %s", len(df), sym, tf)
    return out

この市場データをSQLiteデータベースに保存します。

def StoreMarketData(db_path: str | Path, data: Dict[Tuple[str, str], pd.DataFrame]) -> None:

    """Upsert market data into SQLite according to the provided schema."""
    with sqlite3.connect(db_path) as conn:
        for (sym, tf), df in data.items():
            symbol_id = _ensure_symbol(conn, sym)

            df = df.copy()
            df["symbol_id"] = symbol_id

            # Use executemany for performance
            sql = """
            INSERT OR REPLACE INTO market_data
            (tstamp, timeframe, price_open, price_high, price_low, price_close,
             tick_volume, real_volume, spread, symbol_id)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """

            rows = list(df[["tstamp","timeframe","price_open","price_high","price_low","price_close",
                            "tick_volume","real_volume","spread","symbol_id"]].itertuples(index=False, name=None))
            conn.executemany(sql, rows)
            conn.commit()

            logger.info("Stored %d rows for %s %s", len(rows), sym, tf)

選択した銘柄のピアソン相関を確認します。

def RunCorrelationTest(prices: pd.DataFrame, plot: bool = True) -> Tuple[pd.DataFrame, Tuple[str, str], float]:

    """Compute Pearson correlation matrix and return the most correlated pair (off-diagonal)."""
    corr = prices.corr(method="pearson")

    # Mask diagonal, find max
    corr_values = corr.where(~np.eye(len(corr), dtype=bool))
    idxmax = np.unravel_index(np.nanargmax(corr_values.values), corr_values.shape)
    symbols = corr.columns.tolist()
    pair = (symbols[idxmax[0]], symbols[idxmax[1]])
    value = corr_values.values[idxmax]

    logger.info("Most correlated pair: %s ~ %s (r=%.4f)", pair[0], pair[1], value)

    if plot:
        plt.figure(figsize=(10, 7))
        sns.heatmap(corr, annot=False, vmin=-1, vmax=1, cmap="coolwarm", square=True)
        plt.title("Pearson Correlation Heatmap")
        plt.tight_layout()
        plt.show()

    print(f"Most correlated pair: {pair[0]} ~ {pair[1]} (r={value:.4f})")

    return corr, pair, float(value)

選択した銘柄のエングル=グレンジャー共和分係数を計算し、最も共和分性の高いペアを特定します。

def RunEngleGrangerTest(prices: pd.DataFrame, plot: bool = True
                        ) -> Tuple[Tuple[str, str], float, float, pd.Series]:
    """
    Loop over all symbol pairs, run statsmodels.coint (Engle-Granger). Return:
        best_pair, best_pvalue, hedge_ratio (y_on_x), spread series (y - beta*x)
    """
    best_pair, best_p, best_beta, best_spread = None, 1.0, np.nan, None
    cols = list(prices.columns)
    for y_sym, x_sym in itertools.combinations(cols, 2):
        y = prices[y_sym].dropna()
        x = prices[x_sym].dropna()
        join = pd.concat([y, x], axis=1).dropna()

        if len(join) < 50:
            continue

        score, pvalue, _ = coint(join.iloc[:,0], join.iloc[:,1])

        if pvalue < best_p:
            beta = _ols_hedge_ratio(join.iloc[:,0], join.iloc[:,1])
            spread = join.iloc[:,0] - beta * join.iloc[:,1]
            best_pair, best_p, best_beta, best_spread = (y_sym, x_sym), pvalue, beta, spread

    if best_pair is None:
        raise ValueError("No cointegrated pairs found (Engle-Granger) with sufficient data.")

    print(f"Most cointegrated pair (Engle-Granger): {best_pair[0]} ~ {best_pair[1]} (p={best_p:.6f}, beta={best_beta:.4f})")

    if plot:
        fig, ax = plt.subplots(figsize=(10, 5))
        ax.plot(prices.index, prices[best_pair[0]], label=best_pair[0])
        ax.plot(prices.index, prices[best_pair[1]]*best_beta, label=f"{best_pair[1]} * beta")
        ax.set_title(f"Best EG Pair Overlay (beta={best_beta:.4f})")
        ax.legend()
        fig.tight_layout()
        plt.show()


      fig2, ax2 = plt.subplots(figsize=(10, 4)) ax2.plot(best_spread.index, best_spread.values, label="Spread") ax2.axhline(0, line) ax2.set_title("Spread (y - beta*x)")

        ax2.legend() fig2.tight_layout()

        plt.show()

    return best_pair, float(best_p), float(best_beta), best_spread

エングル=グレンジャースプレッドに対して定常性検定(ADFおよびKPSS)を実行します。

def RunADFTest(series: pd.Series) -> Tuple[float, float, dict]:
    """Augmented Dickey-Fuller test on a univariate series."""
    series = pd.Series(series).dropna()
    adf_stat, pvalue, usedlag, nobs, crit, icbest = adfuller(series, autolag="AIC")

    print(f"ADF: stat={adf_stat:.4f}, p={pvalue:.6f}, lags={usedlag}, nobs={nobs}, crit={crit}")

    return float(adf_stat), float(pvalue), crit
def RunKPSSTest(series: pd.Series, regression: str = "c") -> Tuple[float, float, dict]:

    """KPSS stationarity test on a univariate series (regression='c' or 'ct')."""
    series = pd.Series(series).dropna()
    stat, pvalue, lags, crit = kpss(series, regression=regression, nlags="auto")

    print(f"KPSS({regression}): stat={stat:.4f}, p={pvalue:.6f}, lags={lags}, crit={crit}")

    return float(stat), float(pvalue), crit

ジョハンセン共和分検定を実行します(ランク>=1の場合、第一固有ベクトルからポートフォリオ比率を計算)。

def RunJohansenTest(prices: pd.DataFrame, det_order: int = 0, k_ar_diff: int = 1, plot: bool = True
                    ) -> Tuple[int, np.ndarray, np.ndarray, Optional[pd.Series]]:
    """
    Johansen cointegration test on a set of symbols (>=2).
    Returns: (rank, eigenvectors (beta), eigenvalues, spread_from_first_vector or None)
    - Significant rank determined by trace statistics vs 95% critical values.
    - First eigenvector provides portfolio weights (normalized s.t. min(|w|)=1 or w[-1]=1).
    """

    data = prices.dropna().values

    if data.shape[1] < 2:
        raise ValueError("Johansen requires at least two series.")

    joh = coint_johansen(data, det_order, k_ar_diff)

    # Significant rank by trace statistic > crit value (95% = column index 1)
    trace_stats = joh.lr1  # trace stats
    crit_vals = joh.cvt[:, 1]  # 95%
    rank = int(np.sum(trace_stats > crit_vals))
    print("Johansen trace stats:", trace_stats)

    print("Johansen 95% crit vals:", crit_vals)

    print(f"Johansen inferred rank: {rank}")

    weights = None
    spread = None

    if rank >= 1:

        # Beta columns are cointegrating vectors; take the first (associated with largest eigenvalue)
        beta = joh.evec  # shape (k, k)
        eigvals = joh.eig  # shape (k,)

        # Sort by eigenvalue desc to be explicit
        order = np.argsort(eigvals)[::-1]
        beta_sorted = beta[:, order]
        eig_sorted = eigvals[order]
        first_vec = beta_sorted[:, 0]

        # Normalize such that the weight of the last symbol is 1 (common in practice)
        if first_vec[-1] == 0:
            norm = np.max(np.abs(first_vec))
        else:
            norm = first_vec[-1]

        weights = first_vec / norm

        print("First Johansen eigenvector (portfolio weights):")

        for sym, w in zip(prices.columns, weights):
            print(f"  {sym:>12s}: {w:+.6f}")

        # Spread = P * weights
        spread = pd.Series(prices.values @ weights, index=prices.index, name="JohansenSpread")

        if plot:
            fig, ax = plt.subplots(figsize=(10, 4))
            ax.plot(spread.index, spread.values, label="Johansen Spread")
            ax.axhline(0, linestyle="--")
            ax.set_title("Johansen Spread (1st eigenvector)")
            ax.legend()
            fig.tight_layout()

            plt.show()

        return rank, beta_sorted, eig_sorted, spread, weights

    return rank, joh.evec, joh.eig, weights, None

ジョハンセン共和分検定で任意のランクが有意であれば、スクリプトはジョハンセンスプレッドに対してADFおよびKPSS検定を実行し、最後にstrategyテーブルに戦略パラメータ(銘柄、比率、時間足)を保存します。

def StoreJohansenStrategy(db_path: str | Path, name: str, symbols: List[str],
                          weights: np.ndarray, timeframe: str) -> None:
    """
    Store Johansen eigenvector (portfolio weights) and symbols into the strategy table.
    Symbols and weights are stored as JSON arrays (text).
    """

    import json

    with sqlite3.connect(db_path) as conn:
        cur = conn.cursor()
        cur.execute("""
            INSERT OR REPLACE INTO strategy (name, symbols, weights, timeframe)
            VALUES (?, ?, ?, ?)
        """, (
            name,
            json.dumps(symbols),
            json.dumps([float(w) for w in weights]),
            timeframe
        ))
        conn.commit()

    logger.info("Stored Johansen strategy '%s' with symbols=%s timeframe=%s",
                name, symbols, timeframe)

利便性のため、スクリプトは関連する各セクション(スプレッドなど)で処理を停止してグラフをプロットします。これにより、選択した銘柄がより詳細な評価に値するかどうかを簡単に判断でき、必要に応じて銘柄の順序(エングル=グレンジャー用)や時間足を変更できます。このスクリプトは、次のステップで完全自動化されたパイプラインが整う前の手動スクリーニング用ツールとして機能します。各タスクは独立した関数としてカプセル化されているため、他のスクリプトに簡単にインポート可能です。

警告:このスクリプトはシステムのエントリーポイントです。新しい銘柄リストをここで定義した直後に、db-updateサービスに登録してください。これにより、EAが動作していない場合でもデータベースが安定したペースで更新されます。このステップは現時点では手動でおこなう必要があります。

スクリプトを実行するには、コマンドラインからパラメータ付きで呼び出します。例は以下の通りです。

PS C:\Users\your\path\to\JupyterNotebooks\StatArb\coint> python py/stat_arb_pipeline_mt5.py --db $env:STATARB_DB_PATH --schema sql/schema-0.2.sql --symbols "MU,NVDA,MPWR,MCHP" --timeframes "D1" --bars 1000 --run

ご覧の通り、dbパスのパラメータには環境変数を使用しています。すべて順調にいけば、ジョハンセン検定により共和分が確認されなかった場合、以下のような結果が表示されます。

2025-09-03 15:35:24,588 [INFO] Database exists:True (C:\Users\your\path\to\MQL5\Files\StatArb\statarb-0.2.db)

2025-09-03 15:35:24,589 [INFO] Using existing DB at C:\Users\your\path\to\MQL5\Files\StatArb\statarb-0.2.db.

2025-09-03 15:35:24,605 [INFO] MetaTrader 5 initialized.

2025-09-03 15:35:24,613 [INFO] Downloaded 1000 bars for MU D1  

2025-09-03 15:35:24,618 [INFO] Downloaded 1000 bars for NVDA D1

2025-09-03 15:35:24,625 [INFO] Downloaded 1000 bars for MPWR D1

2025-09-03 15:35:24,633 [INFO] Downloaded 1000 bars for MCHP D1

2025-09-03 15:35:24,997 [INFO] Stored 1000 rows for MU D1

2025-09-03 15:35:25,261 [INFO] Stored 1000 rows for NVDA D1

2025-09-03 15:35:25,514 [INFO] Stored 1000 rows for MPWR D1

2025-09-03 15:35:25,870 [INFO] Stored 1000 rows for MCHP D1

2025-09-03 15:35:25,871 [INFO] MetaTrader 5 shutdown.

2025-09-03 15:35:25,961 [INFO] ReadMarketData:D1 timeframe, 1000 rows, 4 symbols

2025-09-03 15:35:25,964 [INFO] Most correlated pair:MPWR ~ NVDA (r=0.8335)

Most correlated pair:MPWR ~ NVDA (r=0.8335)

Most cointegrated pair (Engle-Granger):MPWR ~ MU (p=0.010558, beta=5.4940)


エングル=グレンジャースプレッドに対するADFおよびKPSS検定:

ADF: stat=-3.8778, p=0.002204, lags=16, nobs=983, crit={'1%': np.float64(-3.4370198458812156), '5%': np.float64(-2.864484708707697), '10%': np.float64(-2.568337912084273)}

C:\Users\your\path\to\JupyterNotebooks\StatArb\coint\py\stat_arb_pipeline_mt5.py:321:InterpolationWarning:The test statistic is outside of the range of p-values available in the look-up table.The actual p-value is smaller than the p-value returned.

stat, pvalue, lags, crit = kpss(series, regression=regression, nlags="auto")

KPSS(c): stat=0.9279, p=0.010000, lags=19, crit={'10%':0.347, '5%':0.463, '2.5%':0.574, '1%':0.739}

提供されたすべての銘柄に対するジョハンセン検定:

Johansen trace stats: [34.26486877 18.91156161  8.04865462  0.11018312]

Johansen 95% crit vals: [47.8545 29.7961 15.4943  3.8415]

Johansen inferred rank:0

上述の通り、ジョハンセン検定で有意なランク(1以上)が得られた場合、つまり選択した銘柄間に少なくとも1つの共和分ベクトルが存在する場合、スクリプトは最初の固有ベクトルの値(ポートフォリオの比率)をデータベースのstrategyテーブルに保存します。また、スプレッドに対してADF検定とKPSS検定も実行されます。

PS C:\Users\your\path\to\JupyterNotebooks\StatArb\coint> python py/stat_arb_pipeline_mt5.py --db $env:STATARB_DB_PATH --schema sql/schema-0.2.sql --symbols "MU,NVDA,MPWR,MCHP" --timeframes "H4" --bars 90 --run   

この場合、Windows PowerShellまたはターミナルに次のようなものが表示されます。

2025-09-03 15:50:43,168 [INFO] Database exists:True (C:\Users\your\path\to\MQL5\Files\StatArb\statarb-0.2.db)

2025-09-03 15:50:43,169 [INFO] Using existing DB at C:\Users\your\path\to\Files\StatArb\statarb-0.2.db.

2025-09-03 15:50:43,188 [INFO] MetaTrader 5 initialized.

2025-09-03 15:50:44,297 [INFO] Downloaded 90 bars for MU H4

2025-09-03 15:50:44,357 [INFO] Downloaded 90 bars for NVDA H4

2025-09-03 15:50:46,322 [INFO] Downloaded 90 bars for MPWR H4

2025-09-03 15:50:46,732 [INFO] Downloaded 90 bars for MCHP H4

2025-09-03 15:50:49,148 [INFO] Stored 90 rows for MU H4

2025-09-03 15:50:49,160 [INFO] Stored 90 rows for NVDA H4

2025-09-03 15:50:49,171 [INFO] Stored 90 rows for MPWR H4

2025-09-03 15:50:49,276 [INFO] Stored 90 rows for MCHP H4

2025-09-03 15:50:49,277 [INFO] MetaTrader 5 shutdown.

2025-09-03 15:50:49,617 [INFO] ReadMarketData:H4 timeframe, 90 rows, 4 symbols

2025-09-03 15:50:49,622 [INFO] Most correlated pair:MPWR ~ NVDA (r=0.6115)

Most correlated pair:MPWR ~ NVDA (r=0.6115)

Most cointegrated pair (Engle-Granger):MU ~ NVDA (p=0.072055, beta=-0.2391)


エングル=グレンジャースプレッドに対するADFおよびKPSS検定:

ADF: stat=-3.1693, p=0.021831, lags=0, nobs=89, crit={'1%': np.float64(-3.506057133647011), '5%': np.float64(-2.8946066061911946), '10%': np.float64(-2.5844100201994697)}

C:\Users\your\path\to\JupyterNotebooks\StatArb\coint\py\stat_arb_pipeline_mt5.py:321:InterpolationWarning:The test statistic is outside of the range of p-values available in the look-up table.The actual p-value is greater than the p-value returned.

stat, pvalue, lags, crit = kpss(series, regression=regression, nlags="auto")

KPSS(c): stat=0.1793, p=0.100000, lags=5, crit={'10%':0.347, '5%':0.463, '2.5%':0.574, '1%':0.739}


提供されたすべての銘柄に対するジョハンセン検定:

Johansen trace stats: [63.23695088 32.35462044 16.63904745  5.4295422 ]

Johansen 95% crit vals: [47.8545 29.7961 15.4943  3.8415]

Johansen inferred rank:4

First Johansen eigenvector (portfolio weights):

          MCHP: +0.273652

          MPWR: -0.148092

            MU: +2.256498

          NVDA: +1.000000

ADF & KPSS on Johansen spread:

ADF: stat=-4.1427, p=0.000823, lags=1, nobs=88, crit={'1%': np.float64(-3.506944401824286), '5%': np.float64(-2.894989819214876), '10%': np.float64(-2.584614550619835)}

KPSS(c): stat=0.4009, p=0.076784, lags=4, crit={'10%':0.347, '5%':0.463, '2.5%':0.574, '1%':0.739}

2025-09-03 15:50:50,051 

[INFO] Stored Johansen strategy 'Nasdaq_NVDA_Coint' with symbols=['MCHP', 'MPWR', 'MU', 'NVDA'] timeframe=H4


上記の補間警告についてはどうでしょうか。

これらの補間警告は、エングル=グレンジャースプレッドに対するKPSS定常性検定に関連しています。警告は、検定の参照表の範囲を超えた極端な結果が出たことを示しています。具体的には、エングル=グレンジャースプレッドが次のいずれかであることを示しています。

  • 強く定常的である(返されたp値より大きい) 
  • 強く非定常的である(返されたp値より小さい)

現在のところ、リアルタイムでモデルパラメータを更新している段階では、これらの警告は安全に無視して構いません。次のステップ、資産のスクリーニングをおこなう段階で詳しく確認することになります。


MQL5側

メインファイルにはEAのイベントハンドラのみを残し、その他の関数はすべてNasdaq_NVDA_Coint.mqhヘッダファイルに移動しました。

以前は戦略パラメータがハードコードされていましたが、現在はグローバル変数として管理されています。 

// Global variables
string symbols[] = {}; // Asset symbols
double weights[] = {}; // Portfolio weights
ENUM_TIMEFRAMES timeframe = PERIOD_CURRENT; // Strategy cointegrated timeframe

OnInit()イベントハンドラ内でこれらの変数をチェックし、必要に応じてデータベースからロードします。

//+------------------------------------------------------------------+
//| Expert initialization function                                   |
//+------------------------------------------------------------------+

int OnInit()
  {
   ResetLastError();

// Check if all symbols are available
   for(int i = 0; i < ArraySize(symbols); i++)
     {
      if(!SymbolSelect(symbols[i], true))
        {
         Print("Error: Symbol ", symbols[i], " not found!");
         return(INIT_FAILED);
        }
     }
// Initialize spread buffer
   ArrayResize(spreadBuffer, InpLookbackPeriod);

// Set a timer for spread, mean, and stdev calculations
   EventSetTimer(InpUpdateFreq * 60); // min one minute

// Load strategy parameters from database
   if(!LoadStrategyFromDB(InpDbFilename,
                                InpStrategyName,
                                symbols,
                                weights,
                                timeframe))
     {
      // Handle error - maybe use default values
      printf("Error at " + __FUNCTION__ + " %s ",
             getUninitReasonText(GetLastError()));
      return INIT_FAILED;
     }
   return(INIT_SUCCEEDED);
  }

LoadStrategyFromDB()関数はヘッダファイル内に実装されています。

//+------------------------------------------------------------------+
//|          Load strategy parameter from database                   |
//+------------------------------------------------------------------+

bool LoadStrategyFromDB(string db_name,
                        string strat_name,
                        string &symbols_arr[],
                        double &weights_arr[],
                        ENUM_TIMEFRAMES &tf)
  {

// Open the database (Metatrader's integrated SQLite)
   int db = DatabaseOpen(db_name, DATABASE_OPEN_READONLY);
   if(db == INVALID_HANDLE)
     {
      Print("Failed to open database: %s", GetLastError());
      return false;
     }
// Prepare the SQL query with json_extract
   string query = StringFormat(
                     "SELECT "
                     "json_extract(symbols, '$') as symbols_json, "
                     "json_extract(weights, '$') as weights_json, "
                     "timeframe "
                     "FROM strategy WHERE name = '%s'",
                     strat_name
                  );

// Execute the query
   int result = DatabasePrepare(db, query);

   if(result <= 0)
     {
      Print("Failed to prepare query: ", GetLastError());
      DatabaseClose(db);
      return false;
     }

// Read the results
   if(DatabaseRead(result))
     {
      // Get symbols_arr JSON array and parse it
      string symbolsJson;
      DatabaseColumnText(result, 0, symbolsJson);
      ParseJsonArray(symbolsJson, symbols_arr);

      // Get weights_arr JSON array and parse it
      string weightsJson;
      DatabaseColumnText(result, 1, weightsJson);
      ParseJsonDoubleArray(weightsJson, weights_arr);

      // Get tf string and convert to ENUM_TIMEFRAMES
      string timeframeStr;
      DatabaseColumnText(result, 2, timeframeStr);
      tf = StringToTimeframe(timeframeStr);

      Print("Successfully loaded strategy: ", strat_name);

      Print("Symbols JSON: ", symbolsJson);
      Print("Weights JSON: ", weightsJson);
      Print("Timeframe: ", timeframeStr);
     }
   else
     {
      Print("Strategy not found: ", strat_name);

      DatabaseFinalize(result);
      DatabaseClose(db);
      return false;
     }
// Clean up

   DatabaseFinalize(result);
   DatabaseClose(db);
   return true;
  }

前述の通り、symbols配列とポートフォリオのweights配列は、SQLiteのjson_extract()関数を用いて読み込みます。署名は次のようになります。

json_extract(json, path)

ここで

  • json:JSONデータを格納したTEXT型列または文字列
  • path:JSON内で抽出したい値の位置を指定するJSONパス式。通常「$」でJSONドキュメントのルートを示し、その後ドット表記($.key)でオブジェクトのキーを、またはブラケット表記($[index])で配列のインデックスを指定します。本ケースでは配列データを読み取るため、ブラケット表記を使用しています。

このJSON配列をMQL5配列に変換するためのヘルパー関数を2つ用意しています。これらの関数はJSONの形式を処理し、括弧や引用符を削除した後、カンマで分割して配列として扱えるようにします。

ParseJsonArray()

["MU", "NVDA", "MPWR", "MCHP"]のようなJSON文字列配列を解析します。

// Helper function to parse JSON array of strings
void ParseJsonArray(string json, string &array[])
  {
// Remove brackets and quotes from JSON array
   string cleaned = StringSubstr(json, 1, StringLen(json) - 2); // Remove [ and ]
   StringReplace(cleaned, "\"", ""); // Remove quotes

// Split by commas
   StringSplit(cleaned, ',', array);

// Trim whitespace from each element
   for(int i = 0; i < ArraySize(array); i++)
     {
      array[i] = StringTrim(array[i]);
     }
  }

ParseJsonDoubleArray()

[2.699439, 1.000000, -1.877447, -2.505294]のようなJSON数値配列を解析します。

// Helper function to parse JSON array of doubles
void ParseJsonDoubleArray(string json, double &array[])
  {
// Remove brackets from JSON array
   string cleaned = StringSubstr(json, 1, StringLen(json) - 2); // Remove [ and ]

// Split by commas
   string stringArray[];
   int count = StringSplit(cleaned, ',', stringArray);

// Convert to double array
   ArrayResize(array, count);

   for(int i = 0; i < count; i++)
     {
      array[i] = StringToDouble(StringTrim(stringArray[i]));
     }
  }

JSON値に存在する可能性のある空白を削除する関数

StringTrim()

を追加しました。

// Helper function to trim whitespace
string StringTrim(string str)
  {

// Trim leading whitespace
   while(StringLen(str) > 0 && str[0] == ' ')
     {
      str = StringSubstr(str, 1);
     }

// Trim trailing whitespace
   while(StringLen(str) > 0 && str[StringLen(str) - 1] == ' ')
     {
      str = StringSubstr(str, 0, StringLen(str) - 1);
     }

   return str;
  }

MQL5コードベースにはJSON処理用のライブラリが存在します。JSONオブジェクトの使用頻度や複雑さが増す場合は、それらを検討して利点を評価する価値があります。しかし、本ケースのように単純な用途では、外部ライブラリに依存するほどのメリットはあまりないと考えられます。

ヘルパー関数

StringToTimeframe()

データベースの時間足文字列を適切なMQL5 ENUM_TIMEFRAMES値に変換します。

// Helper function to convert tf string to ENUM_TIMEFRAMES
ENUM_TIMEFRAMES StringToTimeframe(string tfStr)
  {
   if(tfStr == "M1")
      return PERIOD_M1;
   if(tfStr == "M2")
      return PERIOD_M2;
.
.
.
   if(tfStr == "MN1")
      return PERIOD_MN1;
   return PERIOD_CURRENT; // Default if not found
  }

EAを起動または再起動した際に呼ばれるOnInit()で戦略パラメータを確認するのに加え、同じ関数をOnTimer()イベントハンドラでも定期的に呼び出します。この定期実行の間隔はユーザー入力パラメータで設定可能です。

// Set a timer for spread, mean, and stdev calculations
// and strategy parameters update (check DB)
   EventSetTimer(InpUpdateFreq * 60); // min one minute

void OnTimer(void)
  {
   ResetLastError();
// Wrapper around LoadStrategyFromDB: for clarity
   if(!UpdateModelParams(InpDbFilename,
                         InpStrategyName,
                         symbols,
                         weights,
                         timeframe))
     {
      printf("%s failed: %s", __FUNCTION__, GetLastError());
     }

UpdateModelParams()はLoadStrategyFromDB()の単なるラッパーです。 

//+------------------------------------------------------------------+
//|          Wrapper to load strategy from db                        |
//+------------------------------------------------------------------+

// Load strategy parameters from database
bool UpdateModelParams(string db_name,
                       string strat_name,
                       string &symbols_arr[],
                       double &weights_arr[],
                       ENUM_TIMEFRAMES tf)
  {
   return LoadStrategyFromDB(db_name, strat_name, symbols_arr, weights_arr, tf);
  }

これにより、関数の目的がより明確になります。


結論

平均回帰戦略向けの共和分株バスケットは、ポートフォリオの比率を常に更新する必要があります。本記事では、バスケットの銘柄自体を含め、常に最新の戦略パラメータを保持できる方法を示しました。

MetaTrader 5の統合SQLiteデータベースを、データ分析と取引環境の間のインターフェースとして活用し、両者を分離したまま運用できる方法も説明しました。

また、本連載を読んでいない方向けに、共和分株の統計的アービトラージ取引パイプラインを一括で実行できる統合Pythonスクリプトを提供しています。このスクリプトは相関検定、共和分検定、定常性検定を一括実行し、最新の戦略パラメータをEAが読み取れる形でデータベースに反映します。


参照文献

Daniel P. Palomar (2025).Portfolio Optimization:Theory and Application.Cambridge University Press.

ファイル名説明
StatArb/db-setup.mq5schema-0.2.sqlスキーマファイルを読み込んでSQLiteデータベースを作成および初期化するMQL5スクリプト
StatArb/db-update-statarb-0.2.mq5SQLiteデータベースを最新の確定価格バーで更新するMQL5 Service
StatArb/Nasdaq_NVDA_Coint.mql5サンプルEA
StatArb/Nasdaq_NVDA_Coint.mqhサンプルEAヘッダファイル
StatArb/schema-0.2.sqlデータベースを初期化するためのSQLスキーマファイル(DDL)。テーブル、フィールド、制約を生成します。
stat_arb_pipeline_mt5.pyこのPythonスクリプトは、共和分株に対する統計的アービトラージ取引パイプラインを一括で実行します。処理にはPearson相関検定、ジョハンセン共和分検定、そしてADFおよびKPSS定常性検定が含まれます。

MetaQuotes Ltdにより英語から翻訳されました。
元の記事: https://www.mql5.com/en/articles/19428

添付されたファイル |
MQL5での取引戦略の自動化(第31回):プライスアクションに基づくスリードライブハーモニックパターンシステムの作成 MQL5での取引戦略の自動化(第31回):プライスアクションに基づくスリードライブハーモニックパターンシステムの作成
本記事では、MQL5においてピボットポイントとフィボナッチ比率に基づいて強気、弱気双方のスリードライブハーモニックパターンを識別し、ユーザーが選択できるカスタムエントリー、ストップロス、テイクプロフィット設定を用いて取引を実行するスリードライブパターンシステムを開発します。さらに、チャートオブジェクトによる視覚的フィードバックによって、トレーダーの洞察を強化します。
MQL5で自己最適化エキスパートアドバイザーを構築する(第14回):フィードバックコントローラーにおけるデータ変換を調整パラメータとして捉える MQL5で自己最適化エキスパートアドバイザーを構築する(第14回):フィードバックコントローラーにおけるデータ変換を調整パラメータとして捉える
前処理は非常に強力でありながら、しばしば軽視されがちな調整パラメータです。その存在は、より注目されるオプティマイザーや華やかなモデル構造の影に隠れています。しかし、前処理のわずかな改善は、利益やリスクに対して予想以上に大きな複利効果をもたらすことがあります。あまりにも多くの場合、このほとんど未踏の領域は単なるルーチン作業として扱われ、手段としてしか意識されません。しかし実際には、前処理は信号を直接増幅することもあれば、容易に破壊してしまうこともあるのです。
MQL5における単変量時系列への動的モード分解の適用 MQL5における単変量時系列への動的モード分解の適用
動的モード分解(DMD: Dynamic Mode Decomposition)は、主に高次元データセットに対して用いられる手法です。本稿では、DMDを単変量の時系列に適用し、その特性把握や予測に活用できることを示します。その過程で、MQL5に搭載されているDMDの実装、とりわけ新しい行列メソッドであるDynamicModeDecomposition()について詳しく解説します。
カスタム市場センチメント指標の開発 カスタム市場センチメント指標の開発
本記事では、複数の時間足を用いて市場センチメントを判定し、強気、弱気、リスクオン、リスクオフ、中立のいずれかに分類するMarket Sentimentカスタムインジケーターの開発について解説します。多時間足分析を組み合わせることで、トレーダーは市場全体の偏りと短期的な動向をより明確に把握できるようになります。