English Русский Português
preview
在外汇数据分析中使用关联规则

在外汇数据分析中使用关联规则

MetaTrader 5积分 |
66 2
Yevgeniy Koshtenko
Yevgeniy Koshtenko

关联规则概念介绍

现代算法交易需要新的分析方法。市场不断变化,经典的技术分析方法已不再能够识别复杂的市场关系。

我长期从事数据工作,发现许多成功的理念都来自相关领域。今天,我想分享我在交易中使用关联规则的经验。这种方法在零售分析中已经证明了自己,使我们能够找到购买、交易、价格走势以及未来供需之间的联系。如果我们将这种方法应用于外汇市场,会怎样呢?

基本的想法很简单——我们正在寻找价格行为、指标及其组合的稳定模式。例如,欧元兑美元(EURUSD)的上涨有多频繁地跟随美元兑日元(USDJPY)的下跌?或者,哪些条件最常先于强烈的走势?

在本文中将展示基于这一想法创建交易系统的完整过程。我们将:

  1. 在MQL5中收集历史数据
  2. 在Python中分析它们
  3. 找到重要的模式
  4. 将它们转化为交易信号

为什么选择这一特定的技术栈?MQL5非常适合处理证券交易所数据和交易自动化。反过来,Python提供了强大的分析工具。从我的经验来看,这种组合对于开发交易系统非常有效。

代码中将有许多有趣的内容,特别是在将关联规则应用于外汇的领域。


收集和准备历史外汇数据

对我们来说,收集和准备所有需要的数据至关重要。让我们以过去两年(自2022年起)主要货币对的1小时(H1)数据为基础。

现在,我们将编写一个MQL5脚本,该脚本将收集并以CSV格式导出我们需要的数据:

//+------------------------------------------------------------------+
//|                                                      Dataset.mq5 |
//|                                  Copyright 2024, MetaQuotes Ltd. |
//|                                             https://www.mql5.com |
//+------------------------------------------------------------------+
#property copyright "Copyright 2024, MetaQuotes Ltd."
#property link      "https://www.mql5.com"
#property version   "1.00"
//+------------------------------------------------------------------+
//| Script program start function                                    |
//+------------------------------------------------------------------+
//+------------------------------------------------------------------+
//| Script program start function                                    |
//+------------------------------------------------------------------+
void OnStart()
{
   string pairs[] = {"EURUSD", "GBPUSD", "USDJPY", "USDCHF"};
   datetime startTime = D'2022.01.01 00:00';
   datetime endTime = D'2024.01.01 00:00';
   
   for(int i=0; i<ArraySize(pairs); i++)
   {
      string filename = pairs[i] + "_H1.csv";
      int fileHandle = FileOpen(filename, FILE_WRITE|FILE_CSV);
      
      if(fileHandle != INVALID_HANDLE)
      {
         // Set headers
         FileWrite(fileHandle, "DateTime", "Open", "High", "Low", "Close", "Volume");
         
         MqlRates rates[];
         ArraySetAsSeries(rates, true);
         
         int copied = CopyRates(pairs[i], PERIOD_H1, startTime, endTime, rates);
         
         for(int j=copied-1; j>=0; j--)
         {
            FileWrite(fileHandle,
                     TimeToString(rates[j].time),
                     DoubleToString(rates[j].open, 5),
                     DoubleToString(rates[j].high, 5),
                     DoubleToString(rates[j].low, 5),
                     DoubleToString(rates[j].close, 5),
                     IntegerToString(rates[j].tick_volume)
                    );
         }
         FileClose(fileHandle);
      }
   }
}
//+------------------------------------------------------------------+


在Python中进行数据处理

在形成数据集之后,正确处理数据非常重要。 

为此,我创建了一个特殊的ForexDataProcessor类,它负责处理所有繁琐的工作。让我们来看看它的主要组成部分。

我们从加载数据开始。我们的函数处理主要货币对(EURUSD、GBPUSD、USDJPY和USDCHF)的小时数据。数据应以CSV格式存储,并包含主要的价格特征。

import pandas as pd
import numpy as np
from datetime import datetime
import os
import warnings
warnings.filterwarnings('ignore')

