English
preview
MetaTrader 5機械学習の設計図(第5回):逐次ブートストラップ - ラベルのバイアス除去とリターンの向上

MetaTrader 5機械学習の設計図(第5回):逐次ブートストラップ - ラベルのバイアス除去とリターンの向上

MetaTrader 5トレーディング |
17 0
Patrick Murimi Njoroge
Patrick Murimi Njoroge

はじめに

本記事では、逐次ブートストラップを紹介します。これは、金融機械学習におけるラベルの同時発生問題を根本から解決するための原理的なサンプリング手法です。サンプリング後に冗長性を補正するのではなく、サンプリング過程そのものにおいて冗長性を能動的に防止します。時間的な重複に応じて抽出確率を動的に調整することで、観測値間の独立性が最大化されたブートストラップ標本を構築します。

本記事では、以下をおこなう方法を説明します。

  • 金融分野における標準的なブートストラップの根本的な限界を理解する
  • 逐次ブートストラップのアルゴリズムを基本原理から実装する
  • モンテカルロシミュレーションによって有効性を検証する
  • 完全な金融機械学習パイプラインに統合する
  • 実際の取引戦略における性能向上を評価する

前提知識

本記事では、本連載で以前に取り上げた金融機械学習におけるラベルの同時発生問題およびトリプルバリア法の理解を前提としています。また、Pythonの機械学習ライブラリに関する実践的な知識があると、本記事の内容をより効果的に活用できます。

本連載に関連するすべてのソースコードは、私のGitHubで公開しています。



従来の統計学においてブートストラップが有効な理由

ブートストラップ法は、1979年にブラッドリー・エフロンによって導入された、統計的推測における最も強力な手法の一つです。その本質的な強みはシンプルさにあります。ある統計量のサンプル分布を推定するために、データを復元抽出で繰り返しリサンプルし、それぞれのサンプルで統計量を計算するだけです。

この方法は、データが独立同分布(IID)である場合に非常にうまく機能します。医療研究、農業実験、製造業の品質管理などでは、各観測値が通常は独立した事象を表します。異なる患者から採取した血液サンプルは独立した情報を含みますし、異なる区画の作物収量も独立した生育条件を反映しています。

2/3ルール:バグではなく隠れた特性


標準的なブートストラップには、多くの実務家が見落としがちな興味深い数学的性質があります。観測値がI個あるとき、それらから復元抽出でI回サンプリングすると、各ブートストラップ標本には元の観測値のおよそ2/3が含まれます。

なぜこのような現象が起きるのでしょうか。

簡単な思考実験

1から100まで番号が振られたボールが100個入った袋を想像してください。次の操作をおこないます。

  1. ランダムにボールを選びます。
  2. ボールの番号を書き留めます。
  3. ボールを袋に戻します(これが重要です)。
  4. 1~3を100回繰り返します。

質問:100回引いた後、少なくとも一度は見た異なるボールは何個でしょうか。

直感で「100回引いたのだから、全部かもしれない」と思うかもしれません。しかし、毎回ボールを戻しているため、同じボールを何度も引く一方で、一度も選ばれないボールも出てきます。

数学的な説明


特定の1個、たとえば「42番のボール」を考えます。

1回の抽出で:

  • 42番を引く確率 = 1/100
  • 42番を引かない確率 = 99/100

100回の抽出後:

  • 42番を一度も引かない確率 = (99/100)100 ≈ 0.366

つまり、42番が少なくとも一度選ばれる確率は約63.4%です。

この性質はサンプルサイズに依存しません。

ボールの数 抽出回数 各ボールが見られる確率
10 10 (9/10)10 ≈ 0.651
100 100 (99/100)100 ≈ 0.634
1,000 1,000 (999/1000)1000 ≈ 0.632
10,000 10,000 (9999/10000)10000 ≈ 0.632

この割合は1 - e -1 ≈ 0.632に収束します。ここでe ≈ 2.71828はネイピア数です。

なぜこの数になるのか:ネイピア数

正確な値は1 - e-1です。Iが大きくなるにつれて、各ボールが選ばれない確率はより細かく分割され、その極限として指数関数の形が現れます。これは連続複利の計算にも現れる同じ数学的構造です。

覚えておくべき簡単なルール:

復元抽出で、アイテム数と同じ回数だけサンプリングすると、約63%(およそ2/3)のアイテムが少なくとも一度選ばれます。

言い換えると、標準的なブートストラップでは、各反復ごとに約37%のデータが自然にサンプルに含まれません。従来の統計学では、これはまったく問題ではなく、むしろ分散推定に役立つ特性です。しかし金融の文脈では、これがラベルの同時発生問題と組み合わさることで、深刻な影響をもたらします。


なぜ金融分野で致命的になるのか

2/3ルールは、各観測値が独立した情報を含んでいることを前提としています。しかし、トリプルバリア法を用いた金融機械学習では、時間的重複のためにこの前提が大きく崩れます。

第3回の血液サンプルの例えを思い出してください。検査室で誰かが各試験管の血液を、その右隣の9本の試験管にもこぼしてしまったと想像してください。チューブ10には患者10の血液だけでなく、患者1~9の血液も混ざっているという状況です。この「汚染された」試験管から復元抽出でサンプリングしているのと同じ状況になります。

以下は、問題が複合的に悪化する構造です。

  1. 標準ブートストラップは約63%の観測値しかサンプリングしません。
  2. しかし、各観測値は他の観測値と時間的に重複しています。
  3. 実質的な独立情報は63%よりはるかに少なくなります。
  4. モデルは同じパターンを1つのブートストラップ標本内で何度も学習します。
  5. 分散推定が信頼できなくなり、ブートストラップの本来の目的が損なわれます。

トリプルバリア法の具体例

シナリオ:ボラティリティの高い期間における100件の取引観測値

  • 各ポジションの平均保有時間:4時間(トリプルバリアのエグジットまで)
  • 新規エントリーは15分ごと
  • その結果、常に約16個のポジションが同時進行

標準ブートストラップ結果

  • 約63件の観測値を抽出
  • しかし重複のため、実質的な独立情報は63 ÷ 16 ≈ 4件程度
  • つまり、63件で学習しているつもりでも、実際には独立な事象4件分程度の情報しかない

これが、標準ブートストラップで学習した金融モデルに次のような特徴が見られる理由です。

  • 不自然に低い学習誤差(同じパターンを16回繰り返し学習しているため)
  • ブートストラップ反復間での高い分散(同じ事象の「コピー」をランダムに異なる組み合わせで取得しているだけ)
  • アウトオブサンプル性能の悪化(実際の市場ではパターンの出現頻度がはるかに低いため)


逐次ブートストラップ:解決策

逐次ブートストラップは、サンプリングの考え方そのものを根本から再設計する手法です。すべての観測値を等確率で抽出するのではなく、現在のサンプルに対して新しい情報をどれだけ追加できるかに応じて、抽出確率を動的に調整します。

概念的基盤

核心となる洞察は、観測値がブートストラップ標本にもたらす価値は、すでに何が選ばれているかによって決まるということです。

たとえば、サンプルにすでに「月曜日9:00〜11:00」のデータが含まれている場合、「月曜日10:00〜12:00」の観測値はあまり新しい情報を追加しません。一方で、「火曜日の午後」の観測値は、非常に価値が高いと言えます。

逐次ブートストラップは、この直感を次の3ステップで実装します(各抽出ごとに繰り返す)。

  1. 現在の状態を評価する:すでにサンプルに含まれている時間帯を特定します。
  2. ユニーク性を計算する:残っている各観測値について、どれだけ新しい情報を追加できるかを評価します。
  3. 確率を調整する:ユニーク性に比例した確率で次の観測値を抽出します。

数学的定式化

