English Русский 中文 Español Deutsch Português
preview
時系列マイニング用データラベル(第3回):ラベルデータの利用例

時系列マイニング用データラベル(第3回):ラベルデータの利用例

MetaTrader 5エキスパートアドバイザー | 30 1月 2024, 10:50
547 0
Yuqiang Pan
Yuqiang Pan

はじめに

この記事では、Metatrader 5取引プラットフォームを通じてPyTorchLightningおよびPyTorchForecastingフレームワークを使用して、ニューラルネットワークに基づく金融時系列予測を実装する方法を紹介します。

今回は、これら2つのフレームワークを選択した理由と使用したデータ形式についても説明します。

データに関しては、前2回の記事のデータラベリングによって生成されたデータを利用できます。これらは同じ形式を共有しているため、このペーパーの方法論に従って簡単に拡張できます。

前の2つの記事へのリンクは次のとおりです。 

  1. 時系列マイニング用データラベル(前編):EA運用チャートによるトレンドマーカー付きデータセットの作成
  2. 時系列マイニングのためのデータラベル(第2回):Pythonを使ってトレンドマーカー付きデータセットを作成する

目次


いくつかの重要なPythonライブラリ

まず、使用する主なPythonライブラリを紹介します。

1.PyTorch Lightning

PyTorch Lightningは、スケーラビリティを犠牲にすることなく最大限の柔軟性を必要とするプロのAI研究者や機械学習エンジニア向けに特別に設計されたディープラーニングフレームワークです。

その中心的な概念は、学術コード(モデル定義、順方向/逆方向伝播、オプティマイザー、検証など)をエンジニアリングコード(forループ、保存メカニズム、TensorBoard ログ、訓練戦略など)から分離することです。その結果、コードがより合理化され、理解しやすくなります。

主な利点は次のとおりです。

  • 高い再利用性:さまざまなプロジェクト間でコードを再利用できるように設計されている
  • 簡単なメンテナンス:構造化された設計のおかげで、コードのメンテナンスが簡単になる
  • 明確なロジック:定型的なエンジニアリングコードを抽象化することで、機械学習コードの識別と理解が容易になる

全体として、PyTorch Lightningは、PyTorchコードを整理および管理するための効率的な方法を提供する非常に強力なライブラリです。さらに、モデルの訓練、検証、テストなどの一般的だが複雑なタスクに対処するための構造化されたアプローチを提供します。

このライブラリの詳細な使用法は、公式ドキュメントhttps://lightning.ai/docsに記載されています。

2.PyTorch Forecasting

これは、時系列予測用に特別に設計されたPythonライブラリです。PyTorch上に構築されているため、PyTorchの強力な自動微分および最適化ライブラリを利用できると同時に、PyTorch Forecastingが時系列予測に提供する利便性の恩恵も受けられます。

PyTorch Forecasting内では、自己回帰モデル(AR、ARIMA)、状態空間モデル(SARIMAX)、ニューラルネットワーク(LSTM、GRU)、アンサンブル手法(Prophet、N-Beats)などを含むさまざまな予測モデルの実装を見つけることができます。これは、アプローチごとに広範な定型コードを作成することなく、同じフレームワーク内でさまざまな予測アプローチを実験して比較できることを意味します。

このライブラリは、時系列での一般的なタスクの処理を支援するさまざまなデータ前処理ツールも提供します。これらのツールには、欠損値の代入、スケーリング、特徴抽出、ローリングウィンドウ変換などが含まれます。これは、データ処理に多くの時間を費やす必要がなく、モデルの設計と最適化により集中できることを意味します。

また、モデルのパフォーマンスを評価するための統合インターフェイスも提供します。QuantileLossやSMAPEなどの時系列の損失関数と検証メトリクスを実装し、早期停止や相互検証などの訓練方法論をサポートします。これにより、モデルのパフォーマンスをより簡単に追跡および強化できるようになります。

時系列予測プロジェクトの効率と保守性を向上させる方法を探している場合は、PyTorch Forecastingが最適な選択肢となる可能性があります。PyTorchコードを整理および管理するための効果的かつ柔軟な手段を提供し、最も重要な側面である機械学習モデル自体に集中できるようにします。

このライブラリの詳細な使用方法は、公式ドキュメントhttps://pytorch-forecasting.readthedocs.io/en/stableに記載されています。

3.N-HiTSモデルについて

N-HiTSモデルは、革新的な階層内挿およびマルチレートデータサンプリング技術を導入することで、長期予測における予測の変動性と計算の複雑さの問題に対処します。これにより、N-HiTSモデルはあらゆる長さの予測範囲を効果的に近似できるようになります。

さらに、大規模なデータセットに対しておこなわれた広範な実験により、N-HiTSモデルは最新のTransformerアーキテクチャと比較して精度が平均20%近く向上し、同時に計算時間を1桁(50分の1に)削減できることが実証されました。

論文へのリンク:https://doi.org/10.48550/arXiv.2201.12886


初期化

まず、必要なライブラリをインポートする必要があります。これらのライブラリには、Metatrader 5(MT5ターミナルとの対話用)、PyTorch Lightning(モデルの訓練用)、およびデータ処理と視覚化用のその他のライブラリが含まれます。

