English Русский 中文 Deutsch 日本語 Português
preview
Marcado de datos en el análisis de series temporales (Parte 3): Ejemplo de uso del marcado de datos

Marcado de datos en el análisis de series temporales (Parte 3): Ejemplo de uso del marcado de datos

MetaTrader 5Asesores Expertos | 11 marzo 2024, 16:53
210 0
Yuqiang Pan
Yuqiang Pan

Introducción

El presente artículo explica cómo utilizar PyTorch Lightning y el marco PyTorch Forecasting a través de la plataforma comercial MetaTrader 5 para implementar el pronóstico de series temporales financieras basada en redes neuronales.

En el artículo también explicaremos los motivos por los que hemos elegido estas dos plataformas y el formato de datos utilizado.

En cuanto a los datos, podemos usar los datos obtenidos marcando los datos de los dos artículos anteriores. Como comparten el mismo formato, podemos ampliarlo fácilmente siguiendo la metodología descrita en este artículo.

Enlaces a los dos artículos anteriores: 

  1. Marcado de datos en el análisis de series temporales (Parte 1): Creamos un conjunto de datos con marcadores de tendencia utilizando el gráfico de un asesor
  2. Marcado de datos en el análisis de series temporales (Parte 2): Creando conjuntos de datos con marcadores de tendencias utilizando Python

Contenido:


Algunas bibliotecas importantes de Python

En primer lugar, vamos a introducir las bibliotecas básicas de Python que vamos a usar.

1. PyTorch Lightning

PyTorch Lightning es un entorno de aprendizaje profundo diseñado específicamente para investigadores profesionales de inteligencia artificial e ingenieros de aprendizaje automático que necesitan la máxima flexibilidad sin sacrificar la escalabilidad.

La idea básica consiste en separar el código académico (por ejemplo, las definiciones de modelos, la propagación directa/inversa, los optimizadores, la validación, etc.) del código de ingeniería (por ejemplo, los ciclos for, los mecanismos de persistencia, los diarios de registro de TensorBoard, las estrategias de aprendizaje, etc.), lo cual nos dará un código más racionalizado y comprensible.

Las principales ventajas son:

  • Alta reutilización. El diseño permite reutilizar el código en distintos proyectos.
  • Facilidad de mantenimiento. Su diseño estructurado facilita el mantenimiento del código.
  • Lógica clara. Abstrayendo el código de ingeniería de las plantillas, el código de aprendizaje automático resulta más fácil de identificar y comprender.

En general, PyTorch Lightning supone una biblioteca extremadamente potente que ofrece un método eficiente de organizar y gestionar nuestro código PyTorch. También ofrece un enfoque estructurado de tareas comunes pero complejas, tales como el entrenamiento, la validación y la prueba de modelos.

Encontrará información detallada sobre el uso de esta biblioteca en la documentación oficial: https://lightning.ai/docs.

2. PyTorch Forecasting

Es una biblioteca de Python diseñada específicamente para la previsión de series temporales. Como está construida sobre PyTorch, podemos utilizar las potentes bibliotecas de diferenciación automática y optimización de PyTorch, y aprovechar la comodidad que ofrece PyTorch Forecasting para la previsión de series temporales.

En PyTorch Forecasting podremos encontrar implementaciones de varios modelos de previsión, incluyendo (pero no solo) modelos autorregresivos (AR, ARIMA), modelos de espacio de estados (SARIMAX), redes neuronales (LSTM, GRU) y métodos de ensamblaje (Prophet, N-Beats). Esto significa que podemos experimentar y comparar diferentes enfoques predictivos en el mismo entorno sin tener que escribir un extenso código estándar para cada enfoque.

La biblioteca también ofrece una serie de herramientas de preprocesamiento de datos que pueden ayudarnos a resolver problemas típicos de series temporales. Estas herramientas incluyen la sustitución de valores perdidos, el escalado, la extracción de características y las transformaciones de ventanas deslizantes, entre otras cosas. Esto significa que podemos centrarnos más en desarrollar y optimizar nuestro modelo sin dedicar mucho tiempo a procesar datos.

La biblioteca también dispone de una interfaz unificada para evaluar el rendimiento de los modelos. Asimismo, implementa funciones de pérdida y métricas de validación para series temporales como QuantileLoss y SMAPE, y admite metodologías de aprendizaje como la detención temprana y la validación cruzada. Esto nos permite controlar y mejorar más cómodamente el rendimiento de nuestro modelo.

Si estamos buscando un método para mejorar la eficacia y la facilidad de mantenimiento de nuestro proyecto de previsión de series temporales, PyTorch Forecasting podría resultar una gran elección. La biblioteca ofrece herramientas eficientes y flexibles para organizar y gestionar nuestro código PyTorch, permitiéndonos centrarnos en el aspecto más importante: el propio modelo de aprendizaje automático.

Encontrará una descripción detallada de cómo usar la biblioteca en la documentación oficial: https://pytorch-forecasting.readthedocs.io/en/stable.

3. Modelo N-HiTS

El modelo N-HiTS resuelve los problemas de la volatilidad de las previsiones y la complejidad computacional de las previsiones a largo plazo introduciendo técnicas innovadoras de interpolación jerárquica y muestreo de datos multivariable. Esto permite al modelo N-HiTS aproximarse con eficacia a un rango de predicción de cualquier longitud.

Además, extensos experimentos realizados en conjuntos de datos a gran escala han demostrado que el modelo N-HiTS mejora la precisión en casi un 20% de media en comparación con la última arquitectura de Transformer, y reduce el tiempo de cálculo en un orden de magnitud (50 veces).

