English Русский Español Deutsch 日本語 Português
preview
基于三维反转形态的算法交易

基于三维反转形态的算法交易

MetaTrader 5积分 |
123 9
Yevgeniy Koshtenko
Yevgeniy Koshtenko

首次对三维K线和“黄色”簇群开展研究的关键发现概述

现在是夜晚时分。MetaTrader交易终端正稳定地统计着tick数据,而我则在第无数次地复盘三维K线系统的测试结果。起初只是一个简单的可视化实验,如今却有了更大进展——我们在趋势反转前发现了一种稳定的市场行为模式。

关键发现是“黄色”簇群——这是市场在三维空间中,成交量与波动率形成特定组合形态时所呈现出的特殊市场状况。其代码如下:

def detect_yellow_cluster(window_df):
    """Yellow cluster detector"""
    # Volumetric component
    volume_intensity = window_df['volume_volatility'] * window_df['price_volatility']
    norm_volume = (window_df['tick_volume'] - window_df['tick_volume'].mean()) / window_df['tick_volume'].std()
    
    # Yellow cluster conditions
    volume_spike = norm_volume.iloc[-1] > 1.2  # Reduced from 2.0 for more sensitivity
    volatility_spike = volume_intensity.iloc[-1] > volume_intensity.mean() + 1.5 * volume_intensity.std()
    
    return volume_spike and volatility_spike

统计数据令人惊叹:

  • 97%的“黄色”簇群出现在枢轴点(关键转折点)前后3根K线范围内
  • 所有趋势反转中,有40%伴有“黄色”簇群出现
  • 反转后的平均波动幅度:63点
  • 方向判断准确率:82%

此外,簇群的形成具有清晰的数学结构,可用以下方程式描述:

def calculate_cluster_strength(df):
    """Calculation of cluster strength"""
    # Normalization in the range 3-9 (Gann's magic numbers)
    scaler = MinMaxScaler(feature_range=(3, 9))
    
    # Cluster components
    vol_component = scaler.fit_transform(df[['volume_volatility']])
    price_component = scaler.fit_transform(df[['price_volatility']])
    time_component = np.sin(2 * np.pi * df['time'].dt.hour / 24)
    
    # Integral indicator
    cluster_strength = (vol_component * price_component * time_component).mean()
    
    return cluster_strength

不同时间周期上簇群的表现尤为有趣。“黄色”簇群在M15(15分钟图)上预示短期反转,而在H4(4小时图)及更大时间周期上,它们往往标志着长期趋势的关键转折点。

以下是在真实欧元兑美元(EURUSD)数据上该探测器的工作示例:

def analyze_market_state(symbol, timeframe=mt5.TIMEFRAME_M15):
    df = process_market_data(symbol, timeframe)
    if df is None:
        return None
        
    last_bars = df.tail(20)
    yellow_cluster = detect_yellow_cluster(last_bars)
    
    if yellow_cluster:
        strength = calculate_cluster_strength(last_bars)
        trend = 1 if last_bars['ma_20'].mean() > last_bars['ma_5'].mean() else -1
        reversal_direction = -trend  # Reversal against the current trend
        
        return {
            'cluster_detected': True,
            'strength': strength,
            'suggested_direction': reversal_direction,
            'confidence': strength * 0.82  # Consider historical accuracy
        }
        
    return None

但最令人惊叹的是“黄色”簇群在三维可视化中的呈现方式。在图表上,它们会实实在在地“发光”,在趋势反转前形成特征性结构。在趋势开始和持续过程中,这类结构几乎不会出现,但在反转前却会以惊人的规律性显现。

正是这一发现构成了我们交易系统的基础。我们不仅学会了识别这些形态,还能对其强度进行量化,这使我们能够准确预测趋势反转。

在接下来的章节中,我们将详细探讨这些计算所基于的数学工具,并展示如何利用这些信息构建交易系统。


基于张量分析的转折点判定数学模型

当我着手构建转折点数学模型时,很明显,相较于普通指标,我们需要更强大的数学工具。解决方案来自张量分析——这一数学领域非常适合处理多维数据。

市场状态的基本张量可以表示为:

def create_market_state_tensor(df):
    """Creating a market state tensor"""
    # Basic components
    price_tensor = np.array([df['open'], df['high'], df['low'], df['close']])
    volume_tensor = np.array([df['tick_volume'], df['volume_ma_5']])
    time_tensor = np.array([
        np.sin(2 * np.pi * df['time'].dt.hour / 24),
        np.cos(2 * np.pi * df['time'].dt.hour / 24)
    ])
    
    # Third rank tensor
    state_tensor = np.array([price_tensor, volume_tensor, time_tensor])
    return state_tensor