class ForexDataProcessor:
    def __init__(self):
        self.pairs = ["EURUSD", "GBPUSD", "USDJPY", "USDCHF"]
        self.data = {}
        self.processed_data = {}
    
    def load_data(self):
        """Load data for all currency pairs"""
        success = True
        for pair in self.pairs:
            filename = f"{pair}_H1.csv"
            try:
                df = pd.read_csv(filename, 
                               encoding='utf-16',
                               sep='\t',
                               names=['DateTime', 'Open', 'High', 'Low', 'Close', 'Volume'])
                
                # Remove lines with duplicate headers
                df = df[df['DateTime'] != 'DateTime']
                
                # Convert data types
                df['DateTime'] = pd.to_datetime(df['DateTime'], format='%Y.%m.%d %H:%M')
                for col in ['Open', 'High', 'Low', 'Close']:
                    df[col] = pd.to_numeric(df[col], errors='coerce')
                df['Volume'] = pd.to_numeric(df['Volume'], errors='coerce')
                
                # Remove NaN strings
                df = df.dropna()
                
                df.set_index('DateTime', inplace=True)
                self.data[pair] = df
                print(f"Loaded {pair} data successfully. Shape: {df.shape}")
            except Exception as e:
                print(f"Error loading {pair} data: {str(e)}")
                success = False
        return success

    def safe_qcut(self, series, q, labels):
        """Safe quantization with error handling"""
        try:
            if series.nunique() <= q:
                # If there are fewer unique values than quantiles, use regular categorization
                return pd.qcut(series, q=q, labels=labels, duplicates='drop')
            return pd.qcut(series, q=q, labels=labels)
        except Exception as e:
            print(f"Warning: Error in qcut - {str(e)}. Using manual categorization.")
            # Manual categorization as a backup option
            percentiles = np.percentile(series, [20, 40, 60, 80])
            return pd.cut(series, 
                         bins=[-np.inf] + list(percentiles) + [np.inf], 
                         labels=labels)

    def calculate_indicators(self, df):
        """Calculate technical indicators for a single dataframe"""
        result = df.copy()
        
        # Basic calculations
        result['Returns'] = result['Close'].pct_change()
        result['Log_Returns'] = np.log(result['Close']/result['Close'].shift(1))
        result['Range'] = result['High'] - result['Low']
        result['Range_Pct'] = result['Range'] / result['Open'] * 100
        
        # SMA calculations
        for period in [5, 10, 20, 50, 200]:
            result[f'SMA_{period}'] = result['Close'].rolling(window=period).mean()
        
        # EMA calculations
        for period in [5, 10, 20, 50]:
            result[f'EMA_{period}'] = result['Close'].ewm(span=period, adjust=False).mean()
        
        # Volatility
        result['Volatility'] = result['Returns'].rolling(window=20).std() * np.sqrt(20)
        
        # RSI
        delta = result['Close'].diff()
        gain = (delta.where(delta > 0, 0)).rolling(window=14).mean()
        loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean()
        rs = gain / loss
        result['RSI'] = 100 - (100 / (1 + rs))
        
        # MACD
        exp1 = result['Close'].ewm(span=12, adjust=False).mean()
        exp2 = result['Close'].ewm(span=26, adjust=False).mean()
        result['MACD'] = exp1 - exp2
        result['MACD_Signal'] = result['MACD'].ewm(span=9, adjust=False).mean()
        result['MACD_Hist'] = result['MACD'] - result['MACD_Signal']
        
        # Bollinger Bands
        result['BB_Middle'] = result['Close'].rolling(window=20).mean()
        result['BB_Upper'] = result['BB_Middle'] + (result['Close'].rolling(window=20).std() * 2)
        result['BB_Lower'] = result['BB_Middle'] - (result['Close'].rolling(window=20).std() * 2)
        result['BB_Width'] = (result['BB_Upper'] - result['BB_Lower']) / result['BB_Middle']
        
        # Discretization for association rules
        # SMA-based trend
        result['Trend'] = 'Sideways'
        result.loc[result['Close'] > result['SMA_50'], 'Trend'] = 'Uptrend'
        result.loc[result['Close'] < result['SMA_50'], 'Trend'] = 'Downtrend'
        
        # RSI zones
        result['RSI_Zone'] = pd.cut(result['RSI'].fillna(50), 
                                   bins=[-np.inf, 30, 45, 55, 70, np.inf],
                                   labels=['Oversold', 'Weak', 'Neutral', 'Strong', 'Overbought'])
        
        # Secure quantization for other parameters
        labels = ['Very_Low', 'Low', 'Medium', 'High', 'Very_High']
        
        result['Volatility_Zone'] = self.safe_qcut(
            result['Volatility'].fillna(result['Volatility'].mean()), 
            5, labels)
        
        result['Price_Zone'] = self.safe_qcut(
            result['Close'], 
            5, labels)
        
        result['Volume_Zone'] = self.safe_qcut(
            result['Volume'], 
            5, labels)
        
        # Candle patterns
        result['Body'] = result['Close'] - result['Open']
        result['Upper_Shadow'] = result['High'] - result[['Open', 'Close']].max(axis=1)
        result['Lower_Shadow'] = result[['Open', 'Close']].min(axis=1) - result['Low']
        result['Body_Pct'] = result['Body'] / result['Open'] * 100
        
        body_mean = abs(result['Body_Pct']).mean()
        result['Candle_Pattern'] = 'Normal'
        result.loc[abs(result['Body_Pct']) < body_mean * 0.1, 'Candle_Pattern'] = 'Doji'
        result.loc[result['Body_Pct'] > body_mean * 2, 'Candle_Pattern'] = 'Long_Bullish'
        result.loc[result['Body_Pct'] < -body_mean * 2, 'Candle_Pattern'] = 'Long_Bearish'
        
        return result

    def process_all_pairs(self):
        """Process all currency pairs and create combined dataset"""
        if not self.load_data():
            return None

        # Handling each pair
        for pair in self.pairs:
            if not self.data[pair].empty:
                print(f"Processing {pair}...")
                self.processed_data[pair] = self.calculate_indicators(self.data[pair])
                # Add a pair prefix to the column names
                self.processed_data[pair].columns = [f"{pair}_{col}" for col in self.processed_data[pair].columns]
            else:
                print(f"Skipping {pair} - no data")

        # Find the common time range for non-empty data
        common_dates = None
        for pair in self.pairs:
            if pair in self.processed_data and not self.processed_data[pair].empty:
                if common_dates is None:
                    common_dates = set(self.processed_data[pair].index)
                else:
                    common_dates &= set(self.processed_data[pair].index)

        if not common_dates:
            print("No common dates found")
            return None

        # Align all pairs by common dates
        aligned_data = {}
        for pair in self.pairs:
            if pair in self.processed_data and not self.processed_data[pair].empty:
                aligned_data[pair] = self.processed_data[pair].loc[sorted(common_dates)]

        # Combine all pairs
        combined_df = pd.concat([aligned_data[pair] for pair in aligned_data], axis=1)
        
        return combined_df

    def save_data(self, data, suffix='combined'):
        """Save processed data to CSV"""
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        filename = f"forex_data_{suffix}_{timestamp}.csv"
        
        try:
            data.to_csv(filename, sep='\t', encoding='utf-16')
            print(f"Saved processed data to: {filename}")
            return True
        except Exception as e:
            print(f"Error saving data: {str(e)}")
            return False

