English Русский Español Deutsch 日本語 Português
preview
具有强化学习和灭绝失败个体的进化交易算法(ETARE)

具有强化学习和灭绝失败个体的进化交易算法(ETARE)

MetaTrader 5积分 |
199 7
Yevgeniy Koshtenko
Yevgeniy Koshtenko

概述

您知道进化、神经网络、和交易者有什么共同点吗?他们都会从错误中吸取教训。这正是我坐在终端前又一个不眠之夜后顿悟的,当时我的“完美”交易算法再次因意外的市场走势而令本金亏损。

我就像昨天一样记得那一天:2016 年 6 月 23 日,英国脱欧公投。我的算法基于经典的形态分析技术,自信地持有英镑的多头持仓。“所有的民意调查都表明,英国将继续留在欧盟!”,我也这般认为。凌晨 4 点莫斯科时间,当第一轮结果显示英国脱欧支持者获胜时,英镑在几分钟内暴跌了 1800 点。我的本金亏损了 40%。

2023 年 3 月,我开始开发 ETARE — 具有强化和灭绝(消除)的进化交易算法。为什么要灭绝?因为在自然界中,最强者生存。那为什么不将这一原则应用到交易策略呢?

您准备好进入经典技术分析与人工智能最新进展相遇的世界了吗?在达尔文自然选择中,每种交易策略都在为生存而奋斗?那么请系好安全带 — 继续前行会很有趣。因为您即将看到的,不光是另一款交易机器人。而是历经 15 年反复试错、数千小时编程,以及坦率地说,少量爆仓的结果。但最主要的,它是一款正在运营的系统,已为用户带来真正盈利。


系统架构

ETARE 的核心是混合架构,令人联想到现代量子计算机。还记得我们基于两条移动平均线交叉,为 MetaTrader 4 编写简单脚本的日子吗?在当时,这看似像是一次突破。现在回看,我意识到我们就像古代的水手,试图仅用指南针和星座就去横渡海洋。

2022 年崩盘后,很明显市场过于复杂,简单的方案无所适从。从那时起,我进入机器学习世界的旅程就开始了。

class HybridTrader:
    def __init__(self, symbols, population_size=50):
        self.population = []  # Population of strategies
        self.extinction_rate = 0.3  # Extinction rate
        self.elite_size = 5  # Elite individuals
        self.inefficient_extinction_interval = 5  # Cleaning interval

想象一群蚂蚁,每只蚂蚁都是一种交易策略。强壮的个体生存,并将基因遗传给后辈,而孱弱个体则消失。在我的系统中,由神经网络的权重比率来扮演基因的角色。

为什么 population_size=50?因为较少的策略无法提供足够的多元化,而太多策略则难以快速适应市场变化。

在自然界中,蚂蚁不断探索新的区域,寻找食物,并将信息传递给相关者。在 ETARE 中,每种策略还研究市场,成功的交易形态经由杂交机制传递给后代:

def _crossover(self, parent1, parent2):
    child = TradingIndividual(self.input_size)
    # Cross scales through a mask
    for attr in ['input_weights', 'hidden_weights', 'output_weights']:
        parent1_weights = getattr(parent1.weights, attr)
        parent2_weights = getattr(parent2.weights, attr)
        mask = np.random.random(parent1_weights.shape) < 0.5
        child_weights = np.where(mask, parent1_weights, parent2_weights)
        setattr(child.weights, attr, child_weights)
    return child

2024 年 12 月,在分析交易日志时,我注意到最成功的代码往往是其它成功方式的“混合体”。就像在自然界中,强大的基因会产生健康的后代一样,故在算法交易中,成功的形态能够结合起来,从而创造更有效的策略。

该系统的核心就是 LSTM 网络,这是一种拥有“记忆”的特殊类型神经网络。数月的各种架构试验,从简单的多层感知器、到复杂的变换器,我们最终安置在这种配置之上:

class LSTMModel(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super(LSTMModel, self).__init__()
        self.lstm = nn.LSTM(input_size, hidden_size, batch_first=True)
        self.dropout = nn.Dropout(0.4)  # Protection from overfitting
        self.fc = nn.Linear(hidden_size, output_size)
    
    def forward(self, x):
        out, _ = self.lstm(x)
        out = self.dropout(out[:, -1, :])  # Use the last LSTM output
        out = self.fc(out)
        return out

每 100 笔交易,系统就会进行“清除”,无情地删除无利可图的策略。这是 ETARE 的关键机制之一,它的创建是一个单独的故事。我记得 2023 年 12 月的一个夜晚,我在分析交易日志时注意到一个令人惊讶的形态:大多数在前 100-150 笔交易中出现亏损的策略,以后仍无利可图。这一观察彻底改变了系统架构:

def _inefficient_extinction_event(self):
    """Periodic extinction of inefficient individuals"""
    initial_size = len(self.population)
    
    # Analyze efficiency of each strategy
    performance_metrics = []
    for individual in self.population:
        metrics = {
            'profit_factor': individual.total_profit / abs(individual.max_drawdown) if individual.max_drawdown != 0 else 0,
            'win_rate': len([t for t in individual.trade_history if t.profit > 0]) / len(individual.trade_history) if individual.trade_history else 0,
            'risk_adjusted_return': individual.total_profit / individual.volatility if individual.volatility != 0 else 0
        }
        performance_metrics.append(metrics)
    
    # Remove unprofitable strategies taking into account a comprehensive assessment
    self.population = [ind for ind, metrics in zip(self.population, performance_metrics)
                      if metrics['profit_factor'] > 1.5 or metrics['win_rate'] > 0.6]
    
    # Create new individuals with improved initialization
    while len(self.population) < initial_size:
        new_individual = TradingIndividual(self.input_size)
        new_individual.mutate()  # Random mutations
        
        # Inherit successful patterns
        if len(self.population) > 0:
            parent = random.choice(self.population)
            new_individual.inherit_patterns(parent)
            
        self.population.append(new_individual)

交易决策数据库充当系统记忆。每一个决定、每一个结果 — 所有一切都被记录下来,以供以后分析:

def _save_to_db(self):
    with self.conn:
        self.conn.execute('DELETE FROM population')
        for individual in self.population:
            data = {
                'weights': individual.weights.to_dict(),
                'fitness': individual.fitness,
                'profit': individual.total_profit
            }
            self.conn.execute(
                'INSERT INTO population (data) VALUES (?)',
                (json.dumps(data),)
            )

整个复杂的机制作为一个单一的有机体运营,持续进化,并适应市场变化。在高波动期间,例如 VIX 超过 25 时,系统会自动提升策略的可靠性需求。在平静期当中,它变得更加激进,允许用户尝试新的交易形态。



强化学习机制

开发交易机器人有一个悖论:算法越复杂,它在真实市场上的表现就越糟糕。 

这就是为什么我们专注于 ETARE 学习机制的简单性和透明度。历经两年试验不同架构,我们得出了一个优先记忆系统:

class RLMemory:
    def __init__(self, capacity=10000):
        self.memory = deque(maxlen=capacity)
        self.priorities = deque(maxlen=capacity)
        
    def add(self, state, action, reward, next_state):
        priority = max(self.priorities) if self.priorities else 1.0
        self.memory.append((state, action, reward, next_state))
        self.priorities.append(priority)

每一个交易决策都不仅仅是一次入场,更是风险与潜在回报之间的复杂平衡。看看系统如何从其决策中学习:

def update(self, state, action, reward, next_state):
    self.memory.add(state, action, reward, next_state)
    self.total_profit += reward

    if len(self.memory.memory) >= 32:
        batch = self.memory.sample(32)
        self._train_on_batch(batch)

我亏损过多次,因为模型无法适应不同的市场状况。就在那时,自适应学习的思路诞生了。现在,系统分析每笔业务,并调整其行为:

def _calculate_confidence(self, prediction, patterns):
    # Baseline confidence from ML model
    base_confidence = abs(prediction - 0.5) * 2
    
    # Consider historical experience
    pattern_confidence = self._get_pattern_confidence(patterns)
    
    # Dynamic adaptation to the market
    market_volatility = self._get_current_volatility()
    return (base_confidence * 0.7 + pattern_confidence * 0.3) / market_volatility

关键点是,系统不光记住成功的交易;它还学习理解为什么它们会成功。通过 PyTorch 中实现的多层反向传播架构,这成为可能:

def _train_on_batch(self, batch):
    states = torch.FloatTensor(np.array([x[0] for x in batch]))
    actions = torch.LongTensor(np.array([x[1].value for x in batch]))
    rewards = torch.FloatTensor(np.array([x[2] for x in batch]))
    next_states = torch.FloatTensor(np.array([x[3] for x in batch]))
    
    current_q = self.forward(states).gather(1, actions.unsqueeze(1))
    next_q = self.forward(next_states).max(1)[0].detach()
    target = rewards + self.gamma * next_q
    
    loss = self.criterion(current_q.squeeze(), target)
    self.optimizer.zero_grad()
    loss.backward()
    self.optimizer.step()

如是结果,我们得到一个系统,并非从理想的回测中学习,而是来自实盘交易经验。在涵盖过去一年的实盘市场测试中,ETARE 已证明了其适应多变市场条件的能力,从平静的趋势到高度波动的时期。

但最重要的是,这个系统还在继续进化。每笔交易,每一次市场循环,它都会变得更聪明一点、更高效一点。正如我们的一位内测人员所说,“这是我第一次看到一种算法能够真正从错误中吸取教训,而不仅仅是调整参数去拟合历史数据。


孱弱个体灭绝的机制

查尔斯·达尔文从未在金融市场上交易过,但他的进化论对成功交易策略的动态提供了非凡的描述。在自然界中,并非最强壮或最快的个体才能生存 ,而是那些最能适应环境变化的个体。同样的事情也发生在市场中。

历史上获知许多案例,在第一只黑天鹅之后,“完美”的交易算法被灭绝了。在 2015 年,当瑞士国家银行将瑞士法郎与欧元脱钩时,我损失了可观的本金。我的算法于那一刻被证明针对这样的事件完全束手无策。这令我思考:为什么大自然能够成功地对付黑天鹅数百万年,而我们的算法却不行?

答案出乎意料,是在阅读《物种起源论》一书时。达尔文描述了在气候突然变化时期,幸存下来的不是最特殊的物种,留下的都是具备适应能力的那些。正是这一原则构成了 ETARE 中灭绝机制的基础:

def _inefficient_extinction_event(self):
    """Periodic extinction of inefficient individuals"""
    initial_population = len(self.population)
    market_conditions = self._analyze_market_state()
    
    # Assessing the adaptability of each strategy
    adaptability_scores = []
    for individual in self.population:
        score = self._calculate_adaptability(
            individual, 
            market_conditions
        )
        adaptability_scores.append(score)
    
    # Dynamic survival threshold
    survival_threshold = np.percentile(
        adaptability_scores, 
        30  # The bottom 30% of the population is dying out
    )
    
    # Merciless extinction
    survivors = []
    for ind, score in zip(self.population, adaptability_scores):
        if score > survival_threshold:
            survivors.append(ind)
    
    self.population = survivors
    
    # Restore population through mutations and crossbreeding
    while len(self.population) < initial_population:
        if len(self.population) >= 2:
            # Crossbreeding of survivors
            parent1 = self._tournament_selection()
            parent2 = self._tournament_selection()
            child = self._crossover(parent1, parent2)
        else:
            # Create a new individual 
            child = TradingIndividual(self.input_size)
        
        # Mutations for adaptation
        child.mutate(market_conditions.volatility)
        self.population.append(child)

正如在自然界中,大规模灭绝时期会导致新的、更先进的物种出现,故在我们的系统中,高波动时期成为策略进化的催化剂。看看自然选择的机制:

def _extinction_event(self):
    # Analyze market conditions
    market_phase = self._identify_market_phase()
    volatility = self._calculate_market_volatility()
    trend_strength = self._measure_trend_strength()
    
    # Adaptive sorting by survival
    def fitness_score(individual):
        return (
            individual.profit_factor * 0.4 +
            individual.sharp_ratio * 0.3 +
            individual.adaptability_score * 0.3
        ) * (1 + individual.correlation_with_market)
    
    self.population.sort(
        key=fitness_score, 
        reverse=True
    )
    
    # Preserve elite with diversity in mind
    elite_size = max(
        5, 
        int(len(self.population) * 0.1)
    )
    survivors = self.population[:elite_size]
    
    # Create a new generation
    while len(survivors) < self.population_size:
        if random.random() < 0.8:  # 80% crossover
            # Tournament selection of parents
            parent1 = self._tournament_selection()
            parent2 = self._tournament_selection()
            
            # Crossbreeding considering account market conditions
            child = self._adaptive_crossover(
                parent1, 
                parent2, 
                market_phase
            )
        else:  # 20% elite mutation
            # Clone with mutations
            template = random.choice(survivors[:3])
            child = self._clone_with_mutations(
                template,
                volatility,
                trend_strength
            )
        survivors.append(child)

我们特别注意到适应度评估机制。在自然界中,这是个体产生有活力后代的能力;在我们的例子中,它是策略在各种市场条件下产生盈利的能力:

def evaluate_fitness(self, individual):
    # Basic metrics
    profit_factor = individual.total_profit / max(
        abs(individual.total_loss), 
        1e-6
    )
    
    # Resistance to drawdowns
    max_dd = max(individual.drawdown_history) if individual.drawdown_history else 0
    drawdown_resistance = 1 / (1 + max_dd)
    
    # Profit sequence analysis
    profit_sequence = [t.profit for t in individual.trade_history[-50:]]
    consistency = self._analyze_profit_sequence(profit_sequence)
    
    # Correlation with the market
    market_correlation = self._calculate_market_correlation(
        individual.trade_history
    )
    
    # Adaptability to changes
    adaptability = self._measure_adaptability(
        individual.performance_history
    )
    
    # Comprehensive assessment
    fitness = (
        profit_factor * 0.3 +
        drawdown_resistance * 0.2 +
        consistency * 0.2 +
        (1 - abs(market_correlation)) * 0.1 +
        adaptability * 0.2
    )
    
    return fitness

这就是生存策略的突变如何发生的。这个过程令人想起自然界中的基因突变,DNA 的随机变化有时会导致更具活力的有机体出现:

def mutate(self, market_conditions):
    """Adaptive mutation considering market conditions"""
    # Dynamic adjustment of mutation strength
    self.mutation_strength = self._calculate_mutation_strength(
        market_conditions.volatility,
        market_conditions.trend_strength
    )
    
    if np.random.random() < self.mutation_rate:
        # Mutation of neural network weights
        for weight_matrix in [
            self.weights.input_weights,
            self.weights.hidden_weights,
            self.weights.output_weights
        ]:
            # Mutation mask with adaptive threshold
            mutation_threshold = 0.1 * (
                1 + market_conditions.uncertainty
            )
            mask = np.random.random(weight_matrix.shape) < mutation_threshold
            
            # Volatility-aware mutation generation
            mutations = np.random.normal(
                0,
                self.mutation_strength * market_conditions.volatility,
                size=mask.sum()
            )
            
            # Apply mutations
            weight_matrix[mask] += mutations
            
        # Mutation of hyperparameters
        if random.random() < 0.3:  # 30% chance
            self._mutate_hyperparameters(market_conditions)

有趣的是,在系统的某些版本中,在市场波动较大期间,系统会自动提升突变的密度。这令人想起一些细菌在压力条件下如何加速突变。在我们的例子中:

def _calculate_mutation_strength(self, volatility, trend_strength):
    """Calculate mutation strength based on market conditions"""
    base_strength = self.base_mutation_strength
    
    # Mutation enhancement under high volatility
    volatility_factor = 1 + (volatility / self.average_volatility - 1)
    
    # Weaken mutations in a strong trend
    trend_factor = 1 / (1 + trend_strength)
    
    # Mutation total strength
    mutation_strength = (
        base_strength * 
        volatility_factor * 
        trend_factor
    )
    
    return np.clip(
        mutation_strength,
        self.min_mutation_strength,
        self.max_mutation_strength
    )

种群多样化的机制尤为重要。在自然界中,遗传多样性是物种生存的关键。在 ETARE 中,我们实现了类似的原则:

def _maintain_population_diversity(self):
    """ Maintain diversity in the population"""
    # Calculate the strategy similarity matrix
    similarity_matrix = np.zeros(
        (len(self.population), len(self.population))
    )
    
    for i, ind1 in enumerate(self.population):
        for j, ind2 in enumerate(self.population[i+1:], i+1):
            similarity = self._calculate_strategy_similarity(ind1, ind2)
            similarity_matrix[i,j] = similarity_matrix[j,i] = similarity
    
    # Identify clusters of similar strategies
    clusters = self._identify_strategy_clusters(similarity_matrix)
    
    # Forced diversification when necessary
    for cluster in clusters:
        if len(cluster) > self.max_cluster_size:
            # We leave only the best strategies in the cluster
            survivors = sorted(
                cluster,
                key=lambda x: x.fitness,
                reverse=True
            )[:self.max_cluster_size]
            
            # Replace the rest with new strategies
            for idx in cluster[self.max_cluster_size:]:
                self.population[idx] = TradingIndividual(
                    self.input_size,
                    mutation_rate=self.high_mutation_rate
                )

结果呢?这个系统不光进行交易,且随市场进化。正如达尔文所说,生存下来的不是最强的,而是最具适应性的。在算法交易世界,这一点比任何其它都更重要。


交易决策数据库

维护交易经验与获得交易经验同样重要。多年来,我一直与算法系统打交道,我一再相信,如果没有可靠的数据库,任何交易系统迟早都会“忘记”其最佳策略。在 ETARE 中,我们为交易决策实现了多级存储:

def _create_tables(self):
    """ Create a database structure"""
    with self.conn:
        self.conn.execute('''
            CREATE TABLE IF NOT EXISTS population (
                id INTEGER PRIMARY KEY,
                individual TEXT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                last_update TIMESTAMP
            )
        ''')
        
        self.conn.execute('''
            CREATE TABLE IF NOT EXISTS history (
                id INTEGER PRIMARY KEY,
                generation INTEGER,
                individual_id INTEGER,
                trade_history TEXT,
                market_conditions TEXT,
                FOREIGN KEY(individual_id) REFERENCES population(id)
            )
        ''')

每笔交易、每一次决策,即使看似微不足道的决策,都会成为系统集体经验的一部分。以下是我们在每笔交易循环后保存数据的方法:

def _save_to_db(self):
    try:
        with self.conn:
            self.conn.execute('DELETE FROM population')
            for individual in self.population:
                individual_data = {
                    'weights': {
                        'input_weights': individual.weights.input_weights.tolist(),
                        'hidden_weights': individual.weights.hidden_weights.tolist(),
                        'output_weights': individual.weights.output_weights.tolist(),
                        'hidden_bias': individual.weights.hidden_bias.tolist(),
                        'output_bias': individual.weights.output_bias.tolist()
                    },
                    'fitness': individual.fitness,
                    'total_profit': individual.total_profit,
                    'trade_history': list(individual.trade_history),
                    'market_metadata': self._get_market_conditions()
                }
                self.conn.execute(
                    'INSERT INTO population (individual) VALUES (?)', 
                    (json.dumps(individual_data),)
                )
    except Exception as e:
        logging.error(f"Error saving population: {str(e)}")

即使在严重的服务器故障之后,归因于详细的日志和备份,整个系统在几分钟内也能恢复。以下是恢复机制如何工作的原理:

def _load_from_db(self):
    """Load population from database"""
    try:
        cursor = self.conn.execute('SELECT individual FROM population')
        rows = cursor.fetchall()
        for row in rows:
            individual_data = json.loads(row[0])
            individual = TradingIndividual(self.input_size)
            individual.weights = GeneticWeights(**individual_data['weights'])
            individual.fitness = individual_data['fitness']
            individual.total_profit = individual_data['total_profit']
            individual.trade_history = deque(
                individual_data['trade_history'], 
                maxlen=1000
            )
            self.population.append(individual)
    except Exception as e:
        logging.error(f"Error loading population: {str(e)}")

我们将特别关注历史数据的分析。每个成功的策略都会留下痕迹,可用于改进未来决策:

def analyze_historical_performance(self):
    """ Historical performance analysis"""
    query = '''
        SELECT h.*, p.individual 
        FROM history h 
        JOIN population p ON h.individual_id = p.id 
        WHERE h.generation > ? 
        ORDER BY h.generation DESC
    '''
    
    cursor = self.conn.execute(query, (self.generation - 100,))
    performance_data = cursor.fetchall()
    
    # Analyze patterns of successful strategies
    success_patterns = defaultdict(list)
    for record in performance_data:
        trade_data = json.loads(record[3])
        if trade_data['profit'] > 0:
            market_conditions = json.loads(record[4])
            key_pattern = self._extract_key_pattern(market_conditions)
            success_patterns[key_pattern].append(trade_data)
    
    return success_patterns

ETARE 数据库不仅是一个信息存储设施,更是系统真正的“大脑”,能够分析过去,并预测未来。正如我的老导师常说的那样:“没有记忆的交易系统就像一个没有经验的交易者:他每天都从草案开始”。


数据和函数

在从事算法交易的这些年中,我尝试了数百种指标组合。有一次,我的交易系统使用了 50 多种不同的指标,从经典的 RSI 到我自己设计的奇异指标。但您知道我在又一次亏空本金后意识到了什么吗?这不是关于数量,而是关于恰当的数据处理。

我记得英国脱欧期间发生的一件事:一个有几十个指标的系统,由于信号冲突而无法做出决定,直接“宕机”了。这就是 ETARE 的思路诞生的时候 — 使用最少必要指标集,但以智能方式处理它们的系统。

def prepare_features(data: pd.DataFrame) -> pd.DataFrame:
    """Prepare features for analysis"""
    df = data.copy()

    # RSI - as an overbought/oversold detector
    delta = df['close'].diff()
    gain = delta.where(delta > 0, 0).rolling(14).mean()
    loss = -delta.where(delta < 0, 0).rolling(14).mean()
    rs = gain / loss
    df['rsi'] = 100 - (100 / (1 + rs))
我们系统中的 RSI 不仅仅是一个超买/超卖指标。我们将其用作市场情绪综合分析的一部分。它与 MACD 结合使用特别有效:
# MACD - to determine the trend
    exp1 = df['close'].ewm(span=12, adjust=False).mean()
    exp2 = df['close'].ewm(span=26, adjust=False).mean()
    df['macd'] = exp1 - exp2
    df['macd_signal'] = df['macd'].ewm(span=9, adjust=False).mean()
    df['macd_hist'] = df['macd'] - df['macd_signal']

布林带是我们的波动率“雷达”。  

# Bollinger Bands with adaptive period
    volatility = df['close'].rolling(50).std()
    adaptive_period = int(20 * (1 + volatility.mean()))
    
    df['bb_middle'] = df['close'].rolling(adaptive_period).mean()
    df['bb_std'] = df['close'].rolling(adaptive_period).std()
    df['bb_upper'] = df['bb_middle'] + 2 * df['bb_std']
    df['bb_lower'] = df['bb_middle'] - 2 * df['bb_std']

一个单独故事是对波动性和动量的分析。 

# Momentum - market "temperature"
    df['momentum'] = df['close'] / df['close'].shift(10)
    df['momentum_ma'] = df['momentum'].rolling(20).mean()
    df['momentum_std'] = df['momentum'].rolling(20).std()
    
    # Volatility is our "seismograph"
    df['atr'] = df['high'].rolling(14).max() - df['low'].rolling(14).min()
    df['price_change'] = df['close'].pct_change()
    df['price_change_abs'] = df['price_change'].abs()
    
    # Volume volatility
    df['volume_volatility'] = df['tick_volume'].rolling(20).std() / df['tick_volume'].rolling(20).mean()

ETARE 中的成交量分析不仅仅是即刻报价计数。我们开发了一种专用算法来检测异常交易量,有助于预测强劲走势:

# Volume analysis - market "pulse"
    df['volume_ma'] = df['tick_volume'].rolling(20).mean()
    df['volume_std'] = df['tick_volume'].rolling(20).std()
    df['volume_ratio'] = df['tick_volume'] / df['volume_ma']
    
    # Detection of abnormal volumes
    df['volume_spike'] = (
        df['tick_volume'] > df['volume_ma'] + 2 * df['volume_std']
    ).astype(int)
    
    # Cluster analysis of volumes
    df['volume_cluster'] = (
        df['tick_volume'].rolling(3).sum() / 
        df['tick_volume'].rolling(20).sum()
    )

最后触及的是数据归一化化。这是许多人低估的关键一步。

# Normalization considering market phases
    numeric_cols = df.select_dtypes(include=[np.number]).columns
    for col in numeric_cols:
        # Adaptive normalization
        rolling_mean = df[col].rolling(100).mean()
        rolling_std = df[col].rolling(100).std()
        df[col] = (df[col] - rolling_mean) / (rolling_std + 1e-8)
    
    # Removing outliers
    df = df.clip(-4, 4)  # Limit values to the range [-4, 4]
    
    return df

ETARE 中的每个指标不仅仅是一个数字,而是市场分析复杂拼图的一部分。系统持续适应市场变化,根据当前状况调整每个指标的权重。在以下章节中,我们将看到这些数据如何翻译到实际的交易决策。


交易逻辑

我向您表述一个体现尖端算法交易技术创新交易系统的描述。该系统基于结合了遗传优化、机器学习、以及高级风险管理的混合方式。

该系统的核心是一个持续运作的交易循环,它持续分析市场条件,并适应它们。就像自然进化一样,该系统会定期“清理”无效的交易策略,给新的、更有前景的途径让道。每 50 笔交易就会发生这种情况,确保交易算法的持续改进。

考虑到独特的特征,每种交易金融产品都单独处理。该系统分析最近 100 根蜡烛的历史数据,从而能够准确了解当前市场状态。基于该分析,就可做出明智的开仓和平仓决策。

特别关注持仓均摊策略(DCA)。当新开仓时,系统会自动降低其交易量,从 0.1 手开始,逐渐减少到最小值 0.01 手。这样就可以有效管理风险,并最大化潜在盈利。

平仓的过程也是经过深思熟虑的。系统监控每笔持仓的盈利能力,并在达到指定的盈利水平时平仓。在这种情况下,买入和卖出的仓位是分开处理的,这令投资组合管理更加灵活。交易获得的奖励或惩罚是深入成功学习的关键。 

有关交易操作和系统状态的所有信息都存储在数据库之中,提供了执行细节分析、和优化策略的能力。这为进一步改进交易算法奠定了坚实的基础。

    def _process_individual(self, symbol: str, individual: TradingIndividual, current_state: np.ndarray):
        """Handle trading logic for an individual using DCA and split closing by profit"""
        try:
            positions = individual.open_positions.get(symbol, [])

            if not positions:  # Open a new position
                action, _ = individual.predict(current_state)
                if action in [Action.OPEN_BUY, Action.OPEN_SELL]:
                    self._open_position(symbol, individual, action)
            else:  # Manage existing positions
                current_price = mt5.symbol_info_tick(symbol).bid

                # Close positions by profit
                self._close_positions_by_profit(symbol, individual, current_price)

                # Check for the need to open a new position by DCA
                if len(positions) < self.max_positions_per_pair:
                    action, _ = individual.predict(current_state)
                    if action in [Action.OPEN_BUY, Action.OPEN_SELL]:
                        self._open_dca_position(symbol, individual, action, len(positions))

        except Exception as e:
            logging.error(f"Error processing individual: {str(e)}")

    def _open_position(self, symbol: str, individual: TradingIndividual, action: Action):
        """Open a position"""
        try:
            volume = 0.1
            price = mt5.symbol_info_tick(symbol).ask if action == Action.OPEN_BUY else mt5.symbol_info_tick(symbol).bid

            request = {
                "action": mt5.TRADE_ACTION_DEAL,
                "symbol": symbol,
                "volume": volume,
                "type": mt5.ORDER_TYPE_BUY if action == Action.OPEN_BUY else mt5.ORDER_TYPE_SELL,
                "price": price,
                "deviation": 20,
                "magic": 123456,
                "comment": f"Gen{self.generation}",
                "type_time": mt5.ORDER_TIME_GTC,
                "type_filling": mt5.ORDER_FILLING_FOK,
            }

            result = mt5.order_send(request)
            if result and result.retcode == mt5.TRADE_RETCODE_DONE:
                trade = Trade(symbol=symbol, action=action, volume=volume,
                              entry_price=result.price, entry_time=time.time())
                if symbol not in individual.open_positions:
                    individual.open_positions[symbol] = []
                individual.open_positions[symbol].append(trade)

        except Exception as e:
            logging.error(f"Error opening position: {str(e)}")

    def _open_dca_position(self, symbol: str, individual: TradingIndividual, action: Action, position_count: int):
        """Open a position using the DCA strategy"""
        try:
            # Basic volume
            base_volume = 0.1  # Initial volume in lots
            # Reduce the volume by 0.01 lot for each subsequent position
            volume = max(0.01, base_volume - (position_count * 0.01))  # Minimum volume of 0.01 lots
            price = mt5.symbol_info_tick(symbol).ask if action == Action.OPEN_BUY else mt5.symbol_info_tick(symbol).bid

            request = {
                "action": mt5.TRADE_ACTION_DEAL,
                "symbol": symbol,
                "volume": volume,
                "type": mt5.ORDER_TYPE_BUY if action == Action.OPEN_BUY else mt5.ORDER_TYPE_SELL,
                "price": price,
                "deviation": 20,
                "magic": 123456,
                "comment": f"Gen{self.generation} DCA",
                "type_time": mt5.ORDER_TIME_GTC,
                "type_filling": mt5.ORDER_FILLING_FOK,
            }

            result = mt5.order_send(request)
            if result and result.retcode == mt5.TRADE_RETCODE_DONE:
                trade = Trade(symbol=symbol, action=action, volume=volume,
                              entry_price=result.price, entry_time=time.time())
                if symbol not in individual.open_positions:
                    individual.open_positions[symbol] = []
                individual.open_positions[symbol].append(trade)

        except Exception as e:
            logging.error(f"Error opening DCA position: {str(e)}")

    def _close_positions_by_profit(self, symbol: str, individual: TradingIndividual, current_price: float):
        """Close positions by profit separately for Buy and Sell"""
        try:
            positions = individual.open_positions.get(symbol, [])
            buy_positions = [pos for pos in positions if pos.action == Action.OPEN_BUY]
            sell_positions = [pos for pos in positions if pos.action == Action.OPEN_SELL]

            # Close Buy positions
            for position in buy_positions:
                profit = calculate_profit(position, current_price)
                if profit >= self.min_profit_pips:
                    self._close_position(symbol, individual, position)

            # Close Sell positions
            for position in sell_positions:
                profit = calculate_profit(position, current_price)
                if profit >= self.min_profit_pips:
                    self._close_position(symbol, individual, position)

        except Exception as e:
            logging.error(f"Error closing positions by profit: {str(e)}")

    def _close_position(self, symbol: str, individual: TradingIndividual, position: Trade):
        """Close a position with a model update"""
        try:
            close_type = mt5.ORDER_TYPE_SELL if position.action == Action.OPEN_BUY else mt5.ORDER_TYPE_BUY
            price = mt5.symbol_info_tick(symbol).bid if close_type == mt5.ORDER_TYPE_SELL else mt5.symbol_info_tick(symbol).ask

            request = {
                "action": mt5.TRADE_ACTION_DEAL,
                "symbol": symbol,
                "volume": position.volume,
                "type": close_type,
                "price": price,
                "deviation": 20,
                "magic": 123456,
                "comment": "Close",
                "type_time": mt5.ORDER_TIME_GTC,
                "type_filling": mt5.ORDER_FILLING_FOK,
            }

            result = mt5.order_send(request)
            if result and result.retcode == mt5.TRADE_RETCODE_DONE:
                position.is_open = False
                position.exit_price = result.price
                position.exit_time = time.time()
                position.profit = calculate_profit(position, result.price)
                
                # Generate data for training
                trade_data = {
                    'symbol': symbol,
                    'action': position.action,
                    'entry_price': position.entry_price,
                    'exit_price': position.exit_price,
                    'volume': position.volume,
                    'profit': position.profit,
                    'holding_time': position.exit_time - position.entry_time
                }
                
                # Update the model with new data
                individual.model.update(trade_data)
                
                # Save history and update open positions
                individual.trade_history.append(position)
                individual.open_positions[symbol].remove(position)
                
                # Log training results
                logging.info(f"Model updated with trade data: {trade_data}")

        except Exception as e:
            logging.error(f"Error closing position: {str(e)}")

def main():
    symbols = ['EURUSD.ecn', 'GBPUSD.ecn', 'USDJPY.ecn', 'AUDUSD.ecn']
    trader = HybridTrader(symbols)
    trader.run_trading_cycle()

if __name__ == "__main__":
    main()

其结果是一个可靠的、自学习交易系统,能够在各种市场条件下高效运行。进化算法、机器学习、和经过验证的交易策略的结合,令其成为当代交易的强大工具。


结束语

总结,我想强调的是,ETARE 不仅仅是另一种交易算法,而是算法交易多年进化的结果。该系统结合了来自各领域的最佳实践:适应不断变化的市场条件的遗传算法、用于决策的深度学习、以及经典的风险管理方法。

ETARE 的独特之处在于它能够不断从自己的经验中学习。每笔交易,无论结果如何,都会成为系统集体记忆的一部分,有助于改进未来的交易决策。受达尔文进化论启发的交易策略自然选择机制,确保只有最有效的方式才能生存。

在开发和测试过程中,该系统已经证明了其在各种市场条件下的弹性,从平静的趋势走势、到高度波动的时期。特别重要的是要注意 DCA 策略的效率和单独平仓的机制,这令我们能够在控制风险水平的同时,实现盈利最大化。

现在,关于效率。我会直截了当地说:ETARE 主模块本身对我来说并不重要。它作为一个模块,集成到更广泛的 Midas 交易生态系统当中。


Midas 目前有 24 个模块,包括这个模块。复杂性将稳步增加,我将在以后的文章中描述更多内容。 


算法交易的未来恰恰在于这种能够随市场一起发展的自适应系统。ETARE 是朝着这个方向迈出的一步,展示了如何应用现代技术来创建可靠、且可盈利的交易解决方案。

本文由MetaQuotes Ltd译自俄文
原文地址: https://www.mql5.com/ru/articles/16971

附加的文件 |
ETARE_module.py (34.39 KB)
最近评论 | 前往讨论 (7)
Pegaso
Pegaso | 9 10月 2025 在 08:17
这是一种引人入胜的方法,感谢作者的贡献。不过,代码只是一个 Python 类,没有 EA 和 DBMS 就无法使用。我希望作者将来能为我们提供一个可运行的系统,或者至少提供一些指导,以便我们实施和试验他的进化方法。无论如何,谢谢你。
Martino Hart
Martino Hart | 10 10月 2025 在 18:16

你好,来自印度尼西亚的问候、

我看了你的算法,觉得文章很不错。

,我能得到你的Github 链接吗?先谢谢了

xiaomaozai
xiaomaozai | 13 11月 2025 在 01:11
您好,请问您能提供下python的MetaTrader5包 么 谢谢
Rashid Umarov
Rashid Umarov | 13 11月 2025 在 08:02
xiaomaozai #:
您好,能否提供用于 python 的 MetaTrader5 软件包?
https://www.mql5.com/zh/docs/python_metatrader5
Hong Wei Dan
Hong Wei Dan | 13 11月 2025 在 11:31
将进化理论应用于策略编写,灭绝错误,发挥优势,最终会进化成一个怎样的水平?很期待。
市场模拟(第三部分):性能问题 市场模拟(第三部分):性能问题
我们经常需要后退一步,然后继续前进。在本文中,我们将展示所有必要的更改,以确保鼠标和 Chart Trade 指标不会中断。作为奖励,我们还将介绍未来将广泛使用的其他头文件中发生的其他更改。
价格行为分析工具包开发(第 17 部分):TrendLoom EA 工具 价格行为分析工具包开发(第 17 部分):TrendLoom EA 工具
作为一名价格行为的观察者和交易者,我注意到当一个趋势得到多个时间周期的确认时,它通常会朝着该方向延续。可能不同的是趋势持续的时间,而这取决于您是哪种类型的交易者,无论是长期持仓还是从事剥头皮交易。您为确认所选的时间周期起着至关重要的作用。读这篇文章,了解一个快速、自动化的系统,只需点击一下按钮或通过定期更新,就能帮助您分析不同时间周期的整体趋势。
MQL5自动化交易策略(第十一部分):开发多层级网格交易系统 MQL5自动化交易策略(第十一部分):开发多层级网格交易系统
在本文中,我们将使用MQL5开发一款多层级网格交易系统EA,重点探讨网格交易策略背后的架构与算法设计。我们将研究多层网格逻辑的实现方式以及应对不同市场状况的风险管理技术。最后,我们将提供详尽的解释和实用技巧,指导您完成自动化交易系统的构建、测试与优化。
从基础到中级:模板和类型名称(一) 从基础到中级:模板和类型名称(一)
在本文中,我们开始考虑许多初学者避免的概念之一。这与模板不是一个容易的话题有关,因为许多人不理解模板的基本原理:函数和过程的重载。