共和分株式による統計的裁定取引(第4回):リアルタイムモデル更新
はじめに
今回の目的は、共和分モデルをリアルタイムで更新する仕組みを整えることです。より具体的には、EAが稼働している間も、ポートフォリオを構成する各銘柄の相対的な重みを、より新しい計算結果に基づいて逐次更新する必要があります。これまでの記事では説明を簡潔にするため、説明やバックテストで同じポートフォリオ比率を使い続けました。これは統計検定の意味や流れを理解しやすくするには有効でした。しかし実際の運用では、これらの比率は新しいデータポイントが到着するたび、つまり本記事の場合は新しいバーが確定するたびに更新されるものです。
その前に簡単に文脈を整えておきましょう。
私たちは現在、いわば「一般トレーダー向けの簡易版統計的アービトラージ環境」を構築しています。一般的なノートPCと通常のネットワーク帯域で稼働できるよう設計された、低コストな統計的アービトラージフレームワークです。本連載では、ペアトレード向けの高相関銘柄の抽出、ポートフォリオ構築のための共和分銘柄の抽出、さらに平均回帰戦略に必要なスプレッドの定常性検定について説明してきました。
前回の記事(第3回)ではデータベースの構築と設計上の選択について述べました。本記事はその直接の続きです。データベースが適切に構築され、定期的に更新されるようになれば、データ分析のプロセスと取引活動を分離できるようになります。
これから次の点を確認します。
- 学んだ教訓を組み込みつつ、すべての検定を一度の実行で処理できるようにしたPythonスクリプトの修正によって資産選定を容易化した方法
- 最新のデータベースを稼働中に利用するためのEAコードの全面改修と、可読性および保守性向上のためにほぼすべての関数をMQL5ヘッダファイルへ移し、メインファイルにはネイティブイベントハンドラのみを残す構成への再設計をおこなった方法
- リアルタイムのデータ分析とオンライン取引環境の仲介インターフェースとして機能させるために施したデータベース側の改修
3番目から説明を始めます。これを理解しておくことで、残る2点の説明が明確になります。
唯一の正解源としてのデータベース
前回の記事では、モデルをリアルタイム更新するために、データベースからEAに対してどのような答えを返してほしいのか、その定義から話を始めました。これはボトムアップ設計でよく使われる典型的な「チートコード」です。つまり、まず即時に必要となる答えを尋ね、それを実装することによって、過剰な設計を避けることができ、ルート修正も容易になります。以下は前回の記事で登場した該当スニペットです。
結局のところ、私たちのデータベースは「今、どの銘柄を取引すれば最大のリターンを得られるのか」という非常に単純な問いに答えられれば良いのです。
前回の記事で、株式バスケットのポートフォリオ比率に基づいて注文の方向と数量を定義したことを思い返すと、データベースが返す理想的な回答の一例は次のような形式になります。
| symbols | weights | timeframe |
|---|---|---|
| "MU", "NVDA", "MPWR", "MCHP" | 2.699439, 1.000000, -1.877447, -2.505294 | D1 (daily timeframe) |
表1:リアルタイムモデル更新のための想定クエリ応答サンプル(架空データ)(前回の記事より)
この種の答えを得るために、データベーススキーマに新しいテーブル「strategy」を追加しました。このテーブルには4つのTEXTフィールドがあります。- Name PRIMARY KEY
- Symbols
- Weights
- Timeframe
-- strategy definition CREATE TABLE strategy ( name TEXT CHECK(LENGTH(name) <= 20) NOT NULL, symbols TEXT CHECK(LENGTH(symbols) <= 255) NOT NULL, weights TEXT CHECK(LENGTH(weights) <= 255), timeframe TEXT CHECK ( timeframe IN ( 'M1', 'M2', 'M3', 'M4', 'M5', 'M6', 'M10', 'M12', 'M15', 'M20', 'M30', 'H1', 'H2', 'H3', 'H4', 'H6', 'H8', 'H12', 'D1', 'W1', 'MN1' ) ),CONSTRAINT strategy_pk PRIMARY KEY (name) ) WITHOUT ROWID, STRICT;このテーブルをWITHOUT ROWIDテーブルとして定義しました。SQLiteにおいて、ROWIDテーブルとは、仮想テーブルおよびINTEGER PRIMARY KEY制約を持つテーブルを除く通常のテーブルのことです。ROWIDテーブルでは、PRIMARY KEYは実際の意味での主キーではありません。
したがって、テーブルをWITHOUT ROWIDとして定義することで、NULLが挿入されることを防ぎ、結果としてリレーショナル整合性が損なわれるのを回避できます。欠点として、この句はSQLite特有であり、私たちのスキーマは他のRDBMSに対して完全な移植性を持たなくなります。他のRDBMSで同じスキーマを使うには適応が必要になります。しかし、主キー項目にNULLが挿入されないという利点は、そのコストに十分見合うと考えています。
すでにお気付きかもしれませんが、symbolsとweightsはどちらも配列です。しかしSQLiteにはARRAYデータ型がありません。この制約のため、これらのデータはTEXTとして挿入しますが、その際JSON配列としてフォーマットします。この措置により、将来ARRAY型を持つ他のRDBMSに移行する場合に作業が容易になります。
| name | symbols | weights | timeframe |
|---|---|---|---|
| Nasdaq_NVDA_Coint | ‘["MU", "NVDA", "MPWR", "MCHP"]’ | ‘[“2.699439”, “1.000000”, “-1.877447”, “-2.505294”]’ | D1 |
表2:symbolsとweightsのデータは、JSON配列としてフォーマットしたTEXTデータ型でstrategyテーブルに挿入される
これらをクエリする際には、SQLiteのjson_extract()関数を使用します。
このようにして、statarb-0.2へとアップグレードされた私たちのデータベーススキーマには新たにstrategyテーブルが加わり、そのエンティティリレーションシップ図は以下のようになります。