if __name__ == "__main__":
    processor = ForexDataProcessor()
    
    # Handling all pairs
    combined_data = processor.process_all_pairs()
    
    if combined_data is not None:
        # Save the combined dataset
        processor.save_data(combined_data)
        
        # Display dataset info
        print("\nCombined dataset shape:", combined_data.shape)
        print("\nFeatures for association rules analysis:")
        for col in combined_data.columns:
            if any(x in col for x in ['_Zone', '_Pattern', 'Trend']):
                print(f"- {col}")
        
        # Save individual pairs
        for pair in processor.pairs:
            if pair in processor.processed_data and not processor.processed_data[pair].empty:
                processor.save_data(processor.processed_data[pair], pair)


在成功加载数据之后,到了最有意思的部分——计算技术指标。在这里,我依赖于一系列经过时间检验的工具。移动平均线有助于识别不同持续时间的趋势。SMA(50)通常作为动态支撑或阻力。RSI振荡器,其经典周期为14,适用于确定市场的超买和超卖区域。MACD对于识别动量和反转点是不可或缺的。布林带能清晰地展示当前市场的波动性。

# Volatility and RSI calculation example
result['Volatility'] = result['Returns'].rolling(window=20).std() * np.sqrt(20)

delta = result['Close'].diff()
gain = (delta.where(delta > 0, 0)).rolling(window=14).mean()
loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean()
rs = gain / loss
result['RSI'] = 100 - (100 / (1 + rs))

数据离散化值得特别关注。所有连续值都需要被划分为清晰的类别。在这个问题上,找到一个平衡点至关重要——过于粗糙的划分会使模式搜索变得复杂,而过于精细的划分则会导致丢失重要的市场细节。例如,为了确定趋势,更简单的划分方式效果更好——根据价格相对于平均值的位置:

# Defining a trend
result['Trend'] = 'Sideways'
result.loc[result['Close'] > result['SMA_50'], 'Trend'] = 'Uptrend'
result.loc[result['Close'] < result['SMA_50'], 'Trend'] = 'Downtrend'

K线模式也需要一个特别的方法来分析。基于统计分析,我根据最小的K线实体的大小区分出十字星(Doji),以及在极端价格波动下的长阳线(Long_Bullish)和长阴线(Long_Bearish)。这种分类使我们能够清晰地识别市场犹豫不决的时刻以及强烈的冲动走势。

在处理的最后阶段,所有货币对被合并到一个具有统一时间尺度的单一数据数组中。这一步至关重要——它为我们提供了在不同工具之间寻找复杂关系的可能性。现在我们可以看到一个货币对的趋势如何影响另一个货币对的波动性,或者K线模式如何与整个市场的交易量相关。


在Python中实现Apriori算法

在准备好数据之后,我们进入关键阶段——在金融数据中实现Apriori算法以寻找关联规则。我们将原本用于分析购物车的Apriori算法改编为适用于货币对时间序列的算法。 

在外汇市场的背景下,“交易”是在某一特定时间点上各种指标和货币对状态的集合。例如:
  • EURUSD_Trend = Uptrend
  • GBPUSD_RSI_Zone = Overbought
  • USDJPY_Volatility_Zone = High

