English Русский Español Deutsch 日本語 Português
preview
量子计算与交易:价格预测的新方法

量子计算与交易:价格预测的新方法

MetaTrader 5交易系统 |
807 0
Yevgeniy Koshtenko
Yevgeniy Koshtenko

交易中量子计算的介绍:关键概念与优势

想象一下,在一个世界中,每一笔市场交易都通过同时存在的可能性来进行分析——就像著名的薛定谔之猫,在打开盒子之前,它既是活的也是死的。这就是量子交易的工作原理:它同时观察所有潜在的市场状态,为金融分析开辟了新的视野。

传统计算机按顺序逐位处理信息,而量子系统则利用微观世界的惊人特性——叠加和纠缠——来并行分析多种情景。这就像一位经验丰富的交易者,同时将数十张图表、新闻和指标牢记在心,但其能力被提升到了难以想象的程度。

我们生活在一个算法交易已成为常态的时代,而现在,我们正站在下一次革命的门槛上。量子计算承诺的不仅仅是更快的数据分析——它提供了一种理解市场过程的全新方法。想象一下,我们不再线性地预测资产价格,而是可以探索整个概率情景树,其中每个分支都考虑了最微妙的市场相关性。

在本文中,我们将深入探讨量子交易的世界——从量子计算的基本原理到交易系统的实际实现。我们将探讨量子算法如何在传统方法失败的地方找到模式,以及如何将这种优势应用于实时交易决策。

我们的旅程将从量子计算的基础知识开始,并逐步引导我们创建一个可行的市场预测系统。在此过程中,我们将把复杂的概念分解为简单的例子,并了解量子计算的理论优势如何转化为实际的交易工具。


金融时间序列分析中的量子叠加与纠缠

在分析金融市场时,我们面临着一个基本问题:存在无限多个相互影响的因素。每一次价格变动都是数千个变量的复杂相互作用的结果,从宏观经济指标到个别交易者的情绪。这正是量子计算通过其基本特性——叠加和纠缠——提供独特解决方案之处。

让我们考虑叠加。对于传统计算机而言,一个位只能是0或1。而一个量子位则同时存在于所有可能的状态中,直到我们进行测量。数学上,这被描述为|ψ⟩ = α|0⟩ + β|1⟩,其中α和β是复杂的概率振幅。当应用于时间序列分析时,这一特性使量子算法能够通过并行分析多种潜在情景,有效地探索投资组合和风险管理问题中的解空间。

量子纠缠增加了另一层可能性。当量子位纠缠时,它们的状态变得密不可分,例如,被描述为|ψ⟩ = (|00⟩ + |11⟩)/√2。在金融分析的背景下,这一特性被用于量子算法中,以模拟不同市场指标之间的复杂相关性。例如,我们可以创建能够考虑资产价格、交易量和市场波动性之间关系的系统。

这些量子特性在处理高频交易时尤其有用,因为在高频交易中,处理多维数据的速度至关重要。叠加使多种交易情景能够并行分析,而纠缠则有助于实时考虑复杂的跨市场相关性。量子优势在特定的优化和搜索问题中最为明显,在这些问题中,经典算法面临着计算复杂性的指数级增长。


使用QPE(量子相位估计)开发量子预测算法

我们的系统核心在于量子计算与经典技术分析的巧妙结合。想象一下,一个由八个量子位组成的量子乐团,每个量子位都是一位演奏家,在市场波动的复杂交响乐中演奏着自己的部分。

一切从数据准备开始。我们的量子预测器通过与MetaTrader 5的集成接收市场数据,就像神经元从感官收集信息一样。这些数据经过一个归一化过程——想象一下,就像我们正在将乐团中的所有乐器调至同一音调。

创建量子电路时,最有趣的部分开始了。首先,我们使用哈达玛门(H门)将每个量子位置于叠加状态。此时,每个量子位同时存在于所有可能的状态中,就像每位演奏家同时演奏着他部分的所有可能音符。

然后,我们通过ry门将市场数据编码到量子状态中,其中旋转角度由市场参数的值决定。这就像指挥家为每位演奏家设定演奏的速度和风格。我们特别关注当前价格——它拥有自己的量子旋转,像乐团中的独奏者一样影响着整个系统。