Enlace al artículo: https://doi.org/10.48550/arXiv.2201.12886.


Inicialización

Primero tendremos que importar las bibliotecas necesarias. Entre estas bibliotecas se encuentran MetaTrader 5 (para la interacción con el terminal del mismo nombre), PyTorch Lightning (para el entrenamiento de modelos) y algunas otras bibliotecas para el tratamiento y la visualización de datos.

import MetaTrader5 as mt5
import lightning.pytorch as pl
from lightning.pytorch.callbacks import EarlyStopping
import matplotlib.pyplot as plt
import pandas as pd
from pytorch_forecasting import Baseline, NHiTS, TimeSeriesDataSet
from pytorch_forecasting.data import NaNLabelEncoder
from pytorch_forecasting.metrics import MAE, SMAPE, MQF2DistributionLoss, QuantileLoss
from lightning.pytorch.tuner import Tuner

A continuación, deberemos inicializar MetaTrader 5. Esto se logra llamando a la función mt.initialise(). Si no podemos inicializarlo simplemente usándolo, necesitaremos transmitir la ruta al terminal MetaTrader 5 como parámetro a esta función (en el ejemplo "D:\Project\mt\MT5\terminal64.exe" es mi ruta, en una aplicación real usted necesitará establecer la suya propia). Si la inicialización se ha realizado correctamente, la función retornará True; en caso contrario, devolverá False.

if not mt.initialize("D:\\Project\\mt\\MT5\\terminal64.exe"):
    print('initialize() failed!')
else:
    print(mt.version())

La función mt.symbols_total() se utiliza para obtener el número total de variedades comerciadas disponibles en el terminal MetaTrader 5. Podemos usarlo para determinar si podemos obtener los datos correctos. Si el número total es superior a 0, podremos utilizar la función mt.copy_rates_from_pos() para recuperar los datos históricos de la variedad comerciada especificada. En este ejemplo, tendremos los datos más recientes sobre la duración del periodo "mt_data_len" M15 (15 minutos) de la variedad GOLD_micro.

sb=mt.symbols_total()
rts=None
if sb > 0:
    rts=mt.copy_rates_from_pos("GOLD_micro",mt.TIMEFRAME_M15,0,mt_data_len) 
    mt.shutdown()

Finalmente, usaremos la función mt.shutdown() para cerrar la conexión con el terminal MetaTrader 5 y convertir los datos recibidos al formato Pandas DataFrame.

mt.shutdown()
rts_fm=pd.DataFrame(rts)

Veamos ahora el preprocesamiento de los datos obtenidos del terminal.

Primero deberemos convertir las marcas de tiempo en fechas:

rts_fm['time']=pd.to_datetime(rts_fm['time'], unit='s') 

Aquí ya no describiremos cómo marcar los datos. Encontrará los métodos en los dos artículos anteriores (enlazados en la introducción de este artículo). Para demostrar brevemente cómo utilizar los modelos de predicción, simplemente categorizaremos todos los fragmentos de datos de max_encoder_length+2max_prediction_length en grupos. Cada grupo tendrá una secuencia de 0 a "max_encoder_length+2max_prediction_length-1". Vamos a rellenarla. De esta forma, añadiremos las marcas necesarias a los datos originales. Primero tendremos que transformar el índice temporal original (es decir, el índice DataFrame). Luego calcularemos el resto del índice de tiempo original dividido por (max_encoder_length+2max_prediction_length), y utilizaremos el resultado como nuevo índice de tiempo. Después haremos coincidir el índice de tiempo con un rango de 0 a "max_encoder_length+2*max_prediction_length-1":

rts_fm['time_idx']= rts_fm.index%(max_encoder_length+2*max_prediction_length) 

También tendremos que convertir el índice de tiempo original en un grupo. A continuación, calcularemos el índice de tiempo original dividido por "max_encoder_length+2*max_prediction_length" y utilizaremos el resultado como nuevo grupo:

rts_fm['series']=rts_fm.index//(max_encoder_length+2*max_prediction_length)

Asimismo, encapsularemos la parte correspondiente al preprocesamiento de datos en una función. Solo tendremos que transmitirle la longitud de los datos que necesitamos recuperar, y ya podremos completar el trabajo de preprocesamiento de datos:

def get_data(mt_data_len:int):
    if not mt.initialize("D:\\Project\\mt\\MT5\\terminal64.exe"):
        print('initialize() failed!') 
    else:
        print(mt.version())
        sb=mt.symbols_total()
        rts=None
        if sb > 0:
            rts=mt.copy_rates_from_pos("GOLD_micro",mt.TIMEFRAME_M15,0,mt_data_len) 
        mt.shutdown()
        # print(len(rts))
    rts_fm=pd.DataFrame(rts)
    rts_fm['time']=pd.to_datetime(rts_fm['time'], unit='s') 
    rts_fm['time_idx']= rts_fm.index%(max_encoder_length+2*max_prediction_length) 
    rts_fm['series']=rts_fm.index//(max_encoder_length+2*max_prediction_length)
return rts_fm


Reescritura de la clase pytorch_forecasting.TimeSeriesDataSet

La reescritura de la función to_dataloader() en pytorch_forecasting controla si los datos se mezclarán y si se eliminará el último grupo de un paquete (principalmente para evitar errores impredecibles causados por una longitud insuficiente del último grupo de datos). Aquí tiene cómo hacerlo:

class New_TmSrDt(TimeSeriesDataSet):
    def to_dataloader(self, train: bool = True, 
                      batch_size: int = 64, 
                      batch_sampler: Sampler | str = None, 
                      shuffle:bool=False,
                      drop_last:bool=False,
                      **kwargs) -> DataLoader:
        default_kwargs = dict(
            shuffle=shuffle,
            drop_last=drop_last, #modification
            collate_fn=self._collate_fn,
            batch_size=batch_size,
            batch_sampler=batch_sampler,
        )
        default_kwargs.update(kwargs)
        kwargs = default_kwargs
        if kwargs["batch_sampler"] is not None:
            sampler = kwargs["batch_sampler"]
            if isinstance(sampler, str):
                if sampler == "synchronized":
                    kwargs["batch_sampler"] = TimeSynchronizedBatchSampler(
                        SequentialSampler(self),
                        batch_size=kwargs["batch_size"],
                        shuffle=kwargs["shuffle"],
                        drop_last=kwargs["drop_last"],
                    )
                else:
                    raise ValueError(f"batch_sampler {sampler} unknown - see docstring for valid batch_sampler")
            del kwargs["batch_size"]
            del kwargs["shuffle"]
            del kwargs["drop_last"]

        return DataLoader(self,**kwargs)
Este código creará una nueva clase New_TmSrDt que heredará de TimeSeriesDataSet. La función to_dataloader() se sobrescribirá en esta nueva clase para incluir los parámetros shuffle y drop_last. De este modo, podremos controlar mejor el proceso de carga de datos. No se olvide de sustituir los ejemplares de TimeSeriesDataSet por New_TmSrDt en el código.


Creación de conjuntos de datos para el entrenamiento y la prueba

En primer lugar, tendremos que definir un punto de corte para los datos de entrenamiento. Esto se hará restando la longitud máxima de la previsión del valor máximo de time_idx.

max_encoder_length = 2*96
max_prediction_length = 30
training_cutoff = rts_fm["time_idx"].max() - max_prediction_length

A continuación, utilizaremos la clase New_TmSrDt (que supone la reescritura de nuestra clase TimeSeriesDataSet) para crear el conjunto de datos de entrenamiento. Esta clase requerirá los siguientes parámetros:

  • DataFrame (en este caso - rts_fm)
  • La columna time_idx, que supone una secuencia entera continua.
  • La columna objetivo (en este caso close), que representa el valor que queremos pronosticar.
  • La columna del grupo (en este caso series) que representa diferentes series temporales.
  • Las longitudes máximas del codificador y el predictor
context_length = max_encoder_length
prediction_length = max_prediction_length

training = New_TmSrDt(
        data[lambda x: x.time_idx <= training_cutoff],
        time_idx="time_idx",
        target="close",
        categorical_encoders={"series":NaNLabelEncoder().fit(data.series)},
        group_ids=["series"],
        time_varying_unknown_reals=["close"],
        max_encoder_length=context_length,
        # min_encoder_length=max_encoder_length//2,
        max_prediction_length=prediction_length,
        # min_prediction_length=1,
    )
validation = New_TmSrDt.from_dataset(training, 
                                  data, 
                                  min_prediction_idx=training_cutoff + 1)

A continuación, utilizaremos la función New_TmSrDt.from_dataset() para crear el conjunto de datos de comprobación. Los siguientes parámetros serán necesarios para esta función:

  • Conjunto de datos de entrenamiento
  • DataFrame
  • Índice mínimo de predicción; deberá ser 1 unidad mayor que el valor máximo de time_idx de los datos de entrenamiento.

validation = New_TmSrDt.from_dataset(training, rts_fm, min_prediction_idx=training_cutoff + 1)

Por último, utilizaremos la función to_dataloader() para convertir los conjuntos de datos de entrenamiento y validación en objetos PyTorch DataLoader. Los siguientes parámetros serán necesarios para esta función:

  • El parámetro train, que indica si los datos deben mezclarse o no.
  • El parámetro batch_size, que indica el número de muestras del lote.
  • El parámetro num_workers, que especifica el número de procesos de trabajo para cargar los datos.

train_dataloader = training.to_dataloader(train=True,
                                          shuffle=t_shuffle, 
                                          drop_last=t_drop_last,
                                          batch_size=batch_size, 
                                          num_workers=0,)
val_dataloader = validation.to_dataloader(train=False, 
                                          shuffle=v_shuffle,
                                          drop_last=v_drop_last,
                                          batch_size=batch_size, 
                                          num_workers=0)

Finalmente, encapsularemos este trozo de código en la función spilt_data(data:pd.DataFrame,t_drop_last:bool,t_shuffle:bool,v_drop_last:bool,v_shuffle:bool) y especificaremos los siguientes parámetros:

  • El parámetro data se utilizará para obtener el conjunto de datos que se va a procesar.
  • El parámetro t_drop_last especificará si debemos eliminar el último grupo del conjunto de datos de entrenamiento.
  • El parámetro t_shuffle especificará si debemos barajar los datos de entrenamiento.
  • El parámetro v_drop_last indicará si debemos eliminar el último grupo del conjunto de datos de validación.
  • El parámetro v_shuffle especificará si barajaremos los datos de validación.

train_dataloader (el ejemplar de dataloader para el conjunto de datos de entrenamiento), val_dataloader (el ejemplar de dataloader para el conjunto de datos de validación) y training (el ejemplar de TimeSeriesDataSet para el conjunto de datos) se utilizarán como valores de retorno de esta función porque los usaremos más adelante.

