将 MQL5 与数据处理包集成(第 4 部分):大数据处理
概述
金融市场不断发展,交易者不再仅仅处理价格图表和简单指标 —— 他们还要应对来自世界各个角落的大量数据。在这个大数据时代,成功的交易不仅仅是策略;这是关于你如何高效地筛选堆积如山的信息,以找到可操作的见解。本文是我们关于将 MQL5 与数据处理工具集成的系列文章中的第四篇,重点介绍为您提供无缝处理大量数据集的技能。从实时报价数据到跨越数十年的历史档案,驯服大数据的能力正迅速成为复杂交易系统的标志。
想象一下,分析数百万个数据点以揭示微妙的市场趋势,或者将社会情绪或经济指标等外部数据集整合到您的 MQL5 交易环境中。可能性是无限的 —— 但前提是你拥有正确的工具。在这篇文章中,我们将探讨如何通过将 MQL5 与高级数据处理库和大数据解决方案集成,将其推向内置功能之外。无论你是一位经验丰富的交易员,旨在提升你的优势,还是一位探索金融技术潜力的好奇的开发人员,本指南都有望改变游戏规则。请继续关注,了解如何将大量数据转化为决定性优势。
收集历史数据
from datetime import datetime import MetaTrader5 as mt5 import pandas as pd import pytz # Display data on the MetaTrader 5 package print("MetaTrader5 package author: ", mt5.__author__) print("MetaTrader5 package version: ", mt5.__version__) # Configure pandas display options pd.set_option('display.max_columns', 500) pd.set_option('display.width', 1500) # Establish connection to MetaTrader 5 terminal if not mt5.initialize(): print("initialize() failed, error code =", mt5.last_error()) quit() # Set time zone to UTC timezone = pytz.timezone("Etc/UTC") # Create 'datetime' objects in UTC time zone to avoid the implementation of a local time zone offset utc_from = datetime(2024, 8, 6, tzinfo=timezone.utc) utc_to = datetime.now(timezone) # Set to the current date and time # Get bars from BTC H1 (hourly timeframe) within the specified interval rates = mt5.copy_rates_range("BTCUSD", mt5.TIMEFRAME_H1, utc_from, utc_to) # Shut down connection to the MetaTrader 5 terminal mt5.shutdown() # Check if data was retrieved if rates is None or len(rates) == 0: print("No data retrieved. Please check the symbol or date range.") else: # Display each element of obtained data in a new line (for the first 10 entries) print("Display obtained data 'as is'") for rate in rates[:10]: print(rate) # Create DataFrame out of the obtained data rates_frame = pd.DataFrame(rates) # Convert time in seconds into the 'datetime' format rates_frame['time'] = pd.to_datetime(rates_frame['time'], unit='s') # Save the data to a CSV file filename = "BTC_H1.csv" rates_frame.to_csv(filename, index=False) print(f"\nData saved to file: {filename}")
为了检索历史数据,我们首先使用 “mt5.initialize()” 函数建立与 MetaTrader 5 终端的连接。这是必不可少的,因为 Python 包直接与正在运行的 MetaTrader 5 平台通信。我们通过指定开始和结束日期来配置代码,以设置数据提取所需的时间范围。`datetime` 对象是在 UTC 时区创建的,以确保跨不同时区的一致性。然后,脚本使用 `mt5.copy-rates-range()` 函数请求 BTC/USD 交易品种的历史每小时数据,从 2024 年 8 月 6 日开始,直到当前日期和时间。
使用 “mt5.shutdown()” 断开与 MetaTrader 5 终端的连接后,避免任何进一步不必要的连接。获取到的数据最初以原始格式显示,以确认数据提取成功。然后,我们将这些数据转换为 pandas 数据帧,以便于操作和分析。此外,该代码将 Unix 时间戳转换为可读的日期时间格式,确保数据结构良好,可供进一步处理或分析。
filename = "XAUUSD_H1_2nd.csv" rates_frame.to_csv(filename, index=False) print(f"\nData saved to file: {filename}")
由于我的操作系统是 Linux,我必须将收到的数据保存到一个文件中。但对于那些使用 Windows 的人来说,您只需使用以下脚本即可检索数据:
from datetime import datetime import MetaTrader5 as mt5 import pandas as pd import pytz # Display data on the MetaTrader 5 package print("MetaTrader5 package author: ", mt5.__author__) print("MetaTrader5 package version: ", mt5.__version__) # Configure pandas display options pd.set_option('display.max_columns', 500) pd.set_option('display.width', 1500) # Establish connection to MetaTrader 5 terminal if not mt5.initialize(): print("initialize() failed, error code =", mt5.last_error()) quit() # Set time zone to UTC timezone = pytz.timezone("Etc/UTC") # Create 'datetime' objects in UTC time zone to avoid the implementation of a local time zone offset utc_from = datetime(2024, 8, 6, tzinfo=timezone.utc) utc_to = datetime.now(timezone) # Set to the current date and time # Get bars from BTCUSD H1 (hourly timeframe) within the specified interval rates = mt5.copy_rates_range("BTCUSD", mt5.TIMEFRAME_H1, utc_from, utc_to) # Shut down connection to the MetaTrader 5 terminal mt5.shutdown() # Check if data was retrieved if rates is None or len(rates) == 0: print("No data retrieved. Please check the symbol or date range.") else: # Display each element of obtained data in a new line (for the first 10 entries) print("Display obtained data 'as is'") for rate in rates[:10]: print(rate) # Create DataFrame out of the obtained data rates_frame = pd.DataFrame(rates) # Convert time in seconds into the 'datetime' format rates_frame['time'] = pd.to_datetime(rates_frame['time'], unit='s') # Display data directly print("\nDisplay dataframe with data") print(rates_frame.head(10))
如果由于某种原因,您无法获取历史数据,您可以通过以下步骤在 MetTrader5 平台上手动检索。启动您的 MetaTrader 平台,在 MetaTrader 5 窗格/面板顶部导航至 >工具,然后 >选项,您将进入图表选项。然后您必须选择要下载的图表中的柱形数量。最好选择无限柱形的选项,因为我们将处理日期,并且我们不知道在给定的时间段内有多少柱形。

