Diskussion zum Artikel "Neuronales Netz in der Praxis: Das erste Neuron"

 

Neuer Artikel Neuronales Netz in der Praxis: Das erste Neuron :

In diesem Artikel beginnen wir damit, etwas Einfaches und Bescheidenes zu bauen: ein Neuron. Wir werden es mit einer sehr kleinen Menge an MQL5-Code programmieren. Das Neuron hat in meinen Tests hervorragend funktioniert. Gehen wir in dieser Artikelserie über neuronale Netze ein wenig zurück, um zu verstehen, wovon ich spreche.

Wenn Sie die vorangegangenen Artikel aufmerksam lesen, werden Sie feststellen, dass wir bestimmte mathematische Manöver anwenden mussten, um die bestmöglichen Werte sowohl für die Konstante <a>, die die Steigung darstellt, als auch für die Konstante <b>, die den Achsenabschnitt bezeichnet, zu ermitteln. Diese Anpassungen ermöglichten es uns, die am besten geeignete lineare Gleichung zu finden. Wir haben zwei Methoden untersucht, um dies zu erreichen: eine durch Ableitungsberechnungen und die andere durch Matrixberechnungen.

Von diesem Zeitpunkt an sind solche Berechnungen jedoch nicht mehr sinnvoll. Das liegt daran, dass wir eine alternative Methode zur Bestimmung der Konstanten in unserer linearen Gleichung entwickeln müssen. Im vorherigen Artikel habe ich gezeigt, wie man die Konstante der Steigung findet. Ich hoffe, es hat Ihnen Spaß gemacht, mit diesem Code zu experimentieren, denn jetzt werden wir uns an eine etwas komplexere Aufgabe machen. Dieser nächste Schritt ist zwar nur eine kleine Herausforderung, eröffnet aber zahlreiche Möglichkeiten. In der Tat könnte dies der interessanteste Artikel in unserer Serie über neuronale Netze sein, da alles, was folgt, wesentlich einfacher und praktischer wird.


Autor: Daniel Jose

 

ja, richtig, ich habe gerade ein Video auf YouTube gesehen, trainieren eine KI, um Flappy Bird und andere Spiele mit N.E.A.T-Methode zu spielen, bekam ich eine Idee, eine KI für den Handel mit N.E.A.T zu trainieren, ich habe gerade die Grundlagen des neuronalen Netzes gelernt, und erstellt ein Modell mit Hilfe von Chat GPT, weil ich nicht wissen, zu programmieren. Ich habe zwei Wochen gebraucht, um die Daten zu normalisieren und zwei Tage, um das Modell zu erstellen, aber es hat nur eine Stunde gedauert, um zu trainieren, 1300 Generationen, 20 Genome pro Generation, mein 5 Jahre alter Laptop aus zweiter Hand stand in Flammen, und als ich das Modell mit MT5 verbunden habe, war das Modell so aggressiv und war so genau bei der Vorhersage der nächsten Kerzen, aber nicht profitabel. weil die Daten nicht richtig normalisiert wurden, und ich verstehe den Code des Modells immer noch nicht. Aber es hat Spaß gemacht, zu lernen und zu sehen, wie das Modell Vorhersagen trifft, und deshalb bin ich hierher gekommen, um mehr über KI zu lernen, und das ist der Code von NEAT AI


import os
import csv
import math
import random
import json
from datetime import datetime

# -----------------------------------------
# UTILS: Aktivierung und einfacher Vorwärtspass
# -----------------------------------------
def tanh(x):
    return math.tanh(x)

def forward_pass(genome, inputs):
    """
    genome = {
       'hidden_weights': 2D list,
       'hidden_bias': 1D list,
       'output_weights': list,
       'output_bias': float,
       'fitness': float
    }
    inputs: list of 5 floats
    Returns: single float in (-1, 1)
    """
    hidden_activations = []
    for h in range(len(genome['hidden_bias'])):
        z = sum(inputs[i] * genome['hidden_weights'][h][i] for i in range(len(inputs)))
        z += genome['hidden_bias'][h]
        hidden_activations.append(tanh(z))
    
    z_out = sum(hidden_activations[h] * genome['output_weights'][h] for h in range(len(hidden_activations)))
    z_out += genome['output_bias']
    return tanh(z_out)