真正的魔力在于创建量子纠缠。使用cx门(CNOT),我们将相邻的量子位耦合起来,创建出不可分割的量子相关性。这就像将演奏家们的个人部分融合成了一个和谐的整体。

经过量子变换后,我们进行测量,就像录制一场音乐会一样。但这里有个转折:我们重复这个过程2000次(shots),得到结果的统计分布。每个维度都为我们提供一个比特串,其中1的个数决定了预测的方向。

最后的和弦是结果的解释。该系统在预测时非常保守,将最大价格变动限制在0.1%以内。这就像一位经验丰富的指挥家,不允许乐团演奏得过于响亮或过于柔和,保持着声音的平衡。

测试结果不言而喻。在EURUSD H1上的预测准确率超过了随机猜测,达到了54%。同时,该系统对其预测表现出了高度的信心,这体现在“置信度”指标上。

在此实现中,量子计算不仅仅是经典技术分析的补充——它为市场分析创造了一个新的维度,其中多种可能的情景在量子叠加中同时被探索。正如理查德·费曼所说:“量子层面的自然与我们通常所想的非常不同”。看来,金融市场也隐藏着一种我们才刚刚开始理解的量子特性。

import numpy as np
import MetaTrader5 as mt5
import pandas as pd
from datetime import datetime, timedelta
from qiskit import QuantumCircuit, transpile, QuantumRegister, ClassicalRegister
from qiskit_aer import AerSimulator
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
import warnings
warnings.filterwarnings('ignore')

class MT5DataLoader:
    def __init__(self, symbol="EURUSD", timeframe=mt5.TIMEFRAME_H1):
        if not mt5.initialize():
            raise Exception("MetaTrader5 initialization failed")
        
        self.symbol = symbol
        self.timeframe = timeframe
    
    def get_historical_data(self, lookback_bars=1000):
        current_time = datetime.now()
        rates = mt5.copy_rates_from(self.symbol, self.timeframe, current_time, lookback_bars)
        
        if rates is None:
            raise Exception(f"Failed to get data for {self.symbol}")
            
        df = pd.DataFrame(rates)
        df['time'] = pd.to_datetime(df['time'], unit='s')
        return df

class EnhancedQuantumPredictor:
    def __init__(self, num_qubits=8):  # Reduce the number of qubits for stability
        self.num_qubits = num_qubits
        self.simulator = AerSimulator()
        self.scaler = MinMaxScaler()
        
    def create_qpe_circuit(self, market_data, current_price):
        """Create a simplified quantum circuit"""
        qr = QuantumRegister(self.num_qubits, 'qr')
        cr = ClassicalRegister(self.num_qubits, 'cr')
        qc = QuantumCircuit(qr, cr)
        
        # Normalize data
        scaled_data = self.scaler.fit_transform(market_data.reshape(-1, 1)).flatten()
        
        # Create superposition
        for i in range(self.num_qubits):
            qc.h(qr[i])
        
        # Apply market data as phases
        for i in range(min(len(scaled_data), self.num_qubits)):
            angle = float(scaled_data[i] * np.pi)  # Convert to float
            qc.ry(angle, qr[i])
        
        # Create entanglement
        for i in range(self.num_qubits - 1):
            qc.cx(qr[i], qr[i + 1])
        
        # Apply the current price
        price_angle = float((current_price % 0.01) * 100 * np.pi)  # Use only the last 2 characters
        qc.ry(price_angle, qr[0])
        
        # Measure all qubits
        qc.measure(qr, cr)
        
        return qc
    
    def predict(self, market_data, current_price, features=None, shots=2000):
        """Simplified prediction"""
        # Trim the input data
        if market_data.shape[0] > self.num_qubits:
            market_data = market_data[-self.num_qubits:]
        
        # Create and execute the circuit
        qc = self.create_qpe_circuit(market_data, current_price)
        compiled_circuit = transpile(qc, self.simulator, optimization_level=3)
        job = self.simulator.run(compiled_circuit, shots=shots)
        result = job.result()
        counts = result.get_counts()
        
        # Analyze the results
        predictions = []
        total_shots = sum(counts.values())
        
        for bitstring, count in counts.items():
            # Use the number of ones in the bitstring to determine the direction
            ones = bitstring.count('1')
            direction = ones / self.num_qubits  # Normalized direction
            
            # Predict the change of no more than 0.1%
            price_change = (direction - 0.5) * 0.001
            predicted_price = current_price * (1 + price_change)
            predictions.extend([predicted_price] * count)
        
        predicted_price = np.mean(predictions)
        up_probability = sum(1 for p in predictions if p > current_price) / len(predictions)
        
        confidence = 1 - np.std(predictions) / current_price
        

        
        return {
            'predicted_price': predicted_price,
            'up_probability': up_probability,
            'down_probability': 1 - up_probability,
            'confidence': confidence
        }