図1:statarb-0.2データベースのエンティティリレーションシップ図(ERD)
ヒント:SQLiteで開発を進める際、私はdbファイルの古いバージョンを削除しないようにしています。スキーマファイルの古いバージョンも同様です。単一ファイルで構成されているため、ファイルを複製してコピーに新バージョン名を付け、両方をバージョン管理システム(VCS)に保持するのは容易です。この習慣により、万一の場合に備えた簡易的な「ロールバック経路」が確保できます。
図2:MetaEditorのナビゲータに表示されたSQLiteファイル(データベース)の異なるバージョンと対応するスキーマファイル
上のERDが示すように、新しいstrategyテーブルは現段階ではまだ他のどのテーブルとも接続されていません。これは、この新しいEAバージョンを使ってバックテストおよび取引を開始した時点で、tradeテーブルと接続されることで変わります。繰り返しになりますが、ボトムアップ方式で作業しているため、必要が生じた時、そしてその時に限りデータベースを変更します。現状ではstrategyテーブルは単なるインターフェースであり、データ分析環境と取引環境を結ぶ通信手段に過ぎません。
データベースは、データ分析と取引用EAの間のインターフェースとして、またスクリーニング(つまりデータ分析の適用)とEAの間のインターフェースとして機能します。流れは次のようになります。