def spilt_data(data:pd.DataFrame,
               t_drop_last:bool,
               t_shuffle:bool,
               v_drop_last:bool,
               v_shuffle:bool):
    training_cutoff = data["time_idx"].max() - max_prediction_length #max:95
    context_length = max_encoder_length
    prediction_length = max_prediction_length
    training = New_TmSrDt(
        data[lambda x: x.time_idx <= training_cutoff],
        time_idx="time_idx",
        target="close",
        categorical_encoders={"series":NaNLabelEncoder().fit(data.series)},
        group_ids=["series"],
        time_varying_unknown_reals=["close"],
        max_encoder_length=context_length,
        # min_encoder_length=max_encoder_length//2,
        max_prediction_length=prediction_length,
        # min_prediction_length=1,
    )

    validation = New_TmSrDt.from_dataset(training, 
                                         data, 
                                         min_prediction_idx=training_cutoff + 1)
    
    train_dataloader = training.to_dataloader(train=True,
                                              shuffle=t_shuffle, 
                                              drop_last=t_drop_last,
                                              batch_size=batch_size, 
                                              num_workers=0,)
    val_dataloader = validation.to_dataloader(train=False, 
                                              shuffle=v_shuffle,
                                              drop_last=v_drop_last,
                                              batch_size=batch_size, 
                                              num_workers=0)
    return train_dataloader,val_dataloader,training


Creación y entrenamiento del modelo

Vamos a crear el modelo NHiTS. En esta parte veremos cómo configurar nuestros propios parámetros y cómo entrenar el modelo.

1. Encuentre la mejor curva de aprendizaje

Antes de empezar a construir el modelo, utilizaremos el objeto PyTorch Lightning Tuner para encontrar la mejor velocidad de aprendizaje.

En primer lugar, tendremos que crear un objeto Trainer donde el parámetro accelerator se utilizará para especificar el tipo de dispositivo, mientras que el gradient_clip_val se utilizará para evitar que la explosión del gradiente.

pl.seed_everything(42)
trainer = pl.Trainer(accelerator="cpu", gradient_clip_val=0.1)

A continuación, utilizaremos la función NHiTS.from_dataset() para crear la red modelo NHiTS. Los siguientes parámetros serán necesarios para esta función:

  • Conjunto de datos de entrenamiento
  • Velocidad de aprendizaje
  • Reducción del peso
  • Función de pérdida
  • Tamaño de la capa oculta
  • Optimizador
net = NHiTS.from_dataset(
    training,
    learning_rate=3e-2,
    weight_decay=1e-2,
    loss=MQF2DistributionLoss(prediction_length=max_prediction_length),
    backcast_loss_ratio=0.0,
    hidden_size=64,
    optimizer="AdamW",
)

A continuación crearemos un ejemplar de la clase Tuner y llamaremos a la función lr_find(). Luego entrenaremos el modelo basándonos en una serie de velocidades de aprendizaje de nuestro conjunto de datos y compararemos la pérdida de cada velocidad de aprendizaje para obtener la mejor.

res = Tuner(trainer).lr_find(
    net, train_dataloaders=train_dataloader, val_dataloaders=val_dataloader, min_lr=1e-5, max_lr=1e-1
)
lr_=res.suggestion()

Del mismo modo, encapsularemos esta parte del código que obtiene la mejor velocidad de aprendizaje en la función get_learning_rate() y haremos que la mejor velocidad de aprendizaje resultante sea su valor de retorno:

def get_learning_rate():
    
    pl.seed_everything(42)
    trainer = pl.Trainer(accelerator="cpu", gradient_clip_val=0.1,logger=False)
    net = NHiTS.from_dataset(
        training,
        learning_rate=3e-2,
        weight_decay=1e-2,
        loss=MQF2DistributionLoss(prediction_length=max_prediction_length),
        backcast_loss_ratio=0.0,
        hidden_size=64,
        optimizer="AdamW",
    )
    res = Tuner(trainer).lr_find(
        net, train_dataloaders=t_loader, val_dataloaders=v_loader, min_lr=1e-5, max_lr=1e-1
    )
    lr_=res.suggestion()
    return lr_

Si queremos visualizar el ritmo de aprendizaje, podemos añadir el siguiente código:

print(f"suggested learning rate: {res.suggestion()}")
fig = res.plot(show=True, suggest=True)
fig.show()

El resultado en este ejemplo será el siguiente:

lr

velocidad de entrenamiento recomendada: 0.003981071705534973.

2. Definición de EarlyStopping Callback

Esta llamada de retorno se utilizará principalmente para controlar las pérdidas de comprobación y detener el entrenamiento cuando las pérdidas no mejoren durante varias épocas consecutivas. Esto podría impedir el sobreentrenamiento el modelo.

early_stop_callback = EarlyStopping(monitor="val_loss", 
                                    min_delta=1e-4, 
                                    patience=10,  
                                    verbose=True, 
                                    mode="min")

Aquí cabe destacar el parámetro patience, que básicamente determinará cuándo parar durante el entrenamiento si las pérdidas no disminuyen durante varias épocas consecutivas. Lo pondremos a 10.

3. Definición de ModelCheckpoint Callback

Esta llamada de retorno se utilizará principalmente para controlar el archivo del modelo y el nombre del archivo. Básicamente estableceremos las dos variables siguientes.

ck_callback=ModelCheckpoint(monitor='val_loss',
                            mode="min",
                            save_top_k=1,  
                            filename='{epoch}-{val_loss:.2f}')

