да, правильно, я только что видел видео на YouTube, обучение ИИ играть flappy bird и другие игры с N.E.A.T метод, у меня появилась идея обучить ИИ для торговли с N.E.A.T, я только что изучил основы нейронной сети, и создал модель с помощью чата GPT, потому что я не знаю, чтобы код. У меня ушло две недели на нормализацию данных и два дня на создание модели, но на обучение ушел всего один час, 1300 генераций, 20 геномов на генерацию, мой 5-летний подержанный ноутбук горел, и когда я подключил модель к MT5, модель была такой агрессивной и так точно предсказывала следующие свечи, но не приносила прибыли. Потому что данные не были нормализованы должным образом, и я до сих пор не понимаю код модели. Но было весело учиться и видеть, как модель предсказывает, и именно поэтому я пришел сюда, чтобы узнать больше об искусственном интеллекте, и вот код NEAT AI
import os import csv import math import random import json from datetime import datetime # ----------------------------------------- # UTILS: Активация и простая передача вперед # ----------------------------------------- def tanh(x): return math.tanh(x) def forward_pass(genome, inputs): """ genome = { 'hidden_weights': 2D list, 'hidden_bias': 1D list, 'output_weights': list, 'output_bias': float, 'fitness': float } inputs: list of 5 floats Returns: single float in (-1, 1) """ hidden_activations = [] for h in range(len(genome['hidden_bias'])): z = sum(inputs[i] * genome['hidden_weights'][h][i] for i in range(len(inputs))) z += genome['hidden_bias'][h] hidden_activations.append(tanh(z)) z_out = sum(hidden_activations[h] * genome['output_weights'][h] for h in range(len(hidden_activations))) z_out += genome['output_bias'] return tanh(z_out) def interpret_output(output): """ Convert the Tanh output to discrete values: if output >= 0.5 => 1 (Buy) if output <= -0.5 => -1 (Sell) otherwise => 0 (Hold) """ if output >= 0.5: return 1 elif output <= -0.5: return -1 else: return 0 # ------------------------------------------------------ # ГЕНЕТИЧЕСКИЙ АЛГОРИТМ: инициализация, отбор, мутация # ------------------------------------------------------ def create_random_genome(input_size=5, hidden_size=8): return { 'hidden_weights': [[random.uniform(-1, 1) for _ in range(input_size)] for _ in range(hidden_size)], 'hidden_bias': [random.uniform(-1, 1) for _ in range(hidden_size)], 'output_weights': [random.uniform(-1, 1) for _ in range(hidden_size)], 'output_bias': random.uniform(-1, 1), 'fitness': 0.0 } def mutate(genome, mutation_rate=0.1, mutation_strength=0.5): for h in range(len(genome['hidden_weights'])): for i in range(len(genome['hidden_weights'][h])): if random.random() < mutation_rate: genome['hidden_weights'][h][i] += random.uniform(-mutation_strength, mutation_strength) for h in range(len(genome['hidden_bias'])): if random.random() < mutation_rate: genome['hidden_bias'][h] += random.uniform(-mutation_strength, mutation_strength) for h in range(len(genome['output_weights'])): if random.random() < mutation_rate: genome['output_weights'][h] += random.uniform(-mutation_strength, mutation_strength) if random.random() < mutation_rate: genome['output_bias'] += random.uniform(-mutation_strength, mutation_strength) def crossover(genome1, genome2): child = create_random_genome() # случайная инициация, будет перезаписана for h in range(len(genome1['hidden_weights'])): for i in range(len(genome1['hidden_weights'][h])): child['hidden_weights'][h][i] = (genome1['hidden_weights'][h][i] if random.random() < 0.5 else genome2['hidden_weights'][h][i]) for h in range(len(genome1['hidden_bias'])): child['hidden_bias'][h] = (genome1['hidden_bias'][h] if random.random() < 0.5 else genome2['hidden_bias'][h]) for h in range(len(genome1['output_weights'])): child['output_weights'][h] = (genome1['output_weights'][h] if random.random() < 0.5 else genome2['output_weights'][h]) child['output_bias'] = (genome1['output_bias'] if random.random() < 0.5 else genome2['output_bias']) child['fitness'] = 0.0 return child def evolve_population(population, keep_best=5): population.sort(key=lambda g: g['fitness'], reverse=True) new_population = population[:keep_best] # Сохраните лучшие геномы while len(new_population) < len(population): parent1 = random.choice(new_population) parent2 = random.choice(new_population) child = crossover(parent1, parent2) mutate(child) new_population.append(child) return new_population # ------------------------------------------- # Сохранение и загрузка лучшей модели, чтобы избежать повторного обучения # ------------------------------------------- def save_best_model(genome, filename="best_model.json"): data = { 'hidden_weights': genome['hidden_weights'], 'hidden_bias': genome['hidden_bias'], 'output_weights': genome['output_weights'], 'output_bias': genome['output_bias'], 'fitness': genome['fitness'] } with open(filename, 'w') as f: json.dump(data, f, indent=2) def load_best_model(filename="best_model.json"): if not os.path.exists(filename): return None with open(filename, 'r') as f: data = json.load(f) return data # ---------------- # ОСНОВНАЯ ЛОГИКА ОБУЧЕНИЯ # ---------------- def train_neural_network_data(folder_path): # Гиперпараметры POP_SIZE = 20 KEEP_BEST = 5 input_size = 5 hidden_size = 8 # Создайте начальную популяцию или загрузите лучшую модель, если она доступна population = [create_random_genome(input_size, hidden_size) for _ in range(POP_SIZE)] best_saved = load_best_model() if best_saved is not None: population[0]['hidden_weights'] = best_saved['hidden_weights'] population[0]['hidden_bias'] = best_saved['hidden_bias'] population[0]['output_weights'] = best_saved['output_weights'] population[0]['output_bias'] = best_saved['output_bias'] population[0]['fitness'] = 0.0 # Помощник для разбора строк даты/времени ISO8601 с информацией о часовом поясе def get_date_from_time_str(t_str): dt = datetime.strptime(t_str, "%Y-%m-%dT%H:%M:%S%z") return dt.date() csv_files = sorted([f for f in os.listdir(folder_path) if f.endswith('.csv')]) # Для каждого генома сохраните последний сигнал и соответствующую цену (если есть). genome_states = [{'last_signal': None, 'last_price': None} for _ in range(POP_SIZE)] current_date = None generation_count = 0 # Обрабатывайте каждый CSV-файл по очереди (поток строка за строкой) for csv_file in csv_files: file_path = os.path.join(folder_path, csv_file) with open(file_path, 'r', newline='') as f: reader = csv.DictReader(f) for row in reader: row_time_str = row['Time'] row_date = get_date_from_time_str(row_time_str) # Проверьте, начался ли новый день (конец генерации) if current_date is None: current_date = row_date elif row_date != current_date: # Генерация завершена; выведите статистику для мониторинга generation_count += 1 fitnesses = [g['fitness'] for g in population] avg_fitness = sum(fitnesses) / len(fitnesses) best_fitness = max(fitnesses) print(f"Generation {generation_count} | Date: {current_date} | Avg Fitness: {avg_fitness:.2 f} | Best Fitness: {best_fitness:.2 f}") # Эволюция населения для нового поколения population = evolve_population(population, keep_best=KEEP_BEST) # Сброс состояния генома для нового поколения for state in genome_states: state['last_signal'] = None state['last_price'] = None current_date = row_date # Подготовить входные данные на лету inputs = [ float(row['NTP']), float(row['NCP']), float(row['NT']), float(row['NIP']), float(row['N14IP']) ] bid_price = float(row['BidOpen']) ask_price = float(row['AskOpen']) # Обработать решение каждого генома для этого ряда for i, genome in enumerate(population): raw_output = forward_pass(genome, inputs) signal_val = interpret_output(raw_output) # -1, 0 или 1 prev_signal = genome_states[i]['last_signal'] prev_price = genome_states[i]['last_price'] # Пропустить обработку, если сигнал не изменился if signal_val == prev_signal: continue genome_states[i]['last_signal'] = signal_val # Применяйте фитнес-логику на переходах сигналов if signal_val == 1: # Купить сигнал genome_states[i]['last_price'] = ask_price if prev_signal == -1 and prev_price is not None: if ask_price < prev_price: genome['fitness'] += 1 else: genome['fitness'] -= 1 elif signal_val == -1: # Сигнал на продажу genome_states[i]['last_price'] = bid_price if prev_signal == 1 and prev_price is not None: if bid_price > prev_price: genome['fitness'] += 1 else: genome['fitness'] -= 1 else: # Сигнал удержания genome_states[i]['last_price'] = None # После обработки всех CSV-файлов завершите обучение population.sort(key=lambda g: g['fitness'], reverse=True) best_genome = population[0] print("Final Best genome fitness after all files:", best_genome['fitness']) save_best_model(best_genome, filename="best_model.json") if __name__ == "__main__": folder_path = r"" train_neural_network_data(folder_path)и для MT5
import MetaTrader5 as mt5 import pandas as pd import numpy as np import json import time def tournament(values): """Tournament method to find maximum and minimum in a list.""" max_val = values[0] min_val = values[0] for v in values[1:]: if v > max_val: max_val = v if v < min_val: min_val = v return max_val, min_val # ------------------------------- # Загрузите лучшую модель из JSON with open("best_model.json", "r") as f: best_model = json.load(f) def forward_pass(inputs, model): """ Perform a forward pass through a feedforward network with one hidden layer. Uses tanh for both hidden and output layers. The output is discretized: >0.5 --> 1, < -0.5 --> -1, else 0. """ x = np.array(inputs) # shape (5,) hidden_weights = np.array(model["hidden_weights"]) # shape (8, 5) hidden_bias = np.array(model["hidden_bias"]) # shape (8,) output_weights = np.array(model["output_weights"]) # shape (8,) output_bias = model["output_bias"] # скаляр hidden_input = np.dot(hidden_weights, x) + hidden_bias hidden_output = np.tanh(hidden_input) output_val = np.dot(output_weights, hidden_output) + output_bias output_activation = np.tanh(output_val) if output_activation > 0.5: return 1 elif output_activation < -0.5: return -1 else: return 0 # ------------------------------- # Функции управления заказами с помощью позиций def get_open_position(symbol): """ Returns information about an open position for the symbol as a dictionary: {"ticket": ticket, "type": "buy"/"sell", "volume": volume}. If no position is open, returns None. """ positions = mt5.positions_get(symbol=symbol) if positions is None or len(positions) == 0: return None pos = positions[0] # предполагать только одну открытую позицию на символ if pos.type == mt5.POSITION_TYPE_BUY: return {"ticket": pos.ticket, "type": "buy", "volume": pos.volume, "symbol": pos.symbol} elif pos.type == mt5.POSITION_TYPE_SELL: return {"ticket": pos.ticket, "type": "sell", "volume": pos.volume, "symbol": pos.symbol} return None def close_position(position_info): """ Closes the given position using its ticket. """ symbol = position_info["symbol"] tick = mt5.symbol_info_tick(symbol) # Для закрытия: если открыта BUY, мы продаем по bid; если открыта SELL, мы покупаем по ask. if position_info["type"] == "buy": price = tick.bid order_type = mt5.ORDER_TYPE_SELL else: price = tick.ask order_type = mt5.ORDER_TYPE_BUY request = { "action": mt5.TRADE_ACTION_DEAL, "symbol": symbol, "volume": position_info["volume"], "type": order_type, "position": position_info["ticket"], "price": price, "deviation": 10, "magic": 123456, "comment": "Close position", "type_time": mt5.ORDER_TIME_GTC, "type_filling": mt5.ORDER_FILLING_IOC, } result = mt5.order_send(request) print("Close position result:", result) def place_order(symbol, order_type, volume=0.01): """ Place an order for the symbol. - order_type "buy": place BUY order. - order_type "sell": place SELL order. Before placing, if an open position exists with the opposite signal, it is closed using its order ticket. """ # Проверьте текущую открытую позицию: current_position = get_open_position(symbol) if current_position: # Если существующая позиция противоположна новому сигналу, закройте ее. if (order_type == "buy" and current_position["type"] == "sell") or \ (order_type == "sell" and current_position["type"] == "buy"): print(f"Opposite position ({current_position['type']}) detected. Closing it first.") close_position(current_position) # Если тип тот же, ничего не делайте. elif (order_type == current_position["type"]): print(f"{order_type.upper()} order already open. No new order will be placed.") return tick = mt5.symbol_info_tick(symbol) if order_type == "buy": price = tick.ask order_type_mt5 = mt5.ORDER_TYPE_BUY elif order_type == "sell": price = tick.bid order_type_mt5 = mt5.ORDER_TYPE_SELL else: print("Invalid order type:", order_type) return request = { "action": mt5.TRADE_ACTION_DEAL, "symbol": symbol, "volume": volume, "type": order_type_mt5, "price": price, "deviation": 10, "magic": 123456, "comment": "NEAT AI trade", "type_time": mt5.ORDER_TIME_GTC, "type_filling": mt5.ORDER_FILLING_IOC, } result = mt5.order_send(request) print(f"Placed {order_type.upper()} order result:", result) # ------------------------------- # Инициализация подключения к MetaTrader 5 if not mt5.initialize(): print("initialize() failed, error code =", mt5.last_error()) quit() symbol = "XAUUSDm" timeframe = mt5.TIMEFRAME_M1 # 1-минутный таймфрейм if not mt5.symbol_select(symbol, True): print("Failed to select symbol:", symbol) mt5.shutdown() quit() # Непрерывный цикл мониторинга try: while True: print("\n" + "="*80) print("Retrieving candle data...") num_candles = 15 rates = mt5.copy_rates_from_pos(symbol, timeframe, 1, num_candles) if rates is None: print("Failed to get rates for", symbol) else: df = pd.DataFrame(rates) df['tick_count'] = df['tick_volume'] df['9ma'] = df['close'].rolling(window=9).mean() df['14ma'] = df['close'].rolling(window=14).mean() df['TP'] = (df['high'] + df['low'] + df['close']) / 3 df['NTP'] = (df['TP'] - df['low']) / (df['high'] - df['low']) df['NIP9'] = (df['9ma'] - df['low']) / (df['high'] - df['low']) df['NIP14'] = (df['14ma'] - df['low']) / (df['high'] - df['low']) close_prices = list(df['close'].tail(14)) HC, LC = tournament(close_prices) tick_counts = list(df['tick_count'].tail(14)) HT, LT = tournament(tick_counts) df['NCP'] = (df['close'] - LC) / (HC - LC) df['NT'] = (df['tick_count'] - LT) / (HT - LT) df = df.round(3) # Вывести нормализованные данные последней свечи display_cols = ['high', 'low', 'close', 'tick_count', '9ma', '14ma', 'TP', 'NTP', 'NCP', 'NT', 'NIP9', 'NIP14'] print("\nNormalized Candle Data (Last Candle):") print(df[display_cols].tail(1).to_string(index=False)) # Извлечение входных данных из последней свечи last_row = df.iloc[-1] input_vector = [ last_row['NTP'], last_row['NCP'], last_row['NT'], last_row['NIP9'], last_row['NIP14'] ] print("\nExtracted Inputs:") print(f"NTP = {last_row['NTP']}, NCP = {last_row['NCP']}, NT = {last_row['NT']}, NIP9 = {last_row['NIP9']}, NIP14 = {last_row['NIP14']}") print("Price Data - High:", last_row['high'], "Low:", last_row['low'], "Close:", last_row['close']) # Вычислить сигнал сети signal = forward_pass(input_vector, best_model) print("\nNetwork Signal (discretized):", signal) # Выполнение сделки на основе сигнала с использованием нашей логики управления ордерами if signal == 1: print("Received BUY signal.") place_order(symbol, "buy", volume=0.01) elif signal == -1: print("Received SELL signal.") place_order(symbol, "sell", volume=0.01) else: print("Signal is neutral. No action taken.") # Подождите до следующей минутной границы tick = mt5.symbol_info_tick(symbol) if tick is None: print("Failed to get tick info for", symbol) break server_time = tick.time next_minute = ((server_time // 60) + 1) * 60 sleep_seconds = next_minute - server_time print(f"Sleeping for {sleep_seconds} seconds until next candle...") time.sleep(sleep_seconds) except KeyboardInterrupt: print("Stopping continuous monitoring.") finally: mt5.shutdown()
Нейронные сети - это действительно интересная и увлекательная тема. Однако я взял паузу в объяснении, так как решил сначала доработать реплейсер/симулятор. Однако как только я закончу публиковать статьи по симулятору, мы вернемся с новыми статьями о нейронных сетях. Цель всегда состоит в том, чтобы показать, как они работают под капотом. Большинство людей думают, что это волшебные коды, что не так. Но это все равно интересная и занимательная тема. Я даже подумываю смоделировать что-нибудь, чтобы все могли увидеть, как нейронная сеть обучается без наблюдения или предварительных данных. Это очень интересно и может помочь в понимании некоторых вещей. Деталь: весь мой код будет написан на MQL5. И раз уж вы сказали, что вы не программист. Как насчет того, чтобы выучить MQL5 и начать реализовывать свои собственные решения? Я пишу серию статей, ориентированных на таких людей, как вы. Последнюю из них можно посмотреть здесь: https: //www.mql5.com/pt/articles/15833. В этой серии я объясняю все с самых азов. Так что если вы совсем ничего не знаете о программировании, вернитесь к первой статье цикла. Ссылки на предыдущие статьи всегда будут находиться в начале статьи.
- www.mql5.com
- Бесплатные приложения для трейдинга
- 8 000+ сигналов для копирования
- Экономические новости для анализа финансовых рынков
Вы принимаете политику сайта и условия использования
Опубликована статья Нейронная сеть на практике: Первый нейрон:
В этой статье мы начнем создавать нечто простое и скромное: нейрон. Мы запрограммируем его с помощью очень небольшого кода на MQL5. Нейрон прекрасно работал в тех тестах, которые я проводил. Вернемся немного назад в этой серии статей о нейронных сетях, чтобы понять, о чем я говорю.
Если спокойно изучить предыдущие статьи, то можно заметить, что для определения наилучшего возможного значения как константы < a >, которая является угловым коэффициентом, так и константы < b >, которая является точкой пересечения, потребовалось некоторое математическое жонглирование. Данные манипуляции позволили найти наиболее подходящее уравнение прямой линии, причем было показано два способа сделать это: один - с помощью вычисления производных, другой - с помощью матричных вычислений.
Однако теперь, для всего того, что нам нужно сделать, такие вычисления не пригодятся, поскольку нам нужно смоделировать другой способ поиска констант уравнения прямой. В предыдущей статье мы показали, как можно найти константу, представляющую собой угловой коэффициент. Надеюсь, вам было интересно и вы много экспериментировали с этим кодом, потому что сейчас мы сделаем кое-что более сложное. Однако, несмотря на сложности, на самом деле это открывает двери для многих других вещей. Это, пожалуй, самая интересная статья, которую вы прочтете в этой серии о нейронных сетях, потому что после нее всё станет намного проще и практичнее.
Автор: Daniel Jose