Sí, correcto, acabo de ver un video en YouTube, entrenar a una IA para jugar Flappy Bird y otros juegos con el método N.E.A.T, tengo una idea para entrenar a una IA para el comercio con N.E.A.T, acabo de aprender los fundamentos de la red neuronal, y creó un modelo con la ayuda de chat GPT, porque yo no sé de código. me tomó dos semanas para normalizar los datos y dos días para crear el modelo, pero tomó sólo una hora para entrenar, 1300 generación, 20 genomas por generación, mi portátil de segunda mano de 5 años estaba ardiendo, y cuando conecté el modelo con MT5, el modelo era tan agresivo y era tan preciso en la predicción de las próximas velas, pero no rentable. porque los datos no fueron normalizados correctamente, y todavía no entiendo el código de los modelos. pero fue divertido aprender y ver el modelo de predicción, y es por eso que vine aquí para aprender más acerca de la IA, y este es el código de NEAT AI
import os import csv import math import random import json from datetime import datetime # ----------------------------------------- # UTILS: Activación y pase simple hacia adelante # ----------------------------------------- def tanh(x): return math.tanh(x) def forward_pass(genome, inputs): """ genome = { 'hidden_weights': 2D list, 'hidden_bias': 1D list, 'output_weights': list, 'output_bias': float, 'fitness': float } inputs: list of 5 floats Returns: single float in (-1, 1) """ hidden_activations = [] for h in range(len(genome['hidden_bias'])): z = sum(inputs[i] * genome['hidden_weights'][h][i] for i in range(len(inputs))) z += genome['hidden_bias'][h] hidden_activations.append(tanh(z)) z_out = sum(hidden_activations[h] * genome['output_weights'][h] for h in range(len(hidden_activations))) z_out += genome['output_bias'] return tanh(z_out) def interpret_output(output): """ Convert the Tanh output to discrete values: if output >= 0.5 => 1 (Buy) if output <= -0.5 => -1 (Sell) otherwise => 0 (Hold) """ if output >= 0.5: return 1 elif output <= -0.5: return -1 else: return 0 # ------------------------------------------------------ # ALGORITMO GENÉTICO: Inicialización, selección, mutación # ------------------------------------------------------ def create_random_genome(input_size=5, hidden_size=8): return { 'hidden_weights': [[random.uniform(-1, 1) for _ in range(input_size)] for _ in range(hidden_size)], 'hidden_bias': [random.uniform(-1, 1) for _ in range(hidden_size)], 'output_weights': [random.uniform(-1, 1) for _ in range(hidden_size)], 'output_bias': random.uniform(-1, 1), 'fitness': 0.0 } def mutate(genome, mutation_rate=0.1, mutation_strength=0.5): for h in range(len(genome['hidden_weights'])): for i in range(len(genome['hidden_weights'][h])): if random.random() < mutation_rate: genome['hidden_weights'][h][i] += random.uniform(-mutation_strength, mutation_strength) for h in range(len(genome['hidden_bias'])): if random.random() < mutation_rate: genome['hidden_bias'][h] += random.uniform(-mutation_strength, mutation_strength) for h in range(len(genome['output_weights'])): if random.random() < mutation_rate: genome['output_weights'][h] += random.uniform(-mutation_strength, mutation_strength) if random.random() < mutation_rate: genome['output_bias'] += random.uniform(-mutation_strength, mutation_strength) def crossover(genome1, genome2): child = create_random_genome() # random init, will be overwritten for h in range(len(genome1['hidden_weights'])): for i in range(len(genome1['hidden_weights'][h])): child['hidden_weights'][h][i] = (genome1['hidden_weights'][h][i] if random.random() < 0.5 else genome2['hidden_weights'][h][i]) for h in range(len(genome1['hidden_bias'])): child['hidden_bias'][h] = (genome1['hidden_bias'][h] if random.random() < 0.5 else genome2['hidden_bias'][h]) for h in range(len(genome1['output_weights'])): child['output_weights'][h] = (genome1['output_weights'][h] if random.random() < 0.5 else genome2['output_weights'][h]) child['output_bias'] = (genome1['output_bias'] if random.random() < 0.5 else genome2['output_bias']) child['fitness'] = 0.0 return child def evolve_population(population, keep_best=5): population.sort(key=lambda g: g['fitness'], reverse=True) new_population = population[:keep_best] # Conservar los mejores genomas while len(new_population) < len(population): parent1 = random.choice(new_population) parent2 = random.choice(new_population) child = crossover(parent1, parent2) mutate(child) new_population.append(child) return new_population # ------------------------------------------- # SAVE & LOAD best model to avoid re-training # ------------------------------------------- def save_best_model(genome, filename="best_model.json"): data = { 'hidden_weights': genome['hidden_weights'], 'hidden_bias': genome['hidden_bias'], 'output_weights': genome['output_weights'], 'output_bias': genome['output_bias'], 'fitness': genome['fitness'] } with open(filename, 'w') as f: json.dump(data, f, indent=2) def load_best_model(filename="best_model.json"): if not os.path.exists(filename): return None with open(filename, 'r') as f: data = json.load(f) return data # ---------------- # MAIN TRAINING LOGIC # ---------------- def train_neural_network_data(folder_path): # Hiperparámetros POP_SIZE = 20 KEEP_BEST = 5 input_size = 5 hidden_size = 8 # Crear población inicial o cargar el mejor modelo si se dispone de él. population = [create_random_genome(input_size, hidden_size) for _ in range(POP_SIZE)] best_saved = load_best_model() if best_saved is not None: population[0]['hidden_weights'] = best_saved['hidden_weights'] population[0]['hidden_bias'] = best_saved['hidden_bias'] population[0]['output_weights'] = best_saved['output_weights'] population[0]['output_bias'] = best_saved['output_bias'] population[0]['fitness'] = 0.0 # Helper to parse ISO8601 date/time strings with timezone info def get_date_from_time_str(t_str): dt = datetime.strptime(t_str, "%Y-%m-%dT%H:%M:%S%z") return dt.date() csv_files = sorted([f for f in os.listdir(folder_path) if f.endswith('.csv')]) # Para cada genoma, almacena la última señal y el precio correspondiente (si lo hay) genome_states = [{'last_signal': None, 'last_price': None} for _ in range(POP_SIZE)] current_date = None generation_count = 0 # Procesar cada fichero CSV de uno en uno (streaming fila a fila) for csv_file in csv_files: file_path = os.path.join(folder_path, csv_file) with open(file_path, 'r', newline='') as f: reader = csv.DictReader(f) for row in reader: row_time_str = row['Time'] row_date = get_date_from_time_str(row_time_str) # Comprueba si ha empezado un nuevo día (fin de generación) if current_date is None: current_date = row_date elif row_date != current_date: # Generación finalizada; imprimir estadísticas para monitorización generation_count += 1 fitnesses = [g['fitness'] for g in population] avg_fitness = sum(fitnesses) / len(fitnesses) best_fitness = max(fitnesses) print(f"Generation {generation_count} | Date: {current_date} | Avg Fitness: {avg_fitness:.2 f} | Best Fitness: {best_fitness:.2 f}") # Evoluciona la población para la nueva generación population = evolve_population(population, keep_best=KEEP_BEST) # Restablecer los estados del genoma para la nueva generación for state in genome_states: state['last_signal'] = None state['last_price'] = None current_date = row_date # Preparar entradas sobre la marcha inputs = [ float(row['NTP']), float(row['NCP']), float(row['NT']), float(row['NIP']), float(row['N14IP']) ] bid_price = float(row['BidOpen']) ask_price = float(row['AskOpen']) # Procesa la decisión de cada genoma para esta fila for i, genome in enumerate(population): raw_output = forward_pass(genome, inputs) signal_val = interpret_output(raw_output) # -1, 0 o 1 prev_signal = genome_states[i]['last_signal'] prev_price = genome_states[i]['last_price'] # Omitir procesamiento si la señal no ha cambiado if signal_val == prev_signal: continue genome_states[i]['last_signal'] = signal_val # Aplicar la lógica de fitness en las transiciones de señal if signal_val == 1: # Señal de compra genome_states[i]['last_price'] = ask_price if prev_signal == -1 and prev_price is not None: if ask_price < prev_price: genome['fitness'] += 1 else: genome['fitness'] -= 1 elif signal_val == -1: # Señal de venta genome_states[i]['last_price'] = bid_price if prev_signal == 1 and prev_price is not None: if bid_price > prev_price: genome['fitness'] += 1 else: genome['fitness'] -= 1 else: # Señal de retención genome_states[i]['last_price'] = None # Después de procesar todos los archivos CSV, finalizar la formación population.sort(key=lambda g: g['fitness'], reverse=True) best_genome = population[0] print("Final Best genome fitness after all files:", best_genome['fitness']) save_best_model(best_genome, filename="best_model.json") if __name__ == "__main__": folder_path = r"" train_neural_network_data(folder_path)y para MT5
import MetaTrader5 as mt5 import pandas as pd import numpy as np import json import time def tournament(values): """Tournament method to find maximum and minimum in a list.""" max_val = values[0] min_val = values[0] for v in values[1:]: if v > max_val: max_val = v if v < min_val: min_val = v return max_val, min_val # ------------------------------- # Cargar el mejor modelo desde JSON with open("best_model.json", "r") as f: best_model = json.load(f) def forward_pass(inputs, model): """ Perform a forward pass through a feedforward network with one hidden layer. Uses tanh for both hidden and output layers. The output is discretized: >0.5 --> 1, < -0.5 --> -1, else 0. """ x = np.array(inputs) # forma (5,) hidden_weights = np.array(model["hidden_weights"]) # forma (8, 5) hidden_bias = np.array(model["hidden_bias"]) # forma (8,) output_weights = np.array(model["output_weights"]) # forma (8,) output_bias = model["output_bias"] # escalar hidden_input = np.dot(hidden_weights, x) + hidden_bias hidden_output = np.tanh(hidden_input) output_val = np.dot(output_weights, hidden_output) + output_bias output_activation = np.tanh(output_val) if output_activation > 0.5: return 1 elif output_activation < -0.5: return -1 else: return 0 # ------------------------------- # Funciones de gestión de pedidos mediante posiciones def get_open_position(symbol): """ Returns information about an open position for the symbol as a dictionary: {"ticket": ticket, "type": "buy"/"sell", "volume": volume}. If no position is open, returns None. """ positions = mt5.positions_get(symbol=symbol) if positions is None or len(positions) == 0: return None pos = positions[0] # Asumir sólo una posición abierta por símbolo if pos.type == mt5.POSITION_TYPE_BUY: return {"ticket": pos.ticket, "type": "buy", "volume": pos.volume, "symbol": pos.symbol} elif pos.type == mt5.POSITION_TYPE_SELL: return {"ticket": pos.ticket, "type": "sell", "volume": pos.volume, "symbol": pos.symbol} return None def close_position(position_info): """ Closes the given position using its ticket. """ symbol = position_info["symbol"] tick = mt5.symbol_info_tick(symbol) # Para el cierre: si BUY está abierto, vendemos a bid; si SELL está abierto, compramos a ask. if position_info["type"] == "buy": price = tick.bid order_type = mt5.ORDER_TYPE_SELL else: price = tick.ask order_type = mt5.ORDER_TYPE_BUY request = { "action": mt5.TRADE_ACTION_DEAL, "symbol": symbol, "volume": position_info["volume"], "type": order_type, "position": position_info["ticket"], "price": price, "deviation": 10, "magic": 123456, "comment": "Close position", "type_time": mt5.ORDER_TIME_GTC, "type_filling": mt5.ORDER_FILLING_IOC, } result = mt5.order_send(request) print("Close position result:", result) def place_order(symbol, order_type, volume=0.01): """ Place an order for the symbol. - order_type "buy": place BUY order. - order_type "sell": place SELL order. Before placing, if an open position exists with the opposite signal, it is closed using its order ticket. """ # Comprueba la posición abierta actual: current_position = get_open_position(symbol) if current_position: # Si la posición existente es opuesta a la nueva señal, ciérrela. if (order_type == "buy" and current_position["type"] == "sell") or \ (order_type == "sell" and current_position["type"] == "buy"): print(f"Opposite position ({current_position['type']}) detected. Closing it first.") close_position(current_position) # Si es del mismo tipo, no hacer nada. elif (order_type == current_position["type"]): print(f"{order_type.upper()} order already open. No new order will be placed.") return tick = mt5.symbol_info_tick(symbol) if order_type == "buy": price = tick.ask order_type_mt5 = mt5.ORDER_TYPE_BUY elif order_type == "sell": price = tick.bid order_type_mt5 = mt5.ORDER_TYPE_SELL else: print("Invalid order type:", order_type) return request = { "action": mt5.TRADE_ACTION_DEAL, "symbol": symbol, "volume": volume, "type": order_type_mt5, "price": price, "deviation": 10, "magic": 123456, "comment": "NEAT AI trade", "type_time": mt5.ORDER_TIME_GTC, "type_filling": mt5.ORDER_FILLING_IOC, } result = mt5.order_send(request) print(f"Placed {order_type.upper()} order result:", result) # ------------------------------- # Inicializar la conexión con MetaTrader 5 if not mt5.initialize(): print("initialize() failed, error code =", mt5.last_error()) quit() symbol = "XAUUSDm" timeframe = mt5.TIMEFRAME_M1 # 1 minuto if not mt5.symbol_select(symbol, True): print("Failed to select symbol:", symbol) mt5.shutdown() quit() # Bucle de supervisión continua try: while True: print("\n" + "="*80) print("Retrieving candle data...") num_candles = 15 rates = mt5.copy_rates_from_pos(symbol, timeframe, 1, num_candles) if rates is None: print("Failed to get rates for", symbol) else: df = pd.DataFrame(rates) df['tick_count'] = df['tick_volume'] df['9ma'] = df['close'].rolling(window=9).mean() df['14ma'] = df['close'].rolling(window=14).mean() df['TP'] = (df['high'] + df['low'] + df['close']) / 3 df['NTP'] = (df['TP'] - df['low']) / (df['high'] - df['low']) df['NIP9'] = (df['9ma'] - df['low']) / (df['high'] - df['low']) df['NIP14'] = (df['14ma'] - df['low']) / (df['high'] - df['low']) close_prices = list(df['close'].tail(14)) HC, LC = tournament(close_prices) tick_counts = list(df['tick_count'].tail(14)) HT, LT = tournament(tick_counts) df['NCP'] = (df['close'] - LC) / (HC - LC) df['NT'] = (df['tick_count'] - LT) / (HT - LT) df = df.round(3) # Imprime los datos normalizados de la última vela display_cols = ['high', 'low', 'close', 'tick_count', '9ma', '14ma', 'TP', 'NTP', 'NCP', 'NT', 'NIP9', 'NIP14'] print("\nNormalized Candle Data (Last Candle):") print(df[display_cols].tail(1).to_string(index=False)) # Extraer entradas de la última vela last_row = df.iloc[-1] input_vector = [ last_row['NTP'], last_row['NCP'], last_row['NT'], last_row['NIP9'], last_row['NIP14'] ] print("\nExtracted Inputs:") print(f"NTP = {last_row['NTP']}, NCP = {last_row['NCP']}, NT = {last_row['NT']}, NIP9 = {last_row['NIP9']}, NIP14 = {last_row['NIP14']}") print("Price Data - High:", last_row['high'], "Low:", last_row['low'], "Close:", last_row['close']) # Compute network signal signal = forward_pass(input_vector, best_model) print("\nNetwork Signal (discretized):", signal) # Ejecutar la operación basada en la señal utilizando nuestra lógica de gestión de órdenes. if signal == 1: print("Received BUY signal.") place_order(symbol, "buy", volume=0.01) elif signal == -1: print("Received SELL signal.") place_order(symbol, "sell", volume=0.01) else: print("Signal is neutral. No action taken.") # Wait until next minute boundary tick = mt5.symbol_info_tick(symbol) if tick is None: print("Failed to get tick info for", symbol) break server_time = tick.time next_minute = ((server_time // 60) + 1) * 60 sleep_seconds = next_minute - server_time print(f"Sleeping for {sleep_seconds} seconds until next candle...") time.sleep(sleep_seconds) except KeyboardInterrupt: print("Stopping continuous monitoring.") finally: mt5.shutdown()
Las redes neuronales son un tema realmente interesante y divertido. Sin embargo, me he tomado un descanso de explicarlo, ya que he decidido finalizar primero la repetición / simulador. Sin embargo, en cuanto termine de publicar los artículos sobre el simulador, volveremos con nuevos artículos sobre redes neuronales. El objetivo es siempre mostrar cómo funcionan. La mayoría de la gente piensa que son códigos mágicos, lo cual no es cierto. Pero siguen siendo un tema interesante y entretenido. Incluso estoy pensando en modelar algo para que todo el mundo pueda ver cómo una red neuronal aprende sin supervisión y sin datos previos. Lo cual es muy interesante, y puede ayudar a entender ciertas cosas. Detalle: Todo mi código estará escrito en MQL5. Y ya que has dicho que no eres programador. ¿Qué tal si aprendes MQL5 y empiezas a implementar tus propias soluciones? Estoy escribiendo una serie de artículos dirigidos a gente como tú. El último lo puedes ver aquí: https: //www.mql5.com/pt/articles/15833. En esta serie explico las cosas desde lo más básico. Así que si no sabes absolutamente nada de programación, vuelve al primer artículo de la serie. Los enlaces a los artículos anteriores estarán siempre al principio del artículo.
- www.mql5.com
- Aplicaciones de trading gratuitas
- 8 000+ señales para copiar
- Noticias económicas para analizar los mercados financieros
Usted acepta la política del sitio web y las condiciones de uso
Artículo publicado Red neuronal en la práctica: La primera neurona:
En este artículo, comenzaremos a crear algo que muchos se sorprenden al ver funcionando: una simple y modesta neurona que lograremos programar con muy poco código en MQL5. La neurona funcionó perfectamente en las pruebas que realicé. Bueno, retrocedamos un poco en esta misma serie sobre redes neuronales, para que puedas entender de qué estoy hablando.
Sin embargo, para lo que necesitamos hacer a partir de este momento, tales cálculos no nos serán útiles, ya que necesitamos modelar otra forma de encontrar las constantes de la ecuación de línea recta. En el artículo anterior, mostré cómo podríamos encontrar la constante que representa el coeficiente angular. Espero que te hayas divertido y hayas experimentado bastante con ese código, porque ahora haremos algo un poco más complicado. Sin embargo, aunque es solo un poco más complicado, de hecho abrirá las puertas a muchas otras cosas. Literalmente, este quizás sea el artículo más interesante que verás en esta serie sobre redes neuronales, ya que después de él, todo será mucho más sencillo y práctico.
Autor: Daniel Jose