save_top_k se utilizará para controlar el guardado de varios de los mejores modelos. Fijaremos el valor en 1 y nos quedaremos solo con el mejor modelo.

4. Definición de Training Model

Primero tendremos que crear un ejemplar de la clase Trainer en Lightning.pytorch y añadir las dos llamadas de retorno que hemos definido antes.

trainer = pl.Trainer(
    max_epochs=ep,
    accelerator="cpu",
    enable_model_summary=True,
    gradient_clip_val=1.0,
    callbacks=[early_stop_callback,ck_callback],
    limit_train_batches=30,
    enable_checkpointing=True,
)

Aquí deberemos prestar atención a los parámetros max_epochs (número máximo de épocas de entrenamiento), gradient_clip_val (utilizado para evitar la explosión del gradiente) y callbacks. Aquí max_epochs utilizará la variable global ep, que definiremos más adelante, mientras que callbacks será nuestra colección de callbacks.

A continuación, también tendremos que definir el modelo NHiTS y crear un ejemplar del mismo:

net = NHiTS.from_dataset(
    training,
    learning_rate=lr,
    log_interval=10,
    log_val_interval=1,
    weight_decay=1e-2,
    backcast_loss_ratio=0.0,
    hidden_size=64,
    optimizer="AdamW",
    loss=MQF2DistributionLoss(prediction_length=max_prediction_length),
)

Aquí no tendremos que cambiar los parámetros en absoluto, bastará con utilizar los valores por defecto. Aquí solo estableceremos la pérdida en la función de pérdida MQF2DistributionLoss.

5. Módulo de entrenamiento 

Utilizaremos la función fit() del objeto Trainer para entrenar el modelo:

trainer.fit(
    net,
    train_dataloaders=train_dataloader,
    val_dataloaders=val_dataloader,
)

Del mismo modo, encapsularemos esta parte del código en la función train():

def train():
    early_stop_callback = EarlyStopping(monitor="val_loss", 
                                        min_delta=1e-4, 
                                        patience=10,  # The number of times without improvement will stop
                                        verbose=True, 
                                        mode="min")
    ck_callback=ModelCheckpoint(monitor='val_loss',
                                mode="min",
                                save_top_k=1,  # Save the top few best ones
                                filename='{epoch}-{val_loss:.2f}')
    trainer = pl.Trainer(
        max_epochs=ep,
        accelerator="cpu",
        enable_model_summary=True,
        gradient_clip_val=1.0,
        callbacks=[early_stop_callback,ck_callback],
        limit_train_batches=30,
        enable_checkpointing=True,
    )
    net = NHiTS.from_dataset(
        training,
        learning_rate=lr,
        log_interval=10,
        log_val_interval=1,
        weight_decay=1e-2,
        backcast_loss_ratio=0.0,
        hidden_size=64,
        optimizer="AdamW",
        loss=MQF2DistributionLoss(prediction_length=max_prediction_length),
    )
    trainer.fit(
        net,
        train_dataloaders=t_loader,
        val_dataloaders=v_loader,
        # ckpt_path='best'
    )
return trainer

Esta función retornará el modelo entrenado que se podrá utilizar para tareas de predicción.


Definición de la lógica de ejecución

1. Definición de variables globales:

ep=200
__train=False
mt_data_len=200000
max_encoder_length = 2*96
max_prediction_length = 30
batch_size = 128

__train se utilizará para controlar si estamos entrenando o probando el modelo.

Cabe destacar que ep se usará para controlar la época máxima de entrenamiento. Como hemos establecido EarlyStopping, este valor podrá aumentarse porque el modelo se detendrá automáticamente cuando se detenga la convergencia.

mt_data_len — número de los últimos datos de series temporales recibidos del cliente.

max_encoder_length y max_prediction_length — longitud máxima de codificación y longitud máxima de predicción, respectivamente.

2. Entrenamiento

También necesitaremos guardar los resultados del entrenamiento óptimo actual en un archivo local una vez finalizado el entrenamiento, por lo que definiremos un archivo json para guardar esta información:

info_file='results.json'

Para que nuestro proceso de aprendizaje sea más comprensible, deberemos evitar mostrar información de advertencia innecesaria durante el entrenamiento, por lo que añadiremos el siguiente código:

warnings.filterwarnings("ignore")

A continuación vendrá nuestra lógica del aprendizaje:

dt=get_data(mt_data_len=mt_data_len)
if __train:
    # print(dt)
    # dt=get_data(mt_data_len=mt_data_len)
    t_loader,v_loader,training=spilt_data(dt,
                                    t_shuffle=False,t_drop_last=True,
                                    v_shuffle=False,v_drop_last=True)
    lr=get_learning_rate()
    trainer__=train()
    m_c_back=trainer__.checkpoint_callback
    m_l_back=trainer__.early_stopping_callback
    best_m_p=m_c_back.best_model_path
    best_m_l=m_l_back.best_score.item()
    # print(best_m_p)
    if os.path.exists(info_file):
        with open(info_file,'r+') as f1:
            last=json.load(fp=f1)
            last_best_model=last['last_best_model']
            last_best_score=last['last_best_score']
            if last_best_score > best_m_l:
                last['last_best_model']=best_m_p
                last['last_best_score']=best_m_l
                json.dump(last,fp=f1)
    else:               
        with open(info_file,'w') as f2:
            json.dump(dict(last_best_model=best_m_p,last_best_score=best_m_l),fp=f2)

