English Русский 中文 Español Deutsch 日本語
preview
Anotação de dados na análise de série temporal (Parte 3): Exemplo de uso de anotação de dados

Anotação de dados na análise de série temporal (Parte 3): Exemplo de uso de anotação de dados

MetaTrader 5Experts | 12 março 2024, 12:53
138 0
Yuqiang Pan
Yuqiang Pan

Introdução

O artigo descreve como usar PyTorch Lightning e o framework PyTorch Forecasting através da plataforma de negociação MetaTrader 5 para implementar a previsão de séries temporais financeiras baseadas em redes neurais.

Neste artigo, também explicamos os motivos pelos quais escolhemos essas duas plataformas e o formato de dados que usamos.

Quanto aos dados, você pode usar os dados obtidos por meio da anotação de dados de meus dois artigos anteriores. Uma vez que eles têm o mesmo formato, você pode facilmente expandi-los, seguindo a metodologia descrita neste artigo.

Links para os dois artigos anteriores: 

  1. Anotação de dados na análise de série temporal (Parte 1): Criação de um conjunto de dados com rótulos de tendência usando um gráfico EA
  2. Anotação de dados na análise de série temporal (Parte 2): Criação de conjuntos de dados com rótulos de tendência usando Python

Conteúdo:


Algumas bibliotecas importantes do Python

Primeiramente, vamos apresentar as principais bibliotecas Python que usaremos.

1. PyTorch Lightning

PyTorch Lightning é um ambiente de aprendizado profundo, especialmente projetado para pesquisadores e engenheiros de inteligência artificial e aprendizado de máquina que precisam de máxima flexibilidade sem comprometer a escalabilidade.

A ideia principal é separar o código acadêmico (por exemplo, definições de modelos, propagação/retropropagação, otimizadores, validação etc.) do código de engenharia (por exemplo, laços for, mecanismos de salvamento, logs do TensorBoard, estratégias de treinamento etc.), resultando em um código mais organizado e compreensível.

Entre as principais vantagens estão:

  • Alta reusabilidade. A estrutura permite reutilizar o código em vários projetos.
  • Facilidade de manutenção. Graças ao design estruturado, manter o código se torna mais simples.
  • Lógica clara. Com a abstração do código de engenharia padrão, o código de aprendizado de máquina se torna mais fácil de identificar e compreender.

No geral, o PyTorch Lightning é uma biblioteca extremamente poderosa que oferece um método eficiente para elaborar e gerenciar seu código PyTorch. Além disso, ele proporciona uma abordagem estruturada para resolver tarefas comuns, mas complexas, como treinamento, validação e teste do modelo.

O uso detalhado dessa biblioteca pode ser encontrado na documentação oficial: https://lightning.ai/docs.

2. PyTorch Forecasting

Trata-se de uma biblioteca Python especificamente desenvolvida para a previsão de séries temporais. Como é baseada no PyTorch, você pode aproveitar as poderosas bibliotecas de diferenciação automática e otimização do PyTorch, bem como desfrutar da usabilidade que o PyTorch Forecasting oferece para a previsão de séries temporais.

No PyTorch Forecasting, você pode encontrar implementações de vários modelos de previsão, incluindo, entre outros, modelos de autorregressão (AR, ARIMA), modelos de espaço de estados (SARIMAX), redes neurais (LSTM, GRU) e métodos de ensemble (Prophet, N-Beats). Isso significa que você pode experimentar e comparar diferentes abordagens de previsão no mesmo ambiente sem a necessidade de escrever extenso código padrão para cada abordagem.

A biblioteca também oferece uma série de ferramentas de pré-processamento de dados, que podem ajudar a resolver tarefas típicas em séries temporais. Essas ferramentas incluem, entre outras, a substituição de valores ausentes, escalonamento, extração de características e transformações de janela deslizante. Isso significa que você pode se concentrar mais no desenvolvimento e otimização do seu modelo, sem gastar muito tempo no processamento de dados.

A biblioteca também possui uma interface unificada para avaliação de desempenho do modelo. Ela implementa funções de perda e métricas de validação para séries temporais, como QuantileLoss e SMAPE, e também suporta metodologias de treinamento como parada precoce e validação cruzada. Isso permite que você monitore e melhore a performance do seu modelo de maneira mais simples.

Se você está procurando uma maneira de aumentar a eficiência e a facilidade de manutenção do seu projeto de previsão de séries temporais, então o PyTorch Forecasting pode ser uma excelente escolha. A biblioteca oferece meios eficazes e flexíveis para organizar e gerenciar seu código PyTorch, permitindo que você se concentre no aspecto mais importante: o próprio modelo de aprendizado de máquina.

Uma descrição detalhada do uso da biblioteca pode ser encontrada na documentação oficial: https://pytorch-forecasting.readthedocs.io/en/stable.

3. Modelo N-HiTS

O modelo N-HiTS resolve os problemas de volatilidade das previsões e complexidade computacional em previsões de longo prazo através da implementação de métodos inovadores de interpolação hierárquica e amostragem de dados com várias taxas. Isso permite que o modelo N-HiTS aproxime eficientemente um intervalo de previsão de qualquer comprimento.

Além disso, experimentos abrangentes realizados em conjuntos de dados de grande escala mostraram que o modelo N-HiTS aumenta a precisão em média quase 20% em comparação com a última arquitetura Transformer, além de reduzir em 50 vezes o tempo de cálculo.

