Anotação de dados na análise de série temporal (Parte 3): Exemplo de uso de anotação de dados

Yuqiang Pan | 12 março, 2024

Introdução

O artigo descreve como usar PyTorch Lightning e o framework PyTorch Forecasting através da plataforma de negociação MetaTrader 5 para implementar a previsão de séries temporais financeiras baseadas em redes neurais.

Neste artigo, também explicamos os motivos pelos quais escolhemos essas duas plataformas e o formato de dados que usamos.

Quanto aos dados, você pode usar os dados obtidos por meio da anotação de dados de meus dois artigos anteriores. Uma vez que eles têm o mesmo formato, você pode facilmente expandi-los, seguindo a metodologia descrita neste artigo.

Links para os dois artigos anteriores: 

  1. Anotação de dados na análise de série temporal (Parte 1): Criação de um conjunto de dados com rótulos de tendência usando um gráfico EA
  2. Anotação de dados na análise de série temporal (Parte 2): Criação de conjuntos de dados com rótulos de tendência usando Python

Conteúdo:


Algumas bibliotecas importantes do Python

Primeiramente, vamos apresentar as principais bibliotecas Python que usaremos.

1. PyTorch Lightning

PyTorch Lightning é um ambiente de aprendizado profundo, especialmente projetado para pesquisadores e engenheiros de inteligência artificial e aprendizado de máquina que precisam de máxima flexibilidade sem comprometer a escalabilidade.

A ideia principal é separar o código acadêmico (por exemplo, definições de modelos, propagação/retropropagação, otimizadores, validação etc.) do código de engenharia (por exemplo, laços for, mecanismos de salvamento, logs do TensorBoard, estratégias de treinamento etc.), resultando em um código mais organizado e compreensível.

Entre as principais vantagens estão:

No geral, o PyTorch Lightning é uma biblioteca extremamente poderosa que oferece um método eficiente para elaborar e gerenciar seu código PyTorch. Além disso, ele proporciona uma abordagem estruturada para resolver tarefas comuns, mas complexas, como treinamento, validação e teste do modelo.

O uso detalhado dessa biblioteca pode ser encontrado na documentação oficial: https://lightning.ai/docs.

2. PyTorch Forecasting

Trata-se de uma biblioteca Python especificamente desenvolvida para a previsão de séries temporais. Como é baseada no PyTorch, você pode aproveitar as poderosas bibliotecas de diferenciação automática e otimização do PyTorch, bem como desfrutar da usabilidade que o PyTorch Forecasting oferece para a previsão de séries temporais.

No PyTorch Forecasting, você pode encontrar implementações de vários modelos de previsão, incluindo, entre outros, modelos de autorregressão (AR, ARIMA), modelos de espaço de estados (SARIMAX), redes neurais (LSTM, GRU) e métodos de ensemble (Prophet, N-Beats). Isso significa que você pode experimentar e comparar diferentes abordagens de previsão no mesmo ambiente sem a necessidade de escrever extenso código padrão para cada abordagem.

A biblioteca também oferece uma série de ferramentas de pré-processamento de dados, que podem ajudar a resolver tarefas típicas em séries temporais. Essas ferramentas incluem, entre outras, a substituição de valores ausentes, escalonamento, extração de características e transformações de janela deslizante. Isso significa que você pode se concentrar mais no desenvolvimento e otimização do seu modelo, sem gastar muito tempo no processamento de dados.

A biblioteca também possui uma interface unificada para avaliação de desempenho do modelo. Ela implementa funções de perda e métricas de validação para séries temporais, como QuantileLoss e SMAPE, e também suporta metodologias de treinamento como parada precoce e validação cruzada. Isso permite que você monitore e melhore a performance do seu modelo de maneira mais simples.

Se você está procurando uma maneira de aumentar a eficiência e a facilidade de manutenção do seu projeto de previsão de séries temporais, então o PyTorch Forecasting pode ser uma excelente escolha. A biblioteca oferece meios eficazes e flexíveis para organizar e gerenciar seu código PyTorch, permitindo que você se concentre no aspecto mais importante: o próprio modelo de aprendizado de máquina.

Uma descrição detalhada do uso da biblioteca pode ser encontrada na documentação oficial: https://pytorch-forecasting.readthedocs.io/en/stable.

3. Modelo N-HiTS

O modelo N-HiTS resolve os problemas de volatilidade das previsões e complexidade computacional em previsões de longo prazo através da implementação de métodos inovadores de interpolação hierárquica e amostragem de dados com várias taxas. Isso permite que o modelo N-HiTS aproxime eficientemente um intervalo de previsão de qualquer comprimento.

Além disso, experimentos abrangentes realizados em conjuntos de dados de grande escala mostraram que o modelo N-HiTS aumenta a precisão em média quase 20% em comparação com a última arquitetura Transformer, além de reduzir em 50 vezes o tempo de cálculo.