该算法搜索此类状态的组合频度,基于这些组合形成交易规则。

import pandas as pd
import numpy as np
from collections import defaultdict
from itertools import combinations
import time
import logging

# Setting up logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('apriori_forex_advanced.log'),
        logging.StreamHandler()
    ]
)

class AdvancedForexApriori:
    def __init__(self, min_support=0.01, min_confidence=0.7, max_length=3):
        self.min_support = min_support
        self.min_confidence = min_confidence
        self.max_length = max_length
        
    def find_patterns(self, df):
        start_time = time.time()
        logging.info("Starting advanced pattern search...")
        
        # Group columns by type for more meaningful analysis
        column_groups = {
            'trend': [col for col in df.columns if 'Trend' in col],
            'rsi': [col for col in df.columns if 'RSI_Zone' in col],
            'volume': [col for col in df.columns if 'Volume_Zone' in col],
            'price': [col for col in df.columns if 'Price_Zone' in col],
            'pattern': [col for col in df.columns if 'Pattern' in col]
        }
        
        # Create a list of all columns for analysis
        pattern_cols = []
        for cols in column_groups.values():
            pattern_cols.extend(cols)
        
        logging.info(f"Found {len(pattern_cols)} pattern columns in {len(column_groups)} groups")
        
        # Prepare data
        pattern_df = df[pattern_cols]
        n_rows = len(pattern_df)
        
        # Find single patterns
        logging.info("Finding single patterns...")
        single_patterns = {}
        for col in pattern_cols:
            value_counts = pattern_df[col].value_counts()
            value_counts = value_counts[value_counts/n_rows >= self.min_support]
            for value, count in value_counts.items():
                pattern = f"{col}={value}"
                single_patterns[pattern] = count/n_rows
        
        # Find pair and triple patterns 
        logging.info("Finding complex patterns...")
        complex_rules = []
        
        # Generate column combinations for analysis
        column_combinations = []
        for i in range(2, self.max_length + 1):
            column_combinations.extend(combinations(pattern_cols, i))
        
        total_combinations = len(column_combinations)
        for idx, cols in enumerate(column_combinations, 1):
            if idx % 10 == 0:
                logging.info(f"Processing combination {idx}/{total_combinations}")
            
            # Create a cross-table for the selected columns
            grouped = pattern_df.groupby([*cols]).size().reset_index(name='count')
            grouped['support'] = grouped['count'] / n_rows
            
            # Sort by minimum support
            grouped = grouped[grouped['support'] >= self.min_support]
            
            for _, row in grouped.iterrows():
                # Form all possible combinations of antecedents and consequents
                items = [f"{col}={row[col]}" for col in cols]
                
                for i in range(1, len(items)):
                    for antecedent in combinations(items, i):
                        consequent = tuple(set(items) - set(antecedent))
                        
                        # Calculate the support of the antecedent
                        ant_support = self._calculate_support(pattern_df, antecedent)
                        
                        if ant_support > 0:  # Avoid division by zero
                            confidence = row['support'] / ant_support
                            
                            if confidence >= self.min_confidence:
                                # Count the lift
                                cons_support = self._calculate_support(pattern_df, consequent)
                                lift = confidence / cons_support if cons_support > 0 else 0
                                
                                # Adding additional metrics to evaluate rules
                                leverage = row['support'] - (ant_support * cons_support)
                                conviction = (1 - cons_support) / (1 - confidence) if confidence < 1 else float('inf')
                                
                                rule = {
                                    'antecedent': antecedent,
                                    'consequent': consequent,
                                    'support': row['support'],
                                    'confidence': confidence,
                                    'lift': lift,
                                    'leverage': leverage,
                                    'conviction': conviction
                                }
                                
                                # Sort the rules by additional criteria 
                                if self._is_meaningful_rule(rule):
                                    complex_rules.append(rule)
        
        # Sort the rules by complex metric
        complex_rules.sort(key=self._rule_score, reverse=True)
        
        end_time = time.time()
        logging.info(f"Pattern search completed in {end_time - start_time:.2f} seconds")
        logging.info(f"Found {len(complex_rules)} meaningful rules")
        
        return complex_rules
    
    def _calculate_support(self, df, items):
        """Calculate support for a set of elements"""
        mask = pd.Series(True, index=df.index)
        for item in items:
            col, val = item.split('=')
            mask &= (df[col] == val)
        return mask.mean()
    
    def _is_meaningful_rule(self, rule):
        """Check the rule for its relevance to trading"""
        # The rule should have the high lift and 'leverage'
        if rule['lift'] < 1.5 or rule['leverage'] < 0.01:
            return False
            
        # At least one element should be related to a trend or RSI
        has_trend_or_rsi = any('Trend' in item or 'RSI' in item 
                              for item in rule['antecedent'] + rule['consequent'])
        if not has_trend_or_rsi:
            return False
            
        return True
    
    def _rule_score(self, rule):
        """Calculate the rule complex evaluation"""
        return (rule['lift'] * 0.4 + 
                rule['confidence'] * 0.3 + 
                rule['support'] * 0.2 + 
                rule['leverage'] * 0.1)

