English Русский 中文 Español Deutsch Português
preview
PythonとMQL5で構築するマルチモジュール型取引ロボット(第1回):基本アーキテクチャと最初のモジュールの作成

PythonとMQL5で構築するマルチモジュール型取引ロボット(第1回):基本アーキテクチャと最初のモジュールの作成

MetaTrader 5トレーディングシステム |
87 1
Yevgeniy Koshtenko
Yevgeniy Koshtenko

はじめに

ある日、ふと思いついたのです。現代の市場において、従来の取引ロボットはあまりにも単純すぎるのではないか、と。もっと柔軟で、もっと賢い仕組みが必要だと感じました。

市場は常に変化しています。今日うまく機能する戦略が、明日にはまったく通用しなくなります。私は長い間この現象を観察し、まったく新しいアプローチが必要だと確信しました。そして、あるときふと解決策がひらめいたのです。モジュール化されたシステムを作ればいいのでは?想像してみてください。ひとつのチームの中に、トレンドを監視する専門家、取引量を分析する専門家、リスクを管理する専門家がいる。まさに、現代の取引ロボットはこのように動くべきだと考えました。

使用する技術の選択は明確でした。データ分析にはPythonが最適です。その豊富なライブラリを活用すれば、驚くほど強力な分析が可能です。そして、実際の取引執行はMQL5が担当します。この2つを組み合わせることで、理想的なタッグが生まれました。まず基礎を固めることから始めました。拡張性と進化性を備えたアーキテクチャを構築し、その上でPythonとMQL5の連携を実現しました。データ管理システムは驚くほどシンプルで効果的なものになりました。

そして真の転換点となったのが、非同期処理の導入です。これにより、ロボットは複数の銘柄を同時に監視できるようになり、生産性が飛躍的に向上しました。

何よりも興味深いのは、このシステムが実際の市場で動作しているということです。これは単なる教科書的な例ではなく、実運用可能な取引ツールなのです。もちろん、最初は基本的なバージョンから始めますが、それだけでも十分に印象的です。これから長い旅が始まります。私たちは学習し、適応し続けるシステムを作り上げていきます。少しずつ改良を重ねます。まずは最も重要なこと、つまり堅牢な基盤を築くことから始めます。


システムの基本アーキテクチャ:完璧なバランスを求めて

3年間、私は取引ロボットの開発に苦戦してきました。そして次第に気づいたのです。大切なのはアルゴリズムそのものではなく、それらがどう連携して動くかだということに。この発見がすべてを変えました。

オーケストラを想像してみてください。演奏者全員が優秀でも、指揮者がいなければ音楽は生まれません。私のシステムでは、MarketMakerがその指揮者の役割を担っています。MarketMakerは4つのモジュールを統括し、それぞれのモジュールは自分の役割をよく理解しています。

  • 最初のモジュールは取引量を監視します。いつ、どの価格で取引がおこなわれたかを追跡します。
  • 2つ目のモジュールはアービトラージの機会を探します。
  • 3つ目のモジュールは経済状況を分析します。 
  • 4つ目のモジュールはリスクを制御し、システムが暴走しないよう制御します。

市場は誰も待ってくれません。変化は電光石火の速さです。そのため、すべてのモジュールは同時に動作し、常に互いに情報をやり取りしています。たとえばアービトラージモジュールがチャンスを発見したとします。すると他のモジュールの情報が確認され、最終的な判断が下されます。

最初はエントリーのための厳密なルールを作ろうと考えました。しかし、すぐにそれが現実的ではないことがわかりました。ときには複数の弱いシグナルよりも、ひとつの強いシグナルの方が重要なのです。データの整理にも多くの時間がかかりました。各モジュールはそれぞれ異なる情報を持っています。価格データ、マクロ指標、取引履歴などです。これらを保存し、更新し、共有するために特別な同期システムを作る必要がありました。

皮肉なことに、モジュール同士の独立性が高いほどシステムの動作は安定しました。ひとつのコンポーネントが停止しても、他の部分は動き続けます。とはいえ、障害は起こります。接続が切れたり、価格データが止まったりすることもあります。このアーキテクチャの最大の利点は、拡張性にあります。ニュース分析を追加したいとしても、問題ありません。新しいモジュールを作り、MarketMakerに接続するだけで機能します。

このシステムは生きており、進化しています。完璧ではありませんが、モジュール構造、並列処理、柔軟性という基盤があるおかげで、未来に自信を持って進むことができます。それぞれのコンポーネントについては、今後さらに詳しく紹介していきます


メインシステムクラス

取引ロボットのアーキテクチャについてさまざまなアプローチを試した結果、システムの成功はその中核部分がどれだけうまく構成されているかに大きく左右されることに気づきました。MarketMakerはその理解の結晶であり、これまでに培ってきたアルゴリズムシステム開発のベストプラクティスをすべて取り入れたものです。

まずはクラスの基本構造から見ていきましょう。以下が初期化の部分です。

def __init__(self, pairs: List[str], terminal_path: str, 
             volume: float = 1.0, levels: int = 5, spacing: float = 3.0):
    # Main parameters
    self.pairs = pairs
    self.base_volume = volume
    self.levels = levels
    self.spacing = spacing
    self.magic = 12345
    
    # Trading parameters
    self.portfolio_iterations = 10
    self.leverage = 50
    self.min_profit_pips = 1.0
    self.max_spread_multiplier = 2.0
    
    # Data warehouses
    self.symbols_info = {}
    self.trading_parameters = {}
    self.optimal_horizons = {}