class MarketPredictor:
    def __init__(self, symbol="EURUSD", timeframe=mt5.TIMEFRAME_H1, window_size=14):
        self.symbol = symbol
        self.timeframe = timeframe
        self.window_size = window_size
        self.quantum_predictor = EnhancedQuantumPredictor()
        self.data_loader = MT5DataLoader(symbol, timeframe)
    
    def prepare_features(self, df):
        """Prepare technical indicators"""
        df['sma'] = df['close'].rolling(window=self.window_size).mean()
        df['ema'] = df['close'].ewm(span=self.window_size).mean()
        df['std'] = df['close'].rolling(window=self.window_size).std()
        df['upper_band'] = df['sma'] + (df['std'] * 2)
        df['lower_band'] = df['sma'] - (df['std'] * 2)
        df['rsi'] = self.calculate_rsi(df['close'])
        df['momentum'] = df['close'] - df['close'].shift(self.window_size)
        df['rate_of_change'] = (df['close'] / df['close'].shift(1) - 1) * 100
        
        features = df[['sma', 'ema', 'std', 'upper_band', 'lower_band', 
                      'rsi', 'momentum', 'rate_of_change']].dropna()
        return features
    
    def calculate_rsi(self, prices, period=14):
        delta = prices.diff()
        gain = (delta.where(delta > 0, 0)).ewm(alpha=1/period).mean()
        loss = (-delta.where(delta < 0, 0)).ewm(alpha=1/period).mean()
        rs = gain / loss
        return 100 - (100 / (1 + rs))
    
    def predict(self):
        # Get data
        df = self.data_loader.get_historical_data(self.window_size + 50)
        features = self.prepare_features(df)
        
        if len(features) < self.window_size:
            raise ValueError("Insufficient data")
        
        # Get the latest data for the forecast
        latest_features = features.iloc[-self.window_size:].values
        current_price = df['close'].iloc[-1]
        
        # Make a prediction, now pass features as DataFrame
        prediction = self.quantum_predictor.predict(
            market_data=latest_features,
            current_price=current_price,
            features=features.iloc[-self.window_size:]  # Pass the last entries
        )
        
        prediction.update({
            'timestamp': datetime.now(),
            'current_price': current_price,
            'rsi': features['rsi'].iloc[-1],
            'sma': features['sma'].iloc[-1],
            'ema': features['ema'].iloc[-1]
        })
        
        return prediction

def evaluate_model(symbol="EURUSD", timeframe=mt5.TIMEFRAME_H1, test_periods=100):
    """Evaluation of model accuracy"""
    predictor = MarketPredictor(symbol, timeframe)
    predictions = []
    actual_movements = []
    
    # Get historical data
    df = predictor.data_loader.get_historical_data(test_periods + 50)
    
    for i in range(test_periods):
        try:
            temp_df = df.iloc[:-(test_periods-i)]
            predictor_temp = MarketPredictor(symbol, timeframe)
            features_temp = predictor_temp.prepare_features(temp_df)
            
            # Get data for forecasting
            latest_features = features_temp.iloc[-predictor_temp.window_size:].values
            current_price = temp_df['close'].iloc[-1]
            
            # Make a forecast with the transfer of all necessary parameters
            prediction = predictor_temp.quantum_predictor.predict(
                market_data=latest_features,
                current_price=current_price,
                features=features_temp.iloc[-predictor_temp.window_size:]
            )
            
            predicted_movement = 1 if prediction['up_probability'] > 0.5 else 0
            predictions.append(predicted_movement)
            
            actual_price_next = df['close'].iloc[-(test_periods-i)]
            actual_price_current = df['close'].iloc[-(test_periods-i)-1]
            actual_movement = 1 if actual_price_next > actual_price_current else 0
            actual_movements.append(actual_movement)
            
        except Exception as e:
            print(f"Error in evaluation: {e}")
            continue
    
    if len(predictions) > 0:
        metrics = {
            'accuracy': accuracy_score(actual_movements, predictions),
            'precision': precision_score(actual_movements, predictions),
            'recall': recall_score(actual_movements, predictions),
            'f1': f1_score(actual_movements, predictions)
        }
    else:
        metrics = {
            'accuracy': 0,
            'precision': 0,
            'recall': 0,
            'f1': 0
        }
    
    return metrics

