English 日本語
preview
MetaTrader 5 Machine Learning Blueprint (Teil 6): Entwicklung eines produktionsgerechten Caching-Systems

MetaTrader 5 Machine Learning Blueprint (Teil 6): Entwicklung eines produktionsgerechten Caching-Systems

MetaTrader 5Handelssysteme |
21 1
Patrick Murimi Njoroge
Patrick Murimi Njoroge


Einführung

In den vorangegangenen Teilen der Serie „Machine Learning Blueprint“ haben wir eine robuste Pipeline für maschinelles Lernen im Finanzbereich aufgebaut – von der Sicherstellung der Datenintegrität gegen Vorausschauverzerrungen bis hin zur Implementierung anspruchsvoller Kennzeichnungsmethoden wie Triple-Barrier und Trend-Scanning. Wenn jedoch unsere Strategien oder ML-Modelle – wie bei dem sequentiell „gebootstrappten“ Random Forests – immer komplexer werden, stehen wir vor einer entscheidenden Herausforderung: Rechenengpässe, die eine schnelle Iteration verhindern.

Sie haben die vielversprechende Strategie der Rückkehr zum Mittelwert entwickelt. Ihr Backtest zeigt eine Sharpe Ratio von 1,8, konsistente Gewinne über alle Marktregimes hinweg und saubere Aktienkurven. Sie sind bereit, die Parameter zu optimieren, verschiedene Rückblickszeiträume zu testen und mit einer Vorwärtstest-Analyse zu validieren.

Dann schlägt die Realität zu.  

Jede Parameterkombination benötigt 6 Minuten für die Berechnung. Sie wollen 50 Varianten testen. Das sind 5 Stunden Wartezeit. Ändern Sie Ihr Feature Engineering? Weitere 5 Stunden. Einen neuen Indikator hinzufügen? Sie haben die Idee.

Die wahren Kosten entstehen nicht nur durch verstrichene Zeit, sondern auch durch verpassten Chancen. Während Sie auf Berechnungen warten, können Sie nicht iterieren, keine neuen Ideen testen und Ihren Vorsprung nicht ausbauen. Ihr Entwicklungstempo kommt ins Stocken.

Das ist das Problem, das meine frühen Handelsstrategien zunichte gemacht hat. Ich habe ganze Wochenenden damit verbracht, Backtests durchzuführen, nur um am Montagmorgen festzustellen, dass ich einen einfachen Fehler in meinem Code gemacht habe. Wieder Warten. Mehr Frustration.

Es musste einen besseren Weg geben.

Dieser Artikel zeigt Ihnen, wie Sie diesen Engpass durch intelligentes Caching beseitigen können. Am Ende werden Sie wissen, wie man:

  • Verkürzung der Strategieoptimierungszeit von Stunden auf Minuten
  • Testen von 50+ Parameterkombinationen in der Zeit, die früher 5 benötigten
  • Iteration von Merkmalen und Modellen, ohne alles neu zu berechnen

Beginnen wir mit dem Problem, mit dem Sie gerade konfrontiert sind.


Die Visualisierung der Berechnungspipeline

Vor der Implementierung des Caching ist es wichtig zu verstehen, wie viel Zeit jeder Vorgang in der ML-Pipeline tatsächlich benötigt. Das nachstehende Diagramm zeigt den gesamten Weg vom Laden der Ticks bis zum abschließenden Backtest – und wie viele Sekunden für jeden Schritt ohne Caching aufgewendet werden.

OHNE CACHING

Schauen wir uns nun an, wie die gleiche Abfolge von Vorgängen mit aktiviertem AFML-Caching funktioniert. Beachten Sie, dass die meisten Schritte nicht mehr neu berechnet, sondern sofort geladen werden. Der Geschwindigkeitsunterschied ist aus dem folgenden Diagramm ersichtlich.

Mit AFML-Caching


Warum generisches Caching für Financial ML versagt

Wir werden sehen, warum die offensichtliche Lösung nicht funktioniert.

Es mag einleuchtend erscheinen, einfach den Standard lru_cache zu verwenden. Im Folgenden wird anhand eines einfachen Beispiels gezeigt, dass der in Python eingebaute Cache bei der Arbeit mit Finanzdatenstrukturen wie Pandas Series und DataFrame vollständig versagt.

# This is USELESS for financial data. Don't do this.
from functools import lru_cache

@lru_cache(maxsize=128)
def compute_rsi(prices, period=14):
    delta = prices.diff()
    gain = delta.where(delta > 0, 0)
    loss = -delta.where(delta < 0, 0)
    
    avg_gain = gain.ewm(alpha=1/period, adjust=False).mean()
    avg_loss = loss.ewm(alpha=1/period, adjust=False).mean()
    
    rs = avg_gain / avg_loss
    rsi = 100 - (100 / (1 + rs))
    return rsi

compute_rsi(bb_df.close, period=14)

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
Cell In[25], line 18
     15     rsi = 100 - (100 / (1 + rs))
     16     return rsi
---> 18 compute_rsi(bb_df.close, period=14)

TypeError: unhashable type: 'Series'