一見すると、とてもシンプルに見えます。しかし、各パラメータの背後にはそれぞれの経緯があります。たとえばportfolio_iterations。このパラメータは、ポジションをあまりにも積極的に開くと流動性の問題が起きることに気づいた後に追加されました。現在では、システムが利用可能なボリュームを複数の部分に分割して処理することで、よりバランスの取れた取引がおこなえるようになっています。

特に重視したのが過去データの初期化です。その仕組みは次のようになっています。

def _initialize_history(self, pair: str):
    """Initializing historical data for a pair"""
    try:
        rates = mt5.copy_rates_from(pair, mt5.TIMEFRAME_M1, 
                                  datetime.now()-timedelta(days=1), 1440)
        if rates is not None:
            df = pd.DataFrame(rates)
            df['time'] = pd.to_datetime(df['time'], unit='s')
            df.set_index('time', inplace=True)
            returns = np.log(df['close'] / df['close'].shift(1)).dropna()
            self.returns_history[pair] = pd.Series(returns.values, 
                                                 index=df.index[1:])
    except Exception as e:
        logger.error(f"Error initializing history for {pair}: {e}")

ここで興味深いのは、単純な変化率ではなく対数リターンを使用している点です。これは偶然の選択ではありません。実際に運用してみると、対数リターンの方が統計指標、特にボラティリティを計算する際により安定した結果をもたらすことが分かりました。

最も難しかった部分のひとつが、出来高予測の実装です。数多くの実験を重ねた結果、次のコードが生まれました。

async def update_volume_predictions(self):
    """Updating volume predictions for each pair"""
    for pair in self.pairs:
        try:
            df = volume_model.get_volume_data(
                symbol=pair,
                timeframe=mt5.TIMEFRAME_H1,
                n_bars=100
            )
            
            if pair in self.volume_models:
                feature_columns = [
                    'volume_sma_5', 'volume_sma_20', 'relative_volume', 
                    'volume_change', 'volume_volatility', 'price_sma_5', 
                    'price_sma_20', 'price_change', 'price_volatility',
                    'rsi', 'macd', 'macd_signal', 'bb_upper', 'bb_lower'
                ]
                
                X = df[feature_columns].iloc[-1:].copy()
                prediction = self.volume_models[pair].predict(X)[0]
                current_price = df['close'].iloc[-1]
                predicted_change = (prediction - current_price) / current_price
                
                self.volume_predictions[pair] = predicted_change
                
        except Exception as e:
            logger.error(f"Error updating prediction for {pair}: {e}")

ここで注目してほしいのは、特徴量のセットが単なる指標の寄せ集めではないという点です。それぞれの指標は、慎重なテストを経て少しずつ追加されてきました。たとえばrelative_volumeは、異常な市場活動を検出する際に特に有効であることが確認されています。

そして、ここにシステムの中枢、つまり取引ループがあります。

async def trade_cycle(self):
    """Main trading loop"""
    try:
        await self.update_volume_predictions()
        await self.economic_module.update_forecasts()
        
        all_positions = mt5.positions_get() or []
        open_positions = [pos for pos in all_positions if pos.magic == self.magic]
        
        if open_positions:
            await self.manage_positions()
            return
            
        valid_signals = []
        available_volume = self.calculate_available_volume() * len(self.pairs)
        
        for pair in self.pairs:
            signal = await self.get_combined_signal(pair)
            if signal and self._validate_signal(signal):
                valid_signals.append(signal)
        
        if valid_signals:
            volume_per_trade = available_volume / len(valid_signals)
            for signal in valid_signals:
                signal['adjusted_volume'] = volume_per_trade
                await self.place_order(signal)
                
    except Exception as e:
        logger.error(f"Error in trade cycle: {e}")

このコードは、取引プロセスをどのように整理すべきかについて長く考え抜いた結果です。ループが非同期処理で動作することで、複数の通貨ペアを効率的に同時処理できます。また、予測の更新 → ポジションの確認 → シグナルの探索 → 実行という明確な手順があることで、システムの動作が予測可能になります。

そして、特に注目すべきなのがシグナル検証の仕組みです。

def _validate_signal(self, signal: Dict) -> bool:
    """Trading signal check"""
    spread = signal['spread']
    diff_pips = signal['diff_pips']
    
    # Basic checks
    if spread > self.max_spread_multiplier * diff_pips:
        return False
        
    if diff_pips < self.min_profit_pips:
        return False
        
    # Check economic factors
    if signal['economic_volatility'] > self.volatility_threshold:
        return False
        
    # Check the volume prediction
    if abs(signal['volume_prediction']) < self.min_volume_change:
        return False
        
    return True

ここでおこなわれているすべてのチェックは、実際の取引経験から生まれたものです。たとえば、経済的ボラティリティチェックは、重要なニュース発表時に取引をおこなうと、急激な価格変動によって損失が増えることに気づいた後に追加されました。

最後にお伝えしたいのは、MarketMakerは生きたシステムであり、今も進化を続けているということです。日々の取引の中で新しいアイデアや改善点が生まれ、それらを反映して成長していきます。モジュール型アーキテクチャのおかげで、中核部分を崩さずに改良を加えることが可能です。


データの取り扱い