if __name__ == "__main__":
    if not mt5.initialize():
        print("MetaTrader5 initialization failed")
        mt5.shutdown()
    else:
        try:
            symbol = "EURUSD"
            timeframe = mt5.TIMEFRAME_H1
            
            print("\nTest the model...")
            metrics = evaluate_model(symbol, timeframe, test_periods=100)
            
            print("\nModel quality metrics:")
            print(f"Accuracy: {metrics['accuracy']:.2%}")
            print(f"Precision: {metrics['precision']:.2%}")
            print(f"Recall: {metrics['recall']:.2%}")
            print(f"F1-score: {metrics['f1']:.2%}")
            
            print("\nCurrent forecast:")
            predictor = MarketPredictor(symbol, timeframe)
            df = predictor.data_loader.get_historical_data(predictor.window_size + 50)
            features = predictor.prepare_features(df)
            latest_features = features.iloc[-predictor.window_size:].values
            current_price = df['close'].iloc[-1]
            
            prediction = predictor.predict()  # Now this method passes all parameters correctly
            
            print(f"Predicted price: {prediction['predicted_price']:.5f}")
            print(f"Growth probability: {prediction['up_probability']:.2%}")
            print(f"Fall probability: {prediction['down_probability']:.2%}")
            print(f"Forecast confidence: {prediction['confidence']:.2%}")
            print(f"Current price: {prediction['current_price']:.5f}")
            print(f"RSI: {prediction['rsi']:.2f}")
            print(f"SMA: {prediction['sma']:.5f}")
            print(f"EMA: {prediction['ema']:.5f}")
            
        finally:
            mt5.shutdown()


量子交易系统的测试与验证:方法与结果



关键性能指标

  • 准确率:55.00%,在波动较大的欧元兑美元(EURUSD)市场中,该准确率适度高于随机猜测水平。
  • 精确率:63.64%,表明系统生成的信号具有较高的可靠性——近三分之二的预测是正确的。
  • 召回率:14.58%,较低的召回率表明系统具有选择性,仅在预测具有高度信心时才生成信号。这有助于避免错误信号。
  • F1分数:23.73%,F1分数值反映了精确率与召回率之间的平衡,证实了系统的保守策略。

当前市场预测

  • 当前欧元兑美元价格:1.02903
  • 预测价格:1.02905
  • 概率分布
    • 上涨概率:38.95%
    • 下跌概率:61.05%
  • 预测置信度:99.98%

尽管下跌概率占优,但系统仍以高置信度预测会出现适度的上涨走势。这可能表明整体下行趋势中存在短期的修正期。

技术指标

  • 相对强弱指数(RSI):49.13(接近中性区域,表明没有明显的超买或超卖情况)
  • 简单移动平均线(SMA):1.02904
  • 指数移动平均线(EMA):1.02909
  • 趋势:中性(当前价格接近移动平均线水平)

总结

该系统具有以下关键特性:

  1. 准确率持续高于随机猜测水平
  2. 信号生成具有高度选择性(63.64%的预测准确)
  3. 能够量化不同情景的置信度和概率
  4. 对市场情况进行全面分析,同时考虑技术指标和更复杂的模式

这种优先考虑信号质量而非数量的保守方法,使该系统成为实际交易中潜在有效的工具。


融合机器学习指标与量子算法:思考与初步尝试

量子计算与机器学习的结合开辟了新的视野。在我们的实验中,使用sklearn中的MinMaxScaler在将市场数据量子编码为量子角度之前对其进行标准化处理。

