English 日本語
preview
Statistische Arbitrage durch kointegrierte Aktien (Teil 4): Modellaktualisierung in Echtzeit

Statistische Arbitrage durch kointegrierte Aktien (Teil 4): Modellaktualisierung in Echtzeit

MetaTrader 5Handelssysteme |
78 0
Jocimar Lopes
Jocimar Lopes

Einführung

Es ist an der Zeit, unser Kointegrationsmodell in Echtzeit zu aktualisieren. Genauer gesagt, ist es an der Zeit, die relative Gewichtung der einzelnen Aktien in unserem Handelsportfolio mit den neuesten Berechnungen zu aktualisieren, während der Expert Advisor läuft. Bisher haben wir in unseren Erläuterungen und Backtests die gleichen Portfoliogewichte verwendet. Sie dienten als Vereinfachung, um das Verständnis des Prozesses und der Bedeutung und Funktionsweise der zugrunde liegenden statistischen Tests zu erleichtern. Aber im wirklichen Leben ändern sich diese Portfoliogewichte praktisch jedes Mal, wenn ein neuer Datenpunkt eintrifft, d. h. in unserem Fall bei jedem neuen geschlossenen Balken.

Doch zunächst ein wenig Kontext, eine kurze Zusammenfassung, um sicherzustellen, dass wir alle auf dem gleichen Level sind.

Wir entwickeln eine Art statistisches Arbitrage-System für den armen Mann - eine Stat-Arb-Pipeline, die für den durchschnittlichen Einzelhändler mit einem Consumer-Notebook und einer normalen Netzwerkbandbreite konzipiert ist. In dieser Artikelserie dokumentieren und beschreiben wir die statistischen Tests, die für die Suche nach hoch korrelierten Vermögenswerten für den Paarhandel und kointegrierten Aktien für die Portfoliobildung verwendet werden, sowie die Tests, die für die Bewertung der Stationarität ihrer relativen Spreads verwendet werden, die für Mean-Reversion-Strategien wie die unsere erforderlich ist.

Im vorigen Artikel dieser Serie (Teil 3) haben wir die Einrichtung der Datenbank beschrieben und einige Anmerkungen zu unseren Designentscheidungen gemacht. Dieser Artikel ist eine direkte Fortsetzung des letzten Artikels. Sobald unsere Datenbank eingerichtet ist und regelmäßig aktualisiert wird, können wir unsere Datenanalyse von unserer Handelstätigkeit entkoppeln.

Jetzt werden wir sehen:

  • wie wir unsere Python-Skripte geändert haben, um die gewonnenen Erkenntnisse einzubeziehen, wobei alle Tests in einem einzigen Durchgang verarbeitet werden, um die Auswahl der Assets zu erleichtern; 

  • wie wir unseren Expert Advisor Code komplett überarbeitet haben, um die aktuelle Datenbank während der Ausführung zu nutzen, und wie wir fast alle Funktionen in eine MQL5-Header-Datei verschoben haben, um die Dinge überschaubarer und wartbarer zu machen, wobei wir nur die nativen Event-Handler in der Hauptdatei behalten haben; 

  • Und schließlich werden wir sehen, welche Änderungen wir an unserer Datenbank vorgenommen haben, damit sie als Schnittstelle zwischen der Echtzeit-Datenanalyse und der Online-Handelsumgebung fungieren kann.

Beginnen wir mit dem letzten, Punkt 3, damit die anderen beiden Punkte bei der Beschreibung einen Sinn ergeben.


Die Datenbank als einzige Quelle der Wahrheit

Wir begannen den letzten Artikel mit einer einfachen Definition, welche Art von Antwort wir von unserer Datenbank an unseren Expert Advisor senden wollen, um das Modell in Echtzeit zu aktualisieren. Dies ist ein typischer „Schummelcode“, der häufig für Bottom-up-Designs verwendet wird. Wir fragen nach dem unmittelbaren Bedarf und arbeiten daran, ihn zu erfüllen. Dadurch wird eine Überplanung vermieden und Routenkorrekturen werden erleichtert. Dies ist der entsprechende Ausschnitt aus dem vorherigen Artikel.

Am Ende des Tages sollte unsere Datenbank eine ganz einfache Frage beantworten: Was sollten wir jetzt handeln, um die größtmögliche Rendite zu erzielen?

In Anlehnung an den letzten Artikel dieser Serie, in dem wir die Richtung und das Volumen unserer Aufträge auf der Grundlage der Gewichtung des Aktienkorbs definiert haben, könnte eine mögliche Antwort etwa so lauten:

SymboleGewichteZeitrahmen
„MU“, „NVDA“, „MPWR“, „MCHP“2.699439, 1.000000, -1.877447, -2.505294D1 (täglicher Zeitrahmen)

Tabelle 1. Beispiel einer erwarteten Antwort auf eine Abfrage zur Aktualisierung eines Echtzeitmodells (früherer Artikel)

Um diese Art von Antwort zu erhalten, haben wir eine neue Tabelle zu unserem Datenbankschema hinzugefügt, die Tabelle „strategy“. Sie hat vier TEXT-Felder:
  • Name (PRIMÄRSCHLÜSSEL)
  • Symbols
  • Weights
  • Timeframe
-- strategy definition

CREATE TABLE strategy (
        name TEXT CHECK(LENGTH(name) <= 20) NOT NULL,
        symbols TEXT CHECK(LENGTH(symbols) <= 255) NOT NULL,
        weights TEXT CHECK(LENGTH(weights) <= 255),
        timeframe TEXT CHECK (
        timeframe IN (
            'M1',
            'M2',
            'M3',
            'M4',
            'M5',
            'M6',
            'M10',
            'M12',
            'M15',
            'M20',
            'M30',
            'H1',
            'H2',
            'H3',
            'H4',
            'H6',
            'H8',
            'H12',
            'D1',
            'W1',
            'MN1'
        )
    ),CONSTRAINT strategy_pk PRIMARY KEY (name)
) WITHOUT ROWID, STRICT;
Wir haben diese Tabelle als Tabelle WITHOUT ROWID definiert. In SQLite sind alle rowid-Tabellen regulären Tabellen mit Ausnahme von virtuellen Tabellen und solchen mit der Beschränkung INTEGER PRIMARY KEY. In rowid-Tabellen sind die Primärschlüssel keine echten Primärschlüssel.
„Die Beschränkung PRIMARY KEY für eine rowid-Tabelle (solange es sich nicht um den echten Primärschlüssel oder INTEGER PRIMARY KEY handelt) ist eigentlich dasselbe wie eine UNIQUE-Beschränkung. Da es sich nicht um einen echten Primärschlüssel handelt, dürfen die Spalten des PRIMARY KEY NULL sein, was gegen alle SQL-Standards verstößt.“ (Hervorhebung durch uns).