この直感を定式化してみましょう。ラベル集合{y[i]i=1,2,3を考えます。ここで、

  • ラベルy[1]はリターンr[0,3]に基づく
  • ラベルy[2]はリターンr[2,4]に基づく
  • ラベルy[3]はリターンr[4,6]に基づく

ここで、行はラベル付けに使用されたリターンのインデックスを表し、列は各観測値を表します。結果の重複関係は、以下のインジケーター行列によって特徴づけられます。

 時刻  観測値1 観測値2 観測値3
 1 1 0 0
 2 1 0 0
 3 1 1 0
 4 0 1 0
 5 0 0 1
 6 0 0 1

平均ユニーク性ū[i]は次のように計算されます。

ū[i] = (1/L[i]) × Σ(t ∈ T[i]) [1 / c[t]]

ここで

  • L[i] = 観測値iがまたがる時点(バー)の数
  • T[i] = 観測値iがアクティブな時間区間の集合
  • c[t] =時刻tにアクティブな観測値数(すでにサンプルに含まれている観測値と、候補である観測値i
    を含む)

観測値iが選ばれる確率は次のように定義されます。

P(select i) = ū[i] / Σ(j ∈ candidates) ū[j]

これにより、次の性質が保証されます。

  • 重複のない観測値は最も高い確率を得る
  • サンプルと大きく重複する観測値は最も低い確率を得る
  • 確率の総和は常に1(有効な確率分布)

数値例での説明

先ほどのインジケータ行列を用いて、具体例を見ていきます。

ステップ1:最初の抽出

初期状態では、まだ何も選ばれていないため、すべて等確率です。

P(観測値1) = P(観測値2) = P(観測値3) = 1/3 ≈ 33.3%

結果:観測値2がランダムに選択される
現在のサンプル:φ¹ = {2}

ステップ2:2回目の抽出 - ユニーク性の計算

観測値2がサンプルに含まれている状態で、各観測値のユニーク性を計算します。

観測値1:

  • アクティブな時間帯:{1, 2, 3}
  • 時間1:c[1] = 1 (観測値1のみ)、ユニーク性 = 1/1 = 1.0
  • 時間2:c[2] = 1 (観測値1のみ)、ユニーク性 = 1/1 = 1.0
  • 時間3: c[3] = 2 (観測値1 + 観測値2)、ユニーク性 = 1/2 = 0.5
  • 平均ユニーク性:(1.0 + 1.0 + 0.5)/3 = 2.5/3 = 5/6 ≈ 0.833

観測値2:

  • アクティブな時間帯:{3, 4}
  • 時刻3:c[3] = 2 (観測値2 + 自身)、ユニーク性 = 1/2 = 0.5
  • 時間4:c[4] = 1 (観測値2のみ)、ユニーク性 = 1/1 = 1.0
  • 平均ユニーク性:(0.5 + 1.0)/2 = 1.5/2 = 3/6 = 0.5

観測値3:

  • アクティブな時間帯:{5, 6}
  • 時間5:c[5] = 1 (観測値3のみ)、ユニーク性 = 1/1 = 1.0
  • 時間6:c[6] = 1 (観測値3のみ)、ユニーク性 = 1/1 = 1.0
  • 平均ユニーク性:(1.0 + 1.0)/2 = 2.0/2 = 6/6 = 1.0

抽出確率の計算

  • ユニーク性の合計:5/6 + 3/6 + 6/6 = 14/6
  • P(観測値1) = (5/6) / (14/6) = 5/14 ≈ 35.7%
  • P(観測値2) = (3/6) / (14/6) = 3/14 ≈ 21.4% ← 最低値(すでに選択済み)
  • P(観測値3) = (6/6) / (14/6) = 6/14 ≈ 42.9% ← 最高値(重複なし)

結果:観測値3が選択される
現在のサンプル:φ² = {2, 3}

ステップ3:3回目の抽出 - ユニーク性の計算

観測値2と3は互いに重複していないため、重複構造は変わりません。

  • P(観測値1) = 5/14 ≈ 35.7%
  • P(観測値2) = 3/14 ≈ 21.4%
  • P(観測値3) = 6/14 ≈ 42.9%

以下は、抽出確率のまとめです。

抽出 観測値1 観測値2 観測値3 選択
1 1/3 (33.3%) 1/3 (33.3%) 1/3 (33.3%) 2
2 5/14 (35.7%) 3/14 (21.4%) 6/14 (42.9%) 3
3 5/14 (35.7%) 3/14 (21.4%) 6/14 (42.9%) ?

この例から分かる重要なポイント

  1. 最も低い確率は、すでに選択された観測値に割り当てられます(観測値2は33.3%から21.4%に低下)。
  2. 最も高い確率は、重複が0の観測値に割り当てられます(観測値3は42.9%に上昇)。
  3. 部分的に重複する観測値は中間的な重みを持ちます(観測値1は35.7%にやや上昇)。
  4. この手法は、サンプルの多様性を維持しながら冗長性を効果的に抑制することができます。


実装

コアアルゴリズム


実装には、主に2つの関数が必要です。1つはインジケーター行列を計算する関数、もう1つは逐次サンプリングを実行する関数です。

関数1:インジケーター行列の構築

def get_ind_matrix(bar_index, t1):
    """
    Build an indicator matrix showing which observations are active at each time.
    
    :param bar_index: (pd.Index) Complete time index (all bars)
    :param t1: (pd.Series) End time for each observation (index = start time, value = end time)
    :return: (pd.DataFrame) Indicator matrix where ind_matrix[t, i] = 1 if obs i is active at time t
    """
    ind_matrix = pd.DataFrame(0, index=bar_index, columns=range(t1.shape[0]))
    
    for i, (t_in, t_out) in enumerate(t1.items()):
        # Mark all times from t_in to t_out as active for observation i
        ind_matrix.loc[t_in:t_out, i] = 1.0
    
    return ind_matrix

関数2:平均ユニーク性計算

def get_avg_uniqueness(ind_matrix):
    """
    Calculate average uniqueness for each observation.
    
    Average uniqueness of observation i = mean of (1/c[t]) across all times t where i is active,
    where c[t] is the number of observations active at time t.
    
    :param ind_matrix: (pd.DataFrame) Indicator matrix from get_ind_matrix
    :return: (pd.Series) Average uniqueness for each observation
    """
    # Count how many observations are active at each time (row sums)
    concurrency = ind_matrix.sum(axis=1)
    
    # Calculate uniqueness: 1/concurrency for each observation at each time
    # Replace concurrency with NaN where ind_matrix is 0 (observation not active)
    uniqueness = ind_matrix.div(concurrency, axis=0)
    
    # Average uniqueness across all times where observation is active
    avg_uniqueness = uniqueness[uniqueness > 0].mean(axis=0)
    
    return avg_uniqueness

関数3:逐次ブートストラップサンプラー

def seq_bootstrap(ind_matrix, sample_length=None):
    """
    Generate a bootstrap sample using sequential bootstrap method.
    
    :param ind_matrix: (pd.DataFrame) Indicator matrix from get_ind_matrix
    :param sample_length: (int) Length of bootstrap sample. If None, uses len(ind_matrix.columns)
    :return: (list) Indices of selected observations
    """
    if sample_length is None:
        sample_length = ind_matrix.shape[1]
    
    phi = []  # Bootstrap sample (list of selected observation indices)
    
    while len(phi) < sample_length:
        # Calculate average uniqueness for each observation
        avg_u = pd.Series(dtype=float)
        
        for i in ind_matrix.columns:
            # Create temporary indicator matrix with current sample + candidate i
            ind_matrix_temp = ind_matrix[phi + [i]]
            avg_u.loc[i] = get_avg_uniqueness(ind_matrix_temp).iloc[-1]
        
        # Convert uniqueness to probabilities
        prob = avg_u / avg_u.sum()
        
        # Draw next observation according to probabilities
        selected = np.random.choice(ind_matrix.columns, p=prob)
        phi.append(selected)
    
    return phi

計算効率に関する考慮事項

上記の実装は概念的には明確ですが、大規模データセットに対しては計算コストが非常に高くなります。各反復で残りすべての観測値についてユニーク性を再計算するため、計算量はO(n³)に達する可能性があります。

実稼働システムでは、いくつかの最適化が重要です。

  1. 増分更新:インジケータ行列全体を毎回再計算するのではなく、観測値がサンプルに追加されるたびに同時発生数のカウントを更新するランニング変数を維持します。
  2. 事前計算:観測値間のペアワイズ重複を最初に一度だけ計算し、サンプリング中は単純な参照操作で利用します。
  3. 並列化:複数のブートストラップ標本は独立に生成できるため、並列処理によって大幅な高速化が可能です。
  4. スパース行列演算:金融時系列では同時性が限定的な場合が多いため、スパース構造を利用することでメモリ効率と計算効率を向上できます。

期待される結果

図1は、標準ブートストラップ(左)と逐次ブートストラップ(右)におけるユニーク性のヒストグラムを示しています。標準法の平均ユニーク性の中央値は0.6、逐次法の平均ユニーク性の中央値は0.7です。平均値の差に対してANOVA検定をおこなうと、ほぼ0に近い確率(極めて小さいp値)が得られます。統計的に言えば、逐次ブートストラップ法から得られるサンプルの期待ユニーク性は、合理的な信頼水準において、標準ブートストラップ法を有意に上回ります。独自にモンテカルロシミュレーションを実行したい場合は、添付のbootstrap_mc.pyを参照してください。

標準ブートストラップと逐次ブートストラップのモンテカルロ実験

図1:標準ブートストラップと逐次ブートストラップのモンテカルロ実験


最適化された実装

このセクションでは、インジケーター行列を使った従来実装よりも、インデックスの平坦化 + Numbaによる高速サンプリングを組み合わせた最適化実装が優れている理由を説明します。 記事内で紹介されたメモリ使用量と計算複雑度の比較でも、その優位性が裏付けられています。

ステップ1:get_active_indices - スパースマッピング vs. 密なインジケーター行列

目的:各イベントの 開始/終了時刻を、サンプルがカバーするバーのインデックス一覧に変換します。

優れている理由:密なn × T行列を作る代わりに、非0エントリだけ(サンプルが実際に触れるバーのインデックス)を保持します。これにより、メモリ使用量がO(n·T)からO(イベント長の合計)に削減します。現実的な金融時系列では、この合計はnに対してほぼ線形です。

実務上の利点:サンプルのカバレッジをスキャンする際にメモリ使用量が大幅に削減され、キャッシュ効率も向上します。これらの改善によって、最適化された分析で報告されている圧縮率やメモリ削減が実現されます。

def get_active_indices(samples_info_sets, price_bars_index):
    """
    Build an indicator mapping from each sample to the bar indices it influences.

    Args:
        samples_info_sets (pd.Series):
            Triple-barrier events (t1) returned by labeling.get_events.
            Index: start times (t0) as pd.DatetimeIndex.
            Values: end times (t1) as pd.Timestamp (or NaT for open events).
        price_bars_index (pd.DatetimeIndex or array-like):
            Sorted bar timestamps (pd.DatetimeIndex or array-like). Will be converted to
            np.int64 timestamps for internal processing.

    Returns:
        dict:
            Standard Python dictionary mapping sample_id (int) to a numpy.ndarray of
            bar indices (dtype=int64). Example: {0: array([0,1,2], dtype=int64), 1: array([], dtype=int64), ...}
    """
    t0 = samples_info_sets.index
    t1 = samples_info_sets.values
    n = len(samples_info_sets)
    active_indices = {}

    # precompute searchsorted positions to restrict scanning range
    starts = np.searchsorted(price_bars_index, t0, side="left")
    ends = np.searchsorted(price_bars_index, t1, side="right")  # exclusive

    for sample_id in range(n):
        s = starts[sample_id]
        e = ends[sample_id]
        if e > s:
            active_indices[sample_id] = np.arange(s, e, dtype=int)
        else:
            active_indices[sample_id] = np.empty(0, dtype=int)

    return active_indices

ステップ2:pack_active_indices - スループット向けの連続フラットレイアウト

目的:辞書や配列のリスト形式flat_indices、offsets、lengths、sample_idsに変換します。

優れている理由:連続した配列にすることで、Pythonオブジェクトのオーバーヘッドを排除し、メモリを線形に効率よく走査できるようになります。offsetsにより、各サンプルのスライスへのアクセスがO(1)で可能になり、ループごとにリストをインデックスする必要がなくなります。この「ラグド構造からフラット構造への変換」は、NumbaやJITコンパイルされたループでの効率的な処理に必須です。

実務上の利点:Numbaがメモリを連続して走査できるため、CPUのプリフェッチ効率が向上し、Pythonレベルでの繰り返し操作やサンプルごとの配列オブジェクトを使う場合に比べて、インタプリタオーバーヘッドを大幅に削減できます。

def pack_active_indices(active_indices):
    """
    Convert dict/list-of-arrays active_indices into flattened arrays and offsets.

    Args:
        active_indices (dict or list): mapping sample_id -> 1D ndarray of bar indices

    Returns:
        flat_indices (ndarray int64): concatenated bar indices for all samples
        offsets (ndarray int64): start index in flat_indices for each sample (len = n+1)
        lengths (ndarray int64): number of indices per sample (len = n)
        sample_ids (list): list of sample ids in the order used to pack data
    """
    # Preserve sample id ordering to allow mapping between chosen index and original id
    if isinstance(active_indices, dict):
        sample_ids = list(active_indices.keys())
        values = [active_indices[sid] for sid in sample_ids]
    else:
        # assume list-like ordered by sample id 0..n-1
        sample_ids = list(range(len(active_indices)))
        values = list(active_indices)

    lengths = np.array([v.size for v in values], dtype=np.int64)
    offsets = np.empty(len(values) + 1, dtype=np.int64)
    offsets[0] = 0
    offsets[1:] = np.cumsum(lengths)

    total = int(offsets[-1])
    if total == 0:
        flat_indices = np.empty(0, dtype=np.int64)
    else:
        flat_indices = np.empty(total, dtype=np.int64)
        pos = 0
        for v in values:
            l = v.size
            if l:
                flat_indices[pos : pos + l] = v
            pos += l

    return flat_indices, offsets, lengths, sample_ids

ステップ3:_compute_scores_flat/_normalize_to_prob - 局所的、増分的スコア計算

目的:score = (1 / (1 + concurrency)).mean()という式を用いて各サンプルのスコアを計算し、それを確率ベクトルへ正規化します。

優れている理由:スコア計算は、各サンプルが実際にカバーしているバーのみ(flat_indices内の該当スライス)と、現在の同時発生数カウントだけを用いておこなわれます。そのため、1サンプルあたりの計算コストはイベント長kに比例し、全時間幅Tには依存しません。結果として、1回のフルパスあたりの計算量はO(n·T)ではなくO(n·k)となります。

実務上の利点:epsによる下限値の設定と決定論的な正規化により数値的安定性が確保され、合計が0となるケースを防ぎます。これにより、同時発生数が高くなった場合でもサンプリング分布が崩れにくくなります。

@njit
def _compute_scores_flat(flat_indices, offsets, lengths, concurrency):
    """
    Compute average uniqueness for each sample using flattened indices.

    This follows de Prado's approach: for each bar in a sample, compute uniqueness as 1/(c+1),
    then average across all bars in that sample.

    Args:
        flat_indices (ndarray int64): concatenated indices
        offsets (ndarray int64): start positions (len = n+1)
        lengths (ndarray int64): counts per sample
        concurrency (ndarray int64): current concurrency counts per bar

    Returns:
        scores (ndarray float64): average uniqueness per sample
    """
    n = offsets.shape[0] - 1
    scores = np.empty(n, dtype=np.float64)

    for i in range(n):
        s = offsets[i]
        e = offsets[i + 1]
        length = lengths[i]

        if length == 0:
            # If a sample covers no bars, assign zero average uniqueness
            scores[i] = 0.0
        else:
            # Compute uniqueness = 1/(c+1) for each bar, then average
            sum_uniqueness = 0.0
            for k in range(s, e):
                bar = flat_indices[k]
                c = concurrency[bar]
                uniqueness = 1.0 / (c + 1.0)
                sum_uniqueness += uniqueness
            avg_uniqueness = sum_uniqueness / length
            scores[i] = avg_uniqueness

    return scores
@njit
def _normalize_to_prob(scores):
    """
    Normalize non-negative scores to a probability vector. If all zero, return uniform.
    """
    n = scores.shape[0]
    total = 0.0
    for i in range(n):
        total += scores[i]

    prob = np.empty(n, dtype=np.float64)
    if total == 0.0:
        # fallback to uniform distribution
        uni = 1.0 / n
        for i in range(n):
            prob[i] = uni
    else:
        for i in range(n):
            prob[i] = scores[i] / total
    return prob

ステップ4:_increment_concurrency_flat- concurrencyのインプレース更新

目的:選択されたサンプルがカバーする各バーについて、concurrency[bar]を増分します。

優れている理由:影響を受けるバーのみを更新し、大きな配列を再スキャンしたり再集計したりしません。密な行列アプローチでは、多くの行の合計を再計算したり、一時的な構造を生成したりする必要が生じる可能性がありますが、ここでは更新は局所的におこなわれ、計算量はO(k)です。

実務上の利点:低コストな増分更新により、各抽出後にサンプラーをオンラインで適応させることができ、高価な全体再計算をおこなうことなく、多数のブートストラップ抽出を効率的に実行できます。

@njit
def _increment_concurrency_flat(flat_indices, offsets, chosen, concurrency):
    """
    Increment concurrency for the bars covered by sample `chosen`.
    """
    s = offsets[chosen]
    e = offsets[chosen + 1]
    for k in range(s, e):
        bar = flat_indices[k]
        concurrency[bar] += 1

ステップ5:_seq_bootstrap_loopseq_bootstrap_optimized - 再現可能なRNGを使用した完全なNumbaアクセラレーション

目的:Python側で事前に生成した一様乱数を用い、再現可能な乱数生成器(RNG)を維持しながら、逐次ブートストラップのループ全体をNumba内で実行し、あわせて、メモリ効率の高いフラットレイアウトを使用します。

優れている理由:

  • 反復ごとのPythonオーバーヘッドを排除:スコア計算、CDFによる選択、concurrencyの更新をJITコンパイルされた関数内で実行することで、大規模処理時に支配的となるインタープリタのコンテキスト切り替えコストを除去します。
  • 再現性を維持:乱数生成はPython(NumPyのRandomState)で管理し、その一様乱数をJITコンパイルされた関数内のループに渡します。これにより、シードの再現性と高速な内部ループを両立します。
  • O(n·k)の計算量を小さな定数因子で実現:標準アルゴリズムで必要となるインジケーター行列の繰り返し再計算や、O(n²)以上になり得るスキャン処理と比較して効率的です。

実務上の利点:フラットレイアウトと完全なJIT化を組み合わせることで、速度およびメモリ使用量の大幅な改善が得られます。これにより、本番環境のワークフローにおいて、数万から数十万規模のサンプルに対して逐次ブートストラップを実行することが可能になります。

@njit
def _choose_index_from_cdf(prob, u):
    """
    Convert a uniform random number u in [0,1) to an index using the cumulative distribution.

    This avoids calling numpy.choice inside numba and is efficient.
    """
    n = prob.shape[0]
    cum = 0.0
    for i in range(n):
        cum += prob[i]
        if u < cum:
            return i
    # numerical fallback: return last index
    return n - 1
@njit
def _seq_bootstrap_loop(flat_indices, offsets, lengths, concurrency, uniforms):
    """
    Njitted sequential bootstrap loop.

    Args:
        flat_indices, offsets, lengths: flattened index layout
        concurrency (ndarray int64): initial concurrency vector (will be mutated)
        uniforms (ndarray float64): pre-drawn uniform random numbers in [0,1), length = sample_length

    Returns:
        chosen_indices (ndarray int64): sequence of chosen sample indices (positions in packed order)
    """
    sample_length = uniforms.shape[0]
    chosen_indices = np.empty(sample_length, dtype=np.int64)

    for it in range(sample_length):
        # compute scores and probabilities given current concurrency
        scores = _compute_scores_flat(flat_indices, offsets, lengths, concurrency)
        prob = _normalize_to_prob(scores)

        # map uniform to a sample index
        u = uniforms[it]
        idx = _choose_index_from_cdf(prob, u)
        chosen_indices[it] = idx

        # update concurrency for selected sample
        _increment_concurrency_flat(flat_indices, offsets, idx, concurrency)

    return chosen_indices
def seq_bootstrap_optimized(active_indices, sample_length=None, random_seed=None):
    """
    End-to-end sequential bootstrap using flattened arrays + Numba.

    Implements the sequential bootstrap as described in de Prado's "Advances in Financial
    Machine Learning" Chapter 4: average uniqueness per sample where uniqueness per bar
    is 1/(concurrency+1).

    Args:
        active_indices (dict or list): mapping sample id -> ndarray of bar indices
        sample_length (int or None): requested number of draws; defaults to number of samples
        random_seed (int, RandomState, or None): seed controlling the pre-drawn uniforms

    Returns:
        phi (list): list of chosen original sample ids (length = sample_length)
    """
    # Pack into contiguous arrays and keep mapping from packed index -> original sample id
    flat_indices, offsets, lengths, sample_ids = pack_active_indices(active_indices)
    n_samples = offsets.shape[0] - 1

    if sample_length is None:
        sample_length = n_samples

    # Concurrency vector length: bars are indices into price-bar positions.
    # When there are no bars (flat_indices empty), create an empty concurrency of length 0.
    if flat_indices.size == 0:
        T = 0
    else:
        # max bar index + 1 (bars are zero-based indices)
        T = int(flat_indices.max()) + 1

    concurrency = np.zeros(T, dtype=np.int64)

    # Prepare reproducible uniforms. Accept either integer seed or RandomState.
    if random_seed is None:
        rng = np.random.RandomState()
    elif isinstance(random_seed, np.random.RandomState):
        rng = random_seed
    else:
        try:
            rng = np.random.RandomState(int(random_seed))
        except (ValueError, TypeError):
            rng = np.random.RandomState()

    # Pre-draw uniforms in Python and pass them into njit function (numba cannot accept RandomState)
    uniforms = rng.random_sample(sample_length).astype(np.float64)

    # Run njit loop (this mutates concurrency but we don't need concurrency afterwards)
    chosen_packed = _seq_bootstrap_loop(flat_indices, offsets, lengths, concurrency, uniforms)

    # Map packed indices back to original sample ids
    phi = [sample_ids[int(i)] for i in chosen_packed.tolist()]

    return phi

計算量およびデプロイ上の含意

  • メモリ:最適化アプローチは、メモリ使用量の増加を二次的な成長からnに対してほぼ線形な成長へと抑えます。これにより、従来は実行不可能だった問題サイズが、実用的に扱える規模へと変わります。
  • 計算時間:1回の抽出のコストは、時間グリッド全体Tではなく、平均イベント長kに比例します。1回あたりの計算量はO(n·k)であり、n個のサンプルを生成する全体の計算量はO(n²·k)となります。これは、特にk ≪ nの場合に、ナイーブなインジケーター行列アプローチが達成するO(n³)あるいはそれ以上と比べて大幅に優れています。
  • エンジニアリング面:フラットレイアウトとnjitパターンの組み合わせは、さらなる最適化を可能にします(安全であればインデックスにint32を使用する、サンプリングを並列化する、繰り返し使用されるスコア項を事前計算するなど)。また、labeling.get_eventsから最適化サンプラーへと流れるパイプラインにも自然に統合できます。

メモリ効率分析:最適化実装と標準インジケーター行列実装の比較

メモリ使用量の比較


メモリ消費データは、標準実装と最適化実装の間に劇的な差があることを示しています。

サンプルサイズ 標準 最適化 メモリ削減 圧縮比
500 7.19 MB 0.02 MB 99.7% 359:1
1,000 23.75 MB 0.04 MB 99.8% 594:1
2,000 93.65 MB 0.07 MB 99.9% 1,338:1
4,000 412.23 MB 0.14 MB 99.97% 2,944:1
8,000 1,237.82 MB 0.28 MB 99.98% 4,421:1


成長パターンの数理分析

標準実装(二次成長)


標準実装はO(n²)のメモリ計算量を示します。

Memory(n) ≈ 0.0000188 × n² (MB)
  • n=8,000の場合、予測値 = 1,203 MB、実測値 = 1,238 MB(精度97%)
  • サンプル数を2倍にすると、メモリは4倍に増加(二次成長と整合的)

最適化実装(線形成長)

最適化実装はO(n)のメモリ計算量を示します。

Memory(n) ≈ 0.000035 × n (MB)
  • n=8,000の場合、予測値 = 0.28 MB、実測値 = 0.28 MB(完全一致)
  • サンプル数を2倍にすると、メモリは2倍に増加(線形成長と整合的)


金融機械学習における実務的含意

スケーラビリティの限界


標準実装

  • n=50,000:~47 GB(非現実的)
  • n=100,000:~188 GB(実質的に不可能)
  • 最大規模:約15,000サンプル

最適化された実装

  • n=50,000:約1.75 MB(ごく軽量)
  • n=100,000:約3.5 MB(容易に処理可能)
  • 最大規模:数百万サンプル規模

実運用でのデプロイシナリオ

# Typical financial dataset scenarios
scenarios = {
    "Intraday Trading": {
        "samples": 50_000,      # 2 years of 5-minute bars
        "standard_memory": "47 GB",
        "optimized_memory": "1.75 MB",
        "feasible": "Only with optimized"
    },
    "Multi-Asset Portfolio": {
        "samples": 200_000,     # 100 instruments × 2,000 bars
        "standard_memory": "752 GB", 
        "optimized_memory": "7 MB",
        "feasible": "Only with optimized"
    },
    "Research Backtesting": {
        "samples": 1_000_000,   # Comprehensive market analysis
        "standard_memory": "18.8 TB",
        "optimized_memory": "35 MB",
        "feasible": "Only with optimized"
    }
}


技術的アーキテクチャの洞察

なぜ劇的な違いがあるのか


標準実装(get_ind_matrix):

# Creates dense n × n matrix (O(n²) memory)
ind_matrix = np.zeros((len(bar_index), len(label_endtime)), dtype=np.int8)
for sample_num, label_array in enumerate(tokenized_endtimes):
    ind_mat[label_index:label_endtime+1, sample_num] = 1  # Fills entire ranges

最適化された実装(precompute_active_indices):

# Stores only active indices (O(k×n) memory, where k << n)
active_indices = {}
for sample_id in range(n_samples):
    mask = (price_bars_array >= t0) & (price_bars_array <= t1)
    indices = np.where(mask)[0]  # Stores only non-zero indices
    active_indices[sample_id] = indices

メモリ効率の向上

サンプル数が増えるにつれて圧縮率が向上する理由は次の通りです。

  1. 疎性の増加:各サンプルは総バーのごく一部にしか影響しません。
  2. 固定オーバーヘッドが小さい:辞書構造の基本メモリコストは最小限です。
  3. 効率的な格納:密行列の代わりに整数配列を使用します。



逐次ブートストラップへの性能影響

アルゴリズムの計算量比較

# Standard implementation: O(n³) time, O(n²) memory
def seq_bootstrap_standard(ind_mat):
    # Each iteration: O(n²) operations × n iterations
    for i in range(n_samples):
        avg_unique = _bootstrap_loop_run(ind_mat, prev_concurrency)  # O(n²)
        
# Optimized implementation: O(n×k) time, O(n) memory  
def seq_bootstrap_optimized(active_indices):
    # Each iteration: O(k) operations × n iterations (where k = avg event length)
    for i in range(n_samples):
        prob = _seq_bootstrap_loop(flat_indices, offsets, lengths, concurrency, uniforms)  # O(k)

メモリ使用パターンから、計算時間の性能も推定できます。

操作 n=1,000 n=8,000 スケーリング効果
メモリ割り当て 23.75 MB → 0.04 MB 1,238 MB → 0.28 MB 4,421倍改善
行列演算 O(100万)要素 O(6,400万)要素 64倍遅い(標準)
キャッシュ効率 悪い(大きな行列) 優秀(小さな配列) 大きな優位性


アンサンブルの構築:SequentiallyBootstrappedBaggingClassifier

最適化された逐次ブートストラップサンプラーを用意したので、これをフル機械学習アンサンブルに統合できます。SequentiallyBootstrappedBaggingClassifierは、逐次ブートストラップの時間依存性の認識と、アンサンブル手法による分散低減の両方を組み合わせたクラスです。

バギングが有効な理由と、逐次ブートストラップが必要な理由


バギング(Bootstrap aggregating)は機械学習で非常に有効なアンサンブル手法です。基本的な考え方はエレガントです。

  1. 学習データから複数のブートストラップ標本を生成します。
  2. 各サンプルで個別のモデルを学習します。
  3. 予測を投票(分類)または平均(回帰)で集約します。

サンプルが独立していれば、各ブートストラップ標本はデータのわずかに異なる「見え方」を提供し、これらを集約することで分散を低減できます。バイアスは増加せず、予測の安定性が向上します。

しかし、ラベルが重複する金融機械学習においては、標準バギングは致命的に失敗します。

各ブートストラップ標本は、ラベルの同時発生により同じ時間パターンを何度も含みます。アンサンブルは同じパターンを繰り返し学習してしまいます。この結果、次のようになります。

  • 過信した予測:モデルが同じパターンを10回以上学習し、高い信頼度で予測する
  • 分散の過小評価:ブートストラップ標本間で独立性が確保されない
  • 汎化性能の低下:実際のパターン頻度よりも学習データが多く見積もられる

逐次ブートストラップは、各ブートストラップ標本が時間的に独立性の高いデータを最大化するようにサンプリングするため、アンサンブルに真に多様な学習セットを提供できます。

アーキテクチャの概要

SequentiallyBootstrappedBaggingClassifierはscikit-learnのBaggingClassifierを拡張し、以下の3点を変更しています。

  1. 逐次サンプリング:従来の一様ランダムサンプリングの代わりにseq_bootstrap_optimizedを使用します。
  2. 時間メタデータの追跡:samples_info_sets(ラベルの開始/終了時刻)とprice_bars_indexを維持します。
  3. アクティブインデックスの事前計算:スパースなインデックスマッピングを一度作成し、すべての推定器で再利用します。


実装の解説

ステップ1:初期化とメタデータ

この分類器は、標準的なバギングでは不要な時間情報を必要とします。
def __init__(
    self,
    samples_info_sets,    # NEW: label temporal spans
    price_bars_index,     # NEW: price bar timestamps
    estimator=None,
    n_estimators=10,
    max_samples=1.0,
    max_features=1.0,
    bootstrap_features=False,
    oob_score=False,
    warm_start=False,
    n_jobs=None,
    random_state=None,
    verbose=0,
):

主要パラメータの説明:

  • samples_info_sets (pd.Series):インデックスにラベルの開始時刻(t0)、値にラベルの終了時刻(t1)を保持します。これにより、各観測値のラベルがカバーする時間範囲を表します。
  • price_bars_index (pd.DatetimeIndex):ラベル生成に使用したすべての価格バーのタイムスタンプです。時間範囲をバーのインデックスにマッピングするために必要です。
  • estimator:ベース分類器(デフォルトはDecisionTreeClassifier)です。アンサンブル内の各モデルはこの分類器のクローンです。
  • n_estimators:アンサンブル内のモデル数です。多いほど予測は滑らかになりますが、学習時間は長くなります。
  • max_samples:ブートストラップ標本のサイズです。(0,1]の浮動小数点なら学習データの割合、整数なら正確なサンプル数です。
  • bootstrap_features:特徴量のサブサンプリングをおこなうかどうかです。おこなうと、多様性は増しますが、個々のモデルの精度はやや低下する可能性があります。

ステップ2:アクティブインデックスの計算

サンプリングの前に、スパースなインデックスマッピングを一度だけ事前計算してキャッシュします。

def _fit(self, X, y, max_samples=None, sample_weight=None):
    # ... validation and setup ...
    
    # Compute active indices mapping (once, cached for all estimators)
    if self.active_indices_ is None:
        self.active_indices_ = get_active_indices(
            self.samples_info_sets, 
            self.price_bars_index
        )

事前計算をおこなうのは、アクティブインデックスの計算がO(n)で決定的であり、ラベルのタイムスタンプのみに依存して乱数の影響を受けないためです。一度計算して再利用することで、多数の推定器を学習する際の計算時間を節約できます。

メモリ効率:メモリ分析のセクションで示した通り、active_indices_は平均ラベル長kに比例するO(n·k)のメモリを使用します(kは平均ラベル長)。これに対し、密なインジケーター行列ではO(n²)のメモリを消費します。8,000サンプルの例では、active_indices_が0.28 MBに対して、密行列は1,238 MBとなり、圧縮率は4,421:1に相当します。

ステップ3:カスタムブートストラップ標本の生成

重要な革新点:従来の一様ランダムサンプリングを逐次ブートストラップに置き換えたことです。

def _generate_bagging_indices(
    random_state, bootstrap_features, n_features, max_features, max_samples, active_indices
):
    """Randomly draw feature and sample indices."""
    # Get valid random state - this returns a RandomState object
    random_state_obj = check_random_state(random_state)

    # Draw samples using sequential bootstrap
    if isinstance(max_samples, numbers.Integral):
        sample_indices = seq_bootstrap(
            active_indices, sample_length=max_samples, random_seed=random_state_obj
        )
    elif isinstance(max_samples, numbers.Real):
        n_samples = int(round(max_samples * len(active_indices)))
        sample_indices = seq_bootstrap(
            active_indices, sample_length=n_samples, random_seed=random_state_obj
        )
    else:
        sample_indices = seq_bootstrap(
            active_indices, sample_length=None, random_seed=random_state_obj
        )

    # Draw feature indices only if bootstrap_features is True
    if bootstrap_features:
        if isinstance(max_features, numbers.Integral):
            n_feat = max_features
        elif isinstance(max_features, numbers.Real):
            n_feat = int(round(max_features * n_features))
        else:
            raise ValueError("max_features must be int or float when bootstrap_features=True")

        feature_indices = _generate_random_features(
            random_state_obj, bootstrap_features, n_features, n_feat
        )
    else:
        # When not bootstrapping features, return None (will be handled downstream)
        feature_indices = None

    return sample_indices, feature_indices

重要な洞察:サンプル(時間次元/観測値)の抽出には逐次ブートストラップを使用しますが、特徴量(列)の抽出は標準のランダムサンプリングを用います(bootstrap_features=Trueの場合)。これは次の理由で正しいです。

  • 時間的重複は観測値(行)間で発生し、特徴量(列)間では発生しない
  • 特徴量間の相関は時間的同時性とは独立している
  • 従来の特徴量のブートストラップをおこなうことで、多様性を増しつつ時間的問題は生じない

ステップ4:推定器の並列学習

複数の推定器の学習はほぼ完全に並列化可能であり、それぞれ独立して学習できます。

def _parallel_build_estimators(
    n_estimators, ensemble, X, y, active_indices, sample_weight, seeds, total_n_estimators, verbose
):
    """Private function used to build a batch of estimators within a job."""
    # Retrieve settings
    n_samples, n_features = X.shape
    max_samples = ensemble._max_samples
    max_features = ensemble.max_features
    bootstrap_features = ensemble.bootstrap_features
    support_sample_weight = has_fit_parameter(ensemble.estimator_, "sample_weight")

    # Build estimators
    estimators = []
    estimators_samples = []
    estimators_features = []

    for i in range(n_estimators):
        if verbose > 1:
            print(
                "Building estimator %d of %d for this parallel run (total %d)..."
                % (i + 1, n_estimators, total_n_estimators)
            )

        random_state = seeds[i]
        estimator = ensemble._make_estimator(append=False, random_state=random_state)

        # Draw samples and features
        sample_indices, feature_indices = _generate_bagging_indices(
            random_state, bootstrap_features, n_features, max_features, max_samples, active_indices
        )

        # Draw samples, using sample weights if supported
        if support_sample_weight and sample_weight is not None:
            curr_sample_weight = sample_weight[sample_indices]
        else:
            curr_sample_weight = None

        # Store None for features if no bootstrapping (memory optimization)
        if bootstrap_features:
            estimators_features.append(feature_indices)
        else:
            estimators_features.append(None)  # Don't store redundant feature arrays

        estimators_samples.append(sample_indices)

        # Select data
        if bootstrap_features:
            X_ = X[sample_indices][:, feature_indices]
        else:
            X_ = X[sample_indices]  # Use all features

        y_ = y[sample_indices]

        estimator.fit(X_, y_, sample_weight=curr_sample_weight)
        estimators.append(estimator)

    return estimators, estimators_features, estimators_samples

並列効率:n_jobs=-1を指定すると、実装はすべてのCPUコアを使用します。たとえば、8コアで100個の推定器を学習する場合、1コアあたり約12個の推定器が同時に処理されます。これにより、大規模なアンサンブルでもほぼ線形の速度向上が得られます。

ステップ5:アウトオブバッグ(OOB)スコアリング

バギングの最も有用な機能のひとつは、OOBサンプルによる組み込みの検証です。

def _set_oob_score(self, X, y):
    """Compute out-of-bag score"""

    # Safeguard: Ensure n_classes_ is set
    if not hasattr(self, "n_classes_"):
        self.classes_ = np.unique(y)
        self.n_classes_ = len(self.classes_)

    n_samples = y.shape[0]
    n_classes = self.n_classes_

    predictions = np.zeros((n_samples, n_classes))

    for estimator, samples, features in zip(
        self.estimators_, self._estimators_samples, self.estimators_features_
    ):
        # Create mask for OOB samples
        mask = ~indices_to_mask(samples, n_samples)

        if np.any(mask):
            # Get predictions for OOB samples
            X_oob = X[mask]

            # If features is None, use all features; otherwise subset
            if features is not None:
                X_oob = X_oob[:, features]

            predictions[mask] += estimator.predict_proba(X_oob)

    # Average predictions
    denominator = np.sum(predictions != 0, axis=1)
    denominator[denominator == 0] = 1  # avoid division by zero
    predictions /= denominator[:, np.newaxis]

    # Compute OOB score
    oob_decision_function = predictions
    oob_prediction = np.argmax(predictions, axis=1)

    if n_classes == 2:
        oob_prediction = oob_prediction.astype(np.int64)

    self.oob_decision_function_ = oob_decision_function
    self.oob_prediction_ = oob_prediction
    self.oob_score_ = accuracy_score(y, oob_prediction)

金融機械学習においてOOBが重要な理由:

  • データを無駄にしない:各サンプルは一部の推定器の学習と他の推定器の検証の両方に使用される
  • 正直な推定:OOBスコアはクロスバリデーションに近い推定値を、追加の計算コストなしで提供する
  • 早期停止の指標:学習中にOOBスコアを監視することで過学習を検出する
  • 時間的安全性:逐次ブートストラップを用いることで、OOBサンプルは時間的に真に独立する

ステップ6:クラスの拡張

ここまでの内容を統合し、SequentiallyBootstrappedBaseBaggingSequentiallyBootstrappedBaggingClassifierSequentiallyBootstrappedBaggingRegressorの各クラスを作成します。これらはsb_bagging.pyに含まれています。

完全な使用例

import pandas as pd
import numpy as np
from sklearn.tree import DecisionTreeClassifier
from sklearn.metrics import classification_report

# Assume we have triple-barrier labels from Part 3
# samples_info_sets: pd.Series with index=t0, values=t1
# price_bars: pd.DataFrame with DatetimeIndex
# X: feature matrix, y: labels

# Initialize classifier
clf = SequentiallyBootstrappedBaggingClassifier(
    samples_info_sets=samples_info_sets,  # Label temporal spans
    price_bars_index=price_bars.index,    # Bar timestamps
    estimator=DecisionTreeClassifier(
        max_depth=6,
        min_samples_leaf=50
    ),
    n_estimators=100,                  # Large ensemble for stability
    max_samples=0.5,                   # Use 50% of data per estimator
    bootstrap_features=True,           # Also subsample features
    max_features=0.7,                  # Use 70% of features per estimator
    oob_score=True,                    # Enable OOB validation
    n_jobs=-1,                        # Use all CPU cores
    random_state=42,                  # Reproducibility
    verbose=1
)

# Train ensemble
clf.fit(X_train, y_train)

# Inspect OOB performance (no test set needed!)
print(f"OOB Score: {clf.oob_score_:.4f}")

# Make predictions on test set
y_pred = clf.predict(X_test)
y_proba = clf.predict_proba(X_test)

# Evaluate
print(classification_report(y_test, y_pred))

# Access individual estimators if needed
print(f"Number of estimators: {len(clf.estimators_)}")
print(f"Average sample size: {np.mean([len(s) for s in clf.estimators_samples_]):.0f}")

パラメータ調整ガイドライン

n_estimators(モデル数)

  • 小規模データセット(<1,000サンプル):50~100推定器
  • 中規模データセット(1,000~10,000):100~200推定器
  • 大規模データセット(>10,000):200~500推定器
  • 目安:OOBスコアが頭打ちになるまで多い方がよい(学習中に監視)

max_samples(ブートストラップ標本サイズ)

  • 高重複度(バーあたり>10の重複):小さめのサンプル(0.3~0.5)で多様性を最大化する
  • 低重複度(バーあたり<5の重複):大きめのサンプル(0.6~0.8)でも安全
  • トレードオフ:小さいサンプル → 多様性は増すが個々のモデルは弱くなる
bootstrap_features(特徴量のサブサンプリング)
  • 有効にする場合:特徴量が多い(>50)、特徴量間に相関がある、最大多様性を求める場合
  • 無効にする場合:特徴量が少ない(<20)、各特徴量が重要、解釈性が重要な場合
  • 推奨max_features:有効時は0.5~0.7(低すぎると個々のモデルが弱くなる)

比較:標準 vs 逐次ブートストラップバギング

実際の取引戦略での性能差を確認しましょう。

from sklearn.ensemble import BaggingClassifier

# Standard bagging (temporal leakage)
standard_clf = BaggingClassifier(
    estimator=DecisionTreeClassifier(max_depth=6),
    n_estimators=100,
    max_samples=0.5,
    oob_score=True,
    random_state=42
)

# Sequential bootstrap bagging (temporal awareness)
sequential_clf = SequentiallyBootstrappedBaggingClassifier(
    samples_info_sets=samples_info_sets,
    price_bars_index=price_bars.index,
    estimator=DecisionTreeClassifier(max_depth=6),
    n_estimators=100,
    max_samples=0.5,
    oob_score=True,
    random_state=42
)

# Train both
standard_clf.fit(X_train, y_train)
sequential_clf.fit(X_train, y_train)

# Compare results
print("Standard Bagging:")
print(f"  OOB Score: {standard_clf.oob_score_:.4f}")
print(f"  Test Accuracy: {standard_clf.score(X_test, y_test):.4f}")

print("\nSequential Bootstrap Bagging:")
print(f"  OOB Score: {sequential_clf.oob_score_:.4f}")
print(f"  Test Accuracy: {sequential_clf.score(X_test, y_test):.4f}")

同時発生の高い金融データに関する典型的な結果:

指標 標準バギング 逐次バギング 改善
OOB-Test Gap 0.124 0.013 -89.5%


    クロスバリデーションとの統合

    OOBスコアは便利ですが、時間的リークを防ぐためには、適切な評価にパージクロスバリデーション(purged CV)が必要です。

    from mlfinlab.cross_validation import PurgedKFold
    
    # Setup purged cross-validation
    cv = PurgedKFold(
        n_splits=5,
        samples_info_sets=samples_info_sets,
        pct_embargo=0.01  # Embargo 1% of data after each fold
    )
    
    

    重要:各推定器で逐次ブートストラップサンプリングをおこなっていても、アンサンブル全体を正しく評価するにはパージCVが必須です。逐次ブートストラップは推定器内部の重複を処理しますが、パージCVはフォールド間の時間的リークを防ぎます。

    逐次ブートストラップバギングモデルに対応するため、独自のクロスバリデーション手法を実装する必要があります。以下の関数は、各フォールドで何が起きるかを包括的に分析するもので、データの時間依存性を深く理解するために必要です。

    def analyze_cross_val_scores(
        classifier: ClassifierMixin,
        X: pd.DataFrame,
        y: pd.Series,
        cv_gen: BaseCrossValidator,
        sample_weight_train: np.ndarray = None,
        sample_weight_score: np.ndarray = None,
    ):
        # pylint: disable=invalid-name
        # pylint: disable=comparison-with-callable
        """
        Advances in Financial Machine Learning, Snippet 7.4, page 110.
    
        Using the PurgedKFold Class.
    
        Function to run a cross-validation evaluation of the classifier using sample weights and a custom CV generator.
        Scores are computed using accuracy_score, probability_weighted_accuracy, log_loss and f1_score.
    
        Note: This function is different to the book in that it requires the user to pass through a CV object. The book
        will accept a None value as a default and then resort to using PurgedCV, this also meant that extra arguments had to
        be passed to the function. To correct this we have removed the default and require the user to pass a CV object to
        the function.
    
        Example:
    
        .. code-block:: python
    
            cv_gen = PurgedKFold(n_splits=n_splits, t1=t1, pct_embargo=pct_embargo)
            scores_array = ml_cross_val_scores_all(classifier, X, y, cv_gen, sample_weight_train=sample_train,
                                                   sample_weight_score=sample_score, scoring=accuracy_score)
    
        :param classifier: (BaseEstimator) A scikit-learn Classifier object instance.
        :param X: (pd.DataFrame) The dataset of records to evaluate.
        :param y: (pd.Series) The labels corresponding to the X dataset.
        :param cv_gen: (BaseCrossValidator) Cross Validation generator object instance.
        :param sample_weight_train: (np.array) Sample weights used to train the model for each record in the dataset.
        :param sample_weight_score: (np.array) Sample weights used to evaluate the model quality.
        :return: tuple(dict, pd.DataFrame, dict) The computed scores, a data frame of mean and std. deviation, and a dict of data in each fold
        """
        scoring_methods = [
            accuracy_score,
            probability_weighted_accuracy,
            log_loss,
            precision_score,
            recall_score,
            f1_score,
        ]
        ret_scores = {
            (
                scoring.__name__.replace("_score", "")
                .replace("probability_weighted_accuracy", "pwa")
                .replace("log_loss", "neg_log_loss")
            ): np.zeros(cv_gen.n_splits)
            for scoring in scoring_methods
        }
    
        # If no sample_weight then broadcast a value of 1 to all samples (full weight).
        if sample_weight_train is None:
            sample_weight_train = np.ones((X.shape[0],))
    
        if sample_weight_score is None:
            sample_weight_score = np.ones((X.shape[0],))
    
        seq_bootstrap = isinstance(classifier, SequentiallyBootstrappedBaggingClassifier)
        if seq_bootstrap:
            t1 = classifier.samples_info_sets.copy()
            common_idx = t1.index.intersection(y.index)
            X, y, t1 = X.loc[common_idx], y.loc[common_idx], t1.loc[common_idx]
            if t1.empty:
                raise KeyError(f"samples_info_sets not aligned with data")
            classifier.set_params(oob_score=False)
    
        cms = []  # To store confusion matrices
    
        # Score model on KFolds
        for i, (train, test) in enumerate(cv_gen.split(X=X, y=y)):
            if seq_bootstrap:
                classifier = clone(classifier).set_params(
                    samples_info_sets=t1.iloc[train]
                )  # Create new instance
            fit = classifier.fit(
                X=X.iloc[train, :],
                y=y.iloc[train],
                sample_weight=sample_weight_train[train],
            )
            prob = fit.predict_proba(X.iloc[test, :])
            pred = fit.predict(X.iloc[test, :])
            params = dict(
                y_true=y.iloc[test],
                y_pred=pred,
                labels=classifier.classes_,
                sample_weight=sample_weight_score[test],
            )
    
            for method, scoring in zip(ret_scores.keys(), scoring_methods):
                if scoring in (probability_weighted_accuracy, log_loss):
                    score = scoring(
                        y.iloc[test],
                        prob,
                        sample_weight=sample_weight_score[test],
                        labels=classifier.classes_,
                    )
                    if method == "neg_log_loss":
                        score *= -1
                else:
                    try:
                        score = scoring(**params)
                    except:
                        del params["labels"]
                        score = scoring(**params)
                        params["labels"] = classifier.classes_
    
                ret_scores[method][i] = score
    
            cms.append(confusion_matrix(**params).round(2))
    
        # Mean and standard deviation of scores
        scores_df = pd.DataFrame.from_dict(
            {
                scoring: {"mean": scores.mean(), "std": scores.std()}
                for scoring, scores in ret_scores.items()
            },
            orient="index",
        )
    
        # Extract TN, TP, FP, FN for each fold
        confusion_matrix_breakdown = []
        for i, cm in enumerate(cms, 1):
            if cm.shape == (2, 2):  # Binary classification
                tn, fp, fn, tp = cm.ravel()
                confusion_matrix_breakdown.append({"fold": i, "TN": tn, "FP": fp, "FN": fn, "TP": tp})
            else:
                # For multi-class, you might want different handling
                confusion_matrix_breakdown.append({"fold": i, "confusion_matrix": cm})
    
        return ret_scores, scores_df, confusion_matrix_breakdown
    

    以下は、メタラベル付きボリンジャーバンドの平均回帰戦略(sample_weights.ipynbを参照)の結果です。 逐次ブートストラップを用いると、すべてのスコアが改善され、分散も低下しています。

    クロスバリデーションの結果:

    ランダムフォレスト 標準バギング 逐次バギング
    accuracy 0.509 ± 0.024 0.515 ± 0.024 0.527 ± 0.015
    pwa 0.513 ± 0.038 0.519 ± 0.039 0.544 ± 0.018
    neg_log_loss -0.695 ± 0.005 -0.694 ± 0.005 -0.692 ± 0.001
    precision 0.637 ± 0.027 0.643 ± 0.026 0.637 ± 0.026
    recall 0.476 ± 0.095 0.484 ± 0.098 0.567 ± 0.038
    f1 0.539 ± 0.065 0.546 ± 0.067 0.599 ± 0.026

    アウトオブサンプルの結果:

    ランダムフォレスト 標準バギング 逐次バギング
    accuracy 0.505780 0.496628 0.519750
    pwa 0.493505 0.495487 0.523738
    neg_log_loss -0.696703 -0.696612 -0.692669
    precision 0.650811 0.646396 0.633913
    recall 0.461303 0.439847 0.558621
    f1 0.539910 0.523484 0.593890
    oob 0.516976 0.516133 0.522153
    oob_test_gap 0.011195 0.019505 0.002403

    主要な観察点:

    1. 逐次ブートストラップバギングでは、OOBとテストのギャップが小さい(0.002 vs 0.019)ため、OOBスコアは信頼できることを示しています。OOBとテストの性能が密接に一致しており、隠れた時間的リークがないことが確認できます。
    2. テスト精度の向上は、真に未学習データに対する汎化性能の改善を示しています。

    高度な内容:カスタムOOB指標

    組み込みのoob_score_は分類では正解率(accuracy)、回帰ではR²を使用します。しかし金融アプリケーションでは、しばしばカスタム指標が必要になります。

    def compute_custom_oob_metrics(clf, X, y, sample_weight=None):
        """
        Compute custom OOB metrics (F1, AUC, precision/recall) for a fitted ensemble.
        
        Args:
            clf: Fitted SequentiallyBootstrappedBaggingClassifier
            X: Feature matrix used in training
            y: True labels
            sample_weight: Optional sample weights
        
        Returns:
            dict: Custom OOB metric values
        """
        from sklearn.metrics import f1_score, roc_auc_score, precision_score, recall_score
        
        n_samples = y.shape[0]
        n_classes = clf.n_classes_
        
        # Accumulate OOB predictions
        oob_proba = np.zeros((n_samples, n_classes))
        oob_count = np.zeros(n_samples)
        
        for estimator, samples, features in zip(
            clf.estimators_,
            clf.estimators_samples_,
            clf.estimators_features_
        ):
            mask = ~indices_to_mask(samples, n_samples)
            if np.any(mask):
                X_oob = X[mask][:, features]
                oob_proba[mask] += estimator.predict_proba(X_oob)
                oob_count[mask] += 1
        
        # Average and get predictions
        oob_mask = oob_count > 0
        oob_proba[oob_mask] /= oob_count[oob_mask, np.newaxis]
        oob_pred = np.argmax(oob_proba, axis=1)
        
        # Compute metrics on samples with OOB predictions
        y_oob = y[oob_mask]
        pred_oob = oob_pred[oob_mask]
        proba_oob = oob_proba[oob_mask]
        
        metrics = {
            'f1': f1_score(y_oob, pred_oob, average='weighted'),
            'precision': precision_score(y_oob, pred_oob, average='weighted'),
            'recall': recall_score(y_oob, pred_oob, average='weighted'),
            'coverage': oob_mask.sum() / n_samples  # Fraction with OOB predictions
        }
        
        # Add AUC for binary classification
        if n_classes == 2:
            metrics['auc'] = roc_auc_score(y_oob, proba_oob[:, 1])
        
        return metrics
    
    # Usage
    oob_metrics = compute_custom_oob_metrics(sequential_clf, X_train, y_train)
    print("Custom OOB Metrics:")
    for metric, value in oob_metrics.items():
        print(f"  {metric}: {value:.4f}")


    本番環境でのデプロイに関する考慮事項

    メモリ管理


    大規模なアンサンブルはメモリを大量に消費する可能性があります。使用状況を監視し、必要に応じて最適化してください。

    import sys
    
    # Check ensemble memory footprint
    def estimate_ensemble_size(clf):
        """Estimate memory usage of fitted ensemble."""
        total_bytes = 0
        
        # Estimators
        for est in clf.estimators_:
            total_bytes += sys.getsizeof(est)
        
        # Sample indices
        for samples in clf.estimators_samples_:
            total_bytes += samples.nbytes
        
        # Feature indices
        if clf.estimators_features_ is not None:
            for features in clf.estimators_features_:
                total_bytes += features.nbytes
        
        return total_bytes / (1024 ** 2)  # Convert to MB
    
    size_mb = estimate_ensemble_size(sequential_clf)
    print(f"Ensemble size: {size_mb:.2f} MB")

    モデルのシリアライズ

    以下は、学習済みアンサンブルの効率的な保存および読み込みをおこないます。

    import joblib
    
    # Save entire ensemble
    joblib.dump(sequential_clf, 'sequential_bagging_model.pkl', compress=3)
    
    # Load for prediction
    loaded_clf = joblib.load('sequential_bagging_model.pkl')
    
    # Verify predictions match
    original_pred = sequential_clf.predict_proba(X_test)
    loaded_pred = loaded_clf.predict_proba(X_test)
    assert np.allclose(original_pred, loaded_pred)

    よくある落とし穴と解決策

    落とし穴1:時間メタデータの渡し忘れ

    問題点:samples_info_setsやprice_bars_indexを渡さずに分類器を使用しようとするとエラーになります。

    解決策:ラベリング処理からこれらを正しく構築することを常に確認します。

    # From triple-barrier labeling (Part 3)
    events = get_events(
        close=close_prices,
        t_events=trigger_times,
        pt_sl=[1, 1],
        target=daily_vol,
        min_ret=0.01,
        num_threads=4,
        vertical_barrier_times=vertical_barriers
    )
    
    # events['t1'] contains end times - this is samples_info_sets
    samples_info_sets = events['t1']
    price_bars_index = close_prices.index
    
    # Now safe to use
    clf = SequentiallyBootstrappedBaggingClassifier(
        samples_info_sets=samples_info_sets,
        price_bars_index=price_bars_index,
        # ... other params ...
    )

    落とし穴2:インデックス長の不一致

    問題点:len(samples_info_sets) != len(X)になると、原因が分かりにくいエラーが発生します。

    解決策:特徴量、ラベル、メタデータの長さを常に揃えます。

    # After computing features and labels, ensure alignment
    assert len(X) == len(y) == len(samples_info_sets), \
        "Feature matrix, labels, and metadata must have same length"
    
    # If they don't match, use index intersection
    common_idx = X.index.intersection(y.index).intersection(samples_info_sets.index)
    X_aligned = X.loc[common_idx]
    y_aligned = y.loc[common_idx]
    samples_aligned = samples_info_sets.loc[common_idx]

    落とし穴3:ウォームスタートの挙動を無視

    問題点:warm_start=Trueに設定してからn_estimatorsを変更しても、既存の推定器は再学習されません。

    解決策:ウォームスタートは既存推定器に追加するだけであることを理解します。

    # Initial training with 50 estimators
    clf = SequentiallyBootstrappedBaggingClassifier(
        samples_info_sets=samples_info_sets,
        price_bars_index=price_bars_index,
        n_estimators=50,
        warm_start=True,
        random_state=42
    )
    clf.fit(X_train, y_train)
    
    # Add 50 more estimators (total=100)
    clf.n_estimators = 100
    clf.fit(X_train, y_train)  # Only trains 50 new estimators
    
    print(len(clf.estimators_))  # Output: 100

    他の手法とのベンチマーク


    金融データ上で、逐次ブートストラップバギングの性能が他のアンサンブル手法と比べてどのように異なるかを評価します。

    from sklearn.ensemble import (
        RandomForestClassifier,
        GradientBoostingClassifier,
        BaggingClassifier
    )
    from sklearn.model_selection import cross_val_score
    
    # Define models
    models = {
        'Standard Bagging': BaggingClassifier(
            estimator=DecisionTreeClassifier(max_depth=6),
            n_estimators=100,
            random_state=42
        ),
        'Random Forest': RandomForestClassifier(
            n_estimators=100,
            max_depth=6,
            random_state=42
        ),
        'Sequential Bagging': SequentiallyBootstrappedBaggingClassifier(
            samples_info_sets=samples_info_sets,
            price_bars_index=price_bars_index,
            estimator=DecisionTreeClassifier(max_depth=6),
            n_estimators=100,
            random_state=42
        )
    }
    
    # Benchmark with purged K-Fold CV
    results = {}
    cv_gen = PurgedKFold(n_splits, t1, pct_embargo)
    for name, model in models.items():
        raw_scores, scores_df, folds = analyze_cross_val_scores(
    	model, X, y, cv_gen, 
    	sample_weights_train=w, 
    	sample_weights_score=w,
    	)
        results[name] = dict(scores=scores_df, folds=folds)
    
    

    まとめとベストプラクティス

    SequentiallyBootstrappedBaggingClassifierは、ラベルの同時発生という金融時系列特有の問題に対応することで、アンサンブル学習の力を金融機械学習に活かします。重要なポイントは次のとおりです。

    逐次ブートストラップバギングを使うべき状況:

    • トリプルバリア法や、時間的にラベルが重なる手法を使用する場合
    • 高頻度データで観測値が自然に重複する場合
    • 時間構造が重要な金融機械学習タスク全般
    • 正確な分散推定が必要な本番システム

    標準的なバギングで十分な状況:

    • 日次以下の低頻度データでラベル重複がほとんどない場合
    • クロスセクショナル予測(時間ではなく資産間の予測)
    • 他の手段で時間的リークが解消されている場合

    本番環境向け設定チェックリスト:

    1. ✓ samples_info_setsとprice_bars_indexが正しく整列していることを確認する
    2. ✓ 学習中の監視用にoob_score=Trueを有効化する
    3. ✓ 全CPUコアを利用するためn_jobs=-1を設定する
    4. ✓ 再現性のためにrandom_stateを設定する
    5. ✓ 大規模アンサンブルではメモリ使用量を監視する
    6. ✓ パージ/エンバーゴ付きクロスバリデーションで検証する
    7. ✓ OOBスコアとテスト性能を比較し、残存するリークを検出する

    性能最適化のヒント:

    • active_indices_を一度事前計算してキャッシュする
    • 高重複度の場合はmax_samplesを小さめに設定する
    • 高次元データではbootstrap_featuresを有効化する
    • 低レイテンシ用途ではバッチ予測を活用する
    • 本番用には圧縮してモデルをシリアライズする

    逐次ブートストラップバギングを用いることで、金融データの時間構造を尊重しつつ、従来の機械学習でバギングが提供する分散削減のメリットも享受できる、本番対応可能なアンサンブル手法を手に入れることができます。


    MQL5への逐次ブートストラップモデルのデプロイ(ONNX経由)

    Pythonで堅牢な逐次ブートストラップモデルを学習した後、次の重要なステップはMetaTrader 5への本番デプロイです。ONNX (Open Neural Network Exchange)は、Pythonの豊富なML環境とMQL5の実稼働環境をつなぐ最も信頼性の高い橋渡しを提供します。

    MQL5のデプロイにONNXを選ぶ理由

    ONNXには、金融機械学習モデルをデプロイする上でいくつかの大きな利点があります。

    • MetaTrader 5ネイティブ対応:MQL5にはONNXランタイムが組み込まれており、外部依存は不要
    • 高性能:モデルはコンパイル済みC++コードとして動作し、マイクロ秒単位の予測を実現する
    • クロスプラットフォーム:同一モデルがWindows、Mac、Linux上のMetaTrader 5で動作可能
    • 広範な互換性:scikit-learnのアンサンブルに対応し、逐次ブートストラップモデルも含む
    • バージョン管理:バイナリモデルファイルは簡単にバージョン管理およびデプロイ可能

    理解しておくべき主な制約:

    • アンサンブルのメタデータ(OOBスコア、推定器サンプル)は保持されず、予測ロジックのみがONNX化される
    • MQL5上での再学習は不可で、学習はPython側でおこなう
    • 大規模アンサンブル(200以上の推定器)はモデルロード時間とメモリ使用量が増加
    • 特徴量計算はPythonと完全に同一になるよう、MQL5側で手動で再現する必要がある

    完全なデプロイパイプライン


    ステップ1:学習済みモデルをONNX形式にエクスポートする

    逐次ブートストラップ分類器を学習した後、ONNXに変換します。

    import onnx
    from skl2onnx import convert_sklearn
    from skl2onnx.common.data_types import FloatTensorType
    import numpy as np
    
    # Your trained sequential bootstrap model
    clf = SequentiallyBootstrappedBaggingClassifier(
        samples_info_sets=samples_info_sets,
        price_bars_index=price_bars.index,
        estimator=DecisionTreeClassifier(max_depth=6, min_samples_leaf=50),
        n_estimators=100,
        max_samples=0.5,
        random_state=42
    )
    clf.fit(X_train, y_train)
    
    # Define input shape - CRITICAL: must match feature count exactly
    n_features = X_train.shape[1]
    initial_type = [('float_input', FloatTensorType([None, n_features]))]
    
    # Convert to ONNX with appropriate settings
    onnx_model = convert_sklearn(
        clf,
        initial_types=initial_type,
        target_opset=12,  # MT5 supports opset 9-15
        options={
            'zipmap': False  # Return raw probabilities, not dictionary
        }
    )
    
    # Save model file
    model_filename = "sequential_bagging_model.onnx"
    with open(model_filename, "wb") as f:
        f.write(onnx_model.SerializeToString())
    
    print(f"Model exported: {len(onnx_model.SerializeToString()) / 1024:.2f} KB")
    print(f"Input features: {n_features}")
    print(f"Output classes: {len(clf.classes_)}")

    ステップ2:ONNXモデルの正確性の確認

    デプロイ前に、ONNXによる予測結果が元のPythonモデルと一致することを必ず確認します。

    import onnxruntime as rt
    
    # Load ONNX model
    sess = rt.InferenceSession(model_filename)
    
    # Inspect model structure
    input_name = sess.get_inputs()[0].name
    output_name = sess.get_outputs()[0].name
    print(f"Input tensor name: {input_name}")
    print(f"Output tensor name: {output_name}")
    
    # Test with sample data
    X_test_sample = X_test[:5].astype(np.float32)
    
    # Original model predictions
    sklearn_pred = clf.predict_proba(X_test_sample)
    
    # ONNX model predictions
    onnx_pred = sess.run([output_name], {input_name: X_test_sample})[0]
    
    # Verify predictions match within tolerance
    print("\nVerification Results:")
    print("Scikit-learn predictions:\n", sklearn_pred[:3])
    print("\nONNX predictions:\n", onnx_pred[:3])
    print(f"\nMax absolute difference: {np.abs(sklearn_pred - onnx_pred).max():.2e}")
    
    assert np.allclose(sklearn_pred, onnx_pred, atol=1e-5), "ERROR: Predictions don't match!"
    print("✓ Model verification passed")

    ステップ3:特徴量エンジニアリングパイプラインのドキュメント化

    デプロイ失敗で最も多い原因は特徴量の不整合です。特徴量計算を正確に文書化しておきます。

    import json
    from datetime import datetime
    
    # Document feature metadata for MQL5 implementation
    feature_metadata = {
        'model_version': 'v1.0_seq_bagging',
        'timestamp': datetime.now().isoformat(),
        'n_features': n_features,
        'n_estimators': 100,
        'lookback_period': 20,
        'feature_names': [
            'bb_position',         # (close - bb_middle) / (bb_upper - bb_lower)
            'bb_width',           # (bb_upper - bb_lower) / bb_middle
            'return_1d',          # (close[0] - close[1]) / close[1]
            'return_5d',          # (close[0] - close[5]) / close[5]
            'volatility_20d',     # std(returns, 20) / close[0]
            'volume_ratio',       # volume[0] / ma(volume, 20)
            'rsi_14',             # RSI with 14-period lookback
            'mean_reversion_z',  # (close - ma_20) / std_20
        ],
        'bb_parameters': {
            'period': 20,
            'std_dev': 2.0
        }
    }
    
    with open('feature_metadata.json', 'w') as f:
        json.dump(feature_metadata, f, indent=2)
    
    # Create test dataset for validation in MQL5
    test_data = {
        'features': X_test[:10].tolist(),
        'expected_predictions': clf.predict_proba(X_test[:10]).tolist(),
        'expected_classes': clf.predict(X_test[:10]).tolist()
    }
    
    with open('test_predictions.json', 'w') as f:
        json.dump(test_data, f, indent=2)


    MQL5での実装:ボリンジャーバンド平均回帰戦略

    ここからは、Pythonで学習したモデルに対応する完全なMQL5システムを実装します。まずは、Pythonの学習時と完全に一致する特徴量エンジニアリングから始めます。

    特徴量計算モジュール

    Pythonでの特徴量計算を正確に再現するために、FeatureEngine.mqhを作成します。

    //+------------------------------------------------------------------+
    //| FeatureEngine.mqh                                                 |
    //| Feature calculation engine matching Python training pipeline     |
    //+------------------------------------------------------------------+
    #property strict
    
    class CFeatureEngine {
    private:
        int m_lookback;
        int m_bb_period;
        double m_bb_deviation;
        int m_rsi_period;
        
    public:
        CFeatureEngine(int lookback=20, int bb_period=20, double bb_dev=2.0, int rsi_period=14) {
            m_lookback = lookback;
            m_bb_period = bb_period;
            m_bb_deviation = bb_dev;
            m_rsi_period = rsi_period;
        }
        
        // Main feature calculation - must match Python exactly
        bool CalculateFeatures(const double &close[], 
                                  const double &high[], 
                                  const double &low[],
                                  const long &volume[],
                                  double &features[]) {
            
            if(ArraySize(close) < m_lookback + 10) return false;
            
            // Must have exactly 8 features to match Python
            ArrayResize(features, 8);
            int idx = 0;
            
            // Calculate Bollinger Bands
            double bb_upper, bb_middle, bb_lower;
            CalculateBollingerBands(close, m_bb_period, m_bb_deviation, 
                                    bb_upper, bb_middle, bb_lower);
            
            // Feature 1: Bollinger Band Position
            // Measures where price sits within the bands (-1 to +1)
            double bb_range = bb_upper - bb_lower;
            if(bb_range > 0) {
                features[idx++] = (close[0] - bb_middle) / bb_range;
            } else {
                features[idx++] = 0.0;
            }
            
            // Feature 2: Bollinger Band Width
            // Normalized measure of volatility
            if(bb_middle > 0) {
                features[idx++] = bb_range / bb_middle;
            } else {
                features[idx++] = 0.0;
            }
            
            // Feature 3: 1-day return
            features[idx++] = SafeReturn(close[0], close[1]);
            
            // Feature 4: 5-day return
            features[idx++] = SafeReturn(close[0], close[5]);
            
            // Feature 5: 20-day volatility (annualized)
            double returns_std = CalculateReturnsStdDev(close, m_lookback);
            features[idx++] = returns_std / close[0];
            
            // Feature 6: Volume ratio
            double vol_ma = CalculateVolumeMA(volume, m_lookback);
            if(vol_ma > 0) {
                features[idx++] = (double)volume[0] / vol_ma;
            } else {
                features[idx++] = 1.0;
            }
            
            // Feature 7: RSI
            features[idx++] = CalculateRSI(close, m_rsi_period) / 100.0;
            
            // Feature 8: Mean reversion Z-score
            double ma_20 = CalculateMA(close, 20);
            double std_20 = CalculateStdDev(close, 20);
            if(std_20 > 0) {
                features[idx++] = (close[0] - ma_20) / std_20;
            } else {
                features[idx++] = 0.0;
            }
            
            return true;
        }
        
    private:
        // Calculate Bollinger Bands using SMA and standard deviation
        void CalculateBollingerBands(const double &close[], int period, double deviation,
                                      double &upper, double &middle, double &lower) {
            middle = CalculateMA(close, period);
            double std = CalculateStdDev(close, period);
            upper = middle + deviation * std;
            lower = middle - deviation * std;
        }
        
        // Simple Moving Average
        double CalculateMA(const double &data[], int period) {
            double sum = 0.0;
            for(int i = 0; i < period; i++) {
                sum += data[i];
            }
            return sum / period;
        }
        
        // Standard Deviation
        double CalculateStdDev(const double &data[], int period) {
            double mean = CalculateMA(data, period);
            double sum_sq = 0.0;
            for(int i = 0; i < period; i++) {
                double diff = data[i] - mean;
                sum_sq += diff * diff;
            }
            return MathSqrt(sum_sq / period);
        }
        
        // Standard deviation of returns (not prices)
        double CalculateReturnsStdDev(const double &close[], int period) {
            double returns[];
            ArrayResize(returns, period);
            
            for(int i = 0; i < period; i++) {
                returns[i] = SafeReturn(close[i], close[i+1]);
            }
            
            return CalculateStdDev(returns, period);
        }
        
        // RSI calculation
        // RSI calculation
        double CalculateRSI(const double &close[], int period) {
            double gains = 0.0, losses = 0.0;
            
            for(int i = 1; i <= period; i++) {
                double change = close[i-1] - close[i];
                if(change > 0) {
                    gains += change;
                } else {
                    losses -= change;
                }
            }
            
            double avg_gain = gains / period;
            double avg_loss = losses / period;
            
            if(avg_loss == 0.0) return 100.0;
            
            double rs = avg_gain / avg_loss;
            return 100.0 - (100.0 / (1.0 + rs));
        }
        
        // Volume moving average
        double CalculateVolumeMA(const long &volume[], int period) {
            double sum = 0.0;
            for(int i = 0; i < period; i++) {
                sum += (double)volume[i];
            }
            return sum / period;
        }
        
        // Safe return calculation with division by zero protection
        double SafeReturn(double current, double previous) {
            if(previous == 0.0 || MathAbs(previous) < 1e-10) return 0.0;
            return (current - previous) / previous;
        }
    };
    

    ONNX統合付きメインエキスパートアドバイザー(EA)

    ONNXモデルを読み込み、ボリンジャーバンド平均回帰戦略を実行するEA本体を作成します。

    //+------------------------------------------------------------------+
    //| SequentialBaggingEA.mq5                                          |
    //| Bollinger Band Mean Reversion with Sequential Bootstrap Model    |
    //+------------------------------------------------------------------+
    #property copyright "Your Name"
    #property version   "1.00"
    #property strict
    
    #include <Trade\Trade.mqh>
    #include "FeatureEngine.mqh"
    
    //--- Input parameters
    input group "Model Settings"
    input string   InpModelFile = "sequential_bagging_model.onnx"; // ONNX model filename
    input double   InpConfidenceThreshold = 0.60; // Minimum confidence for trade
    
    input group "Feature Parameters"
    input int      InpLookback = 20;           // Feature lookback period
    input int      InpBBPeriod = 20;           // Bollinger Bands period
    input double   InpBBDeviation = 2.0;        // Bollinger Bands deviation
    input int      InpRSIPeriod = 14;          // RSI period
    
    input group "Risk Management"
    input double   InpRiskPercent = 1.0;        // Risk per trade (%)
    input int      InpStopLoss = 200;          // Stop loss (points)
    input int      InpTakeProfit = 400;        // Take profit (points)
    input int      InpMaxTrades = 1;           // Maximum concurrent trades
    
    input group "Trading Hours"
    input bool     InpUseTradingHours = false; // Enable trading hours filter
    input int      InpStartHour = 9;           // Trading start hour
    input int      InpEndHour = 17;            // Trading end hour
    
    //--- Global variables
    long            g_model_handle = INVALID_HANDLE;
    CTrade          g_trade;
    CFeatureEngine  g_features;
    datetime        g_last_bar_time = 0;
    
    //+------------------------------------------------------------------+
    //| Expert initialization function                                   |
    //+------------------------------------------------------------------+
    int OnInit() {
        // Initialize feature engine
        g_features.CFeatureEngine(InpLookback, InpBBPeriod, InpBBDeviation, InpRSIPeriod);
        
        // Load ONNX model from MQL5/Files directory
        g_model_handle = OnnxCreateFromFile(
            InpModelFile,
            ONNX_DEFAULT
        );
        
        if(g_model_handle == INVALID_HANDLE) {
            Print("❌ Failed to load ONNX model: ", InpModelFile);
            Print("Ensure model is in: Terminal_Data_Folder/MQL5/Files/");
            return INIT_FAILED;
        }
        
        // Verify model structure
        long input_count, output_count;
        OnnxGetInputCount(g_model_handle, input_count);
        OnnxGetOutputCount(g_model_handle, output_count);
        
        vector input_shape;
        OnnxGetInputShape(g_model_handle, 0, input_shape);
        
        Print("✓ Model loaded successfully");
        Print("  Model file: ", InpModelFile);
        Print("  Input count: ", input_count);
        Print("  Output count: ", output_count);
        Print("  Expected features: ", (int)input_shape[1]);
        Print("  Confidence threshold: ", InpConfidenceThreshold);
        
        // Set trade parameters
        g_trade.SetExpertMagicNumber(20241102);
        g_trade.SetDeviationInPoints(10);
        g_trade.SetTypeFilling(ORDER_FILLING_FOK);
        
        return INIT_SUCCEEDED;
    }
    
    //+------------------------------------------------------------------+
    //| Expert deinitialization function                                 |
    //+------------------------------------------------------------------+
    void OnDeinit(const int reason) {
        if(g_model_handle != INVALID_HANDLE) {
            OnnxRelease(g_model_handle);
            Print("Model released");
        }
    }
    
    //+------------------------------------------------------------------+
    //| Expert tick function                                             |
    //+------------------------------------------------------------------+
    void OnTick() {
        // Check for new bar
        datetime current_bar_time = iTime(_Symbol, _Period, 0);
        if(current_bar_time == g_last_bar_time) return;
        g_last_bar_time = current_bar_time;
        
        // Trading hours filter
        if(InpUseTradingHours) {
            MqlDateTime dt;
            TimeToStruct(TimeCurrent(), dt);
            if(dt.hour < InpStartHour || dt.hour >= InpEndHour) return;
        }
        
        // Get market data
        double close[], high[], low[];
        long volume[];
        
        ArraySetAsSeries(close, true);
        ArraySetAsSeries(high, true);
        ArraySetAsSeries(low, true);
        ArraySetAsSeries(volume, true);
        
        int required_bars = InpLookback + 10;
        int copied = CopyClose(_Symbol, _Period, 0, required_bars, close);
        
        if(copied < required_bars) {
            Print("Insufficient bars: ", copied, " < ", required_bars);
            return;
        }
        
        CopyHigh(_Symbol, _Period, 0, required_bars, high);
        CopyLow(_Symbol, _Period, 0, required_bars, low);
        CopyTickVolume(_Symbol, _Period, 0, required_bars, volume);
        
        // Calculate features
        double feature_array[];
        if(!g_features.CalculateFeatures(close, high, low, volume, feature_array)) {
            Print("Feature calculation failed");
            return;
        }
        
        // Prepare input matrix for ONNX (must be float32)
        matrix input_matrix(1, ArraySize(feature_array));
        for(int i = 0; i < ArraySize(feature_array); i++) {
            input_matrix[0][i] = (float)feature_array[i];
        }
        
        // Run model inference
        matrix output_matrix;
        if(!OnnxRun(g_model_handle, ONNX_NO_CONVERSION, input_matrix, output_matrix)) {
            Print("❌ Model prediction failed!");
            return;
        }
        
        // Extract probabilities
        // Output shape: [1, 2] for binary classification
        // Class 0 = SELL signal, Class 1 = BUY signal
        double prob_sell = output_matrix[0][0];
        double prob_buy = output_matrix[0][1];
        
        // Log predictions for monitoring
        Comment(StringFormat(
            "Sequential Bootstrap EA\n" +
            "Time: %s\n" +
            "Prob SELL: %.2f%%\n" +
            "Prob BUY: %.2f%%\n" +
            "Threshold: %.2f%%\n" +
            "Positions: %d/%d",
            TimeToString(TimeCurrent(), TIME_DATE|TIME_MINUTES),
            prob_sell * 100,
            prob_buy * 100,
            InpConfidenceThreshold * 100,
            PositionsTotal(),
            InpMaxTrades
        ));
        
        // Trading logic: Mean reversion strategy
        if(PositionsTotal() < InpMaxTrades) {
            
            // BUY signal: Price at lower band, model predicts reversion up
            if(prob_buy > InpConfidenceThreshold) {
                ExecuteBuy(prob_buy, feature_array);
            }
            // SELL signal: Price at upper band, model predicts reversion down
            else if(prob_sell > InpConfidenceThreshold) {
                ExecuteSell(prob_sell, feature_array);
            }
        }
    }
    
    //+------------------------------------------------------------------+
    //| Execute BUY order                                                |
    //+------------------------------------------------------------------+
    void ExecuteBuy(double confidence, const double &features[]) {
        double price = SymbolInfoDouble(_Symbol, SYMBOL_ASK);
        double sl = price - InpStopLoss * _Point;
        double tp = price + InpTakeProfit * _Point;
        
        // Calculate position size based on risk
        double lot = CalculateLotSize(InpRiskPercent, InpStopLoss);
        
        // Build comment with BB position for analysis
        string comment = StringFormat(
            "SB|BUY|Conf:%.2f|BB:%.3f",
            confidence,
            features[0]  // BB position feature
        );
        
        if(g_trade.Buy(lot, _Symbol, price, sl, tp, comment)) {
            Print("✓ BUY executed: Lot=", lot, " Conf=", confidence, " BB=", features[0]);
        } else {
            Print("❌ BUY failed: ", g_trade.ResultRetcodeDescription());
        }
    }
    
    //+------------------------------------------------------------------+
    //| Execute SELL order                                               |
    //+------------------------------------------------------------------+
    void ExecuteSell(double confidence, const double &features[]) {
        double price = SymbolInfoDouble(_Symbol, SYMBOL_BID);
        double sl = price + InpStopLoss * _Point;
        double tp = price - InpTakeProfit * _Point;
        
        double lot = CalculateLotSize(InpRiskPercent, InpStopLoss);
        
        string comment = StringFormat(
            "SB|SELL|Conf:%.2f|BB:%.3f",
            confidence,
            features[0]
        );
        
        if(g_trade.Sell(lot, _Symbol, price, sl, tp, comment)) {
            Print("✓ SELL executed: Lot=", lot, " Conf=", confidence, " BB=", features[0]);
        } else {
            Print("❌ SELL failed: ", g_trade.ResultRetcodeDescription());
        }
    }
    
    //+------------------------------------------------------------------+
    //| Calculate lot size based on risk percentage                     |
    //+------------------------------------------------------------------+
    double CalculateLotSize(double risk_percent, int sl_points) {
        double account_balance = AccountInfoDouble(ACCOUNT_BALANCE);
        double risk_amount = account_balance * risk_percent / 100.0;
        
        double tick_value = SymbolInfoDouble(_Symbol, SYMBOL_TRADE_TICK_VALUE);
        double tick_size = SymbolInfoDouble(_Symbol, SYMBOL_TRADE_TICK_SIZE);
        
        // Calculate value of stop loss in account currency
        double point_value = tick_value / tick_size;
        double sl_value = sl_points * _Point * point_value;
        
        // Calculate lot size
        double lot_size = risk_amount / sl_value;
        
        // Normalize to broker's lot step
        double lot_step = SymbolInfoDouble(_Symbol, SYMBOL_VOLUME_STEP);
        lot_size = MathFloor(lot_size / lot_step) * lot_step;
        
        // Apply broker limits
        double min_lot = SymbolInfoDouble(_Symbol, SYMBOL_VOLUME_MIN);
        double max_lot = SymbolInfoDouble(_Symbol, SYMBOL_VOLUME_MAX);
        
        return MathMax(min_lot, MathMin(max_lot, lot_size));
    }
    

    デプロイチェックリストと検証

    本番前の確認

    EAを本番環境で稼働させる前に、以下の重要な検証を完了してください。

    1. 特徴量の整合性確認

    # Python: Generate test vectors with known outputs
    import json
    
    # Create detailed test cases
    test_cases = []
    for i in range(10):
        features = X_test[i]
        prediction = clf.predict_proba([features])[0]
        
        test_cases.append({
            'test_id': i,
            'features': features.tolist(),
            'expected_prob_sell': float(prediction[0]),
            'expected_prob_buy': float(prediction[1]),
            'expected_class': int(clf.predict([features])[0]),
            'tolerance': 1e-4
        })
    
    with open('mql5_validation_tests.json', 'w') as f:
        json.dump(test_cases, f, indent=2)
    
    print(f"Generated {len(test_cases)} test cases for MQL5 validation")

    2. MQL5検証スクリプトの作成

    //+------------------------------------------------------------------+
    //| ValidationScript.mq5                                             |
    //| Validates ONNX model predictions against Python test cases       |
    //+------------------------------------------------------------------+
    #property script_show_inputs
    
    input string InpModelFile = "sequential_bagging_model.onnx";
    
    void OnStart() {
        // Load model
        long model = OnnxCreateFromFile(InpModelFile, ONNX_DEFAULT);
        if(model == INVALID_HANDLE) {
            Print("Failed to load model");
            return;
        }
        
        // Test case 1: Manually input features from Python
        double test_features[] = {
            -0.523,  // bb_position
            0.042,   // bb_width
            -0.012,  // return_1d
            -0.034,  // return_5d
            0.018,   // volatility_20d
            1.234,   // volume_ratio
            0.425,   // rsi_14 (normalized)
            -1.823   // mean_reversion_z
        };
        
        // Expected output from Python (copy from test file)
        double expected_prob_sell = 0.234;
        double expected_prob_buy = 0.766;
        
        // Run prediction
        matrix input(1, 8);
        for(int i=0; i<8; i++) {
            input[0][i] = (float)test_features[i];
        }
        
        matrix output;
        OnnxRun(model, ONNX_NO_CONVERSION, input, output);
        
        double mql5_prob_sell = output[0][0];
        double mql5_prob_buy = output[0][1];
        
        // Validate
        double tolerance = 0.0001;
        bool sell_match = MathAbs(mql5_prob_sell - expected_prob_sell) < tolerance;
        bool buy_match = MathAbs(mql5_prob_buy - expected_prob_buy) < tolerance;
        
        Print("========== VALIDATION RESULTS ==========");
        Print("Expected SELL prob: ", expected_prob_sell);
        Print("MQL5 SELL prob:     ", mql5_prob_sell);
        Print("Difference:         ", MathAbs(mql5_prob_sell - expected_prob_sell));
        Print("Match: ", sell_match ? "✓ PASS" : "✗ FAIL");
        Print("");
        Print("Expected BUY prob:  ", expected_prob_buy);
        Print("MQL5 BUY prob:      ", mql5_prob_buy);
        Print("Difference:         ", MathAbs(mql5_prob_buy - expected_prob_buy));
        Print("Match: ", buy_match ? "✓ PASS" : "✗ FAIL");
        Print("========================================");
        
        if(sell_match && buy_match) {
            Print("✓✓✓ VALIDATION PASSED ✓✓✓");
        } else {
            Print("✗✗✗ VALIDATION FAILED ✗✗✗");
            Print("Check feature calculations!");
        }
        
        OnnxRelease(model);
    }
    

    よくあるデプロイ問題と解決策

    問題 症状 解決策
    特徴量の不整合 Pythonとの予測誤差が1%以上 検証スクリプトを使用します。計算順序、参照(ルックバック)期間、0除算処理を確認します。
    モデル読み込み失敗 OnnxCreateFromFileでINVALID_HANDLE ファイルがMQL5/Files/にあること、ファイル名スペル、Opset互換性(9-15)を確認します。
    入力形状の誤り OnnxRunがfalseを返す 特徴量数が学習時と一致していることを確認します。OnnxGetInputShapeで期待次元をチェックします。
    予測が遅い EAがティックごとに遅延 n_estimatorsを減らす、木の深さを下げる(max_depthを低くする)、もしくは新しいバーのみで予測を実行します。
    インデックス配列エラー ArraySetAsSeriesの警告 CopyClose/High/Lowの前に必ずArraySetAsSeries(array, true)を呼び出します。

    本番監視ダッシュボード

    リアルタイムでモデル性能を追跡するには、以下のコードを追加します。

    //--- Add to global variables section
    struct PredictionStats {
        int total_predictions;
        int buy_signals;
        int sell_signals;
        double avg_confidence;
        double max_confidence;
        double min_confidence;
    } g_stats;
    
    //--- Add to OnInit()
    void ResetStats() {
        g_stats.total_predictions = 0;
        g_stats.buy_signals = 0;
        g_stats.sell_signals = 0;
        g_stats.avg_confidence = 0.0;
        g_stats.max_confidence = 0.0;
        g_stats.min_confidence = 1.0;
    }
    
    //--- Add after model prediction in OnTick()
    void UpdateStats(double prob_sell, double prob_buy) {
        g_stats.total_predictions++;
        
        double max_prob = MathMax(prob_sell, prob_buy);
        
        if(prob_buy > InpConfidenceThreshold) g_stats.buy_signals++;
        if(prob_sell > InpConfidenceThreshold) g_stats.sell_signals++;
        
        g_stats.avg_confidence = (g_stats.avg_confidence * (g_stats.total_predictions - 1) + max_prob) / 
                                  g_stats.total_predictions;
        g_stats.max_confidence = MathMax(g_stats.max_confidence, max_prob);
        g_stats.min_confidence = MathMin(g_stats.min_confidence, max_prob);
    }
    
    //--- Enhanced Comment() display
    Comment(StringFormat(
        "=== Sequential Bootstrap EA ===\n" +
        "Time: %s\n\n" +
        "Current Prediction:\n" +
        "  SELL: %.2f%%  %s\n" +
        "  BUY:  %.2f%%  %s\n\n" +
        "Statistics (Session):\n" +
        "  Predictions: %d\n" +
        "  BUY signals: %d\n" +
        "  SELL signals: %d\n" +
        "  Avg confidence: %.2f%%\n" +
        "  Range: %.2f%% - %.2f%%\n\n" +
        "Positions: %d / %d",
        TimeToString(TimeCurrent(), TIME_DATE|TIME_MINUTES),
        prob_sell * 100, prob_sell > InpConfidenceThreshold ? "[SIGNAL]" : "",
        prob_buy * 100, prob_buy > InpConfidenceThreshold ? "[SIGNAL]" : "",
        g_stats.total_predictions,
        g_stats.buy_signals,
        g_stats.sell_signals,
        g_stats.avg_confidence * 100,
        g_stats.min_confidence * 100,
        g_stats.max_confidence * 100,
        PositionsTotal(),
        InpMaxTrades
    ));


    本番環境向けパフォーマンス最適化

    モデルサイズの最適化

    ライブ取引では、同等の性能を維持しつつより小さいモデルを使用することが望ましいです。

    # Option 1: Train a smaller production model
    clf_prod = SequentiallyBootstrappedBaggingClassifier(
        samples_info_sets=samples_info_sets,
        price_bars_index=price_bars.index,
        estimator=DecisionTreeClassifier(
            max_depth=4,          # Reduced from 6
            min_samples_leaf=100  # Increased from 50
        ),
        n_estimators=50,        # Reduced from 100
        max_samples=0.5,
        random_state=42
    )
    clf_prod.fit(X_train, y_train)
    
    # Compare performance
    print("Full model test accuracy:", clf.score(X_test, y_test))
    print("Production model test accuracy:", clf_prod.score(X_test, y_test))
    
    # Option 2: Feature selection to reduce input dimensionality
    from sklearn.feature_selection import SelectKBest, f_classif
    
    selector = SelectKBest(f_classif, k=6)  # Keep only 6 best features
    X_train_selected = selector.fit_transform(X_train, y_train)
    X_test_selected = selector.transform(X_test)
    
    # Train on reduced features
    clf_reduced = SequentiallyBootstrappedBaggingClassifier(
        samples_info_sets=samples_info_sets,
        price_bars_index=price_bars.index,
        n_estimators=50,
        random_state=42
    )
    clf_reduced.fit(X_train_selected, y_train)
    
    # Show which features were selected
    selected_features = selector.get_support(indices=True)
    print("Selected feature indices:", selected_features)
    print("Reduced model accuracy:", clf_reduced.score(X_test_selected, y_test))

    代替デプロイ方法:REST API

    ONNXの制約(複雑な前処理や頻繁なモデル更新が必要な場合など)が問題となる場合、REST APIを使うことでより柔軟に対応できます。

    # Python: Simple Flask API
    from flask import Flask, request, jsonify
    import joblib
    import numpy as np
    
    app = Flask(__name__)
    model = joblib.load('sequential_bagging_model.pkl')
    
    @app.route('/predict', methods=['POST'])
    def predict():
        try:
            features = np.array(request.json['features']).reshape(1, -1)
            proba = model.predict_proba(features)[0]
            
            return jsonify({
                'success': True,
                'probability_sell': float(proba[0]),
                'probability_buy': float(proba[1]),
                'model_version': 'v1.0'
            })
        except Exception as e:
            return jsonify({'success': False, 'error': str(e)}), 400
    
    if __name__ == '__main__':
        app.run(host='0.0.0.0', port=5000)
    //--- MQL5: HTTP client for REST API
    #include <JAson.mqh>  // Or your preferred JSON library
    
    bool PredictViaAPI(const double &features[], double &prob_sell, double &prob_buy) {
        string url = "http://localhost:5000/predict";
        
        // Build JSON request
        string json_request = "{";
        json_request += "\"features\":[";
        for(int i=0; i<ArraySize(features); i++) {
            json_request += DoubleToString(features[i], 6);
            if(i < ArraySize(features)-1) json_request += ",";
        }
        json_request += "]}";
        
        // Send HTTP POST request
        char post_data[];
        char result_data[];
        string headers = "Content-Type: application/json\r\n";
        
        StringToCharArray(json_request, post_data, 0, WHOLE_ARRAY, CP_UTF8);
        int res = WebRequest("POST", url, headers, 5000, post_data, result_data, headers);
        
        if(res == -1) {
            Print("API request failed: ", GetLastError());
            return false;
        }
        
        // Parse JSON response
        string response = CharArrayToString(result_data, 0, WHOLE_ARRAY, CP_UTF8);
        // Parse using your JSON library
        // prob_sell = parsed_value;
        // prob_buy = parsed_value;
        
        return true;
    }

    REST APIのトレードオフ:

    項目 ONNX(推奨) REST API
    レイテンシ 約1ミリ秒 約10~50ミリ秒
    複雑さ 低(自己完結型) 中(サーバーが必要)
    更新 手動ファイル置換 ホットリロード可能
    前処理 制限あり(MQL5で再現必須) Python環境でフル対応
    インフラ 必要なし Webサーバーと監視が必要

    ベストプラクティスまとめ

    以下は成功の鍵となるポイントです。

    1. 特徴量の整合性が最重要:MQL5での特徴量がPythonと完全に一致することを検証スクリプトで確認します。小さな誤差でもアンサンブル予測で累積して影響します。
    2. すべてを文書化:特徴量メタデータ、テストケース、モデルバージョンを保存します。将来の自分が感謝します。
    3. 保守的に始める:小規模アンサンブル(50推定器)とシンプルな木(max_depth=4-6)で素早く反復します。
    4. 段階的にテストする:検証 → ペーパートレード → 小規模実運用 → 本番展開。
    5. 継続的に監視する:予測の信頼度、シグナル頻度、実運用とバックテストの比較を追跡します。

    モデル更新ワークフロー:

    # Step 1: Train new model with updated data
    clf_v2 = SequentiallyBootstrappedBaggingClassifier(...)
    clf_v2.fit(X_train_updated, y_train_updated)
    
    # Step 2: Validate against held-out test set
    score_v2 = clf_v2.score(X_test, y_test)
    assert score_v2 >= previous_score * 0.95, "New model performs worse!"
    
    # Step 3: Export with version tag
    model_file = f"sequential_bagging_v2_{datetime.now().strftime('%Y%m%d')}.onnx"
    onnx_model = convert_sklearn(clf_v2, initial_types=initial_type)
    with open(model_file, "wb") as f:
        f.write(onnx_model.SerializeToString())
    
    # Step 4: Run backtests comparing v1 vs v2
    # Step 5: Deploy to paper trading first
    # Step 6: Monitor for 1-2 weeks before live deployment
    # Step 7: Keep v1 as fallback

    以下は再学習のタイミングです。

    • 定期スケジュール:拡張データセットで月次または四半期ごとに再学習する
    • 性能低下:実運用での精度がバックテスト期待値より10%以上低下した場合
    • 市場環境の変化:ボラティリティ、相関、市場構造に大きな変化があった場合
    • 特徴量追加:新しいテクニカル指標やデータソースを追加した場合

    トラブルシューティングガイド

    問題:予測がランダムに見える(すべて0.5前後)

    診断:

    # Check if features have variance
    print("Feature statistics:")
    print(pd.DataFrame(X_train).describe())
    
    # Check class balance
    print("Class distribution:", np.bincount(y_train))
    
    # Verify model actually learned something
    print("Training accuracy:", clf.score(X_train, y_train))
    print("Test accuracy:", clf.score(X_test, y_test))

    解決策:

    • 特徴量がすべて0や定数になっていないことを確認する
    • クラス不均衡が激しい場合はsample_weightを検討する
    • モデルが学習時に適切に収束したことを確認する
    • 過少学習の場合はn_estimatorsや木の深さを増加する

    問題:MQL5での予測がPythonと大きく異なる

    体系的なデバッグ手順:

    # 1. Print raw feature values from both systems
    # Python:
    print("Python features:", X_test[0])
    
    # MQL5: Add to EA
    // Print all features before prediction
    string feat_str = "";
    for(int i=0; i<ArraySize(feature_array); i++) {
        feat_str += StringFormat("[%d]:%.6f ", i, feature_array[i]);
    }
    Print("MQL5 features: ", feat_str);
    
    # 2. Check intermediate calculations
    # Add debug prints to FeatureEngine.mqh for BB, RSI, etc.
    Print("BB Upper:", bb_upper, " Middle:", bb_middle, " Lower:", bb_lower);
    Print("RSI:", rsi_value);
    
    # 3. Verify data alignment
    # Ensure MQL5 arrays are time-series ordered (most recent first)
    # Python typically uses oldest first

    問題:モデルの読み込みが遅い、またはEAがフリーズする

    最適化戦略:

    // 1. Load model once in OnInit, not on every tick
    // ✓ Correct:
    int OnInit() {
        g_model_handle = OnnxCreateFromFile(InpModelFile, ONNX_DEFAULT);
    }
    
    // ✗ Wrong:
    void OnTick() {
        long model = OnnxCreateFromFile(InpModelFile, ONNX_DEFAULT); // DON'T DO THIS!
    }
    
    // 2. Reduce model complexity
    # Python: Train lighter model
    clf_fast = SequentiallyBootstrappedBaggingClassifier(
        n_estimators=30,  # Reduced from 100
        max_depth=3      # Reduced from 6
    )
    
    // 3. Predict only on new bar, not every tick
    datetime current_bar = iTime(_Symbol, _Period, 0);
    if(current_bar == g_last_bar_time) return;
    g_last_bar_time = current_bar;

    実運用でのデプロイ例

    以下は、実際に成功した本番デプロイのタイムラインです。

    活動 成功基準
    1 モデル学習、ONNXエクスポート、Pythonとの予測一致確認 最大予測差 < 0.01%
    2 2年以上の履歴データでストラテジーテスターによるバックテスト シャープ > 1.5、最大ドローダウン < 15%
    3 デモ口座でフォワードテスト(フルポジションサイズ) 20回以上の取引、技術的エラーなし
    4、5 ライブ取引(目標資金の10%) バックテスト期待値の±20%内のパフォーマンス
    6~8 徐々に資金を50% → 100%にスケール 一貫したパフォーマンス、予期しない挙動なし
    9~ 本番運用開始、月次パフォーマンスレビュー 月次再学習、四半期ごとのモデル評価


    結論:ONNXデプロイチェックリスト

    以下は、逐次ブートストラップモデルをMQL5で本番運用する前に確認すべき項目です。

    本番前(Python側):

    • ☐ モデルが十分なアウトオブサンプル性能を達成している
    • ☐ ONNXへのエクスポートが成功(skl2onnx)
    • ☐ ONNX予測結果が元のモデルと一致することを確認
    • ☐ 特徴量メタデータを文書化(名前、順序、計算方法)
    • ☐ 入力/出力が既知のテストケースを作成
    • ☐ モデルファイルをバージョン管理およびバックアップ

    実装(MQL5側):

    • ☐ FeatureEngine.mqhがPython計算と完全一致
    • ☐ 検証スクリプトですべてのテストケースに合格
    • ☐ OnInitでモデルが正常に読み込まれる
    • ☐ 予測がエラーなく実行される
    • ☐ リスク管理パラメータを設定
    • ☐ ログ記録と監視を実装

    テスト:

    • ☐ ストラテジーテスターによるバックテスト完了(2年以上)
    • ☐ デモ口座でのフォワードテスト(2週間以上)
    • ☐ パフォーマンス指標が許容範囲内
    • ☐ エッジケースを処理(出来高0、マーケットギャップ等)

    本番:

    • ☐ 最小資本(10%)で開始
    • ☐ 初期2週間は毎日監視
    • ☐ 週次パフォーマンスレビュー
    • ☐ モデル更新スケジュールを確立
    • ☐ フォールバック手順を文書化

    逐次ブートストラップにより学習時の時間的リークを解消し、ONNXを介してMQL5で信頼性の高いデプロイをおこなうことで、研究から本番運用まで一貫したパイプラインを構築できます。この組み合わせにより、学習済みモデルの堅牢性がライブ取引の信頼性に直結します。


    添付ファイル

    ファイル名 説明
    bootstrap_mc.py 標準ブートストラップと逐次ブートストラップの効果を比較するモンテカルロシミュレーションコード。ランダム時系列データを生成し、両手法のユニーク性指標を測定する実験を実行。
    bootstrapping.py 逐次ブートストラップアルゴリズムのコア実装。インディケーター行列の作成、ユニーク性スコアの計算、Numbaを用いた最適化逐次サンプリング関数を含む。
    misc.py データ整形、メモリ最適化、ログデコレータ、パフォーマンス監視、時間ユーティリティ、ファイル変換などの汎用ユーティリティ関数集。
    multiprocess.py 並列処理ユーティリティ。ジョブ分割、進捗報告、複数CPUコアを活用した並列実行用関数を含む。
    sb_bagging.py 逐次ブートストラップバギング分類器および回帰モデルの実装。逐次ブートストラップサンプリングとscikit-learnのバギングフレームワークを統合した金融機械学習向けアンサンブル手法。

    MetaQuotes Ltdにより英語から翻訳されました。
    元の記事: https://www.mql5.com/en/articles/20059

    添付されたファイル |
    bootstrap_mc.py (1.71 KB)
    bootstrapping.py (12.15 KB)
    misc.py (19.74 KB)
    sb_bagging.py (32.35 KB)
    MQL5での取引戦略の自動化(第38回):傾斜角フィルタ付き隠れRSIダイバージェンス取引 MQL5での取引戦略の自動化(第38回):傾斜角フィルタ付き隠れRSIダイバージェンス取引
    スイングポイントを用いて隠れRSIダイバージェンスを検出するMQL5 EAを構築します。これは、価格とRSIに対して、スイング強度、バー間隔、許容誤差、傾き角度のフィルタを適用し、検証済みのシグナルで固定ロット、SL/TP(pips単位)、およびオプションのトレーリングストップを用いて売買を実行するシステムです。
    MQL5における市場ポジショニング戦略の体系(第1回):NVIDIAのビットワイズ戦略研究 MQL5における市場ポジショニング戦略の体系(第1回):NVIDIAのビットワイズ戦略研究
    これまでの「MQL5ウィザード」シリーズで積み上げてきた取り組みを基盤とし、それをさらに発展させる新連載を開始します。本連載は、システムトレードおよび戦略テストへのアプローチを一段引き上げることを目的としています。単一タイプのポジションのみを保有するように設計されたエキスパートアドバイザーに焦点を当てます。主にロングポジションのみを扱う設計です。市場トレンドを一方向に限定することで、分析が簡素化され、戦略の複雑さが軽減されます。また、特に為替以外の資産を扱う場合には、重要な洞察が得られる可能性があります。したがって本連載では、株式やその他の非為替資産において、このアプローチが有効かどうかを検証していきます。買い専用戦略は、スマートマネーや機関投資家の戦略と相関することが多いため、その実用性を体系的に探究します。
    長期取引の最適化:包み足と流動性戦略 長期取引の最適化:包み足と流動性戦略
    高時間足(W1、D1、MN)に基づいて長期的な分析と取引判断をおこなうEAです。このEAは、短期的な値動きに翻弄されることなく、利確目標に到達するまで自分のトレンドの方向性(バイアス)を頻繁に変えずにポジションを保持できる、忍耐強い長期トレーダー向けに設計されています。
    MQL5標準ライブラリエクスプローラー(第3回):エキスパート標準偏差チャネル MQL5標準ライブラリエクスプローラー(第3回):エキスパート標準偏差チャネル
    CTradeクラスとCChartObjectStdDevChannelクラスを用いたエキスパートアドバイザー(EA)を開発し、さらに収益性を高めるためのいくつかのフィルタを適用します。前回の議論で扱った理論を実装へ落とし込むことが目的です。また、MQL5標準ライブラリとその内部コードベースを理解するのに役立つ、もう一つの簡単なアプローチも紹介します。本記事では、これらの概念を実践的に学ぶことができます。