import MetaTrader5 as mt5
import lightning.pytorch as pl
from lightning.pytorch.callbacks import EarlyStopping
import matplotlib.pyplot as plt
import pandas as pd
from pytorch_forecasting import Baseline, NHiTS, TimeSeriesDataSet
from pytorch_forecasting.data import NaNLabelEncoder
from pytorch_forecasting.metrics import MAE, SMAPE, MQF2DistributionLoss, QuantileLoss
from lightning.pytorch.tuner import Tuner

次に、Metatrader 5を初期化する必要があります。これは、mt.initialize()関数を呼び出すことによっておこなわれます。これを使用するだけでは初期化できない場合は、MT5ターミナルのパスをパラメータとしてこの関数に渡す必要があります(例では「D:\Project\mt\MT5\terminal64.exe」は私の個人的なパスの場所ですが、実際のアプリケーションでは、独自のパスの場所に構成する必要があります)。初期化が成功した場合、関数はTrueを返し、それ以外の場合はFalseを返します。

if not mt.initialize("D:\\Project\\mt\\MT5\\terminal64.exe"):
    print('initialize() failed!')
else:
    print(mt.version())

mt.symbols_total()関数は、MT5ターミナルで利用可能な取引可能な品種の総数を取得するために使用されます。データを正しく取得できているかどうかを判断するために使用できます。合計数が0より大きい場合、mt.copy_rates_from_pos()関数を使用して、指定された取引可能な種類の履歴データを取得できます。この例では、GOLD_microバリエーションの「mt_data_len」M15(15分)周期の最新の長さのデータを取得しました。

sb=mt.symbols_total()
rts=None
if sb > 0:
    rts=mt.copy_rates_from_pos("GOLD_micro",mt.TIMEFRAME_M15,0,mt_data_len) 
    mt.shutdown()

最後に、mt.shutdown()関数を使ってMT5端末との接続を切断し、取得したデータをPandasDataFrame形式に変換します。

mt.shutdown()
rts_fm=pd.DataFrame(rts)

次に、MT5ターミナルから取得したデータを前処理する方法について説明します。

まずタイムスタンプを日付に変換する必要があります。

rts_fm['time']=pd.to_datetime(rts_fm['time'], unit='s') 

ここでは、データにラベルを付ける方法については説明しません。方法については、以前の2つの記事を参照してください(この記事の冒頭に記事リンクがあります)。予測モデルの使用方法を簡潔に示すために、すべての「max_encoder_length+2max_prediction_length」個のデータをグループに分割するだけです。各グループには0から「max_encoder_length+2max_prediction_length-1」までのシーケンスがあり、それらを埋めます。このようにして、元のデータに必要なラベルを追加します。まず、元の時間インデックス(つまり、DataFrameのインデックス)を時間インデックスに変換する必要があります。元の時間インデックスを(max_encoder_length+2max_prediction_length)で割った余りを計算し、その結果を新しい時間インデックスとして使用します。これは、時間インデックスを0から「max_encoder_length+2*max_prediction_length-1」の範囲にマップします。

rts_fm['time_idx']= rts_fm.index%(max_encoder_length+2*max_prediction_length) 

また、元の時間インデックスをグループに変換する必要があります。元の時間インデックスを「max_encoder_length+2*max_prediction_length」で割った値を計算し、その結果を新しいグループとして使用します。

rts_fm['series']=rts_fm.index//(max_encoder_length+2*max_prediction_length)

データの前処理部分を関数にカプセル化します。取得する必要があるデータの長さを渡すだけで、データの前処理作業を完了できます。

def get_data(mt_data_len:int):
    if not mt.initialize("D:\\Project\\mt\\MT5\\terminal64.exe"):
        print('initialize() failed!') 
    else:
        print(mt.version())
        sb=mt.symbols_total()
        rts=None
        if sb > 0:
            rts=mt.copy_rates_from_pos("GOLD_micro",mt.TIMEFRAME_M15,0,mt_data_len) 
        mt.shutdown()
        # print(len(rts))
    rts_fm=pd.DataFrame(rts)
    rts_fm['time']=pd.to_datetime(rts_fm['time'], unit='s') 
    rts_fm['time_idx']= rts_fm.index%(max_encoder_length+2*max_prediction_length) 
    rts_fm['series']=rts_fm.index//(max_encoder_length+2*max_prediction_length)
return rts_fm


pytorch_forecasting.TimeSeriesDataSetクラスの書き換え

pytorch_forecastingのto_dataloader()関数を書き換えます。 これにより、データをシャッフルするかどうか、およびバッチの最後のグループを削除するかどうかを制御できます(主に、データの最後のグループの長さ不足によって引き起こされる予期せぬエラーを防ぐため)。その方法は次のとおりです。

