記事「ニューラルネットワークの実践:最初のニューロン」についてのディスカッション

 

新しい記事「ニューラルネットワークの実践:最初のニューロン」はパブリッシュされました:

この記事では、シンプルで控えめなもの、つまりニューロンの構築を始めます。ごく少量のMQL5コードでプログラムしますが、それでも私のテストではこのニューロンは見事に機能しました。とはいえ、私がここで何を言おうとしているのかを理解するには、これまでのニューラルネットワークに関する連載を少し振り返ってみる必要があります。

これまでの記事をよく振り返ってみると、傾きを表す定数aと、切片を表す定数bの最適な値を求めるために、私たちが特定の数学的な工夫をしてきたことに気づくはずです。これらの調整によって、最も適切な線形方程式を導くことができました。具体的には、微分計算による方法と、行列演算による方法の2通りを検討しました。

しかし、ここから先は、それらの計算手法はもはや有効ではありません。なぜなら、線形方程式の定数を求めるために、新たなアプローチが必要になるからです。前回の記事では、傾きを示す定数の求め方について紹介しました。もしそのコードを実際に試して楽しんでいただけたなら嬉しいです。今回はそれより少しだけ複雑な内容に取り組みますが、その分、多くの可能性が開かれることになります。実際のところ、この記事はこのニューラルネットワーク連載の中で最も興味深い内容になるかもしれません。なぜなら、この記事で紹介する考え方を理解すれば、今後のすべての内容がよりシンプルで、実用的なものになるからです。


作者: Daniel Jose

 

そうそう、YouTubeで、N.E.A.Tメソッドでフラッピーバードや他のゲームをプレイするAIを訓練するビデオを見たんだ。データを正規化するのに2週間、モデルを作成するのに2日かかりましたが、トレーニングにかかった時間はたったの1時間、1300世代、1世代あたり20ゲノム、私の5年前の中古ノートパソコンが火を噴きました。データが適切に正規化されていなかったため、私はまだモデルのコードを理解していません。しかし、学習し、モデルが予測するのを見るのは楽しかったです。


import os
import csv
import math
import random
import json
from datetime import datetime

# -----------------------------------------
# ♪ UTILSアクティベーションとシンプルなフォワードパス
# -----------------------------------------
def tanh(x):
    return math.tanh(x)

def forward_pass(genome, inputs):
    """
    genome = {
       'hidden_weights': 2D list,
       'hidden_bias': 1D list,
       'output_weights': list,
       'output_bias': float,
       'fitness': float
    }
    inputs: list of 5 floats
    Returns: single float in (-1, 1)
    """
    hidden_activations = []
    for h in range(len(genome['hidden_bias'])):
        z = sum(inputs[i] * genome['hidden_weights'][h][i] for i in range(len(inputs)))
        z += genome['hidden_bias'][h]
        hidden_activations.append(tanh(z))
    
    z_out = sum(hidden_activations[h] * genome['output_weights'][h] for h in range(len(hidden_activations)))
    z_out += genome['output_bias']
    return tanh(z_out)

def interpret_output(output):
    """
    Convert the Tanh output to discrete values:
      if output >= 0.5  =>  1  (Buy)
      if output <= -0.5 => -1  (Sell)
      otherwise         =>  0  (Hold)
    """
    if output >= 0.5:
        return 1
    elif output <= -0.5:
        return -1
    else:
        return 0

# ------------------------------------------------------
# 遺伝的アルゴリズム:初期化、選択、突然変異
# ------------------------------------------------------
def create_random_genome(input_size=5, hidden_size=8):
    return {
        'hidden_weights': [[random.uniform(-1, 1) for _ in range(input_size)]
                           for _ in range(hidden_size)],
        'hidden_bias': [random.uniform(-1, 1) for _ in range(hidden_size)],
        'output_weights': [random.uniform(-1, 1) for _ in range(hidden_size)],
        'output_bias': random.uniform(-1, 1),
        'fitness': 0.0
    }

def mutate(genome, mutation_rate=0.1, mutation_strength=0.5):
    for h in range(len(genome['hidden_weights'])):
        for i in range(len(genome['hidden_weights'][h])):
            if random.random() < mutation_rate:
                genome['hidden_weights'][h][i] += random.uniform(-mutation_strength, mutation_strength)
    for h in range(len(genome['hidden_bias'])):
        if random.random() < mutation_rate:
            genome['hidden_bias'][h] += random.uniform(-mutation_strength, mutation_strength)
    for h in range(len(genome['output_weights'])):
        if random.random() < mutation_rate:
            genome['output_weights'][h] += random.uniform(-mutation_strength, mutation_strength)
    if random.random() < mutation_rate:
        genome['output_bias'] += random.uniform(-mutation_strength, mutation_strength)

