Обсуждение статьи "Нейронная сеть на практике: Первый нейрон"

 

Опубликована статья Нейронная сеть на практике: Первый нейрон:

В этой статье мы начнем создавать нечто простое и скромное: нейрон. Мы запрограммируем его с помощью очень небольшого кода на MQL5. Нейрон прекрасно работал в тех тестах, которые я проводил. Вернемся немного назад в этой серии статей о нейронных сетях, чтобы понять, о чем я говорю.

Если спокойно изучить предыдущие статьи, то можно заметить, что для определения наилучшего возможного значения как константы < a >, которая является угловым коэффициентом, так и константы < b >, которая является точкой пересечения, потребовалось некоторое математическое жонглирование. Данные манипуляции позволили найти наиболее подходящее уравнение прямой линии, причем было показано два способа сделать это: один - с помощью вычисления производных, другой - с помощью матричных вычислений.

Однако теперь, для всего того, что нам нужно сделать, такие вычисления не пригодятся, поскольку нам нужно смоделировать другой способ поиска констант уравнения прямой. В предыдущей статье мы показали, как можно найти константу, представляющую собой угловой коэффициент. Надеюсь, вам было интересно и вы много экспериментировали с этим кодом, потому что сейчас мы сделаем кое-что более сложное. Однако, несмотря на сложности, на самом деле это открывает двери для многих других вещей. Это, пожалуй, самая интересная статья, которую вы прочтете в этой серии о нейронных сетях, потому что после нее всё станет намного проще и практичнее.

Автор: Daniel Jose

 

да, правильно, я только что видел видео на YouTube, обучение ИИ играть flappy bird и другие игры с N.E.A.T метод, у меня появилась идея обучить ИИ для торговли с N.E.A.T, я только что изучил основы нейронной сети, и создал модель с помощью чата GPT, потому что я не знаю, чтобы код. У меня ушло две недели на нормализацию данных и два дня на создание модели, но на обучение ушел всего один час, 1300 генераций, 20 геномов на генерацию, мой 5-летний подержанный ноутбук горел, и когда я подключил модель к MT5, модель была такой агрессивной и так точно предсказывала следующие свечи, но не приносила прибыли. Потому что данные не были нормализованы должным образом, и я до сих пор не понимаю код модели. Но было весело учиться и видеть, как модель предсказывает, и именно поэтому я пришел сюда, чтобы узнать больше об искусственном интеллекте, и вот код NEAT AI


import os
import csv
import math
import random
import json
from datetime import datetime

# -----------------------------------------
# UTILS: Активация и простая передача вперед
# -----------------------------------------
def tanh(x):
    return math.tanh(x)

def forward_pass(genome, inputs):
    """
    genome = {
       'hidden_weights': 2D list,
       'hidden_bias': 1D list,
       'output_weights': list,
       'output_bias': float,
       'fitness': float
    }
    inputs: list of 5 floats
    Returns: single float in (-1, 1)
    """
    hidden_activations = []
    for h in range(len(genome['hidden_bias'])):
        z = sum(inputs[i] * genome['hidden_weights'][h][i] for i in range(len(inputs)))
        z += genome['hidden_bias'][h]
        hidden_activations.append(tanh(z))
    
    z_out = sum(hidden_activations[h] * genome['output_weights'][h] for h in range(len(hidden_activations)))
    z_out += genome['output_bias']
    return tanh(z_out)

def interpret_output(output):
    """
    Convert the Tanh output to discrete values:
      if output >= 0.5  =>  1  (Buy)
      if output <= -0.5 => -1  (Sell)
      otherwise         =>  0  (Hold)
    """
    if output >= 0.5:
        return 1
    elif output <= -0.5:
        return -1
    else:
        return 0

# ------------------------------------------------------
# ГЕНЕТИЧЕСКИЙ АЛГОРИТМ: инициализация, отбор, мутация
# ------------------------------------------------------
def create_random_genome(input_size=5, hidden_size=8):
    return {
        'hidden_weights': [[random.uniform(-1, 1) for _ in range(input_size)]
                           for _ in range(hidden_size)],
        'hidden_bias': [random.uniform(-1, 1) for _ in range(hidden_size)],
        'output_weights': [random.uniform(-1, 1) for _ in range(hidden_size)],
        'output_bias': random.uniform(-1, 1),
        'fitness': 0.0
    }