class New_TmSrDt(TimeSeriesDataSet):
    def to_dataloader(self, train: bool = True, 
                      batch_size: int = 64, 
                      batch_sampler: Sampler | str = None, 
                      shuffle:bool=False,
                      drop_last:bool=False,
                      **kwargs) -> DataLoader:
        default_kwargs = dict(
            shuffle=shuffle,
            drop_last=drop_last, #modification
            collate_fn=self._collate_fn,
            batch_size=batch_size,
            batch_sampler=batch_sampler,
        )
        default_kwargs.update(kwargs)
        kwargs = default_kwargs
        if kwargs["batch_sampler"] is not None:
            sampler = kwargs["batch_sampler"]
            if isinstance(sampler, str):
                if sampler == "synchronized":
                    kwargs["batch_sampler"] = TimeSynchronizedBatchSampler(
                        SequentialSampler(self),
                        batch_size=kwargs["batch_size"],
                        shuffle=kwargs["shuffle"],
                        drop_last=kwargs["drop_last"],
                    )
                else:
                    raise ValueError(f"batch_sampler {sampler} unknown - see docstring for valid batch_sampler")
            del kwargs["batch_size"]
            del kwargs["shuffle"]
            del kwargs["drop_last"]

        return DataLoader(self,**kwargs)
このコードは、TimeSeriesDataSetを継承する新しいクラスNew_TmSrDtを作成します。その後、この新しいクラスでto_dataloader()関数がオーバーライドされ、shuffleパラメータとDrop_lastパラメータが組み込まれます。これにより、データ読み込みプロセスをより詳細に制御できるようになります。コード内では、TimeSeriesDataSetのインスタンスをNew_TmSrDtに忘れずに置き換えてください。


訓練データセットおよび検証データセットの作成

まず、訓練データのカットオフポイントを決定する必要があります。これは、最大のtime_idx値から最大予測長を減算することによっておこなわれます。

max_encoder_length = 2*96
max_prediction_length = 30
training_cutoff = rts_fm["time_idx"].max() - max_prediction_length

次に、New_TmSrDtクラス(書き直したTimeSeriesDataSetクラス)を使用して訓練データセットを作成します。このクラスには次のパラメータが必要です。

  • データフレーム(この場合は「rts_fm」)
  • time_idx列:連続する整数シーケンス
  • ターゲット列(この場合は「close」):予測する値
  • さまざまな時系列を表すグループ列(この場合はseries)
  • エンコーダとプレディクタの最大長
context_length = max_encoder_length
prediction_length = max_prediction_length

training = New_TmSrDt(
        data[lambda x: x.time_idx <= training_cutoff],
        time_idx="time_idx",
        target="close",
        categorical_encoders={"series":NaNLabelEncoder().fit(data.series)},
        group_ids=["series"],
        time_varying_unknown_reals=["close"],
        max_encoder_length=context_length,
        # min_encoder_length=max_encoder_length//2,
        max_prediction_length=prediction_length,
        # min_prediction_length=1,
    )
validation = New_TmSrDt.from_dataset(training, 
                                  data, 
                                  min_prediction_idx=training_cutoff + 1)

次に、New_TmSrDt.from_dataset()関数を使用して検証データセットを作成します。この関数には次のパラメータが必要です。

  • 訓練データセット
  • データフレーム
  • 最小予測インデックス:訓練データの最大time_idx値より1大きい必要

validation = New_TmSrDt.from_dataset(training, rts_fm, min_prediction_idx=training_cutoff + 1)

最後に、to_dataloader()関数を使用して、訓練データセットと検証データセットをPyTorchDataLoaderオブジェクトに変換します。この関数には次のパラメータが必要です。

  • train:データをシャッフルするかどうかを示す
  • batch_size:バッチごとのサンプル数
  • num_workers:データロード用のワーカープロセスの数

train_dataloader = training.to_dataloader(train=True,
                                          shuffle=t_shuffle, 
                                          drop_last=t_drop_last,
                                          batch_size=batch_size, 
                                          num_workers=0,)
val_dataloader = validation.to_dataloader(train=False, 
                                          shuffle=v_shuffle,
                                          drop_last=v_drop_last,
                                          batch_size=batch_size, 
                                          num_workers=0)

最後に、コードのこの部分を関数spilt_data(data:pd.DataFrame,t_drop_last:bool,t_shuffle:bool,v_drop_last:bool,v_shuffle:bool)にカプセル化し、次のパラメータを指定します。

  • data:処理する必要があるデータセットを受け取るために使用
  • t_drop_last:訓練データセットの最後のグループを削除するかどうか
  • t_shuffle:訓練データをシャッフルするかどうか
  • v_drop_last:検証データセットの最後のグループを削除するかどうか
  • v_shuffle:検証データをシャッフルするかどうか

train_dataloader(訓練データセットのデータローダーのインスタンス)、val_dataloader)検証データセットのデータローダーのインスタンス)、training(データセットのTimeSeriesDataSetのインスタンス)を、後で使用するこの関数の戻り値として作成します。