Link para o artigo: https://doi.org/10.48550/arXiv.2201.12886.


Inicialização

Primeiramente, precisamos importar as bibliotecas necessárias. Entre essas bibliotecas estão o MetaTrader 5 (para interagir com o terminal de mesmo nome), PyTorch Lightning (para treinar o modelo) e algumas outras bibliotecas para processamento e visualização de dados.

import MetaTrader5 as mt5
import lightning.pytorch as pl
from lightning.pytorch.callbacks import EarlyStopping
import matplotlib.pyplot as plt
import pandas as pd
from pytorch_forecasting import Baseline, NHiTS, TimeSeriesDataSet
from pytorch_forecasting.data import NaNLabelEncoder
from pytorch_forecasting.metrics import MAE, SMAPE, MQF2DistributionLoss, QuantileLoss
from lightning.pytorch.tuner import Tuner

Em seguida, precisamos inicializar o MetaTrader 5. Isso é feito chamando a função mt.initialize(). Se você não conseguir inicializá-lo apenas usando isso, você precisará passar o caminho para o terminal MetaTrader 5 como um parâmetro desta função (no exemplo "D:\Project\mt\MT5\terminal64.exe" — esse é o meu caminho, no aplicativo real você precisará definir seu próprio caminho). Se a inicialização for bem-sucedida, a função retornará True, caso contrário, retornará False.

if not mt.initialize("D:\\Project\\mt\\MT5\\terminal64.exe"):
    print('initialize() failed!')
else:
    print(mt.version())

A função mt.symbols_total() é usada para obter o número total de símbolos negociáveis disponíveis no terminal MetaTrader 5. Podemos usá-la para determinar se podemos obter os dados corretamente. Se o número total for maior que 0, podemos usar a função mt.copy_rates_from_pos() para obter os dados históricos do símbolo especificado. Neste exemplo, obtivemos os dados mais recentes sobre o comprimento do período "mt_data_len" M15 (15 minutos) do símbolo GOLD_micro.

sb=mt.symbols_total()
rts=None
if sb > 0:
    rts=mt.copy_rates_from_pos("GOLD_micro",mt.TIMEFRAME_M15,0,mt_data_len) 
    mt.shutdown()

Por fim, usamos a função mt.shutdown() para fechar a conexão com o terminal MetaTrader 5 e converter os dados recebidos para o formato Pandas DataFrame.

mt.shutdown()
rts_fm=pd.DataFrame(rts)

Agora, vamos considerar como pré-processar os dados obtidos do terminal.

Primeiro, precisamos converter os rótulos temporais em datas:

rts_fm['time']=pd.to_datetime(rts_fm['time'], unit='s') 

Aqui, não descrevemos mais como anotar os dados. Você pode encontrar métodos nos meus dois artigos anteriores (os links estão na introdução deste artigo). Para uma breve demonstração de como usar modelos de previsão, simplesmente dividimos todos os fragmentos de dados por max_encoder_length+2max_prediction_length em grupos. Em cada grupo, há uma sequência de 0 a "max_encoder_length+2max_prediction_length-1". Vamos preenchê-la. Dessa forma, adicionamos os rótulos necessários aos dados de entrada. Primeiro, precisamos converter o índice de tempo inicial (ou seja, o índice do DataFrame). Vamos calcular o resto do índice de tempo inicial, dividido por (max_encoder_length+2max_prediction_length), e usar o resultado como o novo índice de tempo. Associaremos o índice de tempo ao intervalo de 0 a "max_encoder_length+2max_prediction_length-1":

rts_fm['time_idx']= rts_fm.index%(max_encoder_length+2*max_prediction_length) 

Também precisamos converter o índice de tempo inicial em um grupo. Calcularemos o índice de tempo inicial, dividido por "max_encoder_length+2*max_prediction_length", e usaremos o resultado como o novo grupo:

rts_fm['series']=rts_fm.index//(max_encoder_length+2*max_prediction_length)

Encapsularemos a parte de pré-processamento de dados em uma função. Precisamos passar para ela apenas o comprimento dos dados que precisamos recuperar, e ela poderá completar o trabalho de pré-processamento de dados:

def get_data(mt_data_len:int):
    if not mt.initialize("D:\\Project\\mt\\MT5\\terminal64.exe"):
        print('initialize() failed!') 
    else:
        print(mt.version())
        sb=mt.symbols_total()
        rts=None
        if sb > 0:
            rts=mt.copy_rates_from_pos("GOLD_micro",mt.TIMEFRAME_M15,0,mt_data_len) 
        mt.shutdown()
        # print(len(rts))
    rts_fm=pd.DataFrame(rts)
    rts_fm['time']=pd.to_datetime(rts_fm['time'], unit='s') 
    rts_fm['time_idx']= rts_fm.index%(max_encoder_length+2*max_prediction_length) 
    rts_fm['series']=rts_fm.index//(max_encoder_length+2*max_prediction_length)