Una vez finalizado el entrenamiento, podremos encontrar dónde se almacenan nuestro mejor modelo y nuestro mejor resultado en el archivo results.json del directorio raíz.

Durante el proceso de entrenamiento, veremos una barra de progreso que muestra el progreso de cada época. 

Entrenamiento:

entrenamiento

El entrenamiento ha finalizado:

ts

3. Comprobación del modelo

Tras el entrenamiento, deberemos validar el modelo y visualizarlo. Podemos añadir el código siguiente:

best_model = NHiTS.load_from_checkpoint(best_m_p)
predictions = best_model.predict(v_loader, trainer_kwargs=dict(accelerator="cpu",logger=False), return_y=True)
raw_predictions = best_model.predict(v_loader, mode="raw", return_x=True, trainer_kwargs=dict(accelerator="cpu",logger=False))
for idx in range(10):  # plot 10 examples
    best_model.plot_prediction(raw_predictions.x, raw_predictions.output, idx=idx, add_loss_to_title=True)
    # sample 500 paths
samples = best_model.loss.sample(raw_predictions.output["prediction"][[0]], n_samples=500)[0]

# plot prediction
fig = best_model.plot_prediction(raw_predictions.x, raw_predictions.output, idx=0, add_loss_to_title=True)
ax = fig.get_axes()[0]
# plot first two sampled paths
ax.plot(samples[:, 0], color="g", label="Sample 1")
ax.plot(samples[:, 1], color="r", label="Sample 2")
fig.legend()
plt.show()

También podremos utilizar TensorBoard para visualizar el proceso en tiempo real durante el entrenamiento.

Resultado:

ref

4. Prueba del modelo entrenado

En primer lugar, abriremos el archivo json para encontrar la ubicación de almacenamiento óptima para el modelo:

with open(info_file) as f:
    best_m_p=json.load(fp=f)['last_best_model']
print('model path is:',best_m_p)

Luego cargaremos el modelo:

best_model = NHiTS.load_from_checkpoint(best_m_p)

A continuación, obtendremos los datos en tiempo real del cliente para probar el modelo:

offset=1
dt=dt.iloc[-max_encoder_length-offset:-offset,:]
last_=dt.iloc[-1] #get the last group of data
# print(len(dt))
for i in range(1,max_prediction_length+1):
    dt.loc[dt.index[-1]+1]=last_
dt['series']=0
# dt['time_idx']=dt.apply(lambda x:x.index,args=1)
dt['time_idx']=dt.index-dt.index[0]
# dt=get_data(mt_data_len=max_encoder_length)
predictions=best_model.predict(dt,mode='raw',trainer_kwargs=dict(accelerator="cpu",logger=False),return_x=True)
best_model.plot_prediction(predictions.x,predictions.output,show_future_observed=False)
plt.show()

Y aquí tenemos el resultado:

pref

5.  Valoración del modelo

Obviamente, podemos usar algunas métricas de la biblioteca PyTorch Forecasting para evaluar el rendimiento del modelo. A continuación veremos cómo realizar una valoración utilizando el error medio absoluto (MAE) y el error porcentual absoluto medio simétrico (SMAPE) y obtener los resultados de la valoración:

from pytorch_forecasting.metrics import MAE, SMAPE
mae = MAE()(raw_predictions["prediction"], raw_predictions["target"])
print(f"Mean Absolute Error: {mae}")
smape = SMAPE()(raw_predictions["prediction"], raw_predictions["target"])
print(f"Symmetric Mean Absolute Percentage Error: {smape}")

En este fragmento de código, primero importaremos las métricas de MAE y SMAPE. A continuación, utilizaremos estas métricas para calcular el error entre los valores predichos (raw_predictions["prediction"]) y los valores reales (raw_predictions["target"]). Estas métricas pueden ayudarnos a ver el rendimiento de nuestro modelo y orientarnos hacia nuevas mejoras.



Conclusión

En este artículo, hemos visto cómo usar los datos de marcado mencionados en los dos artículos anteriores y hemos mostrado cómo crear un modelo de N-HiTs utilizando nuestros datos. Acto seguido, hemos entrenado el modelo y lo hemos comprobado. Como puede ver, hemos avanzado mucho. También hemos demostrado cómo utilizar este modelo en MetaTrader 5 para predecir 30 velas. Obviamente, no hemos mencionado cómo colocar órdenes basadas en los resultados de la predicción, ya que el trading real requiere que los lectores realicen muchas pruebas según su situación real y especifiquen las reglas comerciales correspondientes.

¡Gracias por su atención!


App:

Código completo:

# Copyright 2021, MetaQuotes Ltd.
# https://www.mql5.com


# from typing import Union
import lightning.pytorch as pl
import os
from lightning.pytorch.callbacks import EarlyStopping,ModelCheckpoint
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
# import torch
from pytorch_forecasting import NHiTS, TimeSeriesDataSet
from pytorch_forecasting.data import NaNLabelEncoder,timeseries
from pytorch_forecasting.metrics import MQF2DistributionLoss
from pytorch_forecasting.data.samplers import TimeSynchronizedBatchSampler
from lightning.pytorch.tuner import Tuner
import MetaTrader5 as mt
import warnings
import json


from torch.utils.data import DataLoader
from torch.utils.data.sampler import Sampler,SequentialSampler