def spilt_data(data:pd.DataFrame,
               t_drop_last:bool,
               t_shuffle:bool,
               v_drop_last:bool,
               v_shuffle:bool):
    training_cutoff = data["time_idx"].max() - max_prediction_length #max:95
    context_length = max_encoder_length
    prediction_length = max_prediction_length
    training = New_TmSrDt(
        data[lambda x: x.time_idx <= training_cutoff],
        time_idx="time_idx",
        target="close",
        categorical_encoders={"series":NaNLabelEncoder().fit(data.series)},
        group_ids=["series"],
        time_varying_unknown_reals=["close"],
        max_encoder_length=context_length,
        # min_encoder_length=max_encoder_length//2,
        max_prediction_length=prediction_length,
        # min_prediction_length=1,
    )

    validation = New_TmSrDt.from_dataset(training, 
                                         data, 
                                         min_prediction_idx=training_cutoff + 1)
    
    train_dataloader = training.to_dataloader(train=True,
                                              shuffle=t_shuffle, 
                                              drop_last=t_drop_last,
                                              batch_size=batch_size, 
                                              num_workers=0,)
    val_dataloader = validation.to_dataloader(train=False, 
                                              shuffle=v_shuffle,
                                              drop_last=v_drop_last,
                                              batch_size=batch_size, 
                                              num_workers=0)
    return train_dataloader,val_dataloader,training


モデルの作成と訓練

ここで、NHiTSモデルの作成を開始します。この部分では、パラメータの設定方法と訓練方法を説明します。

1. 最適な学習率を見つける

モデルの作成を開始する前に、PyTorch LightningのTunerオブジェクトを使用して、最適な学習率を見つけます。

まず、PyTorch LightningのTrainerオブジェクトを作成する必要があります。ここでは、acceleratorパラメータを使用してデバイスタイプを指定し、gradient_clip_valを使用して勾配爆発を防止します。

pl.seed_everything(42)
trainer = pl.Trainer(accelerator="cpu", gradient_clip_val=0.1)

次に、NHiTS.from_dataset()関数を使用してNHiTSモデルnetを作成します。この関数には次のパラメータが必要です。

  • 訓練データセット
  • 学習率
  • 重みの減少
  • 損失関数
  • 隠れ層のサイズ
  • オプティマイザー
net = NHiTS.from_dataset(
    training,
    learning_rate=3e-2,
    weight_decay=1e-2,
    loss=MQF2DistributionLoss(prediction_length=max_prediction_length),
    backcast_loss_ratio=0.0,
    hidden_size=64,
    optimizer="AdamW",
)

次に、Tunerクラスをインスタンス化し、lr_find()関数を呼び出します。この関数は、データセットに基づいて一連の学習率でモデルを訓練し、各学習率の損失を比較して最適な学習率を取得します。

res = Tuner(trainer).lr_find(
    net, train_dataloaders=train_dataloader, val_dataloaders=val_dataloader, min_lr=1e-5, max_lr=1e-1
)
lr_=res.suggestion()

同様に、最良の学習率を取得するコードのこの部分を関数get_learning_rate()にカプセル化し、取得した最良の学習率をその戻り値として作成します。

def get_learning_rate():
    
    pl.seed_everything(42)
    trainer = pl.Trainer(accelerator="cpu", gradient_clip_val=0.1,logger=False)
    net = NHiTS.from_dataset(
        training,
        learning_rate=3e-2,
        weight_decay=1e-2,
        loss=MQF2DistributionLoss(prediction_length=max_prediction_length),
        backcast_loss_ratio=0.0,
        hidden_size=64,
        optimizer="AdamW",
    )
    res = Tuner(trainer).lr_find(
        net, train_dataloaders=t_loader, val_dataloaders=v_loader, min_lr=1e-5, max_lr=1e-1
    )
    lr_=res.suggestion()
    return lr_

学習率を視覚化したい場合は、次のコードを追加できます。

print(f"suggested learning rate: {res.suggestion()}")
fig = res.plot(show=True, suggest=True)
fig.show()

この例の結果は次のようになります。

lr

推奨学習率:0.003981071705534973

2.EarlyStoppingコールバックの定義

このコールバックは主に検証損失を監視し、連続するいくつかのエポックで損失が改善されない場合に訓練を停止するために使用されます。これにより、モデルの過剰適合を防ぐことができます。

early_stop_callback = EarlyStopping(monitor="val_loss", 
                                    min_delta=1e-4, 
                                    patience=10,  
                                    verbose=True, 
                                    mode="min")

ここで注意すべきパラメータはpatienceです。これは主に、連続したいくつかのエポックで損失が改善されなかった場合に、訓練中にいつ停止するかを制御します。10に設定しました。

3.ModelCheckpointコールバックの定義

このコールバックは主に、モデルのアーカイブとアーカイブの名前を制御するために使用されます。主にこの2つの変数を設定します。

ck_callback=ModelCheckpoint(monitor='val_loss',
                            mode="min",
                            save_top_k=1,  
                            filename='{epoch}-{val_loss:.2f}')

save_top_kは、上位数個の最良のモデルの保存を制御するために使用されます。これを1に設定し、最良のモデルのみを保存します。

4.訓練モデルの定義

まず、lightning.pytorchでTrainerクラスをインスタンス化し、前に定義した2つのコールバックを追加する必要があります。

trainer = pl.Trainer(
    max_epochs=ep,
    accelerator="cpu",
    enable_model_summary=True,
    gradient_clip_val=1.0,
    callbacks=[early_stop_callback,ck_callback],
    limit_train_batches=30,
    enable_checkpointing=True,
)