def crossover(genome1, genome2):
    child = create_random_genome()  # 上書きされる
    for h in range(len(genome1['hidden_weights'])):
        for i in range(len(genome1['hidden_weights'][h])):
            child['hidden_weights'][h][i] = (genome1['hidden_weights'][h][i]
                                             if random.random() < 0.5
                                             else genome2['hidden_weights'][h][i])
    for h in range(len(genome1['hidden_bias'])):
        child['hidden_bias'][h] = (genome1['hidden_bias'][h]
                                   if random.random() < 0.5
                                   else genome2['hidden_bias'][h])
    for h in range(len(genome1['output_weights'])):
        child['output_weights'][h] = (genome1['output_weights'][h]
                                      if random.random() < 0.5
                                      else genome2['output_weights'][h])
    child['output_bias'] = (genome1['output_bias']
                            if random.random() < 0.5
                            else genome2['output_bias'])
    child['fitness'] = 0.0
    return child

def evolve_population(population, keep_best=5):
    population.sort(key=lambda g: g['fitness'], reverse=True)
    new_population = population[:keep_best]  # 最高のゲノムを維持する
    while len(new_population) < len(population):
        parent1 = random.choice(new_population)
        parent2 = random.choice(new_population)
        child = crossover(parent1, parent2)
        mutate(child)
        new_population.append(child)
    return new_population

# -------------------------------------------
# 再トレーニングを避けるために、最適なモデルを保存してロードする。
# -------------------------------------------
def save_best_model(genome, filename="best_model.json"):
    data = {
        'hidden_weights': genome['hidden_weights'],
        'hidden_bias': genome['hidden_bias'],
        'output_weights': genome['output_weights'],
        'output_bias': genome['output_bias'],
        'fitness': genome['fitness']
    }
    with open(filename, 'w') as f:
        json.dump(data, f, indent=2)

def load_best_model(filename="best_model.json"):
    if not os.path.exists(filename):
        return None
    with open(filename, 'r') as f:
        data = json.load(f)
    return data

# ----------------
# メイン・トレーニング・ロジック
# ----------------
def train_neural_network_data(folder_path):
    # ハイパーパラメータ
    POP_SIZE = 20
    KEEP_BEST = 5
    input_size = 5
    hidden_size = 8

    # 初期母集団を作成するか、利用可能であれば最良のモデルをロードする。
    population = [create_random_genome(input_size, hidden_size) for _ in range(POP_SIZE)]
    best_saved = load_best_model()
    if best_saved is not None:
        population[0]['hidden_weights'] = best_saved['hidden_weights']
        population[0]['hidden_bias']    = best_saved['hidden_bias']
        population[0]['output_weights'] = best_saved['output_weights']
        population[0]['output_bias']    = best_saved['output_bias']
        population[0]['fitness']        = 0.0

    # ISO8601 日付/時刻文字列をタイムゾーン情報付きでパースするヘルパー
    def get_date_from_time_str(t_str):
        dt = datetime.strptime(t_str, "%Y-%m-%dT%H:%M:%S%z")
        return dt.date()

    csv_files = sorted([f for f in os.listdir(folder_path) if f.endswith('.csv')])

    # 各ゲノムについて、最後のシグナルと対応する価格(もしあれば)を保存する
    genome_states = [{'last_signal': None, 'last_price': None} for _ in range(POP_SIZE)]
    current_date = None
    generation_count = 0

    # 各CSVファイルを一度に1つずつ処理する(行ごとにストリーミングする)
    for csv_file in csv_files:
        file_path = os.path.join(folder_path, csv_file)
        with open(file_path, 'r', newline='') as f:
            reader = csv.DictReader(f)
            for row in reader:
                row_time_str = row['Time']
                row_date = get_date_from_time_str(row_time_str)

                # 新しい日が始まったかどうかをチェックする(世代末)
                if current_date is None:
                    current_date = row_date
                elif row_date != current_date:
                    # モニタリングのための統計情報を表示
                    generation_count += 1
                    fitnesses = [g['fitness'] for g in population]
                    avg_fitness = sum(fitnesses) / len(fitnesses)
                    best_fitness = max(fitnesses)
                    print(f"Generation {generation_count} | Date: {current_date} | Avg Fitness: {avg_fitness:.2 f} | Best Fitness: {best_fitness:.2 f}")

                    # 新世代のために母集団を進化させる
                    population = evolve_population(population, keep_best=KEEP_BEST)

                    # 新しい世代のためにゲノムの状態をリセットする
                    for state in genome_states:
                        state['last_signal'] = None
                        state['last_price'] = None

                    current_date = row_date

                # その場で入力を準備する
                inputs = [
                    float(row['NTP']),
                    float(row['NCP']),
                    float(row['NT']),
                    float(row['NIP']),
                    float(row['N14IP'])
                ]
                bid_price = float(row['BidOpen'])
                ask_price = float(row['AskOpen'])

                # この行に対する各ゲノムの決定を処理する
                for i, genome in enumerate(population):
                    raw_output = forward_pass(genome, inputs)
                    signal_val = interpret_output(raw_output)  # 1、0、または1
                    prev_signal = genome_states[i]['last_signal']
                    prev_price = genome_states[i]['last_price']

                    # シグナルに変化がなければ処理をスキップする
                    if signal_val == prev_signal:
                        continue

                    genome_states[i]['last_signal'] = signal_val

                    # 信号遷移にフィットネス・ロジックを適用する
                    if signal_val == 1:  # 買いシグナル
                        genome_states[i]['last_price'] = ask_price
                        if prev_signal == -1 and prev_price is not None:
                            if ask_price < prev_price:
                                genome['fitness'] += 1
                            else:
                                genome['fitness'] -= 1
                    elif signal_val == -1:  # 売りシグナル
                        genome_states[i]['last_price'] = bid_price
                        if prev_signal == 1 and prev_price is not None:
                            if bid_price > prev_price:
                                genome['fitness'] += 1
                            else:
                                genome['fitness'] -= 1
                    else:  # ホールド信号
                        genome_states[i]['last_price'] = None

    # すべてのCSVファイルを処理した後、トレーニングを確定する。
    population.sort(key=lambda g: g['fitness'], reverse=True)
    best_genome = population[0]
    print("Final Best genome fitness after all files:", best_genome['fitness'])
    save_best_model(best_genome, filename="best_model.json")

