多層パーセプトロンとバックプロパゲーションアルゴリズム(第II部): Pythonでの実装とMQL5との統合
はじめに
前回の記事では、単純なニューロン(パーセプトロン)の作成について検討しました。勾配降下法、相互接続されたパーセプトロンで構成される多層パーセプトロン(MLP)ネットワークの構築、およびそのようなネットワークの訓練について学びました。
この記事では、Python言語を使用してこのアルゴリズムタイプを実装することがいかに簡単であるかを示したいと思います。
MQLとの統合を開発するために利用できるPythonパッケージが存在し、データの探索、作成、機械学習モデルの使用などのさまざまな機会がもたらされます。
MQL5に組み込まれているPython統合により、単純な線形回帰から深層学習モデルまで、さまざまなソリューションを作成できます。この言語は専門的な使用を目的として設計されているため、難しい計算関連のタスクを実行できるライブラリが多数あります。
ネットワークは手動で作成します。ただし、前の記事で述べたように、これは学習と予測のプロセスで実際に何が起こっているのかを理解するのに役立つステップにすぎません。次に、TensorFlowとKerasを使用したより複雑な例を示します。
TensorFlowについて
TensorFlowは、高速な数値処理のためのオープンソースライブラリです。
Apacheオープンソースライセンスの元でGoogleによって作成、サポート、リリースされており、APIはPython言語用に設計されていますが、基本的なC ++ APIにアクセスできます。
Theanoなどの深層学習で使用するために設計された他の数値ライブラリとは異なり、TensorFlowは研究と本番の両方での使用を目的としています。Googleが使用する機械学習ベースのRankBrain検索エンジンや非常に興味深いDeepDreamコンピュータビジョンプロジェクトが例です。
実行は、1つのCPU、GPU、またはモバイルデバイス上の小規模なシステムだけでなく、数百台のコンピュータを使用する大規模な分散システムでも可能です。
Kerasについて
Kerasは、深層学習モデルを開発および評価するための強力で使いやすいオープンソースのPythonライブラリです。
強力なTheanoおよびTensorFlow計算ライブラリが含まれており、わずか数行のコードでニューラルネットワークモデルの定義と訓練が可能になります。
チュートリアル
このチュートリアルは、次の4つのセクションに分かれています。
- MetaEditorでのPython環境のインストールと準備
- 最初のステップとモデルの再構築(パーセプトロンとMLP)
- KerasとTensorFlowを使用した単純なモデルの作成
- MQL5とPythonを統合する方法
1. Python環境のインストールと準備
まず、公式Webサイトwww.python.org/downloads/からPythonをダウンロードします。
TensorFlowを使用するには、バージョン3.3〜3.8をインストールする必要があります(個人的には3.7を使用しています)。
ダウンロードしてインストールプロセスを開始した後で[Python3.xをPATHに追加する]オプションをオンにしてください。これにより、後で追加の構成を行わなくても、いくつかのことが確実に機能するようになります。
Pythonスクリプトは、MetaTrader 5ターミナルから直接簡単に実行できます。
- Python実行可能パス(環境)を定義する
- 必要なプロジェクトの依存関係をインストールする
MetaEditorを開き、[ツール]>[オプション]に移動します。
ここで、Python実行可能ファイルが配置されているパスを指定します。インストール後はデフォルトのPythonパスがあるはずですが、ない場合は、実行可能ファイルへのフルパスを手動で入力してください。MetaTrader 5ターミナルから直接スクリプトを実行できるようになります。
私は個人的に、仮想環境と呼ばれる完全に独立したライブラリ環境を使用しています。これは、「クリーンな」インストールを取得し、製品に必要なライブラリのみを収集する方法です。
venvパッケージの詳細については、こちらをお読みください。
完了すると、ターミナルから直接Pythonスクリプトを実行できるようになります。このプロジェクトでは、次のライブラリをインストールする必要があります。
ライブラリのインストール方法がわからない場合は、関連するモジュールインストールガイドをご覧ください。
- MetaTrader 5
- TensorFlow
- Matplotlib
- Pandas
- Sklearn
環境をインストールして構成したので、ターミナルで小さなスクリプトを作成して実行する方法を理解するために、小さなテストを実行しましょう。MetaEditorから直接新しいスクリプトを開始するには、以下の手順に従います。
[新規]>[Pythonスクリプト]
スクリプトの名前を指定します。MetaEditorのMQLウィザードは、いくつかのライブラリをインポートするように自動的に促します。これは非常に興味深いので、私たちの実験では、NumPyオプションを選択しましょう。
次に、正弦波グラフを生成する簡単なスクリプトを作成します。
# Copyright 2021, Lethan Corp. # https://www.mql5.com/pt/users/14134597 import numpy as np import matplotlib.pyplot as plt data = np.linspace(-np.pi, np.pi, 201) plt.plot(data, np.sin(data)) plt.xlabel('Angle [rad]') plt.ylabel('sin(data)') plt.axis('tight') plt.show()
スクリプトを実行するには、F7キーを押してコンパイルし、MetaTrader 5ターミナルを開いて、チャート上でスクリプトを実行します。出力がある場合は、結果は[エキスパート]タブに表示されます。この場合、スクリプトは、作成した関数グラフを含むウィンドウを開きます。
2. 最初のステップとモデルの再構築(パーセプトロンとMLP)
便宜上、MQL5の例と同じデータセットを使用します。
以下は、指定された重みのセットを持つ行の出力値を予測するpredict()関数です。ここで最初のケースは偏りでもあります。また、活性化関数があります。
# Transfer neuron activation def activation(activation): return 1.0 if activation >= 0.0 else 0.0 # Make a prediction with weights def predict(row, weights): z = weights[0] for i in range(len(row) - 1): z += weights[i + 1] * row[i] return activation(z)
すでにご存知のように、ネットワークを訓練するには、勾配降下プロセスを実装する必要があります。これについては、前の記事で詳しく説明しました。続きとして、訓練関数「train_weights()」を紹介します。
# Estimate Perceptron weights using stochastic gradient descent def train_weights(train, l_rate, n_epoch): weights = [0.0 for i in range(len(train[0]))] #random.random() for epoch in range(n_epoch): sum_error = 0.0 for row in train: y = predict(row, weights) error = row[-1] - y sum_error += error**2 weights[0] = weights[0] + l_rate * error for i in range(len(row) - 1): weights[i + 1] = weights[i + 1] + l_rate * error * row[i] print('>epoch=%d, lrate=%.3f, error=%.3f' % (epoch, l_rate, sum_error)) return weights
MLPモデルの適用:
このチュートリアルは5つのセクションに分かれています。
- ネットワークの立ち上げ
- フィードフォワード
- バックプロパゲーション
- 訓練
- 予測
ネットワークの立ち上げ
すぐに学習できる新しいネットワークを作成して、簡単なことから始めましょう。
各ニューロンには、維持する必要のある重みのセットがあります。入力接続ごとに1つの重みがあり、偏り用の追加の重みがあります。訓練中にニューロンの追加のプロパティを保存する必要があるため、辞書を使用して各ニューロンを表し、プロパティを名前で保存します。たとえば、重みには「weights」を使用します。
from random import seed from random import random # Initialize a network def initialize_network(n_inputs, n_hidden, n_outputs): network = list() hidden_layer = [{'weights':[random() for i in range(n_inputs + 1)]} for i in range(n_hidden)] network.append(hidden_layer) output_layer = [{'weights':[random() for i in range(n_hidden + 1)]} for i in range(n_outputs)] network.append(output_layer) return network seed(1) network = initialize_network(2, 1, 2) for layer in network: print(layer)
ネットワークを作成して起動する方法がわかったので、それを使用して出力データを計算する方法を見てみましょう。
フィードフォワード
from math import exp # Calculate neuron activation for an input def activate(weights, inputs): activation = weights[-1] for i in range(len(weights)-1): activation += weights[i] * inputs[i] return activation # Transfer neuron activation def transfer(activation): return 1.0 / (1.0 + exp(-activation)) # Forward propagate input to a network output def forward_propagate(network, row): inputs = row for layer in network: new_inputs = [] for neuron in layer: activation = activate(neuron['weights'], inputs) neuron['output'] = transfer(activation) new_inputs.append(neuron['output']) inputs = new_inputs return inputs # test forward propagation network = [[{'weights': [0.13436424411240122, 0.8474337369372327, 0.763774618976614]}], [{'weights': [0.2550690257394217, 0.49543508709194095]}, {'weights': [0.4494910647887381, 0.651592972722763]}]] row = [1, 0, None] output = forward_propagate(network, row) print(output)
上記のスクリプトを実行すると、次の結果が得られます。
[0.6629970129852887、0.7253160725279748]
実際の出力値は今のところばかげていますが、ニューロンの重みをより有用にする方法はすぐにわかります。
バックプロパゲーション
# Calculate the derivative of an neuron output def transfer_derivative(output): return output * (1.0 - output) # Backpropagate error and store in neurons def backward_propagate_error(network, expected): for i in reversed(range(len(network))): layer = network[i] errors = list() if i != len(network)-1: for j in range(len(layer)): error = 0.0 for neuron in network[i + 1]: error += (neuron['weights'][j] * neuron['delta']) errors.append(error) else: for j in range(len(layer)): neuron = layer[j] errors.append(expected[j] - neuron['output']) for j in range(len(layer)): neuron = layer[j] neuron['delta'] = errors[j] * transfer_derivative(neuron['output']) # test backpropagation of error network = [[{'output': 0.7105668883115941, 'weights': [0.13436424411240122, 0.8474337369372327, 0.763774618976614]}], [{'output': 0.6213859615555266, 'weights': [0.2550690257394217, 0.49543508709194095]}, {'output': 0.6573693455986976, 'weights': [0.4494910647887381, 0.651592972722763]}]] expected = [0, 1] backward_propagate_error(network, expected) for layer in network: print(layer)
実行中、この例では、エラーチェックの完了後にネットワークを出力します。ご覧のとおり、エラー値が計算され、出力層と非表示層のニューロンに保存されます。
[{'output': 0.7105668883115941, 'weights': [0.13436424411240122, 0.8474337369372327, 0.763774618976614], 'delta': -0.0005348048046610517}]
[{'output': 0.6213859615555266, 'weights': [0.2550690257394217, 0.49543508709194095], 'delta': -0.14619064683582808}, {'output': 0.6573693455986976, 'weights': [0.4494910647887381, 0.651592972722763], 'delta': 0.0771723774346327}]
ネットワークの訓練
from math import exp from random import seed from random import random # Initialize a network def initialize_network(n_inputs, n_hidden, n_outputs): network = list() hidden_layer = [{'weights':[random() for i in range(n_inputs + 1)]} for i in range(n_hidden)] network.append(hidden_layer) output_layer = [{'weights':[random() for i in range(n_hidden + 1)]} for i in range(n_outputs)] network.append(output_layer) return network # Calculate neuron activation for an input def activate(weights, inputs): activation = weights[-1] for i in range(len(weights)-1): activation += weights[i] * inputs[i] return activation # Transfer neuron activation def transfer(activation): return 1.0 / (1.0 + exp(-activation)) # Forward propagate input to a network output def forward_propagate(network, row): inputs = row for layer in network: new_inputs = [] for neuron in layer: activation = activate(neuron['weights'], inputs) neuron['output'] = transfer(activation) new_inputs.append(neuron['output']) inputs = new_inputs return inputs # Calculate the derivative of an neuron output def transfer_derivative(output): return output * (1.0 - output) # Backpropagate error and store in neurons def backward_propagate_error(network, expected): for i in reversed(range(len(network))): layer = network[i] errors = list() if i != len(network)-1: for j in range(len(layer)): error = 0.0 for neuron in network[i + 1]: error += (neuron['weights'][j] * neuron['delta']) errors.append(error) else: for j in range(len(layer)): neuron = layer[j] errors.append(expected[j] - neuron['output']) for j in range(len(layer)): neuron = layer[j] neuron['delta'] = errors[j] * transfer_derivative(neuron['output']) # Update network weights with error def update_weights(network, row, l_rate): for i in range(len(network)): inputs = row[:-1] if i != 0: inputs = [neuron['output'] for neuron in network[i - 1]] for neuron in network[i]: for j in range(len(inputs)): neuron['weights'][j] += l_rate * neuron['delta'] * inputs[j] neuron['weights'][-1] += l_rate * neuron['delta'] # Train a network for a fixed number of epochs def train_network(network, train, l_rate, n_epoch, n_outputs): for epoch in range(n_epoch): sum_error = 0 for row in train: outputs = forward_propagate(network, row) expected = [0 for i in range(n_outputs)] expected[row[-1]] = 1 sum_error += sum([(expected[i]-outputs[i])**2 for i in range(len(expected))]) backward_propagate_error(network, expected) update_weights(network, row, l_rate) print('>epoch=%d, lrate=%.3f, error=%.3f' % (epoch, l_rate, sum_error)) # Test training backprop algorithm seed(1) dataset = [[2.7810836,2.550537003,0], [1.465489372,2.362125076,0], [3.396561688,4.400293529,0], [1.38807019,1.850220317,0], [3.06407232,3.005305973,0], [7.627531214,2.759262235,1], [5.332441248,2.088626775,1], [6.922596716,1.77106367,1], [8.675418651,-0.242068655,1], [7.673756466,3.508563011,1]] n_inputs = len(dataset[0]) - 1 n_outputs = len(set([row[-1] for row in dataset])) network = initialize_network(n_inputs, 2, n_outputs) train_network(network, dataset, 0.5, 20, n_outputs) for layer in network: print(layer)
訓練後、ネットワークが出力され、学習された重みが示されます。また、ネットワークにはまだ、無視できる出力値とデルタ値があります。必要に応じて、訓練関数を更新してこのデータを削除できます。
>epoch=13, lrate=0.500, error=1.953
>epoch=14, lrate=0.500, error=1.774
>epoch=15, lrate=0.500, error=1.614
>epoch=16, lrate=0.500, error=1.472
>epoch=17, lrate=0.500, error=1.346
>epoch=18, lrate=0.500, error=1.233
>epoch=19, lrate=0.500, error=1.132
[{'weights': [-1.4688375095432327, 1.850887325439514, 1.0858178629550297], 'output': 0.029980305604426185, 'delta': -0.0059546604162323625}, {'weights': [0.37711098142462157, -0.0625909894552989, 0.2765123702642716], 'output': 0.9456229000211323, 'delta': 0.0026279652850863837}]
[{'weights': [2.515394649397849, -0.3391927502445985, -0.9671565426390275], 'output': 0.23648794202357587, 'delta': -0.04270059278364587}, {'weights': [-2.5584149848484263, 1.0036422106209202, 0.42383086467582715], 'output': 0.7790535202438367, 'delta': 0.03803132596437354}]
予測を行うためには、前の例ですでに構成されている重みのセットを使用できます。
予測
from math import exp # Calculate neuron activation for an input def activate(weights, inputs): activation = weights[-1] for i in range(len(weights)-1): activation += weights[i] * inputs[i] return activation # Transfer neuron activation def transfer(activation): return 1.0 / (1.0 + exp(-activation)) # Forward propagate input to a network output def forward_propagate(network, row): inputs = row for layer in network: new_inputs = [] for neuron in layer: activation = activate(neuron['weights'], inputs) neuron['output'] = transfer(activation) new_inputs.append(neuron['output']) inputs = new_inputs return inputs # Make a prediction with a network def predict(network, row): outputs = forward_propagate(network, row) return outputs.index(max(outputs)) # Test making predictions with the network dataset = [[2.7810836,2.550537003,0], [1.465489372,2.362125076,0], [3.396561688,4.400293529,0], [1.38807019,1.850220317,0], [3.06407232,3.005305973,0], [7.627531214,2.759262235,1], [5.332441248,2.088626775,1], [6.922596716,1.77106367,1], [8.675418651,-0.242068655,1], [7.673756466,3.508563011,1]] network = [[{'weights': [-1.482313569067226, 1.8308790073202204, 1.078381922048799]}, {'weights': [0.23244990332399884, 0.3621998343835864, 0.40289821191094327]}], [{'weights': [2.5001872433501404, 0.7887233511355132, -1.1026649757805829]}, {'weights': [-2.429350576245497, 0.8357651039198697, 1.0699217181280656]}]] for row in dataset: prediction = predict(network, row) print('Expected=%d, Got=%d' % (row[-1], prediction))
この例を実行すると、訓練データセット内の各レコードの期待される結果が出力され、その後にネットワークによって明確な予測が行われます。
結果によると、ネットワークはこの小さなデータセットで100%の精度を達成しています。
Expected=0, Got=0
Expected=0, Got=0
Expected=0, Got=0
Expected=0, Got=0
Expected=0, Got=0
Expected=1, Got=1
Expected=1, Got=1
Expected=1, Got=1
Expected=1, Got=1
Expected=1, Got=1
3. KerasとTensorFlowを使用した単純なモデルの作成
データを収集するために、MetaTrader 5パッケージを使用します。価格の抽出、変換、予測に必要なライブラリをインポートして、スクリプトを起動します。ここではデータの準備について詳しくは検討しませんが、これはモデルにとって非常に重要なステップであることを忘れないでください。
簡単なデータの概要から始めましょう。データセットは、最後の1000 EURUSDバーで構成されています。この部分は、いくつかのステップで構成されています。
- ライブラリのインポート
- MetaTraderとの接続
- データの収集
- データの変換、日付の調整
- データのプロット
import MetaTrader5 as mt5 from pandas import to_datetime, DataFrame import matplotlib.pyplot as plt symbol = "EURUSD" if not mt5.initialize(): print("initialize() failed") mt5.shutdown() rates = mt5.copy_rates_from_pos(symbol, mt5.TIMEFRAME_D1, 0, 1000) mt5.shutdown() rates = DataFrame(rates) rates['time'] = to_datetime(rates['time'], unit='s') rates = rates.set_index(['time']) plt.figure(figsize = (15,10)) plt.plot(rates.close) plt.show()
コードを実行した後、終了データを下のグラフの線として視覚化します。
単純回帰アプローチを使用して、次の期間の終値を予測しましょう。
この例では、単変量アプローチを使用します。
単変量時系列は、時間的順序のある単一の一連の観測値で構成されるデータセットです。シーケンス内の次の値を予測するには、一連の過去の観測から学習するモデルが必要です。
最初のステップは、ロードされたシリーズを訓練セットとテストセットに分割することです。このようなシリーズを2つの部分に分割する関数を作成しましょう。分割は、指定された割合(例: 70%訓練、30%テスト)に従って実行されます。検証(バックテスト)については、シリーズを訓練、テスト、検証に分割するなど、他のアプローチもあります。ここでは金融シリーズを扱っているため、過剰適合を避けるように非常に注意する必要があります。
この関数は、numpy配列とクリッピング値を受け取り、2つの分割された系列を返します。
最初の戻り値は、位置0から因子のサイズを表すサイズまでのセット全体であり、2番目の系列は残りのセットです。
def train_test_split(values, fator): train_size = int(len(values) * fator) return values[0:train_size], values[train_size:len(values)]
Kerasモデルは一連の層として定義できます。
sequentialモデルを作成し、ネットワークアーキテクチャに満足するまで、一度に1つずつ層を追加していきます。
まず、入力層に正しい数の入力リソースがあることを確認する必要があります。これは、引数input_dimを使用して最初の層を作成することで実行できます。
必要な層とタイプの数をどのように知ることができるのでしょうか。
これはとても難しい質問です。使用できるヒューリスティックは存在し、最良のネットワーク構造はしばしば試行錯誤の実験を通じて見つかります。一般に、問題の構造を把握するのに十分な大きさのネットワークが必要です。
この例では、全結合の単層ネットワーク構造を使用します。
全結合層はDenseクラスを使用して定義されます。層内のニューロンまたはノードの数を最初の引数として指定し、活性化引数を使用して活性化関数を指定できます。
第1層で正規化線形ユニット(ReLU)活性化関数を使用します。
単変量級数を予測する前に、それを準備する必要があります。
MLPモデルは、過去の観測値のシーケンスを入力として出力観測値にマッピングする関数を使用して学習します。したがって、一連の観測は、モデルが学習できる複数の例に変換する必要があります。
単変量シーケンスについて考えてみます。
[10, 20, 30, 40, 50, 60, 70, 80, 90]
シーケンスは、標本と呼ばれるいくつかのI/Oパターンに分割できます。
この例では、調査対象の予測で入力として使用される3つのタイムステップと出力に使用される1つのタイムステップを使用します。
X, y
10, 20, 30 40
20, 30, 40 50
30, 40, 50 60
...
以下では、この動作を実装するsplit_sequence()関数を作成します。また、単変量セットをいくつかの標本に分割します。それぞれには特定の数のタイムステップがあります。出力は単一のタイムステップです。
上記の例のデータのような小さなデータセットで関数をテストします。
# univariate data preparation from numpy import array # split a univariate sequence into samples def split_sequence(sequence, n_steps): X, y = list(), list() for i in range(len(sequence)): # find the end of this pattern end_ix = i + n_steps # check if we are beyond the sequence if end_ix > len(sequence)-1: break # gather input and output parts of the pattern seq_x, seq_y = sequence[i:end_ix], sequence[end_ix] X.append(seq_x) y.append(seq_y) return array(X), array(y) # define input sequence raw_seq = [10, 20, 30, 40, 50, 60, 70, 80, 90] # choose a number of time steps n_steps = 3 # split into samples X, y = split_sequence(raw_seq, n_steps) # summarize the data for i in range(len(X)): print(X[i], y[i])
このコードでは、単変量セットを6つの標本に分割します。それぞれには3つの入力タイムステップと1つの出力タイムステップがあります。
[10 20 30] 40
[20 30 40] 50
[30 40 50] 60
[40 50 60] 70
[50 60 70] 80
[60 70 80] 90
続行するには、標本をX(特徴)とy(目標)に分割して、ネットワークを訓練できるようにする必要があります。これを行うには、以前に作成したsplit_sequence()関数を使用します。
X_train, y_train = split_sequence(train, 3) X_test, y_test = split_sequence(test, 3)
データの標本を準備したので、MLPネットワークを作成します。
単純なMLPモデルには、ノード(ニューロン)の隠れ層が1つと、予測に使用される出力層が1つだけあります。
単変量時系列を予測するためのMLPは、次のように定義できます。
# define model model = Sequential() model.add(Dense(100, activation='relu', input_dim=n_steps)) model.add(Dense(1)) model.compile(optimizer='adam', loss='mse')
入力データの形式を定義するには、モデルが各標本の入力データとして受け取ると予想されるものを、タイムステップ数の観点から理解する必要があります。
入力としてのタイムステップ数は、split_sequence()関数の引数としてデータセットを準備するときに選択する数です。
各標本の入力次元は、最初の隠れ層の定義のinput_dim引数で指定されます。技術的には、モデルは各タイムステップを個別のタイムステップではなく個別のリソースとして表示します。
通常は複数の標本があるため、モデルでは、入力訓練コンポーネントが寸法または形状を持つことを想定しています。
[標本、特徴]
split_sequence()関数は、すぐに使用できる[標本、特徴]の形式でXを生成します。
モデルは、確率的勾配降下法のための「アダム」と呼ばれる効率的なアルゴリズムを使用して訓練されます。これは、MSE(二乗誤差を意味する)損失関数を使用して最適化されます。
モデルを定義したら、データセットを使用してモデルを訓練します。
model.fit(X_train, y_train, epochs=100, verbose=2)
モデルを訓練した後、将来の価値を予測できます。モデルでは、入力形状が2次元[標本、特徴]であることを想定しているため、予測を行う前に、単一の入力標本の形状を変更する必要があります。たとえば、特徴として使用される1つの標本と3つのタイムステップに対してフォーム[1、3]を使用します。
テスト標本X_testの最後のレコードを選択し、予測した後、最後の標本y_testに含まれている実際の値と比較します。
# demonstrate prediction x_input = X_test[-1] x_input = x_input.reshape((1, n_steps)) yhat = model.predict(x_input, verbose=0) print("Valor previsto: ", yhat) print("Valor real: ", y_test[-1])
4. MQL5とPythonを統合する方法
取引口座でモデルを使用するにはいくつかのオプションがあります。そのうちの1つは、ポジションを開いたり閉じたりするネイティブPython関数を使用することです。ただし、この場合、MQLが提供する幅広い機会を逃します。このため、PythonとMQL環境の統合を選択しました。これにより、ポジション/注文の管理の自律性が高まります。
MaximDmitrievskyによるMetaTrader 5とPYTHON インテグレーション:データの受信と送信稿に基づいて、シングルトンパターンを使用してこのクラスを実装しました。これは通信用のSocketクライアントの作成を担当します。このパターンにより、特定のタイプのオブジェクトのコピーが1つだけになることが保証されます。これは、プログラムが2つのポインターを使用し、両方が同じオブジェクトを参照している場合、ポインターが同じオブジェクトを指すためです。
class CClientSocket { private: static CClientSocket* m_socket; int m_handler_socket; int m_port; string m_host; int m_time_out; CClientSocket(void); ~CClientSocket(void); public: static bool DeleteSocket(void); bool SocketSend(string payload); string SocketReceive(void); bool IsConnected(void); static CClientSocket *Socket(void); bool Config(string host, int port); bool Close(void); };
CClienteSockeクラスは、静的ポインタをprivateメンバーとして格納します。このクラスには、privateで呼び出すことができないコンストラクタが1つだけあります。コンストラクタのコードの代わりに、Socketメソッドを使用して、1つのオブジェクトのみが使用されることを保証できます。
static CClientSocket *CClientSocket::Socket(void) { if(CheckPointer(m_socket)==POINTER_INVALID) m_socket=new CClientSocket(); return m_socket; }
このメソッドは、静的ポインタがCClienteSocketソケットを指しているかどうかを確認します。trueの場合、参照を返します。 それ以外の場合は、新しいオブジェクトが作成されてポインタに関連付けられるため、このオブジェクトがシステム内で排他的になります。
サーバとの接続を確立するには、接続を開始する必要があります。これは、接続を確立するためのIsConnectedメソッドです。その後、データの送受信を開始できます。
bool CClientSocket::IsConnected(void) { ResetLastError(); bool res=true; m_handler_socket=SocketCreate(); if(m_handler_socket==INVALID_HANDLE) res=false; if(!::SocketConnect(m_handler_socket,m_host,m_port,m_time_out)) res=false; return res; }
接続とメッセージの送信が成功したら、この接続を閉じる必要があります。これを行うには、Closeメソッドを使用して、以前に開いた接続を閉じます。
bool CClientSocket::Close(void) { bool res=false; if(SocketClose(m_handler_socket)) { res=true; m_handler_socket=INVALID_HANDLE; } return res; }
次に、新しいMQL接続を受信し、モデルに予測を送信するサーバを登録する必要があります。
import socket class socketserver(object): def __init__(self, address, port): self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.address = address self.port = port self.sock.bind((self.address, self.port)) def socket_receive(self): self.sock.listen(1) self.conn, self.addr = self.sock.accept() self.cummdata = '' while True: data = self.conn.recv(10000) self.cummdata+=data.decode("utf-8") if not data: self.conn.close() break return self.cummdata def socket_send(self, message): self.sock.listen(1) self.conn, self.addr = self.sock.accept() self.conn.send(bytes(message, "utf-8")) def __del__(self): self.conn.close()
オブジェクトは単純です。コンストラクタで、サーバのアドレスとポートを受け取ります。socket_receivedメソッドは、新しい接続を受け入れ、送信されたメッセージの存在を確認する役割を果たします。受信するメッセージがある場合は、メッセージのすべての部分を受信するまでループを実行します。次に、クライアントとの接続を閉じて、ループを終了します。一方、socket_sendメソッドは、クライアントにメッセージを送信する役割を果たします。したがって、この提案されたモデルは、モデルに予測を送信することを可能にするだけでなく、他のいくつかの可能性を広げます。すべてが、あなたの創造性とニーズに依存します。
これで、コミュニケーションの準備が整いました。2つのことを考えてみましょう。
- モデルを訓練して保存する方法
- 訓練済みモデルを使用して予測を行う方法
予測を行うたびにデータを取得して訓練することは非現実的で間違っています。このため、私は常に訓練を行い、最適なハイパーパラメータを見つけて、後で使用できるようにモデルを保存します。
ネットワーク訓練コードを含めるためにmodel_trainという名前のファイルを作成します。終値間の割合の差を使用して、この差を予測しようとします。モデル自体ではなく、MQL環境と統合してモデルを使用する方法のみを示すだけにします。
import MetaTrader5 as mt5 from numpy.lib.financial import rate from pandas import to_datetime, DataFrame from datetime import datetime, timezone from matplotlib import pyplot from sklearn.metrics import mean_squared_error from math import sqrt import numpy as np from tensorflow.keras import Sequential from tensorflow.keras.layers import Dense from tensorflow.keras.callbacks import * symbol = "EURUSD" date_ini = datetime(2020, 1, 1, tzinfo=timezone.utc) date_end = datetime(2021, 7, 1, tzinfo=timezone.utc) period = mt5.TIMEFRAME_D1 def train_test_split(values, fator): train_size = int(len(values) * fator) return np.array(values[0:train_size]), np.array(values[train_size:len(values)]) # split a univariate sequence into samples def split_sequence(sequence, n_steps): X, y = list(), list() for i in range(len(sequence)): end_ix = i + n_steps if end_ix > len(sequence)-1: break seq_x, seq_y = sequence[i:end_ix], sequence[end_ix] X.append(seq_x) y.append(seq_y) return np.array(X), np.array(y) if not mt5.initialize(): print("initialize() failed") mt5.shutdown() raise Exception("Error Getting Data") rates = mt5.copy_rates_range(symbol, period, date_ini, date_end) mt5.shutdown() rates = DataFrame(rates) if rates.empty: raise Exception("Error Getting Data") rates['time'] = to_datetime(rates['time'], unit='s') rates.set_index(['time'], inplace=True) rates = rates.close.pct_change(1) rates = rates.dropna() X, y = train_test_split(rates, 0.70) X = X.reshape(X.shape[0]) y = y.reshape(y.shape[0]) train, test = train_test_split(X, 0.7) n_steps = 60 verbose = 1 epochs = 50 X_train, y_train = split_sequence(train, n_steps) X_test, y_test = split_sequence(test, n_steps) X_val, y_val = split_sequence(y, n_steps) # define model model = Sequential() model.add(Dense(200, activation='relu', input_dim=n_steps)) model.add(Dense(1)) model.compile(optimizer='adam', loss='mse') history = model.fit(X_train ,y_train ,epochs=epochs ,verbose=verbose ,validation_data=(X_test, y_test)) model.save(r'C:\YOUR_PATH\MQL5\Experts\YOUR_PATH\model_train_'+symbol+'.h5') pyplot.title('Loss') pyplot.plot(history.history['loss'], label='train') pyplot.plot(history.history['val_loss'], label='test') pyplot.legend() pyplot.show() history = list() yhat = list() for i in range(0, len(X_val)): pred = X_val[i] pred = pred.reshape((1, n_steps)) history.append(y_val[i]) yhat.append(model.predict(pred).flatten()[0]) pyplot.figure(figsize=(10, 5)) pyplot.plot(history,"*") pyplot.plot(yhat,"+") pyplot.plot(history, label='real') pyplot.plot(yhat, label='prediction') pyplot.ylabel('Price Close', size=10) pyplot.xlabel('time', size=10) pyplot.legend(fontsize=10) pyplot.show() rmse = sqrt(mean_squared_error(history, yhat)) mse = mean_squared_error(history, yhat) print('Test RMSE: %.3f' % rmse) print('Test MSE: %.3f' % mse)
これで、訓練済みのモデルが作成されました。このモデルは、拡張子が.h5のフォルダに保存されます。このモデルを予測に使用できます。次に、接続に使用するこのモデルをインスタンス化するオブジェクトを作成しましょう。
from tensorflow.keras.models import * class Model(object): def __init__(self, n_steps:int, symbol:str, period:int) -> None: super().__init__() self.n_steps = n_steps self.model = load_model(r'C:\YOUR_PATH\MQL5\Experts\YOUR_PATH\model_train_'+symbol+'.h5') def predict(self, data): return(self.model.predict(data.reshape((1, self.n_steps))).flatten()[0])
オブジェクトは単純です。そのコンストラクタは、保存されたモデルを含むmodelという属性のインスタンスを作成します。予測モデルは、予測を行う責任があります。
ここで、クライアントと連携して機能し、関数を受信し、予測を送信するメインメソッドが必要です。
import ast import pandas as pd from model import Model from server_socket import socketserver host = 'localhost' port = 9091 n_steps = 60 TIMEFRAME = 24 | 0x4000 model = Model(n_steps, "EURUSD", TIMEFRAME) if __name__ == "__main__": serv = socketserver(host, port) while True: print("<<--Waiting Prices to Predict-->>") rates = pd.DataFrame(ast.literal_eval(serv.socket_receive())) rates = rates.rates.pct_change(1) rates.dropna(inplace=True) rates = rates.values.reshape((1, n_steps)) serv.socket_send(str(model.predict(rates).flatten()[0]))
MQLクライアント側では、データを収集してサーバに送信し、そこから予測を受信するロボットを作成する必要があります。モデルは閉じたローソク足からのデータで訓練されたため、ローソク足が閉じた後にのみデータが収集および送信されるというチェックを追加する必要があります。開始したばかりの現在のバーを閉じるための違いを予測するための完全なデータが必要です。新しいバーの出現を確認する関数を使用します。
bool NewBar(void) { datetime time[]; if(CopyTime(Symbol(), Period(), 0, 1, time) < 1) return false; if(time[0] == m_last_time) return false; return bool(m_last_time = time[0]); }
m_last_time変数は、グローバルスコープで宣言されていて、バーの開いた日時が格納されているため、「time」変数がm_last_timeと異なるかどうかを確認します。trueの場合、新しいバーが開始されています。この場合、m_last_timeは「time」に置き換える必要があります。
すでにポジションがある場合、EAは新しいポジションを開くべきではありません。したがって、グローバルスコープで宣言された売買変数にtrueまたはfalseを設定するCheckPositionメソッドを使用して、ポジションの存在を確認します。
void CheckPosition(void) { buy = false; sell = false; if(PositionSelect(Symbol())) { if(PositionGetInteger(POSITION_TYPE)==POSITION_TYPE_BUY&&PositionGetInteger(POSITION_MAGIC) == InpMagicEA) { buy = true; sell = false; } if(PositionGetInteger(POSITION_TYPE)==POSITION_TYPE_SELL&&PositionGetInteger(POSITION_MAGIC) == InpMagicEA) { sell = true; buy = false; } } }
新しいバーが表示されたら、ポジションがあるかどうかを確認します。ポジションがある場合は、閉まるのを待ちます。ポジションがない場合は、CClienteSocketクラスのIsConnectedメソッドを呼び出して接続プロセスを開始します。
if(NewBar()) { if(!Socket.IsConnected()) Print("Error : ", GetLastError(), " Line: ", __LINE__); ... }
trueが返された場合は、サーバとの接続を確立し、データを収集して送り返すことができます。
string payload = "{'rates':["; for(int i=InpSteps; i>=0; i--) { if(i>=1) payload += string(iClose(Symbol(), Period(), i))+","; else payload += string(iClose(Symbol(), Period(), i))+"]}"; }
{'rates':[1,2,3,4]} の形式でデータを送信することにしました。この方法でデータをpandasデータフレームに変換すれば、 データ変換の時間を無駄にする必要はありません。
データが収集されたら、それらを送信して予測を待ち、それに基づいて決定を下すことができます。移動平均を使用して価格の方向を確認します。価格の方向性と予測の価値に応じて、売買します。
void OnTick(void) { .... bool send = Socket.SocketSend(payload); if(send) { if(!Socket.IsConnected()) Print("Error : ", GetLastError(), " Line: ", __LINE__); double yhat = StringToDouble(Socket.SocketReceive()); Print("Value of Prediction: ", yhat); if(CopyBuffer(handle, 0, 0, 4, m_fast_ma)==-1) Print("Error in CopyBuffer"); if(m_fast_ma[1]>m_fast_ma[2]&&m_fast_ma[2]>m_fast_ma[3]) { if((iClose(Symbol(), Period(), 2)>iOpen(Symbol(), Period(), 2)&&iClose(Symbol(), Period(), 1)>iOpen(Symbol(), Period(), 1))&&yhat<0) { m_trade.Sell(mim_vol); } } if(m_fast_ma[1]<m_fast_ma[2]&&m_fast_ma[2]<m_fast_ma[3]) { if((iClose(Symbol(), Period(), 2)<iOpen(Symbol(), Period(), 2)&&iClose(Symbol(), Period(), 1)<iOpen(Symbol(), Period(), 1))&&yhat>0) { m_trade.Buy(mim_vol); } } } Socket.Close(); } }
最後のステップは、以前に確立された接続を閉じて、新しいバーの出現を待つことです。新しいバーが開いたら、データを送信して予測を受信するプロセスを開始します。
このアーキテクチャは非常に便利で、待ち時間が短いことが証明されています。一部の個人プロジェクトでは、このアーキテクチャをトレーディングアカウントで使用しています。これにより、MQLの全機能と、Python for MachineLearningで利用できる豊富なリソースを楽しむことができます。
次の段階
次の記事では、ストラテジーテスターでモデルを使用できるようにする、より柔軟なアーキテクチャを開発したいと思います。
終わりに
これが、さまざまなPythonモデルの使用方法と開発方法、およびそれらをMQL環境と統合する方法に関する役立つ小さなチュートリアルであることを願っています。
この記事では、次のことを行います。
- Python開発環境を設定する
- PythonでパーセプトロンニューロンとMLPネットワークを実装する
- 単純なネットワークを学習するために単変量データを準備する
- PythonとMQL間の通信のアーキテクチャを設定する
さらなるアイデア
このセクションでは、このチュートリアルを拡張するのに役立ついくつかの追加のアイデアを提供します。
- 入力サイズ - モデル入力に使用される日数(たとえば、21日、30日)を調べます。
- モデルの調整 - さまざまな構造とハイパーパラメータを調べて、モデルの平均パフォーマンスを取得します。
- データスケーリング - 標準化や正規化などのデータサイズを使用して、モデルのパフォーマンスを向上させることができるかどうかを確認します。
- 診断の学習 - 学習と検証の喪失については、学習曲線などの診断を使用します。 二乗平均平方根誤差を使用して、モデルの構造とハイパーパラメータを調整します。
これらの拡張の機会をさらに探求する場合は、アイデアを共有してください。
MetaQuotes Ltdによりポルトガル語から翻訳されました。
元の記事: https://www.mql5.com/pt/articles/9514
- 無料取引アプリ
- 8千を超えるシグナルをコピー
- 金融ニュースで金融マーケットを探索