ここで注意する必要があるパラメータは、max_epochs(訓練エポックの最大数)、gradient_clip_val(勾配爆発を防ぐために使用)、callbacksです。ここで、max_epochsは後で定義するグローバル変数であるepを使用し、callbacksはコールバックのコレクションです。

次に、NHiTSモデルを定義してインスタンス化する必要もあります。

net = NHiTS.from_dataset(
    training,
    learning_rate=lr,
    log_interval=10,
    log_val_interval=1,
    weight_decay=1e-2,
    backcast_loss_ratio=0.0,
    hidden_size=64,
    optimizer="AdamW",
    loss=MQF2DistributionLoss(prediction_length=max_prediction_length),
)

ここで、通常、パラメータを変更する必要はなく、デフォルトのパラメータを使用するだけです。ここでは、lossのみをMQF2DistributionLoss損失関数に変更します。

5.訓練モジュール 

Trainerオブジェクトのfit()関数を使用してモデルを訓練します。

trainer.fit(
    net,
    train_dataloaders=train_dataloader,
    val_dataloaders=val_dataloader,
)

同様に、コードのこの部分を関数train()にカプセル化します。

def train():
    early_stop_callback = EarlyStopping(monitor="val_loss", 
                                        min_delta=1e-4, 
                                        patience=10,  # The number of times without improvement will stop
                                        verbose=True, 
                                        mode="min")
    ck_callback=ModelCheckpoint(monitor='val_loss',
                                mode="min",
                                save_top_k=1,  # Save the top few best ones
                                filename='{epoch}-{val_loss:.2f}')
    trainer = pl.Trainer(
        max_epochs=ep,
        accelerator="cpu",
        enable_model_summary=True,
        gradient_clip_val=1.0,
        callbacks=[early_stop_callback,ck_callback],
        limit_train_batches=30,
        enable_checkpointing=True,
    )
    net = NHiTS.from_dataset(
        training,
        learning_rate=lr,
        log_interval=10,
        log_val_interval=1,
        weight_decay=1e-2,
        backcast_loss_ratio=0.0,
        hidden_size=64,
        optimizer="AdamW",
        loss=MQF2DistributionLoss(prediction_length=max_prediction_length),
    )
    trainer.fit(
        net,
        train_dataloaders=t_loader,
        val_dataloaders=v_loader,
        # ckpt_path='best'
    )
return trainer

この関数は、予測タスクに使用できる訓練済みモデルを返します。


実行ロジックの定義

1.グローバル変数の定義

ep=200
__train=False
mt_data_len=200000
max_encoder_length = 2*96
max_prediction_length = 30
batch_size = 128

__trainは、現在モデルを訓練しているかテストしているかを制御するために使用されます。

epは最大訓練エポックを制御するために使用されます。EarlyStoppingを設定したので、モデルが収束しなくなったときに自動的に停止するため、この値を少し大きく設定できます。

mt_data_lenはクライアントから取得した最近の時系列データの数です。

max_encoder_lengthとmax_prediction_lengthは、それぞれ最大符号化長と最大予測長です。

2.訓練

また、訓練の完了時に現在の最適な訓練結果をローカルファイルに保存する必要があるため、この情報を保存するためのjsonファイルを定義します。

info_file='results.json'

訓練プロセスを明確にするために、訓練中に不必要な警告情報の出力を避ける必要があるため、次のコードを追加します。

warnings.filterwarnings("ignore")

次は訓練ロジックです。

dt=get_data(mt_data_len=mt_data_len)
if __train:
    # print(dt)
    # dt=get_data(mt_data_len=mt_data_len)
    t_loader,v_loader,training=spilt_data(dt,
                                    t_shuffle=False,t_drop_last=True,
                                    v_shuffle=False,v_drop_last=True)
    lr=get_learning_rate()
    trainer__=train()
    m_c_back=trainer__.checkpoint_callback
    m_l_back=trainer__.early_stopping_callback
    best_m_p=m_c_back.best_model_path
    best_m_l=m_l_back.best_score.item()
    # print(best_m_p)
    if os.path.exists(info_file):
        with open(info_file,'r+') as f1:
            last=json.load(fp=f1)
            last_best_model=last['last_best_model']
            last_best_score=last['last_best_score']
            if last_best_score > best_m_l:
                last['last_best_model']=best_m_p
                last['last_best_score']=best_m_l
                json.dump(last,fp=f1)
    else:               
        with open(info_file,'w') as f2:
            json.dump(dict(last_best_model=best_m_p,last_best_score=best_m_l),fp=f2)

訓練が完了すると、ルートディレクトリのresults.jsonファイルで、最良のモデルの保存場所と最良のスコアを見つけることができます。

訓練プロセス中に、各エポックの進行状況を示す進行状況バーが表示されます。 

訓練:

訓練

訓練完了:

ts

3.モデルの検証

訓練後、モデルを検証して視覚化したいと思います。次のコードを追加できます。

