Discussão do artigo "Rede neural na prática: O primeiro neurônio"

 

Novo artigo Rede neural na prática: O primeiro neurônio foi publicado:

Neste artigo começamos a de fato criar algo que muitos ficam admirados em ver funcionando. Um simples e singelo neurônio que conseguiremos programar com muito pouco código em MQL5.O neurônio funcionou perfeitamente nos testes que fiz. Bem, vamos voltar um pouco, nesta mesma série sobre redes neurais, para que você possa entender do que estou falando.

Se você estudar com calma aqueles artigos anteriores, irá notar que foi preciso criar um certo malabarismo matemático. Isto para que o melhor valor possível fosse definido tanto para a constante <a> que é o coeficiente angular, como para a constante <b> que é o ponto de intersecção. Tais manobras, permitiam que a equação de reta mais adequada fosse encontrada. Sendo mostradas duas forma de fazer isto. Uma via cálculos de derivadas e outro via cálculos matriciais.

No entanto, para o que precisamos fazer aqui, deste momento em diante. Tais cálculos não nos serão de utilidade. Visto que precisamos modelar uma outra forma de encontrar as constantes, da equação de reta. No artigo anterior, mostrei como poderíamos fazer para encontrar a constante que representa o coeficiente angular. Espero que você tenha se divertido, e brincado bastante com aquele código. Pois agora vamos fazer algo um pouco mais complicado. Porém, apesar de ser só um pouco mais complicado, irá de fato abrir as portas para um monte de outras coisas. Literalmente, este talvez venha a ser o artigo mais interessante que você verá nesta série sobre redes neurais. Já que depois dele, tudo será muito mais simples e prático.

Autor: Daniel Jose

 

sim, correto, acabei de assistir a um vídeo no YouTube, treinando uma IA para jogar flappy bird e outros jogos com o método N.E.A.T., tive a ideia de treinar uma IA para negociação com o N.E.A.T., acabei de aprender o básico sobre redes neurais e criei um modelo com a ajuda do chat GPT, porque não sei programar. Levei duas semanas para normalizar os dados e dois dias para criar o modelo, mas levei apenas uma hora para treinar, 1.300 gerações, 20 genomas por geração, meu laptop de segunda mão de 5 anos estava pegando fogo e, quando conectei o modelo ao MT5, o modelo era muito agressivo e muito preciso na previsão dos próximos candles, mas não era lucrativo. Mas foi divertido aprender e ver a previsão do modelo, e é por isso que vim aqui para aprender mais sobre IA, e este é o código do NEAT AI


import os
import csv
import math
import random
import json
from datetime import datetime

# -----------------------------------------
# UTILIDADES: Ativação e passe simples para frente
# -----------------------------------------
def tanh(x):
    return math.tanh(x)

def forward_pass(genome, inputs):
    """
    genome = {
       'hidden_weights': 2D list,
       'hidden_bias': 1D list,
       'output_weights': list,
       'output_bias': float,
       'fitness': float
    }
    inputs: list of 5 floats
    Returns: single float in (-1, 1)
    """
    hidden_activations = []
    for h in range(len(genome['hidden_bias'])):
        z = sum(inputs[i] * genome['hidden_weights'][h][i] for i in range(len(inputs)))
        z += genome['hidden_bias'][h]
        hidden_activations.append(tanh(z))
    
    z_out = sum(hidden_activations[h] * genome['output_weights'][h] for h in range(len(hidden_activations)))
    z_out += genome['output_bias']
    return tanh(z_out)

def interpret_output(output):
    """
    Convert the Tanh output to discrete values:
      if output >= 0.5  =>  1  (Buy)
      if output <= -0.5 => -1  (Sell)
      otherwise         =>  0  (Hold)
    """
    if output >= 0.5:
        return 1
    elif output <= -0.5:
        return -1
    else:
        return 0

# ------------------------------------------------------
# ALGORITMO GENÉTICO: inicialização, seleção, mutação
# ------------------------------------------------------
def create_random_genome(input_size=5, hidden_size=8):
    return {
        'hidden_weights': [[random.uniform(-1, 1) for _ in range(input_size)]
                           for _ in range(hidden_size)],
        'hidden_bias': [random.uniform(-1, 1) for _ in range(hidden_size)],
        'output_weights': [random.uniform(-1, 1) for _ in range(hidden_size)],
        'output_bias': random.uniform(-1, 1),
        'fitness': 0.0
    }

