English Русский Español Deutsch 日本語 Português
preview
基于Python与MQL5的多模块交易机器人(第一部分):构建基础架构与首个模块

基于Python与MQL5的多模块交易机器人(第一部分):构建基础架构与首个模块

MetaTrader 5交易系统 |
506 1
Yevgeniy Koshtenko
Yevgeniy Koshtenko

引言

有一天,一个念头击中了我:对于现代市场而言,交易机器人太简单了,我们需要一些更灵活、更智能的东西。

市场是不断变化的。今天一个策略有效,明天就变得毫无用处。我观察了很长时间,意识到需要一种全新的方法。解决方案来得出乎意料。如果我们构建一个模块化系统呢?想象一个专业团队:一个人监控趋势,第二个人分析交易量,第三个人控制风险。这正是现代交易机器人应有的工作方式!

技术的选择是显而易见的。Python 被证明是进行数据分析的完美选择——借助它的库,你可以创造奇迹。MQL5 则负责执行交易。一个绝佳的组合诞生了。我们从小处着手:首先,我们创建了一个坚实的基础——一个能够成长和演进的架构,然后我们添加了 Python 和 MQL5 之间的交互。数据管理系统被证明出奇地简单而有效。

异步性是一个真正的突破!现在,机器人可以同时监控多个交易品种。生产效率也突飞猛进。

你知道最有趣的是什么吗?这个系统在真实市场中确实有效。它不仅仅是一个教科书式的示例,而是一个实际的交易工具。当然,我们将从基础版本开始,但即便如此,它也已足够令人印象深刻。我们前方还有一段伟大的旅程。我们将创建一个能够学习和适应的系统。我们将一步一步地改进它。现在,让我们从最重要的事情开始——构建一个坚实的基础。


系统的基础架构,寻求完美的平衡

三年来,我一直在努力创建交易机器人。我逐渐认识到,关键不在于算法本身,而在于它们如何协同工作。这一发现改变了一切。

想象一个交响乐团。每位音乐家都很出色,但没有指挥就没有音乐。在我的系统中,MarketMaker(市场制造者)就扮演了这样的指挥角色。它控制着四个模块,每个模块都各司其职:

  • 第一个模块监控交易量:交易在何时、以何种价格发生。
  • 第二个模块寻找套利机会。
  • 第三个模块分析经济状况。 
  • 第四个模块防止系统过度冒险,并控制风险。

市场不会等待任何人。它以闪电般的速度变化,因此所有模块都同时工作,并不断相互通信。比如说,套利模块发现了机会。系统会检查来自其他模块的信息,然后做出决策。

起初,我想过制定严格的入市规则。但实践很快证明这是行不通的。有时,一个强烈的信号比几个微弱的信号更重要。数据整理耗费了大量时间。每个模块都有自己的信息:报价、宏观经济指标、交易历史。所有这些都需要被存储、更新,并与其他模块共享。因此,有必要创建一个专门的同步系统。

具有讽刺意味的是,模块越是独立,系统运行得就越好。一个组件的故障不会导致其他模块停止工作。但故障确实会发生:连接中断,或者报价卡住。这种架构的主要优势在于它的可扩展性。想要添加新闻分析功能?没问题!创建一个模块,将其连接到 MarketMaker,一切就都神奇地运转起来了。

这个系统在不断地生存和发展。它并不完美,但其模块化、并行性和灵活性的基础,让我们能够满怀信心地展望未来。我很快会详细介绍每个组件。


系统主类

在对交易机器人架构的不同方法进行了大量实验之后,我认识到,系统的成功在很大程度上取决于其核心的组织方式。MarketMaker 正是这一认知的产物,它体现了我在多年开发算法系统过程中积累的所有最佳实践。

让我们从类的基本结构开始。它的初始化看起来是这样的:

def __init__(self, pairs: List[str], terminal_path: str, 
             volume: float = 1.0, levels: int = 5, spacing: float = 3.0):
    # Main parameters
    self.pairs = pairs
    self.base_volume = volume
    self.levels = levels
    self.spacing = spacing
    self.magic = 12345
    
    # Trading parameters
    self.portfolio_iterations = 10
    self.leverage = 50
    self.min_profit_pips = 1.0
    self.max_spread_multiplier = 2.0
    
    # Data warehouses
    self.symbols_info = {}
    self.trading_parameters = {}
    self.optimal_horizons = {}

乍一看,一切似乎都非常简单。但在每个参数背后都有一段故事。以 portfolio_iterations 为例——这个参数是在我注意到过于激进地开仓可能导致流动性问题之后才创建的。现在,系统会将可用交易量分解成若干部分,这使得交易更加均衡。

我对历史数据的初始化给予了特别的关注。其工作原理如下:

def _initialize_history(self, pair: str):
    """Initializing historical data for a pair"""
    try:
        rates = mt5.copy_rates_from(pair, mt5.TIMEFRAME_M1, 
                                  datetime.now()-timedelta(days=1), 1440)
        if rates is not None:
            df = pd.DataFrame(rates)
            df['time'] = pd.to_datetime(df['time'], unit='s')
            df.set_index('time', inplace=True)
            returns = np.log(df['close'] / df['close'].shift(1)).dropna()
            self.returns_history[pair] = pd.Series(returns.values, 
                                                 index=df.index[1:])
    except Exception as e:
        logger.error(f"Error initializing history for {pair}: {e}")

这里有趣的一点是,它使用的是对数收益率,而不是简单的百分比变化。这并非一个随机的选择。在实践中,我发现对数收益率在计算统计指标时能提供更稳定的结果,尤其是在涉及波动率时。

最具挑战性的方面之一是实现成交量预测。经过多次实验,诞生了以下代码:

async def update_volume_predictions(self):
    """Updating volume predictions for each pair"""
    for pair in self.pairs:
        try:
            df = volume_model.get_volume_data(
                symbol=pair,
                timeframe=mt5.TIMEFRAME_H1,
                n_bars=100
            )
            
            if pair in self.volume_models:
                feature_columns = [
                    'volume_sma_5', 'volume_sma_20', 'relative_volume', 
                    'volume_change', 'volume_volatility', 'price_sma_5', 
                    'price_sma_20', 'price_change', 'price_volatility',
                    'rsi', 'macd', 'macd_signal', 'bb_upper', 'bb_lower'
                ]
                
                X = df[feature_columns].iloc[-1:].copy()
                prediction = self.volume_models[pair].predict(X)[0]
                current_price = df['close'].iloc[-1]
                predicted_change = (prediction - current_price) / current_price
                
                self.volume_predictions[pair] = predicted_change
                
        except Exception as e:
            logger.error(f"Error updating prediction for {pair}: {e}")

请注意,这组特征并非随机选取的指标集合。它们中的每一个都是在经过仔细测试后逐步添加的。例如,relative_volume(相对成交量)已被证明对于识别异常的市场活动特别有用。

而这里就是系统的心脏——交易循环:

async def trade_cycle(self):
    """Main trading loop"""
    try:
        await self.update_volume_predictions()
        await self.economic_module.update_forecasts()
        
        all_positions = mt5.positions_get() or []
        open_positions = [pos for pos in all_positions if pos.magic == self.magic]
        
        if open_positions:
            await self.manage_positions()
            return
            
        valid_signals = []
        available_volume = self.calculate_available_volume() * len(self.pairs)
        
        for pair in self.pairs:
            signal = await self.get_combined_signal(pair)
            if signal and self._validate_signal(signal):
                valid_signals.append(signal)
        
        if valid_signals:
            volume_per_trade = available_volume / len(valid_signals)
            for signal in valid_signals:
                signal['adjusted_volume'] = volume_per_trade
                await self.place_order(signal)
                
    except Exception as e:
        logger.error(f"Error in trade cycle: {e}")

这段代码是关于如何正确组织交易流程的长期思考的成果。该循环的异步特性使其能够高效地同时处理多个交易对,而清晰的行动序列(更新预测 → 检查持仓 → 搜索信号 → 执行)确保了系统行为的可预测性。

信号验证机制值得特别关注:

def _validate_signal(self, signal: Dict) -> bool:
    """Trading signal check"""
    spread = signal['spread']
    diff_pips = signal['diff_pips']
    
    # Basic checks
    if spread > self.max_spread_multiplier * diff_pips:
        return False
        
    if diff_pips < self.min_profit_pips:
        return False
        
    # Check economic factors
    if signal['economic_volatility'] > self.volatility_threshold:
        return False
        
    # Check the volume prediction
    if abs(signal['volume_prediction']) < self.min_volume_change:
        return False
        
    return True

这里的每一项检查都是真实交易经验的结晶。例如,经济波动性检查是在我注意到在重要新闻事件期间交易,常常因价格剧烈波动而导致亏损增加之后才添加的。

最后,我想指出的是,MarketMaker 是一个持续演进的、有生命力的系统。每一天的交易都会带来新的想法和改进。模块化的架构使得在不干扰核心组件的情况下,轻松实现这些改进成为可能。


数据处理

数据处理一直是算法交易中最具挑战性的方面之一。我记得在开发初期,我曾面临一个看似简单的问题:如何正确地组织市场信息的存储和处理?很快我就明白,一个常规的数据库或简单的数组是远远不够的。

这一切都从创建一个接收数据的基本结构开始。经过几次迭代,诞生了以下方法:

def _initialize_history(self, pair: str):
    try:
        rates = mt5.copy_rates_from(pair, mt5.TIMEFRAME_M1, 
                                  datetime.now()-timedelta(days=1), 1440)
        if rates is None:
            logger.error(f"Failed to get history data for {pair}")
            return
            
        df = pd.DataFrame(rates)
        df['time'] = pd.to_datetime(df['time'], unit='s')
        df.set_index('time', inplace=True)
        
        # Calculate logarithmic returns
        returns = np.log(df['close'] / df['close'].shift(1)).dropna()
        
        # Add new metrics
        df['typical_price'] = (df['high'] + df['low'] + df['close']) / 3
        df['price_velocity'] = df['close'].diff() / df['time'].diff().dt.total_seconds()
        df['volume_intensity'] = df['tick_volume'] / df['time'].diff().dt.total_seconds()
        
        self.returns_history[pair] = pd.Series(returns.values, index=df.index[1:])
        self.price_data[pair] = df
        
    except Exception as e:
        logger.error(f"Error initializing history for {pair}: {e}")

这里有趣的一点是价格变化速度(price_velocity)和成交量强度(volume_intensity)的计算。这些指标并非一开始就存在。最初,我只处理常规的价格数据,但很快我意识到,市场不仅仅是价格的序列,它是一个复杂的动态系统,重要的不仅是变化的大小,还有变化的速度。

必须特别注意处理缺失的数据。验证和清洗系统如下所示:

def _validate_and_clean_data(self, df: pd.DataFrame) -> pd.DataFrame:
    """Validation and data cleaning"""
    if df.empty:
        raise ValueError("Empty dataset received")
        
    # Check gaps
    missing_count = df.isnull().sum()
    if missing_count.any():
        logger.warning(f"Found missing values: {missing_count}")
        
        # Use 'forward fill' for prices
        price_cols = ['open', 'high', 'low', 'close']
        df[price_cols] = df[price_cols].ffill()
        
        # Use interpolation for volumes
        df['tick_volume'] = df['tick_volume'].interpolate(method='linear')
    
    # Check outliers
    for col in ['high', 'low', 'close']:
        zscore = stats.zscore(df[col])
        outliers = abs(zscore) > 3
        if outliers.any():
            logger.warning(f"Found {outliers.sum()} outliers in {col}")
            
            # Replace extreme outliers
            df.loc[outliers, col] = df[col].mean() + 3 * df[col].std() * np.sign(zscore[outliers])
    
    return df

我记得有一次,仅仅因为缺失了一个 tick,就导致了指标计算错误,并最终产生了一个错误的交易信号。在那之后,数据清洗系统得到了显著改进。

以下是我们处理市场最重要的特征之一——成交量的方式:

def analyze_volume_profile(self, pair: str, window: int = 100) -> Dict:
    """Volume profile analysis"""
    try:
        df = self.price_data[pair].copy().last(window)
        
        # Normalize volumes
        volume_mean = df['tick_volume'].rolling(20).mean()
        volume_std = df['tick_volume'].rolling(20).std()
        df['normalized_volume'] = (df['tick_volume'] - volume_mean) / volume_std
        
        # Calculate volume clusters
        price_levels = pd.qcut(df['close'], q=10)
        volume_clusters = df.groupby(price_levels)['tick_volume'].sum()
        
        # Find support/resistance levels by volume
        significant_levels = volume_clusters[volume_clusters > volume_clusters.mean() + volume_clusters.std()]
        
        # Analyze imbalances
        buy_volume = df[df['close'] > df['open']]['tick_volume'].sum()
        sell_volume = df[df['close'] <= df['open']]['tick_volume'].sum()
        volume_imbalance = (buy_volume - sell_volume) / (buy_volume + sell_volume)
        
        return {
            'normalized_profile': volume_clusters.to_dict(),
            'significant_levels': significant_levels.index.to_list(),
            'volume_imbalance': volume_imbalance,
            'current_percentile': stats.percentileofscore(df['tick_volume'], df['tick_volume'].iloc[-1])
        }
        
    except Exception as e:
        logger.error(f"Error analyzing volume profile: {e}")
        return None

这段代码是对市场微观结构长期研究的成果。其中,买卖成交量之间失衡的计算尤其引人关注。我最初是在加密货币市场上研究这个课题的。我不知道 MQL5 的管理部门是否会批准发布整合了加密货币交易所、MetaTrader 5 和 Python 的代码……

不过我有点扯远了。乍一看,简单地比较上涨和下跌 K 线的成交量似乎不会提供有用的信息。但实践表明,这个简单的指标常常能预示即将到来的趋势反转。

处理经济数据又是另一回事了。在这里,有必要创建一整套同步系统:

async def synchronize_market_data(self):
    """Market data synchronization"""
    while True:
        try:
            # Update basic data
            for pair in self.pairs:
                latest_data = await self._get_latest_ticks(pair)
                if latest_data is not None:
                    self._update_price_data(pair, latest_data)
                    
            # Update derived metrics
            await self._update_derivatives()
            
            # Check data integrity
            self._verify_data_integrity()
            
            await asyncio.sleep(1)  # Dynamic delay
            
        except Exception as e:
            logger.error(f"Error in data synchronization: {e}")
            await asyncio.sleep(5)  # Increased delay on error

这里的关键在于数据更新的异步性。在系统的早期版本中,我使用的是同步请求,但在处理大量交易对时,这会导致延迟。转向异步模型后,系统的性能得到了显著提升。

总而言之,我想指出的是,正确地组织数据处理工作,并不仅仅是一个技术问题。这是整个交易策略所建立的基石。干净、结构化的数据能让我们看到那些在粗浅分析中无法被发现的市场规律。


第一个模块:成交量分析

成交量分析模块的开发历史始于一个简单的观察:经典指标常常存在滞后性,因为它们只处理价格数据。但市场并不仅仅关乎价格,它还关乎成交量,而成交量往往能预示行情的变动。这就是我们系统的第一个模块是成交量分析器的原因。

让我们从基本的数据获取函数开始:

def get_volume_data(symbol, timeframe=mt5.TIMEFRAME_H1, n_bars=2000):
    """Getting volume and price data from MT5"""
    try:
        bars = mt5.copy_rates_from_pos(symbol, timeframe, 0, n_bars)
        if bars is None:
            logger.error(f"Failed to get data for {symbol}")
            return None
        
        df = pd.DataFrame(bars)
        df['time'] = pd.to_datetime(df['time'], unit='s')
        return df
        
    except Exception as e:
        logger.error(f"Error getting data for {symbol}: {e}")
        return None

乍一看,这个函数看起来很简单。但在这份简单背后,隐藏着一个重要的决定:我们只获取 2000 根K线的历史数据。为什么呢?我通过实验发现,这个数据量足以构建一个高质量的模型,但同时,在训练数据集维度很大、输入特征以批次序列形式输入的超大型模型时,它又不会给服务器内存造成过大的负荷。

该模块最有趣的部分,是为分析创建特征。其工作原理如下:

def create_features(df, forecast_periods=None):
    """Create features for the forecasting model"""
    try:
        # Basic volume indicators
        df['volume_sma_5'] = df['tick_volume'].rolling(window=5).mean()
        df['volume_sma_20'] = df['tick_volume'].rolling(window=20).mean()
        df['relative_volume'] = df['tick_volume'] / df['volume_sma_20']
        
        # Volume dynamics
        df['volume_change'] = df['tick_volume'].pct_change()
        df['volume_acceleration'] = df['volume_change'].diff()
        
        # Volume volatility
        df['volume_volatility'] = df['tick_volume'].rolling(window=20).std()
        df['volume_volatility_5'] = df['tick_volume'].rolling(window=5).std()
        df['volume_volatility_ratio'] = df['volume_volatility_5'] / df['volume_volatility']

在这里,要特别关注 volume_volatility_ratio(成交量波动率比率)这个指标。这个指标的出现,源于我观察到的一个有趣模式:在强劲行情启动之前,短期成交量波动率往往会开始比长期波动率增长得更快。该指标已成为识别潜在入场点的关键指标之一。

成交量分布的计算则另有一番说道:

# Volume profile
        df['volume_percentile'] = df['tick_volume'].rolling(window=100).apply(
            lambda x: pd.Series(x).rank(pct=True).iloc[-1]
        )
        df['volume_density'] = df['tick_volume'] / (df['high'] - df['low'])
        df['volume_density_ma'] = df['volume_density'].rolling(window=20).mean()
        df['cumulative_volume'] = df['tick_volume'].rolling(window=20).sum()
        df['volume_ratio'] = df['tick_volume'] / df['cumulative_volume']

volume_density(成交量密度)指标的出现并非偶然。我注意到,成交量本身可能具有欺骗性——重要的是要考虑这些成交量是在哪个价格区间内产生的。在狭窄价格区间内的高成交量,通常预示着一个重要支撑位或阻力位的形成。

我开发了一个专门的函数来预测价格运动方向:

def predict_direction(model, X):
    """Price movement direction forecast"""
    try:
        prediction = model.predict(X)[0]
        current_price = X['close'].iloc[-1] if 'close' in X else None
        if current_price is None:
            return 0
            
        # Return 1 for rise, -1 for fall, 0 for neutral
        price_change = (prediction - current_price) / current_price
        if abs(price_change) < 0.0001:  # Minimum change threshold
            return 0
        return 1 if price_change > 0 else -1
        
    except Exception as e:
        logger.error(f"Error predicting direction: {e}")
        return 0