best_model = NHiTS.load_from_checkpoint(best_m_p)
predictions = best_model.predict(v_loader, trainer_kwargs=dict(accelerator="cpu",logger=False), return_y=True)
raw_predictions = best_model.predict(v_loader, mode="raw", return_x=True, trainer_kwargs=dict(accelerator="cpu",logger=False))
for idx in range(10):  # plot 10 examples
    best_model.plot_prediction(raw_predictions.x, raw_predictions.output, idx=idx, add_loss_to_title=True)
    # sample 500 paths
samples = best_model.loss.sample(raw_predictions.output["prediction"][[0]], n_samples=500)[0]

# plot prediction
fig = best_model.plot_prediction(raw_predictions.x, raw_predictions.output, idx=0, add_loss_to_title=True)
ax = fig.get_axes()[0]
# plot first two sampled paths
ax.plot(samples[:, 0], color="g", label="Sample 1")
ax.plot(samples[:, 1], color="r", label="Sample 2")
fig.legend()
plt.show()

TensorBoardを使用して、訓練中にリアルタイムで訓練状況の視覚化を表示することもできます。ここではデモンストレーションはおこなっていません。

結果:

参照

4.訓練されたモデルのテスト

まず、jsonファイルを開いて、最適なモデルの保存場所を見つけます。

with open(info_file) as f:
    best_m_p=json.load(fp=f)['last_best_model']
print('model path is:',best_m_p)

次に、モデルをロードします。

best_model = NHiTS.load_from_checkpoint(best_m_p)

次に、モデルをテストするためにクライアントからリアルタイムでデータを取得します。

offset=1
dt=dt.iloc[-max_encoder_length-offset:-offset,:]
last_=dt.iloc[-1] #get the last group of data
# print(len(dt))
for i in range(1,max_prediction_length+1):
    dt.loc[dt.index[-1]+1]=last_
dt['series']=0
# dt['time_idx']=dt.apply(lambda x:x.index,args=1)
dt['time_idx']=dt.index-dt.index[0]
# dt=get_data(mt_data_len=max_encoder_length)
predictions=best_model.predict(dt,mode='raw',trainer_kwargs=dict(accelerator="cpu",logger=False),return_x=True)
best_model.plot_prediction(predictions.x,predictions.output,show_future_observed=False)
plt.show()

結果は以下のとおりです。

pref

5.  モデルを評価する

もちろん、PyTorchForecastingライブラリのいくつかのメトリクスを使用して、モデルのパフォーマンスを評価できます。平均絶対誤差(MAE)と対称平均絶対パーセント誤差(SMAPE)を使用して評価し、評価結果を出力する方法は次のとおりです。

from pytorch_forecasting.metrics import MAE, SMAPE
mae = MAE()(raw_predictions["prediction"], raw_predictions["target"])
print(f"Mean Absolute Error: {mae}")
smape = SMAPE()(raw_predictions["prediction"], raw_predictions["target"])
print(f"Symmetric Mean Absolute Percentage Error: {smape}")

このコードスニペットでは、最初にMAEメトリクスとSMAPEメトリクスをインポートします。次に、これらのメトリクスを使用して、予測値(raw_predictions["prediction"])と実際の値(raw_predictions["target"])の間の誤差を計算します。これらのメトリクスは、モデルのパフォーマンスを理解し、モデルをさらに改善するための方向性を提供するのに役立ちます。



結論

この記事では、前の2つの記事で説明したラベルデータの使用方法を検討し、データを使用してN-HiTsモデルを作成する方法を示しました。次に、モデルを訓練し、モデルを検証しました。結果グラフから、結果が良好であることが簡単にわかります。また、MT5でこのモデルを使用して30本のローソク足の予測をおこなう方法も示しました。もちろん、実際の取引では読者が実際の状況に応じて多くのテストをおこない、対応する取引ルールを指定する必要があるため、予測結果に基づいて注文をおこなう方法については触れませんでした。

最後に、楽しい時間を過ごしてください!


添付:

完全なコード:

# Copyright 2021, MetaQuotes Ltd.
# https://www.mql5.com


# from typing import Union
import lightning.pytorch as pl
import os
from lightning.pytorch.callbacks import EarlyStopping,ModelCheckpoint
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
# import torch
from pytorch_forecasting import NHiTS, TimeSeriesDataSet
from pytorch_forecasting.data import NaNLabelEncoder,timeseries
from pytorch_forecasting.metrics import MQF2DistributionLoss
from pytorch_forecasting.data.samplers import TimeSynchronizedBatchSampler
from lightning.pytorch.tuner import Tuner
import MetaTrader5 as mt
import warnings
import json


from torch.utils.data import DataLoader
from torch.utils.data.sampler import Sampler,SequentialSampler