Python bietet zwar functools.lru_cache, ist aber für ML im Finanzbereich grundsätzlich ungeeignet:

  1. Der einfache Speicher– Cache verschwindet, wenn Python beendet wird.
  2. Keine Persistenz über Sitzungen hinweg – nach jedem Neustart wird alles neu gestartet.
  3. Keine automatisches ungültig machen des Cache bei Codeänderungen.
  4. Schwache Behandlung von NumPy/Pandas-Hashing-Problemen mit Arrays.
  5. Kein verteiltes Caching – der Cache kann nicht prozessübergreifend genutzt werden.
  6. Keine Kenntnis von Finanzdaten – versteht weder Zeitstempel noch Vorausschau.


Teil I: Architektonische Fundamente

Die wichtigsten Gestaltungsprinzipien


Unser AFML-Caching-System basiert auf drei Grundpfeilern:

AFML-Cache-Architektur

Wir wollen verstehen, warum es die einzelnen Komponenten gibt und wie sie zusammenarbeiten.

Herausforderung #1: Persistente Speicherung

Beginnen wir mit der ersten und offensichtlichsten Einschränkung: Der Standard-Cache befindet sich nur im RAM. Sobald wir Python neu starten, verschwinden alle Ergebnisse. Hier ist eine kurze Demonstration, wie der „normale Weg“ aussieht und warum er in realen ML-Pipelines nutzlos ist.

# Traditional approach - memory only
from functools import lru_cache

@lru_cache(maxsize=128)
def compute_features(data):
    # Expensive computation
    return features  # Lost on restart!

AFML löst dieses Problem auf einfache Weise: Wir verlagern den Cache auf die Festplatte. Joblib behält die Berechnungsergebnisse zwischen den Python-Läufen bei, sodass Sie auch nach einem Neustart sofort die zuvor berechneten Daten erhalten. Nachstehend finden Sie ein Implementierungsbeispiel.

# AFML approach - persistent
from joblib import Memory
from appdirs import user_cache_dir


memory = Memory(location=user_cache_dir("afml"), verbose=0)

@memory.cache
def compute_features(data):
    # Expensive computation
    return features  # Saved to disk automatically!

Herausforderung #2: Hashing komplexer Finanzdaten-Strukturen

Und dann gibt es noch ein größeres Problem Finanzdaten können nicht mit Standardmethoden verschlüsselt werden. Pandas DataFrames, NumPy-Arrays und Datumsangaben sind alle „unhashable“, und ein normaler Cache funktioniert einfach nicht. Hier ist ein Beispiel dafür, wie sich dies auswirkt.

# This fails!
import pandas as pd
from functools import lru_cache

@lru_cache
def bad_cache_example(df: pd.DataFrame):
    return df.mean()

df = pd.DataFrame({'price': [100, 101, 102]})
bad_cache_example(df)  # TypeError: unhashable type: 'DataFrame'

Um sicherzustellen, dass der Cache korrekt funktioniert, erstellt AFML seinen eigenen Schlüsselgenerator. Es kann DataFrames „verstehen“: ihre Struktur, Datentypen, Spalten, Indizes und vor allem den Zeitbereich. So sieht eine nutzerdefinierte Hashing-Implementierung aus.

# afml/cache/robust_cache_keys.py