请注意,这里的变动阈值是 0.0001。这并非一个随机数字——它是基于对考虑到点差和各种类型佣金后能够处理的最小价格变动的分析而选定的。对于股票市场,该指标应另行选择。

最后阶段是训练模型:

def train_model(X_train, X_test, y_train, y_test, model_params=None):
    try:
        if model_params is None:
            model_params = {'n_estimators': 400, 'random_state': 42}
            
        model = RandomForestRegressor(**model_params)
        model.fit(X_train, y_train)
        
        # Model evaluation
        train_predictions = model.predict(X_train)
        test_predictions = model.predict(X_test)
        
        train_rmse = np.sqrt(mean_squared_error(y_train, train_predictions))
        test_rmse = np.sqrt(mean_squared_error(y_test, test_predictions))
        test_r2 = r2_score(y_test, test_predictions)

你知道我为什么选择包含 400 棵树的随机森林(RandomForest)吗?在尝试了一堆方法后,从简单的回归到架构复杂得惊人的神经网络,我得出了一个结论:这种方法是最可靠的。也许不是最精确的,但它稳定。市场充满噪音且反复无常,但随机森林表现得相当稳健。

当然,这仅仅是个开始。我们接下来要面对的问题是如何将所有这些信号整合在一起,以及如何设置系统以使其能够在线学习。但更多内容,我们下次再谈。


风险管理:保全资本的艺术

现在,让我们来谈谈最重要的事情——风险。听大家讨论那些酷炫的策略和神经网络,是件相当有趣的事。在市场上摸爬滚打了十年,我领悟到了最关键的一点:所有这些策略,如果没有风险管理,都一文不值。你可能拥有一个超级交易算法,但如果没有适当的风险管理,你最终仍会陷入亏损。

因此,在我们的系统中,资本保护居于核心地位。正是这种保守的方法,让我们能够持续地盈利,而其他人则在他们的“完美”策略上亏钱。

def calculate_available_volume(self) -> float:
    try:
        account = mt5.account_info()
        if not account:
            return 0.01
            
        # Use balance and free margin
        balance = account.balance
        free_margin = account.margin_free
        
        # Take the minimum value for safety
        available_margin = min(balance, free_margin)
        
        # Calculate the maximum volume taking into account the margin
        margin_ratio = 0.1  # Use only 10% of the available margin
        base_volume = (available_margin * margin_ratio) / 1000
        
        # Limit to maximum volume
        max_volume = min(base_volume, 1.0)  # max 1 lot

请注意,margin_ratio(保证金比例) = 0.1。这不是一个随机数字。经过数月的测试,我得出结论:使用超过10%的可用保证金,在市场剧烈波动时会显著增加爆仓的风险。在同时交易多个货币对时,这一点尤其关键。

下一个重点是止损和止盈的计算:

async def calculate_position_limits(self, signal: Dict) -> Tuple[float, float]:
    try:
        pair = signal['pair']
        direction = signal['direction']
        
        # Get volatility
        volatility = signal['price_volatility']
        economic_volatility = signal['economic_volatility']
        
        # Base values in pips
        base_sl = 20
        base_tp = 40
        
        # Adjust for volatility
        volatility_factor = 1 + (volatility * 2)
        sl_points = base_sl * volatility_factor
        tp_points = base_tp * volatility_factor
        
        # Take economic volatility into account
        if economic_volatility > 0.5:
            sl_points *= 1.5
            tp_points *= 1.2
            
        # Check minimum distances
        info = self.symbols_info[pair]
        min_stop_level = info.trade_stops_level if hasattr(info, 'trade_stops_level') else 0
        
        return max(sl_points, min_stop_level), max(tp_points, min_stop_level)
        
    except Exception as e:
        logger.error(f"Error calculating position limits: {e}")
        return 20, 40  # return base values in case of an error

关于 volatility_factor(波动率因子) 的故事尤其有趣。最初我使用固定的止损水平,但很快注意到,在高波动性时期,它们经常被过早、过频繁地触发。根据当前波动率动态调整止损水平,显著改善了交易结果。

下面是持仓管理系统的样子:

async def manage_positions(self):
    """Managing open positions"""
    try:
        positions = mt5.positions_get() or []
        for position in positions:
            if position.magic == self.magic:
                # Check the time in the position
                time_in_trade = datetime.now() - pd.to_datetime(position.time, unit='s')
                
                # Get current market data
                signal = await self.get_combined_signal(position.symbol)
                
                # Check the need to modify the position
                if self._should_modify_position(position, signal, time_in_trade):
                    await self._modify_position(position, signal)
                    
                # Check the closing conditions
                if self._should_close_position(position, signal, time_in_trade):
                    await self.close_position(position)
                    
    except Exception as e:
        logger.error(f"Error managing positions: {e}")