return rts_fm


Reescrevendo a classe pytorch_forecasting.TimeSeriesDataSet

Reescrevendo a função to_dataloader() em pytorch_forecasting. Isso permite controlar se os dados serão misturados e se o último grupo do pacote será descartado (principalmente para evitar erros imprevisíveis causados ​​por um comprimento insuficiente do último grupo de dados). Veja como você pode fazer isso:

class New_TmSrDt(TimeSeriesDataSet):
    def to_dataloader(self, train: bool = True, 
                      batch_size: int = 64, 
                      batch_sampler: Sampler | str = None, 
                      shuffle:bool=False,
                      drop_last:bool=False,
                      **kwargs) -> DataLoader:
        default_kwargs = dict(
            shuffle=shuffle,
            drop_last=drop_last, #modification
            collate_fn=self._collate_fn,
            batch_size=batch_size,
            batch_sampler=batch_sampler,
        )
        default_kwargs.update(kwargs)
        kwargs = default_kwargs
        if kwargs["batch_sampler"] is not None:
            sampler = kwargs["batch_sampler"]
            if isinstance(sampler, str):
                if sampler == "synchronized":
                    kwargs["batch_sampler"] = TimeSynchronizedBatchSampler(
                        SequentialSampler(self),
                        batch_size=kwargs["batch_size"],
                        shuffle=kwargs["shuffle"],
                        drop_last=kwargs["drop_last"],
                    )
                else:
                    raise ValueError(f"batch_sampler {sampler} unknown - see docstring for valid batch_sampler")
            del kwargs["batch_size"]
            del kwargs["shuffle"]
            del kwargs["drop_last"]

        return DataLoader(self,**kwargs)
Este código cria uma nova classe New_TmSrDt, que herda de TimeSeriesDataSet. Em seguida, nesta nova classe, a função to_dataloader() é sobrescrita para incluir os parâmetros shuffle e drop_last. Dessa forma, você pode controlar melhor o processo de carregamento dos dados. Não esqueça de substituir as instâncias de TimeSeriesDataSet por New_TmSrDt no código.


Criando conjuntos de dados para treinamento e validação

Primeiramente, precisamos definir um ponto de corte para os dados de treinamento. Isso é feito subtraindo o comprimento máximo de previsão do valor máximo de time_idx.

max_encoder_length = 2*96
max_prediction_length = 30
training_cutoff = rts_fm["time_idx"].max() - max_prediction_length

Então usamos a classe New_TmSrDt (que é a classe TimeSeriesDataSet reescrita por nós) para criar o conjunto de dados de treinamento. Essa classe requer os seguintes parâmetros:

context_length = max_encoder_length
prediction_length = max_prediction_length

training = New_TmSrDt(
        data[lambda x: x.time_idx <= training_cutoff],
        time_idx="time_idx",
        target="close",
        categorical_encoders={"series":NaNLabelEncoder().fit(data.series)},
        group_ids=["series"],
        time_varying_unknown_reals=["close"],
        max_encoder_length=context_length,
        # min_encoder_length=max_encoder_length//2,
        max_prediction_length=prediction_length,
        # min_prediction_length=1,
    )
validation = New_TmSrDt.from_dataset(training, 
                                  data, 
                                  min_prediction_idx=training_cutoff + 1)

Em seguida, usamos a função New_TmSrDt.from_dataset() para criar um conjunto de dados de validação. Esta função requer os seguintes parâmetros:

validation = New_TmSrDt.from_dataset(training, rts_fm, min_prediction_idx=training_cutoff + 1)

Finalmente, usamos a função to_dataloader() para converter os conjuntos de dados de treinamento e validação em objetos PyTorch DataLoader. Esta função requer os seguintes parâmetros:

train_dataloader = training.to_dataloader(train=True,
                                          shuffle=t_shuffle, 
                                          drop_last=t_drop_last,
                                          batch_size=batch_size, 
                                          num_workers=0,)
val_dataloader = validation.to_dataloader(train=False, 
                                          shuffle=v_shuffle,
                                          drop_last=v_drop_last,
                                          batch_size=batch_size, 
                                          num_workers=0)

Finalmente, encapsulamos esta parte do código em uma função spilt_data(data:pd.DataFrame,t_drop_last:bool,t_shuffle:bool,v_drop_last:bool,v_shuffle:bool) e especificamos os seguintes parâmetros:

train_dataloader (instância do dataloader para o conjunto de dados de treinamento), val_dataloader (instância do dataloader para o conjunto de dados de validação) e training (instância do TimeSeriesDataSet para o conjunto de dados) são usados como valores de retorno desta função, pois serão utilizados posteriormente.

