Discusión sobre el artículo "Red neuronal en la práctica: La primera neurona"

 

Artículo publicado Red neuronal en la práctica: La primera neurona:

En este artículo, comenzaremos a crear algo que muchos se sorprenden al ver funcionando: una simple y modesta neurona que lograremos programar con muy poco código en MQL5. La neurona funcionó perfectamente en las pruebas que realicé. Bueno, retrocedamos un poco en esta misma serie sobre redes neuronales, para que puedas entender de qué estoy hablando.

Si estudias con calma esos artículos anteriores, notarás que fue necesario realizar ciertos malabarismos matemáticos para definir el mejor valor posible tanto para la constante < a >, que es el coeficiente angular, como para la constante < b >, que es el punto de intersección. Estas maniobras permitían encontrar la ecuación de línea recta más adecuada, mostrándose dos formas de hacerlo: una mediante cálculos de derivadas y otra mediante cálculos matriciales.

Sin embargo, para lo que necesitamos hacer a partir de este momento, tales cálculos no nos serán útiles, ya que necesitamos modelar otra forma de encontrar las constantes de la ecuación de línea recta. En el artículo anterior, mostré cómo podríamos encontrar la constante que representa el coeficiente angular. Espero que te hayas divertido y hayas experimentado bastante con ese código, porque ahora haremos algo un poco más complicado. Sin embargo, aunque es solo un poco más complicado, de hecho abrirá las puertas a muchas otras cosas. Literalmente, este quizás sea el artículo más interesante que verás en esta serie sobre redes neuronales, ya que después de él, todo será mucho más sencillo y práctico.

Autor: Daniel Jose

 

Sí, correcto, acabo de ver un video en YouTube, entrenar a una IA para jugar Flappy Bird y otros juegos con el método N.E.A.T, tengo una idea para entrenar a una IA para el comercio con N.E.A.T, acabo de aprender los fundamentos de la red neuronal, y creó un modelo con la ayuda de chat GPT, porque yo no sé de código. me tomó dos semanas para normalizar los datos y dos días para crear el modelo, pero tomó sólo una hora para entrenar, 1300 generación, 20 genomas por generación, mi portátil de segunda mano de 5 años estaba ardiendo, y cuando conecté el modelo con MT5, el modelo era tan agresivo y era tan preciso en la predicción de las próximas velas, pero no rentable. porque los datos no fueron normalizados correctamente, y todavía no entiendo el código de los modelos. pero fue divertido aprender y ver el modelo de predicción, y es por eso que vine aquí para aprender más acerca de la IA, y este es el código de NEAT AI


import os
import csv
import math
import random
import json
from datetime import datetime

# -----------------------------------------
# UTILS: Activación y pase simple hacia adelante
# -----------------------------------------
def tanh(x):
    return math.tanh(x)

def forward_pass(genome, inputs):
    """
    genome = {
       'hidden_weights': 2D list,
       'hidden_bias': 1D list,
       'output_weights': list,
       'output_bias': float,
       'fitness': float
    }
    inputs: list of 5 floats
    Returns: single float in (-1, 1)
    """
    hidden_activations = []
    for h in range(len(genome['hidden_bias'])):
        z = sum(inputs[i] * genome['hidden_weights'][h][i] for i in range(len(inputs)))
        z += genome['hidden_bias'][h]
        hidden_activations.append(tanh(z))
    
    z_out = sum(hidden_activations[h] * genome['output_weights'][h] for h in range(len(hidden_activations)))
    z_out += genome['output_bias']
    return tanh(z_out)

def interpret_output(output):
    """
    Convert the Tanh output to discrete values:
      if output >= 0.5  =>  1  (Buy)
      if output <= -0.5 => -1  (Sell)
      otherwise         =>  0  (Hold)
    """
    if output >= 0.5:
        return 1
    elif output <= -0.5:
        return -1
    else:
        return 0

# ------------------------------------------------------
# ALGORITMO GENÉTICO: Inicialización, selección, mutación
# ------------------------------------------------------
def create_random_genome(input_size=5, hidden_size=8):
    return {
        'hidden_weights': [[random.uniform(-1, 1) for _ in range(input_size)]
                           for _ in range(hidden_size)],
        'hidden_bias': [random.uniform(-1, 1) for _ in range(hidden_size)],
        'output_weights': [random.uniform(-1, 1) for _ in range(hidden_size)],
        'output_bias': random.uniform(-1, 1),
        'fitness': 0.0
    }