データの処理は、アルゴリズム取引における最も難しい課題のひとつです。開発の初期段階で、私はある一見単純な疑問に直面しました。「市場情報をどのように整理し、扱えばよいのか?」すぐに、通常のデータベースや単純な配列では不十分だということが分かりました。

最初は、データを受け取るための基本構造を作ることから始まりました。いくつかの試行錯誤を経て、次のメソッドが生まれました。

def _initialize_history(self, pair: str):
    try:
        rates = mt5.copy_rates_from(pair, mt5.TIMEFRAME_M1, 
                                  datetime.now()-timedelta(days=1), 1440)
        if rates is None:
            logger.error(f"Failed to get history data for {pair}")
            return
            
        df = pd.DataFrame(rates)
        df['time'] = pd.to_datetime(df['time'], unit='s')
        df.set_index('time', inplace=True)
        
        # Calculate logarithmic returns
        returns = np.log(df['close'] / df['close'].shift(1)).dropna()
        
        # Add new metrics
        df['typical_price'] = (df['high'] + df['low'] + df['close']) / 3
        df['price_velocity'] = df['close'].diff() / df['time'].diff().dt.total_seconds()
        df['volume_intensity'] = df['tick_volume'] / df['time'].diff().dt.total_seconds()
        
        self.returns_history[pair] = pd.Series(returns.values, index=df.index[1:])
        self.price_data[pair] = df
        
    except Exception as e:
        logger.error(f"Error initializing history for {pair}: {e}")

ここで興味深いのは、価格変化の「速度」』(price_velocity)や出来高の強さ(volume_intensity)の計算です。これらの指標は最初からあったわけではありません。当初は単純な価格データだけで作業していましたが、すぐに気づきました。市場は単なる価格の連続ではなく、変化の大きさだけでなく、そのスピードも重要な複雑な動的システムだということです。

特に注意を払ったのが、欠損データの取り扱いです。これが、データの検証とクリーニングの仕組みです。

def _validate_and_clean_data(self, df: pd.DataFrame) -> pd.DataFrame:
    """Validation and data cleaning"""
    if df.empty:
        raise ValueError("Empty dataset received")
        
    # Check gaps
    missing_count = df.isnull().sum()
    if missing_count.any():
        logger.warning(f"Found missing values: {missing_count}")
        
        # Use 'forward fill' for prices
        price_cols = ['open', 'high', 'low', 'close']
        df[price_cols] = df[price_cols].ffill()
        
        # Use interpolation for volumes
        df['tick_volume'] = df['tick_volume'].interpolate(method='linear')
    
    # Check outliers
    for col in ['high', 'low', 'close']:
        zscore = stats.zscore(df[col])
        outliers = abs(zscore) > 3
        if outliers.any():
            logger.warning(f"Found {outliers.sum()} outliers in {col}")
            
            # Replace extreme outliers
            df.loc[outliers, col] = df[col].mean() + 3 * df[col].std() * np.sign(zscore[outliers])
    
    return df

あるとき、たった1つのティックデータの欠損が原因で指標の計算が誤り、結果として誤ったエントリーシグナルが出たことを覚えています。この経験を受けて、データクリーニングシステムは大幅に改善されました。

次に、市場の最も重要な特性のひとつである出来高の取り扱いについて見ていきます。

def analyze_volume_profile(self, pair: str, window: int = 100) -> Dict:
    """Volume profile analysis"""
    try:
        df = self.price_data[pair].copy().last(window)
        
        # Normalize volumes
        volume_mean = df['tick_volume'].rolling(20).mean()
        volume_std = df['tick_volume'].rolling(20).std()
        df['normalized_volume'] = (df['tick_volume'] - volume_mean) / volume_std
        
        # Calculate volume clusters
        price_levels = pd.qcut(df['close'], q=10)
        volume_clusters = df.groupby(price_levels)['tick_volume'].sum()
        
        # Find support/resistance levels by volume
        significant_levels = volume_clusters[volume_clusters > volume_clusters.mean() + volume_clusters.std()]
        
        # Analyze imbalances
        buy_volume = df[df['close'] > df['open']]['tick_volume'].sum()
        sell_volume = df[df['close'] <= df['open']]['tick_volume'].sum()
        volume_imbalance = (buy_volume - sell_volume) / (buy_volume + sell_volume)
        
        return {
            'normalized_profile': volume_clusters.to_dict(),
            'significant_levels': significant_levels.index.to_list(),
            'volume_imbalance': volume_imbalance,
            'current_percentile': stats.percentileofscore(df['tick_volume'], df['tick_volume'].iloc[-1])
        }
        
    except Exception as e:
        logger.error(f"Error analyzing volume profile: {e}")
        return None

このコードは、市場のマイクロストラクチャを長期間研究した成果です。特に注目すべきは、買いと売りの出来高の不均衡の計算です。最初はこのテーマを暗号資産市場で研究しました。ただ、MQL5の運営が、暗号取引所、MetaTrader 5、Pythonを統合したコードの公開を許可してくれるかは分かりません。

話が逸れましたが、一見すると、上昇バーと下降バーの出来高を単純に比較するだけでは有益な情報は得られないように思えます。しかし、実際の運用では、この単純な指標がトレンド転換の兆しを示すことが多いことが分かりました。

経済データの扱いは、また別の話です。ここでは、完全な同期システムを作る必要がありました。