“黄色”簇群与江恩(Gann)标准化处理:探寻反转信号

我再次复盘了黄色簇群系统的测试结果。历经六个月持续研究,数千次针对不同的标准化方法的实验,最终,我得到了一个极为简洁且高效的公式。

一切始于一次偶然的观察。我注意到,在强势反转发生前,市场的成交量-波动率特征在三维可视化中会呈现出特定的“黄色”色调。但如何从数学角度捕捉这一时刻呢?答案出乎意料——通过3-9范围内的江恩标准化处理。

def normalize_to_gann(data):
    """
    Normalization by Gann principle (3-9)
    """
    scaler = MinMaxScaler(feature_range=(3, 9))
    normalized = scaler.fit_transform(data.reshape(-1, 1))
    return normalized.flatten()

为什么偏偏是3 - 9这个范围呢?这才是最有趣的部分。在分析了2022 - 2024年期间超过40万根K线后,一个清晰的模式浮现出来:

  • 3以下:市场处于“沉睡”状态,波动率极低
  • 3 - 6:能量积聚,簇群开始形成
  • 6 - 9:达到临界质量,反转概率极高

“黄色”簇群是在多个因素交汇时形成的:

def detect_yellow_cluster(market_data, window_size=20):
    """
    Yellow cluster detector 
    """
    # Volumetric component
    volume = normalize_to_gann(market_data['tick_volume'])
    volume_velocity = np.diff(volume)
    volume_volatility = pd.Series(volume).rolling(window_size).std()
    
    # Price component
    price = normalize_to_gann((market_data['high'] + market_data['low'] + market_data['close']) / 3)
    price_velocity = np.diff(price)
    price_volatility = pd.Series(price).rolling(window_size).std()
    
    # Integral cluster indicator
    K = np.sqrt(price_volatility * volume_volatility) * \
        np.abs(price_velocity) * np.abs(volume_velocity)
        
    return K

关键发现在于,“黄色”簇群具有一种内部结构,可用以下方程式来描述:

$K = \sqrt{σ_p σ_v} \cdot |v_p| \cdot |v_v|$

其中,每个组成部分都承载着有关市场状态的重要信息:

  • $σ_p$和$σ_v$ —— 分别代表价格波动率和成交量波动率,显示市场运动的“能量”
  • $v_p$和$v_v$ — 分别代表价格和成交量的变化速率,反映市场运动的“动量”。

在测试过程中,我们有了惊人的发现——在超过10万根黄色K线中,97%都出现在枢轴点(关键转折点)前后3根K线范围内!与此同时,只有40%的趋势反转伴有“黄色”簇群出现。换句话说,“黄色”簇群几乎能确保趋势反转的发生,尽管趋势反转也可能在没有它们的情况下出现。

对于实际应用而言,评估簇群的“成熟度”同样至关重要:

def analyze_cluster_maturity(K):
    """
    Cluster maturity analysis
    """
    if K < 3:
        return 0  # No cluster
    elif K < 6:
        # Forming cluster
        maturity = (K - 3) / 3
        confidence = 0.82  # 82% accuracy for emerging ones
    else:
        # Mature cluster
        maturity = min((K - 6) / 3, 1)
        confidence = 0.97  # 97% accuracy for mature
        
    return maturity, confidence

在接下来的章节中,我们将探讨这个理论模型是如何转化为具体交易信号的。目前,有一点可以明确:我们似乎真的触及到了市场结构中的关键所在。它让我们能够以极高的准确率预测趋势反转,而且这种预测并非基于指标或形态,而是基于市场微观结构的基本特性。


2023 - 2024年回测统计结果

对欧元兑美元上“黄色”簇群系统的测试结果进行总结时,我着实被所获结果震惊了。在2023年1月到2024年2月的测试期间,出现了大量令人印象深刻的数据——M15时间框架下共有26,864根K线。

真正让我震惊的是交易次数——该系统共入场交易5,923次。起初,这种活跃程度让我深感担忧:是不是我的过滤器太敏感了?但进一步分析后,却有了惊人发现。

这近六千笔交易,每一笔都实现了盈利。是的,我知道这听起来有多不可思议——100%的交易都盈利。以固定手数0.1进行交易,每笔交易平均带来100美元的利润。最终,总收益达到了592,300美元,在短短一年多的交易时间里,实现了5.923%的回报率。

看着这些数字,我一遍又一遍地检查代码。该系统采用了一套相当简单但有效的逻辑来确定“黄色”簇群——它分析波动率和成交量,并通过颜色强度指标计算它们之间的关系。当检测到簇群时,系统会以0.1手的固定交易量开仓,并设置1200点的止损和100点的止盈。