def mutate(genome, mutation_rate=0.1, mutation_strength=0.5):
    for h in range(len(genome['hidden_weights'])):
        for i in range(len(genome['hidden_weights'][h])):
            if random.random() < mutation_rate:
                genome['hidden_weights'][h][i] += random.uniform(-mutation_strength, mutation_strength)
    for h in range(len(genome['hidden_bias'])):
        if random.random() < mutation_rate:
            genome['hidden_bias'][h] += random.uniform(-mutation_strength, mutation_strength)
    for h in range(len(genome['output_weights'])):
        if random.random() < mutation_rate:
            genome['output_weights'][h] += random.uniform(-mutation_strength, mutation_strength)
    if random.random() < mutation_rate:
        genome['output_bias'] += random.uniform(-mutation_strength, mutation_strength)

def crossover(genome1, genome2):
    child = create_random_genome()  # random init, will be overwritten
    for h in range(len(genome1['hidden_weights'])):
        for i in range(len(genome1['hidden_weights'][h])):
            child['hidden_weights'][h][i] = (genome1['hidden_weights'][h][i]
                                             if random.random() < 0.5
                                             else genome2['hidden_weights'][h][i])
    for h in range(len(genome1['hidden_bias'])):
        child['hidden_bias'][h] = (genome1['hidden_bias'][h]
                                   if random.random() < 0.5
                                   else genome2['hidden_bias'][h])
    for h in range(len(genome1['output_weights'])):
        child['output_weights'][h] = (genome1['output_weights'][h]
                                      if random.random() < 0.5
                                      else genome2['output_weights'][h])
    child['output_bias'] = (genome1['output_bias']
                            if random.random() < 0.5
                            else genome2['output_bias'])
    child['fitness'] = 0.0
    return child

def evolve_population(population, keep_best=5):
    population.sort(key=lambda g: g['fitness'], reverse=True)
    new_population = population[:keep_best]  # Conservar los mejores genomas
    while len(new_population) < len(population):
        parent1 = random.choice(new_population)
        parent2 = random.choice(new_population)
        child = crossover(parent1, parent2)
        mutate(child)
        new_population.append(child)
    return new_population

# -------------------------------------------
# SAVE & LOAD best model to avoid re-training
# -------------------------------------------
def save_best_model(genome, filename="best_model.json"):
    data = {
        'hidden_weights': genome['hidden_weights'],
        'hidden_bias': genome['hidden_bias'],
        'output_weights': genome['output_weights'],
        'output_bias': genome['output_bias'],
        'fitness': genome['fitness']
    }
    with open(filename, 'w') as f:
        json.dump(data, f, indent=2)

def load_best_model(filename="best_model.json"):
    if not os.path.exists(filename):
        return None
    with open(filename, 'r') as f:
        data = json.load(f)
    return data