该系统将机器学习指标(准确率、精确率、召回率、F1分数)与量子测量相结合。其准确率达到60%,能够捕捉经典算法无法触及的模式,展现出如同经验丰富的交易员一般的谨慎和选择性。

未来,此类系统将整合更复杂的预处理方法,如自动编码器和变换器,以进行更深入地数据分析。量子算法可以成为经典模型的过滤器,反之亦然,从而适应市场条件。

我们的实验证明,量子方法与经典方法的协同作用并非一种选择,而是算法交易领域下一个突破的必经之路。


如何开始编写我的第一个量子神经网络——分类器

当开始在交易领域尝试量子计算时,我决定超越简单的量子电路,创建一个将量子计算与经典机器学习方法相结合的混合系统。这个理念似乎很有前景。我打算使用量子态来编码市场信息,然后基于这些数据训练一个常规分类器。

该系统相当复杂。它基于一个8量子比特的量子电路,通过一系列量子门将市场数据转换为量子态。每段价格历史都被编码为ry门的旋转角度,并使用cx门在量子比特之间创建纠缠。这使系统能够捕捉数据中的复杂非线性关系。

为了提高预测能力,我添加了一组经典二元指标:价格方向、动量、成交量、移动平均线收敛/发散指标、波动率、RSI和布林带。每个指标都生成二元信号,然后与量子特征相结合。

import numpy as np
import MetaTrader5 as mt5
import pandas as pd
from datetime import datetime, timedelta
from qiskit import QuantumCircuit, transpile, QuantumRegister, ClassicalRegister
from qiskit_aer import AerSimulator
from sklearn.preprocessing import MinMaxScaler, StandardScaler
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from sklearn.model_selection import train_test_split
from catboost import CatBoostClassifier
import warnings
warnings.filterwarnings('ignore')

class MT5DataLoader:
    def __init__(self, symbol="EURUSD", timeframe=mt5.TIMEFRAME_H1):
        if not mt5.initialize():
            raise Exception("MetaTrader5 initialization failed")
        
        self.symbol = symbol
        self.timeframe = timeframe
    
    def get_historical_data(self, lookback_bars=1000):
        current_time = datetime.now()
        rates = mt5.copy_rates_from(self.symbol, self.timeframe, current_time, lookback_bars)
        
        if rates is None:
            raise Exception(f"Failed to get data for {self.symbol}")
            
        df = pd.DataFrame(rates)
        df['time'] = pd.to_datetime(df['time'], unit='s')
        return df

class BinaryPatternGenerator:
    def __init__(self, df, lookback=10):
        self.df = df
        self.lookback = lookback
        
    def direction_encoding(self):
        return (self.df['close'] > self.df['close'].shift(1)).astype(int)
        
    def momentum_encoding(self, threshold=0.0001):
        returns = self.df['close'].pct_change()
        return (returns.abs() > threshold).astype(int)
        
    def volume_encoding(self):
        return (self.df['tick_volume'] > self.df['tick_volume'].rolling(self.lookback).mean()).astype(int)
        
    def convergence_encoding(self):
        ma_fast = self.df['close'].rolling(5).mean()
        ma_slow = self.df['close'].rolling(20).mean()
        return (ma_fast > ma_slow).astype(int)
        
    def volatility_encoding(self):
        volatility = self.df['high'] - self.df['low']
        avg_volatility = volatility.rolling(20).mean()
        return (volatility > avg_volatility).astype(int)
        
    def rsi_encoding(self, period=14, threshold=50):
        delta = self.df['close'].diff()
        gain = (delta.where(delta > 0, 0)).ewm(alpha=1/period).mean()
        loss = (-delta.where(delta < 0, 0)).ewm(alpha=1/period).mean()
        rs = gain / loss
        rsi = 100 - (100 / (1 + rs))
        return (rsi > threshold).astype(int)
        
    def bollinger_encoding(self, window=20):
        ma = self.df['close'].rolling(window=window).mean()
        std = self.df['close'].rolling(window=window).std()
        upper = ma + (std * 2)
        lower = ma - (std * 2)
        return ((self.df['close'] - lower)/(upper - lower) > 0.5).astype(int)
        
    def get_all_patterns(self):
        patterns = {
            'direction': self.direction_encoding(),
            'momentum': self.momentum_encoding(),
            'volume': self.volume_encoding(),
            'convergence': self.convergence_encoding(),
            'volatility': self.volatility_encoding(),
            'rsi': self.rsi_encoding(),
            'bollinger': self.bollinger_encoding()
        }
        return patterns