def mutate(genome, mutation_rate=0.1, mutation_strength=0.5):
    for h in range(len(genome['hidden_weights'])):
        for i in range(len(genome['hidden_weights'][h])):
            if random.random() < mutation_rate:
                genome['hidden_weights'][h][i] += random.uniform(-mutation_strength, mutation_strength)
    for h in range(len(genome['hidden_bias'])):
        if random.random() < mutation_rate:
            genome['hidden_bias'][h] += random.uniform(-mutation_strength, mutation_strength)
    for h in range(len(genome['output_weights'])):
        if random.random() < mutation_rate:
            genome['output_weights'][h] += random.uniform(-mutation_strength, mutation_strength)
    if random.random() < mutation_rate:
        genome['output_bias'] += random.uniform(-mutation_strength, mutation_strength)

def crossover(genome1, genome2):
    child = create_random_genome()  # случайная инициация, будет перезаписана
    for h in range(len(genome1['hidden_weights'])):
        for i in range(len(genome1['hidden_weights'][h])):
            child['hidden_weights'][h][i] = (genome1['hidden_weights'][h][i]
                                             if random.random() < 0.5
                                             else genome2['hidden_weights'][h][i])
    for h in range(len(genome1['hidden_bias'])):
        child['hidden_bias'][h] = (genome1['hidden_bias'][h]
                                   if random.random() < 0.5
                                   else genome2['hidden_bias'][h])
    for h in range(len(genome1['output_weights'])):
        child['output_weights'][h] = (genome1['output_weights'][h]
                                      if random.random() < 0.5
                                      else genome2['output_weights'][h])
    child['output_bias'] = (genome1['output_bias']
                            if random.random() < 0.5
                            else genome2['output_bias'])
    child['fitness'] = 0.0
    return child

def evolve_population(population, keep_best=5):
    population.sort(key=lambda g: g['fitness'], reverse=True)
    new_population = population[:keep_best]  # Сохраните лучшие геномы
    while len(new_population) < len(population):
        parent1 = random.choice(new_population)
        parent2 = random.choice(new_population)
        child = crossover(parent1, parent2)
        mutate(child)
        new_population.append(child)
    return new_population

# -------------------------------------------
# Сохранение и загрузка лучшей модели, чтобы избежать повторного обучения
# -------------------------------------------
def save_best_model(genome, filename="best_model.json"):
    data = {
        'hidden_weights': genome['hidden_weights'],
        'hidden_bias': genome['hidden_bias'],
        'output_weights': genome['output_weights'],
        'output_bias': genome['output_bias'],
        'fitness': genome['fitness']
    }
    with open(filename, 'w') as f:
        json.dump(data, f, indent=2)

def load_best_model(filename="best_model.json"):
    if not os.path.exists(filename):
        return None
    with open(filename, 'r') as f:
        data = json.load(f)
    return data