# ----------------
# MAIN TRAINING LOGIC
# ----------------
def train_neural_network_data(folder_path):
    # Hiperparámetros
    POP_SIZE = 20
    KEEP_BEST = 5
    input_size = 5
    hidden_size = 8

    # Crear población inicial o cargar el mejor modelo si se dispone de él.
    population = [create_random_genome(input_size, hidden_size) for _ in range(POP_SIZE)]
    best_saved = load_best_model()
    if best_saved is not None:
        population[0]['hidden_weights'] = best_saved['hidden_weights']
        population[0]['hidden_bias']    = best_saved['hidden_bias']
        population[0]['output_weights'] = best_saved['output_weights']
        population[0]['output_bias']    = best_saved['output_bias']
        population[0]['fitness']        = 0.0

    # Helper to parse ISO8601 date/time strings with timezone info
    def get_date_from_time_str(t_str):
        dt = datetime.strptime(t_str, "%Y-%m-%dT%H:%M:%S%z")
        return dt.date()

    csv_files = sorted([f for f in os.listdir(folder_path) if f.endswith('.csv')])

    # Para cada genoma, almacena la última señal y el precio correspondiente (si lo hay)
    genome_states = [{'last_signal': None, 'last_price': None} for _ in range(POP_SIZE)]
    current_date = None
    generation_count = 0

    # Procesar cada fichero CSV de uno en uno (streaming fila a fila)
    for csv_file in csv_files:
        file_path = os.path.join(folder_path, csv_file)
        with open(file_path, 'r', newline='') as f:
            reader = csv.DictReader(f)
            for row in reader:
                row_time_str = row['Time']
                row_date = get_date_from_time_str(row_time_str)

                # Comprueba si ha empezado un nuevo día (fin de generación)
                if current_date is None:
                    current_date = row_date
                elif row_date != current_date:
                    # Generación finalizada; imprimir estadísticas para monitorización
                    generation_count += 1
                    fitnesses = [g['fitness'] for g in population]
                    avg_fitness = sum(fitnesses) / len(fitnesses)
                    best_fitness = max(fitnesses)
                    print(f"Generation {generation_count} | Date: {current_date} | Avg Fitness: {avg_fitness:.2 f} | Best Fitness: {best_fitness:.2 f}")

                    # Evoluciona la población para la nueva generación
                    population = evolve_population(population, keep_best=KEEP_BEST)

                    # Restablecer los estados del genoma para la nueva generación
                    for state in genome_states:
                        state['last_signal'] = None
                        state['last_price'] = None

                    current_date = row_date

                # Preparar entradas sobre la marcha
                inputs = [
                    float(row['NTP']),
                    float(row['NCP']),
                    float(row['NT']),
                    float(row['NIP']),
                    float(row['N14IP'])
                ]
                bid_price = float(row['BidOpen'])
                ask_price = float(row['AskOpen'])

                # Procesa la decisión de cada genoma para esta fila
                for i, genome in enumerate(population):
                    raw_output = forward_pass(genome, inputs)
                    signal_val = interpret_output(raw_output)  # -1, 0 o 1
                    prev_signal = genome_states[i]['last_signal']
                    prev_price = genome_states[i]['last_price']

                    # Omitir procesamiento si la señal no ha cambiado
                    if signal_val == prev_signal:
                        continue

                    genome_states[i]['last_signal'] = signal_val

                    # Aplicar la lógica de fitness en las transiciones de señal
                    if signal_val == 1:  # Señal de compra
                        genome_states[i]['last_price'] = ask_price
                        if prev_signal == -1 and prev_price is not None:
                            if ask_price < prev_price:
                                genome['fitness'] += 1
                            else:
                                genome['fitness'] -= 1
                    elif signal_val == -1:  # Señal de venta
                        genome_states[i]['last_price'] = bid_price
                        if prev_signal == 1 and prev_price is not None:
                            if bid_price > prev_price:
                                genome['fitness'] += 1
                            else:
                                genome['fitness'] -= 1
                    else:  # Señal de retención
                        genome_states[i]['last_price'] = None

    # Después de procesar todos los archivos CSV, finalizar la formación
    population.sort(key=lambda g: g['fitness'], reverse=True)
    best_genome = population[0]
    print("Final Best genome fitness after all files:", best_genome['fitness'])
    save_best_model(best_genome, filename="best_model.json")

if __name__ == "__main__":
    folder_path = r""
    train_neural_network_data(folder_path)
y para MT5
import MetaTrader5 as mt5
import pandas as pd
import numpy as np
import json
import time

def tournament(values):
    """Tournament method to find maximum and minimum in a list."""
    max_val = values[0]
    min_val = values[0]
    for v in values[1:]:
        if v > max_val:
            max_val = v
        if v < min_val:
            min_val = v
    return max_val, min_val

# -------------------------------
# Cargar el mejor modelo desde JSON
with open("best_model.json", "r") as f:
    best_model = json.load(f)

def forward_pass(inputs, model):
    """
    Perform a forward pass through a feedforward network with one hidden layer.
    Uses tanh for both hidden and output layers.
    The output is discretized:
       >0.5 --> 1, < -0.5 --> -1, else 0.
    """
    x = np.array(inputs)  # forma (5,)
    hidden_weights = np.array(model["hidden_weights"])  # forma (8, 5)
    hidden_bias = np.array(model["hidden_bias"])          # forma (8,)
    output_weights = np.array(model["output_weights"])    # forma (8,)
    output_bias = model["output_bias"]                    # escalar

    hidden_input = np.dot(hidden_weights, x) + hidden_bias
    hidden_output = np.tanh(hidden_input)
    output_val = np.dot(output_weights, hidden_output) + output_bias
    output_activation = np.tanh(output_val)
    
    if output_activation > 0.5:
        return 1
    elif output_activation < -0.5:
        return -1
    else:
        return 0

# -------------------------------
# Funciones de gestión de pedidos mediante posiciones
def get_open_position(symbol):
    """
    Returns information about an open position for the symbol as a dictionary:
    {"ticket": ticket, "type": "buy"/"sell", "volume": volume}.
    If no position is open, returns None.
    """
    positions = mt5.positions_get(symbol=symbol)
    if positions is None or len(positions) == 0:
        return None
    pos = positions[0]  # Asumir sólo una posición abierta por símbolo
    if pos.type == mt5.POSITION_TYPE_BUY:
        return {"ticket": pos.ticket, "type": "buy", "volume": pos.volume, "symbol": pos.symbol}
    elif pos.type == mt5.POSITION_TYPE_SELL:
        return {"ticket": pos.ticket, "type": "sell", "volume": pos.volume, "symbol": pos.symbol}
    return None