async def synchronize_market_data(self):
    """Market data synchronization"""
    while True:
        try:
            # Update basic data
            for pair in self.pairs:
                latest_data = await self._get_latest_ticks(pair)
                if latest_data is not None:
                    self._update_price_data(pair, latest_data)
                    
            # Update derived metrics
            await self._update_derivatives()
            
            # Check data integrity
            self._verify_data_integrity()
            
            await asyncio.sleep(1)  # Dynamic delay
            
        except Exception as e:
            logger.error(f"Error in data synchronization: {e}")
            await asyncio.sleep(5)  # Increased delay on error

ここでの重要なポイントは、データ更新の非同期性です。システムの初期バージョンでは同期リクエストを使用していましたが、多数の通貨ペアを扱う際に遅延が発生しました。非同期モデルへの移行により、生産性は大幅に向上しました。

最後に強調しておきたいのは、データ処理の正しい組織化は単なる技術的問題ではないということです。これは、全取引戦略の基盤であり、きれいで構造化されたデータがあれば、表面的な分析では見えない市場のパターンを把握することができます。


最初のモジュール:ボリューム分析

ボリューム分析モジュールの開発の歴史は、単純な観察から始まりました。従来の指標は価格のみを扱うため、どうしても遅れが生じることが多いのです。しかし市場は価格だけではありません。取引量も重要であり、しばしば価格の動きを予測します。そのため、私たちのシステムの最初のモジュールはボリュームアナライザーとして設計されました。

まずは、基本的なデータ取得関数から見ていきましょう。

def get_volume_data(symbol, timeframe=mt5.TIMEFRAME_H1, n_bars=2000):
    """Getting volume and price data from MT5"""
    try:
        bars = mt5.copy_rates_from_pos(symbol, timeframe, 0, n_bars)
        if bars is None:
            logger.error(f"Failed to get data for {symbol}")
            return None
        
        df = pd.DataFrame(bars)
        df['time'] = pd.to_datetime(df['time'], unit='s')
        return df
        
    except Exception as e:
        logger.error(f"Error getting data for {symbol}: {e}")
        return None

一見すると、この関数はシンプルに見えます。しかし、このシンプルさの背後には重要な判断があります。過去2000本のバーを正確に取得するという決定です。なぜでしょうか。実験の結果、この2000本という長さであれば、高品質なモデルを構築するのに十分であり、同時に、非常に大きなモデルや、バッチシーケンスとして扱う大規模データセットの入力特徴量を扱う場合でも、サーバーメモリに過剰な負荷をかけないことが分かりました。

モジュールの最も興味深い部分は、分析用の特徴量を作成する仕組みです。その仕組みは次のようになっています。

def create_features(df, forecast_periods=None):
    """Create features for the forecasting model"""
    try:
        # Basic volume indicators
        df['volume_sma_5'] = df['tick_volume'].rolling(window=5).mean()
        df['volume_sma_20'] = df['tick_volume'].rolling(window=20).mean()
        df['relative_volume'] = df['tick_volume'] / df['volume_sma_20']
        
        # Volume dynamics
        df['volume_change'] = df['tick_volume'].pct_change()
        df['volume_acceleration'] = df['volume_change'].diff()
        
        # Volume volatility
        df['volume_volatility'] = df['tick_volume'].rolling(window=20).std()
        df['volume_volatility_5'] = df['tick_volume'].rolling(window=5).std()
        df['volume_volatility_ratio'] = df['volume_volatility_5'] / df['volume_volatility']

ここで特に注目すべきは、volume_volatility_ratioです。この指標は、強い値動きの前に、短期の出来高ボラティリティが長期ボラティリティよりも先に増加することが多いという興味深いパターンに気づいた後に追加されました。この指標は、潜在的なエントリーポイントを特定する上で重要なもののひとつとなっています。

一方で、ボリュームプロファイルの計算はまた別の話です。

# Volume profile
        df['volume_percentile'] = df['tick_volume'].rolling(window=100).apply(
            lambda x: pd.Series(x).rank(pct=True).iloc[-1]
        )
        df['volume_density'] = df['tick_volume'] / (df['high'] - df['low'])
        df['volume_density_ma'] = df['volume_density'].rolling(window=20).mean()
        df['cumulative_volume'] = df['tick_volume'].rolling(window=20).sum()
        df['volume_ratio'] = df['tick_volume'] / df['cumulative_volume']

volume_density指標は偶然生まれたものではありません。私は、出来高そのものは誤解を招くことがあると気づきました。重要なのは、どの価格帯でその出来高が集まったかです。狭い価格帯での高出来高は、しばしば重要なサポートやレジスタンスの形成を示しています。

そこで、価格変動の方向性を予測するための特別な関数を開発しました。

def predict_direction(model, X):
    """Price movement direction forecast"""
    try:
        prediction = model.predict(X)[0]
        current_price = X['close'].iloc[-1] if 'close' in X else None
        if current_price is None:
            return 0
            
        # Return 1 for rise, -1 for fall, 0 for neutral
        price_change = (prediction - current_price) / current_price
        if abs(price_change) < 0.0001:  # Minimum change threshold
            return 0
        return 1 if price_change > 0 else -1
        
    except Exception as e:
        logger.error(f"Error predicting direction: {e}")
        return 0