# ----------------
# ОСНОВНАЯ ЛОГИКА ОБУЧЕНИЯ
# ----------------
def train_neural_network_data(folder_path):
    # Гиперпараметры
    POP_SIZE = 20
    KEEP_BEST = 5
    input_size = 5
    hidden_size = 8

    # Создайте начальную популяцию или загрузите лучшую модель, если она доступна
    population = [create_random_genome(input_size, hidden_size) for _ in range(POP_SIZE)]
    best_saved = load_best_model()
    if best_saved is not None:
        population[0]['hidden_weights'] = best_saved['hidden_weights']
        population[0]['hidden_bias']    = best_saved['hidden_bias']
        population[0]['output_weights'] = best_saved['output_weights']
        population[0]['output_bias']    = best_saved['output_bias']
        population[0]['fitness']        = 0.0

    # Помощник для разбора строк даты/времени ISO8601 с информацией о часовом поясе
    def get_date_from_time_str(t_str):
        dt = datetime.strptime(t_str, "%Y-%m-%dT%H:%M:%S%z")
        return dt.date()

    csv_files = sorted([f for f in os.listdir(folder_path) if f.endswith('.csv')])

    # Для каждого генома сохраните последний сигнал и соответствующую цену (если есть).
    genome_states = [{'last_signal': None, 'last_price': None} for _ in range(POP_SIZE)]
    current_date = None
    generation_count = 0

    # Обрабатывайте каждый CSV-файл по очереди (поток строка за строкой)
    for csv_file in csv_files:
        file_path = os.path.join(folder_path, csv_file)
        with open(file_path, 'r', newline='') as f:
            reader = csv.DictReader(f)
            for row in reader:
                row_time_str = row['Time']
                row_date = get_date_from_time_str(row_time_str)

                # Проверьте, начался ли новый день (конец генерации)
                if current_date is None:
                    current_date = row_date
                elif row_date != current_date:
                    # Генерация завершена; выведите статистику для мониторинга
                    generation_count += 1
                    fitnesses = [g['fitness'] for g in population]
                    avg_fitness = sum(fitnesses) / len(fitnesses)
                    best_fitness = max(fitnesses)
                    print(f"Generation {generation_count} | Date: {current_date} | Avg Fitness: {avg_fitness:.2 f} | Best Fitness: {best_fitness:.2 f}")

                    # Эволюция населения для нового поколения
                    population = evolve_population(population, keep_best=KEEP_BEST)

                    # Сброс состояния генома для нового поколения
                    for state in genome_states:
                        state['last_signal'] = None
                        state['last_price'] = None

                    current_date = row_date

                # Подготовить входные данные на лету
                inputs = [
                    float(row['NTP']),
                    float(row['NCP']),
                    float(row['NT']),
                    float(row['NIP']),
                    float(row['N14IP'])
                ]
                bid_price = float(row['BidOpen'])
                ask_price = float(row['AskOpen'])

                # Обработать решение каждого генома для этого ряда
                for i, genome in enumerate(population):
                    raw_output = forward_pass(genome, inputs)
                    signal_val = interpret_output(raw_output)  # -1, 0 или 1
                    prev_signal = genome_states[i]['last_signal']
                    prev_price = genome_states[i]['last_price']

                    # Пропустить обработку, если сигнал не изменился
                    if signal_val == prev_signal:
                        continue

                    genome_states[i]['last_signal'] = signal_val

                    # Применяйте фитнес-логику на переходах сигналов
                    if signal_val == 1:  # Купить сигнал
                        genome_states[i]['last_price'] = ask_price
                        if prev_signal == -1 and prev_price is not None:
                            if ask_price < prev_price:
                                genome['fitness'] += 1
                            else:
                                genome['fitness'] -= 1
                    elif signal_val == -1:  # Сигнал на продажу
                        genome_states[i]['last_price'] = bid_price
                        if prev_signal == 1 and prev_price is not None:
                            if bid_price > prev_price:
                                genome['fitness'] += 1
                            else:
                                genome['fitness'] -= 1
                    else:  # Сигнал удержания
                        genome_states[i]['last_price'] = None

    # После обработки всех CSV-файлов завершите обучение
    population.sort(key=lambda g: g['fitness'], reverse=True)
    best_genome = population[0]
    print("Final Best genome fitness after all files:", best_genome['fitness'])
    save_best_model(best_genome, filename="best_model.json")

if __name__ == "__main__":
    folder_path = r""
    train_neural_network_data(folder_path)
и для MT5
import MetaTrader5 as mt5
import pandas as pd
import numpy as np
import json
import time

def tournament(values):
    """Tournament method to find maximum and minimum in a list."""
    max_val = values[0]
    min_val = values[0]
    for v in values[1:]:
        if v > max_val:
            max_val = v
        if v < min_val:
            min_val = v
    return max_val, min_val

# -------------------------------
# Загрузите лучшую модель из JSON
with open("best_model.json", "r") as f:
    best_model = json.load(f)

def forward_pass(inputs, model):
    """
    Perform a forward pass through a feedforward network with one hidden layer.
    Uses tanh for both hidden and output layers.
    The output is discretized:
       >0.5 --> 1, < -0.5 --> -1, else 0.
    """
    x = np.array(inputs)  # shape (5,)
    hidden_weights = np.array(model["hidden_weights"])  # shape (8, 5)
    hidden_bias = np.array(model["hidden_bias"])          # shape (8,)
    output_weights = np.array(model["output_weights"])    # shape (8,)
    output_bias = model["output_bias"]                    # скаляр

    hidden_input = np.dot(hidden_weights, x) + hidden_bias
    hidden_output = np.tanh(hidden_input)
    output_val = np.dot(output_weights, hidden_output) + output_bias
    output_activation = np.tanh(output_val)
    
    if output_activation > 0.5:
        return 1
    elif output_activation < -0.5:
        return -1
    else:
        return 0

# -------------------------------
# Функции управления заказами с помощью позиций
def get_open_position(symbol):
    """
    Returns information about an open position for the symbol as a dictionary:
    {"ticket": ticket, "type": "buy"/"sell", "volume": volume}.
    If no position is open, returns None.
    """
    positions = mt5.positions_get(symbol=symbol)
    if positions is None or len(positions) == 0:
        return None
    pos = positions[0]  # предполагать только одну открытую позицию на символ
    if pos.type == mt5.POSITION_TYPE_BUY:
        return {"ticket": pos.ticket, "type": "buy", "volume": pos.volume, "symbol": pos.symbol}
    elif pos.type == mt5.POSITION_TYPE_SELL:
        return {"ticket": pos.ticket, "type": "sell", "volume": pos.volume, "symbol": pos.symbol}
    return None