这里特别关注的是持仓时间。经验表明,持仓时间越长,对其维持的要求就应该越高。这是通过随时间动态收紧持仓条件来实现的。

另一个有趣的点与部分平仓有关:

def calculate_partial_close(self, position, profit_threshold: float = 0.5) -> float:
    """Volume calculation for partial closure"""
    try:
        # Check the current profit
        if position.profit <= 0:
            return 0.0
            
        profit_ratio = position.profit / (position.volume * 1000)  # approximate ROI estimate
        
        if profit_ratio >= profit_threshold:
            # Close half of the position when the profit threshold is reached
            return position.volume * 0.5
        return 0.0
        
    except Exception as e:
        logger.error(f"Error calculating partial close: {e}")
        return 0.0

这个功能是在分析交易记录后创建的。我注意到,在达到特定盈利水平时部分平仓,能显著改善整体的交易统计数据。这让我们能够锁定一部分利润,同时为后续的潜在增长留出空间。

最后,我想指出,风险管理系统是一个不断演化的生命体。每一笔失败的交易,每一次意外的市场波动,都是我们用来完善资本保护算法的新经验。在系统的下一个版本中,我决定加入机器学习来动态优化风险管理参数,以及一个VaR(风险价值)系统与马科维茨投资组合理论的混合模型,但那就是一个完全不同的故事了……


经济模块:当基本面分析遇上机器学习

在开发交易系统的过程中,我注意到了一个有趣的模式:即使是最强的技术信号,如果与基本面因素相悖,也可能失效。正是这一观察促使我创建了经济模块,这是一个分析宏观经济指标及其对货币对走势影响的组件。

让我们从模块的基本结构开始。这是主要经济指标的初始化代码:

def __init__(self):
    self.indicators = {
        'NY.GDP.MKTP.KD.ZG': 'GDP growth',
        'FP.CPI.TOTL.ZG': 'Inflation',
        'FR.INR.RINR': 'Real interest rate',
        'NE.EXP.GNFS.ZS': 'Exports',
        'NE.IMP.GNFS.ZS': 'Imports',
        'BN.CAB.XOKA.GD.ZS': 'Current account balance',
        'GC.DOD.TOTL.GD.ZS': 'Government debt',
        'SL.UEM.TOTL.ZS': 'Unemployment rate',
        'NY.GNP.PCAP.CD': 'GNI per capita',
        'NY.GDP.PCAP.KD.ZG': 'GDP per capita growth'
    }

选择这些指标并非随机。在分析了数千笔交易后,我注意到这些指标对货币对的长期趋势影响最大。特别有趣的是实际利率与货币走势之间的关系,该指标的变化往往预示着趋势的反转。

我开发了一个专门的方法来获取经济数据:

def fetch_economic_data(self):
    data_frames = []
    for indicator, name in self.indicators.items():
        try:
            data_frame = wbdata.get_dataframe({indicator: name}, country='all')
            data_frames.append(data_frame)
        except Exception as e:
            logger.error(f"Error fetching data for indicator '{indicator}': {e}")

    if data_frames:
        self.economic_data = pd.concat(data_frames, axis=1)
        return self.economic_data

这里有趣的一点是使用了 wbdata 库来获取世界银行的数据。在尝试了不同的API后,我选择了这个数据源,因为它提供了最完整、最经过验证的数据。

我特别关注了为分析所做的数据准备工作:

def prepare_data(self, symbol_data):
    data = symbol_data.copy()
    data['close_diff'] = data['close'].diff()
    data['close_corr'] = data['close'].rolling(window=30).corr(data['close'].shift(1))

    for indicator in self.indicators.keys():
        if indicator in self.economic_data.columns:
            data[indicator] = self.economic_data[indicator].ffill()

    data.dropna(inplace=True)
    return data

请注意对经济指标使用了前向填充。这个方案并非一蹴而就——起初我尝试了插值法,但结果发现,对于经济数据而言,使用最后一个已知值更为正确。

该模块的核心是预测系统:

def forecast(self, symbol, symbol_data):
    if len(symbol_data) < 50:
        return None, None

    X = symbol_data.drop(columns=['close'])
    y = symbol_data['close']

    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, shuffle=False)

    model = CatBoostRegressor(iterations=1000, learning_rate=0.1, depth=8, loss_function='RMSE')
    model.fit(X_train, y_train, verbose=False)

选择 CatBoost 作为机器学习算法也并非偶然。在尝试了不同的模型(从简单的线性回归到复杂的神经网络)后,我发现 CatBoost 在处理经济数据的不规则性方面表现得最好。

最后对我们获得的结果进行解读:

def interpret_results(self, symbol):
    forecast = self.forecasts.get(symbol)
    importance_df = self.feature_importances.get(symbol)

    if forecast is None or importance_df is None:
        return f"Insufficient data for interpretation of {symbol}"

    trend = "upward" if forecast[-1] > forecast[0] else "downward"
    volatility = "high" if forecast.std() / forecast.mean() > 0.1 else "low"
    top_feature = importance_df.iloc[0]['feature']

其中,波动率的计算尤其有趣。用于定义高波动的 0.1 这一阈值,是在分析了历史数据后选定的。结果证明,该数值能很好地区分市场平静期和波动加剧期。

在开发该模块的过程中,我有一个有趣的发现:经济因子的影响常常存在延迟,但其影响力比技术因子更为稳定。这促使我们创建了一个权重系统,在该系统中,经济信号的重要性在更长的时间框架上会增加。

当然,经济模块并非万能,它无法预测所有的市场走势。但当它与技术和成交量分析相结合时,它为理解市场过程提供了一个额外的维度。在系统的未来版本中,我计划加入新闻流分析及其对经济指标的影响,但这是另一个需要单独讨论的话题了。


套利模块:探寻真实价格

创建套利模块的想法,是我在长期观察外汇市场后产生的。我注意到了一个有趣的模式:货币对的实际价格,常常会偏离通过交叉汇率计算出的理论价值。这些偏差创造了套利机会,但更重要的是,它们可以作为未来价格走势的一个指标。

让我们从该模块的基本结构开始:

class ArbitrageModule:
    def __init__(self, terminal_path: str = "C:/Program Files/RannForex MetaTrader 5/terminal64.exe", max_trades: int = 10):
        self.terminal_path = terminal_path
        self.MAX_OPEN_TRADES = max_trades
        self.symbols = [
            "AUDUSD.ecn", "AUDJPY.ecn", "CADJPY.ecn", "AUDCHF.ecn", "AUDNZD.ecn", 
            "USDCAD.ecn", "USDCHF.ecn", "USDJPY.ecn", "NZDUSD.ecn", "GBPUSD.ecn", 
            "EURUSD.ecn", "CADCHF.ecn", "CHFJPY.ecn", "NZDCAD.ecn", "NZDCHF.ecn", 
            "NZDJPY.ecn", "GBPCAD.ecn", "GBPCHF.ecn", "GBPJPY.ecn", "GBPNZD.ecn", 
            "EURCAD.ecn", "EURCHF.ecn", "EURGBP.ecn", "EURJPY.ecn", "EURNZD.ecn"
        ]

计算交叉汇率时,基础货币对的结构需要予以特别关注:

     self.usd_pairs = {
            "EUR": "EURUSD.ecn",
            "GBP": "GBPUSD.ecn", 
            "AUD": "AUDUSD.ecn",
            "NZD": "NZDUSD.ecn",
            "USD": None,
            "CAD": ("USDCAD.ecn", True),
            "CHF": ("USDCHF.ecn", True),
            "JPY": ("USDJPY.ecn", True)
        }

这里有趣的一点是,有些货币对被标记为反向(True)。这并非巧合——对于某些货币,如加元、瑞士法郎和日元,其基础报价是 USD/XXX,而非 XXX/USD。这是一个在计算交叉汇率时常被忽略的重要细节。

该模块的核心是用于计算合成价格的函数:

def calculate_synthetic_prices(self, data: Dict[str, pd.DataFrame]) -> pd.DataFrame:
    """Calculation of synthetic prices through cross rates"""
    synthetic_prices = {}
    
    try:
        for symbol in self.symbols:
            base = symbol[:3]
            quote = symbol[3:6]
            
            # Calculate the synthetic price using cross rates
            fair_price = self.calculate_cross_rate(base, quote, data)
            synthetic_prices[f'{symbol}_fair'] = pd.Series([fair_price])

我记得当时为了优化这段代码,我费了不少功夫。最初,我尝试计算所有可能的变换路径,然后选出最优的一条。但结果证明,通过美元进行简单计算,其结果更为稳定,尤其是在市场高波动性的情况下。

用于计算相对美元汇率的函数也很有意思:

def get_usd_rate(self, currency: str, data: dict) -> float:
    """Get exchange rate to USD"""
    if currency == "USD":
        return 1.0
        
    pair_info = self.usd_pairs[currency]
    if isinstance(pair_info, tuple):
        pair, inverse = pair_info
        rate = data[pair]['close'].iloc[-1]
        return 1 / rate if inverse else rate
    else:
        pair = pair_info
        return data[pair]['close'].iloc[-1]

这个功能是在对不同的交叉汇率计算方法进行了长期实验之后才诞生的。这里的关键点在于正确处理反向货币对。哪怕只有一个货币对计算错误,也会在合成价格中引发一连串的错误。

我开发了一个专门处理实时数据的函数:

def get_mt5_data(self, symbol: str, count: int = 1000) -> Optional[pd.DataFrame]:
    try:
        timezone = pytz.timezone("Etc/UTC")
        utc_from = datetime.now(timezone) - timedelta(days=1)

        ticks = mt5.copy_ticks_from(symbol, utc_from, count, mt5.COPY_TICKS_ALL)
        if ticks is None:
            logger.error(f"Failed to fetch data for {symbol}")
            return None

        ticks_frame = pd.DataFrame(ticks)
        ticks_frame['time'] = pd.to_datetime(ticks_frame['time'], unit='s')
        return ticks_frame

所选取的tick数量(1000个)是在计算精度与数据处理速度之间做出的一个折中方案。在实践中,这个数量被证明足以可靠地确定一个公允价格。

在开发这个模块的过程中,我有一个有趣的发现:真实价格与合成价格之间的背离,常常出现在重大的市场行情之前。这就好像是聪明的资金”开始调动某些货币对,在交叉汇率体系中制造了紧张感,而后这种紧张感便通过一轮强劲的行情得以释放。

当然,套利模块并非什么魔杖,但当它与成交量分析、经济指标相结合时,它为理解市场提供了一个额外的维度。在未来的版本中,我打算加入对不同货币对背离情况之间相关性的分析,而这又是一个完全不同的故事了。


结论

当我开始这个项目时,我完全没想到它会发展成什么样子。我当时以为,我仅仅是把Python和MQL5连接起来,事情就到此为止了。结果,它却演变成了一个完整的交易平台!它的每一个部分都像是瑞士手表里的一个零件,而这部分内容仅仅是系列文章中的第一篇。

在开发过程中,我学到了很多。例如,我明白了在算法交易中,没有任何捷径可走。就拿仓位量的计算来说吧。这听起来不难,对吧?但当你开始考虑所有风险和市场行为时,你的脑子就会开始发晕。

模块化架构的运行效果是多么出色啊!如果一个模块失灵了,其他的模块还能继续工作。你可以放心地改进每一个部分,而不用担心会破坏整个系统。

最有趣的是观察系统的不同部分如何协同工作。一个模块寻找套利机会,另一个监控成交量,第三个分析经济,而第四个则控制风险。它们共同看到的市场,是任何单一分析都无法企及的。

当然,还有很大的成长空间。我想加入新闻分析、改进机器学习模型、开发新的风险评估模型。研究市场的3D可视化尤其令人着迷——将价格、成交量和时间想象在同一个空间里。

这个项目最重要的经验是:交易系统应该是活的。市场不会停滞不前,系统也必须随之而变。从错误中学习,寻找新的模式,摒弃过时的方法。

我希望我的经验能对那些正在创建交易算法的人有所帮助。请记住——这段旅程没有终点线。有的,只是路!

本文由MetaQuotes Ltd译自俄文
原文地址: https://www.mql5.com/ru/articles/16667

附加的文件 |
arbitrage_mt5.py (5.45 KB)
最近评论 | 前往讨论 (1)
linfo2
linfo2 | 29 8月 2025 在 04:06
谢谢,我正在学习 python,您的 arbitrage_mt5 无法编译 AttributeError: 'ArbitrageModule' 对象没有属性'run',这里有什么问题吗?
价格行为分析工具包开发(第六部分):均值回归信号捕捉器 价格行为分析工具包开发(第六部分):均值回归信号捕捉器
有些概念乍一看似乎简单明了,但在实际操作中的实现却颇具挑战。在接下来的文章中,将带您了解我们创新性地自动化一款运用均值回归策略分析市场的智能交易系统(EA)的方法。与我们一同揭开这一激动人心的自动化过程的神秘面纱吧。
交易中的神经网络:降低锐度强化变换器效率(SAMformer) 交易中的神经网络:降低锐度强化变换器效率(SAMformer)
训练变换器模型需要大量数据,并且往往很困难,因为模型不擅长类推到小型数据集。SAMformer 框架通过避免糟糕的局部最小值来帮助解决这个问题。即使在有限的训练数据集上,也能提升模型的效率。
从基础到中级:数组(四) 从基础到中级:数组(四)
在本文中,我们将看看如何做一些与 C、C++ 和 Java 等语言中实现的非常相似的事情。我说的是在函数或过程中传递几乎无限数量的参数。虽然这似乎是一个相当高级的主题,但在我看来,任何理解了前面概念的人都可以很容易地实现这里展示的内容。只要它们真的被正确理解。
迁移至 MQL5 Algo Forge(第 2 部分):使用多个存储库 迁移至 MQL5 Algo Forge(第 2 部分):使用多个存储库
在本文中,我们将探讨在公共存储库中组织项目源代码存储的一种可能的方法。我们将把代码分发到不同的分支,为项目开发建立清晰方便的规则。