class New_TmSrDt(TimeSeriesDataSet):
    '''
    rewrite dataset class
    '''
    def to_dataloader(self, train: bool = True, 
                      batch_size: int = 64, 
                      batch_sampler: Sampler | str = None, 
                      shuffle:bool=False,
                      drop_last:bool=False,
                      **kwargs) -> DataLoader:

        default_kwargs = dict(
            shuffle=shuffle,
            # drop_last=train and len(self) > batch_size,
            drop_last=drop_last, #
            collate_fn=self._collate_fn,
            batch_size=batch_size,
            batch_sampler=batch_sampler,
        )
        default_kwargs.update(kwargs)
        kwargs = default_kwargs
        # print(kwargs['drop_last'])
        if kwargs["batch_sampler"] is not None:
            sampler = kwargs["batch_sampler"]
            if isinstance(sampler, str):
                if sampler == "synchronized":
                    kwargs["batch_sampler"] = TimeSynchronizedBatchSampler(
                        SequentialSampler(self),
                        batch_size=kwargs["batch_size"],
                        shuffle=kwargs["shuffle"],
                        drop_last=kwargs["drop_last"],
                    )
                else:
                    raise ValueError(f"batch_sampler {sampler} unknown - see docstring for valid batch_sampler")
            del kwargs["batch_size"]
            del kwargs["shuffle"]
            del kwargs["drop_last"]

        return DataLoader(self,**kwargs)

def get_data(mt_data_len:int):
    if not mt.initialize():
        print('initialize() failed!') 
    else:
        print(mt.version())
        sb=mt.symbols_total()
        rts=None
        if sb > 0:
            rts=mt.copy_rates_from_pos("GOLD_micro",mt.TIMEFRAME_M15,0,mt_data_len) 
        mt.shutdown()
        # print(len(rts))
    rts_fm=pd.DataFrame(rts)
    rts_fm['time']=pd.to_datetime(rts_fm['time'], unit='s') 

    rts_fm['time_idx']= rts_fm.index%(max_encoder_length+2*max_prediction_length) 
    rts_fm['series']=rts_fm.index//(max_encoder_length+2*max_prediction_length)
    return rts_fm


def spilt_data(data:pd.DataFrame,
               t_drop_last:bool,
               t_shuffle:bool,
               v_drop_last:bool,
               v_shuffle:bool):
    training_cutoff = data["time_idx"].max() - max_prediction_length #max:95
    context_length = max_encoder_length
    prediction_length = max_prediction_length
    training = New_TmSrDt(
        data[lambda x: x.time_idx <= training_cutoff],
        time_idx="time_idx",
        target="close",
        categorical_encoders={"series":NaNLabelEncoder().fit(data.series)},
        group_ids=["series"],
        time_varying_unknown_reals=["close"],
        max_encoder_length=context_length,
        # min_encoder_length=max_encoder_length//2,
        max_prediction_length=prediction_length,
        # min_prediction_length=1,
    )

    validation = New_TmSrDt.from_dataset(training, 
                                         data, 
                                         min_prediction_idx=training_cutoff + 1)
    
    train_dataloader = training.to_dataloader(train=True,
                                              shuffle=t_shuffle, 
                                              drop_last=t_drop_last,
                                              batch_size=batch_size, 
                                              num_workers=0,)
    val_dataloader = validation.to_dataloader(train=False, 
                                              shuffle=v_shuffle,
                                              drop_last=v_drop_last,
                                              batch_size=batch_size, 
                                              num_workers=0)
    return train_dataloader,val_dataloader,training

def get_learning_rate():
    
    pl.seed_everything(42)
    trainer = pl.Trainer(accelerator="cpu", gradient_clip_val=0.1,logger=False)
    net = NHiTS.from_dataset(
        training,
        learning_rate=3e-2,
        weight_decay=1e-2,
        loss=MQF2DistributionLoss(prediction_length=max_prediction_length),
        backcast_loss_ratio=0.0,
        hidden_size=64,
        optimizer="AdamW",
    )
    res = Tuner(trainer).lr_find(
        net, train_dataloaders=t_loader, val_dataloaders=v_loader, min_lr=1e-5, max_lr=1e-1
    )
    # print(f"suggested learning rate: {res.suggestion()}")
    lr_=res.suggestion()
    return lr_
def train():
    early_stop_callback = EarlyStopping(monitor="val_loss", 
                                        min_delta=1e-4, 
                                        patience=10,  
                                        verbose=True, 
                                        mode="min")
    ck_callback=ModelCheckpoint(monitor='val_loss',
                                mode="min",
                                save_top_k=1,  
                                filename='{epoch}-{val_loss:.2f}')
    trainer = pl.Trainer(
        max_epochs=ep,
        accelerator="cpu",
        enable_model_summary=True,
        gradient_clip_val=1.0,
        callbacks=[early_stop_callback,ck_callback],
        limit_train_batches=30,
        enable_checkpointing=True,
    )
    net = NHiTS.from_dataset(
        training,
        learning_rate=lr,
        log_interval=10,
        log_val_interval=1,
        weight_decay=1e-2,
        backcast_loss_ratio=0.0,
        hidden_size=64,
        optimizer="AdamW",
        loss=MQF2DistributionLoss(prediction_length=max_prediction_length),
    )
    trainer.fit(
        net,
        train_dataloaders=t_loader,
        val_dataloaders=v_loader,
        # ckpt_path='best'
    )
    return trainer