Link para o artigo: https://doi.org/10.48550/arXiv.2201.12886.


Inicialização

Primeiramente, precisamos importar as bibliotecas necessárias. Entre essas bibliotecas estão o MetaTrader 5 (para interagir com o terminal de mesmo nome), PyTorch Lightning (para treinar o modelo) e algumas outras bibliotecas para processamento e visualização de dados.

import MetaTrader5 as mt5
import lightning.pytorch as pl
from lightning.pytorch.callbacks import EarlyStopping
import matplotlib.pyplot as plt
import pandas as pd
from pytorch_forecasting import Baseline, NHiTS, TimeSeriesDataSet
from pytorch_forecasting.data import NaNLabelEncoder
from pytorch_forecasting.metrics import MAE, SMAPE, MQF2DistributionLoss, QuantileLoss
from lightning.pytorch.tuner import Tuner

Em seguida, precisamos inicializar o MetaTrader 5. Isso é feito chamando a função mt.initialize(). Se você não conseguir inicializá-lo apenas usando isso, você precisará passar o caminho para o terminal MetaTrader 5 como um parâmetro desta função (no exemplo "D:\Project\mt\MT5\terminal64.exe" — esse é o meu caminho, no aplicativo real você precisará definir seu próprio caminho). Se a inicialização for bem-sucedida, a função retornará True, caso contrário, retornará False.

if not mt.initialize("D:\\Project\\mt\\MT5\\terminal64.exe"):
    print('initialize() failed!')
else:
    print(mt.version())

A função mt.symbols_total() é usada para obter o número total de símbolos negociáveis disponíveis no terminal MetaTrader 5. Podemos usá-la para determinar se podemos obter os dados corretamente. Se o número total for maior que 0, podemos usar a função mt.copy_rates_from_pos() para obter os dados históricos do símbolo especificado. Neste exemplo, obtivemos os dados mais recentes sobre o comprimento do período "mt_data_len" M15 (15 minutos) do símbolo GOLD_micro.

sb=mt.symbols_total()
rts=None
if sb > 0:
    rts=mt.copy_rates_from_pos("GOLD_micro",mt.TIMEFRAME_M15,0,mt_data_len) 
    mt.shutdown()

Por fim, usamos a função mt.shutdown() para fechar a conexão com o terminal MetaTrader 5 e converter os dados recebidos para o formato Pandas DataFrame.

mt.shutdown()
rts_fm=pd.DataFrame(rts)

Agora, vamos considerar como pré-processar os dados obtidos do terminal.

Primeiro, precisamos converter os rótulos temporais em datas:

rts_fm['time']=pd.to_datetime(rts_fm['time'], unit='s') 

Aqui, não descrevemos mais como anotar os dados. Você pode encontrar métodos nos meus dois artigos anteriores (os links estão na introdução deste artigo). Para uma breve demonstração de como usar modelos de previsão, simplesmente dividimos todos os fragmentos de dados por max_encoder_length+2max_prediction_length em grupos. Em cada grupo, há uma sequência de 0 a "max_encoder_length+2max_prediction_length-1". Vamos preenchê-la. Dessa forma, adicionamos os rótulos necessários aos dados de entrada. Primeiro, precisamos converter o índice de tempo inicial (ou seja, o índice do DataFrame). Vamos calcular o resto do índice de tempo inicial, dividido por (max_encoder_length+2max_prediction_length), e usar o resultado como o novo índice de tempo. Associaremos o índice de tempo ao intervalo de 0 a "max_encoder_length+2max_prediction_length-1":

rts_fm['time_idx']= rts_fm.index%(max_encoder_length+2*max_prediction_length) 

Também precisamos converter o índice de tempo inicial em um grupo. Calcularemos o índice de tempo inicial, dividido por "max_encoder_length+2*max_prediction_length", e usaremos o resultado como o novo grupo:

rts_fm['series']=rts_fm.index//(max_encoder_length+2*max_prediction_length)

Encapsularemos a parte de pré-processamento de dados em uma função. Precisamos passar para ela apenas o comprimento dos dados que precisamos recuperar, e ela poderá completar o trabalho de pré-processamento de dados:

def get_data(mt_data_len:int):
    if not mt.initialize("D:\\Project\\mt\\MT5\\terminal64.exe"):
        print('initialize() failed!') 
    else:
        print(mt.version())
        sb=mt.symbols_total()
        rts=None
        if sb > 0:
            rts=mt.copy_rates_from_pos("GOLD_micro",mt.TIMEFRAME_M15,0,mt_data_len) 
        mt.shutdown()
        # print(len(rts))
    rts_fm=pd.DataFrame(rts)
    rts_fm['time']=pd.to_datetime(rts_fm['time'], unit='s') 
    rts_fm['time_idx']= rts_fm.index%(max_encoder_length+2*max_prediction_length) 
    rts_fm['series']=rts_fm.index//(max_encoder_length+2*max_prediction_length)
return rts_fm


Reescrevendo a classe pytorch_forecasting.TimeSeriesDataSet

Reescrevendo a função to_dataloader() em pytorch_forecasting. Isso permite controlar se os dados serão misturados e se o último grupo do pacote será descartado (principalmente para evitar erros imprevisíveis causados ​​por um comprimento insuficiente do último grupo de dados). Veja como você pode fazer isso:

class New_TmSrDt(TimeSeriesDataSet):
    def to_dataloader(self, train: bool = True, 
                      batch_size: int = 64, 
                      batch_sampler: Sampler | str = None, 
                      shuffle:bool=False,
                      drop_last:bool=False,
                      **kwargs) -> DataLoader:
        default_kwargs = dict(
            shuffle=shuffle,
            drop_last=drop_last, #modification
            collate_fn=self._collate_fn,
            batch_size=batch_size,
            batch_sampler=batch_sampler,
        )
        default_kwargs.update(kwargs)
        kwargs = default_kwargs
        if kwargs["batch_sampler"] is not None:
            sampler = kwargs["batch_sampler"]
            if isinstance(sampler, str):
                if sampler == "synchronized":
                    kwargs["batch_sampler"] = TimeSynchronizedBatchSampler(
                        SequentialSampler(self),
                        batch_size=kwargs["batch_size"],
                        shuffle=kwargs["shuffle"],
                        drop_last=kwargs["drop_last"],
                    )
                else:
                    raise ValueError(f"batch_sampler {sampler} unknown - see docstring for valid batch_sampler")
            del kwargs["batch_size"]
            del kwargs["shuffle"]
            del kwargs["drop_last"]

        return DataLoader(self,**kwargs)
Este código cria uma nova classe New_TmSrDt, que herda de TimeSeriesDataSet. Em seguida, nesta nova classe, a função to_dataloader() é sobrescrita para incluir os parâmetros shuffle e drop_last. Dessa forma, você pode controlar melhor o processo de carregamento dos dados. Não esqueça de substituir as instâncias de TimeSeriesDataSet por New_TmSrDt no código.


Criando conjuntos de dados para treinamento e validação

Primeiramente, precisamos definir um ponto de corte para os dados de treinamento. Isso é feito subtraindo o comprimento máximo de previsão do valor máximo de time_idx.

max_encoder_length = 2*96
max_prediction_length = 30
training_cutoff = rts_fm["time_idx"].max() - max_prediction_length

Então usamos a classe New_TmSrDt (que é a classe TimeSeriesDataSet reescrita por nós) para criar o conjunto de dados de treinamento. Essa classe requer os seguintes parâmetros:

  • DataFrame (neste caso, rts_fm)
  • A coluna time_idx, que é uma sequência contínua de números inteiros.
  • A coluna alvo (neste caso, close), que é o valor que queremos prever.
  • A coluna do grupo (neste caso, series), que representa diferentes séries temporais.
  • Os comprimentos máximos do codificador e do preditor
context_length = max_encoder_length
prediction_length = max_prediction_length

training = New_TmSrDt(
        data[lambda x: x.time_idx <= training_cutoff],
        time_idx="time_idx",
        target="close",
        categorical_encoders={"series":NaNLabelEncoder().fit(data.series)},
        group_ids=["series"],
        time_varying_unknown_reals=["close"],
        max_encoder_length=context_length,
        # min_encoder_length=max_encoder_length//2,
        max_prediction_length=prediction_length,
        # min_prediction_length=1,
    )
validation = New_TmSrDt.from_dataset(training, 
                                  data, 
                                  min_prediction_idx=training_cutoff + 1)

Em seguida, usamos a função New_TmSrDt.from_dataset() para criar um conjunto de dados de validação. Esta função requer os seguintes parâmetros:

  • Conjunto de dados de treinamento
  • DataFrame
  • O índice mínimo de previsão, que deve ser maior em um do que o valor máximo de time_idx dos dados de treinamento.

validation = New_TmSrDt.from_dataset(training, rts_fm, min_prediction_idx=training_cutoff + 1)

Finalmente, usamos a função to_dataloader() para converter os conjuntos de dados de treinamento e validação em objetos PyTorch DataLoader. Esta função requer os seguintes parâmetros:

  • O parâmetro train, que indica se os dados devem ser embaralhados.
  • O parâmetro batch_size, que especifica o número de amostras em um lote.
  • O parâmetro num_workers, que indica o número de processos de trabalho para carregar os dados.

train_dataloader = training.to_dataloader(train=True,
                                          shuffle=t_shuffle, 
                                          drop_last=t_drop_last,
                                          batch_size=batch_size, 
                                          num_workers=0,)
val_dataloader = validation.to_dataloader(train=False, 
                                          shuffle=v_shuffle,
                                          drop_last=v_drop_last,
                                          batch_size=batch_size, 
                                          num_workers=0)

Finalmente, encapsulamos esta parte do código em uma função spilt_data(data:pd.DataFrame,t_drop_last:bool,t_shuffle:bool,v_drop_last:bool,v_shuffle:bool) e especificamos os seguintes parâmetros:

  • O parâmetro data é usado para obter o conjunto de dados que precisa ser processado.
  • O parâmetro t_drop_last indica se o último grupo do conjunto de dados de treinamento deve ser removido.
  • O parâmetro t_shuffle indica se os dados de treinamento devem ser misturados.
  • O parâmetro v_drop_last indica se o último grupo do conjunto de dados de validação deve ser removido.
  • O parâmetro v_shuffle indica se os dados de validação devem ser embaralhados.

train_dataloader (instância do dataloader para o conjunto de dados de treinamento), val_dataloader (instância do dataloader para o conjunto de dados de validação) e training (instância do TimeSeriesDataSet para o conjunto de dados) são usados como valores de retorno desta função, pois serão utilizados posteriormente.