def close_position(position_info):
    """
    Closes the given position using its ticket.
    """
    symbol = position_info["symbol"]
    tick = mt5.symbol_info_tick(symbol)
    # Para el cierre: si BUY está abierto, vendemos a bid; si SELL está abierto, compramos a ask.
    if position_info["type"] == "buy":
        price = tick.bid
        order_type = mt5.ORDER_TYPE_SELL
    else:
        price = tick.ask
        order_type = mt5.ORDER_TYPE_BUY

    request = {
        "action": mt5.TRADE_ACTION_DEAL,
        "symbol": symbol,
        "volume": position_info["volume"],
        "type": order_type,
        "position": position_info["ticket"],
        "price": price,
        "deviation": 10,
        "magic": 123456,
        "comment": "Close position",
        "type_time": mt5.ORDER_TIME_GTC,
        "type_filling": mt5.ORDER_FILLING_IOC,
    }
    result = mt5.order_send(request)
    print("Close position result:", result)

def place_order(symbol, order_type, volume=0.01):
    """
    Place an order for the symbol.
    - order_type "buy": place BUY order.
    - order_type "sell": place SELL order.
    Before placing, if an open position exists with the opposite signal,
    it is closed using its order ticket.
    """
    # Comprueba la posición abierta actual:
    current_position = get_open_position(symbol)
    if current_position:
        # Si la posición existente es opuesta a la nueva señal, ciérrela.
        if (order_type == "buy" and current_position["type"] == "sell") or \
           (order_type == "sell" and current_position["type"] == "buy"):
            print(f"Opposite position ({current_position['type']}) detected. Closing it first.")
            close_position(current_position)
        # Si es del mismo tipo, no hacer nada.
        elif (order_type == current_position["type"]):
            print(f"{order_type.upper()} order already open. No new order will be placed.")
            return

    tick = mt5.symbol_info_tick(symbol)
    if order_type == "buy":
        price = tick.ask
        order_type_mt5 = mt5.ORDER_TYPE_BUY
    elif order_type == "sell":
        price = tick.bid
        order_type_mt5 = mt5.ORDER_TYPE_SELL
    else:
        print("Invalid order type:", order_type)
        return

    request = {
        "action": mt5.TRADE_ACTION_DEAL,
        "symbol": symbol,
        "volume": volume,
        "type": order_type_mt5,
        "price": price,
        "deviation": 10,
        "magic": 123456,
        "comment": "NEAT AI trade",
        "type_time": mt5.ORDER_TIME_GTC,
        "type_filling": mt5.ORDER_FILLING_IOC,
    }
    result = mt5.order_send(request)
    print(f"Placed {order_type.upper()} order result:", result)

# -------------------------------
# Inicializar la conexión con MetaTrader 5
if not mt5.initialize():
    print("initialize() failed, error code =", mt5.last_error())
    quit()

symbol = "XAUUSDm"
timeframe = mt5.TIMEFRAME_M1  # 1 minuto

if not mt5.symbol_select(symbol, True):
    print("Failed to select symbol:", symbol)
    mt5.shutdown()
    quit()