Indem wir die Tabelle als WITHOUT ROWID-Tabelle definieren, vermeiden wir, dass NULL eingefügt wird und folglich die relationale Integrität beeinträchtigt wird. Der Nachteil ist, dass diese Klausel SQLite-spezifisch ist, was bedeutet, dass unser Schema nicht mehr vollständig auf andere RDBMS portierbar ist. Um das gleiche Schema in anderen RDBMS zu verwenden, müssen wir es anpassen. Aber ich denke, der Vorteil, dass NULL nicht in das Primärschlüsselfeld eingefügt wird, ist die Kosten wert.

Wie Sie vielleicht bemerkt haben, sind „symbols“ und „weights“ beides Arrays. SQLite verfügt jedoch nicht über den Datentyp ARRAY. Aufgrund dieser Einschränkung werden wir diese Daten als TEXT einfügen, aber als JSON-Array formatiert. Diese Maßnahme wird uns später die Arbeit erleichtern, wenn wir zu einem anderen RDBMS wechseln, das den Datentyp ARRAY verwendet. 

namesymbolsweightstimeframe
Nasdaq_NVDA_Coint‘["MU", "NVDA", "MPWR", "MCHP"]’‘[“2.699439”, “1.000000”, “-1.877447”, “-2.505294”]’D1

Tabelle 2. Symbole und Gewichtungsdaten werden in die Tabelle „strategy“ als TEXT-Datentyp eingefügt und als JSON-Arrays formatiert.

Um sie abzufragen, verwenden wir die SQLite Funktion json_extract()

Unser Datenbankschema, das jetzt auf statarb-0.2 aktualisiert wurde, enthält also die zusätzliche Tabelle „strategy“, deren Entity-Relationship-Diagramm wie folgt aussieht:

Abbildung 1 - Das Entity-Relationship-Diagramm (ERD) der statarb-0.2 Datenbank

Abbildung 1. Das statarb-0.2 Datenbank-Entity-Relationship-Diagramm (ERD)

TIPP - Bei der Entwicklung mit SQLite habe ich mir angewöhnt, die alten Versionen der db-Dateien sowie die alten Versionen der Schemadateien NICHT zu löschen. Da wir es mit einer einzigen Datei zu tun haben, ist es einfach, die Datei zu duplizieren, die Kopie in die neue Version umzubenennen und beide im Versionskontrollsystem (VCS) zu verwalten. Diese Praxis bietet uns einen einfachen „Rollback-Pfad“ für den Fall der Fälle.