def interpret_output(output):
    """
    Convert the Tanh output to discrete values:
      if output >= 0.5  =>  1  (Buy)
      if output <= -0.5 => -1  (Sell)
      otherwise         =>  0  (Hold)
    """
    if output >= 0.5:
        return 1
    elif output <= -0.5:
        return -1
    else:
        return 0

# ------------------------------------------------------
# GENETISCHER ALGORITHMUS: Initialisierung, Auswahl, Mutation
# ------------------------------------------------------
def create_random_genome(input_size=5, hidden_size=8):
    return {
        'hidden_weights': [[random.uniform(-1, 1) for _ in range(input_size)]
                           for _ in range(hidden_size)],
        'hidden_bias': [random.uniform(-1, 1) for _ in range(hidden_size)],
        'output_weights': [random.uniform(-1, 1) for _ in range(hidden_size)],
        'output_bias': random.uniform(-1, 1),
        'fitness': 0.0
    }

def mutate(genome, mutation_rate=0.1, mutation_strength=0.5):
    for h in range(len(genome['hidden_weights'])):
        for i in range(len(genome['hidden_weights'][h])):
            if random.random() < mutation_rate:
                genome['hidden_weights'][h][i] += random.uniform(-mutation_strength, mutation_strength)
    for h in range(len(genome['hidden_bias'])):
        if random.random() < mutation_rate:
            genome['hidden_bias'][h] += random.uniform(-mutation_strength, mutation_strength)
    for h in range(len(genome['output_weights'])):
        if random.random() < mutation_rate:
            genome['output_weights'][h] += random.uniform(-mutation_strength, mutation_strength)
    if random.random() < mutation_rate:
        genome['output_bias'] += random.uniform(-mutation_strength, mutation_strength)

def crossover(genome1, genome2):
    child = create_random_genome()  # zufällige Init, wird überschrieben
    for h in range(len(genome1['hidden_weights'])):
        for i in range(len(genome1['hidden_weights'][h])):
            child['hidden_weights'][h][i] = (genome1['hidden_weights'][h][i]
                                             if random.random() < 0.5
                                             else genome2['hidden_weights'][h][i])
    for h in range(len(genome1['hidden_bias'])):
        child['hidden_bias'][h] = (genome1['hidden_bias'][h]
                                   if random.random() < 0.5
                                   else genome2['hidden_bias'][h])
    for h in range(len(genome1['output_weights'])):
        child['output_weights'][h] = (genome1['output_weights'][h]
                                      if random.random() < 0.5
                                      else genome2['output_weights'][h])
    child['output_bias'] = (genome1['output_bias']
                            if random.random() < 0.5
                            else genome2['output_bias'])
    child['fitness'] = 0.0
    return child

def evolve_population(population, keep_best=5):
    population.sort(key=lambda g: g['fitness'], reverse=True)
    new_population = population[:keep_best]  # Behalte die besten Genome
    while len(new_population) < len(population):
        parent1 = random.choice(new_population)
        parent2 = random.choice(new_population)
        child = crossover(parent1, parent2)
        mutate(child)
        new_population.append(child)
    return new_population

# -------------------------------------------
# SAVE & LOAD bestes Modell, um erneutes Training zu vermeiden
# -------------------------------------------
def save_best_model(genome, filename="best_model.json"):
    data = {
        'hidden_weights': genome['hidden_weights'],
        'hidden_bias': genome['hidden_bias'],
        'output_weights': genome['output_weights'],
        'output_bias': genome['output_bias'],
        'fitness': genome['fitness']
    }
    with open(filename, 'w') as f:
        json.dump(data, f, indent=2)

def load_best_model(filename="best_model.json"):
    if not os.path.exists(filename):
        return None
    with open(filename, 'r') as f:
        data = json.load(f)
    return data