# Bucle de supervisión continua
try:
    while True:
        print("\n" + "="*80)
        print("Retrieving candle data...")
        num_candles = 15
        rates = mt5.copy_rates_from_pos(symbol, timeframe, 1, num_candles)
        if rates is None:
            print("Failed to get rates for", symbol)
        else:
            df = pd.DataFrame(rates)
            df['tick_count'] = df['tick_volume']
            df['9ma'] = df['close'].rolling(window=9).mean()
            df['14ma'] = df['close'].rolling(window=14).mean()
            df['TP'] = (df['high'] + df['low'] + df['close']) / 3
            df['NTP']   = (df['TP']    - df['low']) / (df['high'] - df['low'])
            df['NIP9']  = (df['9ma']   - df['low']) / (df['high'] - df['low'])
            df['NIP14'] = (df['14ma']  - df['low']) / (df['high'] - df['low'])
            close_prices = list(df['close'].tail(14))
            HC, LC = tournament(close_prices)
            tick_counts = list(df['tick_count'].tail(14))
            HT, LT = tournament(tick_counts)
            df['NCP'] = (df['close'] - LC) / (HC - LC)
            df['NT']  = (df['tick_count'] - LT) / (HT - LT)
            df = df.round(3)
            
            # Imprime los datos normalizados de la última vela
            display_cols = ['high', 'low', 'close', 'tick_count', '9ma', '14ma', 'TP', 'NTP', 'NCP', 'NT', 'NIP9', 'NIP14']
            print("\nNormalized Candle Data (Last Candle):")
            print(df[display_cols].tail(1).to_string(index=False))
            
            # Extraer entradas de la última vela
            last_row = df.iloc[-1]
            input_vector = [
                last_row['NTP'],
                last_row['NCP'],
                last_row['NT'],
                last_row['NIP9'],
                last_row['NIP14']
            ]
            
            print("\nExtracted Inputs:")
            print(f"NTP = {last_row['NTP']}, NCP = {last_row['NCP']}, NT = {last_row['NT']}, NIP9 = {last_row['NIP9']}, NIP14 = {last_row['NIP14']}")
            print("Price Data - High:", last_row['high'], "Low:", last_row['low'], "Close:", last_row['close'])
            
            # Compute network signal
            signal = forward_pass(input_vector, best_model)
            print("\nNetwork Signal (discretized):", signal)
            
            # Ejecutar la operación basada en la señal utilizando nuestra lógica de gestión de órdenes.
            if signal == 1:
                print("Received BUY signal.")
                place_order(symbol, "buy", volume=0.01)
            elif signal == -1:
                print("Received SELL signal.")
                place_order(symbol, "sell", volume=0.01)
            else:
                print("Signal is neutral. No action taken.")
        
        # Wait until next minute boundary
        tick = mt5.symbol_info_tick(symbol)
        if tick is None:
            print("Failed to get tick info for", symbol)
            break
        server_time = tick.time
        next_minute = ((server_time // 60) + 1) * 60
        sleep_seconds = next_minute - server_time
        print(f"Sleeping for {sleep_seconds} seconds until next candle...")
        time.sleep(sleep_seconds)
except KeyboardInterrupt:
    print("Stopping continuous monitoring.")
finally:
    mt5.shutdown()
 
Muy bien.
 
Multi Dead redes neuronales y creé un modelo con la ayuda del chat GPT, porque no sé programar. Tardé dos semanas en normalizar los datos y dos días en crear el modelo, pero sólo tardé una hora en entrenarlo, 1.300 generaciones, 20 genomas por generación, mi portátil de segunda mano de 5 años estaba ardiendo y, cuando conecté el modelo a MT5, el modelo era muy agresivo y muy preciso en la predicción de las próximas velas, pero no era rentable. Pero fue divertido aprender y ver la predicción del modelo, por eso vine aquí a aprender más sobre IA, y este es el código de NEAT AI


y para MT5

Las redes neuronales son un tema realmente interesante y divertido. Sin embargo, me he tomado un descanso de explicarlo, ya que he decidido finalizar primero la repetición / simulador. Sin embargo, en cuanto termine de publicar los artículos sobre el simulador, volveremos con nuevos artículos sobre redes neuronales. El objetivo es siempre mostrar cómo funcionan. La mayoría de la gente piensa que son códigos mágicos, lo cual no es cierto. Pero siguen siendo un tema interesante y entretenido. Incluso estoy pensando en modelar algo para que todo el mundo pueda ver cómo una red neuronal aprende sin supervisión y sin datos previos. Lo cual es muy interesante, y puede ayudar a entender ciertas cosas. Detalle: Todo mi código estará escrito en MQL5. Y ya que has dicho que no eres programador. ¿Qué tal si aprendes MQL5 y empiezas a implementar tus propias soluciones? Estoy escribiendo una serie de artículos dirigidos a gente como tú. El último lo puedes ver aquí: https: //www.mql5.com/pt/articles/15833. En esta serie explico las cosas desde lo más básico. Así que si no sabes absolutamente nada de programación, vuelve al primer artículo de la serie. Los enlaces a los artículos anteriores estarán siempre al principio del artículo.

Do básico ao intermediário: Indicador (IV)
Do básico ao intermediário: Indicador (IV)
  • www.mql5.com
Neste artigo, vermos como é fácil de criar e implementar uma metodologia operacional, visando colorir candles. Sendo este um conceito, que diversos operadores apreciam imensamente. Porém, é preciso se tomar cuidado ao implementar tal tipo de coisa. Isto para que as barras, ou candles, mantenham a sua aparência original. Visando assim não prejudicar a leitura que muitos operadores fazem candle a candle.
 
Muchas gracias por esta introducción.