ここで注目すべきは、変化の閾値が0.0001に設定されている点です。これは単なる偶然の数字ではなく、スプレッドや各種手数料を考慮した上で扱える最小の値動きを分析して決定したものです。株式市場の場合は、指標の設定は別途おこなう必要があります。

そして、最後のステージはモデルの学習です。

def train_model(X_train, X_test, y_train, y_test, model_params=None):
    try:
        if model_params is None:
            model_params = {'n_estimators': 400, 'random_state': 42}
            
        model = RandomForestRegressor(**model_params)
        model.fit(X_train, y_train)
        
        # Model evaluation
        train_predictions = model.predict(X_train)
        test_predictions = model.predict(X_test)
        
        train_rmse = np.sqrt(mean_squared_error(y_train, train_predictions))
        test_rmse = np.sqrt(mean_squared_error(y_test, test_predictions))
        test_r2 = r2_score(y_test, test_predictions)

なぜ私がランダムフォレストを400本の木で選んだのか知っていますか。単純な回帰から複雑な構造を持つニューラルネットワークまで、さまざまな手法を試した結果、この方法が最も信頼性が高いと結論づけました。おそらく最も精度が高いわけではありませんが、安定性があります。市場はノイズだらけで不規則ですが、ランダムフォレストはしっかりと耐えてくれます。

もちろん、これは始まりに過ぎません。次に取り組む課題は、これらのシグナルをどのように統合するか、そしてシステムが自動で学習できるようにするかです。次回、その点について詳しく触れます。


リスクマネジメント:資本を守る技術

そして、次に最も重要なこと、つまりリスクについて考えましょう。かっこいい戦略やニューラルネットワークの話を聞くのは面白いですが、私が10年間市場で経験して分かったことはひとつです。リスク管理なしでは、どんな戦略も価値がないということです。どんなに優れた取引アルゴリズムを持っていても、適切なリスク管理がなければ結局損失を出してしまいます。

そのため、私たちのシステムでは、資本の保護を最優先にしています。この保守的なアプローチこそが、他のトレーダーが「完璧な戦略」で損をしている中でも、安定して利益を上げられる理由です。

def calculate_available_volume(self) -> float:
    try:
        account = mt5.account_info()
        if not account:
            return 0.01
            
        # Use balance and free margin
        balance = account.balance
        free_margin = account.margin_free
        
        # Take the minimum value for safety
        available_margin = min(balance, free_margin)
        
        # Calculate the maximum volume taking into account the margin
        margin_ratio = 0.1  # Use only 10% of the available margin
        base_volume = (available_margin * margin_ratio) / 1000
        
        # Limit to maximum volume
        max_volume = min(base_volume, 1.0)  # max 1 lot

ここで注目すべきは、margin_ratio = 0.1である点です。これは単なる偶然の数字ではありません。数か月にわたるテストの結果、利用可能証拠金の10%以上を使うと、市場が大きく動いた際にロスカット(マージンコール)のリスクが大幅に高まることが分かりました。これは特に、複数の通貨ペアを同時に取引する場合に重要です。

次の重要なポイントは、ストップロスとテイクプロフィットの計算です。

async def calculate_position_limits(self, signal: Dict) -> Tuple[float, float]:
    try:
        pair = signal['pair']
        direction = signal['direction']
        
        # Get volatility
        volatility = signal['price_volatility']
        economic_volatility = signal['economic_volatility']
        
        # Base values in pips
        base_sl = 20
        base_tp = 40
        
        # Adjust for volatility
        volatility_factor = 1 + (volatility * 2)
        sl_points = base_sl * volatility_factor
        tp_points = base_tp * volatility_factor
        
        # Take economic volatility into account
        if economic_volatility > 0.5:
            sl_points *= 1.5
            tp_points *= 1.2
            
        # Check minimum distances
        info = self.symbols_info[pair]
        min_stop_level = info.trade_stops_level if hasattr(info, 'trade_stops_level') else 0
        
        return max(sl_points, min_stop_level), max(tp_points, min_stop_level)
        
    except Exception as e:
        logger.error(f"Error calculating position limits: {e}")
        return 20, 40  # return base values in case of an error

volatility_factorにまつわる話は特に興味深いです。当初は固定のストップレベルを使用していましたが、高ボラティリティの期間には、ストップが早すぎたり頻繁に発動したりすることに気づきました。そこで、現在のボラティリティに応じてストップレベルを動的に調整する方法を採用したところ、取引結果は大幅に改善されました。

そして、これがポジション管理システムの全体像です。

async def manage_positions(self):
    """Managing open positions"""
    try:
        positions = mt5.positions_get() or []
        for position in positions:
            if position.magic == self.magic:
                # Check the time in the position
                time_in_trade = datetime.now() - pd.to_datetime(position.time, unit='s')
                
                # Get current market data
                signal = await self.get_combined_signal(position.symbol)
                
                # Check the need to modify the position
                if self._should_modify_position(position, signal, time_in_trade):
                    await self._modify_position(position, signal)
                    
                # Check the closing conditions
                if self._should_close_position(position, signal, time_in_trade):
                    await self.close_position(position)
                    
    except Exception as e:
        logger.error(f"Error managing positions: {e}")

ここで特に注目すべきは、ポジション保有時間です。経験上、ポジションを長く保有するほど、維持するための条件を厳しくする必要があります。これは、時間の経過とともにポジション維持条件を動的に厳しくすることで実現されています。

また、ポジションの部分的なクローズに関する興味深いポイントもあります。