def mutate(genome, mutation_rate=0.1, mutation_strength=0.5):
    for h in range(len(genome['hidden_weights'])):
        for i in range(len(genome['hidden_weights'][h])):
            if random.random() < mutation_rate:
                genome['hidden_weights'][h][i] += random.uniform(-mutation_strength, mutation_strength)
    for h in range(len(genome['hidden_bias'])):
        if random.random() < mutation_rate:
            genome['hidden_bias'][h] += random.uniform(-mutation_strength, mutation_strength)
    for h in range(len(genome['output_weights'])):
        if random.random() < mutation_rate:
            genome['output_weights'][h] += random.uniform(-mutation_strength, mutation_strength)
    if random.random() < mutation_rate:
        genome['output_bias'] += random.uniform(-mutation_strength, mutation_strength)

def crossover(genome1, genome2):
    child = create_random_genome()  # inicialização aleatória, que será substituída
    for h in range(len(genome1['hidden_weights'])):
        for i in range(len(genome1['hidden_weights'][h])):
            child['hidden_weights'][h][i] = (genome1['hidden_weights'][h][i]
                                             if random.random() < 0.5
                                             else genome2['hidden_weights'][h][i])
    for h in range(len(genome1['hidden_bias'])):
        child['hidden_bias'][h] = (genome1['hidden_bias'][h]
                                   if random.random() < 0.5
                                   else genome2['hidden_bias'][h])
    for h in range(len(genome1['output_weights'])):
        child['output_weights'][h] = (genome1['output_weights'][h]
                                      if random.random() < 0.5
                                      else genome2['output_weights'][h])
    child['output_bias'] = (genome1['output_bias']
                            if random.random() < 0.5
                            else genome2['output_bias'])
    child['fitness'] = 0.0
    return child

def evolve_population(population, keep_best=5):
    population.sort(key=lambda g: g['fitness'], reverse=True)
    new_population = population[:keep_best]  # Mantenha os melhores genomas
    while len(new_population) < len(population):
        parent1 = random.choice(new_population)
        parent2 = random.choice(new_population)
        child = crossover(parent1, parent2)
        mutate(child)
        new_population.append(child)
    return new_population

# -------------------------------------------
# Salvar e carregar o melhor modelo para evitar um novo treinamento
# -------------------------------------------
def save_best_model(genome, filename="best_model.json"):
    data = {
        'hidden_weights': genome['hidden_weights'],
        'hidden_bias': genome['hidden_bias'],
        'output_weights': genome['output_weights'],
        'output_bias': genome['output_bias'],
        'fitness': genome['fitness']
    }
    with open(filename, 'w') as f:
        json.dump(data, f, indent=2)

def load_best_model(filename="best_model.json"):
    if not os.path.exists(filename):
        return None
    with open(filename, 'r') as f:
        data = json.load(f)
    return data

