Português
preview
Многомодульный торговый робот на Python и MQL5 (Часть I): Создание базовой архитектуры и первых модулей

Многомодульный торговый робот на Python и MQL5 (Часть I): Создание базовой архитектуры и первых модулей

MetaTrader 5Торговые системы | 19 декабря 2024, 13:31
901 0
Yevgeniy Koshtenko
Yevgeniy Koshtenko

Введение

Однажды меня осенила идея: торговые роботы слишком просты для современного рынка, нужно что-то более гибкое и умное.

Рынок постоянно меняется. Сегодня работает одна стратегия, а завтра она уже бесполезна. Я долго наблюдал за этим и понял, что нужен совершенно новый подход. Решение пришло неожиданно. А что если сделать систему модульной? Представьте команду профессионалов: один следит за трендами, второй анализирует объемы торгов, третий контролирует риски. Именно так и должен работать современный торговый робот!

Выбор технологий был очевиден. Python идеально подходил для анализа данных — с его библиотеками можно творить чудеса. MQL5 взял на себя исполнение сделок. Получился отличный тандем. Мы начали с малого: сначала создали прочную основу — архитектуру, способную расти и развиваться, потом добавили взаимодействие между Python и MQL5. Система управления данными получилась на удивление простой и эффективной.

Асинхронность стала настоящим прорывом! Теперь робот мог одновременно следить за множеством инструментов. Производительность взлетела до небес.

Знаете, что самое интересное? Эта система реально работает на рынке. Не просто пример из учебника, а боевой инструмент. Конечно, мы начнем с базовой версии, но даже она — впечатляет. Впереди у нас — большое путешествие. Мы создадим систему, способную учиться и адаптироваться. Будем улучшать её шаг за шагом. А пока, давайте начнем с самого важного — с построения надежного фундамента.


Базовая архитектура системы. В поисках идеального баланса

Три года я бился над созданием торговых роботов. И знаете, что понял? Главное, это не сами алгоритмы, а то, как они работают вместе. Это открытие изменило все.

Представьте симфонический оркестр. Каждый музыкант великолепен, но без дирижера музыки не будет. В моей системе таким дирижером стал MarketMaker. Он управляет четырьмя модулями, и каждый из них знает свое дело:

  • Первый модуль следит за объемами торгов: когда и по каким ценам проходят сделки.
  • Второй модуль ищет арбитражные возможности.
  • Третий модуль анализирует экономику. 
  • Четвертый модуль не дает системе увлечься и контролирует риски.

Рынок никого не ждет. Он меняется молниеносно, поэтому все модули работают одновременно, постоянно общаясь друг с другом. Увидел арбитражный модуль возможность и сообщил центру. Тот проверил информацию у других модулей и принял решение.

Сначала я думал сделать жесткие правила входа в рынок. Но жизнь быстро показала, что так нельзя. Иногда один сильный сигнал важнее нескольких слабых. А сколько времени ушло на организацию данных! У каждого модуля своя информация: котировки, макропоказатели, история сделок. Все это нужно хранить, обновлять, делиться с другими. Пришлось создать особую систему синхронизации.

Забавно, но чем независимее были модули, тем лучше работала система. Сбой одного компонента не останавливал остальные. А сбои случаются: то связь пропадет, то котировки зависнут. Главное достоинство такой архитектуры — её можно расширять. Хотите добавить анализ новостей? Нет проблем! Создаете модуль, подключаете к MarketMaker — и все работает.

Система живет и развивается. Она не идеальна, но её основа — модульность, параллельная работа и гибкость — позволяет уверенно смотреть в будущее. Скоро расскажу подробнее о каждом компоненте.


Основной класс системы

После долгих экспериментов с различными подходами к архитектуре торговых роботов, я пришел к пониманию, что успех системы во многом зависит от того, насколько хорошо организовано её ядро. MarketMaker стал результатом этого понимания, воплотив в себе все лучшие практики, которые я накопил за годы разработки алгоритмических систем.

Начнем с базовой структуры класса. Вот как выглядит его инициализация:

def __init__(self, pairs: List[str], terminal_path: str, 
             volume: float = 1.0, levels: int = 5, spacing: float = 3.0):
    # Основные параметры
    self.pairs = pairs
    self.base_volume = volume
    self.levels = levels
    self.spacing = spacing
    self.magic = 12345
    
    # Торговые параметры
    self.portfolio_iterations = 10
    self.leverage = 50
    self.min_profit_pips = 1.0
    self.max_spread_multiplier = 2.0
    
    # Хранилища данных
    self.symbols_info = {}
    self.trading_parameters = {}
    self.optimal_horizons = {}

На первый взгляд, всё выглядит довольно просто. Но за каждым параметром стоит своя история. Взять, например, portfolio_iterations — этот параметр появился после того, как я заметил, что слишком агрессивное открытие позиций может привести к проблемам с ликвидностью. Теперь система разбивает доступный объем на части, что делает торговлю более сбалансированной.

Особое внимание я уделил инициализации исторических данных. Вот как это работает:

def _initialize_history(self, pair: str):
    """Инициализация исторических данных для пары"""
    try:
        rates = mt5.copy_rates_from(pair, mt5.TIMEFRAME_M1, 
                                  datetime.now()-timedelta(days=1), 1440)
        if rates is not None:
            df = pd.DataFrame(rates)
            df['time'] = pd.to_datetime(df['time'], unit='s')
            df.set_index('time', inplace=True)
            returns = np.log(df['close'] / df['close'].shift(1)).dropna()
            self.returns_history[pair] = pd.Series(returns.values, 
                                                 index=df.index[1:])
    except Exception as e:
        logger.error(f"Error initializing history for {pair}: {e}")

Интересный момент здесь — использование логарифмических доходностей вместо простых процентных изменений. Это не случайный выбор. На практике я обнаружил, что логарифмические доходности дают более стабильные результаты при расчете статистических показателей, особенно когда речь идет о волатильности.

Одним из самых сложных моментов оказалась реализация прогнозирования объемов. После множества экспериментов родился такой код:

async def update_volume_predictions(self):
    """Обновление предсказаний объема для каждой пары"""
    for pair in self.pairs:
        try:
            df = volume_model.get_volume_data(
                symbol=pair,
                timeframe=mt5.TIMEFRAME_H1,
                n_bars=100
            )
            
            if pair in self.volume_models:
                feature_columns = [
                    'volume_sma_5', 'volume_sma_20', 'relative_volume', 
                    'volume_change', 'volume_volatility', 'price_sma_5', 
                    'price_sma_20', 'price_change', 'price_volatility',
                    'rsi', 'macd', 'macd_signal', 'bb_upper', 'bb_lower'
                ]
                
                X = df[feature_columns].iloc[-1:].copy()
                prediction = self.volume_models[pair].predict(X)[0]
                current_price = df['close'].iloc[-1]
                predicted_change = (prediction - current_price) / current_price
                
                self.volume_predictions[pair] = predicted_change
                
        except Exception as e:
            logger.error(f"Error updating prediction for {pair}: {e}")

Обратите внимание на набор признаков — это не просто случайный набор индикаторов. Каждый из них добавлялся постепенно, после тщательного тестирования. Например, относительный объем (relative_volume) оказался особенно полезен для определения аномальной активности на рынке.

А вот как выглядит сердце системы — торговый цикл:

async def trade_cycle(self):
    """Основной торговый цикл"""
    try:
        await self.update_volume_predictions()
        await self.economic_module.update_forecasts()
        
        all_positions = mt5.positions_get() or []
        open_positions = [pos for pos in all_positions if pos.magic == self.magic]
        
        if open_positions:
            await self.manage_positions()
            return
            
        valid_signals = []
        available_volume = self.calculate_available_volume() * len(self.pairs)
        
        for pair in self.pairs:
            signal = await self.get_combined_signal(pair)
            if signal and self._validate_signal(signal):
                valid_signals.append(signal)
        
        if valid_signals:
            volume_per_trade = available_volume / len(valid_signals)
            for signal in valid_signals:
                signal['adjusted_volume'] = volume_per_trade
                await self.place_order(signal)
                
    except Exception as e:
        logger.error(f"Error in trade cycle: {e}")

Этот код — результат долгих размышлений о том, как правильно организовать торговый процесс. Асинхронная природа цикла позволяет эффективно обрабатывать множество пар одновременно, а четкая последовательность действий (обновление прогнозов → проверка позиций → поиск сигналов → исполнение) обеспечивает предсказуемое поведение системы.

Отдельного внимания заслуживает механизм валидации сигналов:

def _validate_signal(self, signal: Dict) -> bool:
    """Проверка торгового сигнала"""
    spread = signal['spread']
    diff_pips = signal['diff_pips']
    
    # Базовые проверки
    if spread > self.max_spread_multiplier * diff_pips:
        return False
        
    if diff_pips < self.min_profit_pips:
        return False
        
    # Проверка экономических факторов
    if signal['economic_volatility'] > self.volatility_threshold:
        return False
        
    # Проверка предсказания объема
    if abs(signal['volume_prediction']) < self.min_volume_change:
        return False
        
    return True

Каждая проверка здесь — это результат реального торгового опыта. Например, проверка экономической волатильности была добавлена после того, как я заметил, что торговля во время важных новостей часто приводит к повышенным убыткам, из-за резких движений цены.

В завершение, хочу отметить, что MarketMaker — это живая система, которая продолжает эволюционировать. Каждый день торговли приносит новые идеи и улучшения. Модульная архитектура позволяет легко внедрять эти улучшения, не нарушая работу основных компонентов.


Работа с данными

Работа с данными всегда была одним из самых сложных аспектов алгоритмической торговли. Помню, как в начале разработки я столкнулся с, казалось бы, простым вопросом: как правильно организовать хранение и обработку рыночной информации? Быстро стало понятно, что обычной базы данных или простых массивов будет недостаточно.

Всё началось с создания базовой структуры для получения данных. После нескольких итераций родился такой метод:

def _initialize_history(self, pair: str):
    try:
        rates = mt5.copy_rates_from(pair, mt5.TIMEFRAME_M1, 
                                  datetime.now()-timedelta(days=1), 1440)
        if rates is None:
            logger.error(f"Failed to get history data for {pair}")
            return
            
        df = pd.DataFrame(rates)
        df['time'] = pd.to_datetime(df['time'], unit='s')
        df.set_index('time', inplace=True)
        
        # Рассчитываем логарифмические доходности
        returns = np.log(df['close'] / df['close'].shift(1)).dropna()
        
        # Добавляем дополнительные метрики
        df['typical_price'] = (df['high'] + df['low'] + df['close']) / 3
        df['price_velocity'] = df['close'].diff() / df['time'].diff().dt.total_seconds()
        df['volume_intensity'] = df['tick_volume'] / df['time'].diff().dt.total_seconds()
        
        self.returns_history[pair] = pd.Series(returns.values, index=df.index[1:])
        self.price_data[pair] = df
        
    except Exception as e:
        logger.error(f"Error initializing history for {pair}: {e}")

Интересный момент здесь — расчёт "скорости" изменения цены (price_velocity) и интенсивности объёма (volume_intensity). Эти метрики появились не сразу. Изначально я работал только с обычными ценовыми данными, но быстро понял, что рынок — это не просто последовательность цен, это сложная динамическая система, где важна не только величина изменений, но и скорость этих изменений.

Особое внимание пришлось уделить обработке пропущенных данных. Вот как выглядит система валидации и очистки:

def _validate_and_clean_data(self, df: pd.DataFrame) -> pd.DataFrame:
    """Валидация и очистка данных"""
    if df.empty:
        raise ValueError("Empty dataset received")
        
    # Проверяем пропуски
    missing_count = df.isnull().sum()
    if missing_count.any():
        logger.warning(f"Found missing values: {missing_count}")
        
        # Для цен используем forward fill
        price_cols = ['open', 'high', 'low', 'close']
        df[price_cols] = df[price_cols].ffill()
        
        # Для объёмов - интерполяцию
        df['tick_volume'] = df['tick_volume'].interpolate(method='linear')
    
    # Проверяем выбросы
    for col in ['high', 'low', 'close']:
        zscore = stats.zscore(df[col])
        outliers = abs(zscore) > 3
        if outliers.any():
            logger.warning(f"Found {outliers.sum()} outliers in {col}")
            
            # Заменяем экстремальные выбросы
            df.loc[outliers, col] = df[col].mean() + 3 * df[col].std() * np.sign(zscore[outliers])
    
    return df

Помню случай, когда пропуск всего одного тика привёл к некорректному расчёту индикаторов и, как следствие, к неверному торговому сигналу. После этого, система очистки данных была существенно доработана.

А вот как мы работаем с объёмами — одной из самых важных характеристик рынка:

def analyze_volume_profile(self, pair: str, window: int = 100) -> Dict:
    """Анализ профиля объёма"""
    try:
        df = self.price_data[pair].copy().last(window)
        
        # Нормализуем объёмы
        volume_mean = df['tick_volume'].rolling(20).mean()
        volume_std = df['tick_volume'].rolling(20).std()
        df['normalized_volume'] = (df['tick_volume'] - volume_mean) / volume_std
        
        # Рассчитываем кластеры объёма
        price_levels = pd.qcut(df['close'], q=10)
        volume_clusters = df.groupby(price_levels)['tick_volume'].sum()
        
        # Находим уровни поддержки/сопротивления по объёму
        significant_levels = volume_clusters[volume_clusters > volume_clusters.mean() + volume_clusters.std()]
        
        # Анализируем дисбалансы
        buy_volume = df[df['close'] > df['open']]['tick_volume'].sum()
        sell_volume = df[df['close'] <= df['open']]['tick_volume'].sum()
        volume_imbalance = (buy_volume - sell_volume) / (buy_volume + sell_volume)
        
        return {
            'normalized_profile': volume_clusters.to_dict(),
            'significant_levels': significant_levels.index.to_list(),
            'volume_imbalance': volume_imbalance,
            'current_percentile': stats.percentileofscore(df['tick_volume'], df['tick_volume'].iloc[-1])
        }
        
    except Exception as e:
        logger.error(f"Error analyzing volume profile: {e}")
        return None

Этот код — результат длительного изучения микроструктуры рынка. Особенно интересен расчёт дисбаланса объёмов между покупками и продажами. Я изначально изучал это на крипто-рынке, не знаю, даст ли добро администрация MQL5 на публикацию кода с интеграцией криптобиржи, MetaTrader 5, и Python....

Ну да ладно, я отвлекся. На первый взгляд может показаться, что простое сравнение объёмов на растущих и падающих барах не даст полезной информации. Но практика показала, что этот простой индикатор часто предупреждает о готовящемся развороте тренда.

Отдельная история — работа с экономическими данными. Здесь пришлось создать целую систему синхронизации:

async def synchronize_market_data(self):
    """Синхронизация рыночных данных"""
    while True:
        try:
            # Обновляем базовые данные
            for pair in self.pairs:
                latest_data = await self._get_latest_ticks(pair)
                if latest_data is not None:
                    self._update_price_data(pair, latest_data)
                    
            # Обновляем производные метрики
            await self._update_derivatives()
            
            # Проверяем целостность данных
            self._verify_data_integrity()
            
            await asyncio.sleep(1)  # Динамическая задержка
            
        except Exception as e:
            logger.error(f"Error in data synchronization: {e}")
            await asyncio.sleep(5)  # Увеличенная задержка при ошибке

Здесь ключевой момент — асинхронность обновления данных. В ранних версиях системы я использовал синхронные запросы, но это создавало задержки при обработке большого количества пар. Переход на асинхронную модель позволил значительно повысить производительность.

В завершение хочу отметить, что правильная организация работы с данными — это не просто технический вопрос. Это фундамент, на котором строится вся торговая стратегия. Чистые, хорошо структурированные данные позволяют увидеть те закономерности рынка, которые остаются скрытыми при поверхностном анализе.


Первый модуль: анализ объемов

История создания модуля анализа объемов началась с простого наблюдения: классические индикаторы часто запаздывают, потому что работают только с ценами. Но рынок — это не только цены, это еще и объемы торгов, которые часто предсказывают движение котировок. Именно поэтому первым модулем нашей системы стал анализатор объемов.

Начнем с базовой функции получения данных:

def get_volume_data(symbol, timeframe=mt5.TIMEFRAME_H1, n_bars=2000):
    """Получение данных объема и цен с MT5"""
    try:
        bars = mt5.copy_rates_from_pos(symbol, timeframe, 0, n_bars)
        if bars is None:
            logger.error(f"Failed to get data for {symbol}")
            return None
        
        df = pd.DataFrame(bars)
        df['time'] = pd.to_datetime(df['time'], unit='s')
        return df
        
    except Exception as e:
        logger.error(f"Error getting data for {symbol}: {e}")
        return None

На первый взгляд, функция выглядит просто. Но за этой простотой скрывается важное решение: мы берем именно 2000 баров истории. Почему? Экспериментальным путем я выяснил, что этого достаточно для построения качественной модели, но при этом не создает излишней нагрузки на память сервера в случае обучения очень уж крупных моделей, с крупными размерностями датасетов и входом признаков, как последовательностей батчей.

Самая интересная часть модуля — создание признаков для анализа. Вот как это работает:

def create_features(df, forecast_periods=None):
    """Создание признаков для модели прогнозирования"""
    try:
        # Базовые объемные показатели
        df['volume_sma_5'] = df['tick_volume'].rolling(window=5).mean()
        df['volume_sma_20'] = df['tick_volume'].rolling(window=20).mean()
        df['relative_volume'] = df['tick_volume'] / df['volume_sma_20']
        
        # Динамика объема
        df['volume_change'] = df['tick_volume'].pct_change()
        df['volume_acceleration'] = df['volume_change'].diff()
        
        # Волатильность объема
        df['volume_volatility'] = df['tick_volume'].rolling(window=20).std()
        df['volume_volatility_5'] = df['tick_volume'].rolling(window=5).std()
        df['volume_volatility_ratio'] = df['volume_volatility_5'] / df['volume_volatility']

Особое внимание здесь стоит обратить на volume_volatility_ratio. Этот показатель появился после того, как я заметил интересную закономерность: перед сильными движениями краткосрочная волатильность объемов часто начинает расти быстрее долгосрочной. Этот индикатор стал одним из ключевых в определении потенциальных точек входа.

Интересная история связана с расчетом профиля объема:

# Профиль объема
        df['volume_percentile'] = df['tick_volume'].rolling(window=100).apply(
            lambda x: pd.Series(x).rank(pct=True).iloc[-1]
        )
        df['volume_density'] = df['tick_volume'] / (df['high'] - df['low'])
        df['volume_density_ma'] = df['volume_density'].rolling(window=20).mean()
        df['cumulative_volume'] = df['tick_volume'].rolling(window=20).sum()
        df['volume_ratio'] = df['tick_volume'] / df['cumulative_volume']

Показатель volume_density появился не случайно. Я заметил, что объем сам по себе может быть обманчив — важно учитывать, на каком ценовом диапазоне он был набран. Высокий объем на узком диапазоне цен часто говорит о формировании важного уровня поддержки или сопротивления.

Для прогнозирования направления движения цены я разработал специальную функцию:

def predict_direction(model, X):
    """Прогноз направления движения цены"""
    try:
        prediction = model.predict(X)[0]
        current_price = X['close'].iloc[-1] if 'close' in X else None
        if current_price is None:
            return 0
            
        # Возвращаем 1 для роста, -1 для падения, 0 для нейтрального
        price_change = (prediction - current_price) / current_price
        if abs(price_change) < 0.0001:  # Минимальный порог изменения
            return 0
        return 1 if price_change > 0 else -1
        
    except Exception as e:
        logger.error(f"Error predicting direction: {e}")
        return 0

Обратите внимание на порог изменения 0.0001. Это не случайное число — оно выбрано, исходя из анализа минимального движения, которое может быть отработано с учетом спреда и комиссий разного рода. Для фондового рынка показатель нужно подбирать отдельно.