class QuantumFeatureGenerator:
    def __init__(self, num_qubits=8):
        self.num_qubits = num_qubits
        self.simulator = AerSimulator()
        self.scaler = MinMaxScaler()
        
    def create_quantum_circuit(self, market_data, current_price):
        qr = QuantumRegister(self.num_qubits, 'qr')
        cr = ClassicalRegister(self.num_qubits, 'cr')
        qc = QuantumCircuit(qr, cr)
        
        # Normalize data
        scaled_data = self.scaler.fit_transform(market_data.reshape(-1, 1)).flatten()
        
        # Create superposition
        for i in range(self.num_qubits):
            qc.h(qr[i])
        
        # Apply market data as phases
        for i in range(min(len(scaled_data), self.num_qubits)):
            angle = float(scaled_data[i] * np.pi)
            qc.ry(angle, qr[i])
        
        # Create entanglement
        for i in range(self.num_qubits - 1):
            qc.cx(qr[i], qr[i + 1])
        
        # Add the current price
        price_angle = float((current_price % 0.01) * 100 * np.pi)
        qc.ry(price_angle, qr[0])
        
        qc.measure(qr, cr)
        return qc
        
    def get_quantum_features(self, market_data, current_price):
        qc = self.create_quantum_circuit(market_data, current_price)
        compiled_circuit = transpile(qc, self.simulator, optimization_level=3)
        job = self.simulator.run(compiled_circuit, shots=2000)
        result = job.result()
        counts = result.get_counts()
        
        # Create a vector of quantum features
        feature_vector = np.zeros(2**self.num_qubits)
        total_shots = sum(counts.values())
        
        for bitstring, count in counts.items():
            index = int(bitstring, 2)
            feature_vector[index] = count / total_shots
            
        return feature_vector


class HybridQuantumBinaryPredictor:
    def __init__(self, num_qubits=8, lookback=10, forecast_window=5):
        self.num_qubits = num_qubits
        self.lookback = lookback
        self.forecast_window = forecast_window
        self.quantum_generator = QuantumFeatureGenerator(num_qubits)
        self.model = CatBoostClassifier(
            iterations=500,
            learning_rate=0.03,
            depth=6,
            loss_function='Logloss',
            verbose=False
        )
        
    def prepare_features(self, df):
        """Prepare hybrid features"""
        pattern_generator = BinaryPatternGenerator(df, self.lookback)
        binary_patterns = pattern_generator.get_all_patterns()
        
        features = []
        labels = []
        
        # Fill NaN in binary patterns
        for key in binary_patterns:
            binary_patterns[key] = binary_patterns[key].fillna(0)
        
        for i in range(self.lookback, len(df) - self.forecast_window):
            try:
                # Quantum features
                market_data = df['close'].iloc[i-self.lookback:i].values
                current_price = df['close'].iloc[i]
                quantum_features = self.quantum_generator.get_quantum_features(market_data, current_price)
                
                # Binary features
                binary_vector = []
                for key in binary_patterns:
                    window = binary_patterns[key].iloc[i-self.lookback:i].values
                    binary_vector.extend([
                        sum(window),  # Total number of signals
                        window[-1],   # Last signal
                        sum(window[-3:])  # Last 3 signals
                    ])
                
                # Technical indicators
                rsi = binary_patterns['rsi'].iloc[i]
                bollinger = binary_patterns['bollinger'].iloc[i]
                momentum = binary_patterns['momentum'].iloc[i]
                
                # Combine all features
                feature_vector = np.concatenate([
                    quantum_features,
                    binary_vector,
                    [rsi, bollinger, momentum]
                ])
                
                # Label: price movement direction
                future_price = df['close'].iloc[i + self.forecast_window]
                current_price = df['close'].iloc[i]
                label = 1 if future_price > current_price else 0
                
                features.append(feature_vector)
                labels.append(label)
                
            except Exception as e:
                print(f"Error at index {i}: {str(e)}")
                continue
        
        return np.array(features), np.array(labels)
    
    def train(self, df):
        """Train hybrid model"""
        print("Preparing features...")
        X, y = self.prepare_features(df)
        
        # Split into training and test samples
        split_point = int(len(X) * 0.8)
        X_train, X_test = X[:split_point], X[split_point:]
        y_train, y_test = y[:split_point], y[split_point:]
        
        print("Training model...")
        self.model.fit(X_train, y_train, eval_set=(X_test, y_test))
        
        # Model evaluation
        predictions = self.model.predict(X_test)
        probas = self.model.predict_proba(X_test)
        
        # Calculate metrics
        metrics = {
            'accuracy': accuracy_score(y_test, predictions),
            'precision': precision_score(y_test, predictions),
            'recall': recall_score(y_test, predictions),
            'f1': f1_score(y_test, predictions)
        }
        
        # Feature importance analysis
        feature_importance = self.model.feature_importances_
        quantum_importance = np.mean(feature_importance[:2**self.num_qubits])
        binary_importance = np.mean(feature_importance[2**self.num_qubits:])
        
        metrics.update({
            'quantum_importance': quantum_importance,
            'binary_importance': binary_importance,
            'test_predictions': predictions,
            'test_probas': probas,
            'test_actual': y_test
        })
        
        return metrics
    
    def predict_next(self, df):
        """Next movement forecast"""
        X, _ = self.prepare_features(df)
        if len(X) > 0:
            last_features = X[-1].reshape(1, -1)
            prediction_proba = self.model.predict_proba(last_features)[0]
            prediction = self.model.predict(last_features)[0]
            
            return {
                'direction': 'UP' if prediction == 1 else 'DOWN',
                'probability_up': prediction_proba[1],
                'probability_down': prediction_proba[0],
                'confidence': max(prediction_proba)
            }
        return None