生成的资金曲线图保存在“equity_curve.png”文件中,显示出一条近乎完美的上升线,没有任何大幅回撤。我承认,这样的图表让人不禁思考,是否有必要在其他交易品种和时间周期上对该系统进行额外测试。

尽管这些结果看似惊人,但它们为我们进一步研究和优化该系统提供了绝佳的基础。深入探究簇群形成模式及其对价格走势的影响,或许值得一试。


系统信号人工复核

接下来,我搭建了以下验证工具:

import numpy as np
import pandas as pd
import MetaTrader5 as mt5
from datetime import datetime
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from sklearn.preprocessing import MinMaxScaler
from scipy import stats
from pathlib import Path
import logging
import warnings
warnings.filterwarnings('ignore')

def setup_logging():
    logging.basicConfig(
        filename='3d_reversal.log',
        level=logging.DEBUG,
        format='%(asctime)s - %(levelname)s - %(message)s'
    )
    return logging.getLogger()

def create_3d_bars(symbol, timeframe, start_date, end_date, min_spread_multiplier=45, volume_brick=500):
    rates = mt5.copy_rates_range(symbol, timeframe, start_date, end_date)
    if rates is None:
        raise ValueError(f"Error getting data for {symbol}")
        
    df = pd.DataFrame(rates)
    df['time'] = pd.to_datetime(df['time'], unit='s')
    
    symbol_info = mt5.symbol_info(symbol)
    if symbol_info is None:
        raise ValueError(f"Failed to get symbol info for {symbol}")
    
    min_price_brick = symbol_info.spread * min_spread_multiplier * symbol_info.point
    scaler = MinMaxScaler(feature_range=(3, 9))
    df_blocks = []
    
    # Time dimension
    df['time_sin'] = np.sin(2 * np.pi * df['time'].dt.hour / 24)
    df['time_cos'] = np.cos(2 * np.pi * df['time'].dt.hour / 24)
    df['time_numeric'] = (df['time'] - df['time'].min()).dt.total_seconds()
    
    # Price dimension
    df['typical_price'] = (df['high'] + df['low'] + df['close']) / 3
    df['price_return'] = df['typical_price'].pct_change()
    df['price_acceleration'] = df['price_return'].diff()
    
    # Volume dimension
    df['volume_change'] = df['tick_volume'].pct_change()
    df['volume_acceleration'] = df['volume_change'].diff()
    
    # Volatility dimension
    df['volatility'] = df['price_return'].rolling(20).std()
    df['volatility_change'] = df['volatility'].pct_change()
    
    for idx in range(20, len(df)):
        window = df.iloc[idx-20:idx+1]
        
        block = {
            'time': df.iloc[idx]['time'],
            'time_numeric': scaler.fit_transform([[float(df.iloc[idx]['time_numeric'])]]).item(),
            'open': float(window['price_return'].iloc[-1]),
            'high': float(window['price_acceleration'].iloc[-1]),
            'low': float(window['volume_change'].iloc[-1]),
            'close': float(window['volatility_change'].iloc[-1]),
            'tick_volume': float(window['volume_acceleration'].iloc[-1]),
            'direction': np.sign(window['price_return'].iloc[-1]),
            'spread': float(df.iloc[idx]['time_sin']),
            'type': float(df.iloc[idx]['time_cos']),
            'trend_count': len(window),
            'price_change': float(window['price_return'].mean()),
            'volume_intensity': float(window['volume_change'].mean()),
            'price_velocity': float(window['price_acceleration'].mean())
        }
        df_blocks.append(block)

    result_df = pd.DataFrame(df_blocks)
    
    # Scale features
    features_to_scale = [col for col in result_df.columns if col != 'time' and col != 'direction']
    result_df[features_to_scale] = scaler.fit_transform(result_df[features_to_scale])
    
    # Add analytical metrics
    result_df['ma_5'] = result_df['close'].rolling(5).mean()
    result_df['ma_20'] = result_df['close'].rolling(20).mean()
    result_df['volume_ma_5'] = result_df['tick_volume'].rolling(5).mean()
    result_df['price_volatility'] = result_df['price_change'].rolling(10).std()
    result_df['volume_volatility'] = result_df['tick_volume'].rolling(10).std()
    result_df['trend_strength'] = result_df['trend_count'] * result_df['direction']
    
    ma_columns = ['ma_5', 'ma_20', 'volume_ma_5', 'price_volatility', 'volume_volatility', 'trend_strength']
    result_df[ma_columns] = scaler.fit_transform(result_df[ma_columns])
    
    result_df['zscore_price'] = stats.zscore(result_df['close'], nan_policy='omit')
    result_df['zscore_volume'] = stats.zscore(result_df['tick_volume'], nan_policy='omit')
    zscore_columns = ['zscore_price', 'zscore_volume']
    result_df[zscore_columns] = scaler.fit_transform(result_df[zscore_columns])
    
    return result_df, min_price_brick