def spilt_data(data:pd.DataFrame,
               t_drop_last:bool,
               t_shuffle:bool,
               v_drop_last:bool,
               v_shuffle:bool):
    training_cutoff = data["time_idx"].max() - max_prediction_length #max:95
    context_length = max_encoder_length
    prediction_length = max_prediction_length
    training = New_TmSrDt(
        data[lambda x: x.time_idx <= training_cutoff],
        time_idx="time_idx",
        target="close",
        categorical_encoders={"series":NaNLabelEncoder().fit(data.series)},
        group_ids=["series"],
        time_varying_unknown_reals=["close"],
        max_encoder_length=context_length,
        # min_encoder_length=max_encoder_length//2,
        max_prediction_length=prediction_length,
        # min_prediction_length=1,
    )

    validation = New_TmSrDt.from_dataset(training, 
                                         data, 
                                         min_prediction_idx=training_cutoff + 1)
    
    train_dataloader = training.to_dataloader(train=True,
                                              shuffle=t_shuffle, 
                                              drop_last=t_drop_last,
                                              batch_size=batch_size, 
                                              num_workers=0,)
    val_dataloader = validation.to_dataloader(train=False, 
                                              shuffle=v_shuffle,
                                              drop_last=v_drop_last,
                                              batch_size=batch_size, 
                                              num_workers=0)
    return train_dataloader,val_dataloader,training


Criando e treinando o modelo

Vamos iniciar a criação do modelo NHiTS. Esta parte mostrará como configurar seus parâmetros e como treiná-lo.

1. Encontramos a melhor taxa de aprendizado

Antes de começarmos a criar o modelo, usamos o objeto Tuner do PyTorch Lightning para encontrar a melhor taxa de aprendizado.

Primeiramente, precisamos criar um objeto Trainer, onde o parâmetro accelerator é usado para indicar o tipo de dispositivo, e gradient_clip_val é usado para prevenir a explosão do gradiente.

pl.seed_everything(42)
trainer = pl.Trainer(accelerator="cpu", gradient_clip_val=0.1)

Em seguida, usamos a função NHiTS.from_dataset() para criar a rede do modelo NHiTS. Esta função requer os seguintes parâmetros:

  • Conjunto de dados de treinamento
  • Taxa de aprendizado
  • Decaimento de peso
  • Função de perda
  • Tamanho da camada oculta
  • Otimizador
net = NHiTS.from_dataset(
    training,
    learning_rate=3e-2,
    weight_decay=1e-2,
    loss=MQF2DistributionLoss(prediction_length=max_prediction_length),
    backcast_loss_ratio=0.0,
    hidden_size=64,
    optimizer="AdamW",
)

Depois, criamos uma instância da classe Tuner e chamamos a função lr_find(). Ela treinará o modelo com várias taxas de aprendizado baseadas em nosso conjunto de dados e comparará as perdas de cada taxa de aprendizado para encontrar a melhor delas.

res = Tuner(trainer).lr_find(
    net, train_dataloaders=train_dataloader, val_dataloaders=val_dataloader, min_lr=1e-5, max_lr=1e-1
)
lr_=res.suggestion()

De forma similar, encapsulamos essa parte do código que obtém a melhor taxa de aprendizado em uma função get_learning_rate() e fazemos da taxa de aprendizado obtida seu valor de retorno:

def get_learning_rate():
    
    pl.seed_everything(42)
    trainer = pl.Trainer(accelerator="cpu", gradient_clip_val=0.1,logger=False)
    net = NHiTS.from_dataset(
        training,
        learning_rate=3e-2,
        weight_decay=1e-2,
        loss=MQF2DistributionLoss(prediction_length=max_prediction_length),
        backcast_loss_ratio=0.0,
        hidden_size=64,
        optimizer="AdamW",
    )
    res = Tuner(trainer).lr_find(
        net, train_dataloaders=t_loader, val_dataloaders=v_loader, min_lr=1e-5, max_lr=1e-1
    )
    lr_=res.suggestion()
    return lr_

Se quiser visualizar a taxa de aprendizado, você pode adicionar o seguinte código:

print(f"suggested learning rate: {res.suggestion()}")
fig = res.plot(show=True, suggest=True)
fig.show()

O resultado neste exemplo é o seguinte:

lr

Taxa de aprendizado recomendada: 0.003981071705534973.

2. Definição do EarlyStopping Callback

Esse callback é principalmente usado para monitorar as perdas de validação e parar o treinamento quando as perdas não melhoram ao longo de várias épocas consecutivas. Isso pode prevenir o sobreajuste do modelo.

early_stop_callback = EarlyStopping(monitor="val_loss", 
                                    min_delta=1e-4, 
                                    patience=10,  
                                    verbose=True, 
                                    mode="min")

Aqui, é importante notar o parâmetro patience, que essencialmente define quando parar durante o treinamento, se as perdas não diminuírem ao longo de várias épocas consecutivas. Nós o definimos como 10.

3. Definição do Callback ModelCheckpoint

Esse callback é principalmente usado para gerenciar o salvamento do modelo e o nome do arquivo salvo. Principalmente, definimos as seguintes duas variáveis.