def calculate_partial_close(self, position, profit_threshold: float = 0.5) -> float:
    """Volume calculation for partial closure"""
    try:
        # Check the current profit
        if position.profit <= 0:
            return 0.0
            
        profit_ratio = position.profit / (position.volume * 1000)  # approximate ROI estimate
        
        if profit_ratio >= profit_threshold:
            # Close half of the position when the profit threshold is reached
            return position.volume * 0.5
        return 0.0
        
    except Exception as e:
        logger.error(f"Error calculating partial close: {e}")
        return 0.0

この機能は、取引履歴の分析から生まれました。一定の利益レベルに達した際にポジションを部分的にクローズすることで、全体の取引成績が大幅に改善されることに気づいたのです。これにより、利益の一部を確定させつつ、さらなる上昇の可能性も残すことができます。

最後に強調したいのは、リスク管理システムは生きた有機体であり、常に進化しているということです。失敗した取引や予期せぬ市場変動はすべて、新しい経験として資本保護アルゴリズムの改善に活かされています。次のバージョンでは、リスク管理パラメータの動的最適化のために機械学習を導入し、さらにVaRシステムとマルコヴィッツのポートフォリオ理論のハイブリッドも組み込む予定ですが、それはまた別の話です。


経済モジュール:ファンダメンタル分析と機械学習の融合

取引システムの開発を進める中で、興味深いパターンに気づきました。それは、最も強力なテクニカルシグナルでさえ、ファンダメンタル要因と矛盾すると失敗することがあるということです。この観察がきっかけとなり、経済モジュールを作成しました。このモジュールは、マクロ経済指標を分析し、通貨ペアの値動きに与える影響を評価します。

まずはモジュールの基本構造から見ていきましょう。以下が、主要な経済指標の初期化の例です。

def __init__(self):
    self.indicators = {
        'NY.GDP.MKTP.KD.ZG': 'GDP growth',
        'FP.CPI.TOTL.ZG': 'Inflation',
        'FR.INR.RINR': 'Real interest rate',
        'NE.EXP.GNFS.ZS': 'Exports',
        'NE.IMP.GNFS.ZS': 'Imports',
        'BN.CAB.XOKA.GD.ZS': 'Current account balance',
        'GC.DOD.TOTL.GD.ZS': 'Government debt',
        'SL.UEM.TOTL.ZS': 'Unemployment rate',
        'NY.GNP.PCAP.CD': 'GNI per capita',
        'NY.GDP.PCAP.KD.ZG': 'GDP per capita growth'
    }

これらの指標の選択は偶然ではありません。数千件の取引を分析した結果、通貨ペアの長期トレンドに最も影響を与える指標であることが分かりました。特に注目すべきは、実質金利と通貨の値動きの関係です。この指標の変化は、しばしばトレンド転換に先行します。

そこで、私は経済データを取得するための特別な手法を開発しました。

def fetch_economic_data(self):
    data_frames = []
    for indicator, name in self.indicators.items():
        try:
            data_frame = wbdata.get_dataframe({indicator: name}, country='all')
            data_frames.append(data_frame)
        except Exception as e:
            logger.error(f"Error fetching data for indicator '{indicator}': {e}")

    if data_frames:
        self.economic_data = pd.concat(data_frames, axis=1)
        return self.economic_data

ここで興味深いのは、World Bankのデータを取得するためにwbdataライブラリを使用している点です。さまざまなAPIを試した結果、このソースが最も完全で検証済みのデータを提供することが分かり、採用しました。

特に注力したのが、分析のためのデータ準備です。

def prepare_data(self, symbol_data):
    data = symbol_data.copy()
    data['close_diff'] = data['close'].diff()
    data['close_corr'] = data['close'].rolling(window=30).corr(data['close'].shift(1))

    for indicator in self.indicators.keys():
        if indicator in self.economic_data.columns:
            data[indicator] = self.economic_data[indicator].ffill()

    data.dropna(inplace=True)
    return data

ここで注目すべきは、経済指標に対して「前方補完」を使用している点です。この解決策はすぐに思いついたわけではありません。最初は補間を試しましたが、経済データに関しては、最後に知られている値を使用する方がより正確であることが分かりました。

モジュールの中心となるのは予測システムです。

def forecast(self, symbol, symbol_data):
    if len(symbol_data) < 50:
        return None, None

    X = symbol_data.drop(columns=['close'])
    y = symbol_data['close']

    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, shuffle=False)

    model = CatBoostRegressor(iterations=1000, learning_rate=0.1, depth=8, loss_function='RMSE')
    model.fit(X_train, y_train, verbose=False)

CatBoostを機械学習アルゴリズムとして選んだのも偶然ではありません。単純な線形回帰から複雑なニューラルネットワークまでさまざまなモデルを試した結果、経済データの不規則性に最も適応できるのはCatBoostであることが分かりました。

そして、最後のステージは予測結果の解釈です。

def interpret_results(self, symbol):
    forecast = self.forecasts.get(symbol)
    importance_df = self.feature_importances.get(symbol)

    if forecast is None or importance_df is None:
        return f"Insufficient data for interpretation of {symbol}"

    trend = "upward" if forecast[-1] > forecast[0] else "downward"
    volatility = "high" if forecast.std() / forecast.mean() > 0.1 else "low"
    top_feature = importance_df.iloc[0]['feature']