def close_position(position_info):
    """
    Closes the given position using its ticket.
    """
    symbol = position_info["symbol"]
    tick = mt5.symbol_info_tick(symbol)
    # Для закрытия: если открыта BUY, мы продаем по bid; если открыта SELL, мы покупаем по ask.
    if position_info["type"] == "buy":
        price = tick.bid
        order_type = mt5.ORDER_TYPE_SELL
    else:
        price = tick.ask
        order_type = mt5.ORDER_TYPE_BUY

    request = {
        "action": mt5.TRADE_ACTION_DEAL,
        "symbol": symbol,
        "volume": position_info["volume"],
        "type": order_type,
        "position": position_info["ticket"],
        "price": price,
        "deviation": 10,
        "magic": 123456,
        "comment": "Close position",
        "type_time": mt5.ORDER_TIME_GTC,
        "type_filling": mt5.ORDER_FILLING_IOC,
    }
    result = mt5.order_send(request)
    print("Close position result:", result)

def place_order(symbol, order_type, volume=0.01):
    """
    Place an order for the symbol.
    - order_type "buy": place BUY order.
    - order_type "sell": place SELL order.
    Before placing, if an open position exists with the opposite signal,
    it is closed using its order ticket.
    """
    # Проверьте текущую открытую позицию:
    current_position = get_open_position(symbol)
    if current_position:
        # Если существующая позиция противоположна новому сигналу, закройте ее.
        if (order_type == "buy" and current_position["type"] == "sell") or \
           (order_type == "sell" and current_position["type"] == "buy"):
            print(f"Opposite position ({current_position['type']}) detected. Closing it first.")
            close_position(current_position)
        # Если тип тот же, ничего не делайте.
        elif (order_type == current_position["type"]):
            print(f"{order_type.upper()} order already open. No new order will be placed.")
            return

    tick = mt5.symbol_info_tick(symbol)
    if order_type == "buy":
        price = tick.ask
        order_type_mt5 = mt5.ORDER_TYPE_BUY
    elif order_type == "sell":
        price = tick.bid
        order_type_mt5 = mt5.ORDER_TYPE_SELL
    else:
        print("Invalid order type:", order_type)
        return

    request = {
        "action": mt5.TRADE_ACTION_DEAL,
        "symbol": symbol,
        "volume": volume,
        "type": order_type_mt5,
        "price": price,
        "deviation": 10,
        "magic": 123456,
        "comment": "NEAT AI trade",
        "type_time": mt5.ORDER_TIME_GTC,
        "type_filling": mt5.ORDER_FILLING_IOC,
    }
    result = mt5.order_send(request)
    print(f"Placed {order_type.upper()} order result:", result)

# -------------------------------
# Инициализация подключения к MetaTrader 5
if not mt5.initialize():
    print("initialize() failed, error code =", mt5.last_error())
    quit()

symbol = "XAUUSDm"
timeframe = mt5.TIMEFRAME_M1  # 1-минутный таймфрейм

if not mt5.symbol_select(symbol, True):
    print("Failed to select symbol:", symbol)
    mt5.shutdown()
    quit()