if __name__ == "__main__":
    folder_path = r""
    train_neural_network_data(folder_path)
MT5用
import MetaTrader5 as mt5
import pandas as pd
import numpy as np
import json
import time

def tournament(values):
    """Tournament method to find maximum and minimum in a list."""
    max_val = values[0]
    min_val = values[0]
    for v in values[1:]:
        if v > max_val:
            max_val = v
        if v < min_val:
            min_val = v
    return max_val, min_val

# -------------------------------
# JSONから最適なモデルをロードする
with open("best_model.json", "r") as f:
    best_model = json.load(f)

def forward_pass(inputs, model):
    """
    Perform a forward pass through a feedforward network with one hidden layer.
    Uses tanh for both hidden and output layers.
    The output is discretized:
       >0.5 --> 1, < -0.5 --> -1, else 0.
    """
    x = np.array(inputs)  #シェイプ (5,)
    hidden_weights = np.array(model["hidden_weights"])  # shape (8, 5)
    hidden_bias = np.array(model["hidden_bias"])          # 形状 (8,)
    output_weights = np.array(model["output_weights"])    # 形状 (8,)
    output_bias = model["output_bias"]                    # スカラー

    hidden_input = np.dot(hidden_weights, x) + hidden_bias
    hidden_output = np.tanh(hidden_input)
    output_val = np.dot(output_weights, hidden_output) + output_bias
    output_activation = np.tanh(output_val)
    
    if output_activation > 0.5:
        return 1
    elif output_activation < -0.5:
        return -1
    else:
        return 0

# -------------------------------
# ポジションを使ったオーダー管理機能
def get_open_position(symbol):
    """
    Returns information about an open position for the symbol as a dictionary:
    {"ticket": ticket, "type": "buy"/"sell", "volume": volume}.
    If no position is open, returns None.
    """
    positions = mt5.positions_get(symbol=symbol)
    if positions is None or len(positions) == 0:
        return None
    pos = positions[0]  # 1シンボルにつきオープンポジションは1つだけとする
    if pos.type == mt5.POSITION_TYPE_BUY:
        return {"ticket": pos.ticket, "type": "buy", "volume": pos.volume, "symbol": pos.symbol}
    elif pos.type == mt5.POSITION_TYPE_SELL:
        return {"ticket": pos.ticket, "type": "sell", "volume": pos.volume, "symbol": pos.symbol}
    return None