之后,您现在必须下载实际数据。为此,您必须导航至 >查看,然后导航至 >交易品种,然后进入规格选项卡。根据要下载的数据类型,只需导航至 > 柱或报价 。继续并输入您想要下载的历史数据的开始和结束日期,然后单击请求按钮下载数据并将其保存为 .csv 格式。

Jupyter Lab 上的 MetaTrader 5 大数据处理
import pandas as pd # Load the uploaded BTC 1H CSV file file_path = '/home/int_junkie/Documents/DataVisuals/BTCUSD_H1.csv' btc_data = pd.read_csv(file_path) # Display basic information about the dataset btc_data_info = btc_data.info() btc_data_head = btc_data.head() btc_data_info, btc_data_head
输出:

从上面的代码中,我们一如既往地检查数据并了解数据集结构。我们检查数据类型、形态和完整性(使用info() )。我们还获取数据集的内容和布局(使用head() )。这是探索性数据分析中常见的第一步,以确保数据正确加载并熟悉其结构。
# Reload the data with tab-separated values btc_data = pd.read_csv(file_path, delimiter='\t') # Display basic information and the first few rows after parsing btc_data_info = btc_data.info() btc_data_head = btc_data.head() btc_data_info, btc_data_head
输出:

现在,我们使用此代码从假定使用制表符分隔值(TSV)而不是默认逗号分隔格式的文件中重新加载数据集。通过在 ` pd.read-csv() ` 中指定 ` delimiter=`\t` ,数据被正确解析为 Pandas `DataFrame` 以供进一步分析。然后,我们使用 “btc-data-infor” 来显示有关数据集的元数据,例如行数、列数、数据类型和任何缺失值。
# Combine <DATE> and <TIME> into a single datetime column and set it as the index btc_data['DATETIME'] = pd.to_datetime(btc_data['<DATE>'] + ' ' + btc_data['<TIME>']) btc_data.set_index('DATETIME', inplace=True) # Drop the original <DATE> and <TIME> columns as they're no longer needed btc_data.drop(columns=['<DATE>', '<TIME>'], inplace=True) # Display the first few rows after modifications btc_data.head()
输出:

# Check for missing values and duplicates missing_values = btc_data.isnull().sum() duplicate_rows = btc_data.duplicated().sum() # Clean data (if needed) btc_data_cleaned = btc_data.drop_duplicates() # Results missing_values, duplicate_rows, btc_data_cleaned.shape
输出:

从输出中我们可以看到,我们的数据集中没有任何缺失的值。
# Check for missing values print("Missing values per column:\n", btc_data.isnull().sum()) # Check for duplicate rows print("Number of duplicate rows:", btc_data.duplicated().sum()) # Drop duplicate rows if any btc_data = btc_data.drop_duplicates()
输出:

从输出中,我们还可以看到没有任何重复的行和列。
# Calculate a 20-period moving average btc_data['MA20'] = btc_data['<CLOSE>'].rolling(window=20).mean() import ta # Add RSI using the `ta` library btc_data['RSI'] = ta.momentum.RSIIndicator(btc_data['<CLOSE>'], window=14).rsi()
在这里,我们根据 “btc-data” 数据帧中的收盘价计算 20 周期移动平均线和 14 周期 RSI。这些指标广泛用于技术分析,被添加为新列(MA-20 和 RSI),以便进一步分析或可视化。这些步骤有助于交易者识别市场趋势和潜在的超买或超卖情况。
import matplotlib.pyplot as plt # Plot closing price and MA20 plt.figure(figsize=(12, 6)) plt.plot(btc_data.index, btc_data['<CLOSE>'], label='Close Price') plt.plot(btc_data.index, btc_data['MA20'], label='20-period MA', color='orange') plt.legend() plt.title('BTC Closing Price and Moving Average') plt.show()
输出:

我们使用 Matplotlib 库创建了比特币收盘价及其 20 周期移动平均线 (MA20) 的可视化表示。它初始化一个尺寸为 12x6 英寸的图形,并根据 DataFrame 的索引绘制收盘价,并将其标记为 “Close Price”(收盘价)。它覆盖了橙色的 20 周期移动平均线的第二个图,标记为 “20-period MA”(20 周期 MA)。添加了图例来区分这两条线,图表标题为 “BTC Closing Price and Moving Average”(BTC 收盘价和移动平均线)。最后,显示了图表,清晰地显示了价格趋势及其与移动平均线的关系。
import numpy as np # Add log returns btc_data['Log_Returns'] = (btc_data['<CLOSE>'] / btc_data['<CLOSE>'].shift(1)).apply(lambda x: np.log(x)) # Save the cleaned data btc_data.to_csv('BTCUSD_H1_cleaned.csv')
现在,我们计算比特币收盘价的对数回报,并将更新的数据集保存到一个新的 CSV 文件中。对数回报是通过将每个收盘价除以前一时期的收盘价,并对结果应用自然对数来计算的。这是通过使用 `shift(1)` 方法将每个价格与其前任对齐,然后应用带有 `np.log` 的 lambda 函数来实现的。 计算出的值存储在名为 “Log-returns”(对数回报)的新列中,提供了更易于分析的价格变化衡量标准,在金融建模和风险分析中特别有用。最后,更新后的数据集(包括新添加的 “Log-returns” 列)我们将其保存到名为 “BTCUSD-H1-cleaned.csv” 的文件中,以供进一步分析。
import seaborn as sns import matplotlib.pyplot as plt # Correlation heatmap sns.heatmap(btc_data.corr(), annot=True, cmap='coolwarm') plt.title('Correlation Heatmap') plt.show()
输出:

从热图中,我们使用 Seaborn 和 Matplotlib 可视化 `btc-data` 数据帧中数值列之间的相关性。`btc-data.corr()` 函数计算所有数值列的成对相关系数,量化它们之间的线性关系。`sns.heatmap()` 函数将此相关矩阵显示为热图,使用 `annot=True` 显示每个单元格中的相关值,使用 `cmap='coolwarm'` 使用发散调色板以便于解释。暖色调(红色)表示正相关,而冷色调(蓝色)表示负相关。使用 Matplotlib 添加标题 “Correlation Heatmap”(相关热图),并使用 “plt.show()” 显示图表。这种可视化有助于一目了然地识别数据集中的模式和关系。
from sklearn.model_selection import train_test_split # Define features and target variable X = btc_data.drop(columns=['<CLOSE>']) y = btc_data['<CLOSE>'] # Split data X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
我们将 “btc-data” 数据帧分成训练和测试子集,为机器学习做好准备。首先,通过从数据集中删除 “<CLOSE>” 列来定义特征 “(x)”,而目标变量 “(y)” 设置为 “<CLOSE>” 列,表示要预测的值。然后使用 Scikit-learn 中的 “train-test-split” 函数将数据分为训练集和测试集,其中 80% 的数据用于训练,20% 的数据用于测试,如 “test-size=0.2” 所指定。`random-state=42` 确保分割可重复,从而在不同运行之间保持一致性。
# Simple Moving Average Crossover Strategy btc_data['Signal'] = (btc_data['MA20'] > btc_data['RSI']).astype(int) btc_data['Returns'] = btc_data['<CLOSE>'].pct_change() btc_data['Strategy_Returns'] = btc_data['Signal'].shift(1) * btc_data['Returns'] # Plot cumulative returns btc_data['Cumulative_Strategy'] = (1 + btc_data['Strategy_Returns']).cumprod() btc_data['Cumulative_Market'] = (1 + btc_data['Returns']).cumprod() btc_data[['Cumulative_Strategy', 'Cumulative_Market']].plot(title='Strategy vs. Market Returns') plt.show()
输出:

# Calculate short-term and long-term moving averages btc_data['MA20'] = btc_data['<CLOSE>'].rolling(window=20).mean() btc_data['MA50'] = btc_data['<CLOSE>'].rolling(window=50).mean() # Generate signals: 1 for Buy, -1 for Sell btc_data['Signal'] = 0 btc_data.loc[btc_data['MA20'] > btc_data['MA50'], 'Signal'] = 1 btc_data.loc[btc_data['MA20'] < btc_data['MA50'], 'Signal'] = -1 # Shift signal to avoid look-ahead bias btc_data['Signal'] = btc_data['Signal'].shift(1)
# Calculate returns btc_data['Returns'] = btc_data['<CLOSE>'].pct_change() btc_data['Strategy_Returns'] = btc_data['Signal'] * btc_data['Returns'] # Calculate cumulative returns btc_data['Cumulative_Market'] = (1 + btc_data['Returns']).cumprod() btc_data['Cumulative_Strategy'] = (1 + btc_data['Strategy_Returns']).cumprod() # Plot performance import matplotlib.pyplot as plt plt.figure(figsize=(12, 6)) plt.plot(btc_data['Cumulative_Market'], label='Market Returns') plt.plot(btc_data['Cumulative_Strategy'], label='Strategy Returns') plt.title('Strategy vs. Market Performance') plt.legend() plt.show()
输出:

在评估交易策略与市场相比的表现时,将结果可视化。首先,我们使用 `pct-change()` 将市场回报计算为 `<CLOSE>` 价格的百分比变化,并将其存储在 `Returns` 列中。策略回报是通过将 “signal” 列(代表交易信号,例如 1 表示买入、-1 表示卖出或 0 表示持有)与市场回报相乘来计算的,并将结果存储在 “strategy-returns” 中。市场和策略的累积回报均使用 `(1 + returns).comprod()` 计算,它模拟了在市场 `(Cumulative-market)` 中投资 1 美元或遵循策略 `(Cumulative-strategy)` 的复合增长。
# Add RSI from ta.momentum import RSIIndicator btc_data['RSI'] = RSIIndicator(btc_data['<CLOSE>'], window=14).rsi() # Add MACD from ta.trend import MACD macd = MACD(btc_data['<CLOSE>']) btc_data['MACD'] = macd.macd() btc_data['MACD_Signal'] = macd.macd_signal() # Target variable: 1 if next period's close > current close btc_data['Target'] = (btc_data['<CLOSE>'].shift(-1) > btc_data['<CLOSE>']).astype(int)
from sklearn.model_selection import train_test_split from sklearn.ensemble import RandomForestClassifier from sklearn.metrics import accuracy_score, classification_report # Define features and target features = ['MA20', 'MA50', 'RSI', 'MACD', 'MACD_Signal'] X = btc_data.dropna()[features] y = btc_data.dropna()['Target'] # Split data X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42) # Train a Random Forest Classifier model = RandomForestClassifier(n_estimators=100, random_state=42) model.fit(X_train, y_train) # Evaluate the model y_pred = model.predict(X_test) print("Accuracy:", accuracy_score(y_test, y_pred)) print(classification_report(y_test, y_pred))
输出:

从上面的代码中,我们实现了一个机器学习管道,使用随机森林分类器根据技术指标对交易信号进行分类。首先,定义特征集 `(x)`,包括 20 周期和 50 周期移动平均线 `(MA20, MA50)`、相对强弱指数(RSI)以及 MACD 相关特征 `(MACD, MACD-Signals)` 等指标。目标变量 “(y)” 设置为目标“列”,通常表示买入、卖出或持有信号。然后将 `(x)` 和 `(y)` 数据分成训练集和测试集,其中 80% 用于训练,20% 用于测试,通过 `(random-state=42)` 确保一致性。
随机森林分类器由 100 个决策树 `(n-estimators=100)` 初始化,并在训练数据 `(X-train 和 Y-train)` 上进行训练。 使用准确度分数来评估模型对测试集 “(X-test)” 的预测,以确定其正确性,并使用分类报告来提供每个类别的详细指标,例如精度、召回率和 F1 分数。
然后,我们使用以下代码部署模型:
import joblib # Save the model joblib.dump(model, 'btc_trading_model.pkl')
将所有内容放在 MQL5 中
我们将把 MQL5 连接到将运行我们训练的模型的 Python 脚本,我们必须在 MQL5 和 Python 之间建立一个通信渠道。在本例中,我们将使用 WebRequest 。
//+------------------------------------------------------------------+ //| BTC-Big-DataH.mq5 | //| Copyright 2024, MetaQuotes Ltd. | //| https://www.mql5.com | //+------------------------------------------------------------------+ #property copyright "Copyright 2024, MetaQuotes Ltd." #property link "https://www.mql5.com" #property version "1.00" #include <Trade\Trade.mqh> CTrade trade;
交易管理所需的所有必要内容和交易库(“trade.mqh”)。
// Function to get predictions from Python API double GetPrediction(double &features[]) { // Convert the features array to a JSON-like string string jsonRequest = "["; for (int i = 0; i < ArraySize(features); i++) { jsonRequest += DoubleToString(features[i], 6); if (i != ArraySize(features) - 1) jsonRequest += ","; } jsonRequest += "]"; // Define the WebRequest parameters string url = "http://127.0.0.1:5000/predict"; string hdrs = {"Content-Type: application/json"}; // Add headers if needed char data[]; StringToCharArray(jsonRequest, data); // Convert JSON request string to char array char response[]; ulong result_headers_size = 0; //-------------------------------------------------------------------------------------- string cookie=NULL; char post[], resultsss[]; // Send the WebRequest int result = WebRequest("POST", url, cookie, NULL, 500, post, 0, resultsss, hdrs); // Handle the response if (result == -1) { Print("Error sending WebRequest: ", GetLastError()); return -1; // Return an error signal } // Convert response char array back to a string string responseString; CharArrayToString(response, (int)responseString); // Parse the response (assuming the server returns a numeric value) double prediction = StringToDouble(responseString); return prediction; }
函数 “GetPrediction()” 将一组输入特征发送到基于 Python 的 API 并检索预测。这些特性作为一个 double 型数组传递,这些 double 型数据被转换为 JSON 格式的字符串,以匹配 API 的预期输入格式。此转换涉及遍历特征数组并将每个值附加到类似 JSON 的数组结构。`DoubleToString` 函数确保值以六位小数表示。然后将生成的 JSON 字符串转换为 “char” 数组。
然后,该函数准备使用 Web 请求向 API 端点 `(http://127.0.0.1:5000/predict)` 发出 POST 请求。定义了所需的参数, 一旦收到 API 响应,就会使用 “CharArrayToString” 将其转换回字符串。如果 Web 请求失败,则会记录错误,并且函数返回 -1。
//+------------------------------------------------------------------+ //| Expert tick function | //+------------------------------------------------------------------+ void OnTick(){ // Calculate indicators double MA20 = iMA(_Symbol, PERIOD_CURRENT, 20, 0, MODE_SMA, PRICE_CLOSE); double MA50 = iMA(_Symbol, PERIOD_CURRENT, 50, 0, MODE_SMA, PRICE_CLOSE); double RSI = iRSI(_Symbol, PERIOD_CURRENT, 14, PRICE_CLOSE); // Declare arrays to hold MACD data double MACD_Buffer[1], SignalLine_Buffer[1], Hist_Buffer[1]; // Get MACD handle int macd_handle = iMACD(NULL, 0, 12, 26, 9, PRICE_CLOSE); if (macd_handle != INVALID_HANDLE) { // Copy the most recent MACD values into buffers if (CopyBuffer(macd_handle, 0, 0, 1, MACD_Buffer) <= 0) Print("Failed to copy MACD"); if (CopyBuffer(macd_handle, 1, 0, 1, SignalLine_Buffer) <= 0) Print("Failed to copy Signal Line"); if (CopyBuffer(macd_handle, 2, 0, 1, Hist_Buffer) <= 0) Print("Failed to copy Histogram"); } // Assign the values from the buffers double MACD = MACD_Buffer[0]; double SignalLine = SignalLine_Buffer[0]; // Assign features double features[5]; features[0] = MA20; features[1] = MA50; features[2] = RSI; features[3] = MACD; features[4] = SignalLine; // Get prediction double signal = GetPrediction(features); if (signal == 1){ MBuy(); // Adjust lot size } else if (signal == -1){ MSell(); } }
`OnTick` 首先计算关键技术指标:20 周期和 50 周期简单移动平均线(MA20 和 MA50)用于跟踪趋势方向,以及 14 周期相对强弱指数(RSI)用于衡量市场动量。此外,它使用 “iMACD” 函数检索 MACD 线、信号线和直方图的值,并在验证 MACD 句柄后将这些值存储在缓冲区中。这些计算指标被组装成一个 “features” 数组,作为通过 “GetPrediction” 函数访问的机器学习模型的输入。该模型预测交易行为,对于买入信号返回 1,对于卖出信号返回 -1。根据预测,该函数使用 “MBuy” 执行买入交易或使用 “MSell()” 执行卖出交易。
Python API
以下是使用 Flask 为 BTC 交易决策提供来自预先训练的机器学习模型的预测的 web API。
from flask import Flask, request, jsonify import joblib import pandas as pd # Load the model model = joblib.load('btc_trading_model.pkl') app = Flask(__name__) @app.route('/predict', methods=['POST']) def predict(): data = request.json df = pd.DataFrame(data) prediction = model.predict(df) return jsonify(prediction.tolist()) app.run(port=5000)
结论
总之,我们通过结合大数据处理、机器学习和自动化开发了一个全面的交易解决方案。从历史 BTC/USD 数据开始,我们对其进行处理和清理,以提取有意义的特征,如移动平均线、RSI 和 MACD。我们使用这些处理过的数据来训练一个能够预测交易信号的机器学习模型。训练好的模型被部署为基于 Flask 的 API,允许外部系统查询预测。在 MQL5 中,我们实现了一个 EA 交易,它收集实时指标值,将它们发送到 Flask API 进行预测,并根据返回的信号执行交易。
这种集成交易解决方案通过将技术指标的精度与机器学习的智能相结合,为交易者提供支持。通过利用基于历史数据训练的机器学习模型,该系统能够适应市场动态,做出明智的预测,从而改善交易结果。通过 API 部署模型实现了灵活性,允许交易者将其集成到 MQL5 等不同平台中。
本文由MetaQuotes Ltd译自英文
原文地址: https://www.mql5.com/en/articles/16446
注意: MetaQuotes Ltd.将保留所有关于这些材料的权利。全部或部分复制或者转载这些材料将被禁止。
本文由网站的一位用户撰写,反映了他们的个人观点。MetaQuotes Ltd 不对所提供信息的准确性负责,也不对因使用所述解决方案、策略或建议而产生的任何后果负责。
MQL5中的自动化交易策略(第七部分):构建具备仓位动态调整功能的网格交易EA
价格行为分析工具包开发(第十三部分):RSI 哨兵工具
您应当知道的 MQL5 向导技术(第 52 部分):加速器振荡器
在 MQL5 中创建交易管理面板(第九部分):代码组织(二):模块化