MetaTrader 5 Machine Learning Blueprint (Teil 5): Sequentielles Bootstrapping – Verzicht auf Kennzeichen, Verbesserung der Ergebnisse
Einführung
In diesem Artikel wird das Sequential Bootstrapping vorgestellt, ein prinzipielles Stichprobenverfahren, das die Gleichzeitigkeit an der Wurzel packt. Anstatt die Redundanz nach der Probenahme zu korrigieren, verhindert das sequentielle Bootstrapping sie aktiv während des Probenahmeprozesses selbst. Durch dynamische Anpassung der Ziehungswahrscheinlichkeiten auf der Grundlage zeitlicher Überschneidungen konstruiert diese Methode Bootstrap-Stichproben mit maximal unabhängigen Beobachtungen.
Wir zeigen Ihnen, wie das geht:
- Die grundlegenden Grenzen des Standard-Bootstraps in finanziellen Zusammenhängen verstehen.
- Implementierung des sequentiellen Bootstrap-Algorithmus nach den ersten Prinzipien.
- Validierung seiner Wirksamkeit durch Monte-Carlo-Simulationen.
- Integration in eine vollständige ML-Pipeline für den Finanzbereich.
- Bewertung von Leistungsverbesserungen bei echten Handelsstrategien.
Voraussetzungen
Dieser Artikel setzt Kenntnisse über den das Problem der Kennzeichen-Parallelität im finanziellen ML und Triple-Barrier-Kennzeichnung Techniken voraus, die wir bereits zuvor in dieser Reihe behandelt haben. Gute Kenntnisse der ML-Bibliotheken von Python sind unerlässlich, um von diesem Artikel zu profitieren.
Den gesamten Quellcode zu dieser Serie finden Sie in meinem GitHub.
Warum Bootstrap in der traditionellen Statistik funktioniert
Die 1979 von Bradley Efron eingeführte Bootstrap-Methode ist eines der leistungsfähigsten Werkzeuge für statistische Schlussfolgerungen. Die Eleganz dieser Methode liegt in ihrer Einfachheit: Um die Stichprobenverteilung einer Statistik zu schätzen, müssen Sie Ihre Daten wiederholt mit Ersetzung neu stichprobenartig erfassen und die Statistik für jede neue Stichprobe berechnen.
Dies funktioniert hervorragend, wenn die Datenpunkte unabhängig und identisch verteilt sind (Independent and Identically Distributed, IID). Jede Beobachtung in einer medizinischen Studie, einem landwirtschaftlichen Experiment oder einer Qualitätskontrollprobe aus der Produktion stellt normalerweise ein unabhängiges Ereignis dar. Blutproben von verschiedenen Patienten enthalten unabhängige Informationen. Die Ernteerträge der verschiedenen Parzellen spiegeln unabhängige Anbaubedingungen wider.
Die 2/3-Regel: Ein verstecktes Merkmal, kein Fehler
Der Standard-Bootstrap hat eine faszinierende mathematische Eigenschaft, die von den meisten Praktikern übersehen wird. Wenn Sie aus I Beobachtungen I-mal eine Ersatzstichprobe ziehen, werden Sie in jeder Bootstrap-Stichprobe etwa 2/3 Ihrer ursprünglichen Beobachtungen finden.
Wir wollen verstehen, warum das so ist.
Ein einfaches Gedankenexperiment
Stellen Sie sich vor, Sie haben 100 Bälle in einem Beutel, nummeriert von 1 bis 100. Folgendes passiert:
- Ziehe zufällig einen Ball.
- Schreibe seine Nummer auf.
- Legen ihn zurück (das ist der Schlüssel!).
- Wiederhole das 100 Mal.
Frage: Wie viele verschiedene Bälle wurden nach 100 Ziehungen mindestens einmal gezogen?
Deine Intuition sagt dir vielleicht: „Ich habe 100 gezogen, also vielleicht alle 100?“ Da du aber jede Kugel nach der Ziehung zurücklegen musst, wirst du einige Kugeln mehrfach ziehen, während du andere ganz verpasst.
Die Mathematik
Betrachte eine bestimmte Kugel – sagen wir die Kugel Nr. 42.
Jedes Mal, wenn wir ziehen:
- Wahrscheinlichkeit, die Kugel Nr. 42 zu bekommen = 1/100
- Wahrscheinlichkeit, den Ball #42 NICHT zu bekommen = 99/100
Nach 100 Ziehungen:
- Wahrscheinlichkeit, dass du NIE die Kugel Nr. 42 gewählt hast = (99/100)100 ≈ 0,366
Das bedeutet, dass mit einer Wahrscheinlichkeit von 63,4 % die Kugel Nr. 42 mindestens einmal gezogen wurde.
Dieses Muster gilt unabhängig von der Größe:
| Anzahl der Gegenstände | Ziehungen | Wahrscheinlichkeit Jedes Element zu ziehen |
|---|---|---|
| 10 | 10 | (9/10)10 ≈ 0.651 |
| 100 | 100 | (99/100)100 ≈ 0.634 |
| 1,000 | 1,000 | (999/1000)1000 ≈ 0.632 |
| 10,000 | 10,000 | (9999/10000)10000 ≈ 0.632 |
Der Bruch konvergiert zu 1 – e-1 ≈ 0.632 (wobei e ≈ 2,71828 die Eulersche Zahl ist).
Warum diese Zahl? Die natürliche Wachstumskonstante
Der genaue Wert ist 1 – e-1, wobei e erscheint, weil wir unsere Chancen in immer kleinere Stücke aufteilen, während I größer wird. Dies ist das gleiche mathematische Muster, das auch für den kontinuierlichen Zinseszins gilt.
Die einfache Regel zum Merken:
Wenn Sie eine Stichprobe MIT Zurücklegen machen und dabei so viele Ziehungen machen, wie Sie Kugeln haben, werden Sie feststellen, dass etwa 63% der Kugeln gezogen wurden (etwa 2/3).
Das bedeutet, dass beim Standard-Bootstrap in jeder Iteration etwa 37 % der Daten unbewertet bleiben. In der traditionellen Statistik ist das völlig in Ordnung – es hilft sogar bei der Varianzschätzung. Im Finanzwesen hat sie jedoch eine katastrophale Wechselwirkung mit der Gleichzeitigkeit von Kennzeichen.
Warum dies in der Finanzwelt katastrophale Folgen haben kann
Die 2/3-Regel geht davon aus, dass jede Beobachtung unabhängige Informationen enthält. In der ML-Finanzwelt mit Triple-Barrier-Kennzeichnung scheitert diese Annahme spektakulär an der zeitlichen Überlappung.
Erinnern Sie sich an die Analogie zur Blutprobe aus Teil 3: Stellen Sie sich vor, jemand in Ihrem Labor schüttet Blut aus jedem Röhrchen in die folgenden neun Röhrchen zu seiner Rechten. Röhrchen 10 enthält Blut für Patient 10, aber auch Blut von den Patienten 1 bis 9. Jetzt nehmen Sie aus diesen kontaminierten Röhrchen Ersatzproben.
Die sich verstärkende Katastrophe:
- Standard-Bootstrap-Stichproben nur ~63 % der Beobachtungen.
- Jede Beobachtung der Stichprobe überschneidet sich zeitlich mit anderen.
- Die tatsächliche unabhängige Information ist weit weniger als 63%.
- Modelle lernen die gleichen Muster mehrfach innerhalb einer einzigen Bootstrap-Stichprobe.
- Varianzschätzungen werden unzuverlässig, was den Zweck des Bootstraps zunichte macht.
Betrachten Sie ein konkretes Beispiel aus unseren Kennzeichen mit Triple-Barrier:
Szenario: 100 Handelsbeobachtungen über einen volatilen Zeitraum
- Jeder Handel erstreckt sich im Durchschnitt über 4 Stunden (Triple-Barrier Exit-Zeit)
- Alle 15 Minuten beginnt ein neuer Handel
- Dies führt zu etwa 16 gleichzeitigen Handelsgeschäften zu einem bestimmten Zeitpunkt.
Standard Bootstrap Ergebnis:
- Zeichnet ~63 Beobachtungen.
- Aufgrund von Überschneidungen, effektive eindeutige Informationen ≈ 63/16 ≈ 4 wirklich unabhängige Ereignisse.
- Wir trainieren mit dem Äquivalent von 4 unabhängigen Stichproben, nicht mit 63!
Aus diesem Grund weisen Modelle, die mit dem Standard-Bootstrap trainiert wurden, in Finanzkontexten auf:
- Künstlich niedriger Trainingsfehler (16 Mal das gleiche Muster gelernt).
- Hohe Varianz zwischen den Bootstrap-Iterationen (zufällig unterschiedliche „Kopien“ der gleichen Ereignisse).
- Schlechte Leistung außerhalb der Stichprobe (die tatsächliche Häufigkeit der Muster ist viel geringer).
Sequentieller Bootstrap: Die Lösung
Sequentielles Bootstrapping stellt den Stichprobenprozess grundlegend neu dar. Anstatt Beobachtungen mit gleicher Wahrscheinlichkeit zu ziehen, werden die Wahrscheinlichkeiten dynamisch angepasst, um Beobachtungen zu bevorzugen, die der aktuellen Stichprobe einzigartige Informationen hinzufügen.
Konzeptionelle Grundlage
Die wichtigste Erkenntnis: Der Wert jeder Beobachtung in einer Bootstrap-Stichprobe hängt davon ab, was bereits ausgewählt worden ist.
Wenn die Stichprobe bereits Beobachtungen enthält, die den Zeitraum von Montag 9:00 Uhr bis 11:00 Uhr abdecken, fügt eine weitere Beobachtung von Montag 10:00 Uhr bis 12:00 Uhr relativ wenig neue Informationen hinzu. Eine Beobachtung vom Dienstagnachmittag ist jedoch sehr wertvoll.
Sequentieller Bootstrap setzt diese Intuition durch drei Schritte um, die für jede Ziehung wiederholt werden:
- Bewertung des aktuellen Zustands: Feststellen, welche Zeiträume bereits in der Stichprobe vertreten sind.
- Berechnen der Einzigartigkeit: Berechnen für jede verbleibende Beobachtung, wie viele einzigartige Informationen sie hinzufügen würde.
- Anpassen der Wahrscheinlichkeiten: Ziehen der nächste Beobachtung mit einer Wahrscheinlichkeit, die proportional zu ihrer Einzigartigkeit ist.
Mathematische Formulierung
Lassen Sie uns diese Intuition formalisieren. Betrachten Sie eine Menge von Kennzeichen {y[i]} i=1,2,3, wobei:
- Kennzeichen y[1] ist eine Funktion der Rückgabe r[0,3]
- Kennzeichen y[2] ist eine Funktion der Rückgabe r[2,4]
- Kennzeichen y[3] ist eine Funktion der Rückgabe r[4,6]
Die Zeilen der Matrix entsprechen dem Index der Erträge, die zur Kennzeichnung unseres Datensatzes verwendet wurden, und die Spalten entsprechen den Stichproben. Die Überschneidungen der Ergebnisse sind in der nachstehenden Indikatormatrix dargestellt:
| Zeit | Beobachtung 1 | Beobachtung 2 | Beobachtung 3 |
|---|---|---|---|
| 1 | 1 | 0 | 0 |
| 2 | 1 | 0 | 0 |
| 3 | 1 | 1 | 0 |
| 4 | 0 | 1 | 0 |
| 5 | 0 | 0 | 1 |
| 6 | 0 | 0 | 1 |
Die durchschnittliche Einzigartigkeit ū [i] wird wie folgt berechnet:
ū[i] = (1/L[i]) × Σ(t ∈ T[i]) [1 / c[t]]
wobei:
- L[i] = Anzahl der Zeiträume, die Beobachtung i umfasst
- T[i] = Menge der Zeiträume, in denen die Beobachtung i aktiv ist
- c[t] = Anzahl der zum Zeitpunkt t aktiven Beobachtungen, einschließlich aller bereits in der Stichprobe enthaltenen Beobachtungen sowie der Kandidatenbeobachtung i
Die Ziehungswahrscheinlichkeit für die Beobachtung i ist dann:
P(select i) = ū[i] / Σ(j ∈ candidates) ū[j]
Dies gewährleistet:
- Beobachtungen, die sich nicht überschneiden, erhalten die höchste Wahrscheinlichkeit.
- Beobachtungen, die sich stark mit der Stichprobe überschneiden, erhalten die geringste Wahrscheinlichkeit.
- Wahrscheinlichkeiten summieren sich immer zu 1 (gültige Wahrscheinlichkeitsverteilung).
Ein praktisches Zahlenbeispiel
Gehen wir ein Beispiel anhand der obigen Indikatormatrix durch.
Schritt 1: Erste Ziehung
Zu Beginn sind keine Beobachtungen ausgewählt worden, sodass alle die gleiche Wahrscheinlichkeit haben:
P(Obs 1) = P(Obs 2) = P(Obs 3) = 1/3 ≈ 33.3%
Ergebnis: Beobachtung 2 wird zufällig ausgewählt.
Aktuelle Probe: φ¹ = {2}
Schritt 2: Zweite Auslosung – Einzigartigkeit berechnen
Nun müssen wir die Eindeutigkeit für jede Beobachtung berechnen, vorausgesetzt, dass Beobachtung 2 in unserer Stichprobe enthalten ist.
Beobachtung (Obs) 1:
- Zu bestimmten Zeiten aktiv: {1, 2, 3}
- Zeitpunkt 1: c[1] = 1 (nur Obs 1), Eindeutigkeit = 1/1 = 1,0
- Zeit 2: c[2] = 1 (nur Obs 1), Eindeutigkeit = 1/1 = 1,0
- Zeit 3: c[3] = 2 (Obs 1 + Obs 2), Eindeutigkeit = 1/2 = 0,5
- Durchschnittliche Einzigartigkeit: (1.0 + 1.0 + 0.5)/3 = 2.5/3 = 5/6 ≈ 0.833
Beobachtung 2:
- Zeitweise aktiv: {3, 4}
- Zeit 3: c[3] = 2 (Obs 2 + sich selbst), Eindeutigkeit = 1/2 = 0,5
- Zeit 4: c[4] = 1 (nur Obs 2), Einzigartigkeit = 1/1 = 1,0
- Durchschnittliche Einzigartigkeit: (0.5 + 1.0)/2 = 1.5/2 = 3/6 = 0.5
Beobachtung 3:
- Zeitweise aktiv: {5, 6}
- Zeit 5: c[5] = 1 (nur Obs 3), Einzigartigkeit = 1/1 = 1,0
- Zeit 6: c[6] = 1 (nur Obs 3), Einzigartigkeit = 1/1 = 1,0
- Durchschnittliche Einzigartigkeit: (1.0 + 1.0)/2 = 2.0/2 = 6/6 = 1.0
Berechnen Sie Wahrscheinlichkeiten:
- Summe der Einzigartigkeit: 5/6 + 3/6 + 6/6 = 14/6
- P(Obs 1) = (5/6) / (14/6) = 5/14 ≈ 35,7%
- P(Obs 2) = (3/6) / (14/6) = 3/14 ≈ 21,4% ← niedrigste (bereits ausgewählt)
- P(Obs 3) = (6/6) / (14/6) = 6/14 ≈ 42,9% ← höchste (keine Überschneidung)
Ergebnis: Beobachtung 3 wird ausgewählt
Aktuelle Stichprobe: φ² = {2, 3}
Schritt 3: Dritte Auslosung – Einzigartigkeit berechnen
Die Überschneidungsstruktur hat sich nicht geändert (Obs 2 und 3 überschneiden sich immer noch nicht), sodass die Wahrscheinlichkeiten identisch bleiben:
- P(Obs 1) = 5/14 ≈ 35,7%
- P(Obs 2) = 3/14 ≈ 21,4%
- P(Obs 3) = 6/14 ≈ 42,9%
Vollständige Wahrscheinlichkeitsübersicht:
| Ziehung | Beobachtung 1 | Beobachtung 2 | Beobachtung 3 | Ausgewählte |
|---|---|---|---|---|
| 1 | 1/3 (33.3%) | 1/3 (33.3%) | 1/3 (33.3%) | 2 |
| 2 | 5/14 (35.7%) | 3/14 (21.4%) | 6/14 (42.9%) | 3 |
| 3 | 5/14 (35.7%) | 3/14 (21.4%) | 6/14 (42.9%) | ? |
Wichtige Beobachtungen aus diesem Beispiel
- Die geringste Wahrscheinlichkeit haben die zuvor ausgewählten Beobachtungen (Beobachtung 2 fällt von 33,3% auf 21,4%)
- Die höchste Wahrscheinlichkeit gilt für Beobachtungen ohne eine Überschneidungen (Beobachtung 3 springt auf 42,9 %)
- Teilweise Überschneidungen erhalten eine mittlere Gewichtung (Beobachtung 1 steigt leicht auf 35,7% an)
- Die Methode vermeidet erfolgreich Redundanz und ermöglicht gleichzeitig eine Vielfalt von Proben.
Umsetzung
Kern-Algorithmus
Die Implementierung erfordert zwei Schlüsselfunktionen: eine zur Berechnung der Indikatormatrix und eine zur Durchführung der sequentiellen Stichproben.
Funktion 1: Konstruktion der Indikatormatrix
def get_ind_matrix(bar_index, t1): """ Build an indicator matrix showing which observations are active at each time. :param bar_index: (pd.Index) Complete time index (all bars) :param t1: (pd.Series) End time for each observation (index = start time, value = end time) :return: (pd.DataFrame) Indicator matrix where ind_matrix[t, i] = 1 if obs i is active at time t """ ind_matrix = pd.DataFrame(0, index=bar_index, columns=range(t1.shape[0])) for i, (t_in, t_out) in enumerate(t1.items()): # Mark all times from t_in to t_out as active for observation i ind_matrix.loc[t_in:t_out, i] = 1.0 return ind_matrix
Funktion 2: Berechnung der durchschnittlichen Eindeutigkeit
def get_avg_uniqueness(ind_matrix): """ Calculate average uniqueness for each observation. Average uniqueness of observation i = mean of (1/c[t]) across all times t where i is active, where c[t] is the number of observations active at time t. :param ind_matrix: (pd.DataFrame) Indicator matrix from get_ind_matrix :return: (pd.Series) Average uniqueness for each observation """ # Count how many observations are active at each time (row sums) concurrency = ind_matrix.sum(axis=1) # Calculate uniqueness: 1/concurrency for each observation at each time # Replace concurrency with NaN where ind_matrix is 0 (observation not active) uniqueness = ind_matrix.div(concurrency, axis=0) # Average uniqueness across all times where observation is active avg_uniqueness = uniqueness[uniqueness > 0].mean(axis=0) return avg_uniqueness
Funktion 3: Sequentieller Bootstrap-Sampler
def seq_bootstrap(ind_matrix, sample_length=None): """ Generate a bootstrap sample using sequential bootstrap method. :param ind_matrix: (pd.DataFrame) Indicator matrix from get_ind_matrix :param sample_length: (int) Length of bootstrap sample. If None, uses len(ind_matrix.columns) :return: (list) Indices of selected observations """ if sample_length is None: sample_length = ind_matrix.shape[1] phi = [] # Bootstrap sample (list of selected observation indices) while len(phi) < sample_length: # Calculate average uniqueness for each observation avg_u = pd.Series(dtype=float) for i in ind_matrix.columns: # Create temporary indicator matrix with current sample + candidate i ind_matrix_temp = ind_matrix[phi + [i]] avg_u.loc[i] = get_avg_uniqueness(ind_matrix_temp).iloc[-1] # Convert uniqueness to probabilities prob = avg_u / avg_u.sum() # Draw next observation according to probabilities selected = np.random.choice(ind_matrix.columns, p=prob) phi.append(selected) return phi
Überlegungen zur rechnerischen Effizienz
Die obige Implementierung ist konzeptionell klar, aber bei großen Datensätzen rechenintensiv. Bei jeder Iteration wird die Eindeutigkeit für alle verbleibenden Beobachtungen neu berechnet, was zu O(n³) Komplexität führt.
Für Produktionssysteme sind mehrere Optimierungen von entscheidender Bedeutung:
- Inkrementelle Updates: Anstatt die gesamte Indikatormatrix neu zu berechnen, sollten wir eine laufende Gleichzeitigkeitszählung beibehalten, die aktualisiert wird, wenn Beobachtungen zur Stichprobe hinzugefügt werden.
- Vorberechnung: Wir berechnen die paarweisen Überschneidungen einmal zu Beginn und verwenden das dann einfache Lookups während der Probenahme.
- Parallelisierung: Mehrere Bootstrap-Stichproben können unabhängig voneinander parallel generiert werden.
- Operationen mit dünnen Matrizen: Finanzielle Zeitreihen haben oft eine begrenzte Gleichzeitigkeit; nutzen Sie spärliche Matrizen, um diese Struktur auszunutzen.
Erwartete Ergebnisse
In Abbildung 1 ist das Histogramm der Einzigartigkeit von Standard-Bootstrap-Stichproben (links) und von sequentiell gebootstrappten Stichproben (rechts) dargestellt. Der Median der durchschnittlichen Eindeutigkeit für die Standardmethode liegt bei 0,6, und der Median der durchschnittlichen Eindeutigkeit für die sequenzielle Methode liegt bei 0,7. Ein ANOVA-Test auf die Differenz der Mittelwerte ergibt eine verschwindend geringe Wahrscheinlichkeit. Statistisch gesehen haben die Stichproben der sequentiellen Bootstrap-Methode eine erwartete Eindeutigkeit, die die der Standard-Bootstrap-Methode bei jedem vernünftigen Konfidenzniveau übertrifft. Siehe die beigefügte Datei bootstrap_mc.py, wenn Sie selbst eine Monte-Carlo-Simulation durchführen möchten.

