Yes, correct. I just saw a YouTube video about training an AI to play Flappy Bird and other games with the NEAT method, and I got the idea to train an AI for trading with NEAT. I had just learned the basics of neural networks, and I created a model with the help of ChatGPT, because I don't know how to code. It took me two weeks to normalize the data and two days to create the model, but only one hour to train: 1300 generations, 20 genomes per generation. My five-year-old second-hand laptop was on fire. When I connected the model to MT5, it was very aggressive and very accurate at predicting the next candles, but not profitable, because the data was not normalized properly. I still don't understand the model's code, but it was fun to learn and to see the model predicting, and that's why I came here to learn more about AI. This is the code of the NEAT AI:
import os
import csv
import math
import random
import json
from datetime import datetime

# -----------------------------------------
# UTILS: Activation and simple forward pass
# -----------------------------------------
def tanh(x):
    return math.tanh(x)

def forward_pass(genome, inputs):
    """
    genome = {
        'hidden_weights': 2D list,
        'hidden_bias': 1D list,
        'output_weights': list,
        'output_bias': float,
        'fitness': float
    }
    inputs: list of 5 floats
    Returns: single float in (-1, 1)
    """
    hidden_activations = []
    for h in range(len(genome['hidden_bias'])):
        z = sum(inputs[i] * genome['hidden_weights'][h][i] for i in range(len(inputs)))
        z += genome['hidden_bias'][h]
        hidden_activations.append(tanh(z))
    z_out = sum(hidden_activations[h] * genome['output_weights'][h]
                for h in range(len(hidden_activations)))
    z_out += genome['output_bias']
    return tanh(z_out)

def interpret_output(output):
    """
    Convert the Tanh output to discrete values:
        if output >= 0.5  =>  1 (Buy)
        if output <= -0.5 => -1 (Sell)
        otherwise         =>  0 (Hold)
    """
    if output >= 0.5:
        return 1
    elif output <= -0.5:
        return -1
    else:
        return 0

# ------------------------------------------------------
# GENETIC ALGORITHM: Initialization, selection, mutation
# ------------------------------------------------------
def create_random_genome(input_size=5, hidden_size=8):
    return {
        'hidden_weights': [[random.uniform(-1, 1) for _ in range(input_size)]
                           for _ in range(hidden_size)],
        'hidden_bias': [random.uniform(-1, 1) for _ in range(hidden_size)],
        'output_weights': [random.uniform(-1, 1) for _ in range(hidden_size)],
        'output_bias': random.uniform(-1, 1),
        'fitness': 0.0
    }

def mutate(genome, mutation_rate=0.1, mutation_strength=0.5):
    for h in range(len(genome['hidden_weights'])):
        for i in range(len(genome['hidden_weights'][h])):
            if random.random() < mutation_rate:
                genome['hidden_weights'][h][i] += random.uniform(-mutation_strength, mutation_strength)
    for h in range(len(genome['hidden_bias'])):
        if random.random() < mutation_rate:
            genome['hidden_bias'][h] += random.uniform(-mutation_strength, mutation_strength)
    for h in range(len(genome['output_weights'])):
        if random.random() < mutation_rate:
            genome['output_weights'][h] += random.uniform(-mutation_strength, mutation_strength)
    if random.random() < mutation_rate:
        genome['output_bias'] += random.uniform(-mutation_strength, mutation_strength)

def crossover(genome1, genome2):
    child = create_random_genome()  # random init, will be overwritten
    for h in range(len(genome1['hidden_weights'])):
        for i in range(len(genome1['hidden_weights'][h])):
            child['hidden_weights'][h][i] = (genome1['hidden_weights'][h][i]
                                             if random.random() < 0.5
                                             else genome2['hidden_weights'][h][i])
    for h in range(len(genome1['hidden_bias'])):
        child['hidden_bias'][h] = (genome1['hidden_bias'][h]
                                   if random.random() < 0.5
                                   else genome2['hidden_bias'][h])
    for h in range(len(genome1['output_weights'])):
        child['output_weights'][h] = (genome1['output_weights'][h]
                                      if random.random() < 0.5
                                      else genome2['output_weights'][h])
    child['output_bias'] = (genome1['output_bias']
                            if random.random() < 0.5
                            else genome2['output_bias'])
    child['fitness'] = 0.0
    return child

def evolve_population(population, keep_best=5):
    population.sort(key=lambda g: g['fitness'], reverse=True)
    new_population = population[:keep_best]  # Keep the best genomes
    while len(new_population) < len(population):
        parent1 = random.choice(new_population)
        parent2 = random.choice(new_population)
        child = crossover(parent1, parent2)
        mutate(child)
        new_population.append(child)
    return new_population

# -------------------------------------------
# SAVE & LOAD best model to avoid re-training
# -------------------------------------------
def save_best_model(genome, filename="best_model.json"):
    data = {
        'hidden_weights': genome['hidden_weights'],
        'hidden_bias': genome['hidden_bias'],
        'output_weights': genome['output_weights'],
        'output_bias': genome['output_bias'],
        'fitness': genome['fitness']
    }
    with open(filename, 'w') as f:
        json.dump(data, f, indent=2)

def load_best_model(filename="best_model.json"):
    if not os.path.exists(filename):
        return None
    with open(filename, 'r') as f:
        data = json.load(f)
    return data

# ----------------
# MAIN TRAINING LOGIC
# ----------------
def train_neural_network_data(folder_path):
    # Hyperparameters
    POP_SIZE = 20
    KEEP_BEST = 5
    input_size = 5
    hidden_size = 8

    # Create initial population or load the best model if available
    population = [create_random_genome(input_size, hidden_size) for _ in range(POP_SIZE)]
    best_saved = load_best_model()
    if best_saved is not None:
        population[0]['hidden_weights'] = best_saved['hidden_weights']
        population[0]['hidden_bias'] = best_saved['hidden_bias']
        population[0]['output_weights'] = best_saved['output_weights']
        population[0]['output_bias'] = best_saved['output_bias']
        population[0]['fitness'] = 0.0

    # Helper to parse ISO8601 date/time strings with timezone info
    def get_date_from_time_str(t_str):
        dt = datetime.strptime(t_str, "%Y-%m-%dT%H:%M:%S%z")
        return dt.date()

    csv_files = sorted([f for f in os.listdir(folder_path) if f.endswith('.csv')])

    # For each genome, store the last signal and corresponding price (if any)
    genome_states = [{'last_signal': None, 'last_price': None} for _ in range(POP_SIZE)]
    current_date = None
    generation_count = 0

    # Process each CSV file one at a time (streaming row by row)
    for csv_file in csv_files:
        file_path = os.path.join(folder_path, csv_file)
        with open(file_path, 'r', newline='') as f:
            reader = csv.DictReader(f)
            for row in reader:
                row_time_str = row['Time']
                row_date = get_date_from_time_str(row_time_str)

                # Check if a new day has started (end of generation)
                if current_date is None:
                    current_date = row_date
                elif row_date != current_date:
                    # Generation ended; print statistics for monitoring
                    generation_count += 1
                    fitnesses = [g['fitness'] for g in population]
                    avg_fitness = sum(fitnesses) / len(fitnesses)
                    best_fitness = max(fitnesses)
                    print(f"Generation {generation_count} | Date: {current_date} | "
                          f"Avg Fitness: {avg_fitness:.2f} | Best Fitness: {best_fitness:.2f}")

                    # Evolve the population for the new generation
                    population = evolve_population(population, keep_best=KEEP_BEST)

                    # Reset genome states for new generation
                    for state in genome_states:
                        state['last_signal'] = None
                        state['last_price'] = None
                    current_date = row_date

                # Prepare inputs on-the-fly
                inputs = [
                    float(row['NTP']),
                    float(row['NCP']),
                    float(row['NT']),
                    float(row['NIP']),
                    float(row['N14IP'])
                ]
                bid_price = float(row['BidOpen'])
                ask_price = float(row['AskOpen'])

                # Process each genome's decision for this row
                for i, genome in enumerate(population):
                    raw_output = forward_pass(genome, inputs)
                    signal_val = interpret_output(raw_output)  # -1, 0, or 1
                    prev_signal = genome_states[i]['last_signal']
                    prev_price = genome_states[i]['last_price']

                    # Skip processing if signal hasn't changed
                    if signal_val == prev_signal:
                        continue
                    genome_states[i]['last_signal'] = signal_val

                    # Apply fitness logic on signal transitions
                    if signal_val == 1:  # Buy signal
                        genome_states[i]['last_price'] = ask_price
                        if prev_signal == -1 and prev_price is not None:
                            if ask_price < prev_price:
                                genome['fitness'] += 1
                            else:
                                genome['fitness'] -= 1
                    elif signal_val == -1:  # Sell signal
                        genome_states[i]['last_price'] = bid_price
                        if prev_signal == 1 and prev_price is not None:
                            if bid_price > prev_price:
                                genome['fitness'] += 1
                            else:
                                genome['fitness'] -= 1
                    else:  # Hold signal
                        genome_states[i]['last_price'] = None

    # After processing all CSV files, finalize training
    population.sort(key=lambda g: g['fitness'], reverse=True)
    best_genome = population[0]
    print("Final Best genome fitness after all files:", best_genome['fitness'])
    save_best_model(best_genome, filename="best_model.json")

if __name__ == "__main__":
    folder_path = r""
    train_neural_network_data(folder_path)

And for MT5:
import MetaTrader5 as mt5
import pandas as pd
import numpy as np
import json
import time

def tournament(values):
    """Tournament method to find maximum and minimum in a list."""
    max_val = values[0]
    min_val = values[0]
    for v in values[1:]:
        if v > max_val:
            max_val = v
        if v < min_val:
            min_val = v
    return max_val, min_val

# -------------------------------
# Load the best model from JSON
with open("best_model.json", "r") as f:
    best_model = json.load(f)

def forward_pass(inputs, model):
    """
    Perform a forward pass through a feedforward network with one hidden layer.
    Uses tanh for both hidden and output layers.
    The output is discretized: >0.5 --> 1, < -0.5 --> -1, else 0.
    """
    x = np.array(inputs)                                 # shape (5,)
    hidden_weights = np.array(model["hidden_weights"])   # shape (8, 5)
    hidden_bias = np.array(model["hidden_bias"])         # shape (8,)
    output_weights = np.array(model["output_weights"])   # shape (8,)
    output_bias = model["output_bias"]                   # scalar

    hidden_input = np.dot(hidden_weights, x) + hidden_bias
    hidden_output = np.tanh(hidden_input)
    output_val = np.dot(output_weights, hidden_output) + output_bias
    output_activation = np.tanh(output_val)

    if output_activation > 0.5:
        return 1
    elif output_activation < -0.5:
        return -1
    else:
        return 0

# -------------------------------
# Order management functions using positions
def get_open_position(symbol):
    """
    Returns information about an open position for the symbol as a dictionary:
    {"ticket": ticket, "type": "buy"/"sell", "volume": volume}.
    If no position is open, returns None.
    """
    positions = mt5.positions_get(symbol=symbol)
    if positions is None or len(positions) == 0:
        return None
    pos = positions[0]  # assume only one open position per symbol
    if pos.type == mt5.POSITION_TYPE_BUY:
        return {"ticket": pos.ticket, "type": "buy", "volume": pos.volume, "symbol": pos.symbol}
    elif pos.type == mt5.POSITION_TYPE_SELL:
        return {"ticket": pos.ticket, "type": "sell", "volume": pos.volume, "symbol": pos.symbol}
    return None

def close_position(position_info):
    """
    Closes the given position using its ticket.
    """
    symbol = position_info["symbol"]
    tick = mt5.symbol_info_tick(symbol)
    # For closing: if BUY is open, we sell at bid; if SELL is open, we buy at ask.
    if position_info["type"] == "buy":
        price = tick.bid
        order_type = mt5.ORDER_TYPE_SELL
    else:
        price = tick.ask
        order_type = mt5.ORDER_TYPE_BUY
    request = {
        "action": mt5.TRADE_ACTION_DEAL,
        "symbol": symbol,
        "volume": position_info["volume"],
        "type": order_type,
        "position": position_info["ticket"],
        "price": price,
        "deviation": 10,
        "magic": 123456,
        "comment": "Close position",
        "type_time": mt5.ORDER_TIME_GTC,
        "type_filling": mt5.ORDER_FILLING_IOC,
    }
    result = mt5.order_send(request)
    print("Close position result:", result)

def place_order(symbol, order_type, volume=0.01):
    """
    Place an order for the symbol.
    - order_type "buy": place BUY order.
    - order_type "sell": place SELL order.
    Before placing, if an open position exists with the opposite signal,
    it is closed using its order ticket.
    """
    # Check current open position:
    current_position = get_open_position(symbol)
    if current_position:
        # If the existing position is opposite of the new signal, close it.
        if (order_type == "buy" and current_position["type"] == "sell") or \
           (order_type == "sell" and current_position["type"] == "buy"):
            print(f"Opposite position ({current_position['type']}) detected. Closing it first.")
            close_position(current_position)
        # If same type, do nothing.
        elif (order_type == current_position["type"]):
            print(f"{order_type.upper()} order already open. No new order will be placed.")
            return

    tick = mt5.symbol_info_tick(symbol)
    if order_type == "buy":
        price = tick.ask
        order_type_mt5 = mt5.ORDER_TYPE_BUY
    elif order_type == "sell":
        price = tick.bid
        order_type_mt5 = mt5.ORDER_TYPE_SELL
    else:
        print("Invalid order type:", order_type)
        return

    request = {
        "action": mt5.TRADE_ACTION_DEAL,
        "symbol": symbol,
        "volume": volume,
        "type": order_type_mt5,
        "price": price,
        "deviation": 10,
        "magic": 123456,
        "comment": "NEAT AI trade",
        "type_time": mt5.ORDER_TIME_GTC,
        "type_filling": mt5.ORDER_FILLING_IOC,
    }
    result = mt5.order_send(request)
    print(f"Placed {order_type.upper()} order result:", result)

# -------------------------------
# Initialize connection to MetaTrader 5
if not mt5.initialize():
    print("initialize() failed, error code =", mt5.last_error())
    quit()

symbol = "XAUUSDm"
timeframe = mt5.TIMEFRAME_M1  # 1-minute timeframe

if not mt5.symbol_select(symbol, True):
    print("Failed to select symbol:", symbol)
    mt5.shutdown()
    quit()

# Continuous monitoring loop
try:
    while True:
        print("\n" + "="*80)
        print("Retrieving candle data...")
        num_candles = 15
        rates = mt5.copy_rates_from_pos(symbol, timeframe, 1, num_candles)
        if rates is None:
            print("Failed to get rates for", symbol)
        else:
            df = pd.DataFrame(rates)
            df['tick_count'] = df['tick_volume']
            df['9ma'] = df['close'].rolling(window=9).mean()
            df['14ma'] = df['close'].rolling(window=14).mean()
            df['TP'] = (df['high'] + df['low'] + df['close']) / 3
            df['NTP'] = (df['TP'] - df['low']) / (df['high'] - df['low'])
            df['NIP9'] = (df['9ma'] - df['low']) / (df['high'] - df['low'])
            df['NIP14'] = (df['14ma'] - df['low']) / (df['high'] - df['low'])

            close_prices = list(df['close'].tail(14))
            HC, LC = tournament(close_prices)
            tick_counts = list(df['tick_count'].tail(14))
            HT, LT = tournament(tick_counts)
            df['NCP'] = (df['close'] - LC) / (HC - LC)
            df['NT'] = (df['tick_count'] - LT) / (HT - LT)
            df = df.round(3)

            # Print last candle's normalized data
            display_cols = ['high', 'low', 'close', 'tick_count', '9ma', '14ma',
                            'TP', 'NTP', 'NCP', 'NT', 'NIP9', 'NIP14']
            print("\nNormalized Candle Data (Last Candle):")
            print(df[display_cols].tail(1).to_string(index=False))

            # Extract inputs from the last candle
            last_row = df.iloc[-1]
            input_vector = [
                last_row['NTP'],
                last_row['NCP'],
                last_row['NT'],
                last_row['NIP9'],
                last_row['NIP14']
            ]
            print("\nExtracted Inputs:")
            print(f"NTP = {last_row['NTP']}, NCP = {last_row['NCP']}, NT = {last_row['NT']}, "
                  f"NIP9 = {last_row['NIP9']}, NIP14 = {last_row['NIP14']}")
            print("Price Data - High:", last_row['high'], "Low:", last_row['low'],
                  "Close:", last_row['close'])

            # Compute network signal
            signal = forward_pass(input_vector, best_model)
            print("\nNetwork Signal (discretized):", signal)

            # Execute trade based on signal using our order management logic
            if signal == 1:
                print("Received BUY signal.")
                place_order(symbol, "buy", volume=0.01)
            elif signal == -1:
                print("Received SELL signal.")
                place_order(symbol, "sell", volume=0.01)
            else:
                print("Signal is neutral. No action taken.")

        # Wait until next minute boundary
        tick = mt5.symbol_info_tick(symbol)
        if tick is None:
            print("Failed to get tick info for", symbol)
            break
        server_time = tick.time
        next_minute = ((server_time // 60) + 1) * 60
        sleep_seconds = next_minute - server_time
        print(f"Sleeping for {sleep_seconds} seconds until next candle...")
        time.sleep(sleep_seconds)
except KeyboardInterrupt:
    print("Stopping continuous monitoring.")
finally:
    mt5.shutdown()
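A small note on the normalization step in the MT5 script above, since the post mentions the data was not normalized properly. Formulas such as (TP - low) / (high - low) divide by zero on a flat candle where high equals low, and a moving average can sit outside the current candle's high/low range, so NIP9 and NIP14 are not guaranteed to stay between 0 and 1. Below is a minimal sketch of a guarded version. The helper name `safe_minmax` and the fallback value of 0.5 are my own assumptions for illustration, not something taken from the original code.

```python
import math

def safe_minmax(value, low, high, fallback=0.5):
    """Scale value relative to the range [low, high].

    The result lies in [0, 1] only when low <= value <= high.
    Returns `fallback` (an arbitrary, hypothetical choice) when the range
    is zero or any input is not finite, instead of dividing by zero.
    """
    if not all(math.isfinite(v) for v in (value, low, high)):
        return fallback
    span = high - low
    if span == 0:
        return fallback
    return (value - low) / span

# Normal candle: the value sits three quarters of the way up the range.
print(safe_minmax(1903.0, 1900.0, 1904.0))  # 0.75
# Flat candle: high == low, so the plain formula would divide by zero.
print(safe_minmax(1900.0, 1900.0, 1900.0))  # 0.5 (the fallback)
```

Whatever guard is used, the same function would need to be applied both when preparing the training CSV files and in the live script, otherwise the network sees different input ranges in training and in trading.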
Neural networks are a really interesting and fun subject. I've taken a break from explaining them, though, because I decided to finalise the replay/simulator first. As soon as I've finished posting the articles on the simulator, we'll be back with new articles on neural networks. The aim is always to show how they work under the hood. Most people think they are magic code, which is not true, but they're still an interesting and entertaining subject. I'm even thinking of modelling something so that everyone can see how a neural network learns without supervision or prior data, which is very interesting and can help in understanding certain things. One detail: all my code will be written in MQL5. And since you said you're not a programmer, how about learning MQL5 and starting to implement your own solutions? I'm writing a series of articles aimed at people like you. The latest one can be seen here: https://www.mql5.com/pt/articles/15833. In this series I explain things from the very basics, so if you know absolutely nothing about programming, go back to the first article in the series. The links to the previous articles are always at the beginning of each article.

Check out the new article: Neural Network in Practice: The First Neuron.
In this article, we'll start building something simple and humble: a neuron. We will program it with a very small amount of MQL5 code. The neuron worked great in my tests. Let's go back a bit in this series of articles about neural networks to understand what I'm talking about.
If you carefully review those previous articles, you will notice that we had to employ certain mathematical maneuvers to determine the best possible values for both the constant a, which represents the slope, and the constant b, which denotes the intercept. These adjustments enabled us to find the most appropriate linear equation. We explored two methods to achieve this: one through derivative calculations and the other through matrix computations.
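For readers who have not seen the earlier articles, here is a rough Python sketch of what the two approaches can look like, assuming the fit being described is an ordinary least-squares line y = a*x + b. The function names and the sample data are hypothetical, and the articles themselves use MQL5, so this is only an illustration of the idea, not the author's code.

```python
def fit_line_by_derivatives(xs, ys):
    """Slope and intercept obtained by setting the derivatives of the
    summed squared error to zero (the usual closed-form result)."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    sxy = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    sxx = sum((x - mean_x) ** 2 for x in xs)
    a = sxy / sxx
    b = mean_y - a * mean_x
    return a, b

def fit_line_by_matrix(xs, ys):
    """Same fit via the 2x2 normal equations
        [sum(x*x)  sum(x)] [a]   [sum(x*y)]
        [sum(x)    n     ] [b] = [sum(y)  ]
    solved here with Cramer's rule."""
    n = len(xs)
    sx, sy = sum(xs), sum(ys)
    sxx = sum(x * x for x in xs)
    sxy = sum(x * y for x, y in zip(xs, ys))
    det = sxx * n - sx * sx
    a = (sxy * n - sx * sy) / det
    b = (sxx * sy - sx * sxy) / det
    return a, b

# Hypothetical points lying exactly on y = 2x + 1.
xs = [0.0, 1.0, 2.0, 3.0]
ys = [1.0, 3.0, 5.0, 7.0]
print(fit_line_by_derivatives(xs, ys))  # (2.0, 1.0)
print(fit_line_by_matrix(xs, ys))       # (2.0, 1.0)
```

Both routes give the same slope and intercept; they only differ in how the arithmetic is organised.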
However, from this point forward, such calculations will no longer be useful. This is because we need to devise an alternative method for determining the constants in our linear equation. In the previous article, I demonstrated how to find the constant representing the slope. I hope you enjoyed experimenting with that code because now we are about to tackle something slightly more complex. However, despite being only a bit more challenging, this next step will unlock numerous possibilities. In fact, this might be the most interesting article in our neural network series, as everything that follows will become significantly simpler and more practical.
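The excerpt does not say which alternative method the article uses. One common option, shown here purely as an assumption, is to let a single linear neuron adjust its slope and intercept step by step with gradient descent on the squared error. The function name `train_neuron`, the learning rate, the epoch count and the sample data are all hypothetical, and the sketch is in Python rather than the MQL5 the article uses.

```python
def train_neuron(xs, ys, learning_rate=0.05, epochs=2000):
    """Fit y = a*x + b with plain gradient descent on the mean squared error.

    `a` plays the role of the neuron's weight (slope) and `b` its bias
    (intercept). Both start at zero and are nudged against the gradient.
    """
    a, b = 0.0, 0.0
    n = len(xs)
    for _ in range(epochs):
        grad_a = 0.0
        grad_b = 0.0
        for x, y in zip(xs, ys):
            error = (a * x + b) - y          # prediction minus target
            grad_a += 2.0 * error * x / n    # d(MSE)/da
            grad_b += 2.0 * error / n        # d(MSE)/db
        a -= learning_rate * grad_a
        b -= learning_rate * grad_b
    return a, b

# Hypothetical points lying on y = 2x + 1.
a, b = train_neuron([0.0, 1.0, 2.0, 3.0], [1.0, 3.0, 5.0, 7.0])
print(round(a, 3), round(b, 3))  # close to 2.0 and 1.0
```

The appeal of this kind of iterative update is that nothing in the loop depends on there being only one input, so the same idea extends to neurons with several weights.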
Author: Daniel Jose