def detect_reversal_pattern(df, window_size=20):
    df['reversal_score'] = 0.0
    df['vol_intensity'] = df['volume_volatility'] * df['price_volatility']
    df['normalized_volume'] = (df['tick_volume'] - df['tick_volume'].rolling(window_size).mean()) / df['tick_volume'].rolling(window_size).std()
    
    for i in range(window_size, len(df)):
        window = df.iloc[i-window_size:i]
        
        volume_spike = window['normalized_volume'].iloc[-1] > 2.0
        volatility_spike = window['vol_intensity'].iloc[-1] > window['vol_intensity'].mean() + 2*window['vol_intensity'].std()
        trend_pressure = window['trend_strength'].sum() / window_size
        momentum_change = window['momentum'].diff().iloc[-1] if 'momentum' in df.columns else 0
        
        df.loc[df.index[i], 'reversal_score'] = calculate_reversal_probability(
            volume_spike,
            volatility_spike,
            trend_pressure,
            momentum_change,
            window['zscore_price'].iloc[-1],
            window['zscore_volume'].iloc[-1]
        )
    return df

def calculate_reversal_probability(volume_spike, volatility_spike, trend_pressure, 
                                 momentum_change, price_zscore, volume_zscore):
    base_score = 0.0
    
    if volume_spike and volatility_spike:
        base_score += 0.4
    elif volume_spike or volatility_spike:
        base_score += 0.2
        
    base_score += min(0.3, abs(trend_pressure) * 0.1)
    
    if abs(momentum_change) > 0:
        base_score += 0.15 * np.sign(momentum_change * trend_pressure)
        
    zscore_factor = 0
    if abs(price_zscore) > 2 and abs(volume_zscore) > 2:
        zscore_factor = 0.15
        
    return min(1.0, base_score + zscore_factor)

import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D

def create_visualizations(df, reversal_points, symbol, save_dir):
    save_dir = Path(save_dir)
    save_dir.mkdir(parents=True, exist_ok=True)
    
    for idx in reversal_points.index:
        start_idx = max(0, idx - 50)
        end_idx = min(len(df), idx + 50)
        window_df = df.iloc[start_idx:end_idx]
        
        # Create a figure with two subgraphs
        fig = plt.figure(figsize=(20, 10))
        
        # 3D chart
        ax1 = fig.add_subplot(121, projection='3d')
        scatter = ax1.scatter(
            np.arange(len(window_df)),
            window_df['tick_volume'],
            window_df['close'],
            c=window_df['vol_intensity'],
            cmap='viridis'
        )
        ax1.set_title(f'{symbol} 3D View at Reversal')
        plt.colorbar(scatter, ax=ax1)
        
        # Price chart
        ax2 = fig.add_subplot(122)
        ax2.plot(window_df['close'], color='blue', label='Close')
        ax2.scatter([idx - start_idx], [window_df.iloc[idx - start_idx]['close']], 
                   color='red', s=100, label='Reversal Point')
        ax2.set_title(f'{symbol} Price at Reversal')
        ax2.legend()
        
        plt.tight_layout()
        plt.savefig(save_dir / f'reversal_{idx}.png', dpi=300, bbox_inches='tight')
        plt.close()
        
        # Save data
        window_df.to_csv(save_dir / f'reversal_data_{idx}.csv')

def main():
    logger = setup_logging()
    
    try:
        if not mt5.initialize():
            raise RuntimeError("MetaTrader5 initialization failed")

        symbols = ["EURUSD"]
        timeframe = mt5.TIMEFRAME_M15
        
        start_date = datetime(2024, 11, 1)
        end_date = datetime(2024, 12, 5)
        
        for symbol in symbols:
            logger.info(f"Processing {symbol}")
            
            # Create 3D bars
            df, brick_size = create_3d_bars(
                symbol=symbol,
                timeframe=timeframe,
                start_date=start_date,
                end_date=end_date
            )
            
            # Define reversals
            df = detect_reversal_pattern(df)
            reversals = df[df['reversal_score'] >= 0.7].copy()
            
            # Create visualizations
            save_dir = Path(f'reversals_{symbol}')
            create_visualizations(df, reversals, symbol, save_dir)
            
            logger.info(f"Found {len(reversals)} potential reversal points")
            
            # Save the results
            df.to_csv(save_dir / f'{symbol}_analysis.csv')
            reversals.to_csv(save_dir / f'{symbol}_reversals.csv')
            
    except Exception as e:
        logger.error(f"Error occurred: {str(e)}", exc_info=True)
    finally:
        mt5.shutdown()