ck_callback=ModelCheckpoint(monitor='val_loss',
                            mode="min",
                            save_top_k=1,  
                            filename='{epoch}-{val_loss:.2f}')

save_top_k é usado para controlar o salvamento de múltiplos melhores modelos. Definimos o valor como 1 e mantemos apenas o melhor modelo.

4. Definição do Training Model

Primeiramente, precisamos criar uma instância da classe Trainer em Lightning.pytorch e adicionar os dois callbacks que definimos anteriormente.

trainer = pl.Trainer(
    max_epochs=ep,
    accelerator="cpu",
    enable_model_summary=True,
    gradient_clip_val=1.0,
    callbacks=[early_stop_callback,ck_callback],
    limit_train_batches=30,
    enable_checkpointing=True,
)

Aqui, precisamos prestar atenção aos parâmetros max_epochs (número máximo de épocas de treinamento), gradient_clip_val (usado para prevenir a explosão do gradiente) e callbacks. Aqui, max_epochs usa a variável global ep, que definiremos mais tarde, e callbacks é nossa coleção de callbacks.

Em seguida, também precisamos definir o modelo NHiTS e criar sua instância:

net = NHiTS.from_dataset(
    training,
    learning_rate=lr,
    log_interval=10,
    log_val_interval=1,
    weight_decay=1e-2,
    backcast_loss_ratio=0.0,
    hidden_size=64,
    optimizer="AdamW",
    loss=MQF2DistributionLoss(prediction_length=max_prediction_length),
)

Aqui, não é necessário alterar os parâmetros, basta usar os valores padrão. Aqui, apenas definimos loss para a função de perda MQF2DistributionLoss.

5. Módulo de treinamento 

Usamos a função fit() do objeto Trainer para treinar o modelo:

trainer.fit(
    net,
    train_dataloaders=train_dataloader,
    val_dataloaders=val_dataloader,
)

Da mesma forma, encapsulamos essa parte do código em uma função train():

def train():
    early_stop_callback = EarlyStopping(monitor="val_loss", 
                                        min_delta=1e-4, 
                                        patience=10,  # The number of times without improvement will stop
                                        verbose=True, 
                                        mode="min")
    ck_callback=ModelCheckpoint(monitor='val_loss',
                                mode="min",
                                save_top_k=1,  # Save the top few best ones
                                filename='{epoch}-{val_loss:.2f}')
    trainer = pl.Trainer(
        max_epochs=ep,
        accelerator="cpu",
        enable_model_summary=True,
        gradient_clip_val=1.0,
        callbacks=[early_stop_callback,ck_callback],
        limit_train_batches=30,
        enable_checkpointing=True,
    )
    net = NHiTS.from_dataset(
        training,
        learning_rate=lr,
        log_interval=10,
        log_val_interval=1,
        weight_decay=1e-2,
        backcast_loss_ratio=0.0,
        hidden_size=64,
        optimizer="AdamW",
        loss=MQF2DistributionLoss(prediction_length=max_prediction_length),
    )
    trainer.fit(
        net,
        train_dataloaders=t_loader,
        val_dataloaders=v_loader,
        # ckpt_path='best'
    )
return trainer

Essa função retornará o modelo treinado, que você pode usar para tarefas de previsão.


Definindo a lógica de execução

1. Definição de variáveis globais:

ep=200
__train=False
mt_data_len=200000
max_encoder_length = 2*96
max_prediction_length = 30
batch_size = 128

__train é usado para controlar se estamos treinando ou testando o modelo no momento.

Vale destacar que ep é usado para gerenciar a época máxima de treinamento. Como definimos EarlyStopping, esse valor pode ser aumentado, pois o modelo automaticamente parará quando a convergência cessar.

mt_data_len é o número dos últimos dados de séries temporais recebidos do cliente.

max_encoder_length e max_prediction_length são, respectivamente, o comprimento máximo de codificação e o comprimento máximo de previsão.

2. Treinamento

Também precisamos salvar os resultados ótimos atuais do treinamento em um arquivo local após a conclusão do treinamento, por isso definimos um arquivo json para salvar essas informações:

info_file='results.json'

Para tornar nosso processo de treinamento mais claro, precisamos evitar a exibição de informações de aviso desnecessárias durante o treinamento, então adicionaremos o seguinte código:

warnings.filterwarnings("ignore")

Segue nossa lógica de treinamento:

dt=get_data(mt_data_len=mt_data_len)
if __train:
    # print(dt)
    # dt=get_data(mt_data_len=mt_data_len)
    t_loader,v_loader,training=spilt_data(dt,
                                    t_shuffle=False,t_drop_last=True,
                                    v_shuffle=False,v_drop_last=True)
    lr=get_learning_rate()
    trainer__=train()
    m_c_back=trainer__.checkpoint_callback
    m_l_back=trainer__.early_stopping_callback
    best_m_p=m_c_back.best_model_path
    best_m_l=m_l_back.best_score.item()
    # print(best_m_p)
    if os.path.exists(info_file):
        with open(info_file,'r+') as f1:
            last=json.load(fp=f1)
            last_best_model=last['last_best_model']
            last_best_score=last['last_best_score']
            if last_best_score > best_m_l:
                last['last_best_model']=best_m_p
                last['last_best_score']=best_m_l
                json.dump(last,fp=f1)
    else:               
        with open(info_file,'w') as f2:
            json.dump(dict(last_best_model=best_m_p,last_best_score=best_m_l),fp=f2)