図3:統計的アービトラージ取引パイプラインの循環的な性質を示すフロー図
Python側
私たちは、従来タスクごとに個別のJupyter Notebookを使っていたものを、統合されたPythonスクリプトに移行しました。このスクリプトは、共和分株式の統計的アービトラージ取引パイプライン全体を実行します。このスクリプトでは以下の処理をおこないます。
SQLiteデータベースが存在するかどうかを確認します。
def CheckForDb(db_path: str | Path) -> bool: """Return True if the SQLite database file exists on disk.""" exists = Path(db_path).is_file() logger.info("Database exists: %s (%s)", exists, db_path) return exists
データベースが見つからなければ作成します。本連載を読んでいない方でも、すぐに統計的アービトラージ取引パイプラインを試せるように配慮しています。必要なのは、データベーススキーマを提供することだけです(添付参照)。
def CreateDbIfMissing(db_path: str | Path, schema_path: Optional[str | Path] = None) -> None: """ If the database does not exist and a schema is provided, create it from SQL. Otherwise, create an empty file (SQLite creates DB on first connection). """ db_path = Path(db_path) if db_path.exists(): logger.info("DB already present: %s", db_path) return logger.info("DB missing; creating: %s", db_path) with sqlite3.connect(db_path) as conn: if schema_path: schema_sql = Path(schema_path).read_text(encoding="utf-8") conn.executescript(schema_sql) logger.info("Applied schema from: %s", schema_path)
MetaTrader 5ターミナルに接続します。
def InitMT5(login: Optional[int] = None, password: Optional[str] = None, server: Optional[str] = None, path: Optional[str] = None) -> None: """Initialize connection to the running MetaTrader 5 terminal.""" if mt5 is None: raise RuntimeError("MetaTrader5 package not available.") # if not mt5.initialize(path=path, login=login, password=password, server=server): if not mt5.initialize(): raise RuntimeError(f"MT5 initialize() failed: {mt5.last_error()}") logger.info("MetaTrader5 initialized.")
MetaTrader 5から市場データをダウンロードします。
def DownloadMarketData(symbols: List[str], timeframes: List[str], bars: int = 5000 ) -> Dict[Tuple[str, str], pd.DataFrame]: """ Download OHLCV data from MT5 for each (symbol, timeframe). Returns dict with keys (symbol, timeframe) -> DataFrame columns: ['tstamp','timeframe','open','high','low','close','tick_volume','real_volume','spread'] """ if mt5 is None: raise RuntimeError("MetaTrader5 package not available.") out: Dict[Tuple[str, str], pd.DataFrame] = {} for sym in symbols: for tf in timeframes: tf_const = _mt5_timeframe_constant(tf) rates = mt5.copy_rates_from_pos(sym, tf_const, 0, bars) if rates is None or len(rates) == 0: logger.warning("No data for %s %s", sym, tf) continue df = pd.DataFrame(rates) # Rename & select df = df.rename(columns={ "time": "tstamp", "open": "price_open", "high": "price_high", "low": "price_low", "close": "price_close", "real_volume": "real_volume", }) df["timeframe"] = tf cols = ["tstamp","timeframe","price_open","price_high","price_low","price_close", "tick_volume","real_volume","spread"] df = df[cols].copy() out[(sym, tf)] = df logger.info("Downloaded %d bars for %s %s", len(df), sym, tf) return out
この市場データをSQLiteデータベースに保存します。
def StoreMarketData(db_path: str | Path, data: Dict[Tuple[str, str], pd.DataFrame]) -> None: """Upsert market data into SQLite according to the provided schema.""" with sqlite3.connect(db_path) as conn: for (sym, tf), df in data.items(): symbol_id = _ensure_symbol(conn, sym) df = df.copy() df["symbol_id"] = symbol_id # Use executemany for performance sql = """ INSERT OR REPLACE INTO market_data (tstamp, timeframe, price_open, price_high, price_low, price_close, tick_volume, real_volume, spread, symbol_id) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """ rows = list(df[["tstamp","timeframe","price_open","price_high","price_low","price_close", "tick_volume","real_volume","spread","symbol_id"]].itertuples(index=False, name=None)) conn.executemany(sql, rows) conn.commit() logger.info("Stored %d rows for %s %s", len(rows), sym, tf)
選択した銘柄のピアソン相関を確認します。
def RunCorrelationTest(prices: pd.DataFrame, plot: bool = True) -> Tuple[pd.DataFrame, Tuple[str, str], float]: """Compute Pearson correlation matrix and return the most correlated pair (off-diagonal).""" corr = prices.corr(method="pearson") # Mask diagonal, find max corr_values = corr.where(~np.eye(len(corr), dtype=bool)) idxmax = np.unravel_index(np.nanargmax(corr_values.values), corr_values.shape) symbols = corr.columns.tolist() pair = (symbols[idxmax[0]], symbols[idxmax[1]]) value = corr_values.values[idxmax] logger.info("Most correlated pair: %s ~ %s (r=%.4f)", pair[0], pair[1], value) if plot: plt.figure(figsize=(10, 7)) sns.heatmap(corr, annot=False, vmin=-1, vmax=1, cmap="coolwarm", square=True) plt.title("Pearson Correlation Heatmap") plt.tight_layout() plt.show() print(f"Most correlated pair: {pair[0]} ~ {pair[1]} (r={value:.4f})") return corr, pair, float(value)
選択した銘柄のエングル=グレンジャー共和分係数を計算し、最も共和分性の高いペアを特定します。
def RunEngleGrangerTest(prices: pd.DataFrame, plot: bool = True ) -> Tuple[Tuple[str, str], float, float, pd.Series]: """ Loop over all symbol pairs, run statsmodels.coint (Engle-Granger). Return: best_pair, best_pvalue, hedge_ratio (y_on_x), spread series (y - beta*x) """ best_pair, best_p, best_beta, best_spread = None, 1.0, np.nan, None cols = list(prices.columns) for y_sym, x_sym in itertools.combinations(cols, 2): y = prices[y_sym].dropna() x = prices[x_sym].dropna() join = pd.concat([y, x], axis=1).dropna() if len(join) < 50: continue score, pvalue, _ = coint(join.iloc[:,0], join.iloc[:,1]) if pvalue < best_p: beta = _ols_hedge_ratio(join.iloc[:,0], join.iloc[:,1]) spread = join.iloc[:,0] - beta * join.iloc[:,1] best_pair, best_p, best_beta, best_spread = (y_sym, x_sym), pvalue, beta, spread if best_pair is None: raise ValueError("No cointegrated pairs found (Engle-Granger) with sufficient data.") print(f"Most cointegrated pair (Engle-Granger): {best_pair[0]} ~ {best_pair[1]} (p={best_p:.6f}, beta={best_beta:.4f})") if plot: fig, ax = plt.subplots(figsize=(10, 5)) ax.plot(prices.index, prices[best_pair[0]], label=best_pair[0]) ax.plot(prices.index, prices[best_pair[1]]*best_beta, label=f"{best_pair[1]} * beta") ax.set_title(f"Best EG Pair Overlay (beta={best_beta:.4f})") ax.legend() fig.tight_layout() plt.show()fig2, ax2 = plt.subplots(figsize=(10, 4)) ax2.plot(best_spread.index, best_spread.values, label="Spread") ax2.axhline(0, line) ax2.set_title("Spread (y - beta*x)")
ax2.legend() fig2.tight_layout()
plt.show()
return best_pair, float(best_p), float(best_beta), best_spread
エングル=グレンジャースプレッドに対して定常性検定(ADFおよびKPSS)を実行します。
def RunADFTest(series: pd.Series) -> Tuple[float, float, dict]: """Augmented Dickey-Fuller test on a univariate series.""" series = pd.Series(series).dropna() adf_stat, pvalue, usedlag, nobs, crit, icbest = adfuller(series, autolag="AIC") print(f"ADF: stat={adf_stat:.4f}, p={pvalue:.6f}, lags={usedlag}, nobs={nobs}, crit={crit}") return float(adf_stat), float(pvalue), crit
def RunKPSSTest(series: pd.Series, regression: str = "c") -> Tuple[float, float, dict]: """KPSS stationarity test on a univariate series (regression='c' or 'ct').""" series = pd.Series(series).dropna() stat, pvalue, lags, crit = kpss(series, regression=regression, nlags="auto") print(f"KPSS({regression}): stat={stat:.4f}, p={pvalue:.6f}, lags={lags}, crit={crit}") return float(stat), float(pvalue), crit
ジョハンセン共和分検定を実行します(ランク>=1の場合、第一固有ベクトルからポートフォリオ比率を計算)。
def RunJohansenTest(prices: pd.DataFrame, det_order: int = 0, k_ar_diff: int = 1, plot: bool = True ) -> Tuple[int, np.ndarray, np.ndarray, Optional[pd.Series]]: """ Johansen cointegration test on a set of symbols (>=2). Returns: (rank, eigenvectors (beta), eigenvalues, spread_from_first_vector or None) - Significant rank determined by trace statistics vs 95% critical values. - First eigenvector provides portfolio weights (normalized s.t. min(|w|)=1 or w[-1]=1). """ data = prices.dropna().values if data.shape[1] < 2: raise ValueError("Johansen requires at least two series.") joh = coint_johansen(data, det_order, k_ar_diff) # Significant rank by trace statistic > crit value (95% = column index 1) trace_stats = joh.lr1 # trace stats crit_vals = joh.cvt[:, 1] # 95% rank = int(np.sum(trace_stats > crit_vals)) print("Johansen trace stats:", trace_stats) print("Johansen 95% crit vals:", crit_vals) print(f"Johansen inferred rank: {rank}") weights = None spread = None if rank >= 1: # Beta columns are cointegrating vectors; take the first (associated with largest eigenvalue) beta = joh.evec # shape (k, k) eigvals = joh.eig # shape (k,) # Sort by eigenvalue desc to be explicit order = np.argsort(eigvals)[::-1] beta_sorted = beta[:, order] eig_sorted = eigvals[order] first_vec = beta_sorted[:, 0] # Normalize such that the weight of the last symbol is 1 (common in practice) if first_vec[-1] == 0: norm = np.max(np.abs(first_vec)) else: norm = first_vec[-1] weights = first_vec / norm print("First Johansen eigenvector (portfolio weights):") for sym, w in zip(prices.columns, weights): print(f" {sym:>12s}: {w:+.6f}") # Spread = P * weights spread = pd.Series(prices.values @ weights, index=prices.index, name="JohansenSpread") if plot: fig, ax = plt.subplots(figsize=(10, 4)) ax.plot(spread.index, spread.values, label="Johansen Spread") ax.axhline(0, linestyle="--") ax.set_title("Johansen Spread (1st eigenvector)") ax.legend() fig.tight_layout() plt.show() return rank, beta_sorted, eig_sorted, spread, weights return rank, joh.evec, joh.eig, weights, None
ジョハンセン共和分検定で任意のランクが有意であれば、スクリプトはジョハンセンスプレッドに対してADFおよびKPSS検定を実行し、最後にstrategyテーブルに戦略パラメータ(銘柄、比率、時間足)を保存します。
def StoreJohansenStrategy(db_path: str | Path, name: str, symbols: List[str], weights: np.ndarray, timeframe: str) -> None: """ Store Johansen eigenvector (portfolio weights) and symbols into the strategy table. Symbols and weights are stored as JSON arrays (text). """ import json with sqlite3.connect(db_path) as conn: cur = conn.cursor() cur.execute(""" INSERT OR REPLACE INTO strategy (name, symbols, weights, timeframe) VALUES (?, ?, ?, ?) """, ( name, json.dumps(symbols), json.dumps([float(w) for w in weights]), timeframe )) conn.commit() logger.info("Stored Johansen strategy '%s' with symbols=%s timeframe=%s", name, symbols, timeframe)
利便性のため、スクリプトは関連する各セクション(スプレッドなど)で処理を停止してグラフをプロットします。これにより、選択した銘柄がより詳細な評価に値するかどうかを簡単に判断でき、必要に応じて銘柄の順序(エングル=グレンジャー用)や時間足を変更できます。このスクリプトは、次のステップで完全自動化されたパイプラインが整う前の手動スクリーニング用ツールとして機能します。各タスクは独立した関数としてカプセル化されているため、他のスクリプトに簡単にインポート可能です。
警告:このスクリプトはシステムのエントリーポイントです。新しい銘柄リストをここで定義した直後に、db-updateサービスに登録してください。これにより、EAが動作していない場合でもデータベースが安定したペースで更新されます。このステップは現時点では手動でおこなう必要があります。
スクリプトを実行するには、コマンドラインからパラメータ付きで呼び出します。例は以下の通りです。
PS C:\Users\your\path\to\JupyterNotebooks\StatArb\coint> python py/stat_arb_pipeline_mt5.py --db $env:STATARB_DB_PATH --schema sql/schema-0.2.sql --symbols "MU,NVDA,MPWR,MCHP" --timeframes "D1" --bars 1000 --run
ご覧の通り、dbパスのパラメータには環境変数を使用しています。すべて順調にいけば、ジョハンセン検定により共和分が確認されなかった場合、以下のような結果が表示されます。
2025-09-03 15:35:24,588 [INFO] Database exists:True (C:\Users\your\path\to\MQL5\Files\StatArb\statarb-0.2.db)
2025-09-03 15:35:24,589 [INFO] Using existing DB at C:\Users\your\path\to\MQL5\Files\StatArb\statarb-0.2.db.
2025-09-03 15:35:24,605 [INFO] MetaTrader 5 initialized.
2025-09-03 15:35:24,613 [INFO] Downloaded 1000 bars for MU D1
2025-09-03 15:35:24,618 [INFO] Downloaded 1000 bars for NVDA D1
2025-09-03 15:35:24,625 [INFO] Downloaded 1000 bars for MPWR D1
2025-09-03 15:35:24,633 [INFO] Downloaded 1000 bars for MCHP D1
2025-09-03 15:35:24,997 [INFO] Stored 1000 rows for MU D1
2025-09-03 15:35:25,261 [INFO] Stored 1000 rows for NVDA D1
2025-09-03 15:35:25,514 [INFO] Stored 1000 rows for MPWR D1
2025-09-03 15:35:25,870 [INFO] Stored 1000 rows for MCHP D1
2025-09-03 15:35:25,871 [INFO] MetaTrader 5 shutdown.
2025-09-03 15:35:25,961 [INFO] ReadMarketData:D1 timeframe, 1000 rows, 4 symbols
2025-09-03 15:35:25,964 [INFO] Most correlated pair:MPWR ~ NVDA (r=0.8335)
Most correlated pair:MPWR ~ NVDA (r=0.8335)
Most cointegrated pair (Engle-Granger):MPWR ~ MU (p=0.010558, beta=5.4940)
エングル=グレンジャースプレッドに対するADFおよびKPSS検定:
ADF: stat=-3.8778, p=0.002204, lags=16, nobs=983, crit={'1%': np.float64(-3.4370198458812156), '5%': np.float64(-2.864484708707697), '10%': np.float64(-2.568337912084273)}
C:\Users\your\path\to\JupyterNotebooks\StatArb\coint\py\stat_arb_pipeline_mt5.py:321:InterpolationWarning:The test statistic is outside of the range of p-values available in the look-up table.The actual p-value is smaller than the p-value returned.
stat, pvalue, lags, crit = kpss(series, regression=regression, nlags="auto")
KPSS(c): stat=0.9279, p=0.010000, lags=19, crit={'10%':0.347, '5%':0.463, '2.5%':0.574, '1%':0.739}
提供されたすべての銘柄に対するジョハンセン検定:
Johansen trace stats: [34.26486877 18.91156161 8.04865462 0.11018312]
Johansen 95% crit vals: [47.8545 29.7961 15.4943 3.8415]
Johansen inferred rank:0
上述の通り、ジョハンセン検定で有意なランク(1以上)が得られた場合、つまり選択した銘柄間に少なくとも1つの共和分ベクトルが存在する場合、スクリプトは最初の固有ベクトルの値(ポートフォリオの比率)をデータベースのstrategyテーブルに保存します。また、スプレッドに対してADF検定とKPSS検定も実行されます。
PS C:\Users\your\path\to\JupyterNotebooks\StatArb\coint> python py/stat_arb_pipeline_mt5.py --db $env:STATARB_DB_PATH --schema sql/schema-0.2.sql --symbols "MU,NVDA,MPWR,MCHP" --timeframes "H4" --bars 90 --run
この場合、Windows PowerShellまたはターミナルに次のようなものが表示されます。
2025-09-03 15:50:43,168 [INFO] Database exists:True (C:\Users\your\path\to\MQL5\Files\StatArb\statarb-0.2.db)
2025-09-03 15:50:43,169 [INFO] Using existing DB at C:\Users\your\path\to\Files\StatArb\statarb-0.2.db.
2025-09-03 15:50:43,188 [INFO] MetaTrader 5 initialized.
2025-09-03 15:50:44,297 [INFO] Downloaded 90 bars for MU H4
2025-09-03 15:50:44,357 [INFO] Downloaded 90 bars for NVDA H4
2025-09-03 15:50:46,322 [INFO] Downloaded 90 bars for MPWR H4
2025-09-03 15:50:46,732 [INFO] Downloaded 90 bars for MCHP H4
2025-09-03 15:50:49,148 [INFO] Stored 90 rows for MU H4
2025-09-03 15:50:49,160 [INFO] Stored 90 rows for NVDA H4
2025-09-03 15:50:49,171 [INFO] Stored 90 rows for MPWR H4
2025-09-03 15:50:49,276 [INFO] Stored 90 rows for MCHP H4
2025-09-03 15:50:49,277 [INFO] MetaTrader 5 shutdown.
2025-09-03 15:50:49,617 [INFO] ReadMarketData:H4 timeframe, 90 rows, 4 symbols
2025-09-03 15:50:49,622 [INFO] Most correlated pair:MPWR ~ NVDA (r=0.6115)
Most correlated pair:MPWR ~ NVDA (r=0.6115)
Most cointegrated pair (Engle-Granger):MU ~ NVDA (p=0.072055, beta=-0.2391)
エングル=グレンジャースプレッドに対するADFおよびKPSS検定:
ADF: stat=-3.1693, p=0.021831, lags=0, nobs=89, crit={'1%': np.float64(-3.506057133647011), '5%': np.float64(-2.8946066061911946), '10%': np.float64(-2.5844100201994697)}
C:\Users\your\path\to\JupyterNotebooks\StatArb\coint\py\stat_arb_pipeline_mt5.py:321:InterpolationWarning:The test statistic is outside of the range of p-values available in the look-up table.The actual p-value is greater than the p-value returned.
stat, pvalue, lags, crit = kpss(series, regression=regression, nlags="auto")
KPSS(c): stat=0.1793, p=0.100000, lags=5, crit={'10%':0.347, '5%':0.463, '2.5%':0.574, '1%':0.739}
提供されたすべての銘柄に対するジョハンセン検定:
Johansen trace stats: [63.23695088 32.35462044 16.63904745 5.4295422 ]
Johansen 95% crit vals: [47.8545 29.7961 15.4943 3.8415]
Johansen inferred rank:4
First Johansen eigenvector (portfolio weights):
MCHP: +0.273652
MPWR: -0.148092
MU: +2.256498
NVDA: +1.000000
ADF & KPSS on Johansen spread:
ADF: stat=-4.1427, p=0.000823, lags=1, nobs=88, crit={'1%': np.float64(-3.506944401824286), '5%': np.float64(-2.894989819214876), '10%': np.float64(-2.584614550619835)}
KPSS(c): stat=0.4009, p=0.076784, lags=4, crit={'10%':0.347, '5%':0.463, '2.5%':0.574, '1%':0.739}
2025-09-03 15:50:50,051
[INFO] Stored Johansen strategy 'Nasdaq_NVDA_Coint' with symbols=['MCHP', 'MPWR', 'MU', 'NVDA'] timeframe=H4
上記の補間警告についてはどうでしょうか。
これらの補間警告は、エングル=グレンジャースプレッドに対するKPSS定常性検定に関連しています。警告は、検定の参照表の範囲を超えた極端な結果が出たことを示しています。具体的には、エングル=グレンジャースプレッドが次のいずれかであることを示しています。
- 強く定常的である(返されたp値より大きい)
- 強く非定常的である(返されたp値より小さい)
現在のところ、リアルタイムでモデルパラメータを更新している段階では、これらの警告は安全に無視して構いません。次のステップ、資産のスクリーニングをおこなう段階で詳しく確認することになります。
MQL5側
メインファイルにはEAのイベントハンドラのみを残し、その他の関数はすべてNasdaq_NVDA_Coint.mqhヘッダファイルに移動しました。
以前は戦略パラメータがハードコードされていましたが、現在はグローバル変数として管理されています。
// Global variables string symbols[] = {}; // Asset symbols double weights[] = {}; // Portfolio weights ENUM_TIMEFRAMES timeframe = PERIOD_CURRENT; // Strategy cointegrated timeframe
OnInit()イベントハンドラ内でこれらの変数をチェックし、必要に応じてデータベースからロードします。
//+------------------------------------------------------------------+ //| Expert initialization function | //+------------------------------------------------------------------+ int OnInit() { ResetLastError(); // Check if all symbols are available for(int i = 0; i < ArraySize(symbols); i++) { if(!SymbolSelect(symbols[i], true)) { Print("Error: Symbol ", symbols[i], " not found!"); return(INIT_FAILED); } } // Initialize spread buffer ArrayResize(spreadBuffer, InpLookbackPeriod); // Set a timer for spread, mean, and stdev calculations EventSetTimer(InpUpdateFreq * 60); // min one minute // Load strategy parameters from database if(!LoadStrategyFromDB(InpDbFilename, InpStrategyName, symbols, weights, timeframe)) { // Handle error - maybe use default values printf("Error at " + __FUNCTION__ + " %s ", getUninitReasonText(GetLastError())); return INIT_FAILED; } return(INIT_SUCCEEDED); }
LoadStrategyFromDB()関数はヘッダファイル内に実装されています。
//+------------------------------------------------------------------+ //| Load strategy parameter from database | //+------------------------------------------------------------------+ bool LoadStrategyFromDB(string db_name, string strat_name, string &symbols_arr[], double &weights_arr[], ENUM_TIMEFRAMES &tf) { // Open the database (Metatrader's integrated SQLite) int db = DatabaseOpen(db_name, DATABASE_OPEN_READONLY); if(db == INVALID_HANDLE) { Print("Failed to open database: %s", GetLastError()); return false; } // Prepare the SQL query with json_extract string query = StringFormat( "SELECT " "json_extract(symbols, '$') as symbols_json, " "json_extract(weights, '$') as weights_json, " "timeframe " "FROM strategy WHERE name = '%s'", strat_name ); // Execute the query int result = DatabasePrepare(db, query); if(result <= 0) { Print("Failed to prepare query: ", GetLastError()); DatabaseClose(db); return false; } // Read the results if(DatabaseRead(result)) { // Get symbols_arr JSON array and parse it string symbolsJson; DatabaseColumnText(result, 0, symbolsJson); ParseJsonArray(symbolsJson, symbols_arr); // Get weights_arr JSON array and parse it string weightsJson; DatabaseColumnText(result, 1, weightsJson); ParseJsonDoubleArray(weightsJson, weights_arr); // Get tf string and convert to ENUM_TIMEFRAMES string timeframeStr; DatabaseColumnText(result, 2, timeframeStr); tf = StringToTimeframe(timeframeStr); Print("Successfully loaded strategy: ", strat_name); Print("Symbols JSON: ", symbolsJson); Print("Weights JSON: ", weightsJson); Print("Timeframe: ", timeframeStr); } else { Print("Strategy not found: ", strat_name); DatabaseFinalize(result); DatabaseClose(db); return false; } // Clean up DatabaseFinalize(result); DatabaseClose(db); return true; }
前述の通り、symbols配列とポートフォリオのweights配列は、SQLiteのjson_extract()関数を用いて読み込みます。署名は次のようになります。
json_extract(json, path)
ここで
- json:JSONデータを格納したTEXT型列または文字列
- path:JSON内で抽出したい値の位置を指定するJSONパス式。通常「$」でJSONドキュメントのルートを示し、その後ドット表記($.key)でオブジェクトのキーを、またはブラケット表記($[index])で配列のインデックスを指定します。本ケースでは配列データを読み取るため、ブラケット表記を使用しています。
このJSON配列をMQL5配列に変換するためのヘルパー関数を2つ用意しています。これらの関数はJSONの形式を処理し、括弧や引用符を削除した後、カンマで分割して配列として扱えるようにします。
ParseJsonArray()
["MU", "NVDA", "MPWR", "MCHP"]のようなJSON文字列配列を解析します。
// Helper function to parse JSON array of strings void ParseJsonArray(string json, string &array[]) { // Remove brackets and quotes from JSON array string cleaned = StringSubstr(json, 1, StringLen(json) - 2); // Remove [ and ] StringReplace(cleaned, "\"", ""); // Remove quotes // Split by commas StringSplit(cleaned, ',', array); // Trim whitespace from each element for(int i = 0; i < ArraySize(array); i++) { array[i] = StringTrim(array[i]); } }
ParseJsonDoubleArray()
[2.699439, 1.000000, -1.877447, -2.505294]のようなJSON数値配列を解析します。
// Helper function to parse JSON array of doubles void ParseJsonDoubleArray(string json, double &array[]) { // Remove brackets from JSON array string cleaned = StringSubstr(json, 1, StringLen(json) - 2); // Remove [ and ] // Split by commas string stringArray[]; int count = StringSplit(cleaned, ',', stringArray); // Convert to double array ArrayResize(array, count); for(int i = 0; i < count; i++) { array[i] = StringToDouble(StringTrim(stringArray[i])); } }
JSON値に存在する可能性のある空白を削除する関数
StringTrim()
を追加しました。
// Helper function to trim whitespace string StringTrim(string str) { // Trim leading whitespace while(StringLen(str) > 0 && str[0] == ' ') { str = StringSubstr(str, 1); } // Trim trailing whitespace while(StringLen(str) > 0 && str[StringLen(str) - 1] == ' ') { str = StringSubstr(str, 0, StringLen(str) - 1); } return str; }
MQL5コードベースにはJSON処理用のライブラリが存在します。JSONオブジェクトの使用頻度や複雑さが増す場合は、それらを検討して利点を評価する価値があります。しかし、本ケースのように単純な用途では、外部ライブラリに依存するほどのメリットはあまりないと考えられます。
ヘルパー関数
StringToTimeframe()
データベースの時間足文字列を適切なMQL5 ENUM_TIMEFRAMES値に変換します。
// Helper function to convert tf string to ENUM_TIMEFRAMES ENUM_TIMEFRAMES StringToTimeframe(string tfStr) { if(tfStr == "M1") return PERIOD_M1; if(tfStr == "M2") return PERIOD_M2; . . . if(tfStr == "MN1") return PERIOD_MN1; return PERIOD_CURRENT; // Default if not found }
EAを起動または再起動した際に呼ばれるOnInit()で戦略パラメータを確認するのに加え、同じ関数をOnTimer()イベントハンドラでも定期的に呼び出します。この定期実行の間隔はユーザー入力パラメータで設定可能です。
// Set a timer for spread, mean, and stdev calculations // and strategy parameters update (check DB) EventSetTimer(InpUpdateFreq * 60); // min one minute void OnTimer(void) { ResetLastError(); // Wrapper around LoadStrategyFromDB: for clarity if(!UpdateModelParams(InpDbFilename, InpStrategyName, symbols, weights, timeframe)) { printf("%s failed: %s", __FUNCTION__, GetLastError()); }
UpdateModelParams()はLoadStrategyFromDB()の単なるラッパーです。
//+------------------------------------------------------------------+ //| Wrapper to load strategy from db | //+------------------------------------------------------------------+ // Load strategy parameters from database bool UpdateModelParams(string db_name, string strat_name, string &symbols_arr[], double &weights_arr[], ENUM_TIMEFRAMES tf) { return LoadStrategyFromDB(db_name, strat_name, symbols_arr, weights_arr, tf); }
これにより、関数の目的がより明確になります。
結論
平均回帰戦略向けの共和分株バスケットは、ポートフォリオの比率を常に更新する必要があります。本記事では、バスケットの銘柄自体を含め、常に最新の戦略パラメータを保持できる方法を示しました。
MetaTrader 5の統合SQLiteデータベースを、データ分析と取引環境の間のインターフェースとして活用し、両者を分離したまま運用できる方法も説明しました。
また、本連載を読んでいない方向けに、共和分株の統計的アービトラージ取引パイプラインを一括で実行できる統合Pythonスクリプトを提供しています。このスクリプトは相関検定、共和分検定、定常性検定を一括実行し、最新の戦略パラメータをEAが読み取れる形でデータベースに反映します。
参照文献
Daniel P. Palomar (2025).Portfolio Optimization:Theory and Application.Cambridge University Press.
| ファイル名 | 説明 |
|---|---|
| StatArb/db-setup.mq5 | schema-0.2.sqlスキーマファイルを読み込んでSQLiteデータベースを作成および初期化するMQL5スクリプト |
| StatArb/db-update-statarb-0.2.mq5 | SQLiteデータベースを最新の確定価格バーで更新するMQL5 Service |
| StatArb/Nasdaq_NVDA_Coint.mql5 | サンプルEA |
| StatArb/Nasdaq_NVDA_Coint.mqh | サンプルEAヘッダファイル |
| StatArb/schema-0.2.sql | データベースを初期化するためのSQLスキーマファイル(DDL)。テーブル、フィールド、制約を生成します。 |
| stat_arb_pipeline_mt5.py | このPythonスクリプトは、共和分株に対する統計的アービトラージ取引パイプラインを一括で実行します。処理にはPearson相関検定、ジョハンセン共和分検定、そしてADFおよびKPSS定常性検定が含まれます。 |
MetaQuotes Ltdにより英語から翻訳されました。
元の記事: https://www.mql5.com/en/articles/19428
警告: これらの資料についてのすべての権利はMetaQuotes Ltd.が保有しています。これらの資料の全部または一部の複製や再プリントは禁じられています。
この記事はサイトのユーザーによって執筆されたものであり、著者の個人的な見解を反映しています。MetaQuotes Ltdは、提示された情報の正確性や、記載されているソリューション、戦略、または推奨事項の使用によって生じたいかなる結果についても責任を負いません。
MQL5での取引戦略の自動化(第31回):プライスアクションに基づくスリードライブハーモニックパターンシステムの作成
MQL5で自己最適化エキスパートアドバイザーを構築する(第14回):フィードバックコントローラーにおけるデータ変換を調整パラメータとして捉える
MQL5における単変量時系列への動的モード分解の適用
カスタム市場センチメント指標の開発
- 無料取引アプリ
- 8千を超えるシグナルをコピー
- 金融ニュースで金融マーケットを探索