def test_hybrid_model(symbol="EURUSD", timeframe=mt5.TIMEFRAME_H1, periods=1000):
    """Full test of the hybrid model"""
    try:
        # MT5 initialization
        if not mt5.initialize():
            raise Exception("Failed to initialize MT5")
        
        # Download data
        print(f"Loading {periods} periods of {symbol} {timeframe} data...")
        loader = MT5DataLoader(symbol, timeframe)
        df = loader.get_historical_data(periods)
        
        # Create and train model
        print("Creating hybrid model...")
        model = HybridQuantumBinaryPredictor()
        
        # Training and assessment
        print("Training and evaluating model...")
        metrics = model.train(df)
        
        # Output results
        print("\nModel Performance Metrics:")
        print(f"Accuracy: {metrics['accuracy']:.2%}")
        print(f"Precision: {metrics['precision']:.2%}")
        print(f"Recall: {metrics['recall']:.2%}")
        print(f"F1 Score: {metrics['f1']:.2%}")
        
        print("\nFeature Importance Analysis:")
        print(f"Quantum Features: {metrics['quantum_importance']:.2%}")
        print(f"Binary Features: {metrics['binary_importance']:.2%}")
        
        # Current forecast
        print("\nCurrent Market Prediction:")
        prediction = model.predict_next(df)
        if prediction:
            print(f"Predicted Direction: {prediction['direction']}")
            print(f"Up Probability: {prediction['probability_up']:.2%}")
            print(f"Down Probability: {prediction['probability_down']:.2%}")
            print(f"Confidence: {prediction['confidence']:.2%}")
        
        return model, metrics
        
    finally:
        mt5.shutdown()

if __name__ == "__main__":
    print("Starting Quantum-Binary Hybrid Trading System...")
    model, metrics = test_hybrid_model()
    print("\nSystem test completed.")

最初的结果令人咋舌。在欧元兑美元1小时图(H1)上,该系统的准确率仅为42.13%,甚至比随机猜测还要差。精确率为39.71%,召回率为27%,这表明模型的预测存在显著错误。F1分数为32.14%,也证实了预测结果的整体弱势。

对特征重要性的分析特别有趣。量子特征对模型整体预测能力的贡献仅为27.63%,而二元特征的贡献率则高达121.95%。这表明,要么系统的量子部分未能捕捉到数据中的重要模式,要么量子编码方法本身需要大幅改进。