def close_position(position_info):
    """
    Closes the given position using its ticket.
    """
    symbol = position_info["symbol"]
    tick = mt5.symbol_info_tick(symbol)
    # 買い(BUY)が開いていれば、買値で売り、売り(SELL)が開いていれば、売値で買う。
    if position_info["type"] == "buy":
        price = tick.bid
        order_type = mt5.ORDER_TYPE_SELL
    else:
        price = tick.ask
        order_type = mt5.ORDER_TYPE_BUY

    request = {
        "action": mt5.TRADE_ACTION_DEAL,
        "symbol": symbol,
        "volume": position_info["volume"],
        "type": order_type,
        "position": position_info["ticket"],
        "price": price,
        "deviation": 10,
        "magic": 123456,
        "comment": "Close position",
        "type_time": mt5.ORDER_TIME_GTC,
        "type_filling": mt5.ORDER_FILLING_IOC,
    }
    result = mt5.order_send(request)
    print("Close position result:", result)

def place_order(symbol, order_type, volume=0.01):
    """
    Place an order for the symbol.
    - order_type "buy": place BUY order.
    - order_type "sell": place SELL order.
    Before placing, if an open position exists with the opposite signal,
    it is closed using its order ticket.
    """
    # 現在のオープンポジションを確認する:
    current_position = get_open_position(symbol)
    if current_position:
        # 既存のポジションが新しいシグナルと反対であれば、それを閉じる。
        if (order_type == "buy" and current_position["type"] == "sell") or \
           (order_type == "sell" and current_position["type"] == "buy"):
            print(f"Opposite position ({current_position['type']}) detected. Closing it first.")
            close_position(current_position)
        # 同じタイプなら何もしない。
        elif (order_type == current_position["type"]):
            print(f"{order_type.upper()} order already open. No new order will be placed.")
            return

    tick = mt5.symbol_info_tick(symbol)
    if order_type == "buy":
        price = tick.ask
        order_type_mt5 = mt5.ORDER_TYPE_BUY
    elif order_type == "sell":
        price = tick.bid
        order_type_mt5 = mt5.ORDER_TYPE_SELL
    else:
        print("Invalid order type:", order_type)
        return

    request = {
        "action": mt5.TRADE_ACTION_DEAL,
        "symbol": symbol,
        "volume": volume,
        "type": order_type_mt5,
        "price": price,
        "deviation": 10,
        "magic": 123456,
        "comment": "NEAT AI trade",
        "type_time": mt5.ORDER_TIME_GTC,
        "type_filling": mt5.ORDER_FILLING_IOC,
    }
    result = mt5.order_send(request)
    print(f"Placed {order_type.upper()} order result:", result)

# -------------------------------
# MetaTrader 5への接続を初期化する。
if not mt5.initialize():
    print("initialize() failed, error code =", mt5.last_error())
    quit()

symbol = "XAUUSDm"
timeframe = mt5.TIMEFRAME_M1  # 1分枠

if not mt5.symbol_select(symbol, True):
    print("Failed to select symbol:", symbol)
    mt5.shutdown()
    quit()