Quando o treinamento estiver concluído, você poderá encontrar a localização da nossa melhor modelo e melhor resultado no arquivo results.json no diretório raiz.

Durante o processo de treinamento, você verá um indicador de progresso que mostra o progresso de cada época. 

Treinamento:

treinamento

Treinamento concluído:

ts

3. Verificação do modelo

Após o treinamento, queremos validar o modelo e visualizá-lo. Podemos adicionar o seguinte código:

best_model = NHiTS.load_from_checkpoint(best_m_p)
predictions = best_model.predict(v_loader, trainer_kwargs=dict(accelerator="cpu",logger=False), return_y=True)
raw_predictions = best_model.predict(v_loader, mode="raw", return_x=True, trainer_kwargs=dict(accelerator="cpu",logger=False))
for idx in range(10):  # plot 10 examples
    best_model.plot_prediction(raw_predictions.x, raw_predictions.output, idx=idx, add_loss_to_title=True)
    # sample 500 paths
samples = best_model.loss.sample(raw_predictions.output["prediction"][[0]], n_samples=500)[0]

# plot prediction
fig = best_model.plot_prediction(raw_predictions.x, raw_predictions.output, idx=0, add_loss_to_title=True)
ax = fig.get_axes()[0]
# plot first two sampled paths
ax.plot(samples[:, 0], color="g", label="Sample 1")
ax.plot(samples[:, 1], color="r", label="Sample 2")
fig.legend()
plt.show()

Você também pode usar o TensorBoard para visualizar o processo em tempo real durante o treinamento.

Resultado:

ref

4. Teste do modelo

Primeiro, abrimos o arquivo json para encontrar o local ótimo de armazenamento do modelo:

with open(info_file) as f:
    best_m_p=json.load(fp=f)['last_best_model']
print('model path is:',best_m_p)

Então, carregamos o modelo:

best_model = NHiTS.load_from_checkpoint(best_m_p)

Em seguida, obtemos dados em tempo real do cliente para testar o modelo:

offset=1
dt=dt.iloc[-max_encoder_length-offset:-offset,:]
last_=dt.iloc[-1] #get the last group of data
# print(len(dt))
for i in range(1,max_prediction_length+1):
    dt.loc[dt.index[-1]+1]=last_
dt['series']=0
# dt['time_idx']=dt.apply(lambda x:x.index,args=1)
dt['time_idx']=dt.index-dt.index[0]
# dt=get_data(mt_data_len=max_encoder_length)
predictions=best_model.predict(dt,mode='raw',trainer_kwargs=dict(accelerator="cpu",logger=False),return_x=True)
best_model.plot_prediction(predictions.x,predictions.output,show_future_observed=False)
plt.show()

E aqui está o resultado:

pref

5.  Avaliação do modelo

Claro, podemos usar algumas métricas da biblioteca PyTorch Forecasting para avaliar o desempenho do modelo. Aqui está como avaliar usando o erro absoluto médio (MAE) e o Erro Absoluto Percentual Médio Simétrico (SMAPE), e exibir os resultados da avaliação:

from pytorch_forecasting.metrics import MAE, SMAPE
mae = MAE()(raw_predictions["prediction"], raw_predictions["target"])
print(f"Mean Absolute Error: {mae}")
smape = SMAPE()(raw_predictions["prediction"], raw_predictions["target"])
print(f"Symmetric Mean Absolute Percentage Error: {smape}")

Neste trecho de código, primeiro importamos as métricas MAE e SMAPE. Então, usamos essas métricas para calcular o erro entre os valores previstos (raw_predictions["prediction"]) e os valores reais (raw_predictions["target"]). Estas métricas podem nos ajudar a ver o desempenho do nosso modelo e indicar direções para sua melhoria contínua.



Considerações finais

Neste artigo, exploramos como usar os dados de rótulos mencionados nos dois artigos anteriores e demonstramos como criar um modelo N-HiTS usando nossos dados. Depois, treinamos e verificamos o modelo. Como vimos, alcançamos bons resultados. Também demonstramos como usar este modelo no MetaTrader 5 para previsão de 30 velas. Claro, não mencionamos como colocar ordens com base nos resultados de previsão, pois a negociação real requer que os leitores realizem uma grande quantidade de testes de acordo com sua situação real e estabeleçam regras de negociação apropriadas.

Obrigado pela atenção!


Anexo:

Código completo:

# Copyright 2021, MetaQuotes Ltd.
# https://www.mql5.com


# from typing import Union
import lightning.pytorch as pl
import os
from lightning.pytorch.callbacks import EarlyStopping,ModelCheckpoint
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
# import torch
from pytorch_forecasting import NHiTS, TimeSeriesDataSet
from pytorch_forecasting.data import NaNLabelEncoder,timeseries
from pytorch_forecasting.metrics import MQF2DistributionLoss
from pytorch_forecasting.data.samplers import TimeSynchronizedBatchSampler
from lightning.pytorch.tuner import Tuner
import MetaTrader5 as mt
import warnings
import json


from torch.utils.data import DataLoader
from torch.utils.data.sampler import Sampler,SequentialSampler