# Load data
logging.info("Loading data...")
data = pd.read_csv('forex_data_combined_20241116_074242.csv', 
                  sep='\t', 
                  encoding='utf-16',
                  index_col='DateTime')
logging.info(f"Data loaded, shape: {data.shape}")

# Apply the algorithm
apriori = AdvancedForexApriori(min_support=0.01, min_confidence=0.7, max_length=3)
rules = apriori.find_patterns(data)

# Display results
logging.info("\nTop 10 trading rules:")
for i, rule in enumerate(rules[:10], 1):
    logging.info(f"\nRule {i}:")
    logging.info(f"IF {' AND '.join(rule['antecedent'])}")
    logging.info(f"THEN {' AND '.join(rule['consequent'])}")
    logging.info(f"Support: {rule['support']:.3f}")
    logging.info(f"Confidence: {rule['confidence']:.3f}")
    logging.info(f"Lift: {rule['lift']:.3f}")
    logging.info(f"Leverage: {rule['leverage']:.3f}")
    logging.info(f"Conviction: {rule['conviction']:.3f}")

# Save results
results_df = pd.DataFrame(rules)
results_df.to_csv('forex_rules_advanced.csv', index=False, sep='\t', encoding='utf-16')
logging.info("Results saved to forex_rules_advanced.csv")


将关联规则应用于货币对分析的适配

在我将Apriori算法适配到外汇市场的过程中,我遇到了一些有趣的挑战。尽管这种方法最初是为分析店内购买行为而创建的,但我认为它在外汇市场的潜力是值得期待的。

主要的困难在于,外汇市场与普通商店购物有着根本的不同。在金融市场工作多年后,我已经习惯了处理不断变化的价格和指标。但是,如何将一个通常只是在超市收据上寻找香蕉和牛奶之间联系的算法应用到外汇市场上呢? 

经过我的实验,诞生了一个由五个指标组成的系统。我对每一个指标都进行了彻底的测试。

“支持度”(Support)被证明是一个非常棘手的指标。我曾经差点将一条表现优异的规则纳入交易系统,但它的支持度仅为0.02。幸运的是,我及时注意到了这一点——实际上,这样的规则每百年才会触发一次!

“置信度”(Confidence)则相对简单。当你在市场中工作时,你会很快意识到,即使是70%的概率也是一个非常出色的指标。关键是要合理地管理剩余的30%的风险。我们始终应该牢记风险管理。没有它,即使你手中有圣杯,你也会面临回撤甚至资金耗尽的风险。

“提升度”(Lift)成了我最喜欢的指标。经过数百小时的测试,我注意到一个模式——提升度超过1.5的规则实际上在真实市场中是有效的。这一发现深刻地影响了我对信号筛选的方法。 

处理“杠杆”(Leverage)的过程相当有趣。起初,我甚至想完全从系统中排除它,认为它毫无用处。但在市场特别波动的一个时期,它帮助我筛选出了大部分虚假信号。

“确信度”(Conviction)是在研究论坛后最后加入的。它帮助我理解了这个指标在评估所发现模式的实际重要性方面的重要性。

最让我惊讶的是,算法能够发现不同货币对之间意想不到的联系。例如,谁能想到EURUSD中的某些模式能够如此准确地预测USDJPY的走势呢?在市场中交易了9年,我都没有注意到算法发现的许多关系。尽管货币对交易、一揽子货币交易和套利曾经是我的专长,但我仍然记得cmillion刚开始基于货币对间的相互走势开发其EA的时候。

现在,我继续我的研究,测试新的指标组合和时间周期。市场不断变化,每一天都有新的发现。下周,我计划发布在年度数据上测试系统的成果,以及算法在实时模拟交易中的首批成果。那里有一些非常有趣的发现。

说实话,我甚至没想到这个项目会进展到这一步。这一切始于一个简单的数据挖掘实验,尝试为分类算法的需要严格分类所有市场走势,最终发展成了一个完整的交易系统。我认为我刚刚开始理解这种方法的真正潜力。


外汇交易中的实现细节

让我们稍微回顾一下代码本身。我们的代码有几个重要的细节,用于处理金融数据:

column_groups = {
    'trend': [col for col in df.columns if 'Trend' in col],
    'rsi': [col for col in df.columns if 'RSI_Zone' in col],
    'volume': [col for col in df.columns if 'Volume_Zone' in col],
    'price': [col for col in df.columns if 'Price_Zone' in col],
    'pattern': [col for col in df.columns if 'Pattern' in col]
}

这种分组有助于找到更有意义的指标组合,并降低计算复杂性。