# 連続モニタリング・ループ
try:
    while True:
        print("\n" + "="*80)
        print("Retrieving candle data...")
        num_candles = 15
        rates = mt5.copy_rates_from_pos(symbol, timeframe, 1, num_candles)
        if rates is None:
            print("Failed to get rates for", symbol)
        else:
            df = pd.DataFrame(rates)
            df['tick_count'] = df['tick_volume']
            df['9ma'] = df['close'].rolling(window=9).mean()
            df['14ma'] = df['close'].rolling(window=14).mean()
            df['TP'] = (df['high'] + df['low'] + df['close']) / 3
            df['NTP']   = (df['TP']    - df['low']) / (df['high'] - df['low'])
            df['NIP9']  = (df['9ma']   - df['low']) / (df['high'] - df['low'])
            df['NIP14'] = (df['14ma']  - df['low']) / (df['high'] - df['low'])
            close_prices = list(df['close'].tail(14))
            HC, LC = tournament(close_prices)
            tick_counts = list(df['tick_count'].tail(14))
            HT, LT = tournament(tick_counts)
            df['NCP'] = (df['close'] - LC) / (HC - LC)
            df['NT']  = (df['tick_count'] - LT) / (HT - LT)
            df = df.round(3)
            
            # 直近のローソク足の正規化データを表示する
            display_cols = ['high', 'low', 'close', 'tick_count', '9ma', '14ma', 'TP', 'NTP', 'NCP', 'NT', 'NIP9', 'NIP14']
            print("\nNormalized Candle Data (Last Candle):")
            print(df[display_cols].tail(1).to_string(index=False))
            
            # 最後のローソクから入力を抽出する
            last_row = df.iloc[-1]
            input_vector = [
                last_row['NTP'],
                last_row['NCP'],
                last_row['NT'],
                last_row['NIP9'],
                last_row['NIP14']
            ]
            
            print("\nExtracted Inputs:")
            print(f"NTP = {last_row['NTP']}, NCP = {last_row['NCP']}, NT = {last_row['NT']}, NIP9 = {last_row['NIP9']}, NIP14 = {last_row['NIP14']}")
            print("Price Data - High:", last_row['high'], "Low:", last_row['low'], "Close:", last_row['close'])
            
            # ネットワーク信号を計算する
            signal = forward_pass(input_vector, best_model)
            print("\nNetwork Signal (discretized):", signal)
            
            # 注文管理ロジックを使ってシグナルに基づいて取引を実行する
            if signal == 1:
                print("Received BUY signal.")
                place_order(symbol, "buy", volume=0.01)
            elif signal == -1:
                print("Received SELL signal.")
                place_order(symbol, "sell", volume=0.01)
            else:
                print("Signal is neutral. No action taken.")
        
        # 次の境界まで待つ
        tick = mt5.symbol_info_tick(symbol)
        if tick is None:
            print("Failed to get tick info for", symbol)
            break
        server_time = tick.time
        next_minute = ((server_time // 60) + 1) * 60
        sleep_seconds = next_minute - server_time
        print(f"Sleeping for {sleep_seconds} seconds until next candle...")
        time.sleep(sleep_seconds)
except KeyboardInterrupt:
    print("Stopping continuous monitoring.")
finally:
    mt5.shutdown()
 
とても良い。
 
Multi Dead ニューラルネットワークの 基礎を学んで、GPTチャットの助けを借りてモデルを作っただけなんだ。プログラミングのやり方は知らないからね。データを正規化するのに2週間、モデルを作成するのに2日かかりましたが、トレーニングにかかった時間はたったの1時間、1,300世代、1世代あたり20ゲノム、5年前の中古ノートパソコンが火を噴きました。モデルをMT5に接続すると、モデルは非常にアグレッシブで、次のローソク足の予測は非常に正確でしたが、利益は出ませんでした。しかし、モデルの予測を学び、見るのは楽しかった。それが、AIについてもっと学ぶためにここに来た理由であり、これがNEAT AIコードである。


そしてMT5用

ニューラルネットワークは本当に興味深く、楽しいテーマだ。とはいえ、まずはリプレイ/シミュレーターを完成させることにしたので、説明はお休みしている。しかし、シミュレーターに関する記事の投稿が終わり次第、またニューラルネットワークに関する新しい記事をお届けします。その目的は常に、ニューラルネットワークがフードの下でどのように機能するかを示すことだ。ほとんどの人はニューラルネットワークを魔法のコードだと思っているが、それは間違いだ。しかし、それでも興味深く面白いテーマであることに変わりはない。私は、ニューラルネットワークが監視や事前のデータなしにどのように学習するのかを誰もがわかるように、何かをモデリングしようとさえ考えている。これは非常に興味深く、ある物事を理解するのに役立つだろう。詳細:私のコードはすべてMQL5で書かれます。あなたはプログラマーではないとおっしゃいました。MQL5を学び、あなた自身のソリューションを実装し始めるのはどうですか?私はあなたのような人を対象にした一連の記事を書いている。最新のものはこちらhttps://www.mql5.com/ja/articles/15833。 このシリーズでは、非常に基本的なことから説明しています。ですから、もしあなたがプログラミングについて全く知らないのであれば、シリーズの最初の記事に戻ってください。以前の記事へのリンクは常に記事の最初にあります。

Do básico ao intermediário: Indicador (IV)
Do básico ao intermediário: Indicador (IV)
  • www.mql5.com
Neste artigo, vermos como é fácil de criar e implementar uma metodologia operacional, visando colorir candles. Sendo este um conceito, que diversos operadores apreciam imensamente. Porém, é preciso se tomar cuidado ao implementar tal tipo de coisa. Isto para que as barras, ou candles, mantenham a sua aparência original. Visando assim não prejudicar a leitura que muitos operadores fazem candle a candle.
 
紹介してくれてありがとう!