class New_TmSrDt(TimeSeriesDataSet):
    '''
    rewrite dataset class
    '''
    def to_dataloader(self, train: bool = True, 
                      batch_size: int = 64, 
                      batch_sampler: Sampler | str = None, 
                      shuffle:bool=False,
                      drop_last:bool=False,
                      **kwargs) -> DataLoader:

        default_kwargs = dict(
            shuffle=shuffle,
            # drop_last=train and len(self) > batch_size,
            drop_last=drop_last, #
            collate_fn=self._collate_fn,
            batch_size=batch_size,
            batch_sampler=batch_sampler,
        )
        default_kwargs.update(kwargs)
        kwargs = default_kwargs
        # print(kwargs['drop_last'])
        if kwargs["batch_sampler"] is not None:
            sampler = kwargs["batch_sampler"]
            if isinstance(sampler, str):
                if sampler == "synchronized":
                    kwargs["batch_sampler"] = TimeSynchronizedBatchSampler(
                        SequentialSampler(self),
                        batch_size=kwargs["batch_size"],
                        shuffle=kwargs["shuffle"],
                        drop_last=kwargs["drop_last"],
                    )
                else:
                    raise ValueError(f"batch_sampler {sampler} unknown - see docstring for valid batch_sampler")
            del kwargs["batch_size"]
            del kwargs["shuffle"]
            del kwargs["drop_last"]

        return DataLoader(self,**kwargs)

def get_data(mt_data_len:int):
    if not mt.initialize():
        print('initialize() failed!') 
    else:
        print(mt.version())
        sb=mt.symbols_total()
        rts=None
        if sb > 0:
            rts=mt.copy_rates_from_pos("GOLD_micro",mt.TIMEFRAME_M15,0,mt_data_len) 
        mt.shutdown()
        # print(len(rts))
    rts_fm=pd.DataFrame(rts)
    rts_fm['time']=pd.to_datetime(rts_fm['time'], unit='s') 

    rts_fm['time_idx']= rts_fm.index%(max_encoder_length+2*max_prediction_length) 
    rts_fm['series']=rts_fm.index//(max_encoder_length+2*max_prediction_length)
    return rts_fm


def spilt_data(data:pd.DataFrame,
               t_drop_last:bool,
               t_shuffle:bool,
               v_drop_last:bool,
               v_shuffle:bool):
    training_cutoff = data["time_idx"].max() - max_prediction_length #max:95
    context_length = max_encoder_length
    prediction_length = max_prediction_length
    training = New_TmSrDt(
        data[lambda x: x.time_idx <= training_cutoff],
        time_idx="time_idx",
        target="close",
        categorical_encoders={"series":NaNLabelEncoder().fit(data.series)},
        group_ids=["series"],
        time_varying_unknown_reals=["close"],
        max_encoder_length=context_length,
        # min_encoder_length=max_encoder_length//2,
        max_prediction_length=prediction_length,
        # min_prediction_length=1,
    )

    validation = New_TmSrDt.from_dataset(training, 
                                         data, 
                                         min_prediction_idx=training_cutoff + 1)
    
    train_dataloader = training.to_dataloader(train=True,
                                              shuffle=t_shuffle, 
                                              drop_last=t_drop_last,
                                              batch_size=batch_size, 
                                              num_workers=0,)
    val_dataloader = validation.to_dataloader(train=False, 
                                              shuffle=v_shuffle,
                                              drop_last=v_drop_last,
                                              batch_size=batch_size, 
                                              num_workers=0)
    return train_dataloader,val_dataloader,training

def get_learning_rate():
    
    pl.seed_everything(42)
    trainer = pl.Trainer(accelerator="cpu", gradient_clip_val=0.1,logger=False)
    net = NHiTS.from_dataset(
        training,
        learning_rate=3e-2,
        weight_decay=1e-2,
        loss=MQF2DistributionLoss(prediction_length=max_prediction_length),
        backcast_loss_ratio=0.0,
        hidden_size=64,
        optimizer="AdamW",
    )
    res = Tuner(trainer).lr_find(
        net, train_dataloaders=t_loader, val_dataloaders=v_loader, min_lr=1e-5, max_lr=1e-1
    )
    # print(f"suggested learning rate: {res.suggestion()}")
    lr_=res.suggestion()
    return lr_
def train():
    early_stop_callback = EarlyStopping(monitor="val_loss", 
                                        min_delta=1e-4, 
                                        patience=10,  
                                        verbose=True, 
                                        mode="min")
    ck_callback=ModelCheckpoint(monitor='val_loss',
                                mode="min",
                                save_top_k=1,  
                                filename='{epoch}-{val_loss:.2f}')
    trainer = pl.Trainer(
        max_epochs=ep,
        accelerator="cpu",
        enable_model_summary=True,
        gradient_clip_val=1.0,
        callbacks=[early_stop_callback,ck_callback],
        limit_train_batches=30,
        enable_checkpointing=True,
    )
    net = NHiTS.from_dataset(
        training,
        learning_rate=lr,
        log_interval=10,
        log_val_interval=1,
        weight_decay=1e-2,
        backcast_loss_ratio=0.0,
        hidden_size=64,
        optimizer="AdamW",
        loss=MQF2DistributionLoss(prediction_length=max_prediction_length),
    )
    trainer.fit(
        net,
        train_dataloaders=t_loader,
        val_dataloaders=v_loader,
        # ckpt_path='best'
    )
    return trainer

if __name__=='__main__':
    ep=200
    __train=False
    mt_data_len=200000
    max_encoder_length = 2*96
    max_prediction_length = 30
    batch_size = 128
    info_file='results.json'
    warnings.filterwarnings("ignore")
    dt=get_data(mt_data_len=mt_data_len)
    if __train:
        # print(dt)
        # dt=get_data(mt_data_len=mt_data_len)
        t_loader,v_loader,training=spilt_data(dt,
                                              t_shuffle=False,t_drop_last=True,
                                              v_shuffle=False,v_drop_last=True)
        lr=get_learning_rate()
        trainer__=train()
        m_c_back=trainer__.checkpoint_callback
        m_l_back=trainer__.early_stopping_callback
        best_m_p=m_c_back.best_model_path
        best_m_l=m_l_back.best_score.item()

        # print(best_m_p)
        
        if os.path.exists(info_file):
            with open(info_file,'r+') as f1:
                last=json.load(fp=f1)
                last_best_model=last['last_best_model']
                last_best_score=last['last_best_score']
                if last_best_score > best_m_l:
                    last['last_best_model']=best_m_p
                    last['last_best_score']=best_m_l
                    json.dump(last,fp=f1)
        else:               
            with open(info_file,'w') as f2:
                json.dump(dict(last_best_model=best_m_p,last_best_score=best_m_l),fp=f2)

        best_model = NHiTS.load_from_checkpoint(best_m_p)
        predictions = best_model.predict(v_loader, trainer_kwargs=dict(accelerator="cpu",logger=False), return_y=True)
        raw_predictions = best_model.predict(v_loader, mode="raw", return_x=True, trainer_kwargs=dict(accelerator="cpu",logger=False))
    
        for idx in range(10):  # plot 10 examples
            best_model.plot_prediction(raw_predictions.x, raw_predictions.output, idx=idx, add_loss_to_title=True)
        samples = best_model.loss.sample(raw_predictions.output["prediction"][[0]], n_samples=500)[0]

        # plot prediction
        fig = best_model.plot_prediction(raw_predictions.x, raw_predictions.output, idx=0, add_loss_to_title=True)
        ax = fig.get_axes()[0]
        # plot first two sampled paths
        ax.plot(samples[:, 0], color="g", label="Sample 1")
        ax.plot(samples[:, 1], color="r", label="Sample 2")
        fig.legend()
        plt.show()
    else:
        with open(info_file) as f:
            best_m_p=json.load(fp=f)['last_best_model']
        print('model path is:',best_m_p)
        
        best_model = NHiTS.load_from_checkpoint(best_m_p)

        offset=1
        dt=dt.iloc[-max_encoder_length-offset:-offset,:]
        last_=dt.iloc[-1] 
        # print(len(dt))
        for i in range(1,max_prediction_length+1):
            dt.loc[dt.index[-1]+1]=last_
        dt['series']=0
        # dt['time_idx']=dt.apply(lambda x:x.index,args=1)
        dt['time_idx']=dt.index-dt.index[0]
        # dt=get_data(mt_data_len=max_encoder_length)
        predictions = best_model.predict(dt, mode='raw',trainer_kwargs=dict(accelerator="cpu",logger=False),return_x=True)
        best_model.plot_prediction(predictions.x,predictions.output,show_future_observed=False)
        plt.show()


Traduzido do Inglês pela MetaQuotes Ltd.
Artigo original: https://www.mql5.com/en/articles/13255

Arquivos anexados |
n_hits.py (9.7 KB)
Algoritmos de otimização populacional: busca por difusão estocástica (Stochastic Diffusion Search, SDS) Algoritmos de otimização populacional: busca por difusão estocástica (Stochastic Diffusion Search, SDS)
O artigo aborda a busca por difusão estocástica, SDS, um algoritmo de otimização muito poderoso e prático, baseado nos princípios de passeio aleatório. O algoritmo permite encontrar soluções ótimas em espaços multidimensionais complexos, possuindo uma alta velocidade de convergência e a capacidade de evitar extremos locais.
Teoria das Categorias em MQL5 (Parte 23): uma nova perspectiva sobre a média móvel exponencial dupla Teoria das Categorias em MQL5 (Parte 23): uma nova perspectiva sobre a média móvel exponencial dupla
Neste artigo, continuamos a explorar indicadores de negociação populares sob uma nova ótica. Vamos processar a composição horizontal de transformações naturais. O melhor indicador para isso é a média móvel exponencial dupla (Double Exponential Moving Average, DEMA).
Fatorando Matrizes — O Básico Fatorando Matrizes — O Básico
Como o intuito aqui é ser didático. Vou manter a coisa no seu padrão mais simples. Ou seja, iremos implementar apenas e somente o que será preciso. A multiplicação de matrizes. E você verá que isto será o suficiente para simular a multiplicação de uma matriz por um escalar. A grande dificuldade que muita gente tem em implementar um código usando fatoração de matrizes, é que diferente de uma fatoração escalar, onde em quase todos os casos a ordem dos fatores não altera o resultado. Quando se usa matrizes, a coisa não é bem assim.
Indicadores alternativos de risco e rentabilidade em MQL5 Indicadores alternativos de risco e rentabilidade em MQL5
Neste artigo, apresentaremos a implementação de vários indicadores de rentabilidade e risco, considerados alternativas ao índice de Sharpe, e exploraremos curvas de patrimônio líquido hipotéticas para analisar suas características.