def _is_meaningful_rule(self, rule):
    if rule['lift'] < 1.5 or rule['leverage'] < 0.01:
        return False
    has_trend_or_rsi = any('Trend' in item or 'RSI' in item 
                          for item in rule['antecedent'] + rule['consequent'])
    if not has_trend_or_rsi:
        return False
    return True

我们只选择具有强统计显著性(提升度 > 1.5)且必须包含趋势指标或RSI的规则。

def _rule_score(self, rule):
    return (rule['lift'] * 0.4 + 
            rule['confidence'] * 0.3 + 
            rule['support'] * 0.2 + 
            rule['leverage'] * 0.1)

加权评分有助于根据其对交易的潜在有用性对规则进行排名。


可视化发现的规则

在找到关联规则后,我们应该正确地对它们进行可视化和分析。为此,我开发了一个特殊的ForexRulesVisualizer类,它提供了几种对找到的模式进行视觉分析的方法。

规则指标的分布

分析的第一步是了解找到的规则的主要指标的分布情况。“支持度”、“置信度”、“提升度”和“杠杆”的分布图有助于评估找到的规则的质量,并在必要时调整算法参数。

特别有用的工具是交互式网络图,它清晰地显示了不同市场条件之间的联系。在这个图中,节点是指标状态(例如“EURUSD_Trend=Uptrend”或“USDJPY_RSI_Zone=Overbought”),边代表找到的规则,其中边的厚度与“提升度”值成正比。

货币对相互作用的热力图


为了分析货币对之间的关系,我使用热力图,它显示了不同工具之间的关系强度。这有助于识别最常相互影响的货币对,这对于构建多样化的交易组合至关重要。


创建交易信号

在找到并可视化了关联规则之后,下一步重要的步骤是将它们转化为交易信号。为此,我开发了ForexSignalGenerator类,它分析市场的当前状态,并根据找到的规则生成交易信号。

import pandas as pd
import numpy as np
from datetime import datetime
import logging

class ForexSignalGenerator:
    def __init__(self, rules_df, min_rule_strength=0.5):
        """
        Signal generator initialization
        
        Parameters:
        rules_df: DataFrame with association rules
        min_rule_strength: minimum rule strength to generate a signal
        """
        self.rules_df = rules_df
        self.min_rule_strength = min_rule_strength
        self.active_signals = {}
        
    def calculate_rule_strength(self, rule):
        """
        Comprehensive assessment of the rule strength
        Takes into account all metrics with different weights
        """
        strength = (
            rule['lift'] * 0.4 +        # Main weight on 'lift'
            rule['confidence'] * 0.3 +   # Rule confidence
            rule['support'] * 0.2 +      # Occurrence frequency
            rule['leverage'] * 0.1       # Improvement over randomness
        )
        
        # Additional bonus for having trend indicators
        if any('Trend' in item for item in rule['antecedent']):
            strength *= 1.2
            
        return strength
        
    def analyze_market_state(self, current_data):
        """
        Current market state analysis
        
        Parameters:
        current_data: DataFrame with current indicator values
        """
        signals = []
        state = self._create_market_state(current_data)
        
        # Find all the matching rules
        matching_rules = self._find_matching_rules(state)
        
        # Grouping rules by currency pairs
        for pair in ['EURUSD', 'GBPUSD', 'USDJPY', 'USDCHF']:
            pair_rules = [r for r in matching_rules if any(pair in c for c in r['consequent'])]
            if pair_rules:
                signal = self._generate_pair_signal(pair, pair_rules)
                signals.append(signal)
        
        return signals
    
    def _create_market_state(self, data):
        """Forming the current market state"""
        state = []
        for col in data.columns:
            if any(x in col for x in ['_Zone', '_Pattern', 'Trend']):
                state.append(f"{col}={data[col].iloc[-1]}")
        return set(state)
    
    def _find_matching_rules(self, state):
        """Searching for rules that match the current state"""
        matching_rules = []
        
        for _, rule in self.rules_df.iterrows():
            # Check if all the rule conditions are met
            if all(cond in state for cond in rule['antecedent']):
                strength = self.calculate_rule_strength(rule)
                if strength >= self.min_rule_strength:
                    rule['calculated_strength'] = strength
                    matching_rules.append(rule)
        
        return matching_rules
    
    def _generate_pair_signal(self, pair, rules):
        """Generating a signal for a specific currency pair"""
        # Divide the rules by signal type
        trend_signals = defaultdict(float)
        
        for rule in rules:
            # Looking for trend-related consequents
            trend_cons = [c for c in rule['consequent'] if pair in c and 'Trend' in c]
            if trend_cons:
                for cons in trend_cons:
                    trend = cons.split('=')[1]
                    trend_signals[trend] += rule['calculated_strength']
        
        # Determine the final signal
        if trend_signals:
            strongest_trend = max(trend_signals.items(), key=lambda x: x[1])
            return {
                'pair': pair,
                'signal': strongest_trend[0],
                'strength': strongest_trend[1],
                'timestamp': datetime.now()
            }
        
        return None

