そうそう、YouTubeで、N.E.A.Tメソッドでフラッピーバードや他のゲームをプレイするAIを訓練するビデオを見たんだ。データを正規化するのに2週間、モデルを作成するのに2日かかりましたが、トレーニングにかかった時間はたったの1時間、1300世代、1世代あたり20ゲノム、私の5年前の中古ノートパソコンが火を噴きました。データが適切に正規化されていなかったため、私はまだモデルのコードを理解していません。しかし、学習し、モデルが予測するのを見るのは楽しかったです。
import os import csv import math import random import json from datetime import datetime # ----------------------------------------- # ♪ UTILSアクティベーションとシンプルなフォワードパス # ----------------------------------------- def tanh(x): return math.tanh(x) def forward_pass(genome, inputs): """ genome = { 'hidden_weights': 2D list, 'hidden_bias': 1D list, 'output_weights': list, 'output_bias': float, 'fitness': float } inputs: list of 5 floats Returns: single float in (-1, 1) """ hidden_activations = [] for h in range(len(genome['hidden_bias'])): z = sum(inputs[i] * genome['hidden_weights'][h][i] for i in range(len(inputs))) z += genome['hidden_bias'][h] hidden_activations.append(tanh(z)) z_out = sum(hidden_activations[h] * genome['output_weights'][h] for h in range(len(hidden_activations))) z_out += genome['output_bias'] return tanh(z_out) def interpret_output(output): """ Convert the Tanh output to discrete values: if output >= 0.5 => 1 (Buy) if output <= -0.5 => -1 (Sell) otherwise => 0 (Hold) """ if output >= 0.5: return 1 elif output <= -0.5: return -1 else: return 0 # ------------------------------------------------------ # 遺伝的アルゴリズム:初期化、選択、突然変異 # ------------------------------------------------------ def create_random_genome(input_size=5, hidden_size=8): return { 'hidden_weights': [[random.uniform(-1, 1) for _ in range(input_size)] for _ in range(hidden_size)], 'hidden_bias': [random.uniform(-1, 1) for _ in range(hidden_size)], 'output_weights': [random.uniform(-1, 1) for _ in range(hidden_size)], 'output_bias': random.uniform(-1, 1), 'fitness': 0.0 } def mutate(genome, mutation_rate=0.1, mutation_strength=0.5): for h in range(len(genome['hidden_weights'])): for i in range(len(genome['hidden_weights'][h])): if random.random() < mutation_rate: genome['hidden_weights'][h][i] += random.uniform(-mutation_strength, mutation_strength) for h in range(len(genome['hidden_bias'])): if random.random() < mutation_rate: genome['hidden_bias'][h] += random.uniform(-mutation_strength, mutation_strength) for h in range(len(genome['output_weights'])): if random.random() < mutation_rate: genome['output_weights'][h] += random.uniform(-mutation_strength, mutation_strength) if random.random() < mutation_rate: genome['output_bias'] += random.uniform(-mutation_strength, mutation_strength) def crossover(genome1, genome2): child = create_random_genome() # 上書きされる for h in range(len(genome1['hidden_weights'])): for i in range(len(genome1['hidden_weights'][h])): child['hidden_weights'][h][i] = (genome1['hidden_weights'][h][i] if random.random() < 0.5 else genome2['hidden_weights'][h][i]) for h in range(len(genome1['hidden_bias'])): child['hidden_bias'][h] = (genome1['hidden_bias'][h] if random.random() < 0.5 else genome2['hidden_bias'][h]) for h in range(len(genome1['output_weights'])): child['output_weights'][h] = (genome1['output_weights'][h] if random.random() < 0.5 else genome2['output_weights'][h]) child['output_bias'] = (genome1['output_bias'] if random.random() < 0.5 else genome2['output_bias']) child['fitness'] = 0.0 return child def evolve_population(population, keep_best=5): population.sort(key=lambda g: g['fitness'], reverse=True) new_population = population[:keep_best] # 最高のゲノムを維持する while len(new_population) < len(population): parent1 = random.choice(new_population) parent2 = random.choice(new_population) child = crossover(parent1, parent2) mutate(child) new_population.append(child) return new_population # ------------------------------------------- # 再トレーニングを避けるために、最適なモデルを保存してロードする。 # ------------------------------------------- def save_best_model(genome, filename="best_model.json"): data = { 'hidden_weights': genome['hidden_weights'], 'hidden_bias': genome['hidden_bias'], 'output_weights': genome['output_weights'], 'output_bias': genome['output_bias'], 'fitness': genome['fitness'] } with open(filename, 'w') as f: json.dump(data, f, indent=2) def load_best_model(filename="best_model.json"): if not os.path.exists(filename): return None with open(filename, 'r') as f: data = json.load(f) return data # ---------------- # メイン・トレーニング・ロジック # ---------------- def train_neural_network_data(folder_path): # ハイパーパラメータ POP_SIZE = 20 KEEP_BEST = 5 input_size = 5 hidden_size = 8 # 初期母集団を作成するか、利用可能であれば最良のモデルをロードする。 population = [create_random_genome(input_size, hidden_size) for _ in range(POP_SIZE)] best_saved = load_best_model() if best_saved is not None: population[0]['hidden_weights'] = best_saved['hidden_weights'] population[0]['hidden_bias'] = best_saved['hidden_bias'] population[0]['output_weights'] = best_saved['output_weights'] population[0]['output_bias'] = best_saved['output_bias'] population[0]['fitness'] = 0.0 # ISO8601 日付/時刻文字列をタイムゾーン情報付きでパースするヘルパー def get_date_from_time_str(t_str): dt = datetime.strptime(t_str, "%Y-%m-%dT%H:%M:%S%z") return dt.date() csv_files = sorted([f for f in os.listdir(folder_path) if f.endswith('.csv')]) # 各ゲノムについて、最後のシグナルと対応する価格(もしあれば)を保存する genome_states = [{'last_signal': None, 'last_price': None} for _ in range(POP_SIZE)] current_date = None generation_count = 0 # 各CSVファイルを一度に1つずつ処理する(行ごとにストリーミングする) for csv_file in csv_files: file_path = os.path.join(folder_path, csv_file) with open(file_path, 'r', newline='') as f: reader = csv.DictReader(f) for row in reader: row_time_str = row['Time'] row_date = get_date_from_time_str(row_time_str) # 新しい日が始まったかどうかをチェックする(世代末) if current_date is None: current_date = row_date elif row_date != current_date: # モニタリングのための統計情報を表示 generation_count += 1 fitnesses = [g['fitness'] for g in population] avg_fitness = sum(fitnesses) / len(fitnesses) best_fitness = max(fitnesses) print(f"Generation {generation_count} | Date: {current_date} | Avg Fitness: {avg_fitness:.2 f} | Best Fitness: {best_fitness:.2 f}") # 新世代のために母集団を進化させる population = evolve_population(population, keep_best=KEEP_BEST) # 新しい世代のためにゲノムの状態をリセットする for state in genome_states: state['last_signal'] = None state['last_price'] = None current_date = row_date # その場で入力を準備する inputs = [ float(row['NTP']), float(row['NCP']), float(row['NT']), float(row['NIP']), float(row['N14IP']) ] bid_price = float(row['BidOpen']) ask_price = float(row['AskOpen']) # この行に対する各ゲノムの決定を処理する for i, genome in enumerate(population): raw_output = forward_pass(genome, inputs) signal_val = interpret_output(raw_output) # 1、0、または1 prev_signal = genome_states[i]['last_signal'] prev_price = genome_states[i]['last_price'] # シグナルに変化がなければ処理をスキップする if signal_val == prev_signal: continue genome_states[i]['last_signal'] = signal_val # 信号遷移にフィットネス・ロジックを適用する if signal_val == 1: # 買いシグナル genome_states[i]['last_price'] = ask_price if prev_signal == -1 and prev_price is not None: if ask_price < prev_price: genome['fitness'] += 1 else: genome['fitness'] -= 1 elif signal_val == -1: # 売りシグナル genome_states[i]['last_price'] = bid_price if prev_signal == 1 and prev_price is not None: if bid_price > prev_price: genome['fitness'] += 1 else: genome['fitness'] -= 1 else: # ホールド信号 genome_states[i]['last_price'] = None # すべてのCSVファイルを処理した後、トレーニングを確定する。 population.sort(key=lambda g: g['fitness'], reverse=True) best_genome = population[0] print("Final Best genome fitness after all files:", best_genome['fitness']) save_best_model(best_genome, filename="best_model.json") if __name__ == "__main__": folder_path = r"" train_neural_network_data(folder_path)MT5用
import MetaTrader5 as mt5 import pandas as pd import numpy as np import json import time def tournament(values): """Tournament method to find maximum and minimum in a list.""" max_val = values[0] min_val = values[0] for v in values[1:]: if v > max_val: max_val = v if v < min_val: min_val = v return max_val, min_val # ------------------------------- # JSONから最適なモデルをロードする with open("best_model.json", "r") as f: best_model = json.load(f) def forward_pass(inputs, model): """ Perform a forward pass through a feedforward network with one hidden layer. Uses tanh for both hidden and output layers. The output is discretized: >0.5 --> 1, < -0.5 --> -1, else 0. """ x = np.array(inputs) #シェイプ (5,) hidden_weights = np.array(model["hidden_weights"]) # shape (8, 5) hidden_bias = np.array(model["hidden_bias"]) # 形状 (8,) output_weights = np.array(model["output_weights"]) # 形状 (8,) output_bias = model["output_bias"] # スカラー hidden_input = np.dot(hidden_weights, x) + hidden_bias hidden_output = np.tanh(hidden_input) output_val = np.dot(output_weights, hidden_output) + output_bias output_activation = np.tanh(output_val) if output_activation > 0.5: return 1 elif output_activation < -0.5: return -1 else: return 0 # ------------------------------- # ポジションを使ったオーダー管理機能 def get_open_position(symbol): """ Returns information about an open position for the symbol as a dictionary: {"ticket": ticket, "type": "buy"/"sell", "volume": volume}. If no position is open, returns None. """ positions = mt5.positions_get(symbol=symbol) if positions is None or len(positions) == 0: return None pos = positions[0] # 1シンボルにつきオープンポジションは1つだけとする if pos.type == mt5.POSITION_TYPE_BUY: return {"ticket": pos.ticket, "type": "buy", "volume": pos.volume, "symbol": pos.symbol} elif pos.type == mt5.POSITION_TYPE_SELL: return {"ticket": pos.ticket, "type": "sell", "volume": pos.volume, "symbol": pos.symbol} return None def close_position(position_info): """ Closes the given position using its ticket. """ symbol = position_info["symbol"] tick = mt5.symbol_info_tick(symbol) # 買い(BUY)が開いていれば、買値で売り、売り(SELL)が開いていれば、売値で買う。 if position_info["type"] == "buy": price = tick.bid order_type = mt5.ORDER_TYPE_SELL else: price = tick.ask order_type = mt5.ORDER_TYPE_BUY request = { "action": mt5.TRADE_ACTION_DEAL, "symbol": symbol, "volume": position_info["volume"], "type": order_type, "position": position_info["ticket"], "price": price, "deviation": 10, "magic": 123456, "comment": "Close position", "type_time": mt5.ORDER_TIME_GTC, "type_filling": mt5.ORDER_FILLING_IOC, } result = mt5.order_send(request) print("Close position result:", result) def place_order(symbol, order_type, volume=0.01): """ Place an order for the symbol. - order_type "buy": place BUY order. - order_type "sell": place SELL order. Before placing, if an open position exists with the opposite signal, it is closed using its order ticket. """ # 現在のオープンポジションを確認する: current_position = get_open_position(symbol) if current_position: # 既存のポジションが新しいシグナルと反対であれば、それを閉じる。 if (order_type == "buy" and current_position["type"] == "sell") or \ (order_type == "sell" and current_position["type"] == "buy"): print(f"Opposite position ({current_position['type']}) detected. Closing it first.") close_position(current_position) # 同じタイプなら何もしない。 elif (order_type == current_position["type"]): print(f"{order_type.upper()} order already open. No new order will be placed.") return tick = mt5.symbol_info_tick(symbol) if order_type == "buy": price = tick.ask order_type_mt5 = mt5.ORDER_TYPE_BUY elif order_type == "sell": price = tick.bid order_type_mt5 = mt5.ORDER_TYPE_SELL else: print("Invalid order type:", order_type) return request = { "action": mt5.TRADE_ACTION_DEAL, "symbol": symbol, "volume": volume, "type": order_type_mt5, "price": price, "deviation": 10, "magic": 123456, "comment": "NEAT AI trade", "type_time": mt5.ORDER_TIME_GTC, "type_filling": mt5.ORDER_FILLING_IOC, } result = mt5.order_send(request) print(f"Placed {order_type.upper()} order result:", result) # ------------------------------- # MetaTrader 5への接続を初期化する。 if not mt5.initialize(): print("initialize() failed, error code =", mt5.last_error()) quit() symbol = "XAUUSDm" timeframe = mt5.TIMEFRAME_M1 # 1分枠 if not mt5.symbol_select(symbol, True): print("Failed to select symbol:", symbol) mt5.shutdown() quit() # 連続モニタリング・ループ try: while True: print("\n" + "="*80) print("Retrieving candle data...") num_candles = 15 rates = mt5.copy_rates_from_pos(symbol, timeframe, 1, num_candles) if rates is None: print("Failed to get rates for", symbol) else: df = pd.DataFrame(rates) df['tick_count'] = df['tick_volume'] df['9ma'] = df['close'].rolling(window=9).mean() df['14ma'] = df['close'].rolling(window=14).mean() df['TP'] = (df['high'] + df['low'] + df['close']) / 3 df['NTP'] = (df['TP'] - df['low']) / (df['high'] - df['low']) df['NIP9'] = (df['9ma'] - df['low']) / (df['high'] - df['low']) df['NIP14'] = (df['14ma'] - df['low']) / (df['high'] - df['low']) close_prices = list(df['close'].tail(14)) HC, LC = tournament(close_prices) tick_counts = list(df['tick_count'].tail(14)) HT, LT = tournament(tick_counts) df['NCP'] = (df['close'] - LC) / (HC - LC) df['NT'] = (df['tick_count'] - LT) / (HT - LT) df = df.round(3) # 直近のローソク足の正規化データを表示する display_cols = ['high', 'low', 'close', 'tick_count', '9ma', '14ma', 'TP', 'NTP', 'NCP', 'NT', 'NIP9', 'NIP14'] print("\nNormalized Candle Data (Last Candle):") print(df[display_cols].tail(1).to_string(index=False)) # 最後のローソクから入力を抽出する last_row = df.iloc[-1] input_vector = [ last_row['NTP'], last_row['NCP'], last_row['NT'], last_row['NIP9'], last_row['NIP14'] ] print("\nExtracted Inputs:") print(f"NTP = {last_row['NTP']}, NCP = {last_row['NCP']}, NT = {last_row['NT']}, NIP9 = {last_row['NIP9']}, NIP14 = {last_row['NIP14']}") print("Price Data - High:", last_row['high'], "Low:", last_row['low'], "Close:", last_row['close']) # ネットワーク信号を計算する signal = forward_pass(input_vector, best_model) print("\nNetwork Signal (discretized):", signal) # 注文管理ロジックを使ってシグナルに基づいて取引を実行する if signal == 1: print("Received BUY signal.") place_order(symbol, "buy", volume=0.01) elif signal == -1: print("Received SELL signal.") place_order(symbol, "sell", volume=0.01) else: print("Signal is neutral. No action taken.") # 次の境界まで待つ tick = mt5.symbol_info_tick(symbol) if tick is None: print("Failed to get tick info for", symbol) break server_time = tick.time next_minute = ((server_time // 60) + 1) * 60 sleep_seconds = next_minute - server_time print(f"Sleeping for {sleep_seconds} seconds until next candle...") time.sleep(sleep_seconds) except KeyboardInterrupt: print("Stopping continuous monitoring.") finally: mt5.shutdown()
ニューラルネットワークは本当に興味深く、楽しいテーマだ。とはいえ、まずはリプレイ/シミュレーターを完成させることにしたので、説明はお休みしている。しかし、シミュレーターに関する記事の投稿が終わり次第、またニューラルネットワークに関する新しい記事をお届けします。その目的は常に、ニューラルネットワークがフードの下でどのように機能するかを示すことだ。ほとんどの人はニューラルネットワークを魔法のコードだと思っているが、それは間違いだ。しかし、それでも興味深く面白いテーマであることに変わりはない。私は、ニューラルネットワークが監視や事前のデータなしにどのように学習するのかを誰もがわかるように、何かをモデリングしようとさえ考えている。これは非常に興味深く、ある物事を理解するのに役立つだろう。詳細:私のコードはすべてMQL5で書かれます。あなたはプログラマーではないとおっしゃいました。MQL5を学び、あなた自身のソリューションを実装し始めるのはどうですか?私はあなたのような人を対象にした一連の記事を書いている。最新のものはこちらhttps://www.mql5.com/ja/articles/15833。 このシリーズでは、非常に基本的なことから説明しています。ですから、もしあなたがプログラミングについて全く知らないのであれば、シリーズの最初の記事に戻ってください。以前の記事へのリンクは常に記事の最初にあります。
- www.mql5.com
- 無料取引アプリ
- 8千を超えるシグナルをコピー
- 金融ニュースで金融マーケットを探索
新しい記事「ニューラルネットワークの実践:最初のニューロン」はパブリッシュされました:
これまでの記事をよく振り返ってみると、傾きを表す定数aと、切片を表す定数bの最適な値を求めるために、私たちが特定の数学的な工夫をしてきたことに気づくはずです。これらの調整によって、最も適切な線形方程式を導くことができました。具体的には、微分計算による方法と、行列演算による方法の2通りを検討しました。
しかし、ここから先は、それらの計算手法はもはや有効ではありません。なぜなら、線形方程式の定数を求めるために、新たなアプローチが必要になるからです。前回の記事では、傾きを示す定数の求め方について紹介しました。もしそのコードを実際に試して楽しんでいただけたなら嬉しいです。今回はそれより少しだけ複雑な内容に取り組みますが、その分、多くの可能性が開かれることになります。実際のところ、この記事はこのニューラルネットワーク連載の中で最も興味深い内容になるかもしれません。なぜなら、この記事で紹介する考え方を理解すれば、今後のすべての内容がよりシンプルで、実用的なものになるからです。
作者: Daniel Jose