# ----------------
# DIE HAUPTLOGIK DER AUSBILDUNG
# ----------------
def train_neural_network_data(folder_path):
    # Hyperparameter
    POP_SIZE = 20
    KEEP_BEST = 5
    input_size = 5
    hidden_size = 8

    # Erstellen Sie eine Ausgangspopulation oder laden Sie das beste Modell, falls verfügbar.
    population = [create_random_genome(input_size, hidden_size) for _ in range(POP_SIZE)]
    best_saved = load_best_model()
    if best_saved is not None:
        population[0]['hidden_weights'] = best_saved['hidden_weights']
        population[0]['hidden_bias']    = best_saved['hidden_bias']
        population[0]['output_weights'] = best_saved['output_weights']
        population[0]['output_bias']    = best_saved['output_bias']
        population[0]['fitness']        = 0.0

    # Hilfsprogramm zum Parsen von ISO8601-Datums-/Zeitstrings mit Zeitzoneninformationen
    def get_date_from_time_str(t_str):
        dt = datetime.strptime(t_str, "%Y-%m-%dT%H:%M:%S%z")
        return dt.date()

    csv_files = sorted([f for f in os.listdir(folder_path) if f.endswith('.csv')])

    # Für jedes Genom das letzte Signal und den entsprechenden Preis (falls vorhanden) speichern
    genome_states = [{'last_signal': None, 'last_price': None} for _ in range(POP_SIZE)]
    current_date = None
    generation_count = 0

    # Jede CSV-Datei einzeln verarbeiten (Streaming Zeile für Zeile)
    for csv_file in csv_files:
        file_path = os.path.join(folder_path, csv_file)
        with open(file_path, 'r', newline='') as f:
            reader = csv.DictReader(f)
            for row in reader:
                row_time_str = row['Time']
                row_date = get_date_from_time_str(row_time_str)

                # Prüfen, ob ein neuer Tag begonnen hat (Ende der Erzeugung)
                if current_date is None:
                    current_date = row_date
                elif row_date != current_date:
                    # Generierung beendet; Statistik zur Überwachung ausgeben
                    generation_count += 1
                    fitnesses = [g['fitness'] for g in population]
                    avg_fitness = sum(fitnesses) / len(fitnesses)
                    best_fitness = max(fitnesses)
                    print(f"Generation {generation_count} | Date: {current_date} | Avg Fitness: {avg_fitness:.2 f} | Best Fitness: {best_fitness:.2 f}")

                    # Evolve the population for the new generation
                    population = evolve_population(population, keep_best=KEEP_BEST)

                    # Genomzustände für neue Generation zurücksetzen
                    for state in genome_states:
                        state['last_signal'] = None
                        state['last_price'] = None

                    current_date = row_date

                # Eingaben on-the-fly vorbereiten
                inputs = [
                    float(row['NTP']),
                    float(row['NCP']),
                    float(row['NT']),
                    float(row['NIP']),
                    float(row['N14IP'])
                ]
                bid_price = float(row['BidOpen'])
                ask_price = float(row['AskOpen'])

                # Die Entscheidung jedes Genoms für diese Zeile verarbeiten
                for i, genome in enumerate(population):
                    raw_output = forward_pass(genome, inputs)
                    signal_val = interpret_output(raw_output)  -1, 0, oder 1
                    prev_signal = genome_states[i]['last_signal']
                    prev_price = genome_states[i]['last_price']

                    # Verarbeitung überspringen, wenn sich das Signal nicht geändert hat
                    if signal_val == prev_signal:
                        continue

                    genome_states[i]['last_signal'] = signal_val

                    # Fitness-Logik auf Signalübergänge anwenden
                    if signal_val == 1:  # Kaufsignal
                        genome_states[i]['last_price'] = ask_price
                        if prev_signal == -1 and prev_price is not None:
                            if ask_price < prev_price:
                                genome['fitness'] += 1
                            else:
                                genome['fitness'] -= 1
                    elif signal_val == -1:  # Verkaufssignal
                        genome_states[i]['last_price'] = bid_price
                        if prev_signal == 1 and prev_price is not None:
                            if bid_price > prev_price:
                                genome['fitness'] += 1
                            else:
                                genome['fitness'] -= 1
                    else:  # Haltesignal
                        genome_states[i]['last_price'] = None

    # Nach der Verarbeitung aller CSV-Dateien wird das Training abgeschlossen.
    population.sort(key=lambda g: g['fitness'], reverse=True)
    best_genome = population[0]
    print("Final Best genome fitness after all files:", best_genome['fitness'])
    save_best_model(best_genome, filename="best_model.json")