ボラティリティの計算は特に興味深いポイントです。高ボラティリティとみなす閾値0.1は、過去のデータを分析した結果選ばれました。この値により、市場が落ち着いている期間と、変動が激しい期間をうまく区別できます。

モジュールの開発中に、興味深い観察もありました。経済要因はしばしば遅れて作用しますが、その影響はテクニカル要因よりも安定していることが分かりました。そこで、長期の時間足で経済シグナルの重要度を高める重み付けシステムを作りました。

もちろん、経済モジュールは魔法の杖ではなく、すべての市場変動を予測できるわけではありません。しかし、テクニカル分析やボリューム分析と組み合わせることで、市場の動きを理解するための追加の視点を提供します。将来のバージョンでは、ニュースフローの分析や、それが経済指標に与える影響も取り入れる予定ですが、これは別の議題です。


アービトラージモジュール:実際の価格を追い求めて

アービトラージモジュールを作るアイデアは、通貨市場を長期間観察した後に生まれました。興味深いパターンに気づいたのです。通貨ペアの実際の価格は、クロスレートから算出される理論価格からしばしば乖離することがあります。これらの乖離はアービトラージの機会を生み出すだけでなく、将来の価格変動を示す指標としても利用できます

まずは、モジュールの基本構造から見ていきましょう。

class ArbitrageModule:
    def __init__(self, terminal_path: str = "C:/Program Files/RannForex MetaTrader 5/terminal64.exe", max_trades: int = 10):
        self.terminal_path = terminal_path
        self.MAX_OPEN_TRADES = max_trades
        self.symbols = [
            "AUDUSD.ecn", "AUDJPY.ecn", "CADJPY.ecn", "AUDCHF.ecn", "AUDNZD.ecn", 
            "USDCAD.ecn", "USDCHF.ecn", "USDJPY.ecn", "NZDUSD.ecn", "GBPUSD.ecn", 
            "EURUSD.ecn", "CADCHF.ecn", "CHFJPY.ecn", "NZDCAD.ecn", "NZDCHF.ecn", 
            "NZDJPY.ecn", "GBPCAD.ecn", "GBPCHF.ecn", "GBPJPY.ecn", "GBPNZD.ecn", 
            "EURCAD.ecn", "EURCHF.ecn", "EURGBP.ecn", "EURJPY.ecn", "EURNZD.ecn"
        ]

ここで特に注目すべきは、クロスレートを計算するための基軸通貨ペアの構造です。

     self.usd_pairs = {
            "EUR": "EURUSD.ecn",
            "GBP": "GBPUSD.ecn", 
            "AUD": "AUDUSD.ecn",
            "NZD": "NZDUSD.ecn",
            "USD": None,
            "CAD": ("USDCAD.ecn", True),
            "CHF": ("USDCHF.ecn", True),
            "JPY": ("USDJPY.ecn", True)
        }

ここで興味深いのは、一部の通貨ペアが「逆(True)」としてマークされている点です。これは偶然ではありません。CAD、CHF、JPYなど一部の通貨では、ベース通貨はUSです。つまり、USD/XXXであり、XXX/USDではないのです。このニュアンスは、クロスレートを計算する際にしばしば見落とされる重要なポイントです。

そして、このモジュールの中核は、合成価格を計算する関数です。

def calculate_synthetic_prices(self, data: Dict[str, pd.DataFrame]) -> pd.DataFrame:
    """Calculation of synthetic prices through cross rates"""
    synthetic_prices = {}
    
    try:
        for symbol in self.symbols:
            base = symbol[:3]
            quote = symbol[3:6]
            
            # Calculate the synthetic price using cross rates
            fair_price = self.calculate_cross_rate(base, quote, data)
            synthetic_prices[f'{symbol}_fair'] = pd.Series([fair_price])

このコードの最適化には苦労したことを覚えています。当初は、すべての可能な換算ルートを計算して最適なものを選ぶ方法を試しました。しかし、実際にはUSDを経由して単純に計算する方が、特に高ボラティリティ時に安定した結果が得られることが分かりました。

また、USDに対する為替レートを計算する関数も興味深いポイントです。

def get_usd_rate(self, currency: str, data: dict) -> float:
    """Get exchange rate to USD"""
    if currency == "USD":
        return 1.0
        
    pair_info = self.usd_pairs[currency]
    if isinstance(pair_info, tuple):
        pair, inverse = pair_info
        rate = data[pair]['close'].iloc[-1]
        return 1 / rate if inverse else rate
    else:
        pair = pair_info
        return data[pair]['close'].iloc[-1]

この機能は、クロスレートの計算方法をさまざまに試した長期の実験の末に生まれました。ここでの重要なポイントは、逆ペアの正しい取り扱いです。たった1つのペアで計算を誤るだけで、合成価格に連鎖的な誤差が発生する可能性があります。

そこで、私は実際のデータを正しく処理するための特別な関数を開発しました。

def get_mt5_data(self, symbol: str, count: int = 1000) -> Optional[pd.DataFrame]:
    try:
        timezone = pytz.timezone("Etc/UTC")
        utc_from = datetime.now(timezone) - timedelta(days=1)

        ticks = mt5.copy_ticks_from(symbol, utc_from, count, mt5.COPY_TICKS_ALL)
        if ticks is None:
            logger.error(f"Failed to fetch data for {symbol}")
            return None

        ticks_frame = pd.DataFrame(ticks)
        ticks_frame['time'] = pd.to_datetime(ticks_frame['time'], unit='s')
        return ticks_frame