def spilt_data(data:pd.DataFrame,
               t_drop_last:bool,
               t_shuffle:bool,
               v_drop_last:bool,
               v_shuffle:bool):
    training_cutoff = data["time_idx"].max() - max_prediction_length #max:95
    context_length = max_encoder_length
    prediction_length = max_prediction_length
    training = New_TmSrDt(
        data[lambda x: x.time_idx <= training_cutoff],
        time_idx="time_idx",
        target="close",
        categorical_encoders={"series":NaNLabelEncoder().fit(data.series)},
        group_ids=["series"],
        time_varying_unknown_reals=["close"],
        max_encoder_length=context_length,
        # min_encoder_length=max_encoder_length//2,
        max_prediction_length=prediction_length,
        # min_prediction_length=1,
    )

    validation = New_TmSrDt.from_dataset(training, 
                                         data, 
                                         min_prediction_idx=training_cutoff + 1)
    
    train_dataloader = training.to_dataloader(train=True,
                                              shuffle=t_shuffle, 
                                              drop_last=t_drop_last,
                                              batch_size=batch_size, 
                                              num_workers=0,)
    val_dataloader = validation.to_dataloader(train=False, 
                                              shuffle=v_shuffle,
                                              drop_last=v_drop_last,
                                              batch_size=batch_size, 
                                              num_workers=0)
    return train_dataloader,val_dataloader,training


Criando e treinando o modelo

Vamos iniciar a criação do modelo NHiTS. Esta parte mostrará como configurar seus parâmetros e como treiná-lo.

1. Encontramos a melhor taxa de aprendizado

Antes de começarmos a criar o modelo, usamos o objeto Tuner do PyTorch Lightning para encontrar a melhor taxa de aprendizado.

Primeiramente, precisamos criar um objeto Trainer, onde o parâmetro accelerator é usado para indicar o tipo de dispositivo, e gradient_clip_val é usado para prevenir a explosão do gradiente.

pl.seed_everything(42)
trainer = pl.Trainer(accelerator="cpu", gradient_clip_val=0.1)

Em seguida, usamos a função NHiTS.from_dataset() para criar a rede do modelo NHiTS. Esta função requer os seguintes parâmetros:

net = NHiTS.from_dataset(
    training,
    learning_rate=3e-2,
    weight_decay=1e-2,
    loss=MQF2DistributionLoss(prediction_length=max_prediction_length),
    backcast_loss_ratio=0.0,
    hidden_size=64,
    optimizer="AdamW",
)

Depois, criamos uma instância da classe Tuner e chamamos a função lr_find(). Ela treinará o modelo com várias taxas de aprendizado baseadas em nosso conjunto de dados e comparará as perdas de cada taxa de aprendizado para encontrar a melhor delas.

res = Tuner(trainer).lr_find(
    net, train_dataloaders=train_dataloader, val_dataloaders=val_dataloader, min_lr=1e-5, max_lr=1e-1
)
lr_=res.suggestion()

De forma similar, encapsulamos essa parte do código que obtém a melhor taxa de aprendizado em uma função get_learning_rate() e fazemos da taxa de aprendizado obtida seu valor de retorno:

def get_learning_rate():
    
    pl.seed_everything(42)
    trainer = pl.Trainer(accelerator="cpu", gradient_clip_val=0.1,logger=False)
    net = NHiTS.from_dataset(
        training,
        learning_rate=3e-2,
        weight_decay=1e-2,
        loss=MQF2DistributionLoss(prediction_length=max_prediction_length),
        backcast_loss_ratio=0.0,
        hidden_size=64,
        optimizer="AdamW",
    )
    res = Tuner(trainer).lr_find(
        net, train_dataloaders=t_loader, val_dataloaders=v_loader, min_lr=1e-5, max_lr=1e-1
    )
    lr_=res.suggestion()
    return lr_

Se quiser visualizar a taxa de aprendizado, você pode adicionar o seguinte código:

print(f"suggested learning rate: {res.suggestion()}")
fig = res.plot(show=True, suggest=True)
fig.show()

O resultado neste exemplo é o seguinte:

lr

Taxa de aprendizado recomendada: 0.003981071705534973.

2. Definição do EarlyStopping Callback

Esse callback é principalmente usado para monitorar as perdas de validação e parar o treinamento quando as perdas não melhoram ao longo de várias épocas consecutivas. Isso pode prevenir o sobreajuste do modelo.

early_stop_callback = EarlyStopping(monitor="val_loss", 
                                    min_delta=1e-4, 
                                    patience=10,  
                                    verbose=True, 
                                    mode="min")

Aqui, é importante notar o parâmetro patience, que essencialmente define quando parar durante o treinamento, se as perdas não diminuírem ao longo de várias épocas consecutivas. Nós o definimos como 10.

3. Definição do Callback ModelCheckpoint

Esse callback é principalmente usado para gerenciar o salvamento do modelo e o nome do arquivo salvo. Principalmente, definimos as seguintes duas variáveis.

