将您自己的 LLM 集成到 EA 中(第 5 部分):使用 LLM 开发和测试交易策略(四) —— 测试交易策略
目录
概述
在之前的文章中,我们介绍了如何使用不同的方法微调预训练的 GPT-2 模型,使 GPT-2 根据我们的意愿执行任务,并在多个维度上比较了这些方法。当然,我们只介绍了几种常用的方法,这并不意味着只有这些方法可以用来微调 GPT-2 模型。您可以根据我们的示例实现过程,尝试使用其他方法对 GPT-2 进行微调,比较它们,并选择更好的模型。如果您在此过程中遇到任何问题,可以在文章末尾发表评论。
现在,我们微调的 GPT-2 模型具有执行简单量化交易策略的初始能力。因此,本文将介绍如何将我们的微调模型整合到我们的量化交易策略中。示例中使用的模型是经过适配器调优微调的 GPT-2 模型(具体文章链接:将您自己的 LLM 集成到 EA 中(第 5 部分):使用 LLM 开发和测试交易策略(三)—— 适配器调优)。因此,除非另有说明,否则本文中所有对 GPT-2 的引用都是指该模型。
然而,应该指出的是,我们微调的模型基于有限的数据进行演示,无法处理真实的交易环境。没有测试和优化,不要直接在真实交易中使用它们,这一点至关重要。我们之前的预测代码是在 Python 环境中完成的,但 MQL5 作为 MetaTrader 5 平台高度集成的编程语言,提供了开发 EA 交易的强大工具。因此,要实现自动化量化交易策略,我们需要回到 MQL5 环境。本文将逐步实现这一过程。
让我们看看如何将这个训练好的模型从 Python 环境迁移到 MQL5 EA,使其直接在 MetaTrader 5 平台上运行,以支持实时交易决策。
本文示例的开发环境
让我们介绍一下本文中提供的代码示例的运行环境。当然,这并不意味着您的代码环境必须与我的相同,但如果您在运行代码时遇到问题,可以参考我的环境配置。
- 操作系统:Ubuntu 22.04.5 LTS(或对应版本的 WSL)
- Python 版本:3.10.14
- 所需的 Python 库:
- torch-2.4.1
- numpy-1.26.3
- pandas-2.2.3
- transformers-4.45.1
- petf-0.13.0
- matplotlib-3.9.2
- onnx-1.17.0
- onnxconverter-common-1.14.0
- onnxruntime-1.20.1
- onnxruntime-tools-1.7.0
请注意,在进行下一步之前,请确保你已经按照上一篇文章所述使用适配器调优训练了一个模型(因为模型大小超出了平台限制,所以我已经训练好的权重无法上传)。
在 MQL5 中加载 LLM 的方法
为了将训练好的 GPT-2 模型集成到 MQL5 EA 中,我们需要解决的第一个问题是如何在 MQL5 环境中加载和运行这个模型,它本质上是一个 Python 训练的模型。下面介绍几种可行的方法:
1.将模型转换为 ONNX 并将其添加到 EA 中
ONNX(开放神经网络交换)是一种表示神经网络的开放格式,允许不同深度学习框架之间的互操作性。在我之前的文章中,我介绍了如何使用 ONNX 将简单模型集成到 EA 中(时间序列挖掘的数据标签(第六部分):使用 ONNX 在 EA 中应用和测试)。我们还可以将 GPT-2 模型转换为 ONNX 格式,将其导入 EA,并使用 MQL5 中的内置 ONNX 运行时库来执行模型推理。有关 MQL5 对 ONNX 的支持,请参阅帮助文件 “MQL5 参考/ONNX 模型” 或 MQL5 官方文档( https://www.mql5.com/zh/docs/onnx )。
- 优点:
- 高性能:ONNX 运行时通常针对性能进行优化,从而在 EA 中实现相对高效的推理。
- 高度集成:MQL5 内置了对 ONNX 的支持,无需外部程序或库。
- 独立:转换后的 ONNX 模型可以独立于 Python 环境运行。
- 缺点:
- 转换的复杂性:将复杂的语言模型转换为 ONNX 格式可能具有挑战性,需要处理运算符兼容性问题。
- 调试难度:调试 ONNX 模型不如调试 Python 模型方便。
2.使用 Winapi 直接运行 Python 推理脚本
MQL5 提供对 Winapi 的访问,允许我们从 “kernel32.dll” 调用 `WinExec()` 函数来执行外部程序。这样,我们可以使用现有的 Python 脚本加载 GPT-2 模型并进行推理,然后在 EA 中使用 `WinExec()` 调用该脚本并解析其输出结果(或者,`shell32.dll` 中的 `ShellExecuteW()` 函数也可以实现此功能)。该方法需要一定的开发经验和熟悉 Windows 开发才能实现。
- 优点:
- 简单直接:无需转换模型,直接利用现有的 Python 代码。
- 灵活性:可以轻松使用 Python 生态系统中丰富的库和工具。
- 缺点:
- 性能开销:每次推理都需要启动一个新的 Python 进程,从而导致显著的性能开销和效率低下。
- 依赖性:EA 依赖于外部 Python 环境和脚本。
- 数据交换:需要处理 MQL5 和 Python 之间的数据交换,增加了复杂性。
- 安全性:可能会发生各种意外情况,从而可能导致无法控制的崩溃。
请注意:这种方法非常不推荐!我提供这个解决方案只是为了表明它是可行的,可以在测试或受控条件下使用。如果没有足够的信心,请勿使用它。
3.通过 Socket 通信获取 Python 推理结果
和第二种方法类似,但是使用 Socket 通信代替 Winapi(其实也可以使用 HTTP 协议,类似主流推理框架提供的 HTTP 服务,本质上和 Socket 是一样的,本文不再赘述)。具体实现方法是用 Python 运行一个 Socket 服务器来加载模型并进行推理,EA 作为客户端连接服务器,发送输入数据,接收推理结果。
- 优点:
- 更好的性能:Socket 通信可以减少进程启动的开销,而且更加安全。
- 灵活性:仍然可以发挥 Python 的优势。
- 缺点:
- 复杂性:需要实现 Socket 服务器与客户端之间的通信逻辑。
- 依赖性:该 EA 依赖于外部 Python 环境和 Socket 服务器,需要一些知识来设置服务。
- 稳定性:Socket 连接的稳定性可能会影响 EA 的运行。
请注意:这个方法在我之前的文章中有具体的实现。有兴趣的话可以参考一下文章:时间序列挖掘的数据标签(第五部分):使用套接字在 EA 中应用和测试。
我们已经讨论了几种不同的转换方法。目前我还是倾向于选择将 GPT-2 模型转换成 ONNX 格式并集成到 EA 中,因为这样也可以解决跨平台的问题,并且 EA 的集成度和稳定性更高。但是,如果 ONNX 模型参数过大,则无法在 MQL5 中运行(例如,我们当前的 GPT-2 模型超过了 MQL5 文件加载限制)。
另一个具有挑战性的问题是解决 Transformers 模型中的标记器问题,因为像 GPT-2 这样的模型带有标记器来处理输入信息,而要在 MQL5 中运行 GPT-2 模型,我们必须在 MQL5 中构建 GPT-2 标记器,这是一个大工程。这很困难,但并非不可能。然而,MQL5文件大小加载限制是一个难以解决的问题。
虽然我尝试将其量化为 INT8 格式,但仍然超出限制,无法加载。如果量化为 INT4 格式,虽然模型大小满足要求,但是 MQL5 不支持 INT4 格式量化模型!因此,我们只能遗憾地放弃这种方法。不过,我仍然会在本文中提供一个如何将我们的适配器调优的 GPT-2 模型转换为 ONNX 格式的示例,希望能尽快解决这个问题!
在本文中,我最终决定讨论使用 Socket 通信与 Python 推理服务。这种方法的好处是可以保证数据安全,并且简化我们的 EA 实现。在 EA 中我们只需要关注我们的策略和交易逻辑,不需要考虑额外的模块集成问题。该方法的另一个优点是,如果本地没有相关的模型开发环境,比如在远程设备上开发和训练的模型,即使开发环境与本地环境不兼容,仍然可以使用此方法来开发 EA。
总体而言,尽管这种方法在技术上可能看起来很复杂,可能需要更多的知识,但它可以实现高运营效率,并确保 EA 的独立性,这对高效的实时交易环境至关重要。鉴于我们在上一篇文章中已经详细讨论了这一点,本文将不再进一步描述细节。如果对代码示例还有疑问,可以参考上一篇文章的详细介绍。
将 GPT-2 模型转换为 ONNX 模型
在上一节中,我描述了将微调后的 GPT-2 模型转换为 ONNX 格式并在 MQL5 中使用时遇到的各种挑战。然而,我仍然认为这是一个值得尝试的方向,所以在这篇文章中,我将用一个额外的部分来介绍如何将这种个性化的微调模型转换为 ONNX 格式,希望每个人都能找到解决当前困境的办法。如果您对这部分不感兴趣,可以跳过本节。
1.模型转换方法
一.直接转换( https://github.com/rayhern/convert-gpt2-xl-to-onnx )
该 GitHub 存储库提供了一个直接转换 GPT-2 模型的脚本,基于 Hugging Face 的 `transformers` 库和 `torch.onnx` 导出器。但由于作者长期缺乏维护,它可能存在一些限制,并且可能与最新的 `transformers` 库版本不兼容。
- 优点: 提供了相对简单的脚本,可以直接使用;专门针对 GPT-2 模型转换进行了优化。
- 缺点: 该存储库的维护状态可能不清楚,并且可能与最新的 “transformers” 版本不兼容,并且仅适用于特定版本的 GPT-2 模型。
二.微软的 ONNX API( https://github.com/microsoft/onnxruntime-genai )
微软的 “onnxruntime-genai” 库为生成式 AI 模型提供了一组 ONNX 转换和优化 API。
- 优点:针对 ONNX 运行时进行了优化,提高了推理性能,并由 Microsoft 支持和维护。
- 缺点:需要学习 “onnxruntime-genai” 库的 API,与其他方法相比,它可能更复杂。
三.使用 torch.onnx 导出模型
PyTorch 提供了内置的 ONNX 导出功能(`torch.onnx`),可以将 PyTorch 模型导出为 ONNX 格式。
- 优点:与 PyTorch 框架紧密集成,易于使用,`torch.onnx` 是一个广泛使用的 ONNX 导出工具。
- 缺点:可能需要处理一些运算符兼容性问题,特别是对于较新的或自定义的运算符,并且可能需要手动调整一些导出参数以确保模型的正确性和性能。
四.使用 “transformers.onnx” 转换模型
Hugging Face 的 `transformers` 库提供了自己的 ONNX 转换工具(`transformers.onnx`),可以轻松将 `transformers` 库中的模型转换为 ONNX 格式。
- 优点:简单易用,提供简单的命令行界面轻松转换模型,与 `transformers` 库紧密集成,支持多种预训练模型,并由 Hugging Face 团队积极维护和更新。
- 缺点:与 “torch.onnx” 相比,“transformers.onnx” 是一个相对较新的工具,可能存在兼容性问题。
五.使用 Optimum
Optimum 是 Hugging Face 推出的用于模型优化和加速的工具库,同时提供 ONNX 转换功能。
- 优点:优化集成,可以将 ONNX 转换与其他优化技术(例如量化、剪枝)相结合,由 Hugging Face 团队支持和维护。
- 缺点:需要学习 Optimum 库的使用方法,需要一定的技术基础。
这些转换方法各有优缺点。您不必局限于本文中使用的方法,可以根据您的需求选择合适的方法。我们的示例将使用 “transformers.onnx” 库来转换 GPT-2 模型。
2.将 GPT-2 模型转换为 ONNX 模型
在确定使用 `transformers.onnx` 进行模型转换后,我们现在提供详细的转换过程。
一.安装依赖项
首先,确保安装了“transformers” 库和 “onnx” 库。如果没有安装,可以使用以下命令进行安装:
pip install transformers onnx
如果需要针对特定硬件进行优化,例如使用 GPU 加速,还需要安装 `onnxruntime-gpu`:
pip install onnxruntime-gpu
二.转换命令
`transformers.onnx` 提供了一个简单的命令行工具。如果没有特殊要求,使用此工具进行模型转换很简单,只需运行以下命令:
python -m transformers.onnx --model=path/to/your/tuned_model --feature=causal-lm-with-past path/to/save/onnx_model
该命令中的参数:
- `python -m transformers.onnx`:调用“transformers.onnx”工具。
- `--model=path/to/your/tuned_model`:指定微调后的 GPT-2 模型的路径。在我们的示例中,此路径是 “gpt2_Adapter-tuning”。
- `--feature=causal-lm-with-past`:指定模型功能的类型。由于我们使用的是因果语言模型,需要支持 “past_key_values” 来提高生成效率,所以我们选择 “causal-lm-with-past”。
- `path/to/save/onnx_model`:指定保存 ONNX 模型的路径。例如,我们可以将其设置为 “gpt2_onnx”。
完整示例命令:
python -m transformers.onnx --model=gpt2_Adapter-tuning --feature=causal-lm-with-past gpt2_onnx
在命令行中运行上述命令,“transformers.onnx” 将自动下载必要的配置文件并将模型转换为 ONNX 格式。转换完成后,您将在指定的输出目录(在本例中为 “gpt2_onnx”)中看到一个名为 “model.onnx” 的文件,以及一些可能的 JSON 文件,例如 “config.json”。
但是,如果您需要在模型转换过程中调整一些设置以更好地适应当前的用例,则此工具显然无法满足我们的需求。因此,对于复杂的应用场景,仍然需要编写适当的转换脚本,以便对导出的模型表单进行更精确的控制。
三.转换脚本
要转换使用适配器微调后的 GPT-2 模型,转换过程中需要加载 Adapter 模块和设置,同时设置 ONNX OP 版本,避免兼容性问题。接下来我们根据自己的需求,逐步实现相关的功能。
首先,我们导入所需的 Python 库以及 Adapter() 和 GPT2LMHeadModelWithAdapters() 类。这些类在上一篇文章中进行了详细介绍(将您自己的 LLM 集成到 EA 中(第 5 部分):使用 LLM 开发和测试交易策略(三)—— 适配器调优)。您可以选择直接从现有脚本导入,在这里我们将这些类复制到转换脚本中,以便更好地理解:
import os import logging from pathlib import Path from transformers.onnx import export, FeaturesManager from transformers import AutoConfig, AutoTokenizer, GPT2LMHeadModel, modeling_outputs from torch import nn import torch.nn.functional as F import onnx # Set up basic configuration for logging logging.basicConfig(level=logging.INFO) tokenizer = AutoTokenizer.from_pretrained('gpt2') # Define the Adapter class, which is a simple feed-forward network with dropout class Adapter(nn.Module): def __init__(self, in_features, bottleneck_features=64): super(Adapter, self).__init__() # Down projection layer self.down_project = nn.Linear(in_features, bottleneck_features) # Up projection layer self.up_project = nn.Linear(bottleneck_features, in_features) # Dropout layer for regularization self.dropout = nn.Dropout(0.1) # Initialize weights of the layers self.init_weights() def init_weights(self): # Initialize weights for down projection layer nn.init.normal_(self.down_project.weight, mean=0.0, std=0.02) nn.init.constant_(self.down_project.bias, 0) # Initialize weights for up projection layer nn.init.normal_(self.up_project.weight, mean=0.0, std=0.02) nn.init.constant_(self.up_project.bias, 0) def forward(self, hidden_states): # Apply down projection and ReLU activation hidden_states = self.down_project(hidden_states) hidden_states = F.relu(hidden_states) # Apply dropout hidden_states = self.dropout(hidden_states) # Apply up projection hidden_states = self.up_project(hidden_states) # Apply dropout again hidden_states = self.dropout(hidden_states) return hidden_states # Define the GPT2LMHeadModelWithAdapters class, which inherits from GPT2LMHeadModel # and adds adapter layers to each transformer layer class GPT2LMHeadModelWithAdapters(GPT2LMHeadModel): def __init__(self, config): super().__init__(config) # Create a list of adapter modules, one for each transformer layer self.adapters = nn.ModuleList([Adapter(config.n_embd) for _ in range(config.n_layer)]) def forward( self, input_ids=None, past_key_values=None, attention_mask=None, token_type_ids=None, position_ids=None, head_mask=None, inputs_embeds=None, encoder_hidden_states=None, encoder_attention_mask=None, labels=None, use_cache=None, output_attentions=None, output_hidden_states=None, return_dict=None, ): # Get the outputs from the transformer transformer_outputs = self.transformer( input_ids, past_key_values=past_key_values, attention_mask=attention_mask, token_type_ids=token_type_ids, position_ids=position_ids, head_mask=head_mask, inputs_embeds=inputs_embeds, encoder_hidden_states=encoder_hidden_states, encoder_attention_mask=encoder_attention_mask, use_cache=use_cache, output_attentions=output_attentions, output_hidden_states=output_hidden_states, return_dict=return_dict, ) hidden_states = transformer_outputs[0] # Apply each adapter to the hidden states for i, adapter in enumerate(self.adapters): hidden_states = hidden_states + adapter(hidden_states) # Get the logits for the language modeling head lm_logits = self.lm_head(hidden_states) # Compute loss if labels are provided loss = None if labels is not None: # Shift logits and labels for loss computation shift_logits = lm_logits[..., :-1, :].contiguous() shift_labels = labels[..., 1:].contiguous() # Flatten the logits and labels for cross-entropy loss loss_fct = nn.CrossEntropyLoss() loss = loss_fct(shift_logits.view(-1, shift_logits.size(-1)), shift_labels.view(-1)) # Return the outputs in the appropriate format if not return_dict: output = (lm_logits,) + transformer_outputs[1:] return ((loss,) + output) if loss is not None else output return modeling_outputs.CausalLMOutputWithCrossAttentions( loss=loss, logits=lm_logits, past_key_values=transformer_outputs.past_key_values, hidden_states=transformer_outputs.hidden_states, attentions=transformer_outputs.attentions, cross_attentions=transformer_outputs.cross_attentions, )
接下来,我们需要加载微调后的 GPT-2 模型,并控制模型转换过程。我们使用 `load_model_and_tokenizer()` 函数加载经过微调的 GPT-2 模型,使用 `export_model_to_onnx()` 函数将模型转换为 ONNX 格式,使用 `main()` 函数控制整个流程和输入/输出路径。最后,我们定义一个 `check_onnx()` 函数来检查导出结果,并定义一个 `quantization()` 函数进行量化。以下是一个示例:
# Function to load the model and tokenizer def load_model_and_tokenizer(model_id): try: # Load the model configuration config = AutoConfig.from_pretrained(model_id) # Load the model model = GPT2LMHeadModelWithAdapters.from_pretrained(model_id) # Load the tokenizer # tokenizer = AutoTokenizer.from_pretrained('gpt2') return config, model,tokenizer except Exception as e: # Log any errors that occur during loading logging.error(f"Error loading model and tokenizer: {e}") raise # Function to export the model to ONNX format def export_model_to_onnx(model, config, tokenizer, output_path, opset): try: # Get the appropriate feature for the model model_type = config.model_type.replace("-", "_") feature = "causal-lm-with-past" # Get the ONNX configuration onnx_config_constructor = FeaturesManager.get_config(model_type, feature=feature) onnx_config = onnx_config_constructor(config) # Create the output directory if it doesn't exist if not os.path.exists(output_path.parent): os.makedirs(output_path.parent) # Export the model to ONNX export( model=model, config=onnx_config, opset=opset, output=output_path, preprocessor=tokenizer, ) # Log success message logging.info(f"Model successfully converted to ONNX and saved in {output_path}") except Exception as e: # Log any errors that occur during export logging.error(f"Error exporting model to ONNX: {e}") raise # Main function to orchestrate the process def main(): # Define the model ID, output path, and ONNX opset version model_id = "gpt2_Adapter-tuning" onnx_path = "./gpt2_onnx" out_path = Path(os.path.join(onnx_path, "gpt2_adapter_tuning.onnx")) opset = 14 # Load the model and tokenizer config, model, tokenizer = load_model_and_tokenizer(model_id) # Export the model to ONNX export_model_to_onnx(model, config, tokenizer, out_path, opset) def check_onnx(): # Check the ONNX model onnx_model = onnx.load("gpt2_onnx/gpt2_adapter_tuning.onnx") onnx.checker.check_model(onnx_model) print("ONNX model check passed!") def quantization(): from onnxruntime.quantization import quantize_dynamic, QuantType # load model model_path = "gpt2_onnx/gpt2_adapter_tuning.onnx" onnx_model = onnx.load(model_path) #dynamic quantize INT4 quantized_model_path = "gpt2_onnx/quantized_gpt2.onnx" quantize_dynamic(model_path, quantized_model_path, weight_type=QuantType.QUInt4) print(f"Save the quantized model to: {quantized_model_path}")
这部分代码的实现没有任何困难,代码中有详细的注释,所以我们不会详细讨论。我们将只讨论代码的关键部分:
- 必须将模型类与 Adapter 模块一起使用才能加载微调模型:
model = GPT2LMHeadModelWithAdapters.from_pretrained(model_id)
- 文件路径必须转换为 `transformers.onnx.export()` 支持的路径格式,不能直接作为字符串路径使用。我们使用 `pathlib` 库中的 `Path` 类来转换:
out_path = Path(os.path.join(onnx_path, "gpt2_adapter_tuning.onnx")) - `export()` 函数的 `tokenizer` 和 `preprocessor` 参数只能设置一个,否则会报错。建议使用 `preprocessor`:
export(model=model, config=onnx_config, opset=opset, output=output_path, preprocessor=tokenizer) - 确定 opset 版本,必须与 MQL5 支持的 opset 版本对应才能正确加载。我们选择 opset=14:
opset = 14 - 模型的输入路径(即包含使用适配器微调后的 GPT-2 模型的文件夹)设置为当前项目路径下的 `gpt2_Adapter-tuning` 文件夹,输出路径设置为当前项目路径下的 `gpt2_onnx` 文件夹:
model_id = "gpt2_Adapter-tuning" onnx_path = "./gpt2_onnx"
- `check_onnx()` 和 `quantization()` 函数不是强制性的,仅供参考。
当然,这只是一个基本的转换脚本示例。我们还没有设置更多细节,例如序列的动态输入支持。如果您需要相应的功能,请根据示例脚本添加相关功能。
完整的转换脚本也提供在附件中,名为 “torch2onnx.py”。
制定 EA 策略和服务器功能
我们已经确定了 EA 的运行模式。接下来,我们需要指定一个计划来确定服务器提供哪些服务以及客户端集成哪些功能:EA 客户端主要负责数据采集和事务执行;Python 服务器接收客户端发送的数据,计算推理结果,并将结果返回给客户端;EA 客户端和 Python 服务器通过 Socket 进行通信。
1.EA 策略
接下来,我们将根据 GPT-2 的预测结果设计交易策略。由于本文的重点是演示如何将 GPT-2 模型集成到 MQL5 EA 中,我们将创建一个简单的交易策略作为示例。应该强调的是,这只是一个简单的示例策略,仅用于演示目的,并不构成任何实际的交易建议。在实际应用中,需要制定更完整、更稳健的交易策略,并进行彻底的回溯测试和风险评估。
EA 策略逻辑:
- 每 1 分钟获取过去 20 个时间点的收盘价数据。
- 将数据传输到服务器,等待服务器发回计算结果。
- 根据服务器发回的交易信号发送交易订单,不设置止损或止盈,并且始终只持有一个订单。
2.服务器功能设计
在服务器端,我们必须实现从 EA 客户端接收数据、运行模型推理以获得结果以及根据推理结果计算要发送回客户端的交易信号的主要功能。
服务器端功能:
- 从客户端接收数据。
- 加载 GPT-2 模型和标记器并始终保持模型准备就绪。
- 运行推理,并根据推理结果计算当前实际价格与预测价格平均值之间的差异。如果差值超过 0,则发出“买入”信号;如果小于 0,则发出“卖出”信号;如果等于 0,则不发出任何信号。
- 检查并决定使用 CPU 还是 GPU 进行模型推理(取决于当前设备支持的模式)。
接下来,我们将完成相应的功能实现。
创建推理服务
关于如何创建推理服务,我在之前的文章中已经提供了详细的描述(时间序列挖掘的数据标签(第五部分):使用套接字在 EA 中应用和测试),其中提供了这里使用的 `server.py` 脚本)。代码部分依然会遵循上一篇文章中 `server.py` 脚本的主要逻辑,只是针对我们微调后的 GPT-2 模型做了一些适配,并做了一些其他的优化和改进。
修改后的代码主要有以下变化:
- 将模型推理适配到 GPT-2 模型,并对 `eva()` 函数进行较大改动。
- 优化 Socket 握手逻辑,增加断线后客户端无需重启服务器即可重连的功能,更加方便回测,回测结束后无需重启服务器。
- 添加客户端连接状态检测,以避免不必要的资源浪费。
- 避免重复打印结果,仅在预测结果发生变化时打印结果。
- 添加错误处理逻辑以避免服务器崩溃。
- 优化整体代码逻辑。
关于代码部分,本文将不再进一步讨论细节,只讨论需要修改的部分。
1.导入所需的库
除了导入所需的常规库之外,我们还需要导入我们内置在脚本中的 Adapter 和 GPT2LMHeadModelWithAdapters 类。您可以从我之前关于微调 GPT-2 的文章中获取这些类,或者直接从本文提供的 `torch2onnx.py` 导入它们。示例代码选择直接从 “torch2onnx.py” 导入这两个类。
import socket from time import sleep import pandas as pd import numpy as np import warnings import base64 import hashlib import struct from torch2onnx import GPT2LMHeadModelWithAdapters,Adapter from transformers import AutoTokenizer import logging import torch from statistics import mean # Set logging and warning logging.basicConfig(level=logging.INFO) warnings.filterwarnings("ignore") # Set device dvc='cuda' if torch.cuda.is_available() else 'cpu' # Global model_id = "gpt2_Adapter-tuning" encoder_length=20 prediction_length=10 info_file="results.json" host="0.0.0.0" port=10055
2.在 `load_model()` 函数中添加 GPT-2 模型加载逻辑
在原始脚本(`server.py`)中,使用 `load_model()` 函数来加载模型。注意,我们需要在这里添加 GPT-2 模型加载逻辑,以及 GPT-2 标记器的加载逻辑。
# Function to loda model def load_model(): try: # Load the model model = GPT2LMHeadModelWithAdapters.from_pretrained(model_id).to(dvc) # Load the tokenizer tokenizer = AutoTokenizer.from_pretrained('gpt2') print("Model loaded!") return model,tokenizer except Exception as e: # Log any errors that occur during loading logging.error(f"Error loading model and tokenizer: {e}") raise
3.在 `eva()` 函数中添加 GPT-2 模型推理逻辑
def eva(msg,model,tokenizer): # Get the data msg=np.fromstring(msg, dtype=float, sep= ',').tolist() # Parse the data input_data=msg[-encoder_length:] # Create the prompt prompt = ' '.join(map(str, input_data)) # Generate the predication token=tokenizer.encode(prompt, return_tensors='pt').to(dvc) attention_mask = torch.ones_like(token).to(dvc) model.eval() generated = tokenizer.decode( model.generate( token, attention_mask=attention_mask, pad_token_id=tokenizer.eos_token_id, do_sample=True, max_length=200)[0], skip_special_tokens=True) generated_prices=generated.split('\n')[0] # Remove non-numeric formats def try_float(s): try: return float(s) except ValueError: return None generated_prices=generated_prices.split() generated_prices=list(map(try_float,generated_prices)) generated_prices = [f for f in generated_prices if f is not None] generated_prices=generated_prices[0:prediction_length] # Calculate and send the results last_price=input_data[-1] prediction_mean=mean(generated_prices) if (last_price-prediction_mean) >= 0: # print('Send sell.') return "sell" else: # print("Send buy.") return "buy"
请注意,输入长度必须与使用适配器微调训练 GPT-2 模型时使用的数据格式匹配:
- input_data = msg[-encoder_length:]:将客户端发送的最后 20 个数据点作为模型输入
- prompt = ' '.join(map(str, input_data)):将数据转换为字符串格式并转换成提示
- token = tokenizer.encode(prompt, return_tensors='pt').to(dvc):使用预训练的 GPT-2 模型的标记器对提示进行编码并将其传输到当前支持的设备(与用于模型推理的设备匹配)。
- attention_mask = torch.ones_like(token).to(dvc):定义模型推理的注意力掩码
- model.generate(token, attention_mask=attention_mask, pad_token_id=tokenizer.eos_token_id, do_sample=True, max_length=200)[0]:运行模型推理
- generated = tokenizer.decode(model.generate(...), skip_special_tokens=True):解码预测结果并设置跳过特殊标记
- generated_prices = generated.split('\n')[0]:分割解码后的推理结果
- try_float(s) :该函数用于检测推理结果中是否存在无法转换为浮点格式的元素
- generated_prices = generated_prices.split():将预测结果用空格分隔,并去除无法转换为数字的分隔符
- generated_prices = list(map(try_float, generated_prices)):将 generated_prices 中所有元素转换为浮点格式的数字,如果有无法转换为数字的元素,使用 try_float(s) 函数将其设置为 None
- generated_prices = [f for f in generated_prices if f is not None]:遍历 generated_prices 中的所有元素,并删除 None 元素
- generated_prices = generated_prices[0:prediction_length]:仅获取前10个预测值作为参考
- if (last_price - prediction_mean) >= 0:计算客户端发送的最后一个数据与预测值的平均值之间的差异。如果大于或等于 0,则发出“卖出”信号;如果小于 0,则发出“买入”信号
我们选择使用 “transformers” 库进行推理。您还可以使用前面提到的 `torch2onnx.py` 脚本将模型转换为 ONNX 格式,并使用 `onnxruntime` 库进行推理。本文将不讨论这种方法。
4.服务器
服务器的所有功能都集成到了 `server_()` 类中,整体代码改动并不大。这里我们不做详细解读,只讨论修改的部分。
class server_: def __init__(self, host = host, port = port): self.sk = socket.socket(socket.AF_INET, socket.SOCK_STREAM,) self.host = host self.port = port self.sk.bind((self.host, self.port)) self.re = '' self.model,self.tokenizer=load_model() self.stop=None self.sk.listen(1) self.sk_, self.ad_ = self.sk.accept() self.last_action=None print('server running:',self.sk_, self.ad_) def msg(self): self.re = '' wsk=False while True: sleep(0.5) if self.is_connected(): try: data = self.sk_.recv(2500) except Exception as e: break if not data: break if (data[1] & 0x80) >> 7: fin = (data[0] & 0x80) >> 7 # FIN bit opcode = data[0] & 0x0f # opcode masked = (data[1] & 0x80) >> 7 # mask bit mask = data[4:8] # masking key payload = data[8:] # payload data # print('fin is:{},opcode is:{},mask:{}'.format(fin,opcode,masked)) message = "" for i in range(len(payload)): message += chr(payload[i] ^ mask[i % 4]) data=message wsk=True else: data=data.decode("utf-8") if '\r\n\r\n' in data: self.handshake(data) data=data.split('\r\n\r\n',1)[1] if "stop" in data: self.stop=True break if len(data)<50: break self.re+=data bt=eva(self.re, self.model,self.tokenizer) bt=bytes(bt, "utf-8") # If the signal changes,then print the information if bt != self.last_action: if bt == b'buy': print('Send buy.') elif bt == b'sell': print('Send sell.') self.last_action = bt if wsk: tk=b'\x81' lgt=len(bt) tk+=struct.pack('B',lgt) bt=tk+bt self.sk_.sendall(bt) else: print("Disconnected!Try to connect the client...") try: # reconnect self.sk_.close() self.sk.listen(1) self.sk_, self.ad_ = self.sk.accept() print('Reconnected:', self.sk_, self.ad_) # handshake while True: sleep(0.5) data = self.sk_.recv(2500) data=data.decode("utf-8") if '\r\n\r\n' in data: self.handshake(data) break print("Reconnection succeed!") # # clean the socket # while True: # if not self.sk_.recv(2500): # break except Exception as e: print(f"Reconnection failed: {e}") return self.re def __del__(self): print("server closed!") self.sk.close() if self.sk_ is not None: self.sk_.close() self.ad_.close() def handshake(self,data): try: # Handshake key = data.split("\r\n")[4].split(": ")[1] GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11" ac = base64.b64encode(hashlib.sha1((key+GUID).encode('utf-8')).digest()) response_tpl="HTTP/1.1 101 Switching Protocols\r\n" \ "Upgrade:websocket\r\n" \ "Connection: Upgrade\r\n" \ "Sec-WebSocket-Accept: %s\r\n" \ "WebSocket-Location: ws://%s/\r\n\r\n" response_str = response_tpl % (ac.decode('utf-8'), "127.0.0.1:10055") self.sk_.send(bytes(response_str, encoding='utf-8')) print('Handshake succeed!') except Exception as e: print(f"Connection failed: {e}") return None def is_connected(self): try: # Check remote # remote_addr = self.sk_.getpeername() data = self.sk_.recv(1, socket.MSG_PEEK) return True except socket.error: self.last_action=None return False
- 添加类函数 `is_connected(self)` 来检测客户端是否在线。
- 添加类函数 `handshake(self, data)` 来集成握手逻辑,避免扰乱主解析逻辑。
- 添加类成员 “self.last_action” 来检测交易信号是否发生变化。仅在交易信号发生变化时打印结果,以避免频繁打印。当客户端断开连接时,重置为 None,以避免客户端重新连接时发送错误的信号。
请注意: 我们的主机地址设置为 “0.0.0.0”,因为如果将其设置为 “127.0.0.1”,则在不同主机上运行的远程客户端将无法连接。这意味着通过将其设置为 “0.0.0.0”,即使服务器和客户端不在同一台主机上,它们仍然可以连接(EA客户端需要设置正确的主机 IP 地址)。
整体代码在附加的 “server.py” 文件中。 服务器运行时,终端会给出相应的运行信息。

EA 客户端
客户端主要沿用了上一篇文章的逻辑(具体在上一篇文章中提到过),并在逻辑上做了相应的修改。依然保留了两种套接字兼容方式(一种使用 Winapi 实现 WebSocket,一种使用 MQL5 内置 Socket 模块),避免特殊情况下 MQL5 内置 Socket 无法连接导致的信号中断。主要运行逻辑是在 `OnInit()` 函数中初始化 Socket,在 `OnTick()` 函数中处理交易逻辑,在 `OnTimer()` 函数中处理每隔固定时间向服务器发送数据并接收推理结果。
1.定义常量
#include <WinAPI\winhttp.mqh> int sk=-1; string host="127.0.0.1"; int port= 10055; int data_len=100; string pre=NULL; HINTERNET ses_h,cnt_h,re_h,ws_h;
- `sk`:套接字句柄。
- `host` 和 `port`:要连接的服务器地址和端口。
- `data_len`:要发送的价格数据点的数量。
- `pre`:用于存储预测结果的字符串。
- `ses_h`、`cnt_h`、`re_h`、`ws_h`:分别为 WinHttp 的会话句柄、连接句柄、请求句柄和 WebSocket 句柄。
2.初始化套接字
int OnInit() { //--- create timer EventSetTimer(60); ses_h=cnt_h=re_h=ws_h=NULL; //handshake ses_h=WinHttpOpen("MT5", WINHTTP_ACCESS_TYPE_DEFAULT_PROXY, NULL, NULL, 0); //Print(ses_h); if (ses_h==NULL){ Print("Http open failed!",string(kernel32::GetLastError())); return INIT_FAILED; } cnt_h=WinHttpConnect(ses_h, host, port, 0); //Print(cnt_h); if (cnt_h==NULL){ Print("Http connect failed!",string(kernel32::GetLastError())); return INIT_FAILED; } re_h=WinHttpOpenRequest(cnt_h, "GET", NULL, NULL, NULL, NULL, 0); if(re_h==NULL){ Print("Request open failed!",string(kernel32::GetLastError())); return INIT_FAILED; } uchar nullpointer[]= {}; if(!WinHttpSetOption(re_h,WINHTTP_OPTION_UPGRADE_TO_WEB_SOCKET,nullpointer,0)) { Print("Set web socket failed!",string(kernel32::GetLastError())); return INIT_FAILED; } bool br; br = WinHttpSendRequest( re_h, NULL, 0, nullpointer, 0, 0, 0); if (!br) { Print("send request failed!",string(kernel32::GetLastError())); return INIT_FAILED; } br=WinHttpReceiveResponse(re_h,nullpointer); if (!br) { Print("receive response failed!",string(kernel32::GetLastError())); return INIT_FAILED; } ulong nv=0; ws_h=WinHttpWebSocketCompleteUpgrade(re_h,nv); if (!ws_h) { Print("Web socket upgrade failed!",string(kernel32::GetLastError())); return INIT_FAILED; } else{ Print("Web socket connected!"); } WinHttpCloseHandle(re_h); re_h=NULL; sk=SocketCreate(); Print(sk); Print(GetLastError()); if (sk==INVALID_HANDLE) { Print("Failed to create socket"); //return INIT_FAILED; } if (!SocketConnect(sk,host, port,1000)) { Print("Failed to connect to built-in socket"); //return INIT_FAILED; } //--- return(INIT_SUCCEEDED); }
在初始化部分我们主要实现 Winapi WebSocket 的初始化以及 MQL5 内置 Socket 的初始化。与前一篇文章的内容相比,这部分没有太大变化,因此本文将不再讨论。
3.交易策略
void OnTick() { //--- MqlTradeRequest request; MqlTradeResult result; //int x=SymbolInfoInteger(_Symbol,SYMBOL_FILLING_MODE); if (pre!=NULL) { //Print("The predicted value is:",pre); ulong numt=0; ulong tik=0; bool sod=false; ulong tpt=-1; ZeroMemory(request); numt=PositionsTotal(); //Print("All tickets: ",numt); if (numt>0) { tik=PositionGetTicket(numt-1); sod=PositionSelectByTicket(tik); tpt=PositionGetInteger(POSITION_TYPE);//ORDER_TYPE_BUY or ORDER_TYPE_SELL if (tik==0 || sod==false || tpt==0) return; } if (pre=="buy") { if (tpt==POSITION_TYPE_BUY) return; request.action=TRADE_ACTION_DEAL; request.symbol=Symbol(); request.volume=0.1; request.deviation=5; request.type_filling=ORDER_FILLING_IOC; request.type = ORDER_TYPE_BUY; request.price = SymbolInfoDouble(Symbol(), SYMBOL_ASK); if(tpt==POSITION_TYPE_SELL) { request.position=tik; Print("Close sell order."); } else{ Print("Open buy order."); } OrderSend(request, result); } else{ if (tpt==POSITION_TYPE_SELL) return; request.action = TRADE_ACTION_DEAL; request.symbol = Symbol(); request.volume = 0.1; request.type = ORDER_TYPE_SELL; request.price = SymbolInfoDouble(Symbol(), SYMBOL_BID); request.deviation = 5; //request.type_filling=SymbolInfoInteger(_Symbol,SYMBOL_FILLING_MODE); request.type_filling=ORDER_FILLING_IOC; if(tpt==POSITION_TYPE_BUY) { request.position=tik; Print("Close buy order."); } else{ Print("OPen sell order."); } OrderSend(request, result); } //is_pre=false; } pre=NULL; }
我们将整个交易策略集成到 `OnTick()` 函数中,使逻辑更加清晰。当执行 OnTick() 函数时,检查全局变量 `pre` 是否为空。如果不为空,则表示有客户端发来的预测结果。
然后,根据预测结果发送交易请求(“buy” 或 “sell”):
- 如果是“buy”(买入),则在没有现有仓位的情况下开仓,或者关闭现有的卖单。
- 如果是“sell”(卖出),则在没有现有仓位的情况下开仓,或关闭现有的买单。
全程只维持一个订单,不设置止盈或止损,仅通过交易信号控制头寸。
4.与服务器交互
void OnTimer() { //--- MqlTradeRequest request; MqlTradeResult result; char recv_data[5]; double priceData[100]; string dataToSend; char ds[]; int nc=CopyClose(Symbol(),0,0,data_len,priceData); for(int i=0;i<ArraySize(priceData);i++) dataToSend+=(string)priceData[i]+","; int dsl=StringToCharArray(dataToSend,ds); if (sk!=-1) { if (SocketIsWritable(sk)) { Print("Send data:",dsl); int ssl=SocketSend(sk,ds,dsl); } uint len=SocketIsReadable(sk); if (len) { int rsp_len=SocketRead(sk,recv_data,len,500); if(rsp_len>0) { string result=NULL; result+=CharArrayToString(recv_data,0,rsp_len); Print("The predicted value is:",result); if (StringFind(result,"buy")) { pre="buy"; } if (StringFind(result,"sell")){ pre="sell"; } } } } else { ulong send=0; if (ws_h) { send=WinHttpWebSocketSend(ws_h, WINHTTP_WEB_SOCKET_BINARY_MESSAGE_BUFFER_TYPE, ds, dsl); //Print("Send data failed!",string(kernel32::GetLastError())); if(!send) { ZeroMemory(recv_data); ulong rb=0; WINHTTP_WEB_SOCKET_BUFFER_TYPE st=-1; ulong get=WinHttpWebSocketReceive(ws_h,recv_data,ArraySize(recv_data),rb,st); if (!get) { pre=NULL; pre+=CharArrayToString(recv_data,0); Print("The predicted value is:",pre); } } } } }
服务器的主要功能是从当前图表中采集 100 个数据点,发送到服务器,并接收服务器的推理结果,根据结果修改全局变量,确保交易策略按照服务器给出的结果执行。这里我们采用两种套接字连接方式来实现数据交互逻辑,并根据当前连接的套接字类型自动选择合适的方法。
5.资源释放
void OnDeinit(const int reason) { //--- destroy timer EventKillTimer(); uchar stop[]; int ls=StringToCharArray("stop",stop); SocketSend(sk,stop,ls); SocketClose(sk); // close the websocket WinHttpSendRequest(re_h,NULL,0,stop,0,0,0); BYTE closearray[]= {}; ulong close=WinHttpWebSocketClose(ws_h, WINHTTP_WEB_SOCKET_SUCCESS_CLOSE_STATUS, closearray, 0); if(close) { Print("websocket close error "+string(kernel32::GetLastError())); if(re_h!=NULL) WinHttpCloseHandle(re_h); if(ws_h!=NULL) WinHttpCloseHandle(ws_h); if(cnt_h!=NULL) WinHttpCloseHandle(cnt_h); if(ses_h!=NULL) WinHttpCloseHandle(ses_h); } }
在 `OnDeinit()` 函数中,释放相关系统资源,并进行资源回收。
由于 GPT-2 模型推理过程未在 EA 中实现,这使得我们的 EA 逻辑更加简单和简洁。注意,我们没有在 EA 中添加风险控制逻辑,单纯依靠 GPT-2 推理结果来决定是否持有或开仓是一种非常冒险的做法。请再次注意,此 EA 示例不应用于真实交易!
完整的代码在文章的附件中提供,名为“gpt2_EA.mql5”。
回溯测试
为了评估 EA 的性能,我们可以在 MetaTrader 5 客户端的策略测试器中进行回溯测试。我们选择合适范围的历史数据,设置回测参数,然后运行回测(由于我们的 gpt2 模型是在 NZDUSD 货币对上训练的,所以回测中只能选择 NZDUSD 货币对进行测试)

回测正在进行:

回测完成后结果如下:

您可以通过查看回测报告来分析 EA 的盈利能力、最大回撤、胜率和其他指标。请记住,我们的交易策略很简单,所以回溯测试的结果并不理想。这主要是因为我们的策略没有经过任何参数优化或风险控制,模型的训练过程和数据准备具有巨大的优化潜力。总的来说,这样做需要极大的耐心。值得注意的是,由于市场条件的变化和模型的局限性,回测结果无法保证 EA 在未来实时交易中的表现,模型的局限也可能导致预测结果不稳定。
结论
在本文中,我们演示了如何将使用特定金融数据(来自 NZDUSD 货币对)微调的 GPT-2 模型集成到 EA 程序中,系统地讲解了从模型微调和推理逻辑实现到服务器和客户端设置,最后到交易策略集成的整个过程。
重要的是要强调,我们的交易策略设计相对简单,仅用于演示目的。在实际应用中,需要制定更完整、更稳健的策略,如结合多个技术指标、考虑市场情绪、设置止损和止盈等。
此外,模型的训练过程和数据准备具有巨大的优化潜力,市场条件和模型限制的变化可能会导致预测结果不稳定。尽管如此,这项工作的意义在于展示了大型语言模型在量化交易中的潜力。GPT-2 等模型可以分析传统市场数据并处理新闻数据、社交媒体数据和其他富含文本的数据,提供更全面的市场情绪分析,帮助交易者做出更明智的决策。这种跨模式能力是传统金融模型所不具备的,也是我们需要进一步探索的领域。
在下一篇文章中,我们将使用一个示例来演示如何优化大型语言模型在量化交易中的应用。
附录:
| 文件 | 描述 |
|---|---|
| torch2onnx.py | 将 GPT-2 模型转换为 ONNX 格式的 Python 脚本 |
| server.py | 提供 GPT2 模型推理服务和结果的 Python 脚本 |
| gpt2_EA.mq5 | 用于测试 GPT2 模型推理结果的 EA 程序 |
本文由MetaQuotes Ltd译自英文
原文地址: https://www.mql5.com/en/articles/13506
注意: MetaQuotes Ltd.将保留所有关于这些材料的权利。全部或部分复制或者转载这些材料将被禁止。
本文由网站的一位用户撰写,反映了他们的个人观点。MetaQuotes Ltd 不对所提供信息的准确性负责,也不对因使用所述解决方案、策略或建议而产生的任何后果负责。
JSON 从入门到精通: 创建自己的 MQL5 版本 JSON 解读器
MQL5自动化交易策略(第十四部分):基于MACD-RSI统计方法的交易分层策略
在MQL5中创建交易管理面板(第九部分):代码组织(5):分析面板(AnalyticsPanel)类
在 MQL5 中创建交易管理面板(第九部分):代码组织(三):通信模块
查看新文章:将自己的 LLM 整合到 EA 中(第 5 部分):使用 LLMs 开发和测试交易策略(四) - 测试交易策略。
作者:潘玉强潘宇强