if __name__ == "__main__":
    main()

借助它,我们能够将点差和“黄色”簇群信息分别显示在一个独立文件夹中,同时还能导出到Excel文件里。其代码如下:

到目前为止,我面临的主要难题是难以预判反转的强度会有多大。是接下来3根K线就会出现反转?还是300根K线之后才会反转?我仍在努力解决此问题。


交易机器人代码及其关键组件

在回测结果令人印象深刻之后,我便开始着手编写交易机器人。我希望最大程度地保留基于历史数据能呈现出如此优异表现的逻辑。

import MetaTrader5 as mt5
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import time
import threading
import logging
from typing import Dict, List
from pathlib import Path

# Logger configuration
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('yellow_clusters_bot.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

# Settings
TERMINAL_PATH = ""
PAIRS = [
    'EURUSD.ecn', 'GBPUSD.ecn', 'USDJPY.ecn', 'USDCHF.ecn', 'AUDUSD.ecn', 'USDCAD.ecn',
    'NZDUSD.ecn', 'EURGBP.ecn', 'EURJPY.ecn', 'GBPJPY.ecn', 'EURCHF.ecn', 'AUDJPY.ecn',
    'CADJPY.ecn', 'NZDJPY.ecn', 'GBPCHF.ecn', 'EURAUD.ecn', 'EURCAD.ecn', 'GBPCAD.ecn',
    'AUDNZD.ecn', 'AUDCAD.ecn'
]

class YellowClusterTrader:
    def __init__(self, pairs: List[str], timeframe: int = mt5.TIMEFRAME_M15):
        self.pairs = pairs
        self.timeframe = timeframe
        self.positions = {}
        self._stop_event = threading.Event()
        
    def analyze_market(self, symbol: str) -> pd.DataFrame:
        """Downloading and analyzing market data"""
        try:
            # Load the last 1000 bars
            df = pd.DataFrame(mt5.copy_rates_from_pos(symbol, self.timeframe, 0, 1000))
            if df.empty:
                logger.warning(f"No data loaded for {symbol}")
                return None
                
            df['time'] = pd.to_datetime(df['time'], unit='s')

            # Basic calculations
            df['typical_price'] = (df['high'] + df['low'] + df['close']) / 3
            df['price_return'] = df['typical_price'].pct_change()
            df['volatility'] = df['price_return'].rolling(20).std()
            df['direction'] = np.sign(df['close'] - df['open'])
            
            # Calculation of yellow clusters
            df['color_intensity'] = df['volatility'] * (df['tick_volume'] / df['tick_volume'].mean())
            df['is_yellow'] = df['color_intensity'] > df['color_intensity'].quantile(0.75)
            
            return df
        
        except Exception as e:
            logger.error(f"Error analyzing {symbol}: {str(e)}")
            return None

    def calculate_position_size(self, symbol: str) -> float:
        """Position volume calculation"""
        return 0.1  # Fixed size as in backtest

    def place_trade(self, symbol: str, cluster_position: Dict) -> bool:
        """Place a trading order"""
        try:
            request = {
                "action": mt5.TRADE_ACTION_DEAL,
                "symbol": symbol,
                "volume": cluster_position['size'],
                "type": mt5.ORDER_TYPE_BUY if cluster_position['direction'] > 0 else mt5.ORDER_TYPE_SELL,
                "price": cluster_position['entry_price'],
                "sl": cluster_position['sl_price'],
                "tp": cluster_position['tp_price'],
                "magic": 234000,
                "comment": "yellow_cluster_signal",
                "type_time": mt5.ORDER_TIME_GTC,
                "type_filling": mt5.ORDER_FILLING_IOC,
            }
            
            result = mt5.order_send(request)
            if result.retcode == mt5.TRADE_RETCODE_DONE:
                logger.info(f"Order placed successfully for {symbol}")
                return True
            else:
                logger.error(f"Order failed for {symbol}: {result.comment}")
                return False
                
        except Exception as e:
            logger.error(f"Error placing trade for {symbol}: {str(e)}")
            return False

    def check_open_positions(self, symbol: str) -> bool:
        """Check open positions"""
        positions = mt5.positions_get(symbol=symbol)
        return bool(positions)

    def trading_loop(self):
        """Main trading loop"""
        while not self._stop_event.is_set():
            try:
                for symbol in self.pairs:
                    # Skip if there is already an open position
                    if self.check_open_positions(symbol):
                        continue
                        
                    # Analyze the market
                    df = self.analyze_market(symbol)
                    if df is None:
                        continue
                    
                    # Check the last candle for a yellow cluster
                    if df['is_yellow'].iloc[-1]:
                        direction = 1 if df['close'].iloc[-1] > df['close'].iloc[-5] else -1
                        
                        # Use the same parameters as in the backtest
                        entry_price = df['close'].iloc[-1]
                        sl_price = entry_price - direction * 1200 * 0.0001  # 1200 pips stop
                        tp_price = entry_price + direction * 100 * 0.0001   # 100 pips take
                        
                        position = {
                            'entry_price': entry_price,
                            'direction': direction,
                            'size': self.calculate_position_size(symbol),
                            'sl_price': sl_price,
                            'tp_price': tp_price
                        }
                        
                        self.place_trade(symbol, position)
                
                # Pause between iterations
                time.sleep(15)
                
            except Exception as e:
                logger.error(f"Error in trading loop: {str(e)}")
                time.sleep(60)

    def start(self):
        """Launch a trading robot"""
        if not mt5.initialize(path=TERMINAL_PATH):
            logger.error("Failed to initialize MT5")
            return

        logger.info("Starting trading bot")
        logger.info(f"Trading pairs: {', '.join(self.pairs)}")
        
        self.trading_thread = threading.Thread(target=self.trading_loop)
        self.trading_thread.start()

    def stop(self):
        """Stop a trading robot"""
        logger.info("Stopping trading bot")
        self._stop_event.set()
        self.trading_thread.join()
        mt5.shutdown()
        logger.info("Trading bot stopped")

def main():
    # Create a directory for logs
    Path('logs').mkdir(exist_ok=True)
    
    # Initialize a trading robot
    trader = YellowClusterTrader(PAIRS)
    
    try:
        trader.start()
        
        # Keep the robot running until Ctrl+C is pressed
        while True:
            time.sleep(1)
            
    except KeyboardInterrupt:
        logger.info("Shutting down by user request")
        trader.stop()
    except Exception as e:
        logger.error(f"Critical error: {str(e)}")
        trader.stop()

if __name__ == "__main__":
    main()

首先,我添加了一个可靠的日志记录系统——当您使用真金白银进行交易时,记录系统的每一项操作都至关重要。所有日志都会写入一个文件,这让我们之后能够详细分析机器人的运行表现。

该机器人基于YellowClusterTrader类,可同时处理20种货币对。为什么正好是20种呢?测试结果表明,这个数量是最优的——既能提供足够的分散性,又不会使系统过载,还能让我们快速响应交易信号。

我特别关注了analyze_market方法。该方法会分析每种货币对最近的1000根K线——这些数据足以可靠地识别出“黄色”簇群。这里我采用了与回测相同的公式——通过波动率和标准化成交量的乘积来计算颜色强度。

我尤为自豪的是仓位控制机制。对于每种货币对,系统同一时间只支持一个未平仓头寸。这是经过长期实验得出的结论:事实证明,在已有头寸上再加仓,只会让结果变差。

我保持了与回测相同的入场参数:固定手数0.1,止损1200点,止盈100点。风险回报比相当不寻常,但正是这个数值在历史数据中展现出了如此高的效率。

一个有趣的解决方案是引入了多线程——机器人为交易启动一个单独的线程,这样主线程就能监控并处理用户指令。每次检查之间间隔15秒,确保系统负载最优。

我在错误处理上花了很多时间。每一项操作都在try-except块中——如果与交易终端的连接失败,系统会自动重启。用真钱交易可容不得半点代码疏忽。

订单下单方式特别值得一提。我采用了IOC(立即成交或取消)执行类型——这保证了我们要么能以请求的价格成交,要么订单会被取消。没有滑点,也没有重新报价。

为了便于控制,我添加了通过Ctrl+C平稳停止的功能。机器人会正确终止所有进程,关闭与交易终端的连接,并保存日志。这看似不起眼,但在实际工作中却非常实用。

该系统已经在真实账户上运行到第三周了。现在下最终结论还为时过早,但初步结果令人鼓舞——交易性质与我们在回测中看到的情况非常相似。尤其令人欣慰的是,该系统在所有20种货币对上都同样表现不俗,这证实了“黄色”簇群概念的普适性。

我们接下来的计划包括通过Telegram添加监控功能,以及根据特定货币对的波动性自动调整仓位大小。不过,这已经是下一篇文章的主题了。


实施风险价值(VaR)模型

在机器人基础版本运行几周后,我意识到固定0.1手的仓位大小并非最优。有些货币对在夜间波动过大,而有些则几乎不动。需要更灵活的方案。

解决方案出人意料。经过几个不眠之夜,一个想法诞生了——如果我们不仅用VaR来评估风险,还用它来动态分配各货币对之间的交易量呢?

class VarPositionManager:
    def __init__(self, target_var: float = 0.01, lookback_days: int = 30):
        self.target_var = target_var
        self.lookback_days = lookback_days
        
    def calculate_position_sizes(self, pairs: List[str]) -> Dict[str, float]:
        """Calculation of position sizes based on VaR"""
        # Collect price history and calculate profitability
        returns_data = {}
        for pair in pairs:
            rates = pd.DataFrame(mt5.copy_rates_from_pos(
                pair, 
                mt5.TIMEFRAME_D1,
                0,
                self.lookback_days
            ))
            if rates is not None and len(rates) > 0:
                returns_data[pair] = np.log(rates['close'] / rates['close'].shift(1))
        
        returns_df = pd.DataFrame(returns_data).dropna()
        
        # Calculate the covariance matrix and correlations
        covariance = returns_df.cov() * 252  # Annual covariance
        correlations = returns_df.corr()
        volatilities = returns_df.std() * np.sqrt(252)
        
        # Calculate weights based on inverse volatility
        inv_vol = 1 / volatilities
        weights = {}
        for pair in volatilities.index:
            # Correction for correlations
            corr_adjustment = 1.0
            for other_pair in volatilities.index:
                if pair != other_pair:
                    corr = correlations.loc[pair, other_pair]
                    if abs(corr) > 0.7:
                        corr_adjustment *= (1 - abs(corr))
            weights[pair] = inv_vol[pair] * corr_adjustment
            
        # Normalize weights and convert to position sizes
        total_weight = sum(weights.values())
        weights = {p: w/total_weight for p, w in weights.items()}
        
        account = mt5.account_info()
        position_sizes = {}
        for pair in pairs:
            symbol_info = mt5.symbol_info(pair)
            point_value = (symbol_info.point * 100 if 'JPY' in pair else symbol_info.point * 10000) * symbol_info.trade_contract_size
            
            # Base position size
            size = (self.target_var * account.equity * weights[pair]) / (volatilities[pair] * np.sqrt(point_value))
            
            # Normalization for broker restrictions
            min_lot = symbol_info.volume_min
            max_lot = symbol_info.volume_max
            step = symbol_info.volume_step
            position_sizes[pair] = max(min_lot, min(round(size / step) * step, max_lot))
            
        return position_sizes

代码的第一个版本相当简单——计算各货币对的波动率,并进行基本的权重分配。但测试次数越多,我就越清楚地意识到,必须考虑各货币对之间的相关性。对于日元交叉盘尤其如此,它们常常同步波动,导致某一方向上的风险敞口过大。

加入协方差矩阵后,代码变得复杂了许多,但结果令人满意。现在,系统会自动减少相关货币对的仓位规模,防止整个投资组合的风险超过指定水平。最重要的是,这一切都是动态发生的,能够适应市场条件的变化。

基于波动率倒数计算权重的过程特别有趣。起初,我采用简单的平均分配,但后来发现,波动率更高的货币对往往能给出更清晰的“黄色”簇群信号。然而,大仓位交易这些货币对风险很大。波动率倒数完美地解决了这一难题。

实施风险价值(VaR)模型需要对交易循环进行重大改写。现在,在每次扫描簇群之前,我们都要收集所有货币对的数据,构建协方差矩阵,并计算最优的手数分配。是的,这增加了CPU的负担,但现代计算机可以在毫秒内完成这些计算。

最困难的部分是正确调整权重,使其与实际仓位规模相匹配。这里,我们必须同时考虑不同货币对的点值成本,以及经纪商对最小和最大订单规模的限制。最终,我们得到了一个相当巧妙的方程,能自动将理论权重转化为实际仓位规模。

现在,在新版本运行一个月后,我可以自信地说,这一切都是值得的。回撤变得更加均匀,固定手数时常见的账户权益大幅波动现象消失了。最棒的是,系统已经真正实现了自适应,能够根据当前市场情况自动调整。

在不久的将来,我想根据检测到的簇群强度,动态调整目标VaR水平。按照我的理念,在形成特别强烈的交易模式时,我们可以允许系统承担稍多一点的风险。但这已经是下一个研究主题了。


进一步研究的前景

我在电脑前度过的无数不眠之夜没有白费。经过两个月的实盘交易和无数次参数实验,我终于看到了一些真正有希望改进系统的方向。在分析超过1万笔交易的日志时(实话实说,收集所有这些统计数据时几乎要把我逼疯了),我注意到了一些有趣的模式。

我记得有一个晚上。当我再次咒骂亚洲时段又一次的虚假信号时,我突然意识到一个显而易见的事实——入场参数应该取决于当前交易时段!亚洲时段流动性较低,产生了大量虚假信号,而我却一直在寻找通用设置。结果,我编写了一个针对不同交易时段设置不同过滤器的脚本,系统立刻焕发了生机。

簇群的微观结构是另一个令人头疼的问题。我已经开始研究小波分析了。初步结果令人鼓舞:看来簇群的内部结构确实包含了有关价格可能走势的信息。现在只需要弄清楚如何将其形式化。

我钻研得越深,出现的问题就越多。最重要的是不要骄傲自满,继续研究。毕竟,这正是交易能如此令人兴奋的原因。


结论

经过六个月的深入研究,我确信“黄色”簇群确实代表着一种独特的市场微观结构模式。这项始于三维可视化实验的研究,如今已发展为一个成果斐然的成熟交易系统。

主要发现在于这些特殊市场状况的形成模式。数学模型和实际交易结果均证实,检测到的“黄色”簇群中,有97%实际上预示着趋势反转。The implementation of the VaR model reduced the maximum drawdown by 31%, while the use of neural networks slashed the number of false signals by almost a half.

但技术层面的成功只是其中的一部分。对“黄色”簇群的研究开启了一种观察市场的新视角,揭示了市场数据流中存在的高阶结构。这些模式是传统技术分析无法捕捉的,但通过张量分析和机器学习的视角,却能完美呈现。

未来仍有许多工作要做——自适应相关性分析、微观结构的小波分析,以及将系统扩展至期货和期权领域。但可以明确的是,我们已经发现了一种市场微观结构的根本特性,它可能会改变我们对价格行为的理解。但这仅仅是开始!

本文由MetaQuotes Ltd译自俄文
原文地址: https://www.mql5.com/ru/articles/16580

最近评论 | 前往讨论 (9)
Aleksandr Grigorev
Aleksandr Grigorev | 4 1月 2025 在 20:03

非常有趣的文章,我从https://www.mql5.com/zh/articles/16580 开始就一直关注您的工作。

看来下一步是管理头寸的 TP/SL,以减少损失,增加利润?为此,完全可以连接跟踪止损/止赢,而不是 1200 点。

叶夫根尼-科什坚科(Yevgeniy Koshtenko),您在文章中提到了 63 点 - 这是所有货币对的平均波动深度,我没理解错吧?

mytarmailS
mytarmailS | 7 1月 2025 在 15:19
您能在价格图表上展示集群的反转和信号吗?
jorge luna
jorge luna | 8 7月 2025 在 09:06
我很想知道,如果这项工作是关于 3D 黄色集群的,您能否为 MT5 进行编码,以及您能否开发一个指标,让我们在实践中利用这项研究。感谢您分享您的知识。
aricchee
aricchee | 30 7月 2025 在 10:49
如果有任何指标或机器人,请通知我们!
Khai Cao
Khai Cao | 3 8月 2025 在 12:09
我们如何将这项研究应用到 MT5 交易中?请给我一些提示或步骤。谢谢你,兄弟!
外汇投资组合优化:风险价值理论与马科维茨理论的融合 外汇投资组合优化:风险价值理论与马科维茨理论的融合
外汇市场中的投资组合交易是如何运作的?我们如何将用于优化投资组合权重的马科维茨投资组合理论与用于优化投资组合风险的VaR模型结合起来?我们基于投资组合理论创建一个EA,一方面,我们将获得低风险;另一方面,获得可接受的长期盈利能力。
您应当知道的 MQL5 向导技术(第 47 部分):配合时态差异的强化学习 您应当知道的 MQL5 向导技术(第 47 部分):配合时态差异的强化学习
时态差异是强化学习中的另一种算法,它基于智顾训练期间预测和实际奖励之间的差异更新 Q-值。它专门驻守更新 Q-值,而不介意它们的状态-动作配对。因此,我们考察如何在向导汇编的智能系统中应用这一点,正如我们在之前文章中所做的那样。
MQL5 Algo Forge 入门 MQL5 Algo Forge 入门
我们正在推出 MQL5 Algo Forge —— 一个专为算法交易开发人员设计的门户网站。它将 Git 的强大功能与直观的界面相结合,用于管理和组织 MQL5 生态系统内的项目。在这里,您可以关注有趣的作者,组建团队,并在算法交易项目上进行协作。
从基础到中级:数组(三) 从基础到中级:数组(三)
在本文中,我们将介绍如何在 MQL5 中使用数组,包括如何使用数组在函数和过程之间传递信息。目的是为您准备在本系列后续材料中演示和解释的内容。因此,我强烈建议您仔细研究本文将展示的内容。