if __name__ == "__main__":
    folder_path = r""
    train_neural_network_data(folder_path)
und für MT5
import MetaTrader5 as mt5
import pandas as pd
import numpy as np
import json
import time

def tournament(values):
    """Tournament method to find maximum and minimum in a list."""
    max_val = values[0]
    min_val = values[0]
    for v in values[1:]:
        if v > max_val:
            max_val = v
        if v < min_val:
            min_val = v
    return max_val, min_val

# -------------------------------
# Laden Sie das beste Modell aus JSON
with open("best_model.json", "r") as f:
    best_model = json.load(f)

def forward_pass(inputs, model):
    """
    Perform a forward pass through a feedforward network with one hidden layer.
    Uses tanh for both hidden and output layers.
    The output is discretized:
       >0.5 --> 1, < -0.5 --> -1, else 0.
    """
    x = np.array(inputs)  # Form (5,)
    hidden_weights = np.array(model["hidden_weights"])  # Form (8, 5)
    hidden_bias = np.array(model["hidden_bias"])          # Form (8,)
    output_weights = np.array(model["output_weights"])    # Form (8,)
    output_bias = model["output_bias"]                    # skalar

    hidden_input = np.dot(hidden_weights, x) + hidden_bias
    hidden_output = np.tanh(hidden_input)
    output_val = np.dot(output_weights, hidden_output) + output_bias
    output_activation = np.tanh(output_val)
    
    if output_activation > 0.5:
        return 1
    elif output_activation < -0.5:
        return -1
    else:
        return 0

# -------------------------------
# Funktionen der Auftragsverwaltung über Positionen
def get_open_position(symbol):
    """
    Returns information about an open position for the symbol as a dictionary:
    {"ticket": ticket, "type": "buy"/"sell", "volume": volume}.
    If no position is open, returns None.
    """
    positions = mt5.positions_get(symbol=symbol)
    if positions is None or len(positions) == 0:
        return None
    pos = positions[0]  # nur eine offene Position pro Symbol annehmen
    if pos.type == mt5.POSITION_TYPE_BUY:
        return {"ticket": pos.ticket, "type": "buy", "volume": pos.volume, "symbol": pos.symbol}
    elif pos.type == mt5.POSITION_TYPE_SELL:
        return {"ticket": pos.ticket, "type": "sell", "volume": pos.volume, "symbol": pos.symbol}
    return None

def close_position(position_info):
    """
    Closes the given position using its ticket.
    """
    symbol = position_info["symbol"]
    tick = mt5.symbol_info_tick(symbol)
    # Für den Abschluss: Wenn BUY offen ist, verkaufen wir zum Bid; wenn SELL offen ist, kaufen wir zum Ask.
    if position_info["type"] == "buy":
        price = tick.bid
        order_type = mt5.ORDER_TYPE_SELL
    else:
        price = tick.ask
        order_type = mt5.ORDER_TYPE_BUY

    request = {
        "action": mt5.TRADE_ACTION_DEAL,
        "symbol": symbol,
        "volume": position_info["volume"],
        "type": order_type,
        "position": position_info["ticket"],
        "price": price,
        "deviation": 10,
        "magic": 123456,
        "comment": "Close position",
        "type_time": mt5.ORDER_TIME_GTC,
        "type_filling": mt5.ORDER_FILLING_IOC,
    }
    result = mt5.order_send(request)
    print("Close position result:", result)

