Multimodul-Handelsroboter in Python und MQL5 (Teil I): Erstellung der Grundarchitektur und erster Module
Einführung
Eines Tages kam mir eine Idee: Handelsroboter sind zu simpel für den modernen Markt, etwas Flexibleres und Intelligenteres wird benötigt.
Der Markt ist ständig in Bewegung. Eine Strategie, die heute funktioniert, ist morgen nutzlos. Ich beobachtete dies lange Zeit und erkannte, dass ein völlig neuer Ansatz erforderlich war. Die Lösung kam unerwartet. Was wäre, wenn wir ein modulares System entwickeln würden? Stellen Sie sich ein Team von Fachleuten vor: Einer beobachtet die Trends, der zweite analysiert das Handelsvolumen, der dritte kontrolliert die Risiken. Genau so sollte ein moderner Handelsroboter funktionieren!
Die Wahl der Technologien lag auf der Hand. Python erwies sich als perfekt für die Datenanalyse – mit seinen Bibliotheken kann man Wunder bewirken. MQL5 hat die Ausführung von Handelsgeschäften übernommen. Ein tolles Tandem ist entstanden. Wir haben klein angefangen: Zuerst haben wir eine solide Grundlage geschaffen – eine Architektur, die wachsen und sich weiterentwickeln kann, dann haben wir die Interaktion zwischen Python und MQL5 hinzugefügt. Das Datenverwaltungssystem erwies sich als überraschend einfach und effektiv.
Die Asynchronität war ein echter Durchbruch! Jetzt kann der Roboter mehrere Instrumente gleichzeitig überwachen. Die Produktivität stieg sprunghaft an.
Wissen Sie, was am interessantesten ist? Dieses System funktioniert wirklich auf dem Markt. Es handelt sich nicht nur um ein Lehrbuchbeispiel, sondern um ein tatsächliches Handelsinstrument. Natürlich fangen wir mit der Basisversion an, aber auch diese ist beeindruckend. Wir haben eine große Reise vor uns. Wir werden ein System schaffen, das lern- und anpassungsfähig ist. Wir werden sie Schritt für Schritt verbessern. Beginnen wir zunächst mit dem Wichtigsten – dem Aufbau eines soliden Fundaments.
Grundlegende Architektur des Systems. Auf der Suche nach dem perfekten Gleichgewicht
Drei Jahre lang habe ich mich bemüht, Handelsroboter zu entwickeln. Mir wurde klar, dass es nicht auf die Algorithmen selbst ankommt, sondern darauf, wie sie zusammenarbeiten. Diese Entdeckung hat alles verändert.
Stellen Sie sich ein Sinfonieorchester vor. Jeder Musiker ist großartig, aber ohne einen Dirigenten gibt es keine Musik. In meinem System wurde MarketMaker zu einem solchen Dirigenten. Es steuert vier Module, von denen jedes für sich selbst zuständig ist:
- Das erste Modul überwacht das Handelsvolumen: wann und zu welchen Preisen Transaktionen stattfinden.
- Das zweite Modul sucht nach Arbitragemöglichkeiten.
- Im dritten Modul wird die Wirtschaft analysiert.
- Das vierte Modul verhindert, dass das System aus dem Ruder läuft und kontrolliert die Risiken.
Der Markt wartet auf niemanden. Er ändert sich blitzschnell, sodass alle Module gleichzeitig arbeiten und ständig miteinander kommunizieren. Nehmen wir an, das Arbitrage-Modul erkennt die Gelegenheit. Informationen aus anderen Modulen werden geprüft und eine Entscheidung wird getroffen.
Zuerst dachte ich daran, strenge Regeln für den Markteintritt aufzustellen. Die Praxis zeigte jedoch schnell, dass dies nicht möglich war. Manchmal ist ein starkes Signal wichtiger als mehrere schwache Signale. Die Zusammenstellung der Daten hat ziemlich viel Zeit in Anspruch genommen. Jedes Modul hat seine eigenen Informationen: Kurse, Makroindikatoren, Transaktionshistorie. All dies muss gespeichert, aktualisiert und mit anderen geteilt werden. Es war notwendig, ein spezielles Synchronisationssystem zu schaffen.
Ironischerweise funktionierte das System umso besser, je unabhängiger die Module waren. Der Ausfall einer Komponente hat die anderen nicht gestoppt. Aber es kommt zu Fehlern: Die Verbindung wird unterbrochen oder die Zitate frieren ein. Der Hauptvorteil dieser Architektur ist, dass sie erweiterbar ist. Möchten Sie eine Nachrichtenanalyse hinzufügen? Kein Problem! Erstellen Sie ein Modul, verbinden Sie es mit MarketMaker, und alles funktioniert wie am Schnürchen.
Das System lebt und entwickelt sich. Sie ist nicht perfekt, aber ihre Grundlage aus Modularität, Parallelität und Flexibilität lässt uns zuversichtlich in die Zukunft blicken. Ich werde Ihnen bald mehr über die einzelnen Komponenten erzählen.
Hauptsystemklasse
Nach vielen Experimenten mit verschiedenen Ansätzen für die Architektur von Handelsrobotern kam ich zu der Erkenntnis, dass der Erfolg des Systems weitgehend davon abhängt, wie gut sein Kern organisiert ist. MarketMaker ist das Ergebnis dieses Verständnisses und verkörpert alle Best Practices, die ich im Laufe der Jahre bei der Entwicklung algorithmischer Systeme gesammelt habe.
Beginnen wir mit der Grundstruktur der Klasse. So sieht seine Initialisierung aus:
def __init__(self, pairs: List[str], terminal_path: str, volume: float = 1.0, levels: int = 5, spacing: float = 3.0): # Main parameters self.pairs = pairs self.base_volume = volume self.levels = levels self.spacing = spacing self.magic = 12345 # Trading parameters self.portfolio_iterations = 10 self.leverage = 50 self.min_profit_pips = 1.0 self.max_spread_multiplier = 2.0 # Data warehouses self.symbols_info = {} self.trading_parameters = {} self.optimal_horizons = {}
Auf den ersten Blick sieht alles ganz einfach aus. Aber hinter jedem Parameter steht eine Geschichte. Nehmen Sie zum Beispiel portfolio_iterations – dieser Parameter wurde erstellt, nachdem ich festgestellt hatte, dass eine zu aggressive Eröffnung von Positionen zu Liquiditätsproblemen führen kann. Jetzt teilt das System das verfügbare Volumen in Teile auf, was den Handel ausgewogener macht.
Besonderes Augenmerk habe ich auf die Initialisierung der historischen Daten gelegt. So funktioniert es:
def _initialize_history(self, pair: str): """Initializing historical data for a pair""" try: rates = mt5.copy_rates_from(pair, mt5.TIMEFRAME_M1, datetime.now()-timedelta(days=1), 1440) if rates is not None: df = pd.DataFrame(rates) df['time'] = pd.to_datetime(df['time'], unit='s') df.set_index('time', inplace=True) returns = np.log(df['close'] / df['close'].shift(1)).dropna() self.returns_history[pair] = pd.Series(returns.values, index=df.index[1:]) except Exception as e: logger.error(f"Error initializing history for {pair}: {e}")
Interessant ist hier die Verwendung logarithmischer Renditen anstelle von einfachen prozentualen Veränderungen. Dies ist keine zufällige Wahl. In der Praxis habe ich festgestellt, dass logarithmierte Renditen bei der Berechnung statistischer Indikatoren stabilere Ergebnisse liefern, insbesondere wenn es um die Volatilität geht.
Eine der größten Herausforderungen war die Umsetzung der Volumenprognose. Nach vielen Experimenten wurde der folgende Code geboren:
async def update_volume_predictions(self): """Updating volume predictions for each pair""" for pair in self.pairs: try: df = volume_model.get_volume_data( symbol=pair, timeframe=mt5.TIMEFRAME_H1, n_bars=100 ) if pair in self.volume_models: feature_columns = [ 'volume_sma_5', 'volume_sma_20', 'relative_volume', 'volume_change', 'volume_volatility', 'price_sma_5', 'price_sma_20', 'price_change', 'price_volatility', 'rsi', 'macd', 'macd_signal', 'bb_upper', 'bb_lower' ] X = df[feature_columns].iloc[-1:].copy() prediction = self.volume_models[pair].predict(X)[0] current_price = df['close'].iloc[-1] predicted_change = (prediction - current_price) / current_price self.volume_predictions[pair] = predicted_change except Exception as e: logger.error(f"Error updating prediction for {pair}: {e}")
Beachten Sie, dass es sich bei den Merkmalen nicht nur um eine zufällige Auswahl von Indikatoren handelt. Jedes dieser Produkte wurde nach sorgfältiger Prüfung schrittweise hinzugefügt. Zum Beispiel hat sich relative_volume als besonders nützlich erwiesen, um anormale Marktaktivitäten zu erkennen.
Und hier befindet sich das Herzstück des Systems – die Handelsschleife:
async def trade_cycle(self): """Main trading loop""" try: await self.update_volume_predictions() await self.economic_module.update_forecasts() all_positions = mt5.positions_get() or [] open_positions = [pos for pos in all_positions if pos.magic == self.magic] if open_positions: await self.manage_positions() return valid_signals = [] available_volume = self.calculate_available_volume() * len(self.pairs) for pair in self.pairs: signal = await self.get_combined_signal(pair) if signal and self._validate_signal(signal): valid_signals.append(signal) if valid_signals: volume_per_trade = available_volume / len(valid_signals) for signal in valid_signals: signal['adjusted_volume'] = volume_per_trade await self.place_order(signal) except Exception as e: logger.error(f"Error in trade cycle: {e}")
Dieser Code ist das Ergebnis langer Überlegungen, wie man den Handelsprozess richtig organisieren kann. Der asynchrone Charakter der Schleife ermöglicht die effiziente Bearbeitung mehrerer Paare gleichzeitig, und die klare Abfolge der Aktionen (Aktualisierung der Prognosen → Überprüfung der Positionen → Suche nach Signalen → Ausführung) gewährleistet ein vorhersehbares Systemverhalten.
Der Mechanismus der Signalvalidierung verdient besondere Aufmerksamkeit:
def _validate_signal(self, signal: Dict) -> bool: """Trading signal check""" spread = signal['spread'] diff_pips = signal['diff_pips'] # Basic checks if spread > self.max_spread_multiplier * diff_pips: return False if diff_pips < self.min_profit_pips: return False # Check economic factors if signal['economic_volatility'] > self.volatility_threshold: return False # Check the volume prediction if abs(signal['volume_prediction']) < self.min_volume_change: return False return True
Jeder Check hier ist das Ergebnis echter Handelserfahrung. So wurde beispielsweise die Prüfung der wirtschaftlichen Volatilität hinzugefügt, nachdem ich festgestellt hatte, dass der Handel während wichtiger Nachrichtenereignisse aufgrund starker Kursschwankungen häufig zu höheren Verlusten führte.
Abschließend möchte ich anmerken, dass MarketMaker ein lebendiges System ist, das sich ständig weiterentwickelt. Jeder Tag im Handel bringt neue Ideen und Verbesserungen. Die modulare Architektur macht es einfach, diese Verbesserungen zu implementieren, ohne die Kernkomponenten zu stören.
Umgang mit Daten
Die Datenverarbeitung war schon immer einer der schwierigsten Aspekte des algorithmischen Handels. Ich erinnere mich, wie ich zu Beginn der Entwicklung mit einer scheinbar einfachen Frage konfrontiert wurde: Wie kann man die Speicherung und Handhabung von Marktinformationen richtig organisieren? Es wurde schnell klar, dass eine normale Datenbank oder einfache Arrays nicht ausreichen würden.
Alles begann mit der Schaffung einer Grundstruktur für den Datenempfang. Nach mehreren Iterationen wurde die folgende Methode entwickelt:
def _initialize_history(self, pair: str): try: rates = mt5.copy_rates_from(pair, mt5.TIMEFRAME_M1, datetime.now()-timedelta(days=1), 1440) if rates is None: logger.error(f"Failed to get history data for {pair}") return df = pd.DataFrame(rates) df['time'] = pd.to_datetime(df['time'], unit='s') df.set_index('time', inplace=True) # Calculate logarithmic returns returns = np.log(df['close'] / df['close'].shift(1)).dropna() # Add new metrics df['typical_price'] = (df['high'] + df['low'] + df['close']) / 3 df['price_velocity'] = df['close'].diff() / df['time'].diff().dt.total_seconds() df['volume_intensity'] = df['tick_volume'] / df['time'].diff().dt.total_seconds() self.returns_history[pair] = pd.Series(returns.values, index=df.index[1:]) self.price_data[pair] = df except Exception as e: logger.error(f"Error initializing history for {pair}: {e}")
Interessant ist hier die Berechnung der „Geschwindigkeit“ der Preisänderung (price_velocity) und der Intensität des Volumens (volume_intensity). Diese Metriken erschienen nicht sofort. Anfangs arbeitete ich nur mit regulären Preisdaten, erkannte aber schnell, dass der Markt nicht nur eine Abfolge von Preisen ist, sondern ein komplexes dynamisches System, bei dem nicht nur das Ausmaß der Veränderungen wichtig ist, sondern auch die Geschwindigkeit dieser Veränderungen.
Besondere Aufmerksamkeit musste dem Umgang mit fehlenden Daten gewidmet werden. So sieht das Validierungs- und Reinigungssystem aus:
def _validate_and_clean_data(self, df: pd.DataFrame) -> pd.DataFrame: """Validation and data cleaning""" if df.empty: raise ValueError("Empty dataset received") # Check gaps missing_count = df.isnull().sum() if missing_count.any(): logger.warning(f"Found missing values: {missing_count}") # Use 'forward fill' for prices price_cols = ['open', 'high', 'low', 'close'] df[price_cols] = df[price_cols].ffill() # Use interpolation for volumes df['tick_volume'] = df['tick_volume'].interpolate(method='linear') # Check outliers for col in ['high', 'low', 'close']: zscore = stats.zscore(df[col]) outliers = abs(zscore) > 3 if outliers.any(): logger.warning(f"Found {outliers.sum()} outliers in {col}") # Replace extreme outliers df.loc[outliers, col] = df[col].mean() + 3 * df[col].std() * np.sign(zscore[outliers]) return df
Ich erinnere mich an einen Fall, in dem das Fehlen eines einzigen Ticks zu einer falschen Berechnung von Indikatoren und damit zu einem falschen Handelssignal führte. Daraufhin wurde das Datenbereinigungssystem erheblich verbessert.
Hier sehen Sie, wie wir mit dem Volumen arbeiten, einem der wichtigsten Merkmale des Marktes:
def analyze_volume_profile(self, pair: str, window: int = 100) -> Dict: """Volume profile analysis""" try: df = self.price_data[pair].copy().last(window) # Normalize volumes volume_mean = df['tick_volume'].rolling(20).mean() volume_std = df['tick_volume'].rolling(20).std() df['normalized_volume'] = (df['tick_volume'] - volume_mean) / volume_std # Calculate volume clusters price_levels = pd.qcut(df['close'], q=10) volume_clusters = df.groupby(price_levels)['tick_volume'].sum() # Find support/resistance levels by volume significant_levels = volume_clusters[volume_clusters > volume_clusters.mean() + volume_clusters.std()] # Analyze imbalances buy_volume = df[df['close'] > df['open']]['tick_volume'].sum() sell_volume = df[df['close'] <= df['open']]['tick_volume'].sum() volume_imbalance = (buy_volume - sell_volume) / (buy_volume + sell_volume) return { 'normalized_profile': volume_clusters.to_dict(), 'significant_levels': significant_levels.index.to_list(), 'volume_imbalance': volume_imbalance, 'current_percentile': stats.percentileofscore(df['tick_volume'], df['tick_volume'].iloc[-1]) } except Exception as e: logger.error(f"Error analyzing volume profile: {e}") return None
Dieser Code ist das Ergebnis einer langen Studie über die Mikrostruktur des Marktes. Von besonderem Interesse ist die Berechnung des Ungleichgewichts zwischen Käufen und Verkäufen. Ich habe dieses Thema zunächst auf dem Kryptomarkt studiert. Ich weiß nicht, ob die MQL5-Administration grünes Licht für die Veröffentlichung des Codes mit der Integration der Krypto-Börse, MetaTrader 5, Python, ... geben wird.
Aber ich schweife ab. Auf den ersten Blick mag es so aussehen, als ob ein einfacher Vergleich der Volumina bei steigenden und fallenden Balken keine nützlichen Informationen liefern würde. Die Praxis hat jedoch gezeigt, dass dieser einfache Indikator häufig vor einer bevorstehenden Trendwende warnt.
Die Arbeit mit Wirtschaftsdaten ist eine andere Geschichte. Hier war es notwendig, ein ganzes Synchronisationssystem zu schaffen:
async def synchronize_market_data(self): """Market data synchronization""" while True: try: # Update basic data for pair in self.pairs: latest_data = await self._get_latest_ticks(pair) if latest_data is not None: self._update_price_data(pair, latest_data) # Update derived metrics await self._update_derivatives() # Check data integrity self._verify_data_integrity() await asyncio.sleep(1) # Dynamic delay except Exception as e: logger.error(f"Error in data synchronization: {e}") await asyncio.sleep(5) # Increased delay on error
Der entscheidende Punkt ist hier die Asynchronität der Datenaktualisierung. In frühen Versionen des Systems habe ich synchrone Anfragen verwendet, was jedoch zu Verzögerungen bei der Bearbeitung einer großen Anzahl von Paaren führte. Der Übergang zu einem asynchronen Modell hat die Produktivität deutlich verbessert.
Abschließend möchte ich anmerken, dass die richtige Organisation der Arbeit mit Daten nicht nur eine technische Frage ist. Dies ist das Fundament, auf dem die gesamte Handelsstrategie aufgebaut ist. Saubere, gut strukturierte Daten ermöglichen es uns, Marktmuster zu erkennen, die bei einer oberflächlichen Analyse verborgen bleiben.
Erstes Modul: Analyse des Volumens
Die Geschichte der Entwicklung des Volumensanalyse-Moduls begann mit einer einfachen Beobachtung: Klassische Indikatoren hinken oft hinterher, weil sie nur mit Preisen arbeiten. Aber auf dem Markt geht es nicht nur um Preise, sondern auch um Handelsvolumina, die oft die Entwicklung der Kurse vorhersagen. Aus diesem Grund war das erste Modul unseres Systems der Volumenanalysator.
Beginnen wir mit der grundlegenden Datenabfragefunktion:
def get_volume_data(symbol, timeframe=mt5.TIMEFRAME_H1, n_bars=2000): """Getting volume and price data from MT5""" try: bars = mt5.copy_rates_from_pos(symbol, timeframe, 0, n_bars) if bars is None: logger.error(f"Failed to get data for {symbol}") return None df = pd.DataFrame(bars) df['time'] = pd.to_datetime(df['time'], unit='s') return df except Exception as e: logger.error(f"Error getting data for {symbol}: {e}") return None
Auf den ersten Blick sieht die Funktion einfach aus. Doch hinter dieser Einfachheit verbirgt sich eine wichtige Entscheidung: Wir nehmen genau 2000 Balken der Geschichte. Warum? Ich habe experimentell herausgefunden, dass dies für die Erstellung eines qualitativ hochwertigen Modells ausreicht, aber gleichzeitig keine übermäßige Belastung des Serverspeichers verursacht, wenn sehr große Modelle mit großen Datensatzabmessungen und Eingangsmerkmalen als Stapelsequenzen trainiert werden.
Der interessanteste Teil des Moduls ist die Erstellung von Merkmalen für die Analyse. So funktioniert es:
def create_features(df, forecast_periods=None): """Create features for the forecasting model""" try: # Basic volume indicators df['volume_sma_5'] = df['tick_volume'].rolling(window=5).mean() df['volume_sma_20'] = df['tick_volume'].rolling(window=20).mean() df['relative_volume'] = df['tick_volume'] / df['volume_sma_20'] # Volume dynamics df['volume_change'] = df['tick_volume'].pct_change() df['volume_acceleration'] = df['volume_change'].diff() # Volume volatility df['volume_volatility'] = df['tick_volume'].rolling(window=20).std() df['volume_volatility_5'] = df['tick_volume'].rolling(window=5).std() df['volume_volatility_ratio'] = df['volume_volatility_5'] / df['volume_volatility']
Besonderes Augenmerk sollte hier auf volume_volatility_ratio gelegt werden. Dieser Indikator entstand, nachdem mir ein interessantes Muster aufgefallen war: Vor starken Bewegungen beginnt die kurzfristige Volatilität oft schneller zu wachsen als die langfristige Volatilität. Dieser Indikator hat sich zu einem der wichtigsten Indikatoren für die Identifizierung potenzieller Einstiegspunkte entwickelt.
Die Berechnung des Volumenprofils ist eine andere Geschichte:
# Volume profile df['volume_percentile'] = df['tick_volume'].rolling(window=100).apply( lambda x: pd.Series(x).rank(pct=True).iloc[-1] ) df['volume_density'] = df['tick_volume'] / (df['high'] - df['low']) df['volume_density_ma'] = df['volume_density'].rolling(window=20).mean() df['cumulative_volume'] = df['tick_volume'].rolling(window=20).sum() df['volume_ratio'] = df['tick_volume'] / df['cumulative_volume']
Der Indikator volume_density ist nicht zufällig entstanden. Ich habe festgestellt, dass das Volumen selbst trügerisch sein kann – es ist wichtig zu berücksichtigen, in welcher Preisklasse es gesammelt wurde. Ein hohes Volumen in einer engen Preisspanne deutet oft auf die Bildung eines wichtigen Unterstützungs- oder Widerstandsniveaus hin.
Ich habe eine spezielle Funktion entwickelt, um die Richtung der Kursbewegung vorherzusagen:
def predict_direction(model, X): """Price movement direction forecast""" try: prediction = model.predict(X)[0] current_price = X['close'].iloc[-1] if 'close' in X else None if current_price is None: return 0 # Return 1 for rise, -1 for fall, 0 for neutral price_change = (prediction - current_price) / current_price if abs(price_change) < 0.0001: # Minimum change threshold return 0 return 1 if price_change > 0 else -1 except Exception as e: logger.error(f"Error predicting direction: {e}") return 0
Bitte beachten Sie, dass die Änderungsschwelle bei 0,0001 liegt. Dies ist keine zufällige Zahl – sie wird auf der Grundlage einer Analyse der Mindestbewegung gewählt, die unter Berücksichtigung des Spreads und der verschiedenen Arten von Provisionen gehandhabt werden kann. Für den Aktienmarkt sollte der Indikator separat ausgewählt werden.
Der letzte Schritt ist das Training des Modells:
def train_model(X_train, X_test, y_train, y_test, model_params=None): try: if model_params is None: model_params = {'n_estimators': 400, 'random_state': 42} model = RandomForestRegressor(**model_params) model.fit(X_train, y_train) # Model evaluation train_predictions = model.predict(X_train) test_predictions = model.predict(X_test) train_rmse = np.sqrt(mean_squared_error(y_train, train_predictions)) test_rmse = np.sqrt(mean_squared_error(y_test, test_predictions)) test_r2 = r2_score(y_test, test_predictions)
Wissen Sie, warum ich RandomForest mit 400 Bäumen gewählt habe? Nach vielen Versuchen – von der einfachen Regression bis hin zu neuronalen Netzen von erstaunlicher Komplexität in der Architektur – bin ich zu dem Schluss gekommen, dass diese Methode die zuverlässigste ist. Vielleicht nicht die genauesten, aber stabil. Der Markt ist unruhig und enthält viel Rauschen, aber RandomForest hält sich gut.
Dies ist natürlich nur der Anfang. Die nächsten Fragen, die sich uns stellen, sind, wie man all diese Signale miteinander verbindet und wie man das System so einrichtet, dass es unterwegs lernt. Aber dazu beim nächsten Mal mehr.
Risikomanagement: Die Kunst der Kapitalerhaltung
Und nun zum Wichtigsten: den Risiken. Es ist ziemlich amüsant, allen zuzuhören, die über coole Strategien und neuronale Netze diskutieren. In den zehn Jahren, die ich auf dem Markt bin, habe ich vor allem eines erkannt: All diese Strategien sind ohne Risikomanagement wertlos. Sie mögen einen tollen Handelsalgorithmus haben, aber ohne ein angemessenes Risikomanagement werden Sie trotzdem rote Zahlen schreiben.
Deshalb steht in unserem System der Kapitalschutz im Mittelpunkt. Es ist dieser konservative Ansatz, der es uns ermöglicht, beständig zu verdienen, während andere mit „perfekten“ Strategien Geld verlieren.
def calculate_available_volume(self) -> float: try: account = mt5.account_info() if not account: return 0.01 # Use balance and free margin balance = account.balance free_margin = account.margin_free # Take the minimum value for safety available_margin = min(balance, free_margin) # Calculate the maximum volume taking into account the margin margin_ratio = 0.1 # Use only 10% of the available margin base_volume = (available_margin * margin_ratio) / 1000 # Limit to maximum volume max_volume = min(base_volume, 1.0) # max 1 lot
Bitte beachten Sie, dass margin_ratio = 0,1 ist. Dies ist keine Zufallszahl. Nach mehrmonatigen Tests bin ich zu dem Schluss gekommen, dass der Einsatz von mehr als 10 % der verfügbaren Marge das Risiko eines Margenausgleichs bei starken Marktbewegungen deutlich erhöht. Dies ist besonders wichtig, wenn mehrere Paare gleichzeitig gehandelt werden.
Der nächste wichtige Punkt ist die Berechnung von Stop Loss und Take Profit:
async def calculate_position_limits(self, signal: Dict) -> Tuple[float, float]: try: pair = signal['pair'] direction = signal['direction'] # Get volatility volatility = signal['price_volatility'] economic_volatility = signal['economic_volatility'] # Base values in pips base_sl = 20 base_tp = 40 # Adjust for volatility volatility_factor = 1 + (volatility * 2) sl_points = base_sl * volatility_factor tp_points = base_tp * volatility_factor # Take economic volatility into account if economic_volatility > 0.5: sl_points *= 1.5 tp_points *= 1.2 # Check minimum distances info = self.symbols_info[pair] min_stop_level = info.trade_stops_level if hasattr(info, 'trade_stops_level') else 0 return max(sl_points, min_stop_level), max(tp_points, min_stop_level) except Exception as e: logger.error(f"Error calculating position limits: {e}") return 20, 40 # return base values in case of an error
Die Geschichte mit volatility_factor ist besonders interessant. Anfangs verwendete ich feste Stopp-Levels, stellte aber schnell fest, dass sie in Zeiten hoher Volatilität oft zu früh und zu oft ausgelöst wurden. Die dynamische Anpassung der Stop-Levels auf Basis der aktuellen Volatilität hat die Handelsergebnisse deutlich verbessert.
Und so sieht das Positionsmanagementsystem aus:
async def manage_positions(self): """Managing open positions""" try: positions = mt5.positions_get() or [] for position in positions: if position.magic == self.magic: # Check the time in the position time_in_trade = datetime.now() - pd.to_datetime(position.time, unit='s') # Get current market data signal = await self.get_combined_signal(position.symbol) # Check the need to modify the position if self._should_modify_position(position, signal, time_in_trade): await self._modify_position(position, signal) # Check the closing conditions if self._should_close_position(position, signal, time_in_trade): await self.close_position(position) except Exception as e: logger.error(f"Error managing positions: {e}")
Besonderes Augenmerk wird dabei auf die Zeit gelegt, die eine Position offen ist. Die Erfahrung hat gezeigt, dass die Anforderungen an die Aufrechterhaltung einer Stelle umso höher sein sollten, je länger sie offen ist. Dies wird durch eine dynamische Verschärfung der Bedingungen für das Halten von Positionen im Laufe der Zeit erreicht.
Ein interessanter Punkt ist die teilweise Schließung von Positionen:
def calculate_partial_close(self, position, profit_threshold: float = 0.5) -> float: """Volume calculation for partial closure""" try: # Check the current profit if position.profit <= 0: return 0.0 profit_ratio = position.profit / (position.volume * 1000) # approximate ROI estimate if profit_ratio >= profit_threshold: # Close half of the position when the profit threshold is reached return position.volume * 0.5 return 0.0 except Exception as e: logger.error(f"Error calculating partial close: {e}") return 0.0
Diese Funktion wurde nach der Analyse der Transaktionen erstellt. Ich habe festgestellt, dass das teilweise Schließen von Positionen bei Erreichen eines bestimmten Gewinnniveaus die Handelsstatistiken insgesamt deutlich verbessert. Auf diese Weise können wir einen Teil unserer Gewinne sichern und haben gleichzeitig noch Potenzial für weiteres Wachstum.
Abschließend möchte ich anmerken, dass das Risikomanagementsystem ein lebendiger Organismus ist, der sich ständig weiterentwickelt. Jeder erfolglose Handel, jede unerwartete Marktbewegung ist eine neue Erfahrung, die wir nutzen, um die Algorithmen zum Kapitalschutz zu verbessern. Für die nächsten Versionen des Systems habe ich beschlossen, maschinelles Lernen zur dynamischen Optimierung der Risikomanagementparameter sowie eine Mischung aus dem VaR-System und der Portfoliotheorie von Markowitz hinzuzufügen, aber das ist eine ganz andere Geschichte....
Wirtschaftsmodul: Wenn die Fundamentalanalyse auf maschinelles Lernen trifft
Bei der Arbeit an dem Handelssystem ist mir ein interessantes Muster aufgefallen: Selbst die stärksten technischen Signale können fehlschlagen, wenn sie im Widerspruch zu fundamentalen Faktoren stehen. Diese Beobachtung führte zur Schaffung des Wirtschaftsmoduls, einer Komponente, die makroökonomische Indikatoren und ihre Auswirkungen auf die Entwicklung von Währungspaaren analysiert.
Beginnen wir mit der Grundstruktur des Moduls. So sieht die Initialisierung der wichtigsten Wirtschaftsindikatoren aus:
def __init__(self):
self.indicators = {
'NY.GDP.MKTP.KD.ZG': 'GDP growth',
'FP.CPI.TOTL.ZG': 'Inflation',
'FR.INR.RINR': 'Real interest rate',
'NE.EXP.GNFS.ZS': 'Exports',
'NE.IMP.GNFS.ZS': 'Imports',
'BN.CAB.XOKA.GD.ZS': 'Current account balance',
'GC.DOD.TOTL.GD.ZS': 'Government debt',
'SL.UEM.TOTL.ZS': 'Unemployment rate',
'NY.GNP.PCAP.CD': 'GNI per capita',
'NY.GDP.PCAP.KD.ZG': 'GDP per capita growth'
}
Die Auswahl dieser Indikatoren ist nicht zufällig. Nach der Analyse von Tausenden von Handelsgeschäften habe ich festgestellt, dass diese Indikatoren den größten Einfluss auf die langfristigen Trends von Währungspaaren haben. Von besonderem Interesse ist die Beziehung zwischen dem realen Zinssatz und den Währungsbewegungen, wobei Veränderungen dieses Indikators häufig einer Trendumkehr vorausgehen.
Ich habe eine spezielle Methode entwickelt, um Wirtschaftsdaten zu erhalten:
def fetch_economic_data(self): data_frames = [] for indicator, name in self.indicators.items(): try: data_frame = wbdata.get_dataframe({indicator: name}, country='all') data_frames.append(data_frame) except Exception as e: logger.error(f"Error fetching data for indicator '{indicator}': {e}") if data_frames: self.economic_data = pd.concat(data_frames, axis=1) return self.economic_data
Interessant ist hier die Verwendung der Bibliothek wbdata, um die Daten der Weltbank zu erhalten. Ich habe mich für diese Quelle entschieden, nachdem ich mit verschiedenen APIs experimentiert hatte, da sie die vollständigsten und am besten geprüften Daten liefert.
Besonderes Augenmerk habe ich auf die Aufbereitung der Daten für die Analyse gelegt:
def prepare_data(self, symbol_data): data = symbol_data.copy() data['close_diff'] = data['close'].diff() data['close_corr'] = data['close'].rolling(window=30).corr(data['close'].shift(1)) for indicator in self.indicators.keys(): if indicator in self.economic_data.columns: data[indicator] = self.economic_data[indicator].ffill() data.dropna(inplace=True) return data
Beachten Sie die Verwendung von „forward fill“ für Wirtschaftsindikatoren. Diese Lösung ergab sich nicht sofort – zunächst versuchte ich es mit Interpolation, aber es stellte sich heraus, dass es für wirtschaftliche Daten korrekter ist, den letzten bekannten Wert zu verwenden.
Das Herzstück des Moduls ist das Prognosesystem:
def forecast(self, symbol, symbol_data): if len(symbol_data) < 50: return None, None X = symbol_data.drop(columns=['close']) y = symbol_data['close'] X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, shuffle=False) model = CatBoostRegressor(iterations=1000, learning_rate=0.1, depth=8, loss_function='RMSE') model.fit(X_train, y_train, verbose=False)
Die Wahl von CatBoost als Algorithmus für maschinelles Lernen ist auch nicht zufällig. Nach dem Experimentieren mit verschiedenen Modellen (von der einfachen linearen Regression bis hin zu komplexen neuronalen Netzen) stellte sich heraus, dass CatBoost am besten mit der unregelmäßigen Natur der Wirtschaftsdaten zurechtkommt.
Die letzte Phase ist die Interpretation der Ergebnisse:
def interpret_results(self, symbol): forecast = self.forecasts.get(symbol) importance_df = self.feature_importances.get(symbol) if forecast is None or importance_df is None: return f"Insufficient data for interpretation of {symbol}" trend = "upward" if forecast[-1] > forecast[0] else "downward" volatility = "high" if forecast.std() / forecast.mean() > 0.1 else "low" top_feature = importance_df.iloc[0]['feature']
Besonders interessant ist die Berechnung der Volatilität. Der Schwellenwert von 0,1 für eine hohe Volatilität wurde nach der Analyse historischer Daten gewählt. Es hat sich gezeigt, dass dieser Wert gut zwischen Zeiten ruhiger Märkte und erhöhter Turbulenzen unterscheidet.
Bei der Arbeit an dem Modul habe ich eine interessante Beobachtung gemacht: Wirtschaftliche Faktoren wirken oft mit Verzögerung, aber ihr Einfluss ist stabiler als der Einfluss technischer Faktoren. Dies hat zur Schaffung eines Gewichtungssystems geführt, bei dem die Bedeutung von Wirtschaftssignalen in längeren Zeiträumen zunimmt.
Natürlich ist das Wirtschaftsmodul kein Zauberstab, und es kann nicht alle Marktbewegungen vorhersagen. In Verbindung mit der technischen Analyse und der Volumenanalyse bietet sie jedoch eine zusätzliche Dimension für das Verständnis der Marktprozesse. In zukünftigen Versionen des Systems werde ich die Analyse von Nachrichtenströmen und deren Auswirkungen auf Wirtschaftsindikatoren hinzufügen, aber das ist ein Thema für eine separate Diskussion.
Modul Arbitrage: Auf der Suche nach einem aktuellen Preis
Die Idee, ein Arbitrage-Modul zu entwickeln, kam mir nach langen Beobachtungen des Devisenmarktes. Mir ist ein interessantes Muster aufgefallen: Die tatsächlichen Preise von Währungspaaren weichen oft von ihrem theoretischen Wert ab, der anhand der Kreuzkurse berechnet wird. Diese Abweichungen schaffen Arbitragemöglichkeiten, aber noch wichtiger ist, dass sie als Indikator für künftige Preisbewegungen dienen können.
Beginnen wir mit der Grundstruktur des Moduls:
class ArbitrageModule: def __init__(self, terminal_path: str = "C:/Program Files/RannForex MetaTrader 5/terminal64.exe", max_trades: int = 10): self.terminal_path = terminal_path self.MAX_OPEN_TRADES = max_trades self.symbols = [ "AUDUSD.ecn", "AUDJPY.ecn", "CADJPY.ecn", "AUDCHF.ecn", "AUDNZD.ecn", "USDCAD.ecn", "USDCHF.ecn", "USDJPY.ecn", "NZDUSD.ecn", "GBPUSD.ecn", "EURUSD.ecn", "CADCHF.ecn", "CHFJPY.ecn", "NZDCAD.ecn", "NZDCHF.ecn", "NZDJPY.ecn", "GBPCAD.ecn", "GBPCHF.ecn", "GBPJPY.ecn", "GBPNZD.ecn", "EURCAD.ecn", "EURCHF.ecn", "EURGBP.ecn", "EURJPY.ecn", "EURNZD.ecn" ]
Bei der Berechnung der Kreuzkurse sollte besonders auf die Struktur der Basispaare geachtet werden:
self.usd_pairs = {
"EUR": "EURUSD.ecn",
"GBP": "GBPUSD.ecn",
"AUD": "AUDUSD.ecn",
"NZD": "NZDUSD.ecn",
"USD": None,
"CAD": ("USDCAD.ecn", True),
"CHF": ("USDCHF.ecn", True),
"JPY": ("USDJPY.ecn", True)
}
Interessant ist dabei, dass einige Paare als invers (wahr) gekennzeichnet sind. Dies ist kein Zufall – für einige Währungen, wie CAD, CHF und JPY, lautet die Basisnotierung USD/XXX und nicht XXX/USD. Dies ist eine wichtige Nuance, die bei der Berechnung der Kreuzkurse oft übersehen wird.
Das Herzstück des Moduls ist die Funktion zur Berechnung der synthetischen Preise:
def calculate_synthetic_prices(self, data: Dict[str, pd.DataFrame]) -> pd.DataFrame: """Calculation of synthetic prices through cross rates""" synthetic_prices = {} try: for symbol in self.symbols: base = symbol[:3] quote = symbol[3:6] # Calculate the synthetic price using cross rates fair_price = self.calculate_cross_rate(base, quote, data) synthetic_prices[f'{symbol}_fair'] = pd.Series([fair_price])
Ich erinnere mich, wie schwierig es war, diesen Code zu optimieren. Zunächst habe ich versucht, alle möglichen Umstellungswege zu berechnen und den optimalen zu wählen. Es hat sich jedoch gezeigt, dass die einfache Berechnung über den USD stabilere Ergebnisse liefert, insbesondere bei hoher Volatilität.
Interessant ist auch die Funktion zur Berechnung des Wechselkurses gegenüber dem USD:
def get_usd_rate(self, currency: str, data: dict) -> float: """Get exchange rate to USD""" if currency == "USD": return 1.0 pair_info = self.usd_pairs[currency] if isinstance(pair_info, tuple): pair, inverse = pair_info rate = data[pair]['close'].iloc[-1] return 1 / rate if inverse else rate else: pair = pair_info return data[pair]['close'].iloc[-1]
Diese Funktion wurde nach langen Experimenten mit verschiedenen Methoden zur Berechnung von Kreuzkursen entwickelt. Der wichtigste Punkt ist hier die korrekte Handhabung von inversen Paaren. Eine fehlerhafte Berechnung auch nur eines Paares kann zu einer Kaskade von Fehlern bei den synthetischen Preisen führen.
Ich habe eine spezielle Funktion entwickelt, um echte Daten zu verarbeiten:
def get_mt5_data(self, symbol: str, count: int = 1000) -> Optional[pd.DataFrame]:
try:
timezone = pytz.timezone("Etc/UTC")
utc_from = datetime.now(timezone) - timedelta(days=1)
ticks = mt5.copy_ticks_from(symbol, utc_from, count, mt5.COPY_TICKS_ALL)
if ticks is None:
logger.error(f"Failed to fetch data for {symbol}")
return None
ticks_frame = pd.DataFrame(ticks)
ticks_frame['time'] = pd.to_datetime(ticks_frame['time'], unit='s')
return ticks_frame
Die Wahl der Anzahl der Ticks (1000) ist ein Kompromiss zwischen der Genauigkeit der Berechnungen und der Geschwindigkeit der Datenverarbeitung. In der Praxis hat sich dies als ausreichend erwiesen, um einen fairen Preis zuverlässig zu ermitteln.
Bei der Arbeit an dem Modul habe ich eine interessante Beobachtung gemacht: Diskrepanzen zwischen dem realen und dem synthetischen Preis treten häufig vor bedeutenden Marktbewegungen auf. Es ist so, als ob intelligentes Geld anfängt, einige Paare zu verschieben, was zu Spannungen im System der Kreuzkurse führt, die sich dann durch eine starke Bewegung entladen.
Natürlich ist das Arbitragemodul kein Zauberstab, aber in Kombination mit der Volumenanalyse und den Wirtschaftsindikatoren bietet es eine zusätzliche Dimension zum Verständnis des Marktes. In zukünftigen Versionen werde ich die Analyse von Korrelationen zwischen Abweichungen in verschiedenen Paaren hinzufügen, und das ist eine ganz andere Geschichte.
Schlussfolgerung
Als ich dieses Projekt begann, hatte ich keine Ahnung, was daraus werden würde. Ich dachte, ich würde einfach Python mit MQL5 verbinden und das wäre alles. Und es stellte sich heraus, dass es eine ganze Handelsplattform war! Jedes Teil davon ist wie ein Detail in einer Schweizer Uhr, und dieser Teil ist nur der erste von vielen Artikeln.
Während der Entwicklung habe ich eine Menge gelernt. Zum Beispiel, dass es beim algorithmischen Handel keine einfachen Wege gibt. Nehmen wir zum Beispiel die Berechnung des Positionsvolumens. Das ist doch nicht so schwierig, oder? Aber wenn man anfängt, alle Risiken und das Marktverhalten zu berücksichtigen, wird einem ganz schwindelig.
Und wie gut die modulare Architektur funktioniert! Fällt ein Modul aus, arbeiten die anderen weiter. Sie können jedes Teil sicher verbessern, ohne zu befürchten, dass das gesamte System zusammenbricht.
Am interessantesten ist es zu beobachten, wie die verschiedenen Teile des Systems zusammenarbeiten. Ein Modul sucht nach Arbitrage, ein anderes überwacht die Volumina, das dritte analysiert die Wirtschaft, während das vierte die Risiken kontrolliert. Gemeinsam sehen sie den Markt auf eine Weise, die keine einzelne Analyse leisten kann.
Natürlich gibt es noch Raum für eine Entwicklung. Ich würde gerne die Nachrichtenanalyse ergänzen, das maschinelle Lernen verbessern und neue Modelle zur Risikobewertung entwickeln. Besonders interessant ist es, an der 3D-Visualisierung des Marktes zu arbeiten – um sich Preis, Volumen und Zeit in einem Raum vorzustellen.
Die wichtigste Lehre aus diesem Projekt ist, dass das Handelssystem lebendig sein sollte. Der Markt bleibt nicht stehen, und das System muss sich mit ihm verändern. Lernen Sie aus Fehlern, finden Sie neue Muster, verwerfen Sie überholte Ansätze.
Ich hoffe, dass meine Erfahrungen für diejenigen, die Handelsalgorithmen entwickeln, nützlich sind. Und denken Sie daran: Diese Reise hat keine Ziellinie. Es gibt nur einen Weg!
Übersetzt aus dem Russischen von MetaQuotes Ltd.
Originalartikel: https://www.mql5.com/ru/articles/16667
Warnung: Alle Rechte sind von MetaQuotes Ltd. vorbehalten. Kopieren oder Vervielfältigen untersagt.
Dieser Artikel wurde von einem Nutzer der Website verfasst und gibt dessen persönliche Meinung wieder. MetaQuotes Ltd übernimmt keine Verantwortung für die Richtigkeit der dargestellten Informationen oder für Folgen, die sich aus der Anwendung der beschriebenen Lösungen, Strategien oder Empfehlungen ergeben.
Neuronale Netze im Handel: Ein Ensemble von Agenten mit Aufmerksamkeitsmechanismen (letzter Teil)
Neuronale Netze im Handel: Ein Ensemble von Agenten mit Aufmerksamkeitsmechanismen (MASAAT)
Trendstärke- und Richtungsindikator auf 3D-Balken
Marktsimulation (Teil 03): Eine Frage der Leistung
- Freie Handelsapplikationen
- Über 8.000 Signale zum Kopieren
- Wirtschaftsnachrichten für die Lage an den Finanzmärkte
Sie stimmen der Website-Richtlinie und den Nutzungsbedingungen zu.