Statistische Arbitrage durch kointegrierte Aktien (Teil 4): Modellaktualisierung in Echtzeit
Einführung
Es ist an der Zeit, unser Kointegrationsmodell in Echtzeit zu aktualisieren. Genauer gesagt, ist es an der Zeit, die relative Gewichtung der einzelnen Aktien in unserem Handelsportfolio mit den neuesten Berechnungen zu aktualisieren, während der Expert Advisor läuft. Bisher haben wir in unseren Erläuterungen und Backtests die gleichen Portfoliogewichte verwendet. Sie dienten als Vereinfachung, um das Verständnis des Prozesses und der Bedeutung und Funktionsweise der zugrunde liegenden statistischen Tests zu erleichtern. Aber im wirklichen Leben ändern sich diese Portfoliogewichte praktisch jedes Mal, wenn ein neuer Datenpunkt eintrifft, d. h. in unserem Fall bei jedem neuen geschlossenen Balken.
Doch zunächst ein wenig Kontext, eine kurze Zusammenfassung, um sicherzustellen, dass wir alle auf dem gleichen Level sind.
Wir entwickeln eine Art statistisches Arbitrage-System für den armen Mann - eine Stat-Arb-Pipeline, die für den durchschnittlichen Einzelhändler mit einem Consumer-Notebook und einer normalen Netzwerkbandbreite konzipiert ist. In dieser Artikelserie dokumentieren und beschreiben wir die statistischen Tests, die für die Suche nach hoch korrelierten Vermögenswerten für den Paarhandel und kointegrierten Aktien für die Portfoliobildung verwendet werden, sowie die Tests, die für die Bewertung der Stationarität ihrer relativen Spreads verwendet werden, die für Mean-Reversion-Strategien wie die unsere erforderlich ist.
Im vorigen Artikel dieser Serie (Teil 3) haben wir die Einrichtung der Datenbank beschrieben und einige Anmerkungen zu unseren Designentscheidungen gemacht. Dieser Artikel ist eine direkte Fortsetzung des letzten Artikels. Sobald unsere Datenbank eingerichtet ist und regelmäßig aktualisiert wird, können wir unsere Datenanalyse von unserer Handelstätigkeit entkoppeln.
Jetzt werden wir sehen:
- wie wir unsere Python-Skripte geändert haben, um die gewonnenen Erkenntnisse einzubeziehen, wobei alle Tests in einem einzigen Durchgang verarbeitet werden, um die Auswahl der Assets zu erleichtern;
- wie wir unseren Expert Advisor Code komplett überarbeitet haben, um die aktuelle Datenbank während der Ausführung zu nutzen, und wie wir fast alle Funktionen in eine MQL5-Header-Datei verschoben haben, um die Dinge überschaubarer und wartbarer zu machen, wobei wir nur die nativen Event-Handler in der Hauptdatei behalten haben;
- Und schließlich werden wir sehen, welche Änderungen wir an unserer Datenbank vorgenommen haben, damit sie als Schnittstelle zwischen der Echtzeit-Datenanalyse und der Online-Handelsumgebung fungieren kann.
Beginnen wir mit dem letzten, Punkt 3, damit die anderen beiden Punkte bei der Beschreibung einen Sinn ergeben.
Die Datenbank als einzige Quelle der Wahrheit
Wir begannen den letzten Artikel mit einer einfachen Definition, welche Art von Antwort wir von unserer Datenbank an unseren Expert Advisor senden wollen, um das Modell in Echtzeit zu aktualisieren. Dies ist ein typischer „Schummelcode“, der häufig für Bottom-up-Designs verwendet wird. Wir fragen nach dem unmittelbaren Bedarf und arbeiten daran, ihn zu erfüllen. Dadurch wird eine Überplanung vermieden und Routenkorrekturen werden erleichtert. Dies ist der entsprechende Ausschnitt aus dem vorherigen Artikel.
Am Ende des Tages sollte unsere Datenbank eine ganz einfache Frage beantworten: Was sollten wir jetzt handeln, um die größtmögliche Rendite zu erzielen?
In Anlehnung an den letzten Artikel dieser Serie, in dem wir die Richtung und das Volumen unserer Aufträge auf der Grundlage der Gewichtung des Aktienkorbs definiert haben, könnte eine mögliche Antwort etwa so lauten:
| Symbole | Gewichte | Zeitrahmen |
|---|---|---|
| „MU“, „NVDA“, „MPWR“, „MCHP“ | 2.699439, 1.000000, -1.877447, -2.505294 | D1 (täglicher Zeitrahmen) |
Tabelle 1. Beispiel einer erwarteten Antwort auf eine Abfrage zur Aktualisierung eines Echtzeitmodells (früherer Artikel)
Um diese Art von Antwort zu erhalten, haben wir eine neue Tabelle zu unserem Datenbankschema hinzugefügt, die Tabelle „strategy“. Sie hat vier TEXT-Felder:- Name (PRIMÄRSCHLÜSSEL)
- Symbols
- Weights
- Timeframe
-- strategy definition CREATE TABLE strategy ( name TEXT CHECK(LENGTH(name) <= 20) NOT NULL, symbols TEXT CHECK(LENGTH(symbols) <= 255) NOT NULL, weights TEXT CHECK(LENGTH(weights) <= 255), timeframe TEXT CHECK ( timeframe IN ( 'M1', 'M2', 'M3', 'M4', 'M5', 'M6', 'M10', 'M12', 'M15', 'M20', 'M30', 'H1', 'H2', 'H3', 'H4', 'H6', 'H8', 'H12', 'D1', 'W1', 'MN1' ) ),CONSTRAINT strategy_pk PRIMARY KEY (name) ) WITHOUT ROWID, STRICT;Wir haben diese Tabelle als Tabelle WITHOUT ROWID definiert. In SQLite sind alle rowid-Tabellen regulären Tabellen mit Ausnahme von virtuellen Tabellen und solchen mit der Beschränkung INTEGER PRIMARY KEY. In rowid-Tabellen sind die Primärschlüssel keine echten Primärschlüssel.
Indem wir die Tabelle als WITHOUT ROWID-Tabelle definieren, vermeiden wir, dass NULL eingefügt wird und folglich die relationale Integrität beeinträchtigt wird. Der Nachteil ist, dass diese Klausel SQLite-spezifisch ist, was bedeutet, dass unser Schema nicht mehr vollständig auf andere RDBMS portierbar ist. Um das gleiche Schema in anderen RDBMS zu verwenden, müssen wir es anpassen. Aber ich denke, der Vorteil, dass NULL nicht in das Primärschlüsselfeld eingefügt wird, ist die Kosten wert.
Wie Sie vielleicht bemerkt haben, sind „symbols“ und „weights“ beides Arrays. SQLite verfügt jedoch nicht über den Datentyp ARRAY. Aufgrund dieser Einschränkung werden wir diese Daten als TEXT einfügen, aber als JSON-Array formatiert. Diese Maßnahme wird uns später die Arbeit erleichtern, wenn wir zu einem anderen RDBMS wechseln, das den Datentyp ARRAY verwendet.
| name | symbols | weights | timeframe |
|---|---|---|---|
| Nasdaq_NVDA_Coint | ‘["MU", "NVDA", "MPWR", "MCHP"]’ | ‘[“2.699439”, “1.000000”, “-1.877447”, “-2.505294”]’ | D1 |
Tabelle 2. Symbole und Gewichtungsdaten werden in die Tabelle „strategy“ als TEXT-Datentyp eingefügt und als JSON-Arrays formatiert.
Um sie abzufragen, verwenden wir die SQLite Funktion json_extract().
Unser Datenbankschema, das jetzt auf statarb-0.2 aktualisiert wurde, enthält also die zusätzliche Tabelle „strategy“, deren Entity-Relationship-Diagramm wie folgt aussieht:

Abbildung 1. Das statarb-0.2 Datenbank-Entity-Relationship-Diagramm (ERD)
TIPP - Bei der Entwicklung mit SQLite habe ich mir angewöhnt, die alten Versionen der db-Dateien sowie die alten Versionen der Schemadateien NICHT zu löschen. Da wir es mit einer einzigen Datei zu tun haben, ist es einfach, die Datei zu duplizieren, die Kopie in die neue Version umzubenennen und beide im Versionskontrollsystem (VCS) zu verwalten. Diese Praxis bietet uns einen einfachen „Rollback-Pfad“ für den Fall der Fälle.
Abbildung 2. Metaeditor-Navigator, der verschiedene Versionen von SQLite-Dateien (db's) und entsprechende Schemadateien anzeigt
Wie Sie in der oben abgebildeten ERD sehen können, ist unsere neue Tabelle „strategy“ noch mit keiner anderen Tabelle verbunden. Dies wird sich ändern, sobald wir mit dem Backtests/Handel mit dieser neuen EA-Version beginnen, wenn sie mit der Handelstabelle verbunden sein wird. Auch hier gilt: Da wir von unten nach oben arbeiten, werden wir unsere Datenbank nur dann ändern, wenn es nötig ist. Im Moment ist die Tabelle „strategy“ nichts weiter als eine Schnittstelle, ein Mittel zur Kommunikation zwischen unserer Datenanalyse und unseren Handelsumgebungen.
Die Datenbank wird als Schnittstelle zwischen unserer Datenanalyse und unserem Trading Expert Advisor sowie als Schnittstelle zwischen unserem Screening - der Anwendung unserer Datenanalyse - und dem EA dienen. Der Ablauf wird in etwa so sein:

Abbildung 3. Ein Flussdiagramm veranschaulicht den zirkulären Aspekt unserer statistischen Arbitrage-Pipeline
Die Python-Seite
Wir sind von einzelnen Jupyter-Notebooks für jede Aufgabe zu einem einheitlichen Python-Skript übergegangen, das eine umfassende statistische Arbitrage-Pipeline für kointegrierte Aktien ausführt. Dieses Skript wird:
Prüfen Sie, ob die SQLite-Datenbank existiert.
def CheckForDb(db_path: str | Path) -> bool: """Return True if the SQLite database file exists on disk.""" exists = Path(db_path).is_file() logger.info("Database exists: %s (%s)", exists, db_path) return exists
Es wird eine Datenbank erstellt, wenn keine gefunden wird. Dies ist praktisch, wenn Sie diese Serie nicht verfolgen und einfach nur mit der Stat-Arb-Pipeline experimentieren möchten. Alles, was Sie tun müssen, ist, das Datenbankschema (hier beigefügt) bereitzustellen.
def CreateDbIfMissing(db_path: str | Path, schema_path: Optional[str | Path] = None) -> None: """ If the database does not exist and a schema is provided, create it from SQL. Otherwise, create an empty file (SQLite creates DB on first connection). """ db_path = Path(db_path) if db_path.exists(): logger.info("DB already present: %s", db_path) return logger.info("DB missing; creating: %s", db_path) with sqlite3.connect(db_path) as conn: if schema_path: schema_sql = Path(schema_path).read_text(encoding="utf-8") conn.executescript(schema_sql) logger.info("Applied schema from: %s", schema_path)
Verbinden Sie sich mit dem MetaTrader 5 Terminal.
def InitMT5(login: Optional[int] = None, password: Optional[str] = None, server: Optional[str] = None, path: Optional[str] = None) -> None: """Initialize connection to the running MetaTrader 5 terminal.""" if mt5 is None: raise RuntimeError("MetaTrader5 package not available.") # if not mt5.initialize(path=path, login=login, password=password, server=server): if not mt5.initialize(): raise RuntimeError(f"MT5 initialize() failed: {mt5.last_error()}") logger.info("MetaTrader5 initialized.")
Laden Sie Marktdaten von MetaTrader 5 herunter.
def DownloadMarketData(symbols: List[str], timeframes: List[str], bars: int = 5000 ) -> Dict[Tuple[str, str], pd.DataFrame]: """ Download OHLCV data from MT5 for each (symbol, timeframe). Returns dict with keys (symbol, timeframe) -> DataFrame columns: ['tstamp','timeframe','open','high','low','close','tick_volume','real_volume','spread'] """ if mt5 is None: raise RuntimeError("MetaTrader5 package not available.") out: Dict[Tuple[str, str], pd.DataFrame] = {} for sym in symbols: for tf in timeframes: tf_const = _mt5_timeframe_constant(tf) rates = mt5.copy_rates_from_pos(sym, tf_const, 0, bars) if rates is None or len(rates) == 0: logger.warning("No data for %s %s", sym, tf) continue df = pd.DataFrame(rates) # Rename & select df = df.rename(columns={ "time": "tstamp", "open": "price_open", "high": "price_high", "low": "price_low", "close": "price_close", "real_volume": "real_volume", }) df["timeframe"] = tf cols = ["tstamp","timeframe","price_open","price_high","price_low","price_close", "tick_volume","real_volume","spread"] df = df[cols].copy() out[(sym, tf)] = df logger.info("Downloaded %d bars for %s %s", len(df), sym, tf) return out
Speichern Sie diese Marktdaten in der SQLite-Datenbank.
def StoreMarketData(db_path: str | Path, data: Dict[Tuple[str, str], pd.DataFrame]) -> None: """Upsert market data into SQLite according to the provided schema.""" with sqlite3.connect(db_path) as conn: for (sym, tf), df in data.items(): symbol_id = _ensure_symbol(conn, sym) df = df.copy() df["symbol_id"] = symbol_id # Use executemany for performance sql = """ INSERT OR REPLACE INTO market_data (tstamp, timeframe, price_open, price_high, price_low, price_close, tick_volume, real_volume, spread, symbol_id) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """ rows = list(df[["tstamp","timeframe","price_open","price_high","price_low","price_close", "tick_volume","real_volume","spread","symbol_id"]].itertuples(index=False, name=None)) conn.executemany(sql, rows) conn.commit() logger.info("Stored %d rows for %s %s", len(rows), sym, tf)
Prüfen Sie die Pearson-Korrelation für die ausgewählten Symbole.
def RunCorrelationTest(prices: pd.DataFrame, plot: bool = True) -> Tuple[pd.DataFrame, Tuple[str, str], float]: """Compute Pearson correlation matrix and return the most correlated pair (off-diagonal).""" corr = prices.corr(method="pearson") # Mask diagonal, find max corr_values = corr.where(~np.eye(len(corr), dtype=bool)) idxmax = np.unravel_index(np.nanargmax(corr_values.values), corr_values.shape) symbols = corr.columns.tolist() pair = (symbols[idxmax[0]], symbols[idxmax[1]]) value = corr_values.values[idxmax] logger.info("Most correlated pair: %s ~ %s (r=%.4f)", pair[0], pair[1], value) if plot: plt.figure(figsize=(10, 7)) sns.heatmap(corr, annot=False, vmin=-1, vmax=1, cmap="coolwarm", square=True) plt.title("Pearson Correlation Heatmap") plt.tight_layout() plt.show() print(f"Most correlated pair: {pair[0]} ~ {pair[1]} (r={value:.4f})") return corr, pair, float(value)
Berechnen Sie den Engle-Granger-Kointegrationskoeffizienten für die ausgewählten Symbole und geben Sie das am stärksten korrelierte Paar an.
def RunEngleGrangerTest(prices: pd.DataFrame, plot: bool = True ) -> Tuple[Tuple[str, str], float, float, pd.Series]: """ Loop over all symbol pairs, run statsmodels.coint (Engle-Granger). Return: best_pair, best_pvalue, hedge_ratio (y_on_x), spread series (y - beta*x) """ best_pair, best_p, best_beta, best_spread = None, 1.0, np.nan, None cols = list(prices.columns) for y_sym, x_sym in itertools.combinations(cols, 2): y = prices[y_sym].dropna() x = prices[x_sym].dropna() join = pd.concat([y, x], axis=1).dropna() if len(join) < 50: continue score, pvalue, _ = coint(join.iloc[:,0], join.iloc[:,1]) if pvalue < best_p: beta = _ols_hedge_ratio(join.iloc[:,0], join.iloc[:,1]) spread = join.iloc[:,0] - beta * join.iloc[:,1] best_pair, best_p, best_beta, best_spread = (y_sym, x_sym), pvalue, beta, spread if best_pair is None: raise ValueError("No cointegrated pairs found (Engle-Granger) with sufficient data.") print(f"Most cointegrated pair (Engle-Granger): {best_pair[0]} ~ {best_pair[1]} (p={best_p:.6f}, beta={best_beta:.4f})") if plot: fig, ax = plt.subplots(figsize=(10, 5)) ax.plot(prices.index, prices[best_pair[0]], label=best_pair[0]) ax.plot(prices.index, prices[best_pair[1]]*best_beta, label=f"{best_pair[1]} * beta") ax.set_title(f"Best EG Pair Overlay (beta={best_beta:.4f})") ax.legend() fig.tight_layout() plt.show()fig2, ax2 = plt.subplots(figsize=(10, 4)) ax2.plot(best_spread.index, best_spread.values, label="Spread") ax2.axhline(0, line) ax2.set_title("Spread (y - beta*x)")
ax2.legend() fig2.tight_layout()
plt.show()
return best_pair, float(best_p), float(best_beta), best_spread
Führen Sie die Stationaritätstests (ADF und KPSS) für die Engle-Granger-Spanne durch.
def RunADFTest(series: pd.Series) -> Tuple[float, float, dict]: """Augmented Dickey-Fuller test on a univariate series.""" series = pd.Series(series).dropna() adf_stat, pvalue, usedlag, nobs, crit, icbest = adfuller(series, autolag="AIC") print(f"ADF: stat={adf_stat:.4f}, p={pvalue:.6f}, lags={usedlag}, nobs={nobs}, crit={crit}") return float(adf_stat), float(pvalue), crit
def RunKPSSTest(series: pd.Series, regression: str = "c") -> Tuple[float, float, dict]: """KPSS stationarity test on a univariate series (regression='c' or 'ct').""" series = pd.Series(series).dropna() stat, pvalue, lags, crit = kpss(series, regression=regression, nlags="auto") print(f"KPSS({regression}): stat={stat:.4f}, p={pvalue:.6f}, lags={lags}, crit={crit}") return float(stat), float(pvalue), crit
Überprüfen Sie die Johansen-Kointegration (mit Portfoliogewichten aus dem ersten Eigenvektor, wenn Rang >= 1).
def RunJohansenTest(prices: pd.DataFrame, det_order: int = 0, k_ar_diff: int = 1, plot: bool = True ) -> Tuple[int, np.ndarray, np.ndarray, Optional[pd.Series]]: """ Johansen cointegration test on a set of symbols (>=2). Returns: (rank, eigenvectors (beta), eigenvalues, spread_from_first_vector or None) - Significant rank determined by trace statistics vs 95% critical values. - First eigenvector provides portfolio weights (normalized s.t. min(|w|)=1 or w[-1]=1). """ data = prices.dropna().values if data.shape[1] < 2: raise ValueError("Johansen requires at least two series.") joh = coint_johansen(data, det_order, k_ar_diff) # Significant rank by trace statistic > crit value (95% = column index 1) trace_stats = joh.lr1 # trace stats crit_vals = joh.cvt[:, 1] # 95% rank = int(np.sum(trace_stats > crit_vals)) print("Johansen trace stats:", trace_stats) print("Johansen 95% crit vals:", crit_vals) print(f"Johansen inferred rank: {rank}") weights = None spread = None if rank >= 1: # Beta columns are cointegrating vectors; take the first (associated with largest eigenvalue) beta = joh.evec # shape (k, k) eigvals = joh.eig # shape (k,) # Sort by eigenvalue desc to be explicit order = np.argsort(eigvals)[::-1] beta_sorted = beta[:, order] eig_sorted = eigvals[order] first_vec = beta_sorted[:, 0] # Normalize such that the weight of the last symbol is 1 (common in practice) if first_vec[-1] == 0: norm = np.max(np.abs(first_vec)) else: norm = first_vec[-1] weights = first_vec / norm print("First Johansen eigenvector (portfolio weights):") for sym, w in zip(prices.columns, weights): print(f" {sym:>12s}: {w:+.6f}") # Spread = P * weights spread = pd.Series(prices.values @ weights, index=prices.index, name="JohansenSpread") if plot: fig, ax = plt.subplots(figsize=(10, 4)) ax.plot(spread.index, spread.values, label="Johansen Spread") ax.axhline(0, linestyle="--") ax.set_title("Johansen Spread (1st eigenvector)") ax.legend() fig.tight_layout() plt.show() return rank, beta_sorted, eig_sorted, spread, weights return rank, joh.evec, joh.eig, weights, None
Wenn ein Rang im Johansen-Kointegrationstest SIGNIFIKANT ist, führt das Skript auch den ADF- und den KPSS-Test für die Johansen-Spanne durch und speichert schließlich die Strategieparameter (Symbole, Gewichte und Zeitrahmen) in der Tabelle „strategy“, wie im vorherigen Abschnitt erwähnt.
def StoreJohansenStrategy(db_path: str | Path, name: str, symbols: List[str], weights: np.ndarray, timeframe: str) -> None: """ Store Johansen eigenvector (portfolio weights) and symbols into the strategy table. Symbols and weights are stored as JSON arrays (text). """ import json with sqlite3.connect(db_path) as conn: cur = conn.cursor() cur.execute(""" INSERT OR REPLACE INTO strategy (name, symbols, weights, timeframe) VALUES (?, ?, ?, ?) """, ( name, json.dumps(symbols), json.dumps([float(w) for w in weights]), timeframe )) conn.commit() logger.info("Stored Johansen strategy '%s' with symbols=%s timeframe=%s", name, symbols, timeframe)
Zur Vereinfachung wird die Ausführung angehalten und das Chart bei jedem relevanten Abschnitt (z. B. Spreads) gezeichnet, sodass Sie leicht beurteilen können, ob die ausgewählten Symbole eine detailliertere Bewertung wert sind und eventuell ihre Listenreihenfolge (für Engle-Granger) oder den Zeitrahmen ändern. Das Skript ist ein Werkzeug für das manuelle Screening, bevor wir im nächsten Schritt eine vollautomatische Pipeline aufbauen. Jede Aufgabe ist als unabhängige Funktion gekapselt, sodass Sie sie leicht in andere Skripte importieren können.
WARNUNG: Dieses Skript ist unser Einstiegspunkt in das System. Gleich nachdem hier eine neue Symbolliste definiert wurde, sollten Sie sie in den db-update-Dienst einfügen. Dadurch wird sichergestellt, dass Ihre Datenbank in einem ruhigen Tempo aktualisiert wird, auch wenn Ihr EA aus irgendeinem Grund nicht läuft. Dieser Schritt muss vorerst manuell durchgeführt werden.
Um das Skript auszuführen, rufen Sie es über die Befehlszeile mit Parametern auf. Zum Beispiel:
PS C:\Users\your\path\to\JupyterNotebooks\StatArb\coint> python py/stat_arb_pipeline_mt5.py --db $env:STATARB_DB_PATH --schema sql/schema-0.2.sql --symbols "MU,NVDA,MPWR,MCHP" --timeframes "D1" --bars 1000 --run
Wie Sie sehen können, verwenden wir Umgebungsvariablen für den Parameter db path. Wenn alles gut geht, sollten Sie so etwas sehen, wenn es gemäß dem Johansen-Test KEINE KINTEGRATION gibt.
2025-09-03 15:35:24,588 [INFO] Database exists: True (C:\Users\your\path\to\MQL5\Files\StatArb\statarb-0.2.db)
2025-09-03 15:35:24,589 [INFO] Using existing DB at C:\Users\your\path\to\MQL5\Files\StatArb\statarb-0.2.db.
2025-09-03 15:35:24,605 [INFO] MetaTrader 5 initialized.
2025-09-03 15:35:24,613 [INFO] Downloaded 1000 bars for MU D1
2025-09-03 15:35:24,618 [INFO] Downloaded 1000 bars for NVDA D1
2025-09-03 15:35:24,625 [INFO] Downloaded 1000 bars for MPWR D1
2025-09-03 15:35:24,633 [INFO] Downloaded 1000 bars for MCHP D1
2025-09-03 15:35:24,997 [INFO] Stored 1000 rows for MU D1
2025-09-03 15:35:25,261 [INFO] Stored 1000 rows for NVDA D1
2025-09-03 15:35:25,514 [INFO] Stored 1000 rows for MPWR D1
2025-09-03 15:35:25,870 [INFO] Stored 1000 rows for MCHP D1
2025-09-03 15:35:25,871 [INFO] MetaTrader 5 shutdown.
2025-09-03 15:35:25,961 [INFO] ReadMarketData: D1 timeframe, 1000 rows, 4 symbols
2025-09-03 15:35:25,964 [INFO] Most correlated pair: MPWR ~ NVDA (r=0.8335)
Das am stärksten korrelierende Paar: MPWR ~ NVDA (r=0.8335)
Am meisten kointegriertes Paar (Engle-Granger): MPWR ~ MU (p=0,010558, beta=5,4940)
ADF & KPSS zur Engle-Granger-Spanne:
ADF: stat=-3.8778, p=0.002204, lags=16, nobs=983, crit={'1%': np.float64(-3.4370198458812156), '5%': np.float64(-2.864484708707697), '10%': np.float64(-2.568337912084273)}
C:\Users\your\path\to\JupyterNotebooks\StatArb\coint\py\stat_arb_pipeline_mt5.py:321: InterpolationWarnung: Die Teststatistik liegt außerhalb des Bereichs der in der Look-up-Tabelle verfügbaren p-Werte. Der tatsächliche p-Wert ist kleiner als der zurückgegebene p-Wert.
stat, pvalue, lags, crit = kpss(series, regression=regression, nlags="auto")
KPSS(c): stat=0.9279, p=0.010000, lags=19, crit={'10%': 0.347, '5%': 0.463, '2.5%': 0.574, '1%': 0.739}
Johansen-Test für alle angegebenen Symbole:
Johansen-Spurenstatistiken: [34.26486877 18.91156161 8.04865462 0.11018312]
Johansen 95% kritische Werte: [47.8545 29.7961 15.4943 3.8415]
Von Johansen abgeleiteter Rang: 0
Wie bereits erwähnt, speichert das Skript bei einem SIGNIFIKANTEN Johansen-Rang (>=1), d.h. wenn es mindestens einen kointegrierten Vektor zwischen den ausgewählten Symbolen gibt, die ersten Eigenvektorwerte (Portfoliogewichte) in der Datenbank in der „Strategietabelle“. Außerdem werden die ADF- und KPSS-Tests für die Streuung durchgeführt.
PS C:\Users\your\path\to\JupyterNotebooks\StatArb\coint> python py/stat_arb_pipeline_mt5.py --db $env:STATARB_DB_PATH --schema sql/schema-0.2.sql --symbols "MU,NVDA,MPWR,MCHP" --timeframes "H4" --bars 90 --run
In diesem Fall sollten Sie in der Windows PowerShell oder im Terminal etwas wie folgt sehen:
2025-09-03 15:50:43,168 [INFO] Database exists: True (C:\Users\your\path\to\MQL5\Files\StatArb\statarb-0.2.db)
2025-09-03 15:50:43,169 [INFO] Using existing DB at C:\Users\your\path\to\Files\StatArb\statarb-0.2.db.
2025-09-03 15:50:43,188 [INFO] MetaTrader 5 initialized.
2025-09-03 15:50:44,297 [INFO] Downloaded 90 bars for MU H4
2025-09-03 15:50:44,357 [INFO] Downloaded 90 bars for NVDA H4
2025-09-03 15:50:46,322 [INFO] Downloaded 90 bars for MPWR H4
2025-09-03 15:50:46,732 [INFO] Downloaded 90 bars for MCHP H4
2025-09-03 15:50:49,148 [INFO] Stored 90 rows for MU H4
2025-09-03 15:50:49,160 [INFO] Stored 90 rows for NVDA H4
2025-09-03 15:50:49,171 [INFO] Stored 90 rows for MPWR H4
2025-09-03 15:50:49,276 [INFO] Stored 90 rows for MCHP H4
2025-09-03 15:50:49,277 [INFO] MetaTrader 5 shutdown.
2025-09-03 15:50:49,617 [INFO] ReadMarketData: H4 timeframe, 90 rows, 4 symbols
2025-09-03 15:50:49,622 [INFO] Most correlated pair: MPWR ~ NVDA (r=0,6115)
Das am stärksten korrelierende Paar: MPWR ~ NVDA (r=0,6115)
Am meisten kointegriertes Paar (Engle-Granger): MU ~ NVDA (p=0,072055, beta=-0,2391)
ADF & KPSS zur Engle-Granger-Spanne:
ADF: stat=-3.1693, p=0.021831, lags=0, nobs=89, crit={'1%': np.float64(-3.506057133647011), '5%': np.float64(-2.8946066061911946), '10%': np.float64(-2.5844100201994697)}
C:\Users\your\path\to\JupyterNotebooks\StatArb\coint\py\stat_arb_pipeline_mt5.py:321: InterpolationWarning: The test statistic is outside of the range of p-values available in the look-up table. The actual p-value is greater than the p-value returned.
stat, pvalue, lags, crit = kpss(series, regression=regression, nlags="auto")
KPSS(c): stat=0.1793, p=0.100000, lags=5, crit={'10%': 0.347, '5%': 0.463, '2.5%': 0.574, '1%': 0.739}
Johansen-Test für alle angegebenen Symbole:
Johansen-Spurenstatistiken: [63.23695088 32.35462044 16.63904745 5.4295422 ]
Johansen 95% kritische Werte: [47.8545 29.7961 15.4943 3.8415]
Von Johansen abgeleiteter Rang: 4
Erster Johansen-Eigenvektor (Portfolio-Gewichte):
MCHP: +0.273652
MPWR: -0.148092
MU: +2.256498
NVDA: +1.000000
ADF & KPSS auf Johansen-Spread:
ADF: stat=-4.1427, p=0.000823, lags=1, nobs=88, crit={'1%': np.float64(-3.506944401824286), '5%': np.float64(-2.894989819214876), '10%': np.float64(-2.584614550619835)}
KPSS(c): stat=0.4009, p=0.076784, lags=4, crit={'10%': 0.347, '5%': 0.463, '2.5%': 0.574, '1%': 0.739}
2025-09-03 15:50:50,051
[INFO] Stored Johansen strategy 'Nasdaq_NVDA_Coint' with symbols=['MCHP', 'MPWR', 'MU', 'NVDA'] timeframe=H4
Was ist mit den oben genannten Interpolationswarnungen?
Diese Interpolationswarnungen stehen im Zusammenhang mit dem KPSS-Stationaritätstest für die Engle-Granger-Spanne. Sie teilen uns mit, dass wir extreme Ergebnisse außerhalb der Testnachschlagetabelle erhalten haben. Die Warnungen weisen uns lediglich darauf hin, dass unsere Engle-Granger-Ausbreitung
- stark stationär (p-value größer als der zurückgegebene p-value)
- oder stark instationär (ein p-Wert kleiner als der zurückgegebene p-Wert) sind.
Wir können diese Warnungen vorerst getrost ignorieren, solange wir uns mit den Echtzeit-Aktualisierungen unserer Modellparameter beschäftigen. Wir werden sie im nächsten Schritt näher betrachten, wenn wir uns mit dem Screening unserer Vermögenswerte befassen.
Die MQL5-Seite
Wir haben nur die EA-Ereignishandlung in der Hauptdatei belassen. Alle anderen Funktionen wurden in die Header-Datei Nasdaq_NVDA_Coint.mqh verschoben.
Zuvor waren die Strategieparameter fest kodiert. Jetzt sind es globale Variablen.
// Global variables string symbols[] = {}; // Asset symbols double weights[] = {}; // Portfolio weights ENUM_TIMEFRAMES timeframe = PERIOD_CURRENT; // Strategy cointegrated timeframe
Diese globalen Variablen werden direkt in OnInit() überprüft.
//+------------------------------------------------------------------+ //| Expert initialization function | //+------------------------------------------------------------------+ int OnInit() { ResetLastError(); // Check if all symbols are available for(int i = 0; i < ArraySize(symbols); i++) { if(!SymbolSelect(symbols[i], true)) { Print("Error: Symbol ", symbols[i], " not found!"); return(INIT_FAILED); } } // Initialize spread buffer ArrayResize(spreadBuffer, InpLookbackPeriod); // Set a timer for spread, mean, and stdev calculations EventSetTimer(InpUpdateFreq * 60); // min one minute // Load strategy parameters from database if(!LoadStrategyFromDB(InpDbFilename, InpStrategyName, symbols, weights, timeframe)) { // Handle error - maybe use default values printf("Error at " + __FUNCTION__ + " %s ", getUninitReasonText(GetLastError())); return INIT_FAILED; } return(INIT_SUCCEEDED); }
Die Funktion LoadStrategyFromDB() befindet sich in der Header-Datei.
//+------------------------------------------------------------------+ //| Load strategy parameter from database | //+------------------------------------------------------------------+ bool LoadStrategyFromDB(string db_name, string strat_name, string &symbols_arr[], double &weights_arr[], ENUM_TIMEFRAMES &tf) { // Open the database (Metatrader's integrated SQLite) int db = DatabaseOpen(db_name, DATABASE_OPEN_READONLY); if(db == INVALID_HANDLE) { Print("Failed to open database: %s", GetLastError()); return false; } // Prepare the SQL query with json_extract string query = StringFormat( "SELECT " "json_extract(symbols, '$') as symbols_json, " "json_extract(weights, '$') as weights_json, " "timeframe " "FROM strategy WHERE name = '%s'", strat_name ); // Execute the query int result = DatabasePrepare(db, query); if(result <= 0) { Print("Failed to prepare query: ", GetLastError()); DatabaseClose(db); return false; } // Read the results if(DatabaseRead(result)) { // Get symbols_arr JSON array and parse it string symbolsJson; DatabaseColumnText(result, 0, symbolsJson); ParseJsonArray(symbolsJson, symbols_arr); // Get weights_arr JSON array and parse it string weightsJson; DatabaseColumnText(result, 1, weightsJson); ParseJsonDoubleArray(weightsJson, weights_arr); // Get tf string and convert to ENUM_TIMEFRAMES string timeframeStr; DatabaseColumnText(result, 2, timeframeStr); tf = StringToTimeframe(timeframeStr); Print("Successfully loaded strategy: ", strat_name); Print("Symbols JSON: ", symbolsJson); Print("Weights JSON: ", weightsJson); Print("Timeframe: ", timeframeStr); } else { Print("Strategy not found: ", strat_name); DatabaseFinalize(result); DatabaseClose(db); return false; } // Clean up DatabaseFinalize(result); DatabaseClose(db); return true; }
Wie bereits erwähnt, werden wir die Arrays der Symbole und der Portfoliogewichte mit der SQLite-Funktion json_extract() lesen. Sie trägt die folgende Signatur:
json_extract(json, Pfad)
wobei:
- „json“ ist die Textspalte oder Zeichenkette, die die JSON-Daten enthält,
- „path“ ist ein JSON-Pfadausdruck, der die Position des zu extrahierenden Wertes innerhalb des JSON angibt. Er beginnt in der Regel mit $, um die Wurzel des JSON-Dokuments darzustellen, gefolgt von der Punktnotation für Objektschlüssel ($.key) oder der Klammernotation für Array-Indizes ($[index]). Diese letzte Form verwenden wir hier, um die Array-Daten zu lesen.
Die beiden Hilfsfunktionen parsen die JSON-Arrays. Diese Funktionen verarbeiten das JSON-Format, indem sie Klammern und Anführungszeichen entfernen und dann durch Kommas trennen.
ParseJsonArray()
parst JSON-String-Arrays wie ["MU", "NVDA", "MPWR", "MCHP"]
// Helper function to parse JSON array of strings void ParseJsonArray(string json, string &array[]) { // Remove brackets and quotes from JSON array string cleaned = StringSubstr(json, 1, StringLen(json) - 2); // Remove [ and ] StringReplace(cleaned, "\"", ""); // Remove quotes // Split by commas StringSplit(cleaned, ',', array); // Trim whitespace from each element for(int i = 0; i < ArraySize(array); i++) { array[i] = StringTrim(array[i]); } }
ParseJsonDoubleArray()
parst JSON-Zahlen-Arrays wie [2.699439, 1.000000, -1.877447, -2.505294]
// Helper function to parse JSON array of doubles void ParseJsonDoubleArray(string json, double &array[]) { // Remove brackets from JSON array string cleaned = StringSubstr(json, 1, StringLen(json) - 2); // Remove [ and ] // Split by commas string stringArray[]; int count = StringSplit(cleaned, ',', stringArray); // Convert to double array ArrayResize(array, count); for(int i = 0; i < count; i++) { array[i] = StringToDouble(StringTrim(stringArray[i])); } }
Hinzugefügt wurde die Funktion
StringTrim(),
um alle Leerzeichen zu entfernen, die in den JSON-Werten vorhanden sein könnten.
// Helper function to trim whitespace string StringTrim(string str) { // Trim leading whitespace while(StringLen(str) > 0 && str[0] == ' ') { str = StringSubstr(str, 1); } // Trim trailing whitespace while(StringLen(str) > 0 && str[StringLen(str) - 1] == ' ') { str = StringSubstr(str, 0, StringLen(str) - 1); } return str; }
Es gibt JSON-Bibliotheken in der MQL5 Codebase. Wenn Sie JSON-Objekte intensiver nutzen, lohnt es sich vielleicht, nach ihnen zu suchen und die Vorteile zu evaluieren. In diesem einfachen Fall reichen die Vorteile meines Erachtens nicht aus, um eine externe Abhängigkeit zu begründen.
Die Hilfsfunktion
StringToTimeframe()
wandelt die Zeichenkette des Datenbank-Zeitrahmens in den entsprechenden Wert der ENUM_TIMEFRAMES von MQL5 um.
// Helper function to convert tf string to ENUM_TIMEFRAMES ENUM_TIMEFRAMES StringToTimeframe(string tfStr) { if(tfStr == "M1") return PERIOD_M1; if(tfStr == "M2") return PERIOD_M2; . . . if(tfStr == "MN1") return PERIOD_MN1; return PERIOD_CURRENT; // Default if not found }
Neben der Überprüfung des Strategieparameters im OnInit(), der beim Start oder Neustart des EA aufgerufen wird, rufen wir dieselbe Funktion auch in regelmäßigen Abständen im OnTimer()-Eventhandler auf. Dieses Zeitintervall ist ein vom Nutzer eingegebener Parameter.
// Set a timer for spread, mean, and stdev calculations // and strategy parameters update (check DB) EventSetTimer(InpUpdateFreq * 60); // min one minute void OnTimer(void) { ResetLastError(); // Wrapper around LoadStrategyFromDB: for clarity if(!UpdateModelParams(InpDbFilename, InpStrategyName, symbols, weights, timeframe)) { printf("%s failed: %s", __FUNCTION__, GetLastError()); }
UpdateModelParams() ist nur ein Wrapper für LoadStrategyFromDB().
//+------------------------------------------------------------------+ //| Wrapper to load strategy from db | //+------------------------------------------------------------------+ // Load strategy parameters from database bool UpdateModelParams(string db_name, string strat_name, string &symbols_arr[], double &weights_arr[], ENUM_TIMEFRAMES tf) { return LoadStrategyFromDB(db_name, strat_name, symbols_arr, weights_arr, tf); }
Dadurch wird der Zweck der Funktion deutlicher.
Schlussfolgerung
Bei einem ausgewogenen Korb kointegrierter Aktien für eine Strategien der Rückkehr zum Mittelwert müssen die Portfoliogewichte kontinuierlich aktualisiert werden. In diesem Artikel haben wir gesehen, dass wir immer aktuelle Strategieparameter haben können, einschließlich der Korbsymbole selbst.
Wir haben beschrieben, wie wir die in MetaTrader 5 integrierte SQLite-Datenbank als eine Art Schnittstelle zwischen unserer Datenanalyse und unserer Handelsumgebung nutzen können, wobei diese beiden Aktivitäten entkoppelt bleiben.
Um den Lesern, die diese Serie nicht verfolgen, die Arbeit zu erleichtern, stellen wir ein einheitliches Python-Skript mit einer vollständigen statistischen Arbitrage-Pipeline für kointegrierte Aktien zur Verfügung. Das Skript führt Korrelations-, Kointegrations- und Stationaritätstests in einem einzigen Durchgang durch und stellt die Datenbank mit den aktuellen Strategieparametern zum Lesen durch den Expert Advisor bereit.
Referenzen
Daniel P. Palomar (2025). Portfolio Optimization: Theory and Application. Cambridge University Press.
| Dateiname | Beschreibung |
|---|---|
| StatArb/db-setup.mq5 | MQL5-Skript zum Erstellen und Initialisieren der SQLite-Datenbank durch Einlesen der Schemadatei schema-0.2.sql. |
| StatArb/db-update-statarb-0.2.mq5 | MQL5 Service zur Aktualisierung der SQLite-Datenbank mit den letzten geschlossenen Preisbalken. |
| StatArb/Nasdaq_NVDA_Coint.mql5 | Diese Datei enthält den Beispiel-Expert Advisor. |
| StatArb/Nasdaq_NVDA_Coint.mqh | Diese Datei enthält die Beispiel-Header-Datei für den Expert Advisor. |
| StatArb/schema-0.2.sql | SQL-Schemadatei (DDL) zur Initialisierung der Datenbank (Erzeugung von Tabellen, Feldern und Beschränkungen). |
| stat_arb_pipeline_mt5.py | Dieses Python-Skript führt die komplette stat arb-Pipeline für kointegrierte Aktien auf der Grundlage von Pearson-Korrelationstests, Johansen-Kointegrationstests und ADF- sowie KPSS-Stationaritätstests aus. |
Übersetzt aus dem Englischen von MetaQuotes Ltd.
Originalartikel: https://www.mql5.com/en/articles/19428
Warnung: Alle Rechte sind von MetaQuotes Ltd. vorbehalten. Kopieren oder Vervielfältigen untersagt.
Dieser Artikel wurde von einem Nutzer der Website verfasst und gibt dessen persönliche Meinung wieder. MetaQuotes Ltd übernimmt keine Verantwortung für die Richtigkeit der dargestellten Informationen oder für Folgen, die sich aus der Anwendung der beschriebenen Lösungen, Strategien oder Empfehlungen ergeben.
Automatisieren von Handelsstrategien in MQL5 (Teil 29): Erstellung eines Preisaktionssystems mit dem harmonischen Muster von Gartley
Verbessern Sie Ihren Handel mit Smart Money Konzepten (SMC): OB, BOS und FVG
Chart-Synchronisation für eine einfachere technische Analyse
Automatisieren von Handelsstrategien in MQL5 (Teil 28): Erstellen eines Price Action Bat Harmonic Patterns mit visuellem Feedback
- Freie Handelsapplikationen
- Über 8.000 Signale zum Kopieren
- Wirtschaftsnachrichten für die Lage an den Finanzmärkte
Sie stimmen der Website-Richtlinie und den Nutzungsbedingungen zu.