# Usage example
def run_trading_system(data, rules_df):
    """
    Trading system launch
    
    Parameters:
    data: DataFrame with historical data
    rules_df: DataFrame with association rules
    """
    signal_generator = ForexSignalGenerator(rules_df)
    
    # Simulate a pass along historical data
    signals_history = []
    
    for i in range(len(data) - 1):
        current_slice = data.iloc[i:i+1]
        signals = signal_generator.analyze_market_state(current_slice)
        
        for signal in signals:
            if signal:
                signals_history.append({
                    'datetime': current_slice.index[0],
                    'pair': signal['pair'],
                    'signal': signal['signal'],
                    'strength': signal['strength']
                })
    
    return pd.DataFrame(signals_history)

# Loading historical data and rules
data = pd.read_csv('forex_data_combined_20241116_090857.csv', 
                  sep='\t', 
                  encoding='utf-16',
                  index_col='DateTime',
                  parse_dates=True)

rules_df = pd.read_csv('forex_rules_advanced.csv',
                      sep='\t',
                      encoding='utf-16')
rules_df['antecedent'] = rules_df['antecedent'].apply(eval)
rules_df['consequent'] = rules_df['consequent'].apply(eval)

# Launch the test
signals_df = run_trading_system(data, rules_df)

# Analyze the results
print("Generated signals statistics:")
print(signals_df.groupby('pair')['signal'].value_counts())


评估规则的有效性

在长时间进行规则可视化实验后,现在到了最困难的部分——创建实际的交易信号。我承认,这项任务让我费了不少力气。在图表上找到漂亮的模式是一回事,而将其转化为可运作的交易系统则是另一回事。

我决定创建一个单独的模块 ForexSignalGenerator。起初,我只是想根据最强的规则生成信号,但很快意识到事情要复杂得多。市场在不断变化,昨天效果很好的规则今天可能会失效。

我不得不认真对待规则有效性的评估。经过几次失败的实验后,我开发了一个评分系统。在选择比例方面我遇到了最大的困难——我可能尝试了数十种组合。最终,我确定了“提升度”占最终评估的40%(这是一个真正关键的指标),“置信度”占30%,“支持度”占20%,“杠杆率”占10%。

有趣的是,当规则包含趋势成分时,往往能获得最强的信号。我甚至为这样的规则增加了20%的特殊奖励,实践证明这是合理的。

在分析当前市场状态时,我也付出了不少努力。起初,我只是简单地将当前指标值与规则条件进行比较。但后来我意识到需要考虑更广泛的背景。例如,我添加了对过去几个周期内整体趋势的验证、波动状态,甚至是一天中的时间段。

目前,该系统会对每种货币对大约20个不同的参数进行分析。我找到的一些模式真的让我感到惊讶。 

诚然,该系统仍然远非完美。有时,我会想是否需要添加基本面因素。然而,我留待以后再处理这个问题。首先,我想完成当前版本。 


信号排序和聚合

在系统开发过程中,我很快意识到仅仅找到规则是不够的——我们需要对信号质量进行严格控制。经过几次不成功的交易后,很明显,排序甚至比找到模式本身更重要。

我开始采用一个简单的最低规则有效性阈值。起初我将其设置为0.5,但我不断得到误报。经过两周的测试后,我将阈值提高到0.7,情况明显改善。信号数量减少了大约三分之一,但质量显著提高。

第二级排序是在一次特别令人沮丧的事件之后出现的。有一个表现优异的规则,我根据它开仓,但市场却朝着完全相反的方向发展。当我开始调查时,发现当时其他规则正在发出相反的信号。从那时起,我都会检查一致性,只有在多个规则指向同一方向时才会开仓。

处理波动性变得很有趣。我注意到在平稳时期系统运行得像钟表一样精确,但一旦市场变得更加活跃,问题就开始出现。因此,我添加了基于ATR的动态过滤器。如果过去20天的波动性超过第75百分位数,我们会提高对规则强度的要求20%。

最困难的部分是检查冲突信号。有时一些规则建议买入,另一些建议卖出,而所有规则都有良好的参数。我尝试了不同的方法,但最终确定了一个简单的解决方案:如果信号存在显著矛盾,我们就跳过这种情况。这样做我们会失去一些机会,但会显著降低风险。

下个月,我计划添加时间筛选规则。我注意到在某些时段规则的表现明显更差。这在流动性较低和发布重要新闻的时期尤其如此。我认为,这应该会进一步提高成功交易的比例。


测试结果

在系统开发几个月后,我遇到了一个关键问题——如何正确评估每个找到的规则的有效性?理论上看起来很简单,但真实市场很快暴露了初始方法的所有弱点。