class CacheKeyGenerator:
    """Generate collision-resistant cache keys for ML data structures."""
    
    @staticmethod
    def _hash_dataframe(df: pd.DataFrame, name: str) -> str:
        """
        Hash DataFrame with attention to:
        1. Shape and structure
        2. Column names and types
        3. Index (especially DatetimeIndex)
        4. Actual data content
        """
        parts = [
            f"shape_{df.shape}",
            f"cols_{hashlib.md5(str(tuple(df.columns)).encode()).hexdigest()[:8]}",
            f"dtypes_{hashlib.md5(str(tuple(df.dtypes)).encode()).hexdigest()[:8]}",
        ]
        
        # Special handling for DatetimeIndex (critical for financial data!)
        if isinstance(df.index, pd.DatetimeIndex):
            # Hash: start date, end date, and length
            # This catches both data changes AND temporal shifts
            parts.append(f"idx_dt_{df.index[0]}_{df.index[-1]}_{len(df.index)}")
        else:
            idx_hash = hashlib.md5(str(tuple(df.index)).encode()).hexdigest()[:8]
            parts.append(f"idx_{idx_hash}")
        
        # For large DataFrames, sample for performance
        if df.size > 10000:
            # Sample ~100 rows evenly distributed
            sample_rows = df.iloc[::max(1, len(df) // 100)]
            content_hash = hashlib.md5(sample_rows.values.tobytes()).hexdigest()[:8]
        else:
            # Hash full content for small DataFrames
            content_hash = hashlib.md5(df.values.tobytes()).hexdigest()[:8]
        
        parts.append(f"data_{content_hash}")
        
        return f"{name}_df_{'_'.join(parts)}"

Um den Wert des nutzerdefinierten Hashings zu verstehen, vergleichen wir, wie Standard-Python Daten zwischenspeichert und wie AFML dies tut. Im Folgenden wird veranschaulicht, warum der Standardansatz zu einer Verzerrung der Vorausschau führt, während AFML dies nicht tut.

Hash-Vergleich des DataFrame

Herausforderung #3: Automatisches Ungültigmachen des Cache bei Codeänderungen

Selbst ein perfekter Cache wird nutzlos, wenn Sie Ihren Code bearbeiten und ungültige, veraltete Ergebnisse im Cache verbleiben. AFML verfolgt automatisch alle Änderungen an Funktionen und löscht nur den Teil des Caches, der nicht mehr aktuell ist. Schauen wir uns an, wie dies umgesetzt wird.

AFML speichert einen Hash des Quellcodes der Funktion und das Datum der letzten Änderung der Datei. Wenn sich etwas ändert, wird der Cache für diese Funktion automatisch zurückgesetzt. Dieser Mechanismus funktioniert intern folgendermaßen.

# afml/cache/selective_cleaner.py
class FunctionTracker:
    """
    Tracks function signatures and source code hashes.
    Automatically detects when functions change.
    """
    
    def track_function(self, func) -> bool:
        """
        Returns True if function has changed since last tracking.
        """
        func_name = f"{func.__module__}.{func.__qualname__}"
        
        # Get current function metadata
        current_hash = self._get_function_hash(func)      # Hash source code
        current_mtime = self._get_file_mtime(func)        # File modification time
        
        # Compare with stored metadata
        stored = self.tracked_functions.get(func_name, {})
        stored_hash = stored.get("hash")
        stored_mtime = stored.get("mtime")
        
        # Function changed if EITHER hash or mtime differs
        has_changed = (
            current_hash != stored_hash or 
            current_mtime != stored_mtime or 
            stored_hash is None  # New function
        )
        
        if has_changed:
            # Update tracking data
            self.tracked_functions[func_name] = {
                "hash": current_hash,
                "mtime": current_mtime,
                "module": func.__module__,
            }
            self._save_tracking_data()
            
        return has_changed
    
    def _get_function_hash(self, func) -> Optional[str]:
        """Hash function source code."""
        try:
            source = inspect.getsource(func)
            return hashlib.md5(source.encode()).hexdigest()
        except (OSError, TypeError):
            return None

Um dies in Aktion zu sehen, betrachten Sie ein reales Szenario: Eine Funktion enthält zunächst einen Fehler, den wir dann beheben – AFML erkennt die Änderung automatisch und löscht nur den relevanten Teil des Cache.

Smart Cacheable: Automatisches Ungültigmachen


Teil II: Der robuste zwischenspeicherfähige Datenraumgestalter (Decorator)

Alle AFML-Funktionen sind in einem einzigen leistungsstarken Gestalter des Datenraums der Fabrik zusammengefasst. Er erstellt die gewünschte Art der Zwischenspeicherung: regulär, temporär, datenverfolgend, codeverfolgend usw. Hier ist der Quellcode für diese Fabrik.

# afml/cache/robust_cache_keys.py

def create_robust_cacheable(
    track_data_access: bool = False,
    dataset_name: Optional[str] = None,
    purpose: Optional[str] = None,
    use_time_awareness: bool = False,
):
    """
    Factory function to create robust cacheable decorators.
    This is where all the magic comes together.
    """
    from functools import wraps
    from . import cache_stats, memory
    from .cache_monitoring import get_cache_monitor
    
    def decorator(func):
        func_name = f"{func.__module__}.{func.__qualname__}"
        cached_func = memory.cache(func)  # Use joblib for persistence
        seen_signatures = set()           # Track cache hits/misses
        monitor = get_cache_monitor()     # Performance monitoring
        
        @wraps(func)
        def wrapper(*args, **kwargs):
            # Step 1: Generate cache key
            try:
                if use_time_awareness:
                    cache_key = TimeSeriesCacheKey.generate_key_with_time_range(
                        func, args, kwargs
                    )
                else:
                    cache_key = CacheKeyGenerator.generate_key(func, args, kwargs)
                
                # Step 2: Track hit/miss
                if cache_key in seen_signatures:
                    cache_stats.record_hit(func_name)
                    is_hit = True
                else:
                    cache_stats.record_miss(func_name)
                    seen_signatures.add(cache_key)
                    is_hit = False
                    
            except Exception as e:
                logger.warning(f"Cache key generation failed: {e}")
                cache_stats.record_miss(func_name)
                cache_key = None
                is_hit = False
            
            # Step 3: Track data access if requested (prevent look-ahead bias)
            if track_data_access:
                try:
                    from .data_access_tracker import get_data_tracker
                    _track_dataframe_access(
                        get_data_tracker(), args, kwargs, dataset_name, purpose
                    )
                except Exception as e:
                    logger.warning(f"Data tracking failed: {e}")
            
            # Step 4: Track access time (for monitoring)
            monitor.track_access(func_name)
            
            # Step 5: Execute function with timing
            start_time = time.time()
            try:
                result = cached_func(*args, **kwargs)
                
                # Track computation time for misses
                if not is_hit:
                    computation_time = time.time() - start_time
                    monitor.track_computation_time(func_name, computation_time)
                
                return result
                
            except (EOFError, pickle.PickleError, OSError) as e:
                # Handle cache corruption gracefully
                logger.warning(f"Cache corruption: {type(e).__name__} - recomputing")
                
                # Clear corrupted cache
                if cache_key is not None:
                    _clear_corrupted_cache(cached_func, cache_key)
                
                # Execute function directly
                return func(*args, **kwargs)
                
        wrapper._afml_cacheable = True
        return wrapper
    
    return decorator

# Standard decorators
robust_cacheable = create_robust_cacheable(use_time_awareness=False)
time_aware_cacheable = create_robust_cacheable(use_time_awareness=True)

# Data tracking decorators
data_tracking_cacheable = lambda dataset_name, purpose: create_robust_cacheable(
    track_data_access=True, dataset_name=dataset_name, purpose=purpose, use_time_awareness=False
)

time_aware_data_tracking_cacheable = lambda dataset_name, purpose: create_robust_cacheable(
    track_data_access=True, dataset_name=dataset_name, purpose=purpose, use_time_awareness=True
)



Teil III: Erweiterte Caching-Muster

Muster #1: Zeitbewusste Zwischenspeicherung für die Analyse des Vorwärtstests

Bei der Vorwärtstests-Validierung ist es wichtig, dass der Cache zwischen verschiedenen Zeiträumen unterscheidet. Andernfalls kann es leicht zu Kollisionen und falschen Ergebnissen kommen. Das folgende Diagramm zeigt, wie dies in der Praxis funktioniert.

Time Series Data (2024):
─────────────────────────────────────────────────────────────────────
Jan  Feb  Mar  Apr  May  Jun  Jul  Aug  Sep  Oct  Nov  Dec
├────┼────┼────┼────┼────┼────┼────┼────┼────┼────┼────┼────┤
│    │    │    │    │    │    │    │    │    │    │    │    │
▼────▼────▼────▼────▼────▼────▼────▼────▼────▼────▼────▼────▼

Walk-Forward Splits:
─────────────────────────────────────────────────────────────────────

Split 1:
        Train Period: Jan-Mar              Test Period: Apr
        ├─────────────────────────────┤    ├───────┤

Split 2:
            Train Period: Feb-Apr              Test Period: May
            ├─────────────────────────────┤    ├───────┤

Split 3:
                Train Period: Mar-May              Test Period: Jun
                ├─────────────────────────────┤    ├───────┤

Das folgende Beispiel zeigt, wie die Verwendung der regulären Zwischenspeicherung mit zwei unterschiedlichen Zeiträumen denselben Hash ergeben kann. Dies führt dazu, dass das Modell auf den falschen Daten trainiert wird, und es kommt zu einer Verzerrung der Vorauschschau.

@robust_cacheable  # Generic caching
def train_model(data, params):
    return model

Split 1: data = data['2024-01':'2024-03']
         Cache Key: hash(data.values)
         └──> "a7f3e8d92c1b..."        ◄──┐
                                          │
Split 2: data = data['2024-02':'2024-04'] │
         Cache Key: hash(data.values)     │
         └──> "a7f3e8d92c1b..."        ◄──┘ COLLISION! 
              └──> Cache hit on DIFFERENT time period
              └──> Model trained on wrong data!

Der zeitbewusste Cache fügt bei der Schlüsselgenerierung eine Zeitspanne hinzu, wodurch solche Kollisionen vollständig vermieden werden. Sehen Sie, wie der Cache jetzt korrekt zwischen den Zeiträumen unterscheidet.

@time_aware_cacheable  # Includes temporal info
def train_model(data, params):
    return model

Split 1: data = data['2024-01':'2024-03']
         Cache Key: hash(data.values + "2024-01_2024-03")
         └──> "a7f3_time_2024-01_2024-03"
         
Split 2: data = data['2024-02':'2024-04']
         Cache Key: hash(data.values + "2024-02_2024-04")
         └──> "b9e4_time_2024-02_2024-04"  ← Different!
              └──> Cache miss (correct)
              └──> Train new model for this period ✓

Split 3: data = data['2024-03':'2024-05']
         Cache Key: hash(data.values + "2024-03_2024-05")
         └──> "c7d1_time_2024-03_2024-05"  ← Different!
              └──> Cache miss (correct)
              └──> Train new model for this period ✓

Hier ist die Implementierung einer Klasse, die Zeitstempel zu Cache-Schlüsseln hinzufügt.

class TimeSeriesCacheKey(CacheKeyGenerator):
    """Extended cache key generator with time-series awareness."""
    
    @staticmethod
    def generate_key_with_time_range(
        func, 
        args: tuple, 
        kwargs: dict, 
        time_range: Tuple[pd.Timestamp, pd.Timestamp] = None
    ) -> str:
        """
        Generate cache key that includes time range information.
        Critical for preventing temporal data leakage.
        """
        base_key = CacheKeyGenerator.generate_key(func, args, kwargs)
        
        if time_range is None:
            # Try to extract time range from data
            time_range = TimeSeriesCacheKey._extract_time_range(args, kwargs)
        
        if time_range:
            start, end = time_range
            time_hash = f"time_{start}_{end}"
            return f"{base_key}_{time_hash}"
        
        return base_key

# Usage in walk-forward validation
@time_aware_cacheable
def train_on_period(data: pd.DataFrame, params: dict) -> Model:
    """
    Train model on specific time period.
    Cache is automatically keyed by time range!
    """
    model = RandomForestClassifier(**params)
    model.fit(data.drop('target', axis=1), data['target'])
    return model

# Walk-forward loop
for train_start, train_end, test_start, test_end in walk_forward_splits:
    # Each period is cached independently
    train_data = data.loc[train_start:train_end]
    model = train_on_period(train_data, params)  # Cached per period
    
    test_data = data.loc[test_start:test_end]
    predictions = model.predict(test_data)

Muster #2: Kreuzvalidierungs-Caching mit Sklearn-Schätzern

Das Caching der Kreuzvalidierungen ist ein separates Thema. Sklearn-Klassifikatoren enthalten interne Zustände, die nicht direkt gehasht werden können. AFML löst dieses Problem, indem es nur den Modelltyp und seine Parameter hasht. Die Schlüsselfunktion ist unten aufgeführt.

# afml/cache/cv_cache.py

def _hash_classifier(clf: BaseEstimator) -> str:
    """
    Generate stable hash for sklearn classifier.
    
    KEY INSIGHT: Hash the type + parameters, NOT the trained state!
    """
    try:
        clf_type = type(clf).__name__
        params = clf.get_params(deep=True)
        
        # Filter out non-serializable params
        serializable_params = {}
        for k, v in params.items():
            try:
                json.dumps(v)  # Test if JSON serializable
                serializable_params[k] = v
            except (TypeError, ValueError):
                # Use type name for non-serializable params
                serializable_params[k] = f"<{type(v).__name__}>"
        
        # Create stable hash
        param_str = json.dumps(serializable_params, sort_keys=True)
        combined = f"{clf_type}_{param_str}"
        return hashlib.md5(combined.encode()).hexdigest()[:12]
        
    except Exception as e:
        logger.debug(f"Failed to hash classifier: {e}")
        return f"clf_{type(clf).__name__}_{id(clf)}"

So sieht ein Gestalter des Datenraumes für die Zwischenspeicherung von Kreuzvalidierungsergebnissen aus. Es spart enorm viel Zeit bei der Auswahl der Hyperparameter.

@cv_cacheable
def ml_cross_val_score(
    classifier: BaseEstimator,
    X: pd.DataFrame,
    y: pd.Series,
    cv_gen,  # PurgedKFold, TimeSeriesSplit, etc.
    sample_weight_train: Optional[np.ndarray] = None,
    scoring: str = 'neg_log_loss'
) -> np.ndarray:
    """
    Cross-validation with proper caching.
    
    Caches based on:
    - Classifier type and parameters (not trained state)
    - Data content (X, y)
    - CV generator configuration
    - Sample weights
    """
    scores = []
    
    for train_idx, test_idx in cv_gen.split(X):
        X_train, X_test = X.iloc[train_idx], X.iloc[test_idx]
        y_train, y_test = y.iloc[train_idx], y.iloc[test_idx]
        
        # Train fresh model (not from cache)
        model = clone(classifier)
        
        if sample_weight_train is not None:
            model.fit(X_train, y_train, 
                     sample_weight=sample_weight_train[train_idx])
        else:
            model.fit(X_train, y_train)
        
        score = model.score(X_test, y_test)
        scores.append(score)
    
    return np.array(scores)

Muster #3: Verhinderung der Kontamination von Testkonstellationen

Bei der Entwicklung von maschinellen Lernmodellen ist einer der am häufigsten übersehenen Fehler die versehentliche Verwendung von Testdaten während des Trainings. AFML überwacht jeden Zugriff auf den Datensatz und macht Sie auf eventuelle Lecks aufmerksam.

So zeichnet AFML jeden Datenzugriff auf, wobei der Zeitbereich, der Zweck (trainieren, testen, validieren) und die Quelle des Aufrufs erfasst werden. Auf der Grundlage dieser Protokolle erstellt das System dann einen Bericht über mögliche Lecks.

# afml/cache/data_access_tracker.py

class DataAccessTracker:
    """
    Track every data access to detect test set contamination.
    
    This is CRITICAL for preventing data snooping bias.
    """
    
    def log_access(
        self,
        dataset_name: str,
        start_date: pd.Timestamp,
        end_date: pd.Timestamp,
        purpose: str,  # 'train', 'test', 'validate', 'optimize'
        data_shape: Optional[Tuple[int, int]] = None,
    ):
        """Log a dataset access with full temporal metadata."""
        entry = {
            "timestamp": datetime.now().isoformat(),
            "dataset": dataset_name,
            "start_date": str(start_date),
            "end_date": str(end_date),
            "purpose": purpose,
            "data_shape": str(data_shape) if data_shape else None,
            "caller": self._get_caller_info(),  # Stack trace
        }
        
        self.access_log.append(entry)
        logger.debug(
            f"Logged access: {dataset_name} [{start_date} to {end_date}] "
            f"for {purpose}"
        )
# Usage with caching decorator
@time_aware_data_tracking_cacheable(dataset_name="eur_usd_2024", purpose="test")
def evaluate_on_test_set(test_data: pd.DataFrame, model) -> dict:
    """
    Evaluate model on test set.
    Access is logged automatically!
    """
    predictions = model.predict(test_data)
    metrics = calculate_metrics(predictions, test_data['target'])
    return metrics

Um während der Modellentwicklung auf Kontaminationen zu prüfen, rufen wir print_contamination_report() auf, wie gezeigt:

# Later, check for contamination
from afml.cache import print_contamination_report

print_contamination_report()

Bericht über die Datenkontaminierung


Teil IV: Leistungsüberwachung und -optimierung

Überwachung des Cache-Zustands


AFML bietet auch eine integrierte Überwachung des Cache-Zustands:

  • welche Funktionen am häufigsten zwischengespeichert werden
  • wo Cache-Fehlversuche auftreten
  • wie groß der Cache ist
  • ob es irgendwelche verdächtigen Situationen gibt
Nachstehend finden Sie ein Beispiel für einen Bericht:

from afml.cache import print_cache_health


print_cache_health()

Zustandsbericht des Cache


Teil V: Integration in bestehende Projekte

Um das System zu nutzen, importieren wir einfach das Modul und verwenden den gewünschten Datenraumgestalter. Es ist keine komplizierte Konfiguration erforderlich – hier ein Beispiel dafür, wie es in einem echten Projekt aussieht.

Führen Sie den folgenden Code aus, um von Ihrem Terminal aus zu klonen, oder laden Sie cache.zip herunter:

git clone https://github.com/pnjoroge54/Machine-Learning-Blueprint.git
cd Machine-Learning-Blueprint/afml/cache
Kopieren Sie dann die Cache-Module in Ihr eigenes Paket:
my_package/
├── cache/
│   ├── __init__.py
│   ├── backtest_cache.py
│   ├── cache_monitoring.py
│   ├── cv_cache.py
│   ├── data_access_tracker.py
│   ├── robust_cache_keys.py
│   ├── selective_cleaner.py
│   └── mql5_bridge.py

Ausführliche Informationen zur Implementierung dieses Caching-Systems in Ihrem Projekt finden Sie in user_guide.py.


Schlussfolgerung: Vom Forschungstempo zur Ausführungsgeschwindigkeit

Das AFML-Caching-System verändert die ML-Entwicklung im Finanzbereich grundlegend, indem es die beiden kritischen Engpässe beseitigt, die Forschung und Produktion trennen: Iterationsgeschwindigkeit und Ausführungslatenz.

Geschwindigkeit der Forschung: Die Grundlage der Alpha-Generation

In der Forschungsphase ermöglicht unsere Caching-Architektur eine noch nie dagewesene Erforschung:

  • Schnelle Merkmalsentwicklung: Testen Sie Dutzende von Kombinationen technischer Indikatoren, RSI-Perioden und Volatilitätsberechnungen, ohne die Basistransformationen neu berechnen zu müssen. Das System speichert auf intelligente Weise Zwischenergebnisse, sodass Sie bei der Merkmalsauswahl iterieren können, anstatt auf die Berechnung zu warten.

  • Ausführliche Strategievalidierung: Führen Sie die PurgedKFold-Kreuzvalidierung über Hunderte von Parameterkombinationen durch. Die Merkmale und Kennzeichnungen jedes Datenfolds bleiben im Cache gespeichert und ermöglichen rigorose Backtests im großen Maßstab ohne die zeitlichen Datenverluste, die bei herkömmlichen Ansätzen auftreten.

  • Analyse mehrerer Zeitrahmen: Experimentieren Sie mit komplexen Merkmalsinteraktionen in 1-Minuten-, 5-Minuten- und stündlichen Zeitrahmen. Die zeitabhängige Zwischenspeicherung stellt sicher, dass die Berechnungen in jeder Periode unabhängig und reproduzierbar bleiben.

  • Optimierung der Kennzeichnungsstrategie: Vergleich von Triple-Barrier-, Trend-Scanning- und Meta-Labeling-Ansätzen ohne Neuberechnung von teuren Balkenstichproben.

Die komplette Pipeline: Forschung bis Ausführung

Unsere Caching-Architektur ermöglicht einen nahtlosen Übergang von der experimentellen Forschung zum produktiven Handel:

  1. Forschungsphase: Nutzung des reichhaltigen Ökosystems von Python mit AFML-Caching für schnelle Experimente und Validierung
  2. Modell Export: Konvertierung validierter Modelle in das ONNX-Format zur abhängigkeitsfreien Bereitstellung
  3. Migration der Merkmals-Pipeline: Implementierung kritischer Merkmalsberechnungen nativ in MQL5 mit parallelem Caching
  4. Einsatz in der Produktion: Ausführung von Strategien mit einer Latenzzeit von Mikrosekunden unter Wahrung der Forschungsintegrität

Integrität der Daten: Der unsichtbare Vorteil

Über die Leistung hinaus verhindert die automatische Datenzugriffsverfolgung des Systems die subtile Kontamination, die ML-Systeme in der Produktion unterminiert. Vollständige Prüfprotokolle stellen sicher, dass nicht versehentlich Testdaten optimiert werden, während die zeitliche Überwachung die Integrität der Validierung des Vorwärtstests gewährleistet.

Blick in die Zukunft: Die letzte Grenze

In der nächsten Folge werden wir die Pipeline mit weiteren Informationen ergänzen:

  1. Verwendung des ONNX-Models: Exportieren von Scikit-Learn und nutzerdefinierten Modellen, um sie nativ in MQL5 ohne Python-Abhängigkeiten auszuführen
  2. MQL5-Inferenzmaschine: Aufbau von Vorhersagesystemen mit extrem geringer Latenz, die im Mikrosekundenbereich arbeiten
  3. Hybrides Merkmals-Management: Entwerfen von intelligenten Systemen, bei denen komplexe Funktionen während der Forschung in Python berechnet werden, und migrieren Sie dann für die Produktion zu MQL5
  4. Modellüberwachung in Echtzeit: Implementierung der Erkennung von Abweichungen und eine Leistungsverfolgung innerhalb der MQL5-Ausführungsumgebung

Das Ergebnis ist ein komplettes Ökosystem, in dem Sie mit der Flexibilität von Python forschen, mit der Leistung von MQL5 arbeiten und während des gesamten Lebenszyklus wissenschaftliche Strenge wahren können. Dabei geht es nicht nur um schnellere Berechnungen, sondern auch um die Schaffung eines Rahmens, in dem hochentwickelte ML-Strategien tatsächlich auf realen Märkten funktionieren können.


Code-Repository

Der gesamte Code aus diesem Artikel ist im Repository Machine-Learning-Blueprint verfügbar. Führen Sie den folgenden Code aus, um von Ihrem Terminal aus zu klonen:

git clone https://github.com/pnjoroge54/Machine-Learning-Blueprint.git
cd Machine-Learning-Blueprint/afml/cache


Modul-Dateien

Modul-Datei Zweck Wesentliche Merkmale Wann zu verwenden
__init__.py Zentrales Initialisierungs- und Koordinierungsmodul - Initialisiert alle Cache-Subsysteme
- Einrichten der Numba- und Joblib-Zwischenspeicherung
- Bietet eine einheitliche API für alle Cache-Funktionen – Exportiert Komfortfunktionen
- Konfiguriert die Cache-Verzeichnisse
Importieren Sie dies, um auf alle Cache-Funktionen zuzugreifen. Es ist der einzige Einstiegspunkt für das gesamte System.
robust_cache_keys.py Erweiterte Cache-Schlüsselgenerierung für Finanzdaten - Hashes NumPy-Arrays korrekt
- Behandelt Pandas DataFrames mit DatetimeIndex
- Zeitserienbewusste Schlüsselerzeugung
- Sklearn Schätzer Hashing
- Verhindert zeitliche Datenverluste
Verwendung für jede Funktion, die finanzielle Zeitreihendaten, DataFrames oder ML-Modelle verarbeitet.
selective_cleaner.py Intelligentes System zur Cache-Ungültigkeitserklärung - Verfolgt Änderungen am Quellcode von Funktionen
- Automatische Cache-Löschung bei Codeänderungen
- Selektive Ungültigkeitserklärung nach Modul
- Größen- und altersabhängige Bereinigung
- smart_cacheable Datenraumgestalter
Während der aktiven Entwicklung verwenden, um Probleme mit veralteten Caches zu vermeiden. Wesentlich für die iterative Forschung.
data_access_tracker.py Verhindert die Kontamination des Testsatzes - Protokolliert jeden Zugriff auf einen Datensatz mit Zeitstempel
- Verfolgt die Nutzung von Training/Test/Validierung
- Erzeugt Kontaminationsberichte
- Erkennt Verzerrungen beim Daten-Snooping
- Stellt ein Prüfprotokoll bereit
Entscheidend für die Integrität der Forschung. Verwenden Sie diese Funktion, um alle Datenzugriffe während der Modellentwicklung zu verfolgen.
cv_cache.py Auf Kreuzvalidierung spezialisiertes Caching - Effiziente Zwischenspeicherung von CV-Ergebnissen
- Behandelt Sklearn-Schätzer korrekt
- Unterstützt PurgedKFold und nutzerdefinierte CV
- Trennt die Parameter des Schätzers vom Zustand
- Schnelle CV-Iterationen
Wird verwendet, wenn teure Kreuzvalidierungsexperimente durchgeführt werden. Beschleunigt die Hyperparameter-Optimierung drastisch.
backtest_cache.py Optimierung des Backtesting-Workflows - Zwischenspeicher für vollständige Backtest-Läufe
- Unterstützung von Vorwärtstest-Analysen
- Verfolgung der Parameteroptimierung
- Caching der Handelsebene
- Tools für den Ergebnisvergleich
Wesentlich für die Strategieentwicklung. Zwischenspeichern von Backtest-Ergebnissen zum effizienten Vergleich von Parametervariationen.
cache_monitoring.py Leistungsanalyse und Diagnose - Trefferquotenverfolgung pro Funktion
- Messung der Berechnungszeit
- Überwachung der Cache-Größe
- Gesundheitsberichte und Empfehlungen
- Analyse der Effizienz
Damit können Sie die Cache-Leistung verstehen und Optimierungsmöglichkeiten erkennen.
mlflow_integration.py Integration der Experimentverfolgung - Kombiniert Caching mit MLflow
- Automatische Versuchsprotokollierung
- Versionierung von Modellen
- Metrische Verfolgung
- Vergleich der Ergebnisse
Einsatz in produktiven Forschungsumgebungen zur Verfolgung von Experimenten bei gleichzeitiger Nutzung von Caching.
mql5_bridge.py Python-MQL5-Kommunikationsbrücke - Startet Python-Skripte aus MQL5
- Dateibasierte Signalisierung für sprachenübergreifende Ausführung
- Unterstützung der Modellinferenz in Echtzeit
- Integriert ML-Pipelines mit MetaTrader 5
Verwendung beim Einsatz von Python-basierten ML-Modellen in MQL5-Handelsumgebungen zur Automatisierung.
startup_script.py Umwelt-Bootstrap und Cache-Einrichtung - Initialisiert Cache-Verzeichnisse und Protokollierung
- Lädt Umgebungsvariablen
- Sorgt für einen reproduzierbaren Start
- Kann MLflow oder Überwachungseinstellungen auslösen
Verwenden Sie es zu Beginn eines ML- oder Handels-Workflows, um ein konsistentes und reproduzierbares Setup zu gewährleisten.
PythonBridgeEA.mq5 Chart-gebundene MQL5-Brücke zu Python - Dient als Brücke zwischen MQL5 und Python
- Verwendet Chart-Ereignisse und Dateisignalisierung
- Auslösen von Python-Skripten aus dem MetaTrader
- Synchronisiert die Handelslogik mit der Python-Ausgabe
Anhängen an einen Chart, um die Python-Kommunikation von MetaTrader 5 zu ermöglichen. Erforderlich für die ML-Integration in Echtzeit.


Übersetzt aus dem Englischen von MetaQuotes Ltd.
Originalartikel: https://www.mql5.com/en/articles/20302

Beigefügte Dateien |
PythonBridgeEA.mq5 (21.22 KB)
user_guide.py (22.37 KB)
cache.zip (45.84 KB)
Letzte Kommentare | Zur Diskussion im Händlerforum (1)
Kgothatso Nkuna
Kgothatso Nkuna | 27 Nov. 2025 in 08:45
Ich möchte automatisch bauen Roboter forex mt5
Die Übertragung der Trading-Signale in einem universalen Expert Advisor. Die Übertragung der Trading-Signale in einem universalen Expert Advisor.
In diesem Artikel wurden die verschiedenen Möglichkeiten beschrieben, um die Trading-Signale von einem Signalmodul des universalen EAs zum Steuermodul der Positionen und Orders zu übertragen. Es wurden die seriellen und parallelen Interfaces betrachtet.
Die „Griechen“ in Black-Scholes automatisieren: Fortgeschrittenes Scalping und Mikrostrukturhandel Die „Griechen“ in Black-Scholes automatisieren: Fortgeschrittenes Scalping und Mikrostrukturhandel
Gamma und Delta wurden ursprünglich als Risikomanagement-Tools zur Absicherung von Optionsrisiken entwickelt, entwickelten sich aber im Laufe der Zeit zu leistungsstarken Instrumenten für fortgeschrittenes Scalping, Orderflow-Modellierung und Mikrostrukturhandel. Heute dienen sie als Echtzeit-Indikatoren für die Preisempfindlichkeit und das Liquiditätsverhalten und ermöglichen es den Händlern, kurzfristige Schwankungen mit bemerkenswerter Präzision zu antizipieren.
Eine alternative Log-datei mit der Verwendung der HTML und CSS Eine alternative Log-datei mit der Verwendung der HTML und CSS
In diesem Artikel werden wir eine sehr einfache, aber leistungsfähige Bibliothek zur Erstellung der HTML-Dateien schreiben, dabei lernen wir auch, wie man eine ihre Darstellung einstellen kann (nach seinem Geschmack) und sehen wir, wie man es leicht in seinem Expert Advisor oder Skript hinzufügen oder verwenden kann.
Automatisieren von Handelsstrategien in MQL5 (Teil 42): Sitzungsbasiertes System des Opening Range Breakout (ORB) Automatisieren von Handelsstrategien in MQL5 (Teil 42): Sitzungsbasiertes System des Opening Range Breakout (ORB)
In diesem Artikel erstellen wir das vollständig anpassbare sitzungsbasierte System des Opening Range Breakout (ORB) in MQL5, mit dem wir eine beliebige Startzeit der Sitzung und die Dauer der Spanne festlegen können, das Hoch und Tief dieser Eröffnungsperiode automatisch berechnet und nur bestätigte Ausbrüche in die Richtung der Bewegung handelt.