ck_callback=ModelCheckpoint(monitor='val_loss',
                            mode="min",
                            save_top_k=1,  
                            filename='{epoch}-{val_loss:.2f}')

save_top_k é usado para controlar o salvamento de múltiplos melhores modelos. Definimos o valor como 1 e mantemos apenas o melhor modelo.

4. Definição do Training Model

Primeiramente, precisamos criar uma instância da classe Trainer em Lightning.pytorch e adicionar os dois callbacks que definimos anteriormente.

trainer = pl.Trainer(
    max_epochs=ep,
    accelerator="cpu",
    enable_model_summary=True,
    gradient_clip_val=1.0,
    callbacks=[early_stop_callback,ck_callback],
    limit_train_batches=30,
    enable_checkpointing=True,
)

Aqui, precisamos prestar atenção aos parâmetros max_epochs (número máximo de épocas de treinamento), gradient_clip_val (usado para prevenir a explosão do gradiente) e callbacks. Aqui, max_epochs usa a variável global ep, que definiremos mais tarde, e callbacks é nossa coleção de callbacks.

Em seguida, também precisamos definir o modelo NHiTS e criar sua instância:

net = NHiTS.from_dataset(
    training,
    learning_rate=lr,
    log_interval=10,
    log_val_interval=1,
    weight_decay=1e-2,
    backcast_loss_ratio=0.0,
    hidden_size=64,
    optimizer="AdamW",
    loss=MQF2DistributionLoss(prediction_length=max_prediction_length),
)

Aqui, não é necessário alterar os parâmetros, basta usar os valores padrão. Aqui, apenas definimos loss para a função de perda MQF2DistributionLoss.

5. Módulo de treinamento 

Usamos a função fit() do objeto Trainer para treinar o modelo:

trainer.fit(
    net,
    train_dataloaders=train_dataloader,
    val_dataloaders=val_dataloader,
)

Da mesma forma, encapsulamos essa parte do código em uma função train():

def train():
    early_stop_callback = EarlyStopping(monitor="val_loss", 
                                        min_delta=1e-4, 
                                        patience=10,  # The number of times without improvement will stop
                                        verbose=True, 
                                        mode="min")
    ck_callback=ModelCheckpoint(monitor='val_loss',
                                mode="min",
                                save_top_k=1,  # Save the top few best ones
                                filename='{epoch}-{val_loss:.2f}')
    trainer = pl.Trainer(
        max_epochs=ep,
        accelerator="cpu",
        enable_model_summary=True,
        gradient_clip_val=1.0,
        callbacks=[early_stop_callback,ck_callback],
        limit_train_batches=30,
        enable_checkpointing=True,
    )
    net = NHiTS.from_dataset(
        training,
        learning_rate=lr,
        log_interval=10,
        log_val_interval=1,
        weight_decay=1e-2,
        backcast_loss_ratio=0.0,
        hidden_size=64,
        optimizer="AdamW",
        loss=MQF2DistributionLoss(prediction_length=max_prediction_length),
    )
    trainer.fit(
        net,
        train_dataloaders=t_loader,
        val_dataloaders=v_loader,
        # ckpt_path='best'
    )
return trainer

Essa função retornará o modelo treinado, que você pode usar para tarefas de previsão.


Definindo a lógica de execução

1. Definição de variáveis globais:

ep=200
__train=False
mt_data_len=200000
max_encoder_length = 2*96
max_prediction_length = 30
batch_size = 128

__train é usado para controlar se estamos treinando ou testando o modelo no momento.

Vale destacar que ep é usado para gerenciar a época máxima de treinamento. Como definimos EarlyStopping, esse valor pode ser aumentado, pois o modelo automaticamente parará quando a convergência cessar.

mt_data_len é o número dos últimos dados de séries temporais recebidos do cliente.

max_encoder_length e max_prediction_length são, respectivamente, o comprimento máximo de codificação e o comprimento máximo de previsão.

2. Treinamento

Também precisamos salvar os resultados ótimos atuais do treinamento em um arquivo local após a conclusão do treinamento, por isso definimos um arquivo json para salvar essas informações:

info_file='results.json'

Para tornar nosso processo de treinamento mais claro, precisamos evitar a exibição de informações de aviso desnecessárias durante o treinamento, então adicionaremos o seguinte código:

warnings.filterwarnings("ignore")

Segue nossa lógica de treinamento:

dt=get_data(mt_data_len=mt_data_len)
if __train:
    # print(dt)
    # dt=get_data(mt_data_len=mt_data_len)
    t_loader,v_loader,training=spilt_data(dt,
                                    t_shuffle=False,t_drop_last=True,
                                    v_shuffle=False,v_drop_last=True)
    lr=get_learning_rate()
    trainer__=train()
    m_c_back=trainer__.checkpoint_callback
    m_l_back=trainer__.early_stopping_callback
    best_m_p=m_c_back.best_model_path
    best_m_l=m_l_back.best_score.item()
    # print(best_m_p)
    if os.path.exists(info_file):
        with open(info_file,'r+') as f1:
            last=json.load(fp=f1)
            last_best_model=last['last_best_model']
            last_best_score=last['last_best_score']
            if last_best_score > best_m_l:
                last['last_best_model']=best_m_p
                last['last_best_score']=best_m_l
                json.dump(last,fp=f1)
    else:               
        with open(info_file,'w') as f2:
            json.dump(dict(last_best_model=best_m_p,last_best_score=best_m_l),fp=f2)

Quando o treinamento estiver concluído, você poderá encontrar a localização da nossa melhor modelo e melhor resultado no arquivo results.json no diretório raiz.

Durante o processo de treinamento, você verá um indicador de progresso que mostra o progresso de cada época. 

Treinamento:

treinamento

Treinamento concluído:

ts

3. Verificação do modelo

Após o treinamento, queremos validar o modelo e visualizá-lo. Podemos adicionar o seguinte código:

best_model = NHiTS.load_from_checkpoint(best_m_p)
predictions = best_model.predict(v_loader, trainer_kwargs=dict(accelerator="cpu",logger=False), return_y=True)
raw_predictions = best_model.predict(v_loader, mode="raw", return_x=True, trainer_kwargs=dict(accelerator="cpu",logger=False))
for idx in range(10):  # plot 10 examples
    best_model.plot_prediction(raw_predictions.x, raw_predictions.output, idx=idx, add_loss_to_title=True)
    # sample 500 paths
samples = best_model.loss.sample(raw_predictions.output["prediction"][[0]], n_samples=500)[0]

# plot prediction
fig = best_model.plot_prediction(raw_predictions.x, raw_predictions.output, idx=0, add_loss_to_title=True)
ax = fig.get_axes()[0]
# plot first two sampled paths
ax.plot(samples[:, 0], color="g", label="Sample 1")
ax.plot(samples[:, 1], color="r", label="Sample 2")
fig.legend()
plt.show()

Você também pode usar o TensorBoard para visualizar o processo em tempo real durante o treinamento.

Resultado:

ref

4. Teste do modelo

Primeiro, abrimos o arquivo json para encontrar o local ótimo de armazenamento do modelo:

with open(info_file) as f:
    best_m_p=json.load(fp=f)['last_best_model']
print('model path is:',best_m_p)

Então, carregamos o modelo:

best_model = NHiTS.load_from_checkpoint(best_m_p)

Em seguida, obtemos dados em tempo real do cliente para testar o modelo:

offset=1
dt=dt.iloc[-max_encoder_length-offset:-offset,:]
last_=dt.iloc[-1] #get the last group of data
# print(len(dt))
for i in range(1,max_prediction_length+1):
    dt.loc[dt.index[-1]+1]=last_
dt['series']=0
# dt['time_idx']=dt.apply(lambda x:x.index,args=1)
dt['time_idx']=dt.index-dt.index[0]
# dt=get_data(mt_data_len=max_encoder_length)
predictions=best_model.predict(dt,mode='raw',trainer_kwargs=dict(accelerator="cpu",logger=False),return_x=True)
best_model.plot_prediction(predictions.x,predictions.output,show_future_observed=False)
plt.show()

E aqui está o resultado:

pref

5.  Avaliação do modelo

Claro, podemos usar algumas métricas da biblioteca PyTorch Forecasting para avaliar o desempenho do modelo. Aqui está como avaliar usando o erro absoluto médio (MAE) e o Erro Absoluto Percentual Médio Simétrico (SMAPE), e exibir os resultados da avaliação:

from pytorch_forecasting.metrics import MAE, SMAPE
mae = MAE()(raw_predictions["prediction"], raw_predictions["target"])
print(f"Mean Absolute Error: {mae}")
smape = SMAPE()(raw_predictions["prediction"], raw_predictions["target"])
print(f"Symmetric Mean Absolute Percentage Error: {smape}")

Neste trecho de código, primeiro importamos as métricas MAE e SMAPE. Então, usamos essas métricas para calcular o erro entre os valores previstos (raw_predictions["prediction"]) e os valores reais (raw_predictions["target"]). Estas métricas podem nos ajudar a ver o desempenho do nosso modelo e indicar direções para sua melhoria contínua.



Considerações finais

Neste artigo, exploramos como usar os dados de rótulos mencionados nos dois artigos anteriores e demonstramos como criar um modelo N-HiTS usando nossos dados. Depois, treinamos e verificamos o modelo. Como vimos, alcançamos bons resultados. Também demonstramos como usar este modelo no MetaTrader 5 para previsão de 30 velas. Claro, não mencionamos como colocar ordens com base nos resultados de previsão, pois a negociação real requer que os leitores realizem uma grande quantidade de testes de acordo com sua situação real e estabeleçam regras de negociação apropriadas.