经过长时间的实验,我得出了一套不同因素的权重系统。我让“提升度”成为主要组成部分(影响40%)——实践证明这是一个真正至关重要的指标。“置信度”占30%——毕竟规则的置信度也很重要。“支持度”和“杠杆率”被赋予了较小的权重——它们更多地起到过滤作用。

信号排序成了一个独立的规则。起初,我尝试按顺序交易所有规则,但很快意识到自己的错误。因此,我不得不引入一个多级排序系统。首先,我们根据最低强度阈值筛选出弱规则。然后我们检查信号是否由多个规则确认——通常单个规则可靠性较低。

考虑波动性被证明尤为重要。在市场平稳时期,系统运行完美,但一旦波动性飙升,误报数量急剧增加。我不得不添加动态过滤器,随着波动性增加而变得更加严格。

系统测试几乎花了三个月。我对其进行了为期两年的四对主要货币对的回测。结果相当出乎意料。例如,USDJPY表现最佳——65%的交易盈利,风险回报率1.6。但GBPUSD令人失望——只有58%,风险回报率1.4。

有趣的是,提升度超过2.0且置信度超过0.8的规则对所有货币对都始终表现出最佳结果。显然,这些水平在汇市中确实是一些自然显著性阈值。


进一步改进

目前,我看到系统有几个改进方向。首先,规则的参数需要更加动态——市场在变化,系统需要适应。其次,明显缺乏对宏观经济和新闻背景的考虑。是的,这会使系统复杂化,但潜在的收益值得这样做。

应用自适应过滤器似乎特别有趣。不同的市场阶段显然需要不同的系统设置。目前实现得比较粗糙,但我已经看到几种改进方法。

下周我计划开始测试一个具有动态优化仓位大小的新版本。历史数据上的初步结果看起来很有希望,但真实市场,一如既往,会做出自己的调整。


结论

在算法交易中使用关联规则为发现市场中不明显模式开辟了新的机会。成功的关键在于适当的数据准备、仔细选择规则以及一个设计良好的信号生成系统。

重要的是要记住,任何交易系统都需要持续监控并适应不断变化的市场条件。关联规则是一个强大的分析工具,但它们需要与其他技术分析和基本面分析方法结合使用。

本文由MetaQuotes Ltd译自俄文
原文地址: https://www.mql5.com/ru/articles/16061

附加的文件 |
Dataset.mq5 (4.29 KB)
最近评论 | 前往讨论 (2)
Aleksey Vyazmikin
Aleksey Vyazmikin | 22 11月 2024 在 18:27

显然,我们假定读者已经对这种方法有所了解,如果不是呢?

我尤其不理解其中提到的指标:

提升率已成为我最喜欢的指标。经过数百小时的测试,我发现了一个规律--升水高于 1.5 的规则在实际市场中确实有效。这一发现严重影响了我的信号过滤 方法。

如果我对该方法理解正确的话,相关信号是在量子段中搜索的。但我不明白下一步该怎么做。目标信号是什么?我猜测是根据目标检查生成的规则,并根据度量标准进行评估。

如果是这样的话,这与我的方法不谋而合,而且评估性能和效率也很有趣。

Cks1295
Cks1295 | 24 11月 2024 在 05:28
你好,尤金!请给我回信(我已向您发送了加为好友的请求,有一个严肃的话题需要讨论(支持者模式及其实际应用)。感谢您的回复,谨致敬意,安德烈
DoEasy.服务函数(第 3 部分):外包线形态 DoEasy.服务函数(第 3 部分):外包线形态
在本文中,我们将开发 DoEasy 库中的外包线(Outside Bar)价格行为形态,并优化访问价格形态管理的方法。此外,我们将修复在库测试中发现的错误和缺点。
交易中的神经网络:具有相对编码的变换器 交易中的神经网络:具有相对编码的变换器
自我监督学习是分析大量无标签数据的有效方法。通过令模型适应金融市场的特定特征来提供效率,这有助于提升传统方法的有效性。本文讲述了一种替代的注意力机制,它参考输入之间的相对依赖关系。
价格行为分析工具包开发(第二部分):分析注释脚本 价格行为分析工具包开发(第二部分):分析注释脚本
秉承我们简化价格行为分析的核心理念,我们很高兴推出又一款可显著提升市场分析能力、助力您做出精准决策的工具。该工具可展示关键技术指标(如前一日价格、重要支撑阻力位、成交量),并在图表上自动生成可视化标记。
基于MQL5的自动化交易策略(第一部分):Profitunity系统(比尔·威廉姆斯的《交易混沌》) 基于MQL5的自动化交易策略(第一部分):Profitunity系统(比尔·威廉姆斯的《交易混沌》)
在本文中,我们研究了比尔·威廉姆斯(Bill Williams)的Profitunity系统,深入剖析其核心组成部分以及在混沌市场中独特的交易方法。我们指导读者在MQL5中实现该系统,专注于自动化关键指标和入场/出场信号。最后,我们对策略进行测试和优化,提供其在不同市场环境下的表现。