Abbildung 2 - Metaeditor-Navigator mit Anzeige der verschiedenen Versionen der SQLite-Dateien (db's) und der entsprechenden Schemadateien

Abbildung 2. Metaeditor-Navigator, der verschiedene Versionen von SQLite-Dateien (db's) und entsprechende Schemadateien anzeigt

Wie Sie in der oben abgebildeten ERD sehen können, ist unsere neue Tabelle „strategy“ noch mit keiner anderen Tabelle verbunden. Dies wird sich ändern, sobald wir mit dem Backtests/Handel mit dieser neuen EA-Version beginnen, wenn sie mit der Handelstabelle verbunden sein wird. Auch hier gilt: Da wir von unten nach oben arbeiten, werden wir unsere Datenbank nur dann ändern, wenn es nötig ist. Im Moment ist die Tabelle „strategy“ nichts weiter als eine Schnittstelle, ein Mittel zur Kommunikation zwischen unserer Datenanalyse und unseren Handelsumgebungen.

Die Datenbank wird als Schnittstelle zwischen unserer Datenanalyse und unserem Trading Expert Advisor sowie als Schnittstelle zwischen unserem Screening - der Anwendung unserer Datenanalyse - und dem EA dienen. Der Ablauf wird in etwa so sein:

Abbildung 3 - Ein Flussdiagramm veranschaulicht den Kreislaufaspekt unserer statistischen Arbitrage-Pipeline

Abbildung 3. Ein Flussdiagramm veranschaulicht den zirkulären Aspekt unserer statistischen Arbitrage-Pipeline


Die Python-Seite

Wir sind von einzelnen Jupyter-Notebooks für jede Aufgabe zu einem einheitlichen Python-Skript übergegangen, das eine umfassende statistische Arbitrage-Pipeline für kointegrierte Aktien ausführt. Dieses Skript wird:

Prüfen Sie, ob die SQLite-Datenbank existiert.

def CheckForDb(db_path: str | Path) -> bool:
    """Return True if the SQLite database file exists on disk."""
    exists = Path(db_path).is_file()
    logger.info("Database exists: %s (%s)", exists, db_path)
    return exists

Es wird eine Datenbank erstellt, wenn keine gefunden wird. Dies ist praktisch, wenn Sie diese Serie nicht verfolgen und einfach nur mit der Stat-Arb-Pipeline experimentieren möchten. Alles, was Sie tun müssen, ist, das Datenbankschema (hier beigefügt) bereitzustellen.

def CreateDbIfMissing(db_path: str | Path, schema_path: Optional[str | Path] = None) -> None:
    """
    If the database does not exist and a schema is provided, create it from SQL.
    Otherwise, create an empty file (SQLite creates DB on first connection).
    """
    db_path = Path(db_path)
    if db_path.exists():
        logger.info("DB already present: %s", db_path)
        return
    logger.info("DB missing; creating: %s", db_path)

    with sqlite3.connect(db_path) as conn:
        if schema_path:
            schema_sql = Path(schema_path).read_text(encoding="utf-8")
            conn.executescript(schema_sql)
            logger.info("Applied schema from: %s", schema_path)

Verbinden Sie sich mit dem MetaTrader 5 Terminal.

def InitMT5(login: Optional[int] = None, password: Optional[str] = None,
            server: Optional[str] = None, path: Optional[str] = None) -> None:

    """Initialize connection to the running MetaTrader 5 terminal."""

    if mt5 is None:
        raise RuntimeError("MetaTrader5 package not available.")

    # if not mt5.initialize(path=path, login=login, password=password, server=server):
    if not mt5.initialize():
        raise RuntimeError(f"MT5 initialize() failed: {mt5.last_error()}")
    logger.info("MetaTrader5 initialized.")

Laden Sie Marktdaten von MetaTrader 5 herunter.

def DownloadMarketData(symbols: List[str], timeframes: List[str], bars: int = 5000
                       ) -> Dict[Tuple[str, str], pd.DataFrame]:
    """
    Download OHLCV data from MT5 for each (symbol, timeframe).
    Returns dict with keys (symbol, timeframe) -> DataFrame columns:
    ['tstamp','timeframe','open','high','low','close','tick_volume','real_volume','spread']
    """
    if mt5 is None:
        raise RuntimeError("MetaTrader5 package not available.")
    out: Dict[Tuple[str, str], pd.DataFrame] = {}

    for sym in symbols:
        for tf in timeframes:
            tf_const = _mt5_timeframe_constant(tf)
            rates = mt5.copy_rates_from_pos(sym, tf_const, 0, bars)

            if rates is None or len(rates) == 0:
                logger.warning("No data for %s %s", sym, tf)
                continue

            df = pd.DataFrame(rates)

            # Rename & select
            df = df.rename(columns={
                "time": "tstamp",
                "open": "price_open",
                "high": "price_high",
                "low": "price_low",
                "close": "price_close",
                "real_volume": "real_volume",
            })

            df["timeframe"] = tf

            cols = ["tstamp","timeframe","price_open","price_high","price_low","price_close",
                    "tick_volume","real_volume","spread"]
            df = df[cols].copy()
            out[(sym, tf)] = df
            logger.info("Downloaded %d bars for %s %s", len(df), sym, tf)
    return out

Speichern Sie diese Marktdaten in der SQLite-Datenbank.

def StoreMarketData(db_path: str | Path, data: Dict[Tuple[str, str], pd.DataFrame]) -> None:

    """Upsert market data into SQLite according to the provided schema."""
    with sqlite3.connect(db_path) as conn:
        for (sym, tf), df in data.items():
            symbol_id = _ensure_symbol(conn, sym)

            df = df.copy()
            df["symbol_id"] = symbol_id

            # Use executemany for performance
            sql = """
            INSERT OR REPLACE INTO market_data
            (tstamp, timeframe, price_open, price_high, price_low, price_close,
             tick_volume, real_volume, spread, symbol_id)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """

            rows = list(df[["tstamp","timeframe","price_open","price_high","price_low","price_close",
                            "tick_volume","real_volume","spread","symbol_id"]].itertuples(index=False, name=None))
            conn.executemany(sql, rows)
            conn.commit()

            logger.info("Stored %d rows for %s %s", len(rows), sym, tf)

Prüfen Sie die Pearson-Korrelation für die ausgewählten Symbole.

def RunCorrelationTest(prices: pd.DataFrame, plot: bool = True) -> Tuple[pd.DataFrame, Tuple[str, str], float]:

    """Compute Pearson correlation matrix and return the most correlated pair (off-diagonal)."""
    corr = prices.corr(method="pearson")

    # Mask diagonal, find max
    corr_values = corr.where(~np.eye(len(corr), dtype=bool))
    idxmax = np.unravel_index(np.nanargmax(corr_values.values), corr_values.shape)
    symbols = corr.columns.tolist()
    pair = (symbols[idxmax[0]], symbols[idxmax[1]])
    value = corr_values.values[idxmax]

    logger.info("Most correlated pair: %s ~ %s (r=%.4f)", pair[0], pair[1], value)

    if plot:
        plt.figure(figsize=(10, 7))
        sns.heatmap(corr, annot=False, vmin=-1, vmax=1, cmap="coolwarm", square=True)
        plt.title("Pearson Correlation Heatmap")
        plt.tight_layout()
        plt.show()

    print(f"Most correlated pair: {pair[0]} ~ {pair[1]} (r={value:.4f})")

    return corr, pair, float(value)

Berechnen Sie den Engle-Granger-Kointegrationskoeffizienten für die ausgewählten Symbole und geben Sie das am stärksten korrelierte Paar an.

def RunEngleGrangerTest(prices: pd.DataFrame, plot: bool = True
                        ) -> Tuple[Tuple[str, str], float, float, pd.Series]:
    """
    Loop over all symbol pairs, run statsmodels.coint (Engle-Granger). Return:
        best_pair, best_pvalue, hedge_ratio (y_on_x), spread series (y - beta*x)
    """
    best_pair, best_p, best_beta, best_spread = None, 1.0, np.nan, None
    cols = list(prices.columns)
    for y_sym, x_sym in itertools.combinations(cols, 2):
        y = prices[y_sym].dropna()
        x = prices[x_sym].dropna()
        join = pd.concat([y, x], axis=1).dropna()

        if len(join) < 50:
            continue

        score, pvalue, _ = coint(join.iloc[:,0], join.iloc[:,1])

        if pvalue < best_p:
            beta = _ols_hedge_ratio(join.iloc[:,0], join.iloc[:,1])
            spread = join.iloc[:,0] - beta * join.iloc[:,1]
            best_pair, best_p, best_beta, best_spread = (y_sym, x_sym), pvalue, beta, spread

    if best_pair is None:
        raise ValueError("No cointegrated pairs found (Engle-Granger) with sufficient data.")

    print(f"Most cointegrated pair (Engle-Granger): {best_pair[0]} ~ {best_pair[1]} (p={best_p:.6f}, beta={best_beta:.4f})")

    if plot:
        fig, ax = plt.subplots(figsize=(10, 5))
        ax.plot(prices.index, prices[best_pair[0]], label=best_pair[0])
        ax.plot(prices.index, prices[best_pair[1]]*best_beta, label=f"{best_pair[1]} * beta")
        ax.set_title(f"Best EG Pair Overlay (beta={best_beta:.4f})")
        ax.legend()
        fig.tight_layout()
        plt.show()


      fig2, ax2 = plt.subplots(figsize=(10, 4)) ax2.plot(best_spread.index, best_spread.values, label="Spread") ax2.axhline(0, line) ax2.set_title("Spread (y - beta*x)")

        ax2.legend() fig2.tight_layout()

        plt.show()

    return best_pair, float(best_p), float(best_beta), best_spread

Führen Sie die Stationaritätstests (ADF und KPSS) für die Engle-Granger-Spanne durch.

def RunADFTest(series: pd.Series) -> Tuple[float, float, dict]:
    """Augmented Dickey-Fuller test on a univariate series."""
    series = pd.Series(series).dropna()
    adf_stat, pvalue, usedlag, nobs, crit, icbest = adfuller(series, autolag="AIC")

    print(f"ADF: stat={adf_stat:.4f}, p={pvalue:.6f}, lags={usedlag}, nobs={nobs}, crit={crit}")

    return float(adf_stat), float(pvalue), crit
def RunKPSSTest(series: pd.Series, regression: str = "c") -> Tuple[float, float, dict]:

    """KPSS stationarity test on a univariate series (regression='c' or 'ct')."""
    series = pd.Series(series).dropna()
    stat, pvalue, lags, crit = kpss(series, regression=regression, nlags="auto")

    print(f"KPSS({regression}): stat={stat:.4f}, p={pvalue:.6f}, lags={lags}, crit={crit}")

    return float(stat), float(pvalue), crit

Überprüfen Sie die Johansen-Kointegration (mit Portfoliogewichten aus dem ersten Eigenvektor, wenn Rang >= 1).

def RunJohansenTest(prices: pd.DataFrame, det_order: int = 0, k_ar_diff: int = 1, plot: bool = True
                    ) -> Tuple[int, np.ndarray, np.ndarray, Optional[pd.Series]]:
    """
    Johansen cointegration test on a set of symbols (>=2).
    Returns: (rank, eigenvectors (beta), eigenvalues, spread_from_first_vector or None)
    - Significant rank determined by trace statistics vs 95% critical values.
    - First eigenvector provides portfolio weights (normalized s.t. min(|w|)=1 or w[-1]=1).
    """

    data = prices.dropna().values

    if data.shape[1] < 2:
        raise ValueError("Johansen requires at least two series.")

    joh = coint_johansen(data, det_order, k_ar_diff)

    # Significant rank by trace statistic > crit value (95% = column index 1)
    trace_stats = joh.lr1  # trace stats
    crit_vals = joh.cvt[:, 1]  # 95%
    rank = int(np.sum(trace_stats > crit_vals))
    print("Johansen trace stats:", trace_stats)

    print("Johansen 95% crit vals:", crit_vals)

    print(f"Johansen inferred rank: {rank}")

    weights = None
    spread = None

    if rank >= 1:

        # Beta columns are cointegrating vectors; take the first (associated with largest eigenvalue)
        beta = joh.evec  # shape (k, k)
        eigvals = joh.eig  # shape (k,)

        # Sort by eigenvalue desc to be explicit
        order = np.argsort(eigvals)[::-1]
        beta_sorted = beta[:, order]
        eig_sorted = eigvals[order]
        first_vec = beta_sorted[:, 0]

        # Normalize such that the weight of the last symbol is 1 (common in practice)
        if first_vec[-1] == 0:
            norm = np.max(np.abs(first_vec))
        else:
            norm = first_vec[-1]

        weights = first_vec / norm

        print("First Johansen eigenvector (portfolio weights):")

        for sym, w in zip(prices.columns, weights):
            print(f"  {sym:>12s}: {w:+.6f}")

        # Spread = P * weights
        spread = pd.Series(prices.values @ weights, index=prices.index, name="JohansenSpread")

        if plot:
            fig, ax = plt.subplots(figsize=(10, 4))
            ax.plot(spread.index, spread.values, label="Johansen Spread")
            ax.axhline(0, linestyle="--")
            ax.set_title("Johansen Spread (1st eigenvector)")
            ax.legend()
            fig.tight_layout()

            plt.show()

        return rank, beta_sorted, eig_sorted, spread, weights

    return rank, joh.evec, joh.eig, weights, None

Wenn ein Rang im Johansen-Kointegrationstest SIGNIFIKANT ist, führt das Skript auch den ADF- und den KPSS-Test für die Johansen-Spanne durch und speichert schließlich die Strategieparameter (Symbole, Gewichte und Zeitrahmen) in der Tabelle „strategy“, wie im vorherigen Abschnitt erwähnt.

def StoreJohansenStrategy(db_path: str | Path, name: str, symbols: List[str],
                          weights: np.ndarray, timeframe: str) -> None:
    """
    Store Johansen eigenvector (portfolio weights) and symbols into the strategy table.
    Symbols and weights are stored as JSON arrays (text).
    """

    import json

    with sqlite3.connect(db_path) as conn:
        cur = conn.cursor()
        cur.execute("""
            INSERT OR REPLACE INTO strategy (name, symbols, weights, timeframe)
            VALUES (?, ?, ?, ?)
        """, (
            name,
            json.dumps(symbols),
            json.dumps([float(w) for w in weights]),
            timeframe
        ))
        conn.commit()

    logger.info("Stored Johansen strategy '%s' with symbols=%s timeframe=%s",
                name, symbols, timeframe)

Zur Vereinfachung wird die Ausführung angehalten und das Chart bei jedem relevanten Abschnitt (z. B. Spreads) gezeichnet, sodass Sie leicht beurteilen können, ob die ausgewählten Symbole eine detailliertere Bewertung wert sind und eventuell ihre Listenreihenfolge (für Engle-Granger) oder den Zeitrahmen ändern. Das Skript ist ein Werkzeug für das manuelle Screening, bevor wir im nächsten Schritt eine vollautomatische Pipeline aufbauen. Jede Aufgabe ist als unabhängige Funktion gekapselt, sodass Sie sie leicht in andere Skripte importieren können.

WARNUNG: Dieses Skript ist unser Einstiegspunkt in das System. Gleich nachdem hier eine neue Symbolliste definiert wurde, sollten Sie sie in den db-update-Dienst einfügen. Dadurch wird sichergestellt, dass Ihre Datenbank in einem ruhigen Tempo aktualisiert wird, auch wenn Ihr EA aus irgendeinem Grund nicht läuft. Dieser Schritt muss vorerst manuell durchgeführt werden.

Um das Skript auszuführen, rufen Sie es über die Befehlszeile mit Parametern auf. Zum Beispiel:

PS C:\Users\your\path\to\JupyterNotebooks\StatArb\coint> python py/stat_arb_pipeline_mt5.py --db $env:STATARB_DB_PATH --schema sql/schema-0.2.sql --symbols "MU,NVDA,MPWR,MCHP" --timeframes "D1" --bars 1000 --run

Wie Sie sehen können, verwenden wir Umgebungsvariablen für den Parameter db path. Wenn alles gut geht, sollten Sie so etwas sehen, wenn es gemäß dem Johansen-Test KEINE KINTEGRATION gibt.

2025-09-03 15:35:24,588 [INFO] Database exists: True (C:\Users\your\path\to\MQL5\Files\StatArb\statarb-0.2.db)

2025-09-03 15:35:24,589 [INFO] Using existing DB at C:\Users\your\path\to\MQL5\Files\StatArb\statarb-0.2.db.

2025-09-03 15:35:24,605 [INFO] MetaTrader 5 initialized.

2025-09-03 15:35:24,613 [INFO] Downloaded 1000 bars for MU D1  

2025-09-03 15:35:24,618 [INFO] Downloaded 1000 bars for NVDA D1

2025-09-03 15:35:24,625 [INFO] Downloaded 1000 bars for MPWR D1

2025-09-03 15:35:24,633 [INFO] Downloaded 1000 bars for MCHP D1

2025-09-03 15:35:24,997 [INFO] Stored 1000 rows for MU D1

2025-09-03 15:35:25,261 [INFO] Stored 1000 rows for NVDA D1

2025-09-03 15:35:25,514 [INFO] Stored 1000 rows for MPWR D1

2025-09-03 15:35:25,870 [INFO] Stored 1000 rows for MCHP D1

2025-09-03 15:35:25,871 [INFO] MetaTrader 5 shutdown.

2025-09-03 15:35:25,961 [INFO] ReadMarketData: D1 timeframe, 1000 rows, 4 symbols

2025-09-03 15:35:25,964 [INFO] Most correlated pair: MPWR ~ NVDA (r=0.8335)

Das am stärksten korrelierende Paar: MPWR ~ NVDA (r=0.8335)

Am meisten kointegriertes Paar (Engle-Granger): MPWR ~ MU (p=0,010558, beta=5,4940)


ADF & KPSS zur Engle-Granger-Spanne:

ADF: stat=-3.8778, p=0.002204, lags=16, nobs=983, crit={'1%': np.float64(-3.4370198458812156), '5%': np.float64(-2.864484708707697), '10%': np.float64(-2.568337912084273)}

C:\Users\your\path\to\JupyterNotebooks\StatArb\coint\py\stat_arb_pipeline_mt5.py:321: InterpolationWarnung: Die Teststatistik liegt außerhalb des Bereichs der in der Look-up-Tabelle verfügbaren p-Werte. Der tatsächliche p-Wert ist kleiner als der zurückgegebene p-Wert.

stat, pvalue, lags, crit = kpss(series, regression=regression, nlags="auto")

KPSS(c): stat=0.9279, p=0.010000, lags=19, crit={'10%': 0.347, '5%': 0.463, '2.5%': 0.574, '1%': 0.739}

Johansen-Test für alle angegebenen Symbole:

Johansen-Spurenstatistiken: [34.26486877 18.91156161 8.04865462 0.11018312]

Johansen 95% kritische Werte: [47.8545 29.7961 15.4943 3.8415]

Von Johansen abgeleiteter Rang: 0

Wie bereits erwähnt, speichert das Skript bei einem SIGNIFIKANTEN Johansen-Rang (>=1), d.h. wenn es mindestens einen kointegrierten Vektor zwischen den ausgewählten Symbolen gibt, die ersten Eigenvektorwerte (Portfoliogewichte) in der Datenbank in der „Strategietabelle“. Außerdem werden die ADF- und KPSS-Tests für die Streuung durchgeführt.

PS C:\Users\your\path\to\JupyterNotebooks\StatArb\coint> python py/stat_arb_pipeline_mt5.py --db $env:STATARB_DB_PATH --schema sql/schema-0.2.sql --symbols "MU,NVDA,MPWR,MCHP" --timeframes "H4" --bars 90 --run   

In diesem Fall sollten Sie in der Windows PowerShell oder im Terminal etwas wie folgt sehen:

2025-09-03 15:50:43,168 [INFO] Database exists: True (C:\Users\your\path\to\MQL5\Files\StatArb\statarb-0.2.db)

2025-09-03 15:50:43,169 [INFO] Using existing DB at C:\Users\your\path\to\Files\StatArb\statarb-0.2.db.

2025-09-03 15:50:43,188 [INFO] MetaTrader 5 initialized.

2025-09-03 15:50:44,297 [INFO] Downloaded 90 bars for MU H4

2025-09-03 15:50:44,357 [INFO] Downloaded 90 bars for NVDA H4

2025-09-03 15:50:46,322 [INFO] Downloaded 90 bars for MPWR H4

2025-09-03 15:50:46,732 [INFO] Downloaded 90 bars for MCHP H4

2025-09-03 15:50:49,148 [INFO] Stored 90 rows for MU H4

2025-09-03 15:50:49,160 [INFO] Stored 90 rows for NVDA H4

2025-09-03 15:50:49,171 [INFO] Stored 90 rows for MPWR H4

2025-09-03 15:50:49,276 [INFO] Stored 90 rows for MCHP H4

2025-09-03 15:50:49,277 [INFO] MetaTrader 5 shutdown.

2025-09-03 15:50:49,617 [INFO] ReadMarketData: H4 timeframe, 90 rows, 4 symbols

2025-09-03 15:50:49,622 [INFO] Most correlated pair: MPWR ~ NVDA (r=0,6115)

Das am stärksten korrelierende Paar: MPWR ~ NVDA (r=0,6115)

Am meisten kointegriertes Paar (Engle-Granger): MU ~ NVDA (p=0,072055, beta=-0,2391)


ADF & KPSS zur Engle-Granger-Spanne:

ADF: stat=-3.1693, p=0.021831, lags=0, nobs=89, crit={'1%': np.float64(-3.506057133647011), '5%': np.float64(-2.8946066061911946), '10%': np.float64(-2.5844100201994697)}

C:\Users\your\path\to\JupyterNotebooks\StatArb\coint\py\stat_arb_pipeline_mt5.py:321: InterpolationWarning: The test statistic is outside of the range of p-values available in the look-up table. The actual p-value is greater than the p-value returned.

stat, pvalue, lags, crit = kpss(series, regression=regression, nlags="auto")

KPSS(c): stat=0.1793, p=0.100000, lags=5, crit={'10%': 0.347, '5%': 0.463, '2.5%': 0.574, '1%': 0.739}


Johansen-Test für alle angegebenen Symbole:

Johansen-Spurenstatistiken: [63.23695088 32.35462044 16.63904745 5.4295422 ]

Johansen 95% kritische Werte: [47.8545 29.7961 15.4943 3.8415]

Von Johansen abgeleiteter Rang: 4

Erster Johansen-Eigenvektor (Portfolio-Gewichte):

          MCHP: +0.273652

          MPWR: -0.148092

            MU: +2.256498

          NVDA: +1.000000

ADF & KPSS auf Johansen-Spread:

ADF: stat=-4.1427, p=0.000823, lags=1, nobs=88, crit={'1%': np.float64(-3.506944401824286), '5%': np.float64(-2.894989819214876), '10%': np.float64(-2.584614550619835)}

KPSS(c): stat=0.4009, p=0.076784, lags=4, crit={'10%': 0.347, '5%': 0.463, '2.5%': 0.574, '1%': 0.739}

2025-09-03 15:50:50,051 

[INFO] Stored Johansen strategy 'Nasdaq_NVDA_Coint' with symbols=['MCHP', 'MPWR', 'MU', 'NVDA'] timeframe=H4


Was ist mit den oben genannten Interpolationswarnungen?

Diese Interpolationswarnungen stehen im Zusammenhang mit dem KPSS-Stationaritätstest für die Engle-Granger-Spanne. Sie teilen uns mit, dass wir extreme Ergebnisse außerhalb der Testnachschlagetabelle erhalten haben. Die Warnungen weisen uns lediglich darauf hin, dass unsere Engle-Granger-Ausbreitung

  • stark stationär (p-value größer als der zurückgegebene p-value) 
  • oder stark instationär (ein p-Wert kleiner als der zurückgegebene p-Wert) sind.

Wir können diese Warnungen vorerst getrost ignorieren, solange wir uns mit den Echtzeit-Aktualisierungen unserer Modellparameter beschäftigen. Wir werden sie im nächsten Schritt näher betrachten, wenn wir uns mit dem Screening unserer Vermögenswerte befassen.


Die MQL5-Seite

Wir haben nur die EA-Ereignishandlung in der Hauptdatei belassen. Alle anderen Funktionen wurden in die Header-Datei Nasdaq_NVDA_Coint.mqh verschoben.

Zuvor waren die Strategieparameter fest kodiert. Jetzt sind es globale Variablen. 

// Global variables
string symbols[] = {}; // Asset symbols
double weights[] = {}; // Portfolio weights
ENUM_TIMEFRAMES timeframe = PERIOD_CURRENT; // Strategy cointegrated timeframe

Diese globalen Variablen werden direkt in OnInit() überprüft.

//+------------------------------------------------------------------+
//| Expert initialization function                                   |
//+------------------------------------------------------------------+

int OnInit()
  {
   ResetLastError();

// Check if all symbols are available
   for(int i = 0; i < ArraySize(symbols); i++)
     {
      if(!SymbolSelect(symbols[i], true))
        {
         Print("Error: Symbol ", symbols[i], " not found!");
         return(INIT_FAILED);
        }
     }
// Initialize spread buffer
   ArrayResize(spreadBuffer, InpLookbackPeriod);

// Set a timer for spread, mean, and stdev calculations
   EventSetTimer(InpUpdateFreq * 60); // min one minute

// Load strategy parameters from database
   if(!LoadStrategyFromDB(InpDbFilename,
                                InpStrategyName,
                                symbols,
                                weights,
                                timeframe))
     {
      // Handle error - maybe use default values
      printf("Error at " + __FUNCTION__ + " %s ",
             getUninitReasonText(GetLastError()));
      return INIT_FAILED;
     }
   return(INIT_SUCCEEDED);
  }

Die Funktion LoadStrategyFromDB() befindet sich in der Header-Datei.

//+------------------------------------------------------------------+
//|          Load strategy parameter from database                   |
//+------------------------------------------------------------------+

bool LoadStrategyFromDB(string db_name,
                        string strat_name,
                        string &symbols_arr[],
                        double &weights_arr[],
                        ENUM_TIMEFRAMES &tf)
  {

// Open the database (Metatrader's integrated SQLite)
   int db = DatabaseOpen(db_name, DATABASE_OPEN_READONLY);
   if(db == INVALID_HANDLE)
     {
      Print("Failed to open database: %s", GetLastError());
      return false;
     }
// Prepare the SQL query with json_extract
   string query = StringFormat(
                     "SELECT "
                     "json_extract(symbols, '$') as symbols_json, "
                     "json_extract(weights, '$') as weights_json, "
                     "timeframe "
                     "FROM strategy WHERE name = '%s'",
                     strat_name
                  );

// Execute the query
   int result = DatabasePrepare(db, query);

   if(result <= 0)
     {
      Print("Failed to prepare query: ", GetLastError());
      DatabaseClose(db);
      return false;
     }

// Read the results
   if(DatabaseRead(result))
     {
      // Get symbols_arr JSON array and parse it
      string symbolsJson;
      DatabaseColumnText(result, 0, symbolsJson);
      ParseJsonArray(symbolsJson, symbols_arr);

      // Get weights_arr JSON array and parse it
      string weightsJson;
      DatabaseColumnText(result, 1, weightsJson);
      ParseJsonDoubleArray(weightsJson, weights_arr);

      // Get tf string and convert to ENUM_TIMEFRAMES
      string timeframeStr;
      DatabaseColumnText(result, 2, timeframeStr);
      tf = StringToTimeframe(timeframeStr);

      Print("Successfully loaded strategy: ", strat_name);

      Print("Symbols JSON: ", symbolsJson);
      Print("Weights JSON: ", weightsJson);
      Print("Timeframe: ", timeframeStr);
     }
   else
     {
      Print("Strategy not found: ", strat_name);

      DatabaseFinalize(result);
      DatabaseClose(db);
      return false;
     }
// Clean up

   DatabaseFinalize(result);
   DatabaseClose(db);
   return true;
  }

Wie bereits erwähnt, werden wir die Arrays der Symbole und der Portfoliogewichte mit der SQLite-Funktion json_extract() lesen. Sie trägt die folgende Signatur:

json_extract(json, Pfad)

wobei:

  • „json“ ist die Textspalte oder Zeichenkette, die die JSON-Daten enthält,
  • „path“ ist ein JSON-Pfadausdruck, der die Position des zu extrahierenden Wertes innerhalb des JSON angibt. Er beginnt in der Regel mit $, um die Wurzel des JSON-Dokuments darzustellen, gefolgt von der Punktnotation für Objektschlüssel ($.key) oder der Klammernotation für Array-Indizes ($[index]). Diese letzte Form verwenden wir hier, um die Array-Daten zu lesen.

Die beiden Hilfsfunktionen parsen die JSON-Arrays. Diese Funktionen verarbeiten das JSON-Format, indem sie Klammern und Anführungszeichen entfernen und dann durch Kommas trennen.

ParseJsonArray()

parst JSON-String-Arrays wie ["MU", "NVDA", "MPWR", "MCHP"]

// Helper function to parse JSON array of strings
void ParseJsonArray(string json, string &array[])
  {
// Remove brackets and quotes from JSON array
   string cleaned = StringSubstr(json, 1, StringLen(json) - 2); // Remove [ and ]
   StringReplace(cleaned, "\"", ""); // Remove quotes

// Split by commas
   StringSplit(cleaned, ',', array);

// Trim whitespace from each element
   for(int i = 0; i < ArraySize(array); i++)
     {
      array[i] = StringTrim(array[i]);
     }
  }

ParseJsonDoubleArray()

parst JSON-Zahlen-Arrays wie [2.699439, 1.000000, -1.877447, -2.505294]

// Helper function to parse JSON array of doubles
void ParseJsonDoubleArray(string json, double &array[])
  {
// Remove brackets from JSON array
   string cleaned = StringSubstr(json, 1, StringLen(json) - 2); // Remove [ and ]

// Split by commas
   string stringArray[];
   int count = StringSplit(cleaned, ',', stringArray);

// Convert to double array
   ArrayResize(array, count);

   for(int i = 0; i < count; i++)
     {
      array[i] = StringToDouble(StringTrim(stringArray[i]));
     }
  }

Hinzugefügt wurde die Funktion

StringTrim(),

um alle Leerzeichen zu entfernen, die in den JSON-Werten vorhanden sein könnten.

// Helper function to trim whitespace
string StringTrim(string str)
  {

// Trim leading whitespace
   while(StringLen(str) > 0 && str[0] == ' ')
     {
      str = StringSubstr(str, 1);
     }

// Trim trailing whitespace
   while(StringLen(str) > 0 && str[StringLen(str) - 1] == ' ')
     {
      str = StringSubstr(str, 0, StringLen(str) - 1);
     }

   return str;
  }

Es gibt JSON-Bibliotheken in der MQL5 Codebase. Wenn Sie JSON-Objekte intensiver nutzen, lohnt es sich vielleicht, nach ihnen zu suchen und die Vorteile zu evaluieren. In diesem einfachen Fall reichen die Vorteile meines Erachtens nicht aus, um eine externe Abhängigkeit zu begründen.

Die Hilfsfunktion

StringToTimeframe()

wandelt die Zeichenkette des Datenbank-Zeitrahmens in den entsprechenden Wert der ENUM_TIMEFRAMES von MQL5 um.

// Helper function to convert tf string to ENUM_TIMEFRAMES
ENUM_TIMEFRAMES StringToTimeframe(string tfStr)
  {
   if(tfStr == "M1")
      return PERIOD_M1;
   if(tfStr == "M2")
      return PERIOD_M2;
.
.
.
   if(tfStr == "MN1")
      return PERIOD_MN1;
   return PERIOD_CURRENT; // Default if not found
  }

Neben der Überprüfung des Strategieparameters im OnInit(), der beim Start oder Neustart des EA aufgerufen wird, rufen wir dieselbe Funktion auch in regelmäßigen Abständen im OnTimer()-Eventhandler auf. Dieses Zeitintervall ist ein vom Nutzer eingegebener Parameter.

// Set a timer for spread, mean, and stdev calculations
// and strategy parameters update (check DB)
   EventSetTimer(InpUpdateFreq * 60); // min one minute

void OnTimer(void)
  {
   ResetLastError();
// Wrapper around LoadStrategyFromDB: for clarity
   if(!UpdateModelParams(InpDbFilename,
                         InpStrategyName,
                         symbols,
                         weights,
                         timeframe))
     {
      printf("%s failed: %s", __FUNCTION__, GetLastError());
     }

UpdateModelParams() ist nur ein Wrapper für LoadStrategyFromDB(). 

//+------------------------------------------------------------------+
//|          Wrapper to load strategy from db                        |
//+------------------------------------------------------------------+

// Load strategy parameters from database
bool UpdateModelParams(string db_name,
                       string strat_name,
                       string &symbols_arr[],
                       double &weights_arr[],
                       ENUM_TIMEFRAMES tf)
  {
   return LoadStrategyFromDB(db_name, strat_name, symbols_arr, weights_arr, tf);
  }

Dadurch wird der Zweck der Funktion deutlicher.


Schlussfolgerung

Bei einem ausgewogenen Korb kointegrierter Aktien für eine Strategien der Rückkehr zum Mittelwert müssen die Portfoliogewichte kontinuierlich aktualisiert werden. In diesem Artikel haben wir gesehen, dass wir immer aktuelle Strategieparameter haben können, einschließlich der Korbsymbole selbst.

Wir haben beschrieben, wie wir die in MetaTrader 5 integrierte SQLite-Datenbank als eine Art Schnittstelle zwischen unserer Datenanalyse und unserer Handelsumgebung nutzen können, wobei diese beiden Aktivitäten entkoppelt bleiben.

Um den Lesern, die diese Serie nicht verfolgen, die Arbeit zu erleichtern, stellen wir ein einheitliches Python-Skript mit einer vollständigen statistischen Arbitrage-Pipeline für kointegrierte Aktien zur Verfügung. Das Skript führt Korrelations-, Kointegrations- und Stationaritätstests in einem einzigen Durchgang durch und stellt die Datenbank mit den aktuellen Strategieparametern zum Lesen durch den Expert Advisor bereit.


Referenzen

Daniel P. Palomar (2025). Portfolio Optimization: Theory and Application. Cambridge University Press.

DateinameBeschreibung
StatArb/db-setup.mq5MQL5-Skript zum Erstellen und Initialisieren der SQLite-Datenbank durch Einlesen der Schemadatei schema-0.2.sql.
StatArb/db-update-statarb-0.2.mq5MQL5 Service zur Aktualisierung der SQLite-Datenbank mit den letzten geschlossenen Preisbalken.
StatArb/Nasdaq_NVDA_Coint.mql5Diese Datei enthält den Beispiel-Expert Advisor.
StatArb/Nasdaq_NVDA_Coint.mqhDiese Datei enthält die Beispiel-Header-Datei für den Expert Advisor.
StatArb/schema-0.2.sqlSQL-Schemadatei (DDL) zur Initialisierung der Datenbank (Erzeugung von Tabellen, Feldern und Beschränkungen).
stat_arb_pipeline_mt5.pyDieses Python-Skript führt die komplette stat arb-Pipeline für kointegrierte Aktien auf der Grundlage von Pearson-Korrelationstests, Johansen-Kointegrationstests und ADF- sowie KPSS-Stationaritätstests aus.

Übersetzt aus dem Englischen von MetaQuotes Ltd.
Originalartikel: https://www.mql5.com/en/articles/19428

Beigefügte Dateien |
Automatisieren von Handelsstrategien in MQL5 (Teil 29): Erstellung eines Preisaktionssystems mit dem harmonischen Muster von Gartley Automatisieren von Handelsstrategien in MQL5 (Teil 29): Erstellung eines Preisaktionssystems mit dem harmonischen Muster von Gartley
In diesem Artikel entwickeln wir ein System des Gartley-Musters in MQL5, das harmonische Auf- und Abwärtsmuster von Gartley mit Hilfe von Umkehrpunkten und Fibonacci-Verhältnissen identifiziert und Handelsgeschäfte mit präzisen Einstiegs-, Stop-Loss- und Take-Profit-Levels ausführt. Wir verbessern den Einblick des Händlers mit visuellem Feedback durch Chart-Objekte wie Dreiecke, Trendlinien und Beschriftungen, um die XABCD-Musterstruktur klar darzustellen.
Verbessern Sie Ihren Handel mit Smart Money Konzepten (SMC): OB, BOS und FVG Verbessern Sie Ihren Handel mit Smart Money Konzepten (SMC): OB, BOS und FVG
Verbessern Sie Ihren Handel mit Smart Money Konzepten (SMC) durch die Kombination von Order Blocks (OB), Break of Structure (BOS) und Fair Value Gaps (FVG) in einem leistungsstarken EA. Wählen Sie die automatische Strategieausführung oder konzentrieren Sie sich auf jedes einzelne SMC-Konzept, um flexibel und präzise zu handeln.
Chart-Synchronisation für eine einfachere technische Analyse Chart-Synchronisation für eine einfachere technische Analyse
Die Chart-Synchronisierung für eine einfachere technische Analyse ist ein Tool, das sicherstellt, dass alle Chart-Zeitrahmen für ein einzelnes Symbol konsistente grafische Objekte wie Trendlinien, Rechtecke oder Indikatoren über verschiedene Zeitrahmen hinweg anzeigen. Aktionen wie Schwenken, Zoomen oder Symbolwechsel werden in allen synchronisierten Charts gespiegelt, sodass Händler nahtlos denselben Preisaktionskontext in mehreren Zeitrahmen anzeigen und vergleichen können.
Automatisieren von Handelsstrategien in MQL5 (Teil 28): Erstellen eines Price Action Bat Harmonic Patterns mit visuellem Feedback Automatisieren von Handelsstrategien in MQL5 (Teil 28): Erstellen eines Price Action Bat Harmonic Patterns mit visuellem Feedback
In diesem Artikel entwickeln wir ein Bat-Pattern-System in MQL5, das Auf- und Abwärtsmuster von Bat-Harmonic unter Verwendung von Umkehrpunkten und Fibonacci-Verhältnissen identifiziert und Handelsgeschäfte mit präzisen Einstiegs-, Stop-Loss- und Take-Profit-Levels auslöst, ergänzt durch visuelles Feedback durch Chart-Objekte