class New_TmSrDt(TimeSeriesDataSet):
    '''
    rewrite dataset class
    '''
    def to_dataloader(self, train: bool = True, 
                      batch_size: int = 64, 
                      batch_sampler: Sampler | str = None, 
                      shuffle:bool=False,
                      drop_last:bool=False,
                      **kwargs) -> DataLoader:

        default_kwargs = dict(
            shuffle=shuffle,
            # drop_last=train and len(self) > batch_size,
            drop_last=drop_last, #
            collate_fn=self._collate_fn,
            batch_size=batch_size,
            batch_sampler=batch_sampler,
        )
        default_kwargs.update(kwargs)
        kwargs = default_kwargs
        # print(kwargs['drop_last'])
        if kwargs["batch_sampler"] is not None:
            sampler = kwargs["batch_sampler"]
            if isinstance(sampler, str):
                if sampler == "synchronized":
                    kwargs["batch_sampler"] = TimeSynchronizedBatchSampler(
                        SequentialSampler(self),
                        batch_size=kwargs["batch_size"],
                        shuffle=kwargs["shuffle"],
                        drop_last=kwargs["drop_last"],
                    )
                else:
                    raise ValueError(f"batch_sampler {sampler} unknown - see docstring for valid batch_sampler")
            del kwargs["batch_size"]
            del kwargs["shuffle"]
            del kwargs["drop_last"]

        return DataLoader(self,**kwargs)

def get_data(mt_data_len:int):
    if not mt.initialize():
        print('initialize() failed!') 
    else:
        print(mt.version())
        sb=mt.symbols_total()
        rts=None
        if sb > 0:
            rts=mt.copy_rates_from_pos("GOLD_micro",mt.TIMEFRAME_M15,0,mt_data_len) 
        mt.shutdown()
        # print(len(rts))
    rts_fm=pd.DataFrame(rts)
    rts_fm['time']=pd.to_datetime(rts_fm['time'], unit='s') 

    rts_fm['time_idx']= rts_fm.index%(max_encoder_length+2*max_prediction_length) 
    rts_fm['series']=rts_fm.index//(max_encoder_length+2*max_prediction_length)
    return rts_fm


def spilt_data(data:pd.DataFrame,
               t_drop_last:bool,
               t_shuffle:bool,
               v_drop_last:bool,
               v_shuffle:bool):
    training_cutoff = data["time_idx"].max() - max_prediction_length #max:95
    context_length = max_encoder_length
    prediction_length = max_prediction_length
    training = New_TmSrDt(
        data[lambda x: x.time_idx <= training_cutoff],
        time_idx="time_idx",
        target="close",
        categorical_encoders={"series":NaNLabelEncoder().fit(data.series)},
        group_ids=["series"],
        time_varying_unknown_reals=["close"],
        max_encoder_length=context_length,
        # min_encoder_length=max_encoder_length//2,
        max_prediction_length=prediction_length,
        # min_prediction_length=1,
    )

    validation = New_TmSrDt.from_dataset(training, 
                                         data, 
                                         min_prediction_idx=training_cutoff + 1)
    
    train_dataloader = training.to_dataloader(train=True,
                                              shuffle=t_shuffle, 
                                              drop_last=t_drop_last,
                                              batch_size=batch_size, 
                                              num_workers=0,)
    val_dataloader = validation.to_dataloader(train=False, 
                                              shuffle=v_shuffle,
                                              drop_last=v_drop_last,
                                              batch_size=batch_size, 
                                              num_workers=0)
    return train_dataloader,val_dataloader,training

def get_learning_rate():
    
    pl.seed_everything(42)
    trainer = pl.Trainer(accelerator="cpu", gradient_clip_val=0.1,logger=False)
    net = NHiTS.from_dataset(
        training,
        learning_rate=3e-2,
        weight_decay=1e-2,
        loss=MQF2DistributionLoss(prediction_length=max_prediction_length),
        backcast_loss_ratio=0.0,
        hidden_size=64,
        optimizer="AdamW",
    )
    res = Tuner(trainer).lr_find(
        net, train_dataloaders=t_loader, val_dataloaders=v_loader, min_lr=1e-5, max_lr=1e-1
    )
    # print(f"suggested learning rate: {res.suggestion()}")
    lr_=res.suggestion()
    return lr_
def train():
    early_stop_callback = EarlyStopping(monitor="val_loss", 
                                        min_delta=1e-4, 
                                        patience=10,  
                                        verbose=True, 
                                        mode="min")
    ck_callback=ModelCheckpoint(monitor='val_loss',
                                mode="min",
                                save_top_k=1,  
                                filename='{epoch}-{val_loss:.2f}')
    trainer = pl.Trainer(
        max_epochs=ep,
        accelerator="cpu",
        enable_model_summary=True,
        gradient_clip_val=1.0,
        callbacks=[early_stop_callback,ck_callback],
        limit_train_batches=30,
        enable_checkpointing=True,
    )
    net = NHiTS.from_dataset(
        training,
        learning_rate=lr,
        log_interval=10,
        log_val_interval=1,
        weight_decay=1e-2,
        backcast_loss_ratio=0.0,
        hidden_size=64,
        optimizer="AdamW",
        loss=MQF2DistributionLoss(prediction_length=max_prediction_length),
    )
    trainer.fit(
        net,
        train_dataloaders=t_loader,
        val_dataloaders=v_loader,
        # ckpt_path='best'
    )
    return trainer

if __name__=='__main__':
    ep=200
    __train=False
    mt_data_len=200000
    max_encoder_length = 2*96
    max_prediction_length = 30
    batch_size = 128
    info_file='results.json'
    warnings.filterwarnings("ignore")
    dt=get_data(mt_data_len=mt_data_len)
    if __train:
        # print(dt)
        # dt=get_data(mt_data_len=mt_data_len)
        t_loader,v_loader,training=spilt_data(dt,
                                              t_shuffle=False,t_drop_last=True,
                                              v_shuffle=False,v_drop_last=True)
        lr=get_learning_rate()
        trainer__=train()
        m_c_back=trainer__.checkpoint_callback
        m_l_back=trainer__.early_stopping_callback
        best_m_p=m_c_back.best_model_path
        best_m_l=m_l_back.best_score.item()

        # print(best_m_p)
        
        if os.path.exists(info_file):
            with open(info_file,'r+') as f1:
                last=json.load(fp=f1)
                last_best_model=last['last_best_model']
                last_best_score=last['last_best_score']
                if last_best_score > best_m_l:
                    last['last_best_model']=best_m_p
                    last['last_best_score']=best_m_l
                    json.dump(last,fp=f1)
        else:               
            with open(info_file,'w') as f2:
                json.dump(dict(last_best_model=best_m_p,last_best_score=best_m_l),fp=f2)

        best_model = NHiTS.load_from_checkpoint(best_m_p)
        predictions = best_model.predict(v_loader, trainer_kwargs=dict(accelerator="cpu",logger=False), return_y=True)
        raw_predictions = best_model.predict(v_loader, mode="raw", return_x=True, trainer_kwargs=dict(accelerator="cpu",logger=False))
    
        for idx in range(10):  # plot 10 examples
            best_model.plot_prediction(raw_predictions.x, raw_predictions.output, idx=idx, add_loss_to_title=True)
        samples = best_model.loss.sample(raw_predictions.output["prediction"][[0]], n_samples=500)[0]

        # plot prediction
        fig = best_model.plot_prediction(raw_predictions.x, raw_predictions.output, idx=0, add_loss_to_title=True)
        ax = fig.get_axes()[0]
        # plot first two sampled paths
        ax.plot(samples[:, 0], color="g", label="Sample 1")
        ax.plot(samples[:, 1], color="r", label="Sample 2")
        fig.legend()
        plt.show()
    else:
        with open(info_file) as f:
            best_m_p=json.load(fp=f)['last_best_model']
        print('model path is:',best_m_p)
        
        best_model = NHiTS.load_from_checkpoint(best_m_p)

        offset=1
        dt=dt.iloc[-max_encoder_length-offset:-offset,:]
        last_=dt.iloc[-1] 
        # print(len(dt))
        for i in range(1,max_prediction_length+1):
            dt.loc[dt.index[-1]+1]=last_
        dt['series']=0
        # dt['time_idx']=dt.apply(lambda x:x.index,args=1)
        dt['time_idx']=dt.index-dt.index[0]
        # dt=get_data(mt_data_len=max_encoder_length)
        predictions = best_model.predict(dt, mode='raw',trainer_kwargs=dict(accelerator="cpu",logger=False),return_x=True)
        best_model.plot_prediction(predictions.x,predictions.output,show_future_observed=False)
        plt.show()


MetaQuotes Ltdにより英語から翻訳されました。
元の記事: https://www.mql5.com/en/articles/13255

添付されたファイル |
n_hits.py (9.7 KB)
エキスパートアドバイザー(EA)に指標を追加するための既製のテンプレート(第2部):出来高指標とビルウィリアムズの指標 エキスパートアドバイザー(EA)に指標を追加するための既製のテンプレート(第2部):出来高指標とビルウィリアムズの指標
この記事では、標準的な出来高指標とビルウィリアムズ指標のカテゴリについて見ていきます。パラメータの宣言と設定、指標の初期化と解除、EAの指標バッファからのデータとシグナルの受信など、EAで指標を使用するためのすぐに使えるテンプレートを作成します。
独自のLLMをEAに統合する(第2部):環境展開例 独自のLLMをEAに統合する(第2部):環境展開例
今日の人工知能の急速な発展に伴い、言語モデル(LLM)は人工知能の重要な部分となっています。私たちは、強力なLLMをアルゴリズム取引に統合する方法を考える必要があります。ほとんどの人にとって、これらの強力なモデルをニーズに応じて微調整し、ローカルに展開して、アルゴリズム取引に適用することは困難です。本連載では、この目標を達成するために段階的なアプローチをとっていきます。
MQL5のALGLIB数値解析ライブラリ MQL5のALGLIB数値解析ライブラリ
この記事では、ALGLIB3.19数値分析ライブラリ、その応用、金融データ分析の効率を向上させる新しいアルゴリズムについて簡単に説明します。
独自のLLMをEAに統合する(第1部):ハードウェアと環境の導入 独自のLLMをEAに統合する(第1部):ハードウェアと環境の導入
今日の人工知能の急速な発展に伴い、言語モデル(LLM)は人工知能の重要な部分となっています。私たちは、強力なLLMをアルゴリズム取引に統合する方法を考える必要があります。ほとんどの人にとって、これらの強力なモデルをニーズに応じて微調整し、ローカルに展開して、アルゴリズム取引に適用することは困難です。本連載では、この目標を達成するために段階的なアプローチをとっていきます。