Obrigado pela atenção!


Anexo:

Código completo:

# Copyright 2021, MetaQuotes Ltd.
# https://www.mql5.com


# from typing import Union
import lightning.pytorch as pl
import os
from lightning.pytorch.callbacks import EarlyStopping,ModelCheckpoint
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
# import torch
from pytorch_forecasting import NHiTS, TimeSeriesDataSet
from pytorch_forecasting.data import NaNLabelEncoder,timeseries
from pytorch_forecasting.metrics import MQF2DistributionLoss
from pytorch_forecasting.data.samplers import TimeSynchronizedBatchSampler
from lightning.pytorch.tuner import Tuner
import MetaTrader5 as mt
import warnings
import json


from torch.utils.data import DataLoader
from torch.utils.data.sampler import Sampler,SequentialSampler

class New_TmSrDt(TimeSeriesDataSet):
    '''
    rewrite dataset class
    '''
    def to_dataloader(self, train: bool = True, 
                      batch_size: int = 64, 
                      batch_sampler: Sampler | str = None, 
                      shuffle:bool=False,
                      drop_last:bool=False,
                      **kwargs) -> DataLoader:

        default_kwargs = dict(
            shuffle=shuffle,
            # drop_last=train and len(self) > batch_size,
            drop_last=drop_last, #
            collate_fn=self._collate_fn,
            batch_size=batch_size,
            batch_sampler=batch_sampler,
        )
        default_kwargs.update(kwargs)
        kwargs = default_kwargs
        # print(kwargs['drop_last'])
        if kwargs["batch_sampler"] is not None:
            sampler = kwargs["batch_sampler"]
            if isinstance(sampler, str):
                if sampler == "synchronized":
                    kwargs["batch_sampler"] = TimeSynchronizedBatchSampler(
                        SequentialSampler(self),
                        batch_size=kwargs["batch_size"],
                        shuffle=kwargs["shuffle"],
                        drop_last=kwargs["drop_last"],
                    )
                else:
                    raise ValueError(f"batch_sampler {sampler} unknown - see docstring for valid batch_sampler")
            del kwargs["batch_size"]
            del kwargs["shuffle"]
            del kwargs["drop_last"]

        return DataLoader(self,**kwargs)

def get_data(mt_data_len:int):
    if not mt.initialize():
        print('initialize() failed!') 
    else:
        print(mt.version())
        sb=mt.symbols_total()
        rts=None
        if sb > 0:
            rts=mt.copy_rates_from_pos("GOLD_micro",mt.TIMEFRAME_M15,0,mt_data_len) 
        mt.shutdown()
        # print(len(rts))
    rts_fm=pd.DataFrame(rts)
    rts_fm['time']=pd.to_datetime(rts_fm['time'], unit='s') 

    rts_fm['time_idx']= rts_fm.index%(max_encoder_length+2*max_prediction_length) 
    rts_fm['series']=rts_fm.index//(max_encoder_length+2*max_prediction_length)
    return rts_fm


def spilt_data(data:pd.DataFrame,
               t_drop_last:bool,
               t_shuffle:bool,
               v_drop_last:bool,
               v_shuffle:bool):
    training_cutoff = data["time_idx"].max() - max_prediction_length #max:95
    context_length = max_encoder_length
    prediction_length = max_prediction_length
    training = New_TmSrDt(
        data[lambda x: x.time_idx <= training_cutoff],
        time_idx="time_idx",
        target="close",
        categorical_encoders={"series":NaNLabelEncoder().fit(data.series)},
        group_ids=["series"],
        time_varying_unknown_reals=["close"],
        max_encoder_length=context_length,
        # min_encoder_length=max_encoder_length//2,
        max_prediction_length=prediction_length,
        # min_prediction_length=1,
    )

    validation = New_TmSrDt.from_dataset(training, 
                                         data, 
                                         min_prediction_idx=training_cutoff + 1)
    
    train_dataloader = training.to_dataloader(train=True,
                                              shuffle=t_shuffle, 
                                              drop_last=t_drop_last,
                                              batch_size=batch_size, 
                                              num_workers=0,)
    val_dataloader = validation.to_dataloader(train=False, 
                                              shuffle=v_shuffle,
                                              drop_last=v_drop_last,
                                              batch_size=batch_size, 
                                              num_workers=0)
    return train_dataloader,val_dataloader,training

def get_learning_rate():
    
    pl.seed_everything(42)
    trainer = pl.Trainer(accelerator="cpu", gradient_clip_val=0.1,logger=False)
    net = NHiTS.from_dataset(
        training,
        learning_rate=3e-2,
        weight_decay=1e-2,
        loss=MQF2DistributionLoss(prediction_length=max_prediction_length),
        backcast_loss_ratio=0.0,
        hidden_size=64,
        optimizer="AdamW",
    )
    res = Tuner(trainer).lr_find(
        net, train_dataloaders=t_loader, val_dataloaders=v_loader, min_lr=1e-5, max_lr=1e-1
    )
    # print(f"suggested learning rate: {res.suggestion()}")
    lr_=res.suggestion()
    return lr_