Финальный этап — обучение модели:

def train_model(X_train, X_test, y_train, y_test, model_params=None):
    try:
        if model_params is None:
            model_params = {'n_estimators': 400, 'random_state': 42}
            
        model = RandomForestRegressor(**model_params)
        model.fit(X_train, y_train)
        
        # Оценка модели
        train_predictions = model.predict(X_train)
        test_predictions = model.predict(X_test)
        
        train_rmse = np.sqrt(mean_squared_error(y_train, train_predictions))
        test_rmse = np.sqrt(mean_squared_error(y_test, test_predictions))
        test_r2 = r2_score(y_test, test_predictions)

Знаете, почему я выбрал именно RandomForest с 400 деревьями? Перепробовав кучу всего — от простой регрессии до потрясающих по сложности архитектуры нейронок, я пришел к выводу, что этот метод — самый надежный. Не самый точный, может быть, зато — стабильный. Рынок шумит, дергается, а RandomForest держится молодцом.

Это только начало, конечно. Дальше будет интереснее: как связать все эти сигналы воедино, как настроить систему, чтобы она сама училась на ходу? Но об этом в следующий раз.


Управление рисками: искусство сохранения капитала

А теперь о самом важном — о рисках. Забавно слушать, как все обсуждают крутые стратегии и нейронки. За десять лет на рынке я понял главное — без контроля рисков все эти стратегии ничего не стоят. Можно иметь суперторговый алгоритм, но без грамотного риск-менеджмента все равно останешься в минусе.

Поэтому в нашей системе, защита капитала — это святое. И знаете что? Именно этот консервативный подход позволяет нам стабильно зарабатывать, пока другие теряют деньги на "идеальных" стратегиях.

def calculate_available_volume(self) -> float:
    try:
        account = mt5.account_info()
        if not account:
            return 0.01
            
        # Используем баланс и свободную маржу
        balance = account.balance
        free_margin = account.margin_free
        
        # Берем минимальное значение для безопасности
        available_margin = min(balance, free_margin)
        
        # Рассчитываем максимальный объем с учетом маржи
        margin_ratio = 0.1  # Используем только 10% доступной маржи
        base_volume = (available_margin * margin_ratio) / 1000
        
        # Ограничиваем максимальным объемом
        max_volume = min(base_volume, 1.0)  # максимум 1 лот

Обратите внимание на margin_ratio = 0.1 . Это неслучайное число. После нескольких месяцев тестирования, я пришел к выводу, что использование более 10% доступной маржи значительно увеличивает риск маржин-колла во время сильных движений рынка. Особенно это критично при торговле несколькими парами одновременно.

Следующий важный момент — расчет стоп-лоссов и тейк-профитов:

async def calculate_position_limits(self, signal: Dict) -> Tuple[float, float]:
    try:
        pair = signal['pair']
        direction = signal['direction']
        
        # Получаем волатильность
        volatility = signal['price_volatility']
        economic_volatility = signal['economic_volatility']
        
        # Базовые значения в пипсах
        base_sl = 20
        base_tp = 40
        
        # Корректируем на волатильность
        volatility_factor = 1 + (volatility * 2)
        sl_points = base_sl * volatility_factor
        tp_points = base_tp * volatility_factor
        
        # Учитываем экономическую волатильность
        if economic_volatility > 0.5:
            sl_points *= 1.5
            tp_points *= 1.2
            
        # Проверяем минимальные дистанции
        info = self.symbols_info[pair]
        min_stop_level = info.trade_stops_level if hasattr(info, 'trade_stops_level') else 0
        
        return max(sl_points, min_stop_level), max(tp_points, min_stop_level)
        
    except Exception as e:
        logger.error(f"Error calculating position limits: {e}")
        return 20, 40  # возвращаем базовые значения в случае ошибки

История с volatility_factor особенно интересная. Изначально я использовал фиксированные стопы, но быстро заметил, что в периоды повышенной волатильности они часто срабатывали слишком рано, их сбивает очень часто. Динамическая корректировка стопов на основе текущей волатильности значительно улучшила результаты торговли.

А вот как выглядит система управления позициями:

async def manage_positions(self):
    """Управление открытыми позициями"""
    try:
        positions = mt5.positions_get() or []
        for position in positions:
            if position.magic == self.magic:
                # Проверяем время в позиции
                time_in_trade = datetime.now() - pd.to_datetime(position.time, unit='s')
                
                # Получаем текущие рыночные данные
                signal = await self.get_combined_signal(position.symbol)
                
                # Проверяем необходимость модификации позиции
                if self._should_modify_position(position, signal, time_in_trade):
                    await self._modify_position(position, signal)
                    
                # Проверяем условия закрытия
                if self._should_close_position(position, signal, time_in_trade):
                    await self.close_position(position)
                    
    except Exception as e:
        logger.error(f"Error managing positions: {e}")

Особое внимание здесь уделено времени нахождения в позиции. Практика показала, что чем дольше открыта позиция, тем выше должны быть требования к её сохранению. Это реализовано через динамическое ужесточение условий удержания позиции с течением времени.

Интересный момент связан с частичным закрытием позиций:

def calculate_partial_close(self, position, profit_threshold: float = 0.5) -> float:
    """Расчет объема для частичного закрытия"""
    try:
        # Проверяем текущую прибыль
        if position.profit <= 0:
            return 0.0
            
        profit_ratio = position.profit / (position.volume * 1000)  # примерная оценка ROI
        
        if profit_ratio >= profit_threshold:
            # Закрываем половину позиции при достижении порога прибыли
            return position.volume * 0.5
        return 0.0
        
    except Exception as e:
        logger.error(f"Error calculating partial close: {e}")
        return 0.0

Эта функция появилась после анализа сделок. Я заметил, что частичное закрытие позиций при достижении определенного уровня прибыли значительно улучшает общую статистику торговли. Это позволяет фиксировать часть прибыли, одновременно оставляя потенциал для дальнейшего роста.

В завершение хочу отметить, что система риск-менеджмента — это живой организм, который постоянно эволюционирует. Каждая неудачная сделка, каждое неожиданное движение рынка — это новый опыт, который мы используем для улучшения алгоритмов защиты капитала. В следующих версиях системы планируется добавить машинное обучение для динамической оптимизации параметров риск-менеджмента, а также гибрид VaR-системы и портфельной теории Марковица, но это уже совсем другая история....


Экономический модуль: когда фундаментальный анализ встречается с машинным обучением

В процессе работы над торговой системой я заметил интересную закономерность: даже самые сильные технические сигналы могут давать сбой, если они противоречат фундаментальным факторам. Именно это наблюдение привело к созданию экономического модуля  — компонента, который анализирует макроэкономические показатели и их влияние на движение валютных пар.

Начнем с базовой структуры модуля. Вот как выглядит инициализация основных экономических индикаторов:

def __init__(self):
    self.indicators = {
        'NY.GDP.MKTP.KD.ZG': 'GDP growth',
        'FP.CPI.TOTL.ZG': 'Inflation',
        'FR.INR.RINR': 'Real interest rate',
        'NE.EXP.GNFS.ZS': 'Exports',
        'NE.IMP.GNFS.ZS': 'Imports',
        'BN.CAB.XOKA.GD.ZS': 'Current account balance',
        'GC.DOD.TOTL.GD.ZS': 'Government debt',
        'SL.UEM.TOTL.ZS': 'Unemployment rate',
        'NY.GNP.PCAP.CD': 'GNI per capita',
        'NY.GDP.PCAP.KD.ZG': 'GDP per capita growth'
    }

Выбор этих индикаторов неслучаен. После анализа тысяч сделок я заметил, что именно эти показатели имеют наибольшее влияние на долгосрочные тренды валютных пар. Особенно интересна связь между реальной процентной ставкой и движением валюты — часто изменение этого показателя предшествует развороту тренда.

Для получения экономических данных я разработал специальный метод:

def fetch_economic_data(self):
    data_frames = []
    for indicator, name in self.indicators.items():
        try:
            data_frame = wbdata.get_dataframe({indicator: name}, country='all')
            data_frames.append(data_frame)
        except Exception as e:
            logger.error(f"Error fetching data for indicator '{indicator}': {e}")

    if data_frames:
        self.economic_data = pd.concat(data_frames, axis=1)
        return self.economic_data

Интересный момент здесь — использование библиотеки wbdata для получения данных Всемирного банка. Я выбрал этот источник после экспериментов с различными API, так как он предоставляет наиболее полные и верифицированные данные.

Особое внимание я уделил подготовке данных для анализа:

def prepare_data(self, symbol_data):
    data = symbol_data.copy()
    data['close_diff'] = data['close'].diff()
    data['close_corr'] = data['close'].rolling(window=30).corr(data['close'].shift(1))

    for indicator in self.indicators.keys():
        if indicator in self.economic_data.columns:
            data[indicator] = self.economic_data[indicator].ffill()

    data.dropna(inplace=True)
    return data

Обратите внимание на использование forward fill для экономических показателей. Это решение появилось не сразу — сначала я пробовал интерполяцию, но выяснилось, что для экономических данных более корректно использовать последнее известное значение.

Сердце модуля — система прогнозирования:

def forecast(self, symbol, symbol_data):
    if len(symbol_data) < 50:
        return None, None

    X = symbol_data.drop(columns=['close'])
    y = symbol_data['close']

    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, shuffle=False)

    model = CatBoostRegressor(iterations=1000, learning_rate=0.1, depth=8, loss_function='RMSE')
    model.fit(X_train, y_train, verbose=False)

Выбор CatBoost, как алгоритма машинного обучения, тоже неслучаен. После экспериментов с различными моделями (от простой линейной регрессии до сложных нейронных сетей) оказалось, что именно CatBoost лучше всего справляется с нерегулярной природой экономических данных.

Финальный этап — интерпретация результатов:

def interpret_results(self, symbol):
    forecast = self.forecasts.get(symbol)
    importance_df = self.feature_importances.get(symbol)

    if forecast is None or importance_df is None:
        return f"Insufficient data for interpretation of {symbol}"

    trend = "upward" if forecast[-1] > forecast[0] else "downward"
    volatility = "high" if forecast.std() / forecast.mean() > 0.1 else "low"
    top_feature = importance_df.iloc[0]['feature']