def place_order(symbol, order_type, volume=0.01):
    """
    Place an order for the symbol.
    - order_type "buy": place BUY order.
    - order_type "sell": place SELL order.
    Before placing, if an open position exists with the opposite signal,
    it is closed using its order ticket.
    """
    # Aktuelle offene Position prüfen:
    current_position = get_open_position(symbol)
    if current_position:
        # Wenn die bestehende Position dem neuen Signal entgegengesetzt ist, schließen Sie sie.
        if (order_type == "buy" and current_position["type"] == "sell") or \
           (order_type == "sell" and current_position["type"] == "buy"):
            print(f"Opposite position ({current_position['type']}) detected. Closing it first.")
            close_position(current_position)
        # Wenn gleicher Typ, nichts tun.
        elif (order_type == current_position["type"]):
            print(f"{order_type.upper()} order already open. No new order will be placed.")
            return

    tick = mt5.symbol_info_tick(symbol)
    if order_type == "buy":
        price = tick.ask
        order_type_mt5 = mt5.ORDER_TYPE_BUY
    elif order_type == "sell":
        price = tick.bid
        order_type_mt5 = mt5.ORDER_TYPE_SELL
    else:
        print("Invalid order type:", order_type)
        return

    request = {
        "action": mt5.TRADE_ACTION_DEAL,
        "symbol": symbol,
        "volume": volume,
        "type": order_type_mt5,
        "price": price,
        "deviation": 10,
        "magic": 123456,
        "comment": "NEAT AI trade",
        "type_time": mt5.ORDER_TIME_GTC,
        "type_filling": mt5.ORDER_FILLING_IOC,
    }
    result = mt5.order_send(request)
    print(f"Placed {order_type.upper()} order result:", result)

# -------------------------------
# Initialisierung der Verbindung zu MetaTrader 5
if not mt5.initialize():
    print("initialize() failed, error code =", mt5.last_error())
    quit()

symbol = "XAUUSDm"
timeframe = mt5.TIMEFRAME_M1  # 1-minütiger Zeitrahmen

if not mt5.symbol_select(symbol, True):
    print("Failed to select symbol:", symbol)
    mt5.shutdown()
    quit()