Abbildung 1: Monte-Carlo-Experiment zwischen Standard- und sequentiellen Bootstraps
Optimierte Implementierung
In diesem Abschnitt wird erläutert, warum die optimierte Implementierung (abgeflachte Indizes + Numba-beschleunigtes Sampling) dem Standard-Indikatormatrix-Ansatz überlegen ist und warum die einzelnen Funktionen der Implementierung gewählt wurden. Empirische Gedächtnis- und Komplexitätsvergleiche, auf die in dem Artikel verwiesen wird, unterstützen diese Behauptungen.
Schritt 1: get_active_indices – Parse-Zuordnung vs. Dichtematrix des Indikators
Zweck: Konvertierung der Start-/Endzeiten von Ereignissen in Listen der Balkenindizes, die die Stichprobe abdeckt.
Der Grund für die Überlegenheit: Es werden nur Nicht-Null-Einträge gespeichert (die genauen Balkenindizes, die jede Probe berührt), anstatt eine n × T Dichtematrix zuzuweisen. Dadurch wird der Speicherbedarf von O(n-T) auf O(Summe der Ereignislängen) reduziert, was bei realistischen Sparsamkeitsmustern oft nahezu linear zu n ist.
Praktischer Vorteil: drastisch reduzierter Speicherbedarf und besseres Cache-Verhalten beim Scannen der Probenabdeckung; diese Vorteile führen zu den in der optimierten Analyse angegebenen Kompressionsraten und Speicherreduzierungen.
def get_active_indices(samples_info_sets, price_bars_index): """ Build an indicator mapping from each sample to the bar indices it influences. Args: samples_info_sets (pd.Series): Triple-barrier events (t1) returned by labeling.get_events. Index: start times (t0) as pd.DatetimeIndex. Values: end times (t1) as pd.Timestamp (or NaT for open events). price_bars_index (pd.DatetimeIndex or array-like): Sorted bar timestamps (pd.DatetimeIndex or array-like). Will be converted to np.int64 timestamps for internal processing. Returns: dict: Standard Python dictionary mapping sample_id (int) to a numpy.ndarray of bar indices (dtype=int64). Example: {0: array([0,1,2], dtype=int64), 1: array([], dtype=int64), ...} """ t0 = samples_info_sets.index t1 = samples_info_sets.values n = len(samples_info_sets) active_indices = {} # precompute searchsorted positions to restrict scanning range starts = np.searchsorted(price_bars_index, t0, side="left") ends = np.searchsorted(price_bars_index, t1, side="right") # exclusive for sample_id in range(n): s = starts[sample_id] e = ends[sample_id] if e > s: active_indices[sample_id] = np.arange(s, e, dtype=int) else: active_indices[sample_id] = np.empty(0, dtype=int) return active_indices
Schritt 2: pack_active_indices – zusammenhängendes, abgeflachtes Layout für den Durchsatz
Zweck: Konvertierung von Diktat/Liste von Arrays in flat_indices, Offsets, Längen und sample_ids.
Der Grund für die Überlegenheit: Zusammenhängende Arrays beseitigen den Python-Objekt-Overhead und ermöglichen enge, lineare Speicherabfragen. Offsets bieten O(1)- Zugriff auf den Slice jeder Probe ohne Indexierung der Liste pro Iteration. Dieses „ragged-to-flat“-Muster ist für effiziente Numba/JIT-Schleifen erforderlich.
Praktischer Vorteil: Numba kann sequentiell über den Speicher iterieren, was das CPU-Prefetching verbessert und den Interpreter-Overhead im Vergleich zu wiederholten Operationen auf Python-Ebene oder Array-Objekten pro Sample reduziert.
def pack_active_indices(active_indices): """ Convert dict/list-of-arrays active_indices into flattened arrays and offsets. Args: active_indices (dict or list): mapping sample_id -> 1D ndarray of bar indices Returns: flat_indices (ndarray int64): concatenated bar indices for all samples offsets (ndarray int64): start index in flat_indices for each sample (len = n+1) lengths (ndarray int64): number of indices per sample (len = n) sample_ids (list): list of sample ids in the order used to pack data """ # Preserve sample id ordering to allow mapping between chosen index and original id if isinstance(active_indices, dict): sample_ids = list(active_indices.keys()) values = [active_indices[sid] for sid in sample_ids] else: # assume list-like ordered by sample id 0..n-1 sample_ids = list(range(len(active_indices))) values = list(active_indices) lengths = np.array([v.size for v in values], dtype=np.int64) offsets = np.empty(len(values) + 1, dtype=np.int64) offsets[0] = 0 offsets[1:] = np.cumsum(lengths) total = int(offsets[-1]) if total == 0: flat_indices = np.empty(0, dtype=np.int64) else: flat_indices = np.empty(total, dtype=np.int64) pos = 0 for v in values: l = v.size if l: flat_indices[pos : pos + l] = v pos += l return flat_indices, offsets, lengths, sample_ids
Schritt 3: _compute_scores_flat / _normalize_to_prob – lokale, inkrementelle Wertung
Zweck: Berechnung der Punktzahlen pro Stichprobe mit der Formel core = (1 / (1 + concurrency)).mean(), dann Normierung auf einen Wahrscheinlichkeitsvektor.
Der Grund für die Überlegenheit: Bei der Bewertung werden nur die Balken verwendet, die eine Stichprobe tatsächlich abdeckt (über ihren Slice in flat_indices) und die aktuelle Gleichzeitigkeit zählt. Die Kosten pro Probe sind proportional zur Ereignislänge k und nicht zum gesamten Zeithorizont T, sodass pro vollständigem Durchlauf O(n-k) Arbeit anfällt statt O(n-T).
Praktischer Vorteil: Die numerische Stabilität durch einen eps-Floor und eine deterministische Normalisierung verhindert Nullsummenfälle und hält die Stichprobenverteilung bei zunehmender Gleichzeitigkeit stabil.
@njit def _compute_scores_flat(flat_indices, offsets, lengths, concurrency): """ Compute average uniqueness for each sample using flattened indices. This follows de Prado's approach: for each bar in a sample, compute uniqueness as 1/(c+1), then average across all bars in that sample. Args: flat_indices (ndarray int64): concatenated indices offsets (ndarray int64): start positions (len = n+1) lengths (ndarray int64): counts per sample concurrency (ndarray int64): current concurrency counts per bar Returns: scores (ndarray float64): average uniqueness per sample """ n = offsets.shape[0] - 1 scores = np.empty(n, dtype=np.float64) for i in range(n): s = offsets[i] e = offsets[i + 1] length = lengths[i] if length == 0: # If a sample covers no bars, assign zero average uniqueness scores[i] = 0.0 else: # Compute uniqueness = 1/(c+1) for each bar, then average sum_uniqueness = 0.0 for k in range(s, e): bar = flat_indices[k] c = concurrency[bar] uniqueness = 1.0 / (c + 1.0) sum_uniqueness += uniqueness avg_uniqueness = sum_uniqueness / length scores[i] = avg_uniqueness return scores
@njit def _normalize_to_prob(scores): """ Normalize non-negative scores to a probability vector. If all zero, return uniform. """ n = scores.shape[0] total = 0.0 for i in range(n): total += scores[i] prob = np.empty(n, dtype=np.float64) if total == 0.0: # fallback to uniform distribution uni = 1.0 / n for i in range(n): prob[i] = uni else: for i in range(n): prob[i] = scores[i] / total return prob
Schritt 4: _increment_concurrency_flat – In-Place-Aktualisierungen der Gleichzeitigkeit
Zweck: Erhöhung von concurrency[bar] für jeden Balken, der von der gewählten Stichprobe abgedeckt wird.
Der Vorteil: Es werden nur die betroffenen Balken aktualisiert, statt große Arrays erneut zu scannen oder zu aggregieren. Ein Ansatz mit einer Dichtematrix könnte die Neuberechnung der Summen vieler Zeilen oder die Erstellung temporärer Strukturen erzwingen; hier sind die Aktualisierungen lokalisiert und O(k).
Praktischer Vorteil: Durch billige inkrementelle Aktualisierungen kann die Probe nach jeder Ziehung online angepasst werden, ohne dass teure globale Neuberechnungen erforderlich sind, was viele Bootstrap-Ziehungen effizient macht.
@njit def _increment_concurrency_flat(flat_indices, offsets, chosen, concurrency): """ Increment concurrency for the bars covered by sample `chosen`. """ s = offsets[chosen] e = offsets[chosen + 1] for k in range(s, e): bar = flat_indices[k] concurrency[bar] += 1
Schritt 5: _seq_bootstrap_loop und seq_bootstrap_optimized – volle Numba-Beschleunigung mit reproduzierbarem RNG
Zweck: Durchführung der vollständigen sequentiellen Bootstrap-Schleife innerhalb von Numba unter Verwendung von vorgezeichneten Uniformen aus Python für einen reproduzierbaren Zufallszahlengenerator (RNG) und das abgeflachte Layout für Speichereffizienz.
Warum es besser ist:
- Eliminiert den Python-pro-Iteration-Overhead: Bewertung, CDF-Auswahl und Gleichzeitigkeitsaktualisierungen werden innerhalb einer Jitted-Funktion ausgeführt, wodurch kostspielige Interpreter-Kontextwechsel vermieden werden, die beim Skalieren dominieren.
- Bewahrt die Reproduzierbarkeit: Der RNG wird in Python verwaltet (NumPy RandomState) und die Uniformen werden an die Jitted-Schleife weitergegeben, wodurch reproduzierbares Seeding mit schnellen inneren Schleifen kombiniert wird.
- Ermöglicht O(n-k)-Zeit mit kleinen konstanten Faktoren, verglichen mit den wiederholten Indikatormatrix-Neuberechnungen des Standardalgorithmus und potenziellen O(n²) oder schlechteren Scanmustern.
Praktischer Vorteil: Das abgeflachte Layout in Verbindung mit vollständigem JIT führt zu erheblichen Geschwindigkeits- und Speicherverbesserungen und ermöglicht sequenzielles Bootstrap für Zehn- oder Hunderttausende von Proben in Produktionsworkflows.
@njit def _choose_index_from_cdf(prob, u): """ Convert a uniform random number u in [0,1) to an index using the cumulative distribution. This avoids calling numpy.choice inside numba and is efficient. """ n = prob.shape[0] cum = 0.0 for i in range(n): cum += prob[i] if u < cum: return i # numerical fallback: return last index return n - 1
@njit def _seq_bootstrap_loop(flat_indices, offsets, lengths, concurrency, uniforms): """ Njitted sequential bootstrap loop. Args: flat_indices, offsets, lengths: flattened index layout concurrency (ndarray int64): initial concurrency vector (will be mutated) uniforms (ndarray float64): pre-drawn uniform random numbers in [0,1), length = sample_length Returns: chosen_indices (ndarray int64): sequence of chosen sample indices (positions in packed order) """ sample_length = uniforms.shape[0] chosen_indices = np.empty(sample_length, dtype=np.int64) for it in range(sample_length): # compute scores and probabilities given current concurrency scores = _compute_scores_flat(flat_indices, offsets, lengths, concurrency) prob = _normalize_to_prob(scores) # map uniform to a sample index u = uniforms[it] idx = _choose_index_from_cdf(prob, u) chosen_indices[it] = idx # update concurrency for selected sample _increment_concurrency_flat(flat_indices, offsets, idx, concurrency) return chosen_indices
def seq_bootstrap_optimized(active_indices, sample_length=None, random_seed=None): """ End-to-end sequential bootstrap using flattened arrays + Numba. Implements the sequential bootstrap as described in de Prado's "Advances in Financial Machine Learning" Chapter 4: average uniqueness per sample where uniqueness per bar is 1/(concurrency+1). Args: active_indices (dict or list): mapping sample id -> ndarray of bar indices sample_length (int or None): requested number of draws; defaults to number of samples random_seed (int, RandomState, or None): seed controlling the pre-drawn uniforms Returns: phi (list): list of chosen original sample ids (length = sample_length) """ # Pack into contiguous arrays and keep mapping from packed index -> original sample id flat_indices, offsets, lengths, sample_ids = pack_active_indices(active_indices) n_samples = offsets.shape[0] - 1 if sample_length is None: sample_length = n_samples # Concurrency vector length: bars are indices into price-bar positions. # When there are no bars (flat_indices empty), create an empty concurrency of length 0. if flat_indices.size == 0: T = 0 else: # max bar index + 1 (bars are zero-based indices) T = int(flat_indices.max()) + 1 concurrency = np.zeros(T, dtype=np.int64) # Prepare reproducible uniforms. Accept either integer seed or RandomState. if random_seed is None: rng = np.random.RandomState() elif isinstance(random_seed, np.random.RandomState): rng = random_seed else: try: rng = np.random.RandomState(int(random_seed)) except (ValueError, TypeError): rng = np.random.RandomState() # Pre-draw uniforms in Python and pass them into njit function (numba cannot accept RandomState) uniforms = rng.random_sample(sample_length).astype(np.float64) # Run njit loop (this mutates concurrency but we don't need concurrency afterwards) chosen_packed = _seq_bootstrap_loop(flat_indices, offsets, lengths, concurrency, uniforms) # Map packed indices back to original sample ids phi = [sample_ids[int(i)] for i in chosen_packed.tolist()] return phi
Komplexität und Auswirkungen auf die Bereitstellung
- Speicher: Der optimierte Ansatz reduziert das Wachstum von quadratisch auf nahezu linear in n, wodurch zuvor nicht durchführbare Problemgrößen in handhabbare Größen verwandelt werden.
- Zeit: Die Kosten pro Ziehung sind proportional zur durchschnittlichen Ereignislänge k, nicht zum gesamten Zeitraster T. Die Komplexität pro Zeichnung ist O(n-k), was eine Gesamtkomplexität von O(n²-k) für die Erstellung von n Stichproben ergibt. Dies ist immer noch weit besser als O(n³) oder schlechter, das mit naiven Indikatormatrixansätzen erreicht wird, wenn k << n.
- Technik: Das Muster „flattened + njit“ unterstützt weitere Optimierungen (Verwendung von int32 für Indizes, wenn es sicher ist, Parallelisierung des Samplings, Vorberechnung von wiederholten Score-Termen) und lässt sich sauber in eine Pipeline integrieren, die von labeling.get_events in den optimierten Sampler fließt.
Analyse der Speichereffizienz: Optimierte vs. Standard-Indikatormatrix-Implementierung
Vergleich des Speicherplatzbedarfs
Die Daten zum Speicherverbrauch zeigen dramatische Unterschiede zwischen den Standard- und den optimierten Implementierungen:
| Stichprobengröße | Standard | Optimiert | Speicherreduzierung | Verdichtungsverhältnis |
|---|---|---|---|---|
| 500 | 7.19 MB | 0.02 MB | 99.7% | 359:1 |
| 1,000 | 23.75 MB | 0.04 MB | 99.8% | 594:1 |
| 2,000 | 93.65 MB | 0.07 MB | 99.9% | 1,338:1 |
| 4,000 | 412.23 MB | 0.14 MB | 99.97% | 2,944:1 |
| 8,000 | 1,237.82 MB | 0.28 MB | 99.98% | 4,421:1 |
Mathematische Analyse von Wachstumsmustern
Standardimplementierung (quadratisches Wachstum)
Die Standardimplementierung weist eine Speicherkomplexität von O(n²) auf:
Memory(n) ≈ 0.0000188 × n² (MB) - Bei n=8.000: Vorausgesagt = 1.203 MB vs. Tatsächlich = 1.238 MB (97% Genauigkeit)
- Verdoppelung der Proben: 4-fache Erhöhung des Arbeitsspeichers (entspricht einem quadratischen Wachstum)
Optimierte Implementierung (lineares Wachstum)
Die optimierte Implementierung weist eine Speicherkomplexität von O(n) auf:
Memory(n) ≈ 0.000035 × n (MB) - Bei n=8.000: Vorausgesagt = 0,28 MB gegenüber tatsächlich = 0,28 MB (genaue Übereinstimmung)
- Verdoppelung der Proben: 2-fache Erhöhung des Arbeitsspeichers (im Einklang mit linearem Wachstum)
Praktische Auswirkungen für Financial ML
Grenzen der Skalierbarkeit
Standardimplementierung
- n=50.000: ~47 GB (unpraktisch)
- n=100.000: ~188 GB (unmöglich)
- Maximal machbar: ~15.000 Proben
Optimierte Implementierung
- n=50.000: ~1,75 MB (trivial)
- n=100.000: ~3,5 MB (leicht zu handhaben)
- Maximal machbar: Millionen von Proben
Einsatzszenarien in der realen Welt
# Typical financial dataset scenarios scenarios = { "Intraday Trading": { "samples": 50_000, # 2 years of 5-minute bars "standard_memory": "47 GB", "optimized_memory": "1.75 MB", "feasible": "Only with optimized" }, "Multi-Asset Portfolio": { "samples": 200_000, # 100 instruments × 2,000 bars "standard_memory": "752 GB", "optimized_memory": "7 MB", "feasible": "Only with optimized" }, "Research Backtesting": { "samples": 1_000_000, # Comprehensive market analysis "standard_memory": "18.8 TB", "optimized_memory": "35 MB", "feasible": "Only with optimized" } }
Einblicke in die technische Architektur
Warum der dramatische Unterschied?
Standardimplementierung (get_ind_matrix):
# Creates dense n × n matrix (O(n²) memory) ind_matrix = np.zeros((len(bar_index), len(label_endtime)), dtype=np.int8) for sample_num, label_array in enumerate(tokenized_endtimes): ind_mat[label_index:label_endtime+1, sample_num] = 1 # Fills entire ranges
Optimierte Implementierung (precompute_active_indices):
# Stores only active indices (O(k×n) memory, where k << n) active_indices = {} for sample_id in range(n_samples): mask = (price_bars_array >= t0) & (price_bars_array <= t1) indices = np.where(mask)[0] # Stores only non-zero indices active_indices[sample_id] = indices
Steigerung der Speichereffizienz
Das Komprimierungsverhältnis verbessert sich nämlich mit dem Stichprobenumfang:
- Die Sparsamkeit nimmt zu – jede Probe betrifft nur einen kleinen Teil der gesamten Balken.
- Fester Overhead – Wörterbuchstruktur hat minimale Basisspeicherkosten.
- Effiziente Speicherung – Integer-Arrays statt vollständiger Matrizen.
Auswirkungen auf die Leistung beim sequenziellen Bootstrapping.
Vergleich der algorithmischen Komplexität
# Standard implementation: O(n³) time, O(n²) memory def seq_bootstrap_standard(ind_mat): # Each iteration: O(n²) operations × n iterations for i in range(n_samples): avg_unique = _bootstrap_loop_run(ind_mat, prev_concurrency) # O(n²) # Optimized implementation: O(n×k) time, O(n) memory def seq_bootstrap_optimized(active_indices): # Each iteration: O(k) operations × n iterations (where k = avg event length) for i in range(n_samples): prob = _seq_bootstrap_loop(flat_indices, offsets, lengths, concurrency, uniforms) # O(k)
Anhand der Speichermuster können wir die zeitliche Leistung hochrechnen:
| Betrieb | n=1,000 | n=8,000 | Skalierungsfaktor |
|---|---|---|---|
| Speicherzuweisung | 23,75 MB → 0,04 MB | 1.238 MB → 0,28 MB | 4,421x besser |
| Matrix-Operationen | O(1M) Elemente | O(64M) Elemente | 64x langsamer (Standard) |
| Cache-Effizienz | Schlecht (große Matrizen) | Ausgezeichnet (kleine Arrays) | Erheblicher Vorteil |
Aufbau eines Ensembles: SequentiallyBootstrappedBaggingClassifier
Nachdem wir nun einen optimierten sequentiellen Bootstrap-Sampler haben, können wir ihn in ein komplettes Ensemble für maschinelles Lernen integrieren. Der SequentiallyBootstrappedBaggingClassifier kombiniert die zeitliche Bewusstheit des sequentiellen Bootstrappings mit der Varianzreduktion von Ensemble-Methoden.
Warum Bagging funktioniert – und warum es sequenziellen Bootstrap braucht
Bootstrap Aggregieren (Bagging) ist eine der effektivsten Ensemble-Methoden im maschinellen Lernen. Die Kernidee ist elegant:
- Erzeuge mehrere Bootstrap-Stichproben aus Ihren Trainingsdaten.
- Trainiere ein separates Modell für jede Probe.
- Aggregiere Vorhersagen durch Abstimmung (Klassifizierung) oder Mittelwertbildung (Regression).
Dies funktioniert hervorragend, wenn die Proben unabhängig sind. Jede Bootstrap-Stichprobe bietet eine etwas andere „Sicht“ auf die Daten, und die Aggregation dieser Ansichten verringert die Varianz, ohne die Verzerrung zu erhöhen.
Aber in der ML-Finanzwelt mit sich überschneidenden Kennzeichen versagt das Standard-Bagging auf katastrophale Weise.
Jede Bootstrap-Stichprobe enthält aufgrund der Gleichzeitigkeit der Bezeichnungen versehentlich viele Kopien derselben zeitlichen Muster. Das Ensemble lernt dieselben Muster mehrfach, was zu einem Ergebnis führt:
- Übermütige Vorhersagen – Die Modelle haben dasselbe Muster mehr als 10 Mal gesehen und glauben, dass es sehr zuverlässig ist
- Unterschätzte Varianz – Verschiedene Bootstrap-Stichproben sind nicht wirklich unabhängig
- Schlechte Verallgemeinerung – Die tatsächliche Häufigkeit von Mustern ist viel geringer, als die Trainingsdaten vermuten lassen
Sequentielles Bootstrapping löst dieses Problem, indem es sicherstellt, dass jede Bootstrap-Stichprobe die zeitliche Unabhängigkeit maximiert, sodass das Ensemble wirklich vielfältige Trainingssätze erhält.
Überblick über die Architektur
Der SequentiallyBootstrappedBaggingClassifier erweitert den BaggingClassifier von scikit-learn mit drei wesentlichen Änderungen:
- Sequentielles Sampling – Verwendung von seq_bootstrap_optimized anstelle von uniform random sampling
- Zeitliche Verfolgung von Metadaten – Verwaltet samples_info_sets (Start-/Endzeitpunkte der Kennzeichen) und price_bars_index
- Vorberechnung aktiver Indizes – einmalige Erstellung einer spärlichen Indexzuordnung, Wiederverwendung für alle Schätzer
Durchgang zur Implementierung
Schritt 1: Initialisierung und Metadaten
Der Klassifikator benötigt zeitliche Metadaten, die beim Standard-Bagging nicht erforderlich sind:def __init__( self, samples_info_sets, # NEW: label temporal spans price_bars_index, # NEW: price bar timestamps estimator=None, n_estimators=10, max_samples=1.0, max_features=1.0, bootstrap_features=False, oob_score=False, warm_start=False, n_jobs=None, random_state=None, verbose=0, ):
Erläuterung der wichtigsten Parameter:
- samples_info_sets (pd.Series): Index enthält die Startzeiten der Etiketten (t0), Werte enthalten die Endzeiten der Kennzeichen (t1). Damit wird die zeitliche Spanne der Kennzeichnung der einzelnen Beobachtungen erfasst.
- preis_balken_index (pd.DatetimeIndex) : Zeitstempel aller Kursbalken, die zur Erstellung der Kennzeichen verwendet werden. Erforderlich für die Zuordnung von Zeitspannen zu Balkenindizes.
- Schätzer : Basisklassifikator (Standardwert ist DecisionTreeClassifier). Jedes Ensemblemitglied ist ein Klon dieses Schätzers.
- n_estimators: Anzahl der Modelle im Ensemble. Mehr Schätzer = glattere Vorhersagen, aber längeres Training.
- max_samples : Bootstrap-Stichprobengröße. Wenn float in (0,1] liegt, ist es ein Bruchteil der Trainingsgröße; wenn int, ist es die genaue Anzahl.
- bootstrap_features: ob auch Teilstichproben von Merkmalen genommen werden sollen (erhöht die Vielfalt, kann aber einzelne Modelle schwächen).
Schritt 2: Berechnung der aktiven Indizes
Vor dem Sampling wird die spärliche Indexabbildung einmal vorberechnet:
def _fit(self, X, y, max_samples=None, sample_weight=None): # ... validation and setup ... # Compute active indices mapping (once, cached for all estimators) if self.active_indices_ is None: self.active_indices_ = get_active_indices( self.samples_info_sets, self.price_bars_index )
Warum vorberechnen? Die Berechnung aktiver Indizes ist O(n) und deterministisch – sie hängt nur von den Zeitstempeln der Kennzeichen ab, nicht von Zufälligkeiten. Die einmalige Berechnung und Wiederverwendung spart Zeit beim Training vieler Schätzer.
Speichereffizienz: Wie im Abschnitt über die Speicheranalyse gezeigt, benötigt active_indices_ O(n-k) Speicher, wobei k die durchschnittliche Kennzeichenlänge ist, verglichen mit O(n²) für eine dichte Indikatormatrix. Bei 8.000 Samples bedeutet dies 0,28 MB gegenüber 1.238 MB – ein Kompressionsverhältnis von 4.421:1.
Schritt 3: Nutzerdefinierte Bootstrap-Beispielgenerierung
Die wichtigste Neuerung ist die Ersetzung der einheitlichen Zufallsstichprobe durch ein sequentielles Bootstrap:
def _generate_bagging_indices( random_state, bootstrap_features, n_features, max_features, max_samples, active_indices ): """Randomly draw feature and sample indices.""" # Get valid random state - this returns a RandomState object random_state_obj = check_random_state(random_state) # Draw samples using sequential bootstrap if isinstance(max_samples, numbers.Integral): sample_indices = seq_bootstrap( active_indices, sample_length=max_samples, random_seed=random_state_obj ) elif isinstance(max_samples, numbers.Real): n_samples = int(round(max_samples * len(active_indices))) sample_indices = seq_bootstrap( active_indices, sample_length=n_samples, random_seed=random_state_obj ) else: sample_indices = seq_bootstrap( active_indices, sample_length=None, random_seed=random_state_obj ) # Draw feature indices only if bootstrap_features is True if bootstrap_features: if isinstance(max_features, numbers.Integral): n_feat = max_features elif isinstance(max_features, numbers.Real): n_feat = int(round(max_features * n_features)) else: raise ValueError("max_features must be int or float when bootstrap_features=True") feature_indices = _generate_random_features( random_state_obj, bootstrap_features, n_features, n_feat ) else: # When not bootstrapping features, return None (will be handled downstream) feature_indices = None return sample_indices, feature_indices
Kritischer Einblick: Wir verwenden sequentiellen Bootstrap für Stichproben (zeitliche Dimension), aber Standard-Zufallsstichproben für Merkmale (wenn bootstrap_features=True). Das ist richtig, denn:
- Zeitliche Überschneidungen treten bei Beobachtungen (Zeilen), nicht bei Merkmalen (Spalten) auf.
- Merkmalskorrelation ist orthogonal zur zeitlichen Gleichzeitigkeit
- Standard Feature Bagging erhöht die Vielfalt ohne zeitliche Probleme
Schritt 4: Schulung zum Parallelschätzer
Das Training mehrerer Schätzer ist peinlich parallel – jeder kann unabhängig trainiert werden:
def _parallel_build_estimators( n_estimators, ensemble, X, y, active_indices, sample_weight, seeds, total_n_estimators, verbose ): """Private function used to build a batch of estimators within a job.""" # Retrieve settings n_samples, n_features = X.shape max_samples = ensemble._max_samples max_features = ensemble.max_features bootstrap_features = ensemble.bootstrap_features support_sample_weight = has_fit_parameter(ensemble.estimator_, "sample_weight") # Build estimators estimators = [] estimators_samples = [] estimators_features = [] for i in range(n_estimators): if verbose > 1: print( "Building estimator %d of %d for this parallel run (total %d)..." % (i + 1, n_estimators, total_n_estimators) ) random_state = seeds[i] estimator = ensemble._make_estimator(append=False, random_state=random_state) # Draw samples and features sample_indices, feature_indices = _generate_bagging_indices( random_state, bootstrap_features, n_features, max_features, max_samples, active_indices ) # Draw samples, using sample weights if supported if support_sample_weight and sample_weight is not None: curr_sample_weight = sample_weight[sample_indices] else: curr_sample_weight = None # Store None for features if no bootstrapping (memory optimization) if bootstrap_features: estimators_features.append(feature_indices) else: estimators_features.append(None) # Don't store redundant feature arrays estimators_samples.append(sample_indices) # Select data if bootstrap_features: X_ = X[sample_indices][:, feature_indices] else: X_ = X[sample_indices] # Use all features y_ = y[sample_indices] estimator.fit(X_, y_, sample_weight=curr_sample_weight) estimators.append(estimator) return estimators, estimators_features, estimators_samples
Parallele Effizienz: Wenn n_jobs=-1 ist, verwendet die Implementierung alle CPU-Kerne. Die Ausbildung von 100 Schätzern auf 8 Kernen bedeutet ~12 Schätzer pro Kern, die gleichzeitig verarbeitet werden. Dies ermöglicht eine nahezu lineare Beschleunigung für große Ensembles.
Schritt 5: Out-of-Bag (OOB)-Wertung
Eines der wertvollsten Merkmale des Bagging ist die integrierte Validierung durch OOB-Stichproben:
def _set_oob_score(self, X, y): """Compute out-of-bag score""" # Safeguard: Ensure n_classes_ is set if not hasattr(self, "n_classes_"): self.classes_ = np.unique(y) self.n_classes_ = len(self.classes_) n_samples = y.shape[0] n_classes = self.n_classes_ predictions = np.zeros((n_samples, n_classes)) for estimator, samples, features in zip( self.estimators_, self._estimators_samples, self.estimators_features_ ): # Create mask for OOB samples mask = ~indices_to_mask(samples, n_samples) if np.any(mask): # Get predictions for OOB samples X_oob = X[mask] # If features is None, use all features; otherwise subset if features is not None: X_oob = X_oob[:, features] predictions[mask] += estimator.predict_proba(X_oob) # Average predictions denominator = np.sum(predictions != 0, axis=1) denominator[denominator == 0] = 1 # avoid division by zero predictions /= denominator[:, np.newaxis] # Compute OOB score oob_decision_function = predictions oob_prediction = np.argmax(predictions, axis=1) if n_classes == 2: oob_prediction = oob_prediction.astype(np.int64) self.oob_decision_function_ = oob_decision_function self.oob_prediction_ = oob_prediction self.oob_score_ = accuracy_score(y, oob_prediction)
Warum OOB in der ML-Finanzwelt wichtig ist:
- Kein Datenmüll: Jede Stichprobe erfüllt eine doppelte Aufgabe – Training einiger Schätzer, Validierung anderer
- Ehrliche Schätzungen: Die OOB-Punktzahl ist eine Annäherung an die Kreuzvalidierung, ohne dass dies mit einem hohen Rechenaufwand verbunden ist.
- Frühzeitiges Stoppsignal: Überwachen der OOB-Punktzahl während des Trainings, um eine Überanpassung zu erkennen.
- Zeitliche Sicherheit: Beim sequentiellen Bootstrap sind die OOB-Stichproben zeitlich wirklich unabhängig.
Schritt 6: Klasse Erweiterung
Wir fassen alles zusammen, indem wir die Klassen SequentiallyBootstrappedBaseBagging, SequentiallyBootstrappedBaggingClassifier und SequentiallyBootstrappedBaggingRegressor erstellen , die in sb_bagging.py. zu finden sind.
Vollständiges Verwendungsbeispiel
import pandas as pd import numpy as np from sklearn.tree import DecisionTreeClassifier from sklearn.metrics import classification_report # Assume we have triple-barrier labels from Part 3 # samples_info_sets: pd.Series with index=t0, values=t1 # price_bars: pd.DataFrame with DatetimeIndex # X: feature matrix, y: labels # Initialize classifier clf = SequentiallyBootstrappedBaggingClassifier( samples_info_sets=samples_info_sets, # Label temporal spans price_bars_index=price_bars.index, # Bar timestamps estimator=DecisionTreeClassifier( max_depth=6, min_samples_leaf=50 ), n_estimators=100, # Large ensemble for stability max_samples=0.5, # Use 50% of data per estimator bootstrap_features=True, # Also subsample features max_features=0.7, # Use 70% of features per estimator oob_score=True, # Enable OOB validation n_jobs=-1, # Use all CPU cores random_state=42, # Reproducibility verbose=1 ) # Train ensemble clf.fit(X_train, y_train) # Inspect OOB performance (no test set needed!) print(f"OOB Score: {clf.oob_score_:.4f}") # Make predictions on test set y_pred = clf.predict(X_test) y_proba = clf.predict_proba(X_test) # Evaluate print(classification_report(y_test, y_pred)) # Access individual estimators if needed print(f"Number of estimators: {len(clf.estimators_)}") print(f"Average sample size: {np.mean([len(s) for s in clf.estimators_samples_]):.0f}")
Richtlinien für die Parameterabstimmung
n_estimators (Anzahl der Modelle)
- Kleine Datensätze (<1.000 Proben): 50-100 Schätzer
- Mittlere Datensätze (1.000-10.000): 100-200 Schätzer
- Große Datensätze (>10.000): 200-500 Schätzer
- Faustformel: Mehr ist besser, bis die OOB-Punktzahl ein Plateau erreicht (während des Trainings überwachen)
max_samples (Bootstrap-Stichprobenumfang)
- Hohe Gleichzeitigkeit (>10 Überschneidungen/Balken): Verwenden Sie kleinere Stichproben (0,3-0,5), um die Vielfalt zu maximieren.
- Geringe Gleichzeitigkeit (<5 Überschneidungen/Balken): Kann größere Proben (0,6-0,8) sicher verwenden
- Kompromiss: Kleinere Stichproben → mehr Vielfalt, aber schwächere individuelle Modelle
- Aktivieren, wenn: Hohe Anzahl von Merkmalen (>50), Merkmale sind korreliert, um maximale Vielfalt zu erreichen
- Deaktivieren, wenn: Wenige Merkmale (<20), jedes Merkmal ist entscheidend, Interpretierbarkeit ist wichtig
- Empfohlene max_features: 0,5-0,7 wenn aktiviert (zu niedrig schwächt einzelne Modelle)
Vergleich: Standard vs. Sequentielle Bootstrap-Verpackung
Schauen wir uns den Leistungsunterschied bei einer echten Handelsstrategie an:
from sklearn.ensemble import BaggingClassifier # Standard bagging (temporal leakage) standard_clf = BaggingClassifier( estimator=DecisionTreeClassifier(max_depth=6), n_estimators=100, max_samples=0.5, oob_score=True, random_state=42 ) # Sequential bootstrap bagging (temporal awareness) sequential_clf = SequentiallyBootstrappedBaggingClassifier( samples_info_sets=samples_info_sets, price_bars_index=price_bars.index, estimator=DecisionTreeClassifier(max_depth=6), n_estimators=100, max_samples=0.5, oob_score=True, random_state=42 ) # Train both standard_clf.fit(X_train, y_train) sequential_clf.fit(X_train, y_train) # Compare results print("Standard Bagging:") print(f" OOB Score: {standard_clf.oob_score_:.4f}") print(f" Test Accuracy: {standard_clf.score(X_test, y_test):.4f}") print("\nSequential Bootstrap Bagging:") print(f" OOB Score: {sequential_clf.oob_score_:.4f}") print(f" Test Accuracy: {sequential_clf.score(X_test, y_test):.4f}")
Typische Ergebnisse bei Finanzdaten mit hoher Gleichzeitigkeit:
| Metrik | Standard-Bagging | Sequentielles Bagging | Verbesserung |
|---|---|---|---|
| OOB-Test Gap | 0.124 | 0.013 | -89.5% |
Integration mit Kreuzvalidierung
Die OOB-Bewertung ist zwar praktisch, aber eine korrekte Bewertung erfordert eine bereinigte Kreuzvalidierung, um zeitliche Lecks zu vermeiden:
from mlfinlab.cross_validation import PurgedKFold # Setup purged cross-validation cv = PurgedKFold( n_splits=5, samples_info_sets=samples_info_sets, pct_embargo=0.01 # Embargo 1% of data after each fold )
Das ist wichtig: Selbst mit sequentiellen Bootstrap-Stichproben innerhalb jedes Schätzers benötigen wir immer noch einen bereinigten Lebenslauf, um das Ensemble richtig zu bewerten. Sequentieller Bootstrap behandelt Überschneidungen zwischen den Schätzern; bereinigte Kreuzvalidierung behandelt zeitliche Lecks zwischen den Falten.
Wir müssen unsere eigenen Kreuzvalidierungsmethoden implementieren, um unsere sequentiell gebootstrappten Bagging-Modelle zu berücksichtigen. Die nachstehende Funktion bietet eine umfassende Analyse dessen, was in jeder Falte geschieht, und ist für eine tiefere Analyse der zeitlichen Abhängigkeiten Ihrer Daten erforderlich.
def analyze_cross_val_scores( classifier: ClassifierMixin, X: pd.DataFrame, y: pd.Series, cv_gen: BaseCrossValidator, sample_weight_train: np.ndarray = None, sample_weight_score: np.ndarray = None, ): # pylint: disable=invalid-name # pylint: disable=comparison-with-callable """ Advances in Financial Machine Learning, Snippet 7.4, page 110. Using the PurgedKFold Class. Function to run a cross-validation evaluation of the classifier using sample weights and a custom CV generator. Scores are computed using accuracy_score, probability_weighted_accuracy, log_loss and f1_score. Note: This function is different to the book in that it requires the user to pass through a CV object. The book will accept a None value as a default and then resort to using PurgedCV, this also meant that extra arguments had to be passed to the function. To correct this we have removed the default and require the user to pass a CV object to the function. Example: .. code-block:: python cv_gen = PurgedKFold(n_splits=n_splits, t1=t1, pct_embargo=pct_embargo) scores_array = ml_cross_val_scores_all(classifier, X, y, cv_gen, sample_weight_train=sample_train, sample_weight_score=sample_score, scoring=accuracy_score) :param classifier: (BaseEstimator) A scikit-learn Classifier object instance. :param X: (pd.DataFrame) The dataset of records to evaluate. :param y: (pd.Series) The labels corresponding to the X dataset. :param cv_gen: (BaseCrossValidator) Cross Validation generator object instance. :param sample_weight_train: (np.array) Sample weights used to train the model for each record in the dataset. :param sample_weight_score: (np.array) Sample weights used to evaluate the model quality. :return: tuple(dict, pd.DataFrame, dict) The computed scores, a data frame of mean and std. deviation, and a dict of data in each fold """ scoring_methods = [ accuracy_score, probability_weighted_accuracy, log_loss, precision_score, recall_score, f1_score, ] ret_scores = { ( scoring.__name__.replace("_score", "") .replace("probability_weighted_accuracy", "pwa") .replace("log_loss", "neg_log_loss") ): np.zeros(cv_gen.n_splits) for scoring in scoring_methods } # If no sample_weight then broadcast a value of 1 to all samples (full weight). if sample_weight_train is None: sample_weight_train = np.ones((X.shape[0],)) if sample_weight_score is None: sample_weight_score = np.ones((X.shape[0],)) seq_bootstrap = isinstance(classifier, SequentiallyBootstrappedBaggingClassifier) if seq_bootstrap: t1 = classifier.samples_info_sets.copy() common_idx = t1.index.intersection(y.index) X, y, t1 = X.loc[common_idx], y.loc[common_idx], t1.loc[common_idx] if t1.empty: raise KeyError(f"samples_info_sets not aligned with data") classifier.set_params(oob_score=False) cms = [] # To store confusion matrices # Score model on KFolds for i, (train, test) in enumerate(cv_gen.split(X=X, y=y)): if seq_bootstrap: classifier = clone(classifier).set_params( samples_info_sets=t1.iloc[train] ) # Create new instance fit = classifier.fit( X=X.iloc[train, :], y=y.iloc[train], sample_weight=sample_weight_train[train], ) prob = fit.predict_proba(X.iloc[test, :]) pred = fit.predict(X.iloc[test, :]) params = dict( y_true=y.iloc[test], y_pred=pred, labels=classifier.classes_, sample_weight=sample_weight_score[test], ) for method, scoring in zip(ret_scores.keys(), scoring_methods): if scoring in (probability_weighted_accuracy, log_loss): score = scoring( y.iloc[test], prob, sample_weight=sample_weight_score[test], labels=classifier.classes_, ) if method == "neg_log_loss": score *= -1 else: try: score = scoring(**params) except: del params["labels"] score = scoring(**params) params["labels"] = classifier.classes_ ret_scores[method][i] = score cms.append(confusion_matrix(**params).round(2)) # Mean and standard deviation of scores scores_df = pd.DataFrame.from_dict( { scoring: {"mean": scores.mean(), "std": scores.std()} for scoring, scores in ret_scores.items() }, orient="index", ) # Extract TN, TP, FP, FN for each fold confusion_matrix_breakdown = [] for i, cm in enumerate(cms, 1): if cm.shape == (2, 2): # Binary classification tn, fp, fn, tp = cm.ravel() confusion_matrix_breakdown.append({"fold": i, "TN": tn, "FP": fp, "FN": fn, "TP": tp}) else: # For multi-class, you might want different handling confusion_matrix_breakdown.append({"fold": i, "confusion_matrix": cm}) return ret_scores, scores_df, confusion_matrix_breakdown
Nachfolgend sind die Ergebnisse einer meta-bezeichneten Bollinger Band Mean-Reversion-Strategie aufgeführt (siehe sample_weights.ipynb). Alle Ergebnisse sind besser und haben eine geringere Varianz, wenn sequentielles Bootstrapping verwendet wird.
Ergebnisse der Kreuzvalidierung:
| Random Forest | Standard-Bagging | Sequentielles Bagging | |
|---|---|---|---|
| Genauigkeit | 0.509 ± 0.024 | 0.515 ± 0.024 | 0.527 ± 0.015 |
| pwa | 0.513 ± 0.038 | 0.519 ± 0.039 | 0.544 ± 0.018 |
| neg_log_loss | -0.695 ± 0.005 | -0.694 ± 0.005 | -0.692 ± 0.001 |
| Präzision | 0.637 ± 0.027 | 0.643 ± 0.026 | 0.637 ± 0.026 |
| recall | 0.476 ± 0.095 | 0.484 ± 0.098 | 0.567 ± 0.038 |
| f1 | 0.539 ± 0.065 | 0.546 ± 0.067 | 0.599 ± 0.026 |
Ergebnisse außerhalb der Stichprobe:
| Random Forest | Standard-Bagging | Sequentielles Bagging | |
|---|---|---|---|
| Genauigkeit | 0.505780 | 0.496628 | 0.519750 |
| pwa | 0.493505 | 0.495487 | 0.523738 |
| neg_log_loss | -0.696703 | -0.696612 | -0.692669 |
| Präzision | 0.650811 | 0.646396 | 0.633913 |
| recall | 0.461303 | 0.439847 | 0.558621 |
| f1 | 0.539910 | 0.523484 | 0.593890 |
| oob | 0.516976 | 0.516133 | 0.522153 |
| oob_test_gap | 0.011195 | 0.019505 | 0.002403 |
Wichtige Beobachtungen:
- Der geringere Abstand zwischen OOB und Test für sequentielles Bagging (0,002 vs. 0,019) zeigt, dass die OOB-Schätzung vertrauenswürdig ist – OOB und Testleistung stimmen eng überein, was darauf hindeutet, dass es keine versteckten zeitlichen Lecks gibt.
- Höhere Testgenauigkeit zeigt eine bessere Verallgemeinerung auf wirklich ungesehene Daten.
Fortgeschrittene: Nutzerdefinierte OOB-Metriken
Der integrierte oob_score_ verwendet die Genauigkeit für die Klassifizierung und R² für die Regression. Für Finanzanwendungen benötigen Sie häufig nutzerdefinierte Metriken:
def compute_custom_oob_metrics(clf, X, y, sample_weight=None): """ Compute custom OOB metrics (F1, AUC, precision/recall) for a fitted ensemble. Args: clf: Fitted SequentiallyBootstrappedBaggingClassifier X: Feature matrix used in training y: True labels sample_weight: Optional sample weights Returns: dict: Custom OOB metric values """ from sklearn.metrics import f1_score, roc_auc_score, precision_score, recall_score n_samples = y.shape[0] n_classes = clf.n_classes_ # Accumulate OOB predictions oob_proba = np.zeros((n_samples, n_classes)) oob_count = np.zeros(n_samples) for estimator, samples, features in zip( clf.estimators_, clf.estimators_samples_, clf.estimators_features_ ): mask = ~indices_to_mask(samples, n_samples) if np.any(mask): X_oob = X[mask][:, features] oob_proba[mask] += estimator.predict_proba(X_oob) oob_count[mask] += 1 # Average and get predictions oob_mask = oob_count > 0 oob_proba[oob_mask] /= oob_count[oob_mask, np.newaxis] oob_pred = np.argmax(oob_proba, axis=1) # Compute metrics on samples with OOB predictions y_oob = y[oob_mask] pred_oob = oob_pred[oob_mask] proba_oob = oob_proba[oob_mask] metrics = { 'f1': f1_score(y_oob, pred_oob, average='weighted'), 'precision': precision_score(y_oob, pred_oob, average='weighted'), 'recall': recall_score(y_oob, pred_oob, average='weighted'), 'coverage': oob_mask.sum() / n_samples # Fraction with OOB predictions } # Add AUC for binary classification if n_classes == 2: metrics['auc'] = roc_auc_score(y_oob, proba_oob[:, 1]) return metrics # Usage oob_metrics = compute_custom_oob_metrics(sequential_clf, X_train, y_train) print("Custom OOB Metrics:") for metric, value in oob_metrics.items(): print(f" {metric}: {value:.4f}")
Überlegungen zur Produktionsbereitstellung
Speicherverwaltung
Große Ensembles können viel Speicherplatz beanspruchen. Überwachen und optimieren:
import sys # Check ensemble memory footprint def estimate_ensemble_size(clf): """Estimate memory usage of fitted ensemble.""" total_bytes = 0 # Estimators for est in clf.estimators_: total_bytes += sys.getsizeof(est) # Sample indices for samples in clf.estimators_samples_: total_bytes += samples.nbytes # Feature indices if clf.estimators_features_ is not None: for features in clf.estimators_features_: total_bytes += features.nbytes return total_bytes / (1024 ** 2) # Convert to MB size_mb = estimate_ensemble_size(sequential_clf) print(f"Ensemble size: {size_mb:.2f} MB")
Modell-Serialisierung
Speichern und laden Sie trainierte Ensembles effizient:
import joblib # Save entire ensemble joblib.dump(sequential_clf, 'sequential_bagging_model.pkl', compress=3) # Load for prediction loaded_clf = joblib.load('sequential_bagging_model.pkl') # Verify predictions match original_pred = sequential_clf.predict_proba(X_test) loaded_pred = loaded_clf.predict_proba(X_test) assert np.allclose(original_pred, loaded_pred)
Häufige Fallstricke und Lösungen
Fallstrick 1: Vergessen, zeitliche Metadaten zu übergeben
Problem: Der Versuch, den Klassifikator ohne samples_info_sets oder price_bars_index zu verwenden.Lösung: Achten Sie immer darauf, dass diese in Ihrem Prozess für die Kennzeichnung richtig aufgebaut sind:
# From triple-barrier labeling (Part 3) events = get_events( close=close_prices, t_events=trigger_times, pt_sl=[1, 1], target=daily_vol, min_ret=0.01, num_threads=4, vertical_barrier_times=vertical_barriers ) # events['t1'] contains end times - this is samples_info_sets samples_info_sets = events['t1'] price_bars_index = close_prices.index # Now safe to use clf = SequentiallyBootstrappedBaggingClassifier( samples_info_sets=samples_info_sets, price_bars_index=price_bars_index, # ... other params ... )
Fallstrick 2: Nicht übereinstimmende Indexlängen
Problem: len(samples_info_sets) != len(X) verursacht kryptische Fehler.Lösung: Bringen Sie Ihre Merkmale, Kennzeichnungen und Metadaten immer in Einklang:
# After computing features and labels, ensure alignment assert len(X) == len(y) == len(samples_info_sets), \ "Feature matrix, labels, and metadata must have same length" # If they don't match, use index intersection common_idx = X.index.intersection(y.index).intersection(samples_info_sets.index) X_aligned = X.loc[common_idx] y_aligned = y.loc[common_idx] samples_aligned = samples_info_sets.loc[common_idx]
Fallstrick 3: Ignorieren des Warmstart-Verhaltens
Problem: Durch das Setzen von warm_start=True und dann das Ändern von n_estimators werden die vorhandenen Schätzer nicht neu trainiert.Lösung: Verstehen Sie, dass der Warmstart nur Schätzer hinzufügt:
# Initial training with 50 estimators clf = SequentiallyBootstrappedBaggingClassifier( samples_info_sets=samples_info_sets, price_bars_index=price_bars_index, n_estimators=50, warm_start=True, random_state=42 ) clf.fit(X_train, y_train) # Add 50 more estimators (total=100) clf.n_estimators = 100 clf.fit(X_train, y_train) # Only trains 50 new estimators print(len(clf.estimators_)) # Output: 100
Benchmarking mit Alternativen
Wie schlägt sich das sequentielle Bootstrap-Bagging im Vergleich zu anderen Ensemble-Methoden bei Finanzdaten?
from sklearn.ensemble import ( RandomForestClassifier, GradientBoostingClassifier, BaggingClassifier ) from sklearn.model_selection import cross_val_score # Define models models = { 'Standard Bagging': BaggingClassifier( estimator=DecisionTreeClassifier(max_depth=6), n_estimators=100, random_state=42 ), 'Random Forest': RandomForestClassifier( n_estimators=100, max_depth=6, random_state=42 ), 'Sequential Bagging': SequentiallyBootstrappedBaggingClassifier( samples_info_sets=samples_info_sets, price_bars_index=price_bars_index, estimator=DecisionTreeClassifier(max_depth=6), n_estimators=100, random_state=42 ) } # Benchmark with purged K-Fold CV results = {} cv_gen = PurgedKFold(n_splits, t1, pct_embargo) for name, model in models.items(): raw_scores, scores_df, folds = analyze_cross_val_scores( model, X, y, cv_gen, sample_weights_train=w, sample_weights_score=w, ) results[name] = dict(scores=scores_df, folds=folds)
Zusammenfassung und bewährte Praktiken
Der SequentiallyBootstrappedBaggingClassifier bringt die Leistung des Ensemble-Learnings in die Finanzzeitreihen ein, indem er das grundlegende Problem der Gleichzeitigkeit von Bezeichnungen angeht. Hier sind die wichtigsten Erkenntnisse:
Wann sollte man Sequential Bootstrap Bagging verwenden?
- Triple-Barrier-Kennzeichnung oder jede Methode, die zeitlich überlappende Kennzeichen erzeugt.
- Hochfrequenzdaten, bei denen sich die Beobachtungen natürlich überschneiden.
- Jede finanzielle ML-Aufgabe, bei der die zeitliche Struktur eine Rolle spielt.
- Produktionssysteme, die ehrliche Varianzschätzungen erfordern.
Wenn das Standard-Bagging ausreicht:
- Tägliche oder weniger häufige Daten mit minimaler Überschneidung der Kennzeichen
- Querschnittsprognosen (Vorhersagen über Vermögenswerte, nicht über die Zeit)
- Szenarien, in denen zeitliche Leckagen durch andere Mittel beseitigt wurden
Konfigurations-Checkliste für die Produktion:
- ✓ Überprüfe, ob samples_info_sets und price_bars_index richtig ausgerichtet sind.
- ✓ Aktiviere oob_score=True für die Überwachung während des Trainings.
- ✓ Setze n_jobs=-1, um alle CPU-Kerne zu nutzen.
- ✓ Verwende random_state für Reproduzierbarkeit.
- ✓ Überwachung der Speichernutzung bei großen Ensembles.
- ✓ Validiere mit bereinigter/gesperrter Kreuzvalidierung.
- ✓ Vergleiche zwischen OOB und Testleistung, um verbleibende Leckagen zu erkennen.
Tipps zur Leistungsoptimierung:
- active_indices_ einmal vorberechnen und zwischenspeichern
- Verwende kleinerer max_samples bei hoher Gleichzeitigkeit.
- Aktiviere bootstrap_features für hochdimensionale Daten.
- Batch-Vorhersagen für Anwendungen mit niedriger Latenzzeit.
- Serialisierung von Modellen mit Komprimierung für die Bereitstellung.
Mit dem sequentiellen Bootstrap-Bagging steht Ihnen nun eine produktionsreife Ensemble-Methode zur Verfügung, die die zeitliche Struktur von Finanzdaten berücksichtigt und gleichzeitig die Vorteile der Varianzreduzierung bietet, die das Bagging beim traditionellen maschinellen Lernen so leistungsstark machen.
Bereitstellung von sequenziellen Bootstrap-Modellen für MQL5 über ONNX
Nach dem Training robuster sequentieller Bootstrap-Modelle in Python besteht der nächste kritische Schritt darin, sie im MetaTrader 5 für den Live-Handel einzusetzen. ONNX (Open Neural Network Exchange) bietet die zuverlässigste Brücke zwischen dem reichen ML-Ökosystem von Python und der Produktionsumgebung von MQL5.
Warum ONNX für die MQL5-Bereitstellung
ONNX bietet mehrere überzeugende Vorteile für den Einsatz von ML-Modellen im Finanzbereich:
- Native MetaTrader 5-Unterstützung – MetaTrader 5 hat eine eingebaute ONNX-Laufzeit, keine externen Abhängigkeiten erforderlich
- Leistung – Modelle werden als kompilierter C++-Code ausgeführt und liefern Vorhersagen im Mikrosekundenbereich
- Plattformübergreifend – Dasselbe Modell funktioniert auf Windows-, Mac- und Linux-Installationen von MetaTrader 5
- Breite Kompatibilität – Unterstützt scikit-learn-Ensembles, einschließlich unserer sequentiellen Bootstrap-Modelle
- Versionskontrolle – Binäre Modelldateien lassen sich leicht versionieren und bereitstellen
Die wichtigsten Einschränkungen sind zu verstehen:
- Ensemble-Metadaten (OOB-Bewertungen, Schätzer-Stichproben) werden nicht beibehalten – nur die Vorhersagelogik
- Modelle können in MQL5 nicht neu trainiert werden; Python bleibt die Trainingsumgebung
- Große Ensembles (200+ Schätzer) erhöhen die Ladezeit des Modells und den Speicherbedarf
- Merkmalsberechnungen müssen manuell in MQL5 mit exakter Parität zu Python repliziert werden
Vollständige Bereitstellungspipeline
Schritt 1: Exportieren des trainierten Modells in das ONNX-Format
Nachdem Sie Ihren sequentiellen Bootstrap-Klassifikator trainiert haben, konvertieren Sie ihn in ONNX:
import onnx from skl2onnx import convert_sklearn from skl2onnx.common.data_types import FloatTensorType import numpy as np # Your trained sequential bootstrap model clf = SequentiallyBootstrappedBaggingClassifier( samples_info_sets=samples_info_sets, price_bars_index=price_bars.index, estimator=DecisionTreeClassifier(max_depth=6, min_samples_leaf=50), n_estimators=100, max_samples=0.5, random_state=42 ) clf.fit(X_train, y_train) # Define input shape - CRITICAL: must match feature count exactly n_features = X_train.shape[1] initial_type = [('float_input', FloatTensorType([None, n_features]))] # Convert to ONNX with appropriate settings onnx_model = convert_sklearn( clf, initial_types=initial_type, target_opset=12, # MT5 supports opset 9-15 options={ 'zipmap': False # Return raw probabilities, not dictionary } ) # Save model file model_filename = "sequential_bagging_model.onnx" with open(model_filename, "wb") as f: f.write(onnx_model.SerializeToString()) print(f"Model exported: {len(onnx_model.SerializeToString()) / 1024:.2f} KB") print(f"Input features: {n_features}") print(f"Output classes: {len(clf.classes_)}")
Schritt 2: Überprüfung der Korrektheit des ONNX-Modells
Überprüfen Sie vor dem Einsatz immer, ob die ONNX-Vorhersagen mit Ihrem ursprünglichen Modell übereinstimmen:
import onnxruntime as rt # Load ONNX model sess = rt.InferenceSession(model_filename) # Inspect model structure input_name = sess.get_inputs()[0].name output_name = sess.get_outputs()[0].name print(f"Input tensor name: {input_name}") print(f"Output tensor name: {output_name}") # Test with sample data X_test_sample = X_test[:5].astype(np.float32) # Original model predictions sklearn_pred = clf.predict_proba(X_test_sample) # ONNX model predictions onnx_pred = sess.run([output_name], {input_name: X_test_sample})[0] # Verify predictions match within tolerance print("\nVerification Results:") print("Scikit-learn predictions:\n", sklearn_pred[:3]) print("\nONNX predictions:\n", onnx_pred[:3]) print(f"\nMax absolute difference: {np.abs(sklearn_pred - onnx_pred).max():.2e}") assert np.allclose(sklearn_pred, onnx_pred, atol=1e-5), "ERROR: Predictions don't match!" print("✓ Model verification passed")
Schritt 3: Pipeline für die Entwicklung von Dokumentenmerkmalen
Der häufigste Fehler bei der Bereitstellung ist eine falsche Ausrichtung der Merkmale. Dokumentieren Sie Ihre genauen Merkmalsberechnungen:
import json from datetime import datetime # Document feature metadata for MQL5 implementation feature_metadata = { 'model_version': 'v1.0_seq_bagging', 'timestamp': datetime.now().isoformat(), 'n_features': n_features, 'n_estimators': 100, 'lookback_period': 20, 'feature_names': [ 'bb_position', # (close - bb_middle) / (bb_upper - bb_lower) 'bb_width', # (bb_upper - bb_lower) / bb_middle 'return_1d', # (close[0] - close[1]) / close[1] 'return_5d', # (close[0] - close[5]) / close[5] 'volatility_20d', # std(returns, 20) / close[0] 'volume_ratio', # volume[0] / ma(volume, 20) 'rsi_14', # RSI with 14-period lookback 'mean_reversion_z', # (close - ma_20) / std_20 ], 'bb_parameters': { 'period': 20, 'std_dev': 2.0 } } with open('feature_metadata.json', 'w') as f: json.dump(feature_metadata, f, indent=2) # Create test dataset for validation in MQL5 test_data = { 'features': X_test[:10].tolist(), 'expected_predictions': clf.predict_proba(X_test[:10]).tolist(), 'expected_classes': clf.predict(X_test[:10]).tolist() } with open('test_predictions.json', 'w') as f: json.dump(test_data, f, indent=2)
MQL5-Implementierung: Die Strategie der Rückkehr zum Mittelwert mit den Bollinger Bändern
Jetzt implementieren wir das komplette MQL5-System, beginnend mit einem präzisen Feature-Engineering, das unserem Python-Training entspricht.
Modul zur Merkmalsberechnung
Wir erstellen FeatureEngine.mqh, um die Python-Featureberechnungen exakt zu replizieren:
//+------------------------------------------------------------------+ //| FeatureEngine.mqh | //| Feature calculation engine matching Python training pipeline | //+------------------------------------------------------------------+ #property strict class CFeatureEngine { private: int m_lookback; int m_bb_period; double m_bb_deviation; int m_rsi_period; public: CFeatureEngine(int lookback=20, int bb_period=20, double bb_dev=2.0, int rsi_period=14) { m_lookback = lookback; m_bb_period = bb_period; m_bb_deviation = bb_dev; m_rsi_period = rsi_period; } // Main feature calculation - must match Python exactly bool CalculateFeatures(const double &close[], const double &high[], const double &low[], const long &volume[], double &features[]) { if(ArraySize(close) < m_lookback + 10) return false; // Must have exactly 8 features to match Python ArrayResize(features, 8); int idx = 0; // Calculate Bollinger Bands double bb_upper, bb_middle, bb_lower; CalculateBollingerBands(close, m_bb_period, m_bb_deviation, bb_upper, bb_middle, bb_lower); // Feature 1: Bollinger Band Position // Measures where price sits within the bands (-1 to +1) double bb_range = bb_upper - bb_lower; if(bb_range > 0) { features[idx++] = (close[0] - bb_middle) / bb_range; } else { features[idx++] = 0.0; } // Feature 2: Bollinger Band Width // Normalized measure of volatility if(bb_middle > 0) { features[idx++] = bb_range / bb_middle; } else { features[idx++] = 0.0; } // Feature 3: 1-day return features[idx++] = SafeReturn(close[0], close[1]); // Feature 4: 5-day return features[idx++] = SafeReturn(close[0], close[5]); // Feature 5: 20-day volatility (annualized) double returns_std = CalculateReturnsStdDev(close, m_lookback); features[idx++] = returns_std / close[0]; // Feature 6: Volume ratio double vol_ma = CalculateVolumeMA(volume, m_lookback); if(vol_ma > 0) { features[idx++] = (double)volume[0] / vol_ma; } else { features[idx++] = 1.0; } // Feature 7: RSI features[idx++] = CalculateRSI(close, m_rsi_period) / 100.0; // Feature 8: Mean reversion Z-score double ma_20 = CalculateMA(close, 20); double std_20 = CalculateStdDev(close, 20); if(std_20 > 0) { features[idx++] = (close[0] - ma_20) / std_20; } else { features[idx++] = 0.0; } return true; } private: // Calculate Bollinger Bands using SMA and standard deviation void CalculateBollingerBands(const double &close[], int period, double deviation, double &upper, double &middle, double &lower) { middle = CalculateMA(close, period); double std = CalculateStdDev(close, period); upper = middle + deviation * std; lower = middle - deviation * std; } // Simple Moving Average double CalculateMA(const double &data[], int period) { double sum = 0.0; for(int i = 0; i < period; i++) { sum += data[i]; } return sum / period; } // Standard Deviation double CalculateStdDev(const double &data[], int period) { double mean = CalculateMA(data, period); double sum_sq = 0.0; for(int i = 0; i < period; i++) { double diff = data[i] - mean; sum_sq += diff * diff; } return MathSqrt(sum_sq / period); } // Standard deviation of returns (not prices) double CalculateReturnsStdDev(const double &close[], int period) { double returns[]; ArrayResize(returns, period); for(int i = 0; i < period; i++) { returns[i] = SafeReturn(close[i], close[i+1]); } return CalculateStdDev(returns, period); } // RSI calculation // RSI calculation double CalculateRSI(const double &close[], int period) { double gains = 0.0, losses = 0.0; for(int i = 1; i <= period; i++) { double change = close[i-1] - close[i]; if(change > 0) { gains += change; } else { losses -= change; } } double avg_gain = gains / period; double avg_loss = losses / period; if(avg_loss == 0.0) return 100.0; double rs = avg_gain / avg_loss; return 100.0 - (100.0 / (1.0 + rs)); } // Volume moving average double CalculateVolumeMA(const long &volume[], int period) { double sum = 0.0; for(int i = 0; i < period; i++) { sum += (double)volume[i]; } return sum / period; } // Safe return calculation with division by zero protection double SafeReturn(double current, double previous) { if(previous == 0.0 || MathAbs(previous) < 1e-10) return 0.0; return (current - previous) / previous; } };
Hauptintegration des Expert Advisors mit ONNX
Wir erstellen den EA, der das ONNX-Modell lädt und die Strategie der Bollinger Bänder mit der Rückkehr zum Mittelwert ausführt:
//+------------------------------------------------------------------+ //| SequentialBaggingEA.mq5 | //| Bollinger Band Mean Reversion with Sequential Bootstrap Model | //+------------------------------------------------------------------+ #property copyright "Your Name" #property version "1.00" #property strict #include <Trade\Trade.mqh> #include "FeatureEngine.mqh" //--- Input parameters input group "Model Settings" input string InpModelFile = "sequential_bagging_model.onnx"; // ONNX model filename input double InpConfidenceThreshold = 0.60; // Minimum confidence for trade input group "Feature Parameters" input int InpLookback = 20; // Feature lookback period input int InpBBPeriod = 20; // Bollinger Bands period input double InpBBDeviation = 2.0; // Bollinger Bands deviation input int InpRSIPeriod = 14; // RSI period input group "Risk Management" input double InpRiskPercent = 1.0; // Risk per trade (%) input int InpStopLoss = 200; // Stop loss (points) input int InpTakeProfit = 400; // Take profit (points) input int InpMaxTrades = 1; // Maximum concurrent trades input group "Trading Hours" input bool InpUseTradingHours = false; // Enable trading hours filter input int InpStartHour = 9; // Trading start hour input int InpEndHour = 17; // Trading end hour //--- Global variables long g_model_handle = INVALID_HANDLE; CTrade g_trade; CFeatureEngine g_features; datetime g_last_bar_time = 0; //+------------------------------------------------------------------+ //| Expert initialization function | //+------------------------------------------------------------------+ int OnInit() { // Initialize feature engine g_features.CFeatureEngine(InpLookback, InpBBPeriod, InpBBDeviation, InpRSIPeriod); // Load ONNX model from MQL5/Files directory g_model_handle = OnnxCreateFromFile( InpModelFile, ONNX_DEFAULT ); if(g_model_handle == INVALID_HANDLE) { Print("❌ Failed to load ONNX model: ", InpModelFile); Print("Ensure model is in: Terminal_Data_Folder/MQL5/Files/"); return INIT_FAILED; } // Verify model structure long input_count, output_count; OnnxGetInputCount(g_model_handle, input_count); OnnxGetOutputCount(g_model_handle, output_count); vector input_shape; OnnxGetInputShape(g_model_handle, 0, input_shape); Print("✓ Model loaded successfully"); Print(" Model file: ", InpModelFile); Print(" Input count: ", input_count); Print(" Output count: ", output_count); Print(" Expected features: ", (int)input_shape[1]); Print(" Confidence threshold: ", InpConfidenceThreshold); // Set trade parameters g_trade.SetExpertMagicNumber(20241102); g_trade.SetDeviationInPoints(10); g_trade.SetTypeFilling(ORDER_FILLING_FOK); return INIT_SUCCEEDED; } //+------------------------------------------------------------------+ //| Expert deinitialization function | //+------------------------------------------------------------------+ void OnDeinit(const int reason) { if(g_model_handle != INVALID_HANDLE) { OnnxRelease(g_model_handle); Print("Model released"); } } //+------------------------------------------------------------------+ //| Expert tick function | //+------------------------------------------------------------------+ void OnTick() { // Check for new bar datetime current_bar_time = iTime(_Symbol, _Period, 0); if(current_bar_time == g_last_bar_time) return; g_last_bar_time = current_bar_time; // Trading hours filter if(InpUseTradingHours) { MqlDateTime dt; TimeToStruct(TimeCurrent(), dt); if(dt.hour < InpStartHour || dt.hour >= InpEndHour) return; } // Get market data double close[], high[], low[]; long volume[]; ArraySetAsSeries(close, true); ArraySetAsSeries(high, true); ArraySetAsSeries(low, true); ArraySetAsSeries(volume, true); int required_bars = InpLookback + 10; int copied = CopyClose(_Symbol, _Period, 0, required_bars, close); if(copied < required_bars) { Print("Insufficient bars: ", copied, " < ", required_bars); return; } CopyHigh(_Symbol, _Period, 0, required_bars, high); CopyLow(_Symbol, _Period, 0, required_bars, low); CopyTickVolume(_Symbol, _Period, 0, required_bars, volume); // Calculate features double feature_array[]; if(!g_features.CalculateFeatures(close, high, low, volume, feature_array)) { Print("Feature calculation failed"); return; } // Prepare input matrix for ONNX (must be float32) matrix input_matrix(1, ArraySize(feature_array)); for(int i = 0; i < ArraySize(feature_array); i++) { input_matrix[0][i] = (float)feature_array[i]; } // Run model inference matrix output_matrix; if(!OnnxRun(g_model_handle, ONNX_NO_CONVERSION, input_matrix, output_matrix)) { Print("❌ Model prediction failed!"); return; } // Extract probabilities // Output shape: [1, 2] for binary classification // Class 0 = SELL signal, Class 1 = BUY signal double prob_sell = output_matrix[0][0]; double prob_buy = output_matrix[0][1]; // Log predictions for monitoring Comment(StringFormat( "Sequential Bootstrap EA\n" + "Time: %s\n" + "Prob SELL: %.2f%%\n" + "Prob BUY: %.2f%%\n" + "Threshold: %.2f%%\n" + "Positions: %d/%d", TimeToString(TimeCurrent(), TIME_DATE|TIME_MINUTES), prob_sell * 100, prob_buy * 100, InpConfidenceThreshold * 100, PositionsTotal(), InpMaxTrades )); // Trading logic: Mean reversion strategy if(PositionsTotal() < InpMaxTrades) { // BUY signal: Price at lower band, model predicts reversion up if(prob_buy > InpConfidenceThreshold) { ExecuteBuy(prob_buy, feature_array); } // SELL signal: Price at upper band, model predicts reversion down else if(prob_sell > InpConfidenceThreshold) { ExecuteSell(prob_sell, feature_array); } } } //+------------------------------------------------------------------+ //| Execute BUY order | //+------------------------------------------------------------------+ void ExecuteBuy(double confidence, const double &features[]) { double price = SymbolInfoDouble(_Symbol, SYMBOL_ASK); double sl = price - InpStopLoss * _Point; double tp = price + InpTakeProfit * _Point; // Calculate position size based on risk double lot = CalculateLotSize(InpRiskPercent, InpStopLoss); // Build comment with BB position for analysis string comment = StringFormat( "SB|BUY|Conf:%.2f|BB:%.3f", confidence, features[0] // BB position feature ); if(g_trade.Buy(lot, _Symbol, price, sl, tp, comment)) { Print("✓ BUY executed: Lot=", lot, " Conf=", confidence, " BB=", features[0]); } else { Print("❌ BUY failed: ", g_trade.ResultRetcodeDescription()); } } //+------------------------------------------------------------------+ //| Execute SELL order | //+------------------------------------------------------------------+ void ExecuteSell(double confidence, const double &features[]) { double price = SymbolInfoDouble(_Symbol, SYMBOL_BID); double sl = price + InpStopLoss * _Point; double tp = price - InpTakeProfit * _Point; double lot = CalculateLotSize(InpRiskPercent, InpStopLoss); string comment = StringFormat( "SB|SELL|Conf:%.2f|BB:%.3f", confidence, features[0] ); if(g_trade.Sell(lot, _Symbol, price, sl, tp, comment)) { Print("✓ SELL executed: Lot=", lot, " Conf=", confidence, " BB=", features[0]); } else { Print("❌ SELL failed: ", g_trade.ResultRetcodeDescription()); } } //+------------------------------------------------------------------+ //| Calculate lot size based on risk percentage | //+------------------------------------------------------------------+ double CalculateLotSize(double risk_percent, int sl_points) { double account_balance = AccountInfoDouble(ACCOUNT_BALANCE); double risk_amount = account_balance * risk_percent / 100.0; double tick_value = SymbolInfoDouble(_Symbol, SYMBOL_TRADE_TICK_VALUE); double tick_size = SymbolInfoDouble(_Symbol, SYMBOL_TRADE_TICK_SIZE); // Calculate value of stop loss in account currency double point_value = tick_value / tick_size; double sl_value = sl_points * _Point * point_value; // Calculate lot size double lot_size = risk_amount / sl_value; // Normalize to broker's lot step double lot_step = SymbolInfoDouble(_Symbol, SYMBOL_VOLUME_STEP); lot_size = MathFloor(lot_size / lot_step) * lot_step; // Apply broker limits double min_lot = SymbolInfoDouble(_Symbol, SYMBOL_VOLUME_MIN); double max_lot = SymbolInfoDouble(_Symbol, SYMBOL_VOLUME_MAX); return MathMax(min_lot, MathMin(max_lot, lot_size)); }
Checkliste für den Einsatz und die Validierung
Überprüfung vor Einsatzbeginn
Bevor wir unseren EA in der produktiv einsetzen, sollten wir diese wichtigen Validierungsschritte durchführen:
1. Merkmal Paritätstest
# Python: Generate test vectors with known outputs import json # Create detailed test cases test_cases = [] for i in range(10): features = X_test[i] prediction = clf.predict_proba([features])[0] test_cases.append({ 'test_id': i, 'features': features.tolist(), 'expected_prob_sell': float(prediction[0]), 'expected_prob_buy': float(prediction[1]), 'expected_class': int(clf.predict([features])[0]), 'tolerance': 1e-4 }) with open('mql5_validation_tests.json', 'w') as f: json.dump(test_cases, f, indent=2) print(f"Generated {len(test_cases)} test cases for MQL5 validation")
2. MQL5-Validierungsskript erstellen
//+------------------------------------------------------------------+ //| ValidationScript.mq5 | //| Validates ONNX model predictions against Python test cases | //+------------------------------------------------------------------+ #property script_show_inputs input string InpModelFile = "sequential_bagging_model.onnx"; void OnStart() { // Load model long model = OnnxCreateFromFile(InpModelFile, ONNX_DEFAULT); if(model == INVALID_HANDLE) { Print("Failed to load model"); return; } // Test case 1: Manually input features from Python double test_features[] = { -0.523, // bb_position 0.042, // bb_width -0.012, // return_1d -0.034, // return_5d 0.018, // volatility_20d 1.234, // volume_ratio 0.425, // rsi_14 (normalized) -1.823 // mean_reversion_z }; // Expected output from Python (copy from test file) double expected_prob_sell = 0.234; double expected_prob_buy = 0.766; // Run prediction matrix input(1, 8); for(int i=0; i<8; i++) { input[0][i] = (float)test_features[i]; } matrix output; OnnxRun(model, ONNX_NO_CONVERSION, input, output); double mql5_prob_sell = output[0][0]; double mql5_prob_buy = output[0][1]; // Validate double tolerance = 0.0001; bool sell_match = MathAbs(mql5_prob_sell - expected_prob_sell) < tolerance; bool buy_match = MathAbs(mql5_prob_buy - expected_prob_buy) < tolerance; Print("========== VALIDATION RESULTS =========="); Print("Expected SELL prob: ", expected_prob_sell); Print("MQL5 SELL prob: ", mql5_prob_sell); Print("Difference: ", MathAbs(mql5_prob_sell - expected_prob_sell)); Print("Match: ", sell_match ? "✓ PASS" : "✗ FAIL"); Print(""); Print("Expected BUY prob: ", expected_prob_buy); Print("MQL5 BUY prob: ", mql5_prob_buy); Print("Difference: ", MathAbs(mql5_prob_buy - expected_prob_buy)); Print("Match: ", buy_match ? "✓ PASS" : "✗ FAIL"); Print("========================================"); if(sell_match && buy_match) { Print("✓✓✓ VALIDATION PASSED ✓✓✓"); } else { Print("✗✗✗ VALIDATION FAILED ✗✗✗"); Print("Check feature calculations!"); } OnnxRelease(model); }
Häufige Probleme bei der Bereitstellung und Lösungen
| Ausgabe | Symptom | Lösung |
|---|---|---|
| Merkmal Versatz | Vorhersagen weichen um >1% von Python ab | Validierungsskript verwenden. Überprüfung der Berechnungsreihenfolge, der Rückblickzeiträume und der Handhabung der Division durch Null |
| Ladefehler des Modells | INVALID_HANDLE bei OnnxCreateFromFile | Überprüfen Sie, ob die Datei in MQL5/Dateien/ liegt, prüfen Sie die Schreibweise des Dateinamens, stellen Sie die Kompatibilität der Opsets sicher (9-15) |
| Falsche Eingabeform | OnnxRun gibt false zurück | Überprüfen Sie, ob die Anzahl der Merkmale mit dem Training übereinstimmt. Verwendung von OnnxGetInputShape zur Überprüfung der erwarteten Abmessungen. |
| Langsame Vorhersagen | EA verzögert bei jedem Tick | Verringern Sie n_estimators, vereinfachen Sie die Bäume (geringere max_depth), oder führen Sie Vorhersagen nur für neue Balken durch. |
| Index-Array-Fehler | Warnungen von ArraySetAsSeries | Immer ArraySetAsSeries(array, true) vor CopyClose/High/Low Operationen aufrufen. |
Dashboard zur Einsatzüberwachung
Fügen Sie diesen Code hinzu, um die Leistung des Modells in Echtzeit zu verfolgen:
//--- Add to global variables section struct PredictionStats { int total_predictions; int buy_signals; int sell_signals; double avg_confidence; double max_confidence; double min_confidence; } g_stats; //--- Add to OnInit() void ResetStats() { g_stats.total_predictions = 0; g_stats.buy_signals = 0; g_stats.sell_signals = 0; g_stats.avg_confidence = 0.0; g_stats.max_confidence = 0.0; g_stats.min_confidence = 1.0; } //--- Add after model prediction in OnTick() void UpdateStats(double prob_sell, double prob_buy) { g_stats.total_predictions++; double max_prob = MathMax(prob_sell, prob_buy); if(prob_buy > InpConfidenceThreshold) g_stats.buy_signals++; if(prob_sell > InpConfidenceThreshold) g_stats.sell_signals++; g_stats.avg_confidence = (g_stats.avg_confidence * (g_stats.total_predictions - 1) + max_prob) / g_stats.total_predictions; g_stats.max_confidence = MathMax(g_stats.max_confidence, max_prob); g_stats.min_confidence = MathMin(g_stats.min_confidence, max_prob); } //--- Enhanced Comment() display Comment(StringFormat( "=== Sequential Bootstrap EA ===\n" + "Time: %s\n\n" + "Current Prediction:\n" + " SELL: %.2f%% %s\n" + " BUY: %.2f%% %s\n\n" + "Statistics (Session):\n" + " Predictions: %d\n" + " BUY signals: %d\n" + " SELL signals: %d\n" + " Avg confidence: %.2f%%\n" + " Range: %.2f%% - %.2f%%\n\n" + "Positions: %d / %d", TimeToString(TimeCurrent(), TIME_DATE|TIME_MINUTES), prob_sell * 100, prob_sell > InpConfidenceThreshold ? "[SIGNAL]" : "", prob_buy * 100, prob_buy > InpConfidenceThreshold ? "[SIGNAL]" : "", g_stats.total_predictions, g_stats.buy_signals, g_stats.sell_signals, g_stats.avg_confidence * 100, g_stats.min_confidence * 100, g_stats.max_confidence * 100, PositionsTotal(), InpMaxTrades ));
Leistungsoptimierung für den Einsatz
Optimierung der Modellgröße
Für den Live-Handel sind kleinere Modelle mit vergleichbarer Leistung zu bevorzugen:
# Option 1: Train a smaller production model clf_prod = SequentiallyBootstrappedBaggingClassifier( samples_info_sets=samples_info_sets, price_bars_index=price_bars.index, estimator=DecisionTreeClassifier( max_depth=4, # Reduced from 6 min_samples_leaf=100 # Increased from 50 ), n_estimators=50, # Reduced from 100 max_samples=0.5, random_state=42 ) clf_prod.fit(X_train, y_train) # Compare performance print("Full model test accuracy:", clf.score(X_test, y_test)) print("Production model test accuracy:", clf_prod.score(X_test, y_test)) # Option 2: Feature selection to reduce input dimensionality from sklearn.feature_selection import SelectKBest, f_classif selector = SelectKBest(f_classif, k=6) # Keep only 6 best features X_train_selected = selector.fit_transform(X_train, y_train) X_test_selected = selector.transform(X_test) # Train on reduced features clf_reduced = SequentiallyBootstrappedBaggingClassifier( samples_info_sets=samples_info_sets, price_bars_index=price_bars.index, n_estimators=50, random_state=42 ) clf_reduced.fit(X_train_selected, y_train) # Show which features were selected selected_features = selector.get_support(indices=True) print("Selected feature indices:", selected_features) print("Reduced model accuracy:", clf_reduced.score(X_test_selected, y_test))
Alternative Einsatzmöglichkeiten: REST-API
Wenn die Beschränkungen von ONNX einschränkend werden (z. B. Notwendigkeit einer komplexen Vorverarbeitung oder häufiger Modellaktualisierungen), bietet eine REST-API mehr Flexibilität:
# Python: Simple Flask API from flask import Flask, request, jsonify import joblib import numpy as np app = Flask(__name__) model = joblib.load('sequential_bagging_model.pkl') @app.route('/predict', methods=['POST']) def predict(): try: features = np.array(request.json['features']).reshape(1, -1) proba = model.predict_proba(features)[0] return jsonify({ 'success': True, 'probability_sell': float(proba[0]), 'probability_buy': float(proba[1]), 'model_version': 'v1.0' }) except Exception as e: return jsonify({'success': False, 'error': str(e)}), 400 if __name__ == '__main__': app.run(host='0.0.0.0', port=5000)
//--- MQL5: HTTP client for REST API #include <JAson.mqh> // Or your preferred JSON library bool PredictViaAPI(const double &features[], double &prob_sell, double &prob_buy) { string url = "http://localhost:5000/predict"; // Build JSON request string json_request = "{"; json_request += "\"features\":["; for(int i=0; i<ArraySize(features); i++) { json_request += DoubleToString(features[i], 6); if(i < ArraySize(features)-1) json_request += ","; } json_request += "]}"; // Send HTTP POST request char post_data[]; char result_data[]; string headers = "Content-Type: application/json\r\n"; StringToCharArray(json_request, post_data, 0, WHOLE_ARRAY, CP_UTF8); int res = WebRequest("POST", url, headers, 5000, post_data, result_data, headers); if(res == -1) { Print("API request failed: ", GetLastError()); return false; } // Parse JSON response string response = CharArrayToString(result_data, 0, WHOLE_ARRAY, CP_UTF8); // Parse using your JSON library // prob_sell = parsed_value; // prob_buy = parsed_value; return true; }
REST API Kompromisse:
| Aspekt | ONNX (empfohlen) | REST-API |
|---|---|---|
| Latenzzeit | ~1ms | ~10-50ms |
| Komplexität | Niedrig (in sich geschlossen) | Mittel (Server erforderlich) |
| Aktualisierungen | Manuelle Dateiersetzung | Hot Reload möglich |
| Vorverarbeitung | Begrenzt (muss in MQL5 repliziert werden) | Vollständiges Python-Ökosystem |
| Infrastruktur | Keine erforderlich | Webserver + Überwachung |
Zusammenfassung bewährter Praktiken
Kritische Erfolgsfaktoren:
- Merkmalsparität ist von größter Bedeutung – Verwenden Sie Validierungsskripte, um zu überprüfen, ob die MQL5-Merkmale genau mit Python übereinstimmen. Sogar kleine Diskrepanzen verstärken sich in den Ensemble-Vorhersagen
- Dokumentieren Sie alles – Speichern Sie Feature-Metadaten, Testfälle und Modellversionen. Die Zukunft wird es der Ihnen danken
- Konservativer Start – Beginnen Sie mit kleineren Ensembles (50 Schätzer) und einfacheren Bäumen (max_depth=4-6) für schnellere Iterationen
- Stufenweiser Test – Validierung → Papierhandel → Kleine Live-Position → Vollständiger Einsatz
- Kontinuierliche Überwachung – Verfolgen Sie die Vorhersagezuverlässigkeit, die Signalhäufigkeit und vergleichen Sie die tatsächliche Leistung mit den Backtest-Erwartungen.
Modell Update Workflow:
# Step 1: Train new model with updated data clf_v2 = SequentiallyBootstrappedBaggingClassifier(...) clf_v2.fit(X_train_updated, y_train_updated) # Step 2: Validate against held-out test set score_v2 = clf_v2.score(X_test, y_test) assert score_v2 >= previous_score * 0.95, "New model performs worse!" # Step 3: Export with version tag model_file = f"sequential_bagging_v2_{datetime.now().strftime('%Y%m%d')}.onnx" onnx_model = convert_sklearn(clf_v2, initial_types=initial_type) with open(model_file, "wb") as f: f.write(onnx_model.SerializeToString()) # Step 4: Run backtests comparing v1 vs v2 # Step 5: Deploy to paper trading first # Step 6: Monitor for 1-2 weeks before live deployment # Step 7: Keep v1 as fallback
Wann umgeschult werden sollte:
- Regelmäßiger Zeitplan – monatliche oder vierteljährliche Nachschulung mit erweitertem Datensatz
- Leistungsverschlechterung – Wenn die Live-Genauigkeit um mehr als 10 % unter die Backtest-Erwartungen fällt
- Änderung des Marktregimes – Größere Veränderungen der Volatilität, der Korrelationen oder der Marktstruktur
- Funktionserweiterungen – Beim Hinzufügen neuer technischer Indikatoren oder Datenquellen
Leitfaden zur Fehlerbehebung
Problem: Vorhersagen sind zufällig (alle nahe 0,5)
Die Diagnose:
# Check if features have variance print("Feature statistics:") print(pd.DataFrame(X_train).describe()) # Check class balance print("Class distribution:", np.bincount(y_train)) # Verify model actually learned something print("Training accuracy:", clf.score(X_train, y_train)) print("Test accuracy:", clf.score(X_test, y_test))
Lösungen:
- Stellen Sie sicher, dass die Merkmale nicht nur aus Nullen oder Konstanten bestehen.
- Prüfung auf schweres Klassenungleichgewicht (sample_weight berücksichtigen)
- Überprüfen Sie die Konvergenz des Modells während des Trainings
- Erhöhen Sie n_estimators oder die Baumtiefe, wenn die Anpassung zu gering ist.
Problem: MQL5-Vorhersagen unterscheiden sich deutlich von Python
Systematischer Ansatz zur Fehlersuche:
# 1. Print raw feature values from both systems # Python: print("Python features:", X_test[0]) # MQL5: Add to EA // Print all features before prediction string feat_str = ""; for(int i=0; i<ArraySize(feature_array); i++) { feat_str += StringFormat("[%d]:%.6f ", i, feature_array[i]); } Print("MQL5 features: ", feat_str); # 2. Check intermediate calculations # Add debug prints to FeatureEngine.mqh for BB, RSI, etc. Print("BB Upper:", bb_upper, " Middle:", bb_middle, " Lower:", bb_lower); Print("RSI:", rsi_value); # 3. Verify data alignment # Ensure MQL5 arrays are time-series ordered (most recent first) # Python typically uses oldest first
Problem: Modell lädt langsam oder EA friert ein
Optimierungsstrategien:
// 1. Load model once in OnInit, not on every tick // ✓ Correct: int OnInit() { g_model_handle = OnnxCreateFromFile(InpModelFile, ONNX_DEFAULT); } // ✗ Wrong: void OnTick() { long model = OnnxCreateFromFile(InpModelFile, ONNX_DEFAULT); // DON'T DO THIS! } // 2. Reduce model complexity # Python: Train lighter model clf_fast = SequentiallyBootstrappedBaggingClassifier( n_estimators=30, # Reduced from 100 max_depth=3 # Reduced from 6 ) // 3. Predict only on new bar, not every tick datetime current_bar = iTime(_Symbol, _Period, 0); if(current_bar == g_last_bar_time) return; g_last_bar_time = current_bar;
Beispiel für den Einsatz in der realen Welt
Hier finden Sie einen vollständigen Zeitplan für den Produktionseinsatz, der erfolgreich funktioniert hat:
| Woche | Tätigkeit | Erfolgskriterien |
|---|---|---|
| 1 | Modell trainieren, ONNX exportieren, Vorhersagen mit Python validieren | Maximale Vorhersagedifferenz < 0,01% |
| 2 | Strategietester Backtest mit 2+ Jahren historischen Daten | Sharpe > 1,5, Max DD < 15% |
| 3 | Vorwärtstest auf dem Demokonto mit voller Positionsgröße | 20+ ausgeführte HandelsgeschäfteTrades, keine technischen Fehler |
| 4-5 | Live-Handel mit 10% des Zielkapitals | Performance innerhalb von 20% der Backtest-Erwartungen |
| 6-8 | Schrittweise Skalierung auf 50% und dann 100% des Zielkapitals | Konsistente Leistung, kein unerwartetes Verhalten |
| 9+ | Vollständige Produktion mit monatlichen Leistungsüberprüfungen | Monatlich erneutes Trainieren, jährliche Modellbewertung |
Schlussfolgerung: ONNX-Checkliste für die Bereitstellung
Bevor Sie mit Ihrem sequentiellen Bootstrap-Modell in MQL5 in Betrieb gehen:
Einsatzvorbereitung (Python):
- ☐ Das Modell erreicht eine akzeptable Leistung außerhalb der Stichprobe
- ☐ ONNX-Export erfolgreich (skl2onnx)
- ☐ ONNX-Vorhersagen gegenüber dem ursprünglichen Modell verifiziert
- ☐ Metadaten der Merkmale dokumentiert (Namen, Reihenfolge, Berechnungen)
- ☐ Testfälle mit bekannten Ein- und Ausgängen erstellt
- ☐ Modelldatei versioniert und gesichert
Implementierung (MQL5):
- ☐ FeatureEngine.mqh entspricht genau den Python-Berechnungen
- ☐ Validierungsskript besteht alle Testfälle
- ☐ Modell wird bei OnInit erfolgreich geladen
- ☐ Vorhersagen werden ohne Fehler ausgeführt
- ☐ Parameter für das Risikomanagement konfiguriert
- ☐ Protokollierung und Überwachung implementiert
Tests:
- ☐ Strategie-Tester Backtest abgeschlossen (2+ Jahre)
- ☐ Vorabtest auf Demokonto (2+ Wochen)
- ☐ Leistungskennzahlen innerhalb akzeptabler Bereiche
- ☐ Bearbeitung von Grenzfällen (Nullvolumen, Marktlücken usw.)
Produktion:
- ☐ Start mit minimalem Kapital (10%)
- ☐ Tägliche Überwachung während der ersten 2 Wochen
- ☐ Wöchentliche Leistungsbeurteilung
- ☐ Zeitplan für die Aktualisierung des Modells festgelegt
- ☐ Dokumentiertes Ausweichverfahren
Mit sequentiellem Bootstrap, das zeitliche Lecks im Training behebt, und ONNX, das eine zuverlässige Bereitstellung auf MQL5 ermöglicht, haben Sie jetzt eine vollständige Pipeline von der Forschung bis zur Produktion. Durch diese Kombination wird sichergestellt, dass sich die solide Ausbildung Ihres Modells in einer verlässlichen Performance beim Live-Handel niederschlägt.
Anlagen
| Dateiname | Beschreibung |
|---|---|
| bootstrap_mc.py | Enthält Monte-Carlo-Simulationscode zum Vergleich der Effektivität von Standard- und sequentiellen Bootstraps. Erzeugt zufällige Zeitreihendaten und führt Experimente durch, um die Eindeutigkeitsmetriken für beide Methoden zu messen. |
| bootstrapping.py | Kernimplementierung von sequentiellen Bootstrapping-Algorithmen. Enthält Funktionen zur Erstellung von Indikatormatrizen, zur Berechnung von Einzigartigkeitswerten und zur Durchführung optimierter sequentieller Stichproben unter Verwendung der Numba-Beschleunigung. |
| misc.py | Sammlung von Utility-Funktionen wie Datenformatierung, Speicheroptimierung, Logging-Dekoratoren, Leistungsüberwachung, Time-Helper und Dateikonvertierungs-Utilities. |
| multiprocess.py | Implementiert Dienstprogramme zur Parallelverarbeitung für effiziente Berechnungen. Enthält Funktionen für die Auftragspartitionierung, Fortschrittsberichte und die parallele Ausführung auf mehreren CPU-Kernen. |
| sb_bagging.py | Implementiert Sequentially Bootstrapped Bagging Classifier und Regressor – Ensemble-Methoden, die sequentielles Bootstrap Sampling mit dem Scikit-Learn Bagging Framework für Financial ML integrieren. |
Übersetzt aus dem Englischen von MetaQuotes Ltd.
Originalartikel: https://www.mql5.com/en/articles/20059
Warnung: Alle Rechte sind von MetaQuotes Ltd. vorbehalten. Kopieren oder Vervielfältigen untersagt.
Dieser Artikel wurde von einem Nutzer der Website verfasst und gibt dessen persönliche Meinung wieder. MetaQuotes Ltd übernimmt keine Verantwortung für die Richtigkeit der dargestellten Informationen oder für Folgen, die sich aus der Anwendung der beschriebenen Lösungen, Strategien oder Empfehlungen ergeben.
Markets Positioning Codex in MQL5 (Teil 1): Bitwise Learning für Nvidia
Bivariate Copulae in MQL5 (Teil 2): Implementierung archimedischer Copulae in MQL5
Statistische Arbitrage durch kointegrierte Aktien (Teil 7): Punktesystem 2
Blaupause für maschinelles Lernen (Teil 4): Die versteckte Schwachstelle in Ihrer ML-Pipeline – Gleichzeitigkeit der Kennzeichnungen
- Freie Handelsapplikationen
- Über 8.000 Signale zum Kopieren
- Wirtschaftsnachrichten für die Lage an den Finanzmärkte
Sie stimmen der Website-Richtlinie und den Nutzungsbedingungen zu.