def train():
    early_stop_callback = EarlyStopping(monitor="val_loss", 
                                        min_delta=1e-4, 
                                        patience=10,  
                                        verbose=True, 
                                        mode="min")
    ck_callback=ModelCheckpoint(monitor='val_loss',
                                mode="min",
                                save_top_k=1,  
                                filename='{epoch}-{val_loss:.2f}')
    trainer = pl.Trainer(
        max_epochs=ep,
        accelerator="cpu",
        enable_model_summary=True,
        gradient_clip_val=1.0,
        callbacks=[early_stop_callback,ck_callback],
        limit_train_batches=30,
        enable_checkpointing=True,
    )
    net = NHiTS.from_dataset(
        training,
        learning_rate=lr,
        log_interval=10,
        log_val_interval=1,
        weight_decay=1e-2,
        backcast_loss_ratio=0.0,
        hidden_size=64,
        optimizer="AdamW",
        loss=MQF2DistributionLoss(prediction_length=max_prediction_length),
    )
    trainer.fit(
        net,
        train_dataloaders=t_loader,
        val_dataloaders=v_loader,
        # ckpt_path='best'
    )
    return trainer

if __name__=='__main__':
    ep=200
    __train=False
    mt_data_len=200000
    max_encoder_length = 2*96
    max_prediction_length = 30
    batch_size = 128
    info_file='results.json'
    warnings.filterwarnings("ignore")
    dt=get_data(mt_data_len=mt_data_len)
    if __train:
        # print(dt)
        # dt=get_data(mt_data_len=mt_data_len)
        t_loader,v_loader,training=spilt_data(dt,
                                              t_shuffle=False,t_drop_last=True,
                                              v_shuffle=False,v_drop_last=True)
        lr=get_learning_rate()
        trainer__=train()
        m_c_back=trainer__.checkpoint_callback
        m_l_back=trainer__.early_stopping_callback
        best_m_p=m_c_back.best_model_path
        best_m_l=m_l_back.best_score.item()

        # print(best_m_p)
        
        if os.path.exists(info_file):
            with open(info_file,'r+') as f1:
                last=json.load(fp=f1)
                last_best_model=last['last_best_model']
                last_best_score=last['last_best_score']
                if last_best_score > best_m_l:
                    last['last_best_model']=best_m_p
                    last['last_best_score']=best_m_l
                    json.dump(last,fp=f1)
        else:               
            with open(info_file,'w') as f2:
                json.dump(dict(last_best_model=best_m_p,last_best_score=best_m_l),fp=f2)

        best_model = NHiTS.load_from_checkpoint(best_m_p)
        predictions = best_model.predict(v_loader, trainer_kwargs=dict(accelerator="cpu",logger=False), return_y=True)
        raw_predictions = best_model.predict(v_loader, mode="raw", return_x=True, trainer_kwargs=dict(accelerator="cpu",logger=False))
    
        for idx in range(10):  # plot 10 examples
            best_model.plot_prediction(raw_predictions.x, raw_predictions.output, idx=idx, add_loss_to_title=True)
        samples = best_model.loss.sample(raw_predictions.output["prediction"][[0]], n_samples=500)[0]

        # plot prediction
        fig = best_model.plot_prediction(raw_predictions.x, raw_predictions.output, idx=0, add_loss_to_title=True)
        ax = fig.get_axes()[0]
        # plot first two sampled paths
        ax.plot(samples[:, 0], color="g", label="Sample 1")
        ax.plot(samples[:, 1], color="r", label="Sample 2")
        fig.legend()
        plt.show()
    else:
        with open(info_file) as f:
            best_m_p=json.load(fp=f)['last_best_model']
        print('model path is:',best_m_p)
        
        best_model = NHiTS.load_from_checkpoint(best_m_p)

        offset=1
        dt=dt.iloc[-max_encoder_length-offset:-offset,:]
        last_=dt.iloc[-1] 
        # print(len(dt))
        for i in range(1,max_prediction_length+1):
            dt.loc[dt.index[-1]+1]=last_
        dt['series']=0
        # dt['time_idx']=dt.apply(lambda x:x.index,args=1)
        dt['time_idx']=dt.index-dt.index[0]
        # dt=get_data(mt_data_len=max_encoder_length)
        predictions = best_model.predict(dt, mode='raw',trainer_kwargs=dict(accelerator="cpu",logger=False),return_x=True)
        best_model.plot_prediction(predictions.x,predictions.output,show_future_observed=False)
        plt.show()