# Kontinuierliche Überwachungsschleife
try:
    while True:
        print("\n" + "="*80)
        print("Retrieving candle data...")
        num_candles = 15
        rates = mt5.copy_rates_from_pos(symbol, timeframe, 1, num_candles)
        if rates is None:
            print("Failed to get rates for", symbol)
        else:
            df = pd.DataFrame(rates)
            df['tick_count'] = df['tick_volume']
            df['9ma'] = df['close'].rolling(window=9).mean()
            df['14ma'] = df['close'].rolling(window=14).mean()
            df['TP'] = (df['high'] + df['low'] + df['close']) / 3
            df['NTP']   = (df['TP']    - df['low']) / (df['high'] - df['low'])
            df['NIP9']  = (df['9ma']   - df['low']) / (df['high'] - df['low'])
            df['NIP14'] = (df['14ma']  - df['low']) / (df['high'] - df['low'])
            close_prices = list(df['close'].tail(14))
            HC, LC = tournament(close_prices)
            tick_counts = list(df['tick_count'].tail(14))
            HT, LT = tournament(tick_counts)
            df['NCP'] = (df['close'] - LC) / (HC - LC)
            df['NT']  = (df['tick_count'] - LT) / (HT - LT)
            df = df.round(3)
            
            # Druckt die normalisierten Daten der letzten Kerze
            display_cols = ['high', 'low', 'close', 'tick_count', '9ma', '14ma', 'TP', 'NTP', 'NCP', 'NT', 'NIP9', 'NIP14']
            print("\nNormalized Candle Data (Last Candle):")
            print(df[display_cols].tail(1).to_string(index=False))
            
            # Eingaben von der letzten Kerze extrahieren
            last_row = df.iloc[-1]
            input_vector = [
                last_row['NTP'],
                last_row['NCP'],
                last_row['NT'],
                last_row['NIP9'],
                last_row['NIP14']
            ]
            
            print("\nExtracted Inputs:")
            print(f"NTP = {last_row['NTP']}, NCP = {last_row['NCP']}, NT = {last_row['NT']}, NIP9 = {last_row['NIP9']}, NIP14 = {last_row['NIP14']}")
            print("Price Data - High:", last_row['high'], "Low:", last_row['low'], "Close:", last_row['close'])
            
            # Berechnung des Netzwerksignals
            signal = forward_pass(input_vector, best_model)
            print("\nNetwork Signal (discretized):", signal)
            
            # Handel auf der Grundlage des Signals mit unserer Auftragsverwaltungslogik ausführen
            if signal == 1:
                print("Received BUY signal.")
                place_order(symbol, "buy", volume=0.01)
            elif signal == -1:
                print("Received SELL signal.")
                place_order(symbol, "sell", volume=0.01)
            else:
                print("Signal is neutral. No action taken.")
        
        # Warte bis zur nächsten Minute Grenze
        tick = mt5.symbol_info_tick(symbol)
        if tick is None:
            print("Failed to get tick info for", symbol)
            break
        server_time = tick.time
        next_minute = ((server_time // 60) + 1) * 60
        sleep_seconds = next_minute - server_time
        print(f"Sleeping for {sleep_seconds} seconds until next candle...")
        time.sleep(sleep_seconds)
except KeyboardInterrupt:
    print("Stopping continuous monitoring.")
finally:
    mt5.shutdown()
 
Sehr gut.
 
Multi Dead neuronalen Netzen gelernt und ein Modell mit Hilfe von GPT-Chat erstellt, weil ich nicht programmieren kann. Ich brauchte zwei Wochen, um die Daten zu normalisieren, und zwei Tage, um das Modell zu erstellen, aber ich brauchte nur eine Stunde, um es zu trainieren, 1.300 Generationen, 20 Genome pro Generation, mein fünf Jahre alter, gebrauchter Laptop stand in Flammen, und als ich das Modell an MT5 anschloss, war das Modell sehr aggressiv und sehr genau bei der Vorhersage der nächsten Kerzen, aber es war nicht profitabel. Aber es hat Spaß gemacht, zu lernen und die Vorhersage des Modells zu sehen, weshalb ich hierher gekommen bin, um mehr über KI zu lernen, und das ist der NEAT-KI-Code


und für MT5

Neuronale Netze sind ein wirklich interessantes und unterhaltsames Thema. Ich habe allerdings eine Pause eingelegt, um es zu erklären, da ich mich entschlossen habe, zuerst das Replay / den Simulator fertigzustellen. Sobald ich jedoch die Artikel über den Simulator fertiggestellt habe, werden wir mit neuen Artikeln über neuronale Netze zurückkehren. Das Ziel ist immer zu zeigen, wie sie unter der Haube funktionieren. Die meisten Leute denken, sie seien magische Codes, was nicht stimmt. Aber sie sind trotzdem ein interessantes und unterhaltsames Thema. Ich denke sogar darüber nach, etwas zu modellieren, damit jeder sehen kann, wie ein neuronales Netz ohne Überwachung oder vorherige Daten lernt. Das ist sehr interessant und kann helfen, bestimmte Dinge zu verstehen. Detail: Mein gesamter Code wird in MQL5 geschrieben sein. Und da Sie sagten, Sie seien kein Programmierer. Wie wäre es, wenn Sie MQL5 lernen und anfangen, Ihre eigenen Lösungen zu implementieren? Ich schreibe eine Reihe von Artikeln, die sich an Leute wie Sie richten. Der neueste Artikel ist hier zu sehen: https: //www.mql5.com/pt/articles/15833. In dieser Serie erkläre ich die Dinge von den absoluten Grundlagen her. Wenn Sie also absolut keine Ahnung vom Programmieren haben, gehen Sie zum ersten Artikel der Serie zurück. Die Links zu den vorherigen Artikeln stehen immer am Anfang des Artikels.

Do básico ao intermediário: Indicador (IV)
Do básico ao intermediário: Indicador (IV)
  • www.mql5.com
Neste artigo, vermos como é fácil de criar e implementar uma metodologia operacional, visando colorir candles. Sendo este um conceito, que diversos operadores apreciam imensamente. Porém, é preciso se tomar cuidado ao implementar tal tipo de coisa. Isto para que as barras, ou candles, mantenham a sua aparência original. Visando assim não prejudicar a leitura que muitos operadores fazem candle a candle.
 
Vielen Dank für diese Einführung!