Особенно интересен расчет волатильности. Порог в 0.1 для определения высокой волатильности был выбран после анализа исторических данных. Оказалось, что это значение хорошо разделяет периоды спокойного рынка и повышенной турбулентности.

В процессе работы над модулем я сделал интересное наблюдение: экономические факторы часто действуют с задержкой, но их влияние более устойчиво, чем влияние технических факторов. Это привело к созданию системы весов, где значимость экономических сигналов увеличивается на более длительных таймфреймах.

Конечно, экономический модуль — это не волшебная палочка, и он не может предсказать все движения рынка. Но в сочетании с техническим анализом и анализом объемов, он дает дополнительное измерение для понимания рыночных процессов. В следующих версиях системы планируется добавить анализ новостных потоков и их влияния на экономические показатели, но это уже тема для отдельного разговора.


Арбитражный модуль: в поисках истины в цене

Идея создания арбитражного модуля пришла ко мне после долгих наблюдений за валютным рынком. Я заметил интересную закономерность: часто реальные цены валютных пар отклоняются от их теоретической стоимости, рассчитанной через кросс-курсы. Эти отклонения создают возможности для арбитража, но что более важно — они могут служить индикатором будущего движения цены.

Начнем с базовой структуры модуля:

class ArbitrageModule:
    def __init__(self, terminal_path: str = "C:/Program Files/RannForex MetaTrader 5/terminal64.exe", max_trades: int = 10):
        self.terminal_path = terminal_path
        self.MAX_OPEN_TRADES = max_trades
        self.symbols = [
            "AUDUSD.ecn", "AUDJPY.ecn", "CADJPY.ecn", "AUDCHF.ecn", "AUDNZD.ecn", 
            "USDCAD.ecn", "USDCHF.ecn", "USDJPY.ecn", "NZDUSD.ecn", "GBPUSD.ecn", 
            "EURUSD.ecn", "CADCHF.ecn", "CHFJPY.ecn", "NZDCAD.ecn", "NZDCHF.ecn", 
            "NZDJPY.ecn", "GBPCAD.ecn", "GBPCHF.ecn", "GBPJPY.ecn", "GBPNZD.ecn", 
            "EURCAD.ecn", "EURCHF.ecn", "EURGBP.ecn", "EURJPY.ecn", "EURNZD.ecn"
        ]

Особое внимание стоит обратить на структуру базовых пар для расчета кросс-курсов:

     self.usd_pairs = {
            "EUR": "EURUSD.ecn",
            "GBP": "GBPUSD.ecn", 
            "AUD": "AUDUSD.ecn",
            "NZD": "NZDUSD.ecn",
            "USD": None,
            "CAD": ("USDCAD.ecn", True),
            "CHF": ("USDCHF.ecn", True),
            "JPY": ("USDJPY.ecn", True)
        }

Здесь интересный момент: некоторые пары помечены как инверсные (True). Это неслучайно — для некоторых валют, таких как CAD, CHF и JPY, базовой котировкой является USD/XXX, а не XXX/USD. Это важный нюанс, который часто упускают при расчете кросс-курсов.

Сердце модуля — функция расчета синтетических цен:

def calculate_synthetic_prices(self, data: Dict[str, pd.DataFrame]) -> pd.DataFrame:
    """Расчет синтетических цен через кросс-курсы"""
    synthetic_prices = {}
    
    try:
        for symbol in self.symbols:
            base = symbol[:3]
            quote = symbol[3:6]
            
            # Рассчитываем синтетическую цену через кросс-курсы
            fair_price = self.calculate_cross_rate(base, quote, data)
            synthetic_prices[f'{symbol}_fair'] = pd.Series([fair_price])

Помню, как я бился над оптимизацией этого кода. Изначально я пытался рассчитывать все возможные маршруты конвертации и выбирать оптимальный. Но оказалось, что простой расчет через USD даёт более стабильные результаты, особенно в условиях высокой волатильности.

Интересна также функция расчета курса к USD:

def get_usd_rate(self, currency: str, data: dict) -> float:
    """Получает курс валюты к USD"""
    if currency == "USD":
        return 1.0
        
    pair_info = self.usd_pairs[currency]
    if isinstance(pair_info, tuple):
        pair, inverse = pair_info
        rate = data[pair]['close'].iloc[-1]
        return 1 / rate if inverse else rate
    else:
        pair = pair_info
        return data[pair]['close'].iloc[-1]

Эта функция появилась после долгих экспериментов с различными способами расчета кросс-курсов. Ключевой момент здесь — правильная обработка инверсных пар. Неправильный расчет даже на одной паре может привести к каскаду ошибок в синтетических ценах.

Для работы с реальными данными я разработал специальную функцию:

def get_mt5_data(self, symbol: str, count: int = 1000) -> Optional[pd.DataFrame]:
    try:
        timezone = pytz.timezone("Etc/UTC")
        utc_from = datetime.now(timezone) - timedelta(days=1)

        ticks = mt5.copy_ticks_from(symbol, utc_from, count, mt5.COPY_TICKS_ALL)
        if ticks is None:
            logger.error(f"Failed to fetch data for {symbol}")
            return None

        ticks_frame = pd.DataFrame(ticks)
        ticks_frame['time'] = pd.to_datetime(ticks_frame['time'], unit='s')
        return ticks_frame

Выбор количества тиков (1000) — это компромисс между точностью расчетов и скоростью обработки данных. На практике оказалось, что этого достаточно для надежного определения справедливой цены.

В процессе работы над модулем я сделал интересное наблюдение: расхождения между реальной и синтетической ценой часто появляются перед значительными движениями рынка. Как будто умные деньги начинают двигать одни пары, создавая напряжение в системе кросс-курсов, которое потом разряжается сильным движением.

Конечно, арбитражный модуль — это не волшебная палочка, но, в сочетании с анализом объемов и экономическими индикаторами, он дает дополнительное измерение для понимания рынка. В следующих версиях планируется добавить анализ корреляций между отклонениями в разных парах, и это уже совсем другая история.


Заключение

Знаете, когда я начинал этот проект, даже не представлял, во что он выльется. Думал — просто соединю Python с MQL5, и дело с концом. А получилась целая торговая платформа! Каждый её кусочек — как деталь в швейцарских часах, и данная часть — лишь первая из множества статей.

За время разработки я много чего понял. Например, что в алготрейдинге нет простых путей. Взять хотя бы расчет объема позиции — казалось бы, что тут сложного? А когда начинаешь учитывать все риски и поведение рынка, голова кругом идет.

А как здорово работает модульная архитектура! Если один модуль сбоит, остальные продолжают работать. Можно спокойно улучшать каждую часть, не боясь сломать всю систему.

Самое интересное — наблюдать, как разные части системы работают вместе. Один модуль ищет арбитраж, другой следит за объемами, третий анализирует экономику, а четвертый контролирует риски. Вместе они видят рынок так, как не видит ни один отдельный анализ.

Конечно, еще есть куда расти. Хочется добавить анализ новостей, улучшить машинное обучение, разработать новые модели оценки рисков. Особенно интересно поработать над 3D-визуализацией рынка — представить цену, объем и время в одном пространстве.

Главный урок этого проекта — торговая система должна быть живой. Рынок не стоит на месте, и система должна меняться вместе с ним. Учиться на ошибках, находить новые закономерности, отбрасывать устаревшие подходы.

Надеюсь, мой опыт пригодится тем, кто тоже создает торговые алгоритмы. И помните, в этом деле нет финишной прямой. Есть только путь!

Прикрепленные файлы |
arbitrage_mt5.py (5.75 KB)
Алгоритм черной дыры — Black Hole Algorithm (BHA) Алгоритм черной дыры — Black Hole Algorithm (BHA)
Алгоритм черной дыры (Black Hole Algorithm, BHA) использует принципы гравитации черных дыр для оптимизации решений. В статье мы рассмотрим, как BHA притягивает лучшие решения, избегая локальных экстремумов, и почему этот алгоритм стал мощным инструментом для решения сложных задач. Узнайте, как простые идеи могут привести к впечатляющим результатам в мире оптимизации.
Возможности Мастера MQL5, которые вам нужно знать (Часть 25): Тестирование и торговля на нескольких таймфреймах Возможности Мастера MQL5, которые вам нужно знать (Часть 25): Тестирование и торговля на нескольких таймфреймах
Стратегии, основанные на нескольких таймфреймах, по умолчанию не могут быть протестированы в советниках, собранных с помощью Мастера, из-за архитектуры кода MQL5, используемой в классах сборки. Мы рассмотрим способ обхода этого ограничения для стратегий, которые предполагают использование нескольких таймфреймов на примере квадратичной скользящей средней.
Нейросети в трейдинге: Гибридный торговый фреймворк с предиктивным кодированием (Окончание) Нейросети в трейдинге: Гибридный торговый фреймворк с предиктивным кодированием (Окончание)
Продолжаем рассмотрение гибридной торговой системы StockFormer, которая объединяет предиктивное кодирование и алгоритмы обучения с подкреплением для анализа финансовых временных рядов. Основой системы служат три ветви Transformer с механизмом Diversified Multi-Head Attention (DMH-Attn), позволяющим выявлять сложные паттерны и взаимосвязи между активами. Ранее мы познакомились с теоретическими аспектами фреймворка и реализовали механизмы DMH-Attn, а сегодня поговорим об архитектуре моделей и их обучении.
Построение модели для ограничения диапазона сигналов по тренду (Часть 5): Система уведомлений (Часть III) Построение модели для ограничения диапазона сигналов по тренду (Часть 5): Система уведомлений (Часть III)
Эта часть серии посвящена интеграции WhatsApp с MetaTrader 5 для получения уведомлений. Мы рассмотрим блок-схему для упрощения понимания и обсудим важность мер безопасности при интеграции. Основная цель индикаторов — упростить анализ за счет автоматизации. Они должны включать методы уведомления для оповещения пользователей при выполнении определенных условий.