ティック数(1000)の選択は、計算精度とデータ処理速度のバランスによる妥協点です。実際の運用では、公正価格を信頼性高く算出するのに十分であることが確認されています。

モジュールの開発中に、興味深い観察もありました。実際の価格と合成価格の乖離は、重要な市場変動の前に現れることが多いのです。まるでスマートマネーが一部のペアを動かし、クロスレートシステムに緊張を生み、それが強い値動きによって解放されるかのようです。

もちろん、アービトラージモジュールは魔法の杖ではありません。しかし、ボリューム分析や経済指標と組み合わせることで、市場をより深く理解するためのもう一つの次元を与えてくれます。今後のバージョンでは、異なる通貨ペア間の乖離の相関分析を追加する予定ですが、それはまた別の物語です。


結論

このプロジェクトを始めた当初、まさかここまで発展するとは思っていませんでした。単に「PythonとMQL5を接続するだけ」と考えていたのですが、結果的に1つの完全な取引プラットフォームへと成長しました。それぞれのパーツはまるでスイス製の時計の歯車のように精密で、このプロジェクトはその最初の一章にすぎません。

開発の過程で、私は多くのことを学びました。たとえば、アルゴリズム取引に「簡単な道」は存在しないということです。ポジションボリュームの計算ひとつ取っても、一見単純そうに見えますが、リスクや市場の動きをすべて考慮し始めると、頭が混乱するほど複雑になります。

そして、モジュール型アーキテクチャの素晴らしさを改めて実感しました。どれか1つのモジュールが停止しても、他のモジュールは動き続けます。それぞれを独立して改良でき、システム全体を壊す心配がないのです。

特に興味深いのは、モジュール同士がどのように連携して動くかを観察することです。1つはアービトラージの機会を探し、もう1つはボリュームを監視し、3つ目は経済状況を分析し、四つ目はリスクを管理します。それぞれが異なる役割を持ちながら、全体として1つの「市場の目」を形成しているのです。

もちろん、まだ成長の余地はあります。ニュース分析の追加、機械学習アルゴリズムの改良、リスク評価モデルの高度化など、 取り組みたい課題は数多くあります。特に、価格、出来高、時間を1つの空間で可視化する3D市場分析の開発には強い関心を持っています。

このプロジェクトから得た最大の教訓は、取引システムは生きた存在であるべきだということです。市場は常に変化しており、システムもそれに合わせて進化しなければなりません。失敗から学び、新しいパターンを見つけ、古い手法を捨てていく。

私の経験が、取引アルゴリズムを開発する方々の助けになれば幸いです。そして覚えておいてください。この旅にはゴールはありません。あるのは道だけです。

MetaQuotes Ltdによってロシア語から翻訳されました。
元の記事: https://www.mql5.com/ru/articles/16667

添付されたファイル |
arbitrage_mt5.py (5.45 KB)
最後のコメント | ディスカッションに移動 (1)
linfo2
linfo2 | 29 8月 2025 において 04:06
ありがとうございます , pythonを学ぼうとしています , あなたのarbitrage_mt5はコンパイルされません AttributeError: 'ArbitrageModule' object has no attribute 'run' , ここで意図されているものは何ですか?
ブラックホールアルゴリズム(BHA) ブラックホールアルゴリズム(BHA)
ブラックホールアルゴリズム(BHA)は、ブラックホールの重力原理に着想を得た最適化アルゴリズムです。本記事では、BHAがどのようにして優れた解を引き寄せ、局所最適解への陥り込みを回避するのか、そしてなぜこのアルゴリズムが複雑な問題を解くための強力なツールとなっているのかを解説します。シンプルな発想がいかにして最適化の世界で大きな成果を生み出すのかを見ていきましょう。
取引におけるトレンド基準 取引におけるトレンド基準
トレンドは多くの取引戦略において重要な要素です。本記事では、トレンドを識別するために使用されるいくつかのツールとその特性にを見ていきます。トレンドを理解し正しく解釈することは、取引効率を大幅に高め、リスクを最小限に抑えることにつながります。
取引におけるニューラルネットワーク:予測符号化を備えたハイブリッド取引フレームワーク(最終回) 取引におけるニューラルネットワーク:予測符号化を備えたハイブリッド取引フレームワーク(最終回)
予測符号化と強化学習アルゴリズムを組み合わせた金融時系列分析用のハイブリッド取引システム「StockFormer」の検討を引き続きおこないます。本システムは、複雑なパターンや資産間の相互依存関係を捉えることを可能にするDiversified Multi-Head Attention (DMH-Attn)機構を備えた、3つのTransformerブランチに基づいています。前回は、フレームワークの理論的な側面に触れ、DMH-Attn機構を実装しました。今回は、モデルのアーキテクチャと学習について解説します。
人工部族アルゴリズム(ATA) 人工部族アルゴリズム(ATA)
本記事では、状況に応じて適応的に動作する独自の二重行動システムを備えた進化的手法、人工部族アルゴリズム(ATA: Artificial Tribe Algorithm)の主要要素と革新点について、詳細に説明します。ATAは、個体学習と社会的学習を組み合わせ、探索には交叉を用い、局所最適に陥った際には移動によって新たな解を探索するためのアルゴリズムです。