但是这些结果并不会令人气馁——相反,它们指出了具体的改进方向。或许,应该改变量子编码电路,增加或减少量子比特的数量,并尝试其他量子门。或者,问题可能出在量子特征和常规特征如何相互整合上。

实验表明,创建一个高效的量子-经典混合系统并非简单地结合两种方法。这需要对量子计算和金融市场的特点都有深入地理解。这只是漫长研究和优化之旅的第一步。

在未来的迭代中,我计划对系统的量子部分进行大幅修改,可能增加更复杂的量子变换,并改进将市场信息编码为量子态的方式。同时,也可以尝试不同的量子特征和经典特征组合方式,以找到它们之间的最佳平衡。




结论

我本以为将量子计算与技术分析相结合是个绝妙的主意。您知道的,就像电影里演的那样:拿一项酷炫的技术,加上一点机器学习的魔力,瞧——一台印钞机就造好啦!但现实证明我错了。

我那个“绝妙”的量子分类器,准确率才勉强达到42%。这比抛硬币猜正反面还糟糕!您知道最搞笑的是什么吗?我花了数周时间研究出来的那些花哨的量子特征,只带来了27%的增益效果。而我“以防万一”加上的那些老掉牙的技术指标,却飙升了122%。

当我开始理解量子数据编码时,我会特别兴奋。想象一下——我们把普通的价格走势转化为量子态!这听起来像科幻小说,但它真的可行。尽管,效果并不完全如我所愿。

我的量子分类器目前更像是一个坏掉的计算器,而不是未来的交易系统。但我不会放弃。因为在这个由量子态和市场模式构成的奇妙世界里,某些真正有价值的东西正隐藏在某处。哪怕我得从头开始重写整个代码,我也打算找到它。

与此同时……我要重新研究我的量子比特了。不过,这个普通的“量子”方案还是显示出了一些效果。至少比瞎猜强……但我还有几个想法能让这玩意儿真正运转起来。而且我肯定会告诉您这些想法。


文中所用的程序

 文件 文件内容
Quant_Predict_p_1.py 基于经典量子方法的预测尝试及原型探索
Quant_Neural_Link.py 量子神经网络的草案版本
Quant_ML_Model.py 量子机器学习模型的草案版本



本文由MetaQuotes Ltd译自俄文
原文地址: https://www.mql5.com/ru/articles/16879

附加的文件 |
Quant_ML_Model.py (11.58 KB)
您应当知道的 MQL5 向导技术(第 51 部分):配以 SAC 的强化学习 您应当知道的 MQL5 向导技术(第 51 部分):配以 SAC 的强化学习
柔性参与者评论者是一种利用 3 个神经网络的强化学习算法。一名参与者网络和 2 个评论者网络。这些机器学习模型按主从伙伴关系配对,其中所建模评论者能提升参与者网络的预测准确性。在这些序列中引入 ONNX 的同时,我们探讨了如何将这些思路作为由向导汇编的智能系统的自定义信号,推进测试。
在 MQL5 中自动化交易策略(第三部分):用于动态交易管理的RSI区域反转系统 在 MQL5 中自动化交易策略(第三部分):用于动态交易管理的RSI区域反转系统
在本文中,我们将在MQL5中创建一个基于RSI区域反转策略的EA系统,该系统使用RSI信号来触发交易,并采用反转策略来管理亏损。我们实现了一个“ZoneRecovery”类,用以自动化交易入场、反转逻辑和仓位管理。文章最后将进行系统的回测,以优化性能并提升 EA 的有效性。
开发多币种 EA 交易(第 20 部分):整理自动项目优化阶段的输送机(一) 开发多币种 EA 交易(第 20 部分):整理自动项目优化阶段的输送机(一)
我们已经创建了不少有助于安排自动优化的组件。在创建过程中,我们遵循了传统的循环结构:从创建最小的工作代码到重构和获得改进的代码。是时候开始清理我们的数据库了,这也是我们正在创建的系统中的一个关键组件。
交易中的趋势准则 交易中的趋势准则
趋势是许多交易策略的重要组成部分。在本文中,我们将考察一些用来识别趋势及其特征的工具。理解并正确解释趋势,能够显著提升交易效率,并将风险最小化。