sim, correto, acabei de assistir a um vídeo no YouTube, treinando uma IA para jogar flappy bird e outros jogos com o método N.E.A.T., tive a ideia de treinar uma IA para negociação com o N.E.A.T., acabei de aprender o básico sobre redes neurais e criei um modelo com a ajuda do chat GPT, porque não sei programar. Levei duas semanas para normalizar os dados e dois dias para criar o modelo, mas levei apenas uma hora para treinar, 1.300 gerações, 20 genomas por geração, meu laptop de segunda mão de 5 anos estava pegando fogo e, quando conectei o modelo ao MT5, o modelo era muito agressivo e muito preciso na previsão dos próximos candles, mas não era lucrativo. Mas foi divertido aprender e ver a previsão do modelo, e é por isso que vim aqui para aprender mais sobre IA, e este é o código do NEAT AI
import os import csv import math import random import json from datetime import datetime # ----------------------------------------- # UTILIDADES: Ativação e passe simples para frente # ----------------------------------------- def tanh(x): return math.tanh(x) def forward_pass(genome, inputs): """ genome = { 'hidden_weights': 2D list, 'hidden_bias': 1D list, 'output_weights': list, 'output_bias': float, 'fitness': float } inputs: list of 5 floats Returns: single float in (-1, 1) """ hidden_activations = [] for h in range(len(genome['hidden_bias'])): z = sum(inputs[i] * genome['hidden_weights'][h][i] for i in range(len(inputs))) z += genome['hidden_bias'][h] hidden_activations.append(tanh(z)) z_out = sum(hidden_activations[h] * genome['output_weights'][h] for h in range(len(hidden_activations))) z_out += genome['output_bias'] return tanh(z_out) def interpret_output(output): """ Convert the Tanh output to discrete values: if output >= 0.5 => 1 (Buy) if output <= -0.5 => -1 (Sell) otherwise => 0 (Hold) """ if output >= 0.5: return 1 elif output <= -0.5: return -1 else: return 0 # ------------------------------------------------------ # ALGORITMO GENÉTICO: inicialização, seleção, mutação # ------------------------------------------------------ def create_random_genome(input_size=5, hidden_size=8): return { 'hidden_weights': [[random.uniform(-1, 1) for _ in range(input_size)] for _ in range(hidden_size)], 'hidden_bias': [random.uniform(-1, 1) for _ in range(hidden_size)], 'output_weights': [random.uniform(-1, 1) for _ in range(hidden_size)], 'output_bias': random.uniform(-1, 1), 'fitness': 0.0 } def mutate(genome, mutation_rate=0.1, mutation_strength=0.5): for h in range(len(genome['hidden_weights'])): for i in range(len(genome['hidden_weights'][h])): if random.random() < mutation_rate: genome['hidden_weights'][h][i] += random.uniform(-mutation_strength, mutation_strength) for h in range(len(genome['hidden_bias'])): if random.random() < mutation_rate: genome['hidden_bias'][h] += random.uniform(-mutation_strength, mutation_strength) for h in range(len(genome['output_weights'])): if random.random() < mutation_rate: genome['output_weights'][h] += random.uniform(-mutation_strength, mutation_strength) if random.random() < mutation_rate: genome['output_bias'] += random.uniform(-mutation_strength, mutation_strength) def crossover(genome1, genome2): child = create_random_genome() # inicialização aleatória, que será substituída for h in range(len(genome1['hidden_weights'])): for i in range(len(genome1['hidden_weights'][h])): child['hidden_weights'][h][i] = (genome1['hidden_weights'][h][i] if random.random() < 0.5 else genome2['hidden_weights'][h][i]) for h in range(len(genome1['hidden_bias'])): child['hidden_bias'][h] = (genome1['hidden_bias'][h] if random.random() < 0.5 else genome2['hidden_bias'][h]) for h in range(len(genome1['output_weights'])): child['output_weights'][h] = (genome1['output_weights'][h] if random.random() < 0.5 else genome2['output_weights'][h]) child['output_bias'] = (genome1['output_bias'] if random.random() < 0.5 else genome2['output_bias']) child['fitness'] = 0.0 return child def evolve_population(population, keep_best=5): population.sort(key=lambda g: g['fitness'], reverse=True) new_population = population[:keep_best] # Mantenha os melhores genomas while len(new_population) < len(population): parent1 = random.choice(new_population) parent2 = random.choice(new_population) child = crossover(parent1, parent2) mutate(child) new_population.append(child) return new_population # ------------------------------------------- # Salvar e carregar o melhor modelo para evitar um novo treinamento # ------------------------------------------- def save_best_model(genome, filename="best_model.json"): data = { 'hidden_weights': genome['hidden_weights'], 'hidden_bias': genome['hidden_bias'], 'output_weights': genome['output_weights'], 'output_bias': genome['output_bias'], 'fitness': genome['fitness'] } with open(filename, 'w') as f: json.dump(data, f, indent=2) def load_best_model(filename="best_model.json"): if not os.path.exists(filename): return None with open(filename, 'r') as f: data = json.load(f) return data # ---------------- # LÓGICA PRINCIPAL DO TREINAMENTO # ---------------- def train_neural_network_data(folder_path): # Hiperparâmetros POP_SIZE = 20 KEEP_BEST = 5 input_size = 5 hidden_size = 8 # Criar uma população inicial ou carregar o melhor modelo, se disponível population = [create_random_genome(input_size, hidden_size) for _ in range(POP_SIZE)] best_saved = load_best_model() if best_saved is not None: population[0]['hidden_weights'] = best_saved['hidden_weights'] population[0]['hidden_bias'] = best_saved['hidden_bias'] population[0]['output_weights'] = best_saved['output_weights'] population[0]['output_bias'] = best_saved['output_bias'] population[0]['fitness'] = 0.0 # Ajudante para analisar cadeias de data/hora ISO8601 com informações de fuso horário def get_date_from_time_str(t_str): dt = datetime.strptime(t_str, "%Y-%m-%dT%H:%M:%S%z") return dt.date() csv_files = sorted([f for f in os.listdir(folder_path) if f.endswith('.csv')]) # Para cada genoma, armazene o último sinal e o preço correspondente (se houver) genome_states = [{'last_signal': None, 'last_price': None} for _ in range(POP_SIZE)] current_date = None generation_count = 0 # Processar cada arquivo CSV um de cada vez (streaming linha por linha) for csv_file in csv_files: file_path = os.path.join(folder_path, csv_file) with open(file_path, 'r', newline='') as f: reader = csv.DictReader(f) for row in reader: row_time_str = row['Time'] row_date = get_date_from_time_str(row_time_str) # Verificar se um novo dia começou (fim da geração) if current_date is None: current_date = row_date elif row_date != current_date: # Geração encerrada; estatísticas de impressão para monitoramento generation_count += 1 fitnesses = [g['fitness'] for g in population] avg_fitness = sum(fitnesses) / len(fitnesses) best_fitness = max(fitnesses) print(f"Generation {generation_count} | Date: {current_date} | Avg Fitness: {avg_fitness:.2 f} | Best Fitness: {best_fitness:.2 f}") # Evoluir a população para a nova geração population = evolve_population(population, keep_best=KEEP_BEST) # Redefinir os estados do genoma para a nova geração for state in genome_states: state['last_signal'] = None state['last_price'] = None current_date = row_date # Preparar entradas em tempo real inputs = [ float(row['NTP']), float(row['NCP']), float(row['NT']), float(row['NIP']), float(row['N14IP']) ] bid_price = float(row['BidOpen']) ask_price = float(row['AskOpen']) # Processar a decisão de cada genoma para essa linha for i, genome in enumerate(population): raw_output = forward_pass(genome, inputs) signal_val = interpret_output(raw_output) # -1, 0 ou 1 prev_signal = genome_states[i]['last_signal'] prev_price = genome_states[i]['last_price'] # Pular o processamento se o sinal não tiver mudado if signal_val == prev_signal: continue genome_states[i]['last_signal'] = signal_val # Aplicar lógica de adequação nas transições de sinal if signal_val == 1: # Sinal de compra genome_states[i]['last_price'] = ask_price if prev_signal == -1 and prev_price is not None: if ask_price < prev_price: genome['fitness'] += 1 else: genome['fitness'] -= 1 elif signal_val == -1: # Sinal de venda genome_states[i]['last_price'] = bid_price if prev_signal == 1 and prev_price is not None: if bid_price > prev_price: genome['fitness'] += 1 else: genome['fitness'] -= 1 else: # Sinal de espera genome_states[i]['last_price'] = None # Após processar todos os arquivos CSV, finalize o treinamento population.sort(key=lambda g: g['fitness'], reverse=True) best_genome = population[0] print("Final Best genome fitness after all files:", best_genome['fitness']) save_best_model(best_genome, filename="best_model.json") if __name__ == "__main__": folder_path = r"" train_neural_network_data(folder_path)e para o MT5
import MetaTrader5 as mt5 import pandas as pd import numpy as np import json import time def tournament(values): """Tournament method to find maximum and minimum in a list.""" max_val = values[0] min_val = values[0] for v in values[1:]: if v > max_val: max_val = v if v < min_val: min_val = v return max_val, min_val # ------------------------------- # Carregar o melhor modelo do JSON with open("best_model.json", "r") as f: best_model = json.load(f) def forward_pass(inputs, model): """ Perform a forward pass through a feedforward network with one hidden layer. Uses tanh for both hidden and output layers. The output is discretized: >0.5 --> 1, < -0.5 --> -1, else 0. """ x = np.array(inputs) # forma (5,) hidden_weights = np.array(model["hidden_weights"]) # shape (8, 5) hidden_bias = np.array(model["hidden_bias"]) # forma (8,) output_weights = np.array(model["output_weights"]) # forma (8,) output_bias = model["output_bias"] # escalar hidden_input = np.dot(hidden_weights, x) + hidden_bias hidden_output = np.tanh(hidden_input) output_val = np.dot(output_weights, hidden_output) + output_bias output_activation = np.tanh(output_val) if output_activation > 0.5: return 1 elif output_activation < -0.5: return -1 else: return 0 # ------------------------------- # Funções de gerenciamento de pedidos usando posições def get_open_position(symbol): """ Returns information about an open position for the symbol as a dictionary: {"ticket": ticket, "type": "buy"/"sell", "volume": volume}. If no position is open, returns None. """ positions = mt5.positions_get(symbol=symbol) if positions is None or len(positions) == 0: return None pos = positions[0] # Assumir apenas uma posição aberta por símbolo if pos.type == mt5.POSITION_TYPE_BUY: return {"ticket": pos.ticket, "type": "buy", "volume": pos.volume, "symbol": pos.symbol} elif pos.type == mt5.POSITION_TYPE_SELL: return {"ticket": pos.ticket, "type": "sell", "volume": pos.volume, "symbol": pos.symbol} return None def close_position(position_info): """ Closes the given position using its ticket. """ symbol = position_info["symbol"] tick = mt5.symbol_info_tick(symbol) # Para o fechamento: se BUY estiver aberto, vendemos pelo lance; se SELL estiver aberto, compramos pelo pedido. if position_info["type"] == "buy": price = tick.bid order_type = mt5.ORDER_TYPE_SELL else: price = tick.ask order_type = mt5.ORDER_TYPE_BUY request = { "action": mt5.TRADE_ACTION_DEAL, "symbol": symbol, "volume": position_info["volume"], "type": order_type, "position": position_info["ticket"], "price": price, "deviation": 10, "magic": 123456, "comment": "Close position", "type_time": mt5.ORDER_TIME_GTC, "type_filling": mt5.ORDER_FILLING_IOC, } result = mt5.order_send(request) print("Close position result:", result) def place_order(symbol, order_type, volume=0.01): """ Place an order for the symbol. - order_type "buy": place BUY order. - order_type "sell": place SELL order. Before placing, if an open position exists with the opposite signal, it is closed using its order ticket. """ # Verificar a posição aberta atual: current_position = get_open_position(symbol) if current_position: # Se a posição existente for oposta ao novo sinal, feche-a. if (order_type == "buy" and current_position["type"] == "sell") or \ (order_type == "sell" and current_position["type"] == "buy"): print(f"Opposite position ({current_position['type']}) detected. Closing it first.") close_position(current_position) # Se for do mesmo tipo, não faça nada. elif (order_type == current_position["type"]): print(f"{order_type.upper()} order already open. No new order will be placed.") return tick = mt5.symbol_info_tick(symbol) if order_type == "buy": price = tick.ask order_type_mt5 = mt5.ORDER_TYPE_BUY elif order_type == "sell": price = tick.bid order_type_mt5 = mt5.ORDER_TYPE_SELL else: print("Invalid order type:", order_type) return request = { "action": mt5.TRADE_ACTION_DEAL, "symbol": symbol, "volume": volume, "type": order_type_mt5, "price": price, "deviation": 10, "magic": 123456, "comment": "NEAT AI trade", "type_time": mt5.ORDER_TIME_GTC, "type_filling": mt5.ORDER_FILLING_IOC, } result = mt5.order_send(request) print(f"Placed {order_type.upper()} order result:", result) # ------------------------------- # Inicializar a conexão com o MetaTrader 5 if not mt5.initialize(): print("initialize() failed, error code =", mt5.last_error()) quit() symbol = "XAUUSDm" timeframe = mt5.TIMEFRAME_M1 # Período de tempo de 1 minuto if not mt5.symbol_select(symbol, True): print("Failed to select symbol:", symbol) mt5.shutdown() quit() # Loop de monitoramento contínuo try: while True: print("\n" + "="*80) print("Retrieving candle data...") num_candles = 15 rates = mt5.copy_rates_from_pos(symbol, timeframe, 1, num_candles) if rates is None: print("Failed to get rates for", symbol) else: df = pd.DataFrame(rates) df['tick_count'] = df['tick_volume'] df['9ma'] = df['close'].rolling(window=9).mean() df['14ma'] = df['close'].rolling(window=14).mean() df['TP'] = (df['high'] + df['low'] + df['close']) / 3 df['NTP'] = (df['TP'] - df['low']) / (df['high'] - df['low']) df['NIP9'] = (df['9ma'] - df['low']) / (df['high'] - df['low']) df['NIP14'] = (df['14ma'] - df['low']) / (df['high'] - df['low']) close_prices = list(df['close'].tail(14)) HC, LC = tournament(close_prices) tick_counts = list(df['tick_count'].tail(14)) HT, LT = tournament(tick_counts) df['NCP'] = (df['close'] - LC) / (HC - LC) df['NT'] = (df['tick_count'] - LT) / (HT - LT) df = df.round(3) # Imprimir os dados normalizados da última vela display_cols = ['high', 'low', 'close', 'tick_count', '9ma', '14ma', 'TP', 'NTP', 'NCP', 'NT', 'NIP9', 'NIP14'] print("\nNormalized Candle Data (Last Candle):") print(df[display_cols].tail(1).to_string(index=False)) # Extrair entradas da última vela last_row = df.iloc[-1] input_vector = [ last_row['NTP'], last_row['NCP'], last_row['NT'], last_row['NIP9'], last_row['NIP14'] ] print("\nExtracted Inputs:") print(f"NTP = {last_row['NTP']}, NCP = {last_row['NCP']}, NT = {last_row['NT']}, NIP9 = {last_row['NIP9']}, NIP14 = {last_row['NIP14']}") print("Price Data - High:", last_row['high'], "Low:", last_row['low'], "Close:", last_row['close']) # Computar o sinal da rede signal = forward_pass(input_vector, best_model) print("\nNetwork Signal (discretized):", signal) # Executar a negociação com base no sinal usando nossa lógica de gerenciamento de ordens if signal == 1: print("Received BUY signal.") place_order(symbol, "buy", volume=0.01) elif signal == -1: print("Received SELL signal.") place_order(symbol, "sell", volume=0.01) else: print("Signal is neutral. No action taken.") # Aguarde até o limite do próximo minuto tick = mt5.symbol_info_tick(symbol) if tick is None: print("Failed to get tick info for", symbol) break server_time = tick.time next_minute = ((server_time // 60) + 1) * 60 sleep_seconds = next_minute - server_time print(f"Sleeping for {sleep_seconds} seconds until next candle...") time.sleep(sleep_seconds) except KeyboardInterrupt: print("Stopping continuous monitoring.") finally: mt5.shutdown()
sim, correto, acabei de assistir a um vídeo no YouTube, treinando uma IA para jogar flappy bird e outros jogos com o método N.E.A.T., tive a ideia de treinar uma IA para negociação com o N.E.A.T., acabei de aprender o básico sobre redes neurais e criei um modelo com a ajuda do chat GPT, porque não sei programar. Levei duas semanas para normalizar os dados e dois dias para criar o modelo, mas levei apenas uma hora para treinar, 1.300 gerações, 20 genomas por geração, meu laptop de segunda mão de 5 anos estava pegando fogo e, quando conectei o modelo ao MT5, o modelo era muito agressivo e muito preciso na previsão dos próximos candles, mas não era lucrativo. Mas foi divertido aprender e ver a previsão do modelo, e é por isso que vim aqui para aprender mais sobre IA, e este é o código do NEAT AI
Redes neurais são um assunto realmente muito interessante e divertido. Porém dei um tempo na explicação, já que decidi primeiro finalizar o replay / simulador. No entanto, assim que acabar de postar os artigos sobre o simulador, iremos voltar a ter novos artigos sobre redes neurais. Sempre visando justamente mostrar como elas funcionam por debaixo dos panos. Já que a maioria imaginam que elas são códigos mágicos, o que de fato não é verdade. Mas nem por isto deixam de ser um assunto interessante e divertido. Estou até pensando em uma forma de modelar algo para que todos possam ver como uma rede neural aprende sem supervisão e sem dados prévios. O que é algo muito interessante, e pode ajudar no entendimento de certas coisas. Detalhe: Todos os meus códigos serão escritos em MQL5. E como você disse que não é programador. Que tal aprender MQL5 e assim começar a implementar suas próprias soluções ? Estou fazendo uma série de artigos voltados justamente para pessoas como você. O último que foi postado pode ser visto aqui: https://www.mql5.com/pt/articles/15833. Nesta serie explico as coisas desde o básico do básico. Então se você não sabe absolutamente nada de programação volte até o primeiro artigo da serie. Os links para os artigos anteriores, sempre estarão logo no inicio do artigo.
- www.mql5.com
- Aplicativos de negociação gratuitos
- 8 000+ sinais para cópia
- Notícias econômicas para análise dos mercados financeiros
Você concorda com a política do site e com os termos de uso
Novo artigo Rede neural na prática: O primeiro neurônio foi publicado:
Neste artigo começamos a de fato criar algo que muitos ficam admirados em ver funcionando. Um simples e singelo neurônio que conseguiremos programar com muito pouco código em MQL5.O neurônio funcionou perfeitamente nos testes que fiz. Bem, vamos voltar um pouco, nesta mesma série sobre redes neurais, para que você possa entender do que estou falando.
Se você estudar com calma aqueles artigos anteriores, irá notar que foi preciso criar um certo malabarismo matemático. Isto para que o melhor valor possível fosse definido tanto para a constante <a> que é o coeficiente angular, como para a constante <b> que é o ponto de intersecção. Tais manobras, permitiam que a equação de reta mais adequada fosse encontrada. Sendo mostradas duas forma de fazer isto. Uma via cálculos de derivadas e outro via cálculos matriciais.
No entanto, para o que precisamos fazer aqui, deste momento em diante. Tais cálculos não nos serão de utilidade. Visto que precisamos modelar uma outra forma de encontrar as constantes, da equação de reta. No artigo anterior, mostrei como poderíamos fazer para encontrar a constante que representa o coeficiente angular. Espero que você tenha se divertido, e brincado bastante com aquele código. Pois agora vamos fazer algo um pouco mais complicado. Porém, apesar de ser só um pouco mais complicado, irá de fato abrir as portas para um monte de outras coisas. Literalmente, este talvez venha a ser o artigo mais interessante que você verá nesta série sobre redes neurais. Já que depois dele, tudo será muito mais simples e prático.
Autor: Daniel Jose