if __name__=='__main__':
    ep=200
    __train=False
    mt_data_len=200000
    max_encoder_length = 2*96
    max_prediction_length = 30
    batch_size = 128
    info_file='results.json'
    warnings.filterwarnings("ignore")
    dt=get_data(mt_data_len=mt_data_len)
    if __train:
        # print(dt)
        # dt=get_data(mt_data_len=mt_data_len)
        t_loader,v_loader,training=spilt_data(dt,
                                              t_shuffle=False,t_drop_last=True,
                                              v_shuffle=False,v_drop_last=True)
        lr=get_learning_rate()
        trainer__=train()
        m_c_back=trainer__.checkpoint_callback
        m_l_back=trainer__.early_stopping_callback
        best_m_p=m_c_back.best_model_path
        best_m_l=m_l_back.best_score.item()

        # print(best_m_p)
        
        if os.path.exists(info_file):
            with open(info_file,'r+') as f1:
                last=json.load(fp=f1)
                last_best_model=last['last_best_model']
                last_best_score=last['last_best_score']
                if last_best_score > best_m_l:
                    last['last_best_model']=best_m_p
                    last['last_best_score']=best_m_l
                    json.dump(last,fp=f1)
        else:               
            with open(info_file,'w') as f2:
                json.dump(dict(last_best_model=best_m_p,last_best_score=best_m_l),fp=f2)

        best_model = NHiTS.load_from_checkpoint(best_m_p)
        predictions = best_model.predict(v_loader, trainer_kwargs=dict(accelerator="cpu",logger=False), return_y=True)
        raw_predictions = best_model.predict(v_loader, mode="raw", return_x=True, trainer_kwargs=dict(accelerator="cpu",logger=False))
    
        for idx in range(10):  # plot 10 examples
            best_model.plot_prediction(raw_predictions.x, raw_predictions.output, idx=idx, add_loss_to_title=True)
        samples = best_model.loss.sample(raw_predictions.output["prediction"][[0]], n_samples=500)[0]

        # plot prediction
        fig = best_model.plot_prediction(raw_predictions.x, raw_predictions.output, idx=0, add_loss_to_title=True)
        ax = fig.get_axes()[0]
        # plot first two sampled paths
        ax.plot(samples[:, 0], color="g", label="Sample 1")
        ax.plot(samples[:, 1], color="r", label="Sample 2")
        fig.legend()
        plt.show()
    else:
        with open(info_file) as f:
            best_m_p=json.load(fp=f)['last_best_model']
        print('model path is:',best_m_p)
        
        best_model = NHiTS.load_from_checkpoint(best_m_p)

        offset=1
        dt=dt.iloc[-max_encoder_length-offset:-offset,:]
        last_=dt.iloc[-1] 
        # print(len(dt))
        for i in range(1,max_prediction_length+1):
            dt.loc[dt.index[-1]+1]=last_
        dt['series']=0
        # dt['time_idx']=dt.apply(lambda x:x.index,args=1)
        dt['time_idx']=dt.index-dt.index[0]
        # dt=get_data(mt_data_len=max_encoder_length)
        predictions = best_model.predict(dt, mode='raw',trainer_kwargs=dict(accelerator="cpu",logger=False),return_x=True)
        best_model.plot_prediction(predictions.x,predictions.output,show_future_observed=False)
        plt.show()


Traducción del inglés realizada por MetaQuotes Ltd.
Artículo original: https://www.mql5.com/en/articles/13255

Archivos adjuntos |
n_hits.py (9.7 KB)
Algoritmos de optimización de la población: Búsqueda por difusión estocástica (Stochastic Diffusion Search, SDS) Algoritmos de optimización de la población: Búsqueda por difusión estocástica (Stochastic Diffusion Search, SDS)
En este artículo veremos la búsqueda por difusión estocástica, o SDS, que es un algoritmo de optimización muy potente y eficiente basado en los principios del paseo aleatorio. El algoritmo puede encontrar soluciones óptimas en espacios multidimensionales complejos, con una alta tasa de convergencia y la capacidad de evitar extremos locales.
Teoría de Categorías en MQL5 (Parte 23): Otra mirada a la media móvil exponencial doble Teoría de Categorías en MQL5 (Parte 23): Otra mirada a la media móvil exponencial doble
En este artículo, seguiremos analizando desde un nuevo ángulo los indicadores comerciales más populares. Vamos a procesar una composición horizontal de transformaciones naturales. El mejor indicador para ello será la media móvil exponencial doble (Double Exponential Moving Average, DEMA).
Redes neuronales: así de sencillo (Parte 59): Dicotomía de control (DoC) Redes neuronales: así de sencillo (Parte 59): Dicotomía de control (DoC)
En el artículo anterior nos familiarizamos con el transformador de decisión. Sin embargo, el complejo entorno estocástico del mercado de divisas no nos permitió aprovechar plenamente el potencial del método presentado. Hoy veremos un algoritmo que tiene como objetivo mejorar el rendimiento de los algoritmos en entornos estocásticos.
Creamos un asesor multidivisa sencillo utilizando MQL5 (Parte 2): Señales del indicador - Parabolic SAR de marco temporal múltiple Creamos un asesor multidivisa sencillo utilizando MQL5 (Parte 2): Señales del indicador - Parabolic SAR de marco temporal múltiple
En este artículo, entenderemos por asesor multidivisa un asesor o robot comercial que puede comerciar (abrir/cerrar órdenes, gestionar órdenes, por ejemplo, trailing-stop y trailing-profit, etc.) con más de un par de símbolos de un gráfico. Esta vez usaremos solo un indicador, a saber, Parabolic SAR o iSAR en varios marcos temporales, comenzando desde PERIOD_M15 y terminando con PERIOD_D1.