# ----------------
# LÓGICA PRINCIPAL DO TREINAMENTO
# ----------------
def train_neural_network_data(folder_path):
    # Hiperparâmetros
    POP_SIZE = 20
    KEEP_BEST = 5
    input_size = 5
    hidden_size = 8

    # Criar uma população inicial ou carregar o melhor modelo, se disponível
    population = [create_random_genome(input_size, hidden_size) for _ in range(POP_SIZE)]
    best_saved = load_best_model()
    if best_saved is not None:
        population[0]['hidden_weights'] = best_saved['hidden_weights']
        population[0]['hidden_bias']    = best_saved['hidden_bias']
        population[0]['output_weights'] = best_saved['output_weights']
        population[0]['output_bias']    = best_saved['output_bias']
        population[0]['fitness']        = 0.0

    # Ajudante para analisar cadeias de data/hora ISO8601 com informações de fuso horário
    def get_date_from_time_str(t_str):
        dt = datetime.strptime(t_str, "%Y-%m-%dT%H:%M:%S%z")
        return dt.date()

    csv_files = sorted([f for f in os.listdir(folder_path) if f.endswith('.csv')])

    # Para cada genoma, armazene o último sinal e o preço correspondente (se houver)
    genome_states = [{'last_signal': None, 'last_price': None} for _ in range(POP_SIZE)]
    current_date = None
    generation_count = 0

    # Processar cada arquivo CSV um de cada vez (streaming linha por linha)
    for csv_file in csv_files:
        file_path = os.path.join(folder_path, csv_file)
        with open(file_path, 'r', newline='') as f:
            reader = csv.DictReader(f)
            for row in reader:
                row_time_str = row['Time']
                row_date = get_date_from_time_str(row_time_str)

                # Verificar se um novo dia começou (fim da geração)
                if current_date is None:
                    current_date = row_date
                elif row_date != current_date:
                    # Geração encerrada; estatísticas de impressão para monitoramento
                    generation_count += 1
                    fitnesses = [g['fitness'] for g in population]
                    avg_fitness = sum(fitnesses) / len(fitnesses)
                    best_fitness = max(fitnesses)
                    print(f"Generation {generation_count} | Date: {current_date} | Avg Fitness: {avg_fitness:.2 f} | Best Fitness: {best_fitness:.2 f}")

                    # Evoluir a população para a nova geração
                    population = evolve_population(population, keep_best=KEEP_BEST)

                    # Redefinir os estados do genoma para a nova geração
                    for state in genome_states:
                        state['last_signal'] = None
                        state['last_price'] = None

                    current_date = row_date

                # Preparar entradas em tempo real
                inputs = [
                    float(row['NTP']),
                    float(row['NCP']),
                    float(row['NT']),
                    float(row['NIP']),
                    float(row['N14IP'])
                ]
                bid_price = float(row['BidOpen'])
                ask_price = float(row['AskOpen'])

                # Processar a decisão de cada genoma para essa linha
                for i, genome in enumerate(population):
                    raw_output = forward_pass(genome, inputs)
                    signal_val = interpret_output(raw_output)  # -1, 0 ou 1
                    prev_signal = genome_states[i]['last_signal']
                    prev_price = genome_states[i]['last_price']

                    # Pular o processamento se o sinal não tiver mudado
                    if signal_val == prev_signal:
                        continue

                    genome_states[i]['last_signal'] = signal_val

                    # Aplicar lógica de adequação nas transições de sinal
                    if signal_val == 1:  # Sinal de compra
                        genome_states[i]['last_price'] = ask_price
                        if prev_signal == -1 and prev_price is not None:
                            if ask_price < prev_price:
                                genome['fitness'] += 1
                            else:
                                genome['fitness'] -= 1
                    elif signal_val == -1:  # Sinal de venda
                        genome_states[i]['last_price'] = bid_price
                        if prev_signal == 1 and prev_price is not None:
                            if bid_price > prev_price:
                                genome['fitness'] += 1
                            else:
                                genome['fitness'] -= 1
                    else:  # Sinal de espera
                        genome_states[i]['last_price'] = None

    # Após processar todos os arquivos CSV, finalize o treinamento
    population.sort(key=lambda g: g['fitness'], reverse=True)
    best_genome = population[0]
    print("Final Best genome fitness after all files:", best_genome['fitness'])
    save_best_model(best_genome, filename="best_model.json")

if __name__ == "__main__":
    folder_path = r""
    train_neural_network_data(folder_path)
e para o MT5
import MetaTrader5 as mt5
import pandas as pd
import numpy as np
import json
import time

def tournament(values):
    """Tournament method to find maximum and minimum in a list."""
    max_val = values[0]
    min_val = values[0]
    for v in values[1:]:
        if v > max_val:
            max_val = v
        if v < min_val:
            min_val = v
    return max_val, min_val

# -------------------------------
# Carregar o melhor modelo do JSON
with open("best_model.json", "r") as f:
    best_model = json.load(f)

def forward_pass(inputs, model):
    """
    Perform a forward pass through a feedforward network with one hidden layer.
    Uses tanh for both hidden and output layers.
    The output is discretized:
       >0.5 --> 1, < -0.5 --> -1, else 0.
    """
    x = np.array(inputs)  # forma (5,)
    hidden_weights = np.array(model["hidden_weights"])  # shape (8, 5)
    hidden_bias = np.array(model["hidden_bias"])          # forma (8,)
    output_weights = np.array(model["output_weights"])    # forma (8,)
    output_bias = model["output_bias"]                    # escalar

    hidden_input = np.dot(hidden_weights, x) + hidden_bias
    hidden_output = np.tanh(hidden_input)
    output_val = np.dot(output_weights, hidden_output) + output_bias
    output_activation = np.tanh(output_val)
    
    if output_activation > 0.5:
        return 1
    elif output_activation < -0.5:
        return -1
    else:
        return 0

# -------------------------------
# Funções de gerenciamento de pedidos usando posições
def get_open_position(symbol):
    """
    Returns information about an open position for the symbol as a dictionary:
    {"ticket": ticket, "type": "buy"/"sell", "volume": volume}.
    If no position is open, returns None.
    """
    positions = mt5.positions_get(symbol=symbol)
    if positions is None or len(positions) == 0:
        return None
    pos = positions[0]  # Assumir apenas uma posição aberta por símbolo
    if pos.type == mt5.POSITION_TYPE_BUY:
        return {"ticket": pos.ticket, "type": "buy", "volume": pos.volume, "symbol": pos.symbol}
    elif pos.type == mt5.POSITION_TYPE_SELL:
        return {"ticket": pos.ticket, "type": "sell", "volume": pos.volume, "symbol": pos.symbol}
    return None

def close_position(position_info):
    """
    Closes the given position using its ticket.
    """
    symbol = position_info["symbol"]
    tick = mt5.symbol_info_tick(symbol)
    # Para o fechamento: se BUY estiver aberto, vendemos pelo lance; se SELL estiver aberto, compramos pelo pedido.
    if position_info["type"] == "buy":
        price = tick.bid
        order_type = mt5.ORDER_TYPE_SELL
    else:
        price = tick.ask
        order_type = mt5.ORDER_TYPE_BUY

    request = {
        "action": mt5.TRADE_ACTION_DEAL,
        "symbol": symbol,
        "volume": position_info["volume"],
        "type": order_type,
        "position": position_info["ticket"],
        "price": price,
        "deviation": 10,
        "magic": 123456,
        "comment": "Close position",
        "type_time": mt5.ORDER_TIME_GTC,
        "type_filling": mt5.ORDER_FILLING_IOC,
    }
    result = mt5.order_send(request)
    print("Close position result:", result)

def place_order(symbol, order_type, volume=0.01):
    """
    Place an order for the symbol.
    - order_type "buy": place BUY order.
    - order_type "sell": place SELL order.
    Before placing, if an open position exists with the opposite signal,
    it is closed using its order ticket.
    """
    # Verificar a posição aberta atual:
    current_position = get_open_position(symbol)
    if current_position:
        # Se a posição existente for oposta ao novo sinal, feche-a.
        if (order_type == "buy" and current_position["type"] == "sell") or \
           (order_type == "sell" and current_position["type"] == "buy"):
            print(f"Opposite position ({current_position['type']}) detected. Closing it first.")
            close_position(current_position)
        # Se for do mesmo tipo, não faça nada.
        elif (order_type == current_position["type"]):
            print(f"{order_type.upper()} order already open. No new order will be placed.")
            return

    tick = mt5.symbol_info_tick(symbol)
    if order_type == "buy":
        price = tick.ask
        order_type_mt5 = mt5.ORDER_TYPE_BUY
    elif order_type == "sell":
        price = tick.bid
        order_type_mt5 = mt5.ORDER_TYPE_SELL
    else:
        print("Invalid order type:", order_type)
        return

    request = {
        "action": mt5.TRADE_ACTION_DEAL,
        "symbol": symbol,
        "volume": volume,
        "type": order_type_mt5,
        "price": price,
        "deviation": 10,
        "magic": 123456,
        "comment": "NEAT AI trade",
        "type_time": mt5.ORDER_TIME_GTC,
        "type_filling": mt5.ORDER_FILLING_IOC,
    }
    result = mt5.order_send(request)
    print(f"Placed {order_type.upper()} order result:", result)

# -------------------------------
# Inicializar a conexão com o MetaTrader 5
if not mt5.initialize():
    print("initialize() failed, error code =", mt5.last_error())
    quit()

symbol = "XAUUSDm"
timeframe = mt5.TIMEFRAME_M1  # Período de tempo de 1 minuto

if not mt5.symbol_select(symbol, True):
    print("Failed to select symbol:", symbol)
    mt5.shutdown()
    quit()

# Loop de monitoramento contínuo
try:
    while True:
        print("\n" + "="*80)
        print("Retrieving candle data...")
        num_candles = 15
        rates = mt5.copy_rates_from_pos(symbol, timeframe, 1, num_candles)
        if rates is None:
            print("Failed to get rates for", symbol)
        else:
            df = pd.DataFrame(rates)
            df['tick_count'] = df['tick_volume']
            df['9ma'] = df['close'].rolling(window=9).mean()
            df['14ma'] = df['close'].rolling(window=14).mean()
            df['TP'] = (df['high'] + df['low'] + df['close']) / 3
            df['NTP']   = (df['TP']    - df['low']) / (df['high'] - df['low'])
            df['NIP9']  = (df['9ma']   - df['low']) / (df['high'] - df['low'])
            df['NIP14'] = (df['14ma']  - df['low']) / (df['high'] - df['low'])
            close_prices = list(df['close'].tail(14))
            HC, LC = tournament(close_prices)
            tick_counts = list(df['tick_count'].tail(14))
            HT, LT = tournament(tick_counts)
            df['NCP'] = (df['close'] - LC) / (HC - LC)
            df['NT']  = (df['tick_count'] - LT) / (HT - LT)
            df = df.round(3)
            
            # Imprimir os dados normalizados da última vela
            display_cols = ['high', 'low', 'close', 'tick_count', '9ma', '14ma', 'TP', 'NTP', 'NCP', 'NT', 'NIP9', 'NIP14']
            print("\nNormalized Candle Data (Last Candle):")
            print(df[display_cols].tail(1).to_string(index=False))
            
            # Extrair entradas da última vela
            last_row = df.iloc[-1]
            input_vector = [
                last_row['NTP'],
                last_row['NCP'],
                last_row['NT'],
                last_row['NIP9'],
                last_row['NIP14']
            ]
            
            print("\nExtracted Inputs:")
            print(f"NTP = {last_row['NTP']}, NCP = {last_row['NCP']}, NT = {last_row['NT']}, NIP9 = {last_row['NIP9']}, NIP14 = {last_row['NIP14']}")
            print("Price Data - High:", last_row['high'], "Low:", last_row['low'], "Close:", last_row['close'])
            
            # Computar o sinal da rede
            signal = forward_pass(input_vector, best_model)
            print("\nNetwork Signal (discretized):", signal)
            
            # Executar a negociação com base no sinal usando nossa lógica de gerenciamento de ordens
            if signal == 1:
                print("Received BUY signal.")
                place_order(symbol, "buy", volume=0.01)
            elif signal == -1:
                print("Received SELL signal.")
                place_order(symbol, "sell", volume=0.01)
            else:
                print("Signal is neutral. No action taken.")
        
        # Aguarde até o limite do próximo minuto
        tick = mt5.symbol_info_tick(symbol)
        if tick is None:
            print("Failed to get tick info for", symbol)
            break
        server_time = tick.time
        next_minute = ((server_time // 60) + 1) * 60
        sleep_seconds = next_minute - server_time
        print(f"Sleeping for {sleep_seconds} seconds until next candle...")
        time.sleep(sleep_seconds)
except KeyboardInterrupt:
    print("Stopping continuous monitoring.")
finally:
    mt5.shutdown()
 
Muito bom.
 
Multi Dead #:

sim, correto, acabei de assistir a um vídeo no YouTube, treinando uma IA para jogar flappy bird e outros jogos com o método N.E.A.T., tive a ideia de treinar uma IA para negociação com o N.E.A.T., acabei de aprender o básico sobre redes neurais e criei um modelo com a ajuda do chat GPT, porque não sei programar. Levei duas semanas para normalizar os dados e dois dias para criar o modelo, mas levei apenas uma hora para treinar, 1.300 gerações, 20 genomas por geração, meu laptop de segunda mão de 5 anos estava pegando fogo e, quando conectei o modelo ao MT5, o modelo era muito agressivo e muito preciso na previsão dos próximos candles, mas não era lucrativo. Mas foi divertido aprender e ver a previsão do modelo, e é por isso que vim aqui para aprender mais sobre IA, e este é o código do NEAT AI


e para o MT5

Redes neurais são um assunto realmente muito interessante e divertido. Porém dei um tempo na explicação, já que decidi primeiro finalizar o replay / simulador. No entanto, assim que acabar de postar os artigos sobre o simulador, iremos voltar a ter novos artigos sobre redes neurais. Sempre visando justamente mostrar como elas funcionam por debaixo dos panos. Já que a maioria imaginam que elas são códigos mágicos, o que de fato não é verdade. Mas nem por isto deixam de ser um assunto interessante e divertido. Estou até pensando em uma forma de modelar algo para que todos possam ver como uma rede neural aprende sem supervisão e sem dados prévios. O que é algo muito interessante, e pode ajudar no entendimento de certas coisas. Detalhe: Todos os meus códigos serão escritos em MQL5. E como você disse que não é programador. Que tal aprender MQL5 e assim começar a implementar suas próprias soluções ? Estou fazendo uma série de artigos voltados justamente para pessoas como você. O último que foi postado pode ser visto aqui: https://www.mql5.com/pt/articles/15833. Nesta serie explico as coisas desde o básico do básico. Então se você não sabe absolutamente nada de programação volte até o primeiro artigo da serie. Os links para os artigos anteriores, sempre estarão logo no inicio do artigo.

Do básico ao intermediário: Indicador (IV)
Do básico ao intermediário: Indicador (IV)
  • www.mql5.com
Neste artigo, vermos como é fácil de criar e implementar uma metodologia operacional, visando colorir candles. Sendo este um conceito, que diversos operadores apreciam imensamente. Porém, é preciso se tomar cuidado ao implementar tal tipo de coisa. Isto para que as barras, ou candles, mantenham a sua aparência original. Visando assim não prejudicar a leitura que muitos operadores fazem candle a candle.
 
Muito obrigado por essa introdução!