# Непрерывный цикл мониторинга
try:
    while True:
        print("\n" + "="*80)
        print("Retrieving candle data...")
        num_candles = 15
        rates = mt5.copy_rates_from_pos(symbol, timeframe, 1, num_candles)
        if rates is None:
            print("Failed to get rates for", symbol)
        else:
            df = pd.DataFrame(rates)
            df['tick_count'] = df['tick_volume']
            df['9ma'] = df['close'].rolling(window=9).mean()
            df['14ma'] = df['close'].rolling(window=14).mean()
            df['TP'] = (df['high'] + df['low'] + df['close']) / 3
            df['NTP']   = (df['TP']    - df['low']) / (df['high'] - df['low'])
            df['NIP9']  = (df['9ma']   - df['low']) / (df['high'] - df['low'])
            df['NIP14'] = (df['14ma']  - df['low']) / (df['high'] - df['low'])
            close_prices = list(df['close'].tail(14))
            HC, LC = tournament(close_prices)
            tick_counts = list(df['tick_count'].tail(14))
            HT, LT = tournament(tick_counts)
            df['NCP'] = (df['close'] - LC) / (HC - LC)
            df['NT']  = (df['tick_count'] - LT) / (HT - LT)
            df = df.round(3)
            
            # Вывести нормализованные данные последней свечи
            display_cols = ['high', 'low', 'close', 'tick_count', '9ma', '14ma', 'TP', 'NTP', 'NCP', 'NT', 'NIP9', 'NIP14']
            print("\nNormalized Candle Data (Last Candle):")
            print(df[display_cols].tail(1).to_string(index=False))
            
            # Извлечение входных данных из последней свечи
            last_row = df.iloc[-1]
            input_vector = [
                last_row['NTP'],
                last_row['NCP'],
                last_row['NT'],
                last_row['NIP9'],
                last_row['NIP14']
            ]
            
            print("\nExtracted Inputs:")
            print(f"NTP = {last_row['NTP']}, NCP = {last_row['NCP']}, NT = {last_row['NT']}, NIP9 = {last_row['NIP9']}, NIP14 = {last_row['NIP14']}")
            print("Price Data - High:", last_row['high'], "Low:", last_row['low'], "Close:", last_row['close'])
            
            # Вычислить сигнал сети
            signal = forward_pass(input_vector, best_model)
            print("\nNetwork Signal (discretized):", signal)
            
            # Выполнение сделки на основе сигнала с использованием нашей логики управления ордерами
            if signal == 1:
                print("Received BUY signal.")
                place_order(symbol, "buy", volume=0.01)
            elif signal == -1:
                print("Received SELL signal.")
                place_order(symbol, "sell", volume=0.01)
            else:
                print("Signal is neutral. No action taken.")
        
        # Подождите до следующей минутной границы
        tick = mt5.symbol_info_tick(symbol)
        if tick is None:
            print("Failed to get tick info for", symbol)
            break
        server_time = tick.time
        next_minute = ((server_time // 60) + 1) * 60
        sleep_seconds = next_minute - server_time
        print(f"Sleeping for {sleep_seconds} seconds until next candle...")
        time.sleep(sleep_seconds)
except KeyboardInterrupt:
    print("Stopping continuous monitoring.")
finally:
    mt5.shutdown()
 
Очень хорошо.
 
Multi Dead нейронных сетей и создал модель с помощью чата GPT, потому что не умею программировать. У меня ушло две недели на нормализацию данных и два дня на создание модели, но на обучение ушел всего час, 1300 поколений, 20 геномов на поколение, мой 5-летний подержанный ноутбук горел, и, когда я подключил модель к MT5, модель была очень агрессивной и очень точно предсказывала следующие свечи, но она не приносила прибыли. Но было интересно учиться и видеть предсказания модели, поэтому я и пришел сюда, чтобы узнать больше об искусственном интеллекте, и вот код NEAT AI


и для МТ5

Нейронные сети - это действительно интересная и увлекательная тема. Однако я взял паузу в объяснении, так как решил сначала доработать реплейсер/симулятор. Однако как только я закончу публиковать статьи по симулятору, мы вернемся с новыми статьями о нейронных сетях. Цель всегда состоит в том, чтобы показать, как они работают под капотом. Большинство людей думают, что это волшебные коды, что не так. Но это все равно интересная и занимательная тема. Я даже подумываю смоделировать что-нибудь, чтобы все могли увидеть, как нейронная сеть обучается без наблюдения или предварительных данных. Это очень интересно и может помочь в понимании некоторых вещей. Деталь: весь мой код будет написан на MQL5. И раз уж вы сказали, что вы не программист. Как насчет того, чтобы выучить MQL5 и начать реализовывать свои собственные решения? Я пишу серию статей, ориентированных на таких людей, как вы. Последнюю из них можно посмотреть здесь: https: //www.mql5.com/pt/articles/15833. В этой серии я объясняю все с самых азов. Так что если вы совсем ничего не знаете о программировании, вернитесь к первой статье цикла. Ссылки на предыдущие статьи всегда будут находиться в начале статьи.

Do básico ao intermediário: Indicador (IV)
Do básico ao intermediário: Indicador (IV)
  • www.mql5.com
Neste artigo, vermos como é fácil de criar e implementar uma metodologia operacional, visando colorir candles. Sendo este um conceito, que diversos operadores apreciam imensamente. Porém, é preciso se tomar cuidado ao implementar tal tipo de coisa. Isto para que as barras, ou candles, mantenham a sua aparência original. Visando assim não prejudicar a leitura que muitos operadores fazem candle a candle.
 
Большое спасибо за это представление!