時系列マイニング用データラベル(第3回):ラベルデータの利用例
Yuqiang Pan | 30 1月, 2024
はじめに
この記事では、Metatrader 5取引プラットフォームを通じてPyTorchLightningおよびPyTorchForecastingフレームワークを使用して、ニューラルネットワークに基づく金融時系列予測を実装する方法を紹介します。
今回は、これら2つのフレームワークを選択した理由と使用したデータ形式についても説明します。
データに関しては、前2回の記事のデータラベリングによって生成されたデータを利用できます。これらは同じ形式を共有しているため、このペーパーの方法論に従って簡単に拡張できます。
前の2つの記事へのリンクは次のとおりです。
- 時系列マイニング用データラベル(前編):EA運用チャートによるトレンドマーカー付きデータセットの作成
- 時系列マイニングのためのデータラベル(第2回):Pythonを使ってトレンドマーカー付きデータセットを作成する
目次
- はじめに
- いくつかの重要なPythonライブラリ
- 初期化
- pytorch_forecasting.TimeSeriesDataSetクラスの書き換え
- 訓練および検証データセットの作成
- モデルの作成と訓練
- 実行ロジックの定義
- 結論
いくつかの重要なPythonライブラリ
まず、使用する主なPythonライブラリを紹介します。
1.PyTorch Lightning
PyTorch Lightningは、スケーラビリティを犠牲にすることなく最大限の柔軟性を必要とするプロのAI研究者や機械学習エンジニア向けに特別に設計されたディープラーニングフレームワークです。
その中心的な概念は、学術コード(モデル定義、順方向/逆方向伝播、オプティマイザー、検証など)をエンジニアリングコード(forループ、保存メカニズム、TensorBoard ログ、訓練戦略など)から分離することです。その結果、コードがより合理化され、理解しやすくなります。
主な利点は次のとおりです。
- 高い再利用性:さまざまなプロジェクト間でコードを再利用できるように設計されている
- 簡単なメンテナンス:構造化された設計のおかげで、コードのメンテナンスが簡単になる
- 明確なロジック:定型的なエンジニアリングコードを抽象化することで、機械学習コードの識別と理解が容易になる
全体として、PyTorch Lightningは、PyTorchコードを整理および管理するための効率的な方法を提供する非常に強力なライブラリです。さらに、モデルの訓練、検証、テストなどの一般的だが複雑なタスクに対処するための構造化されたアプローチを提供します。
このライブラリの詳細な使用法は、公式ドキュメントhttps://lightning.ai/docsに記載されています。2.PyTorch Forecasting
これは、時系列予測用に特別に設計されたPythonライブラリです。PyTorch上に構築されているため、PyTorchの強力な自動微分および最適化ライブラリを利用できると同時に、PyTorch Forecastingが時系列予測に提供する利便性の恩恵も受けられます。
PyTorch Forecasting内では、自己回帰モデル(AR、ARIMA)、状態空間モデル(SARIMAX)、ニューラルネットワーク(LSTM、GRU)、アンサンブル手法(Prophet、N-Beats)などを含むさまざまな予測モデルの実装を見つけることができます。これは、アプローチごとに広範な定型コードを作成することなく、同じフレームワーク内でさまざまな予測アプローチを実験して比較できることを意味します。
このライブラリは、時系列での一般的なタスクの処理を支援するさまざまなデータ前処理ツールも提供します。これらのツールには、欠損値の代入、スケーリング、特徴抽出、ローリングウィンドウ変換などが含まれます。これは、データ処理に多くの時間を費やす必要がなく、モデルの設計と最適化により集中できることを意味します。
また、モデルのパフォーマンスを評価するための統合インターフェイスも提供します。QuantileLossやSMAPEなどの時系列の損失関数と検証メトリクスを実装し、早期停止や相互検証などの訓練方法論をサポートします。これにより、モデルのパフォーマンスをより簡単に追跡および強化できるようになります。
時系列予測プロジェクトの効率と保守性を向上させる方法を探している場合は、PyTorch Forecastingが最適な選択肢となる可能性があります。PyTorchコードを整理および管理するための効果的かつ柔軟な手段を提供し、最も重要な側面である機械学習モデル自体に集中できるようにします。
このライブラリの詳細な使用方法は、公式ドキュメントhttps://pytorch-forecasting.readthedocs.io/en/stableに記載されています。
3.N-HiTSモデルについて
N-HiTSモデルは、革新的な階層内挿およびマルチレートデータサンプリング技術を導入することで、長期予測における予測の変動性と計算の複雑さの問題に対処します。これにより、N-HiTSモデルはあらゆる長さの予測範囲を効果的に近似できるようになります。
さらに、大規模なデータセットに対しておこなわれた広範な実験により、N-HiTSモデルは最新のTransformerアーキテクチャと比較して精度が平均20%近く向上し、同時に計算時間を1桁(50分の1に)削減できることが実証されました。
論文へのリンク:https://doi.org/10.48550/arXiv.2201.12886
初期化
まず、必要なライブラリをインポートする必要があります。これらのライブラリには、Metatrader 5(MT5ターミナルとの対話用)、PyTorch Lightning(モデルの訓練用)、およびデータ処理と視覚化用のその他のライブラリが含まれます。
import MetaTrader5 as mt5 import lightning.pytorch as pl from lightning.pytorch.callbacks import EarlyStopping import matplotlib.pyplot as plt import pandas as pd from pytorch_forecasting import Baseline, NHiTS, TimeSeriesDataSet from pytorch_forecasting.data import NaNLabelEncoder from pytorch_forecasting.metrics import MAE, SMAPE, MQF2DistributionLoss, QuantileLoss from lightning.pytorch.tuner import Tuner
次に、Metatrader 5を初期化する必要があります。これは、mt.initialize()関数を呼び出すことによっておこなわれます。これを使用するだけでは初期化できない場合は、MT5ターミナルのパスをパラメータとしてこの関数に渡す必要があります(例では「D:\Project\mt\MT5\terminal64.exe」は私の個人的なパスの場所ですが、実際のアプリケーションでは、独自のパスの場所に構成する必要があります)。初期化が成功した場合、関数はTrueを返し、それ以外の場合はFalseを返します。
if not mt.initialize("D:\\Project\\mt\\MT5\\terminal64.exe"): print('initialize() failed!') else: print(mt.version())
mt.symbols_total()関数は、MT5ターミナルで利用可能な取引可能な品種の総数を取得するために使用されます。データを正しく取得できているかどうかを判断するために使用できます。合計数が0より大きい場合、mt.copy_rates_from_pos()関数を使用して、指定された取引可能な種類の履歴データを取得できます。この例では、GOLD_microバリエーションの「mt_data_len」M15(15分)周期の最新の長さのデータを取得しました。
sb=mt.symbols_total() rts=None if sb > 0: rts=mt.copy_rates_from_pos("GOLD_micro",mt.TIMEFRAME_M15,0,mt_data_len) mt.shutdown()
最後に、mt.shutdown()関数を使ってMT5端末との接続を切断し、取得したデータをPandasDataFrame形式に変換します。
mt.shutdown() rts_fm=pd.DataFrame(rts)
次に、MT5ターミナルから取得したデータを前処理する方法について説明します。
まずタイムスタンプを日付に変換する必要があります。
rts_fm['time']=pd.to_datetime(rts_fm['time'], unit='s')
ここでは、データにラベルを付ける方法については説明しません。方法については、以前の2つの記事を参照してください(この記事の冒頭に記事リンクがあります)。予測モデルの使用方法を簡潔に示すために、すべての「max_encoder_length+2max_prediction_length」個のデータをグループに分割するだけです。各グループには0から「max_encoder_length+2max_prediction_length-1」までのシーケンスがあり、それらを埋めます。このようにして、元のデータに必要なラベルを追加します。まず、元の時間インデックス(つまり、DataFrameのインデックス)を時間インデックスに変換する必要があります。元の時間インデックスを(max_encoder_length+2max_prediction_length)で割った余りを計算し、その結果を新しい時間インデックスとして使用します。これは、時間インデックスを0から「max_encoder_length+2*max_prediction_length-1」の範囲にマップします。
rts_fm['time_idx']= rts_fm.index%(max_encoder_length+2*max_prediction_length)
また、元の時間インデックスをグループに変換する必要があります。元の時間インデックスを「max_encoder_length+2*max_prediction_length」で割った値を計算し、その結果を新しいグループとして使用します。
rts_fm['series']=rts_fm.index//(max_encoder_length+2*max_prediction_length)
データの前処理部分を関数にカプセル化します。取得する必要があるデータの長さを渡すだけで、データの前処理作業を完了できます。
def get_data(mt_data_len:int): if not mt.initialize("D:\\Project\\mt\\MT5\\terminal64.exe"): print('initialize() failed!') else: print(mt.version()) sb=mt.symbols_total() rts=None if sb > 0: rts=mt.copy_rates_from_pos("GOLD_micro",mt.TIMEFRAME_M15,0,mt_data_len) mt.shutdown() # print(len(rts)) rts_fm=pd.DataFrame(rts) rts_fm['time']=pd.to_datetime(rts_fm['time'], unit='s') rts_fm['time_idx']= rts_fm.index%(max_encoder_length+2*max_prediction_length) rts_fm['series']=rts_fm.index//(max_encoder_length+2*max_prediction_length) return rts_fm
pytorch_forecasting.TimeSeriesDataSetクラスの書き換え
pytorch_forecastingのto_dataloader()関数を書き換えます。 これにより、データをシャッフルするかどうか、およびバッチの最後のグループを削除するかどうかを制御できます(主に、データの最後のグループの長さ不足によって引き起こされる予期せぬエラーを防ぐため)。その方法は次のとおりです。
class New_TmSrDt(TimeSeriesDataSet): def to_dataloader(self, train: bool = True, batch_size: int = 64, batch_sampler: Sampler | str = None, shuffle:bool=False, drop_last:bool=False, **kwargs) -> DataLoader: default_kwargs = dict( shuffle=shuffle, drop_last=drop_last, #modification collate_fn=self._collate_fn, batch_size=batch_size, batch_sampler=batch_sampler, ) default_kwargs.update(kwargs) kwargs = default_kwargs if kwargs["batch_sampler"] is not None: sampler = kwargs["batch_sampler"] if isinstance(sampler, str): if sampler == "synchronized": kwargs["batch_sampler"] = TimeSynchronizedBatchSampler( SequentialSampler(self), batch_size=kwargs["batch_size"], shuffle=kwargs["shuffle"], drop_last=kwargs["drop_last"], ) else: raise ValueError(f"batch_sampler {sampler} unknown - see docstring for valid batch_sampler") del kwargs["batch_size"] del kwargs["shuffle"] del kwargs["drop_last"] return DataLoader(self,**kwargs)このコードは、TimeSeriesDataSetを継承する新しいクラスNew_TmSrDtを作成します。その後、この新しいクラスでto_dataloader()関数がオーバーライドされ、shuffleパラメータとDrop_lastパラメータが組み込まれます。これにより、データ読み込みプロセスをより詳細に制御できるようになります。コード内では、TimeSeriesDataSetのインスタンスをNew_TmSrDtに忘れずに置き換えてください。
訓練データセットおよび検証データセットの作成
まず、訓練データのカットオフポイントを決定する必要があります。これは、最大のtime_idx値から最大予測長を減算することによっておこなわれます。
max_encoder_length = 2*96 max_prediction_length = 30 training_cutoff = rts_fm["time_idx"].max() - max_prediction_length
次に、New_TmSrDtクラス(書き直したTimeSeriesDataSetクラス)を使用して訓練データセットを作成します。このクラスには次のパラメータが必要です。
- データフレーム(この場合は「rts_fm」)
- time_idx列:連続する整数シーケンス
- ターゲット列(この場合は「close」):予測する値
- さまざまな時系列を表すグループ列(この場合はseries)
- エンコーダとプレディクタの最大長
context_length = max_encoder_length prediction_length = max_prediction_length training = New_TmSrDt( data[lambda x: x.time_idx <= training_cutoff], time_idx="time_idx", target="close", categorical_encoders={"series":NaNLabelEncoder().fit(data.series)}, group_ids=["series"], time_varying_unknown_reals=["close"], max_encoder_length=context_length, # min_encoder_length=max_encoder_length//2, max_prediction_length=prediction_length, # min_prediction_length=1, ) validation = New_TmSrDt.from_dataset(training, data, min_prediction_idx=training_cutoff + 1)
次に、New_TmSrDt.from_dataset()関数を使用して検証データセットを作成します。この関数には次のパラメータが必要です。
- 訓練データセット
- データフレーム
- 最小予測インデックス:訓練データの最大time_idx値より1大きい必要
validation = New_TmSrDt.from_dataset(training, rts_fm, min_prediction_idx=training_cutoff + 1)
最後に、to_dataloader()関数を使用して、訓練データセットと検証データセットをPyTorchDataLoaderオブジェクトに変換します。この関数には次のパラメータが必要です。
- train:データをシャッフルするかどうかを示す
- batch_size:バッチごとのサンプル数
- num_workers:データロード用のワーカープロセスの数
train_dataloader = training.to_dataloader(train=True, shuffle=t_shuffle, drop_last=t_drop_last, batch_size=batch_size, num_workers=0,) val_dataloader = validation.to_dataloader(train=False, shuffle=v_shuffle, drop_last=v_drop_last, batch_size=batch_size, num_workers=0)
最後に、コードのこの部分を関数spilt_data(data:pd.DataFrame,t_drop_last:bool,t_shuffle:bool,v_drop_last:bool,v_shuffle:bool)にカプセル化し、次のパラメータを指定します。
- data:処理する必要があるデータセットを受け取るために使用
- t_drop_last:訓練データセットの最後のグループを削除するかどうか
- t_shuffle:訓練データをシャッフルするかどうか
- v_drop_last:検証データセットの最後のグループを削除するかどうか
- v_shuffle:検証データをシャッフルするかどうか
train_dataloader(訓練データセットのデータローダーのインスタンス)、val_dataloader)検証データセットのデータローダーのインスタンス)、training(データセットのTimeSeriesDataSetのインスタンス)を、後で使用するこの関数の戻り値として作成します。
def spilt_data(data:pd.DataFrame, t_drop_last:bool, t_shuffle:bool, v_drop_last:bool, v_shuffle:bool): training_cutoff = data["time_idx"].max() - max_prediction_length #max:95 context_length = max_encoder_length prediction_length = max_prediction_length training = New_TmSrDt( data[lambda x: x.time_idx <= training_cutoff], time_idx="time_idx", target="close", categorical_encoders={"series":NaNLabelEncoder().fit(data.series)}, group_ids=["series"], time_varying_unknown_reals=["close"], max_encoder_length=context_length, # min_encoder_length=max_encoder_length//2, max_prediction_length=prediction_length, # min_prediction_length=1, ) validation = New_TmSrDt.from_dataset(training, data, min_prediction_idx=training_cutoff + 1) train_dataloader = training.to_dataloader(train=True, shuffle=t_shuffle, drop_last=t_drop_last, batch_size=batch_size, num_workers=0,) val_dataloader = validation.to_dataloader(train=False, shuffle=v_shuffle, drop_last=v_drop_last, batch_size=batch_size, num_workers=0) return train_dataloader,val_dataloader,training
モデルの作成と訓練
ここで、NHiTSモデルの作成を開始します。この部分では、パラメータの設定方法と訓練方法を説明します。
1. 最適な学習率を見つける
モデルの作成を開始する前に、PyTorch LightningのTunerオブジェクトを使用して、最適な学習率を見つけます。
まず、PyTorch LightningのTrainerオブジェクトを作成する必要があります。ここでは、acceleratorパラメータを使用してデバイスタイプを指定し、gradient_clip_valを使用して勾配爆発を防止します。
pl.seed_everything(42) trainer = pl.Trainer(accelerator="cpu", gradient_clip_val=0.1)
次に、NHiTS.from_dataset()関数を使用してNHiTSモデルnetを作成します。この関数には次のパラメータが必要です。
- 訓練データセット
- 学習率
- 重みの減少
- 損失関数
- 隠れ層のサイズ
- オプティマイザー
net = NHiTS.from_dataset( training, learning_rate=3e-2, weight_decay=1e-2, loss=MQF2DistributionLoss(prediction_length=max_prediction_length), backcast_loss_ratio=0.0, hidden_size=64, optimizer="AdamW", )
次に、Tunerクラスをインスタンス化し、lr_find()関数を呼び出します。この関数は、データセットに基づいて一連の学習率でモデルを訓練し、各学習率の損失を比較して最適な学習率を取得します。
res = Tuner(trainer).lr_find( net, train_dataloaders=train_dataloader, val_dataloaders=val_dataloader, min_lr=1e-5, max_lr=1e-1 ) lr_=res.suggestion()
同様に、最良の学習率を取得するコードのこの部分を関数get_learning_rate()にカプセル化し、取得した最良の学習率をその戻り値として作成します。
def get_learning_rate(): pl.seed_everything(42) trainer = pl.Trainer(accelerator="cpu", gradient_clip_val=0.1,logger=False) net = NHiTS.from_dataset( training, learning_rate=3e-2, weight_decay=1e-2, loss=MQF2DistributionLoss(prediction_length=max_prediction_length), backcast_loss_ratio=0.0, hidden_size=64, optimizer="AdamW", ) res = Tuner(trainer).lr_find( net, train_dataloaders=t_loader, val_dataloaders=v_loader, min_lr=1e-5, max_lr=1e-1 ) lr_=res.suggestion() return lr_
学習率を視覚化したい場合は、次のコードを追加できます。
print(f"suggested learning rate: {res.suggestion()}")
fig = res.plot(show=True, suggest=True)
fig.show()
この例の結果は次のようになります。
推奨学習率:0.003981071705534973
2.EarlyStoppingコールバックの定義
このコールバックは主に検証損失を監視し、連続するいくつかのエポックで損失が改善されない場合に訓練を停止するために使用されます。これにより、モデルの過剰適合を防ぐことができます。
early_stop_callback = EarlyStopping(monitor="val_loss", min_delta=1e-4, patience=10, verbose=True, mode="min")
ここで注意すべきパラメータはpatienceです。これは主に、連続したいくつかのエポックで損失が改善されなかった場合に、訓練中にいつ停止するかを制御します。10に設定しました。
3.ModelCheckpointコールバックの定義
このコールバックは主に、モデルのアーカイブとアーカイブの名前を制御するために使用されます。主にこの2つの変数を設定します。
ck_callback=ModelCheckpoint(monitor='val_loss', mode="min", save_top_k=1, filename='{epoch}-{val_loss:.2f}')
save_top_kは、上位数個の最良のモデルの保存を制御するために使用されます。これを1に設定し、最良のモデルのみを保存します。
4.訓練モデルの定義
まず、lightning.pytorchでTrainerクラスをインスタンス化し、前に定義した2つのコールバックを追加する必要があります。
trainer = pl.Trainer( max_epochs=ep, accelerator="cpu", enable_model_summary=True, gradient_clip_val=1.0, callbacks=[early_stop_callback,ck_callback], limit_train_batches=30, enable_checkpointing=True, )
ここで注意する必要があるパラメータは、max_epochs(訓練エポックの最大数)、gradient_clip_val(勾配爆発を防ぐために使用)、callbacksです。ここで、max_epochsは後で定義するグローバル変数であるepを使用し、callbacksはコールバックのコレクションです。
次に、NHiTSモデルを定義してインスタンス化する必要もあります。
net = NHiTS.from_dataset( training, learning_rate=lr, log_interval=10, log_val_interval=1, weight_decay=1e-2, backcast_loss_ratio=0.0, hidden_size=64, optimizer="AdamW", loss=MQF2DistributionLoss(prediction_length=max_prediction_length), )
ここで、通常、パラメータを変更する必要はなく、デフォルトのパラメータを使用するだけです。ここでは、lossのみをMQF2DistributionLoss損失関数に変更します。
5.訓練モジュール
Trainerオブジェクトのfit()関数を使用してモデルを訓練します。
trainer.fit( net, train_dataloaders=train_dataloader, val_dataloaders=val_dataloader, )
同様に、コードのこの部分を関数train()にカプセル化します。
def train(): early_stop_callback = EarlyStopping(monitor="val_loss", min_delta=1e-4, patience=10, # The number of times without improvement will stop verbose=True, mode="min") ck_callback=ModelCheckpoint(monitor='val_loss', mode="min", save_top_k=1, # Save the top few best ones filename='{epoch}-{val_loss:.2f}') trainer = pl.Trainer( max_epochs=ep, accelerator="cpu", enable_model_summary=True, gradient_clip_val=1.0, callbacks=[early_stop_callback,ck_callback], limit_train_batches=30, enable_checkpointing=True, ) net = NHiTS.from_dataset( training, learning_rate=lr, log_interval=10, log_val_interval=1, weight_decay=1e-2, backcast_loss_ratio=0.0, hidden_size=64, optimizer="AdamW", loss=MQF2DistributionLoss(prediction_length=max_prediction_length), ) trainer.fit( net, train_dataloaders=t_loader, val_dataloaders=v_loader, # ckpt_path='best' ) return trainer
この関数は、予測タスクに使用できる訓練済みモデルを返します。
実行ロジックの定義
1.グローバル変数の定義
ep=200 __train=False mt_data_len=200000 max_encoder_length = 2*96 max_prediction_length = 30 batch_size = 128
__trainは、現在モデルを訓練しているかテストしているかを制御するために使用されます。
epは最大訓練エポックを制御するために使用されます。EarlyStoppingを設定したので、モデルが収束しなくなったときに自動的に停止するため、この値を少し大きく設定できます。
mt_data_lenはクライアントから取得した最近の時系列データの数です。
max_encoder_lengthとmax_prediction_lengthは、それぞれ最大符号化長と最大予測長です。
2.訓練
また、訓練の完了時に現在の最適な訓練結果をローカルファイルに保存する必要があるため、この情報を保存するためのjsonファイルを定義します。
info_file='results.json'
訓練プロセスを明確にするために、訓練中に不必要な警告情報の出力を避ける必要があるため、次のコードを追加します。
warnings.filterwarnings("ignore")
次は訓練ロジックです。
dt=get_data(mt_data_len=mt_data_len) if __train: # print(dt) # dt=get_data(mt_data_len=mt_data_len) t_loader,v_loader,training=spilt_data(dt, t_shuffle=False,t_drop_last=True, v_shuffle=False,v_drop_last=True) lr=get_learning_rate() trainer__=train() m_c_back=trainer__.checkpoint_callback m_l_back=trainer__.early_stopping_callback best_m_p=m_c_back.best_model_path best_m_l=m_l_back.best_score.item() # print(best_m_p) if os.path.exists(info_file): with open(info_file,'r+') as f1: last=json.load(fp=f1) last_best_model=last['last_best_model'] last_best_score=last['last_best_score'] if last_best_score > best_m_l: last['last_best_model']=best_m_p last['last_best_score']=best_m_l json.dump(last,fp=f1) else: with open(info_file,'w') as f2: json.dump(dict(last_best_model=best_m_p,last_best_score=best_m_l),fp=f2)
訓練が完了すると、ルートディレクトリのresults.jsonファイルで、最良のモデルの保存場所と最良のスコアを見つけることができます。
訓練プロセス中に、各エポックの進行状況を示す進行状況バーが表示されます。
訓練:
訓練完了:
3.モデルの検証
訓練後、モデルを検証して視覚化したいと思います。次のコードを追加できます。
best_model = NHiTS.load_from_checkpoint(best_m_p) predictions = best_model.predict(v_loader, trainer_kwargs=dict(accelerator="cpu",logger=False), return_y=True) raw_predictions = best_model.predict(v_loader, mode="raw", return_x=True, trainer_kwargs=dict(accelerator="cpu",logger=False)) for idx in range(10): # plot 10 examples best_model.plot_prediction(raw_predictions.x, raw_predictions.output, idx=idx, add_loss_to_title=True) # sample 500 paths samples = best_model.loss.sample(raw_predictions.output["prediction"][[0]], n_samples=500)[0] # plot prediction fig = best_model.plot_prediction(raw_predictions.x, raw_predictions.output, idx=0, add_loss_to_title=True) ax = fig.get_axes()[0] # plot first two sampled paths ax.plot(samples[:, 0], color="g", label="Sample 1") ax.plot(samples[:, 1], color="r", label="Sample 2") fig.legend() plt.show()
TensorBoardを使用して、訓練中にリアルタイムで訓練状況の視覚化を表示することもできます。ここではデモンストレーションはおこなっていません。
結果:
4.訓練されたモデルのテスト
まず、jsonファイルを開いて、最適なモデルの保存場所を見つけます。
with open(info_file) as f: best_m_p=json.load(fp=f)['last_best_model'] print('model path is:',best_m_p)
次に、モデルをロードします。
best_model = NHiTS.load_from_checkpoint(best_m_p)
次に、モデルをテストするためにクライアントからリアルタイムでデータを取得します。
offset=1 dt=dt.iloc[-max_encoder_length-offset:-offset,:] last_=dt.iloc[-1] #get the last group of data # print(len(dt)) for i in range(1,max_prediction_length+1): dt.loc[dt.index[-1]+1]=last_ dt['series']=0 # dt['time_idx']=dt.apply(lambda x:x.index,args=1) dt['time_idx']=dt.index-dt.index[0] # dt=get_data(mt_data_len=max_encoder_length) predictions=best_model.predict(dt,mode='raw',trainer_kwargs=dict(accelerator="cpu",logger=False),return_x=True) best_model.plot_prediction(predictions.x,predictions.output,show_future_observed=False) plt.show()
結果は以下のとおりです。
5. モデルを評価する
もちろん、PyTorchForecastingライブラリのいくつかのメトリクスを使用して、モデルのパフォーマンスを評価できます。平均絶対誤差(MAE)と対称平均絶対パーセント誤差(SMAPE)を使用して評価し、評価結果を出力する方法は次のとおりです。
from pytorch_forecasting.metrics import MAE, SMAPE mae = MAE()(raw_predictions["prediction"], raw_predictions["target"]) print(f"Mean Absolute Error: {mae}") smape = SMAPE()(raw_predictions["prediction"], raw_predictions["target"]) print(f"Symmetric Mean Absolute Percentage Error: {smape}")
このコードスニペットでは、最初にMAEメトリクスとSMAPEメトリクスをインポートします。次に、これらのメトリクスを使用して、予測値(raw_predictions["prediction"])と実際の値(raw_predictions["target"])の間の誤差を計算します。これらのメトリクスは、モデルのパフォーマンスを理解し、モデルをさらに改善するための方向性を提供するのに役立ちます。
結論
この記事では、前の2つの記事で説明したラベルデータの使用方法を検討し、データを使用してN-HiTsモデルを作成する方法を示しました。次に、モデルを訓練し、モデルを検証しました。結果グラフから、結果が良好であることが簡単にわかります。また、MT5でこのモデルを使用して30本のローソク足の予測をおこなう方法も示しました。もちろん、実際の取引では読者が実際の状況に応じて多くのテストをおこない、対応する取引ルールを指定する必要があるため、予測結果に基づいて注文をおこなう方法については触れませんでした。最後に、楽しい時間を過ごしてください!
添付:
完全なコード:
# Copyright 2021, MetaQuotes Ltd. # https://www.mql5.com # from typing import Union import lightning.pytorch as pl import os from lightning.pytorch.callbacks import EarlyStopping,ModelCheckpoint import matplotlib.pyplot as plt import numpy as np import pandas as pd # import torch from pytorch_forecasting import NHiTS, TimeSeriesDataSet from pytorch_forecasting.data import NaNLabelEncoder,timeseries from pytorch_forecasting.metrics import MQF2DistributionLoss from pytorch_forecasting.data.samplers import TimeSynchronizedBatchSampler from lightning.pytorch.tuner import Tuner import MetaTrader5 as mt import warnings import json from torch.utils.data import DataLoader from torch.utils.data.sampler import Sampler,SequentialSampler class New_TmSrDt(TimeSeriesDataSet): ''' rewrite dataset class ''' def to_dataloader(self, train: bool = True, batch_size: int = 64, batch_sampler: Sampler | str = None, shuffle:bool=False, drop_last:bool=False, **kwargs) -> DataLoader: default_kwargs = dict( shuffle=shuffle, # drop_last=train and len(self) > batch_size, drop_last=drop_last, # collate_fn=self._collate_fn, batch_size=batch_size, batch_sampler=batch_sampler, ) default_kwargs.update(kwargs) kwargs = default_kwargs # print(kwargs['drop_last']) if kwargs["batch_sampler"] is not None: sampler = kwargs["batch_sampler"] if isinstance(sampler, str): if sampler == "synchronized": kwargs["batch_sampler"] = TimeSynchronizedBatchSampler( SequentialSampler(self), batch_size=kwargs["batch_size"], shuffle=kwargs["shuffle"], drop_last=kwargs["drop_last"], ) else: raise ValueError(f"batch_sampler {sampler} unknown - see docstring for valid batch_sampler") del kwargs["batch_size"] del kwargs["shuffle"] del kwargs["drop_last"] return DataLoader(self,**kwargs) def get_data(mt_data_len:int): if not mt.initialize(): print('initialize() failed!') else: print(mt.version()) sb=mt.symbols_total() rts=None if sb > 0: rts=mt.copy_rates_from_pos("GOLD_micro",mt.TIMEFRAME_M15,0,mt_data_len) mt.shutdown() # print(len(rts)) rts_fm=pd.DataFrame(rts) rts_fm['time']=pd.to_datetime(rts_fm['time'], unit='s') rts_fm['time_idx']= rts_fm.index%(max_encoder_length+2*max_prediction_length) rts_fm['series']=rts_fm.index//(max_encoder_length+2*max_prediction_length) return rts_fm def spilt_data(data:pd.DataFrame, t_drop_last:bool, t_shuffle:bool, v_drop_last:bool, v_shuffle:bool): training_cutoff = data["time_idx"].max() - max_prediction_length #max:95 context_length = max_encoder_length prediction_length = max_prediction_length training = New_TmSrDt( data[lambda x: x.time_idx <= training_cutoff], time_idx="time_idx", target="close", categorical_encoders={"series":NaNLabelEncoder().fit(data.series)}, group_ids=["series"], time_varying_unknown_reals=["close"], max_encoder_length=context_length, # min_encoder_length=max_encoder_length//2, max_prediction_length=prediction_length, # min_prediction_length=1, ) validation = New_TmSrDt.from_dataset(training, data, min_prediction_idx=training_cutoff + 1) train_dataloader = training.to_dataloader(train=True, shuffle=t_shuffle, drop_last=t_drop_last, batch_size=batch_size, num_workers=0,) val_dataloader = validation.to_dataloader(train=False, shuffle=v_shuffle, drop_last=v_drop_last, batch_size=batch_size, num_workers=0) return train_dataloader,val_dataloader,training def get_learning_rate(): pl.seed_everything(42) trainer = pl.Trainer(accelerator="cpu", gradient_clip_val=0.1,logger=False) net = NHiTS.from_dataset( training, learning_rate=3e-2, weight_decay=1e-2, loss=MQF2DistributionLoss(prediction_length=max_prediction_length), backcast_loss_ratio=0.0, hidden_size=64, optimizer="AdamW", ) res = Tuner(trainer).lr_find( net, train_dataloaders=t_loader, val_dataloaders=v_loader, min_lr=1e-5, max_lr=1e-1 ) # print(f"suggested learning rate: {res.suggestion()}") lr_=res.suggestion() return lr_ def train(): early_stop_callback = EarlyStopping(monitor="val_loss", min_delta=1e-4, patience=10, verbose=True, mode="min") ck_callback=ModelCheckpoint(monitor='val_loss', mode="min", save_top_k=1, filename='{epoch}-{val_loss:.2f}') trainer = pl.Trainer( max_epochs=ep, accelerator="cpu", enable_model_summary=True, gradient_clip_val=1.0, callbacks=[early_stop_callback,ck_callback], limit_train_batches=30, enable_checkpointing=True, ) net = NHiTS.from_dataset( training, learning_rate=lr, log_interval=10, log_val_interval=1, weight_decay=1e-2, backcast_loss_ratio=0.0, hidden_size=64, optimizer="AdamW", loss=MQF2DistributionLoss(prediction_length=max_prediction_length), ) trainer.fit( net, train_dataloaders=t_loader, val_dataloaders=v_loader, # ckpt_path='best' ) return trainer if __name__=='__main__': ep=200 __train=False mt_data_len=200000 max_encoder_length = 2*96 max_prediction_length = 30 batch_size = 128 info_file='results.json' warnings.filterwarnings("ignore") dt=get_data(mt_data_len=mt_data_len) if __train: # print(dt) # dt=get_data(mt_data_len=mt_data_len) t_loader,v_loader,training=spilt_data(dt, t_shuffle=False,t_drop_last=True, v_shuffle=False,v_drop_last=True) lr=get_learning_rate() trainer__=train() m_c_back=trainer__.checkpoint_callback m_l_back=trainer__.early_stopping_callback best_m_p=m_c_back.best_model_path best_m_l=m_l_back.best_score.item() # print(best_m_p) if os.path.exists(info_file): with open(info_file,'r+') as f1: last=json.load(fp=f1) last_best_model=last['last_best_model'] last_best_score=last['last_best_score'] if last_best_score > best_m_l: last['last_best_model']=best_m_p last['last_best_score']=best_m_l json.dump(last,fp=f1) else: with open(info_file,'w') as f2: json.dump(dict(last_best_model=best_m_p,last_best_score=best_m_l),fp=f2) best_model = NHiTS.load_from_checkpoint(best_m_p) predictions = best_model.predict(v_loader, trainer_kwargs=dict(accelerator="cpu",logger=False), return_y=True) raw_predictions = best_model.predict(v_loader, mode="raw", return_x=True, trainer_kwargs=dict(accelerator="cpu",logger=False)) for idx in range(10): # plot 10 examples best_model.plot_prediction(raw_predictions.x, raw_predictions.output, idx=idx, add_loss_to_title=True) samples = best_model.loss.sample(raw_predictions.output["prediction"][[0]], n_samples=500)[0] # plot prediction fig = best_model.plot_prediction(raw_predictions.x, raw_predictions.output, idx=0, add_loss_to_title=True) ax = fig.get_axes()[0] # plot first two sampled paths ax.plot(samples[:, 0], color="g", label="Sample 1") ax.plot(samples[:, 1], color="r", label="Sample 2") fig.legend() plt.show() else: with open(info_file) as f: best_m_p=json.load(fp=f)['last_best_model'] print('model path is:',best_m_p) best_model = NHiTS.load_from_checkpoint(best_m_p) offset=1 dt=dt.iloc[-max_encoder_length-offset:-offset,:] last_=dt.iloc[-1] # print(len(dt)) for i in range(1,max_prediction_length+1): dt.loc[dt.index[-1]+1]=last_ dt['series']=0 # dt['time_idx']=dt.apply(lambda x:x.index,args=1) dt['time_idx']=dt.index-dt.index[0] # dt=get_data(mt_data_len=max_encoder_length) predictions = best_model.predict(dt, mode='raw',trainer_kwargs=dict(accelerator="cpu",logger=False),return_x=True) best_model.plot_prediction(predictions.x,predictions.output,show_future_observed=False) plt.show()