
Robô de trading multimódulo em Python e MQL5 (Parte I): Criando a arquitetura básica e os primeiros módulos
Introdução
Um dia tive uma ideia: os robôs de trading são simples demais para o mercado atual, precisamos de algo mais flexível e inteligente.
O mercado muda constantemente. Hoje uma estratégia funciona, amanhã já não serve mais. Observei isso por muito tempo e percebi que era necessário um novo caminho. A solução veio de repente. E se a gente criasse um sistema modular? Imagine uma equipe de profissionais: um monitora tendências, outro analisa volumes de negociação, outro cuida dos riscos. É assim que um robô de trading moderno deve funcionar!
A escolha da tecnologia foi óbvia. Python era perfeito para análise de dados, com suas bibliotecas que fazem maravilhas. O MQL5 ficou responsável pela execução das ordens. Criamos um ótimo time. Começamos com o básico: primeiro criamos uma base sólida, ou seja, uma arquitetura que pudesse crescer e evoluir, depois adicionamos a interação entre Python e MQL5. O sistema de gestão de dados ficou surpreendentemente simples e eficiente.
A assíncronia foi um verdadeiro avanço! Agora o robô conseguia monitorar vários instrumentos ao mesmo tempo. A performance subiu enormemente.
Sabe o que é mais interessante? Esse sistema realmente funciona no mercado. Não é só um exemplo teórico, mas uma ferramenta real de uso prático. Claro que começamos com uma versão básica, mas mesmo assim ela já impressiona. Temos uma longa jornada pela frente. Vamos criar um sistema capaz de aprender e se adaptar. Iremos melhorá-lo passo a passo. Mas por agora, vamos começar pelo mais importante: construir uma base confiável.
Arquitetura básica do sistema. Em busca do equilíbrio ideal
Passei três anos tentando criar robôs de trading. E sabe o que percebi? O mais importante não são os algoritmos em si, mas como eles funcionam em conjunto. Essa descoberta mudou tudo.
Imagine uma orquestra sinfônica. Cada músico é excelente, mas sem um maestro não há música. No meu sistema, esse maestro é o MarketMaker. Ele gerencia quatro módulos, e cada um deles sabe exatamente o que fazer:
- O primeiro módulo acompanha os volumes de negociação: quando e a que preços os negócios estão sendo fechados.
- O segundo módulo busca oportunidades de arbitragem.
- O terceiro módulo analisa a economia.
- O quarto módulo impede o sistema de se empolgar e controla os riscos.
O mercado não espera por ninguém. Ele muda num piscar de olhos, por isso todos os módulos operam ao mesmo tempo, em constante comunicação. O módulo de arbitragem vê uma oportunidade e avisa o centro. O centro verifica a informação com os outros módulos e toma uma decisão.
No início, pensei em definir regras rígidas de entrada no mercado. Mas a realidade logo mostrou que isso não funciona. Às vezes, um sinal forte vale mais do que vários sinais fracos. E quanto tempo foi gasto na organização dos dados! Cada módulo tem suas próprias informações: cotações, indicadores macroeconômicos, histórico de negociações. Tudo isso precisa ser armazenado, atualizado e compartilhado com os outros. Foi necessário criar um sistema especial de sincronização.
Engraçado como quanto mais independentes eram os módulos, melhor o sistema funcionava. A falha de um componente não parava os demais. E falhas acontecem: às vezes a conexão cai, às vezes as cotações travam. A principal vantagem dessa arquitetura é que ela pode ser expandida. Quer adicionar uma análise de notícias? Sem problema! Crie um módulo, conecte ao MarketMaker, e tudo funciona.
O sistema está vivo e em constante evolução. Ele não é perfeito, mas sua base é modularidade, operação paralela e flexibilidade, o que nos permite olhar para o futuro com confiança. Em breve falarei com mais detalhes sobre cada componente.
Classe principal do sistema
Depois de muitos testes com diferentes abordagens para a arquitetura de robôs de trading, cheguei à conclusão de que o sucesso do sistema depende muito de como seu núcleo está estruturado. O MarketMaker foi a materialização desse entendimento, incorporando todas as melhores práticas que acumulei ao longo dos anos desenvolvendo sistemas algorítmicos.
Vamos começar com a estrutura básica da classe. Veja como é sua inicialização:
def __init__(self, pairs: List[str], terminal_path: str, volume: float = 1.0, levels: int = 5, spacing: float = 3.0): # Main parameters self.pairs = pairs self.base_volume = volume self.levels = levels self.spacing = spacing self.magic = 12345 # Trading parameters self.portfolio_iterations = 10 self.leverage = 50 self.min_profit_pips = 1.0 self.max_spread_multiplier = 2.0 # Data warehouses self.symbols_info = {} self.trading_parameters = {} self.optimal_horizons = {}
À primeira vista, tudo parece bem simples. Mas cada parâmetro tem sua história. Pegue, por exemplo, o portfolio_iterations: esse parâmetro surgiu depois que percebi que abrir posições de forma agressiva demais podia causar problemas de liquidez. Agora, o sistema divide o volume disponível em partes, o que torna a negociação mais equilibrada.
Dediquei atenção especial à inicialização dos dados históricos. Veja como isso funciona:
def _initialize_history(self, pair: str): """Initializing historical data for a pair""" try: rates = mt5.copy_rates_from(pair, mt5.TIMEFRAME_M1, datetime.now()-timedelta(days=1), 1440) if rates is not None: df = pd.DataFrame(rates) df['time'] = pd.to_datetime(df['time'], unit='s') df.set_index('time', inplace=True) returns = np.log(df['close'] / df['close'].shift(1)).dropna() self.returns_history[pair] = pd.Series(returns.values, index=df.index[1:]) except Exception as e: logger.error(f"Error initializing history for {pair}: {e}")
O ponto interessante aqui é o uso de retornos logarítmicos em vez de variações percentuais simples. Essa escolha não foi por acaso. Na prática, percebi que os retornos logarítmicos produzem resultados mais estáveis ao calcular métricas estatísticas, especialmente quando estamos falando de volatilidade.
Um dos pontos mais difíceis foi implementar a previsão de volumes. Depois de muitos experimentos, surgiu o seguinte código:
async def update_volume_predictions(self): """Updating volume predictions for each pair""" for pair in self.pairs: try: df = volume_model.get_volume_data( symbol=pair, timeframe=mt5.TIMEFRAME_H1, n_bars=100 ) if pair in self.volume_models: feature_columns = [ 'volume_sma_5', 'volume_sma_20', 'relative_volume', 'volume_change', 'volume_volatility', 'price_sma_5', 'price_sma_20', 'price_change', 'price_volatility', 'rsi', 'macd', 'macd_signal', 'bb_upper', 'bb_lower' ] X = df[feature_columns].iloc[-1:].copy() prediction = self.volume_models[pair].predict(X)[0] current_price = df['close'].iloc[-1] predicted_change = (prediction - current_price) / current_price self.volume_predictions[pair] = predicted_change except Exception as e: logger.error(f"Error updating prediction for {pair}: {e}")
Repare no conjunto de características, pois não é apenas um agrupamento aleatório de indicadores. Cada um foi adicionado de forma gradual, após testes rigorosos. Por exemplo, o volume relativo (relative_volume) mostrou-se especialmente útil para identificar atividade anormal no mercado.
Veja agora como é o coração do sistema, ou seja, o ciclo de negociação:
async def trade_cycle(self): """Main trading loop""" try: await self.update_volume_predictions() await self.economic_module.update_forecasts() all_positions = mt5.positions_get() or [] open_positions = [pos for pos in all_positions if pos.magic == self.magic] if open_positions: await self.manage_positions() return valid_signals = [] available_volume = self.calculate_available_volume() * len(self.pairs) for pair in self.pairs: signal = await self.get_combined_signal(pair) if signal and self._validate_signal(signal): valid_signals.append(signal) if valid_signals: volume_per_trade = available_volume / len(valid_signals) for signal in valid_signals: signal['adjusted_volume'] = volume_per_trade await self.place_order(signal) except Exception as e: logger.error(f"Error in trade cycle: {e}")
Esse código é o resultado de muita reflexão sobre como estruturar corretamente o processo de negociação. A natureza assíncrona do ciclo permite processar eficientemente múltiplos pares ao mesmo tempo, e a sequência clara de ações (atualização de previsões → verificação de posições → busca de sinais → execução) garante um comportamento previsível do sistema.
Vale destacar também o mecanismo de validação dos sinais:
def _validate_signal(self, signal: Dict) -> bool: """Trading signal check""" spread = signal['spread'] diff_pips = signal['diff_pips'] # Basic checks if spread > self.max_spread_multiplier * diff_pips: return False if diff_pips < self.min_profit_pips: return False # Check economic factors if signal['economic_volatility'] > self.volatility_threshold: return False # Check the volume prediction if abs(signal['volume_prediction']) < self.min_volume_change: return False return True
Cada verificação aqui surgiu da experiência prática de negociação. Por exemplo, a checagem da volatilidade econômica foi implementada depois que percebi que negociar durante anúncios de notícias importantes frequentemente resultava em prejuízos maiores, devido a movimentos bruscos de preço.
Para concluir, quero reforçar que o MarketMaker é um sistema vivo, que continua evoluindo. Cada dia de operação traz novas ideias e melhorias. A arquitetura modular facilita a implementação dessas melhorias sem afetar o funcionamento dos componentes principais.
Trabalho com dados
O trabalho com dados sempre foi um dos aspectos mais complexos do trading algorítmico. Lembro que, no início do desenvolvimento, me deparei com uma questão aparentemente simples: como organizar corretamente o armazenamento e o processamento das informações de mercado? Logo ficou claro que um banco de dados comum ou simples arrays não seriam suficientes.
Tudo começou com a criação de uma estrutura básica para a obtenção dos dados. Depois de algumas iterações, surgiu o seguinte método:
def _initialize_history(self, pair: str): try: rates = mt5.copy_rates_from(pair, mt5.TIMEFRAME_M1, datetime.now()-timedelta(days=1), 1440) if rates is None: logger.error(f"Failed to get history data for {pair}") return df = pd.DataFrame(rates) df['time'] = pd.to_datetime(df['time'], unit='s') df.set_index('time', inplace=True) # Calculate logarithmic returns returns = np.log(df['close'] / df['close'].shift(1)).dropna() # Add new metrics df['typical_price'] = (df['high'] + df['low'] + df['close']) / 3 df['price_velocity'] = df['close'].diff() / df['time'].diff().dt.total_seconds() df['volume_intensity'] = df['tick_volume'] / df['time'].diff().dt.total_seconds() self.returns_history[pair] = pd.Series(returns.values, index=df.index[1:]) self.price_data[pair] = df except Exception as e: logger.error(f"Error initializing history for {pair}: {e}")
Um ponto interessante aqui é o cálculo da “velocidade” de variação do preço (price_velocity) e da intensidade de volume (volume_intensity). Essas métricas não surgiram logo de cara. Inicialmente eu trabalhava apenas com os dados de preço normais, mas logo percebi que o mercado não é apenas uma sequência de preços, e sim um sistema dinâmico complexo, onde não importa só a magnitude das mudanças, mas também a velocidade com que elas acontecem.
Foi necessário dar atenção especial ao tratamento de dados ausentes. Veja como funciona o sistema de validação e limpeza:
def _validate_and_clean_data(self, df: pd.DataFrame) -> pd.DataFrame: """Validation and data cleaning""" if df.empty: raise ValueError("Empty dataset received") # Check gaps missing_count = df.isnull().sum() if missing_count.any(): logger.warning(f"Found missing values: {missing_count}") # Use 'forward fill' for prices price_cols = ['open', 'high', 'low', 'close'] df[price_cols] = df[price_cols].ffill() # Use interpolation for volumes df['tick_volume'] = df['tick_volume'].interpolate(method='linear') # Check outliers for col in ['high', 'low', 'close']: zscore = stats.zscore(df[col]) outliers = abs(zscore) > 3 if outliers.any(): logger.warning(f"Found {outliers.sum()} outliers in {col}") # Replace extreme outliers df.loc[outliers, col] = df[col].mean() + 3 * df[col].std() * np.sign(zscore[outliers]) return df
Lembro de um caso em que a ausência de apenas um tick levou a um cálculo incorreto dos indicadores e, consequentemente, a um sinal de negociação errado. Depois disso, o sistema de limpeza de dados foi bastante aprimorado.
Veja como trabalhamos com volumes, uma das características mais importantes do mercado:
def analyze_volume_profile(self, pair: str, window: int = 100) -> Dict: """Volume profile analysis""" try: df = self.price_data[pair].copy().last(window) # Normalize volumes volume_mean = df['tick_volume'].rolling(20).mean() volume_std = df['tick_volume'].rolling(20).std() df['normalized_volume'] = (df['tick_volume'] - volume_mean) / volume_std # Calculate volume clusters price_levels = pd.qcut(df['close'], q=10) volume_clusters = df.groupby(price_levels)['tick_volume'].sum() # Find support/resistance levels by volume significant_levels = volume_clusters[volume_clusters > volume_clusters.mean() + volume_clusters.std()] # Analyze imbalances buy_volume = df[df['close'] > df['open']]['tick_volume'].sum() sell_volume = df[df['close'] <= df['open']]['tick_volume'].sum() volume_imbalance = (buy_volume - sell_volume) / (buy_volume + sell_volume) return { 'normalized_profile': volume_clusters.to_dict(), 'significant_levels': significant_levels.index.to_list(), 'volume_imbalance': volume_imbalance, 'current_percentile': stats.percentileofscore(df['tick_volume'], df['tick_volume'].iloc[-1]) } except Exception as e: logger.error(f"Error analyzing volume profile: {e}") return None
Esse código é o resultado de um estudo aprofundado da microestrutura do mercado. O cálculo do desequilíbrio de volumes entre compras e vendas é especialmente interessante. Comecei a estudar isso no mercado cripto, e não sei se a administração do MQL5 permitirá a publicação do código com integração entre corretora cripto, MetaTrader 5 e Python...
Mas enfim, me desviei do assunto. À primeira vista pode parecer que uma simples comparação entre volumes em barras de alta e de baixa não traz informações úteis. Mas a prática mostrou que esse indicador simples frequentemente antecipa uma reversão de tendência.
O trabalho com dados econômicos é uma história à parte. Aqui foi necessário criar um sistema inteiro de sincronização:
async def synchronize_market_data(self): """Market data synchronization""" while True: try: # Update basic data for pair in self.pairs: latest_data = await self._get_latest_ticks(pair) if latest_data is not None: self._update_price_data(pair, latest_data) # Update derived metrics await self._update_derivatives() # Check data integrity self._verify_data_integrity() await asyncio.sleep(1) # Dynamic delay except Exception as e: logger.error(f"Error in data synchronization: {e}") await asyncio.sleep(5) # Increased delay on error
O ponto principal aqui é que a atualização dos dados acontece de forma assíncrona. Nas versões iniciais do sistema eu usava requisições síncronas, mas isso gerava atrasos no processamento de um grande número de pares. A migração para um modelo assíncrono permitiu um salto significativo na performance.
Para finalizar, quero destacar que a organização correta do trabalho com dados não é apenas uma questão técnica. É a base sobre a qual toda a estratégia de trading é construída. Dados limpos e bem estruturados permitem enxergar padrões de mercado que ficam ocultos numa análise superficial.
Primeiro módulo: análise de volumes
A criação do módulo de análise de volumes começou com uma observação simples: os indicadores clássicos muitas vezes têm atraso, pois trabalham apenas com preços. Mas o mercado não é só preço, ele também é volume de negociação, que muitas vezes antecipa o movimento das cotações. Por isso, o primeiro módulo do nosso sistema foi o analisador de volumes.
Comecemos com a função básica de obtenção de dados:
def get_volume_data(symbol, timeframe=mt5.TIMEFRAME_H1, n_bars=2000): """Getting volume and price data from MT5""" try: bars = mt5.copy_rates_from_pos(symbol, timeframe, 0, n_bars) if bars is None: logger.error(f"Failed to get data for {symbol}") return None df = pd.DataFrame(bars) df['time'] = pd.to_datetime(df['time'], unit='s') return df except Exception as e: logger.error(f"Error getting data for {symbol}: {e}") return None
À primeira vista, a função parece simples. Mas por trás dessa simplicidade está uma decisão importante: usamos exatamente 2000 barras de histórico. Por quê? Com testes, descobri que essa quantidade é suficiente para construir um modelo de qualidade, sem sobrecarregar a memória do servidor, mesmo no caso de treinar modelos muito grandes, com alta dimensionalidade de datasets e entrada de características como sequências em batchs.
A parte mais interessante do módulo está na criação dos atributos para análise. Veja como isso funciona:
def create_features(df, forecast_periods=None): """Create features for the forecasting model""" try: # Basic volume indicators df['volume_sma_5'] = df['tick_volume'].rolling(window=5).mean() df['volume_sma_20'] = df['tick_volume'].rolling(window=20).mean() df['relative_volume'] = df['tick_volume'] / df['volume_sma_20'] # Volume dynamics df['volume_change'] = df['tick_volume'].pct_change() df['volume_acceleration'] = df['volume_change'].diff() # Volume volatility df['volume_volatility'] = df['tick_volume'].rolling(window=20).std() df['volume_volatility_5'] = df['tick_volume'].rolling(window=5).std() df['volume_volatility_ratio'] = df['volume_volatility_5'] / df['volume_volatility']
Um destaque especial aqui vai para o volume_volatility_ratio. Esse indicador surgiu depois que notei um padrão curioso: antes de movimentos fortes, a volatilidade de curto prazo dos volumes frequentemente começa a subir mais rápido do que a de longo prazo. Esse indicador se tornou um dos principais na identificação de possíveis pontos de entrada.
Uma história interessante está ligada ao cálculo do perfil de volume:
# Volume profile df['volume_percentile'] = df['tick_volume'].rolling(window=100).apply( lambda x: pd.Series(x).rank(pct=True).iloc[-1] ) df['volume_density'] = df['tick_volume'] / (df['high'] - df['low']) df['volume_density_ma'] = df['volume_density'].rolling(window=20).mean() df['cumulative_volume'] = df['tick_volume'].rolling(window=20).sum() df['volume_ratio'] = df['tick_volume'] / df['cumulative_volume']
O indicador volume_density não surgiu por acaso. Percebi que o volume, isoladamente, pode ser enganoso, por isso é essencial considerar em qual faixa de preços ele foi acumulado. Um volume alto concentrado numa faixa estreita de preços geralmente indica a formação de um nível importante de suporte ou resistência.
Para prever a direção do movimento do preço, desenvolvi uma função específica:
def predict_direction(model, X): """Price movement direction forecast""" try: prediction = model.predict(X)[0] current_price = X['close'].iloc[-1] if 'close' in X else None if current_price is None: return 0 # Return 1 for rise, -1 for fall, 0 for neutral price_change = (prediction - current_price) / current_price if abs(price_change) < 0.0001: # Minimum change threshold return 0 return 1 if price_change > 0 else -1 except Exception as e: logger.error(f"Error predicting direction: {e}") return 0
Repare no limiar de variação de 0.0001. Esse número não é aleatório: ele foi definido com base na análise do menor movimento que pode ser compensado, considerando o spread e diferentes tipos de comissões. Para o mercado de ações, esse valor precisa ser ajustado separadamente.
A etapa final é o treinamento do modelo.
def train_model(X_train, X_test, y_train, y_test, model_params=None): try: if model_params is None: model_params = {'n_estimators': 400, 'random_state': 42} model = RandomForestRegressor(**model_params) model.fit(X_train, y_train) # Model evaluation train_predictions = model.predict(X_train) test_predictions = model.predict(X_test) train_rmse = np.sqrt(mean_squared_error(y_train, train_predictions)) test_rmse = np.sqrt(mean_squared_error(y_test, test_predictions)) test_r2 = r2_score(y_test, test_predictions)
Sabe por que escolhi justamente o RandomForest com 400 árvores? Testei de tudo, desde regressão simples até redes neurais com arquiteturas incrivelmente complexas, e cheguei à conclusão de que esse método é o mais confiável. Talvez não seja o mais preciso, mas é estável. O mercado é barulhento, instável, e o RandomForest se mantém firme.
Isso é só o começo, claro. O mais interessante vem depois: como juntar todos esses sinais, como configurar o sistema para que ele aprenda de forma autônoma? Mas isso fica para a próxima.
Gerenciamento de riscos: a arte de preservar o capital
Agora vamos falar do mais importante: riscos. É curioso ouvir as pessoas discutindo estratégias incríveis e redes neurais sofisticadas. Em dez anos de mercado, aprendi uma lição essencial: sem controle de risco, nenhuma estratégia vale nada. Você pode ter o algoritmo de negociação mais avançado, mas sem um bom gerenciamento de risco, vai acabar no prejuízo.
Por isso, no nosso sistema, a proteção do capital é prioridade absoluta. E sabe de uma coisa? É justamente essa abordagem conservadora que nos permite lucrar de forma constante, enquanto outros perdem dinheiro com estratégias “perfeitas”.
def calculate_available_volume(self) -> float: try: account = mt5.account_info() if not account: return 0.01 # Use balance and free margin balance = account.balance free_margin = account.margin_free # Take the minimum value for safety available_margin = min(balance, free_margin) # Calculate the maximum volume taking into account the margin margin_ratio = 0.1 # Use only 10% of the available margin base_volume = (available_margin * margin_ratio) / 1000 # Limit to maximum volume max_volume = min(base_volume, 1.0) # max 1 lot
Repare no valor margin_ratio = 0.1 . Isso não é aleatório. Depois de meses de testes, percebi que utilizar mais de 10% da margem disponível aumenta muito o risco de chamada de margem durante grandes movimentos do mercado. Isso é especialmente crítico ao operar com vários pares ao mesmo tempo.
Outro ponto importante é o cálculo de stop-loss e take-profit:
async def calculate_position_limits(self, signal: Dict) -> Tuple[float, float]: try: pair = signal['pair'] direction = signal['direction'] # Get volatility volatility = signal['price_volatility'] economic_volatility = signal['economic_volatility'] # Base values in pips base_sl = 20 base_tp = 40 # Adjust for volatility volatility_factor = 1 + (volatility * 2) sl_points = base_sl * volatility_factor tp_points = base_tp * volatility_factor # Take economic volatility into account if economic_volatility > 0.5: sl_points *= 1.5 tp_points *= 1.2 # Check minimum distances info = self.symbols_info[pair] min_stop_level = info.trade_stops_level if hasattr(info, 'trade_stops_level') else 0 return max(sl_points, min_stop_level), max(tp_points, min_stop_level) except Exception as e: logger.error(f"Error calculating position limits: {e}") return 20, 40 # return base values in case of an error
A história do volatility_factor é especialmente interessante. No início eu usava stops fixos, mas logo percebi que, em períodos de alta volatilidade, eles eram acionados cedo demais, sendo derrubados com frequência. A correção dinâmica dos stops com base na volatilidade atual melhorou muito os resultados das operações.
Veja como é estruturado o sistema de gerenciamento de posições:
async def manage_positions(self): """Managing open positions""" try: positions = mt5.positions_get() or [] for position in positions: if position.magic == self.magic: # Check the time in the position time_in_trade = datetime.now() - pd.to_datetime(position.time, unit='s') # Get current market data signal = await self.get_combined_signal(position.symbol) # Check the need to modify the position if self._should_modify_position(position, signal, time_in_trade): await self._modify_position(position, signal) # Check the closing conditions if self._should_close_position(position, signal, time_in_trade): await self.close_position(position) except Exception as e: logger.error(f"Error managing positions: {e}")
O foco aqui está no tempo de permanência na posição. A prática mostrou que, quanto mais tempo uma posição fica aberta, mais rigorosas devem ser as condições para mantê-la. Isso foi implementado por meio de um endurecimento dinâmico dos critérios de manutenção da posição conforme o tempo passa.
Um aspecto interessante é o fechamento parcial de posições:
def calculate_partial_close(self, position, profit_threshold: float = 0.5) -> float: """Volume calculation for partial closure""" try: # Check the current profit if position.profit <= 0: return 0.0 profit_ratio = position.profit / (position.volume * 1000) # approximate ROI estimate if profit_ratio >= profit_threshold: # Close half of the position when the profit threshold is reached return position.volume * 0.5 return 0.0 except Exception as e: logger.error(f"Error calculating partial close: {e}") return 0.0
Essa função surgiu depois da análise de operações. Notei que o fechamento parcial ao atingir um certo nível de lucro melhora bastante as estatísticas gerais da estratégia. Isso permite realizar parte do lucro enquanto ainda se mantém potencial para continuar ganhando caso o mercado siga na direção desejada.
Para encerrar, quero dizer que o sistema de gerenciamento de risco é um organismo vivo, que está sempre evoluindo. Cada operação malsucedida, cada movimento inesperado do mercado representa uma nova experiência, usada para aprimorar os algoritmos de proteção do capital. Nas próximas versões do sistema, está previsto o uso de aprendizado de máquina para otimização dinâmica dos parâmetros de risco, além de uma combinação entre sistema híbrido de VaR e a teoria de portfólio de Markowitz — mas isso já é outra história...
Módulo econômico: quando a análise fundamental encontra o aprendizado de máquina
Durante o desenvolvimento do sistema de trading, percebi uma regularidade interessante: até os sinais técnicos mais fortes podem falhar se forem contrários aos fatores fundamentais. Essa observação levou à criação do módulo econômico, um componente que analisa os indicadores macroeconômicos e seu impacto sobre os movimentos dos pares de moedas.
Vamos começar pela estrutura básica do módulo. Veja como é feita a inicialização dos principais indicadores econômicos:
def __init__(self): self.indicators = { 'NY.GDP.MKTP.KD.ZG': 'GDP growth', 'FP.CPI.TOTL.ZG': 'Inflation', 'FR.INR.RINR': 'Real interest rate', 'NE.EXP.GNFS.ZS': 'Exports', 'NE.IMP.GNFS.ZS': 'Imports', 'BN.CAB.XOKA.GD.ZS': 'Current account balance', 'GC.DOD.TOTL.GD.ZS': 'Government debt', 'SL.UEM.TOTL.ZS': 'Unemployment rate', 'NY.GNP.PCAP.CD': 'GNI per capita', 'NY.GDP.PCAP.KD.ZG': 'GDP per capita growth' }
A escolha desses indicadores não foi aleatória. Após analisar milhares de operações, percebi que esses dados são os que mais influenciam as tendências de longo prazo nos pares de moedas. A correlação entre taxa de juros real e movimento da moeda é particularmente interessante, pois mudanças nessa taxa geralmente antecedem reversões de tendência.
Para obter os dados econômicos, desenvolvi um método específico:
def fetch_economic_data(self): data_frames = [] for indicator, name in self.indicators.items(): try: data_frame = wbdata.get_dataframe({indicator: name}, country='all') data_frames.append(data_frame) except Exception as e: logger.error(f"Error fetching data for indicator '{indicator}': {e}") if data_frames: self.economic_data = pd.concat(data_frames, axis=1) return self.economic_data
Um ponto interessante aqui é o uso da biblioteca wbdata para coletar informações do Banco Mundial. Escolhi essa fonte depois de testar várias APIs, pois ela oferece dados mais completos e validados.
Dediquei atenção especial à preparação dos dados para análise:
def prepare_data(self, symbol_data): data = symbol_data.copy() data['close_diff'] = data['close'].diff() data['close_corr'] = data['close'].rolling(window=30).corr(data['close'].shift(1)) for indicator in self.indicators.keys(): if indicator in self.economic_data.columns: data[indicator] = self.economic_data[indicator].ffill() data.dropna(inplace=True) return data
Repare no uso do forward fill para os indicadores econômicos. Essa abordagem não surgiu de imediato — primeiro tentei usar interpolação, mas descobri que, para dados econômicos, faz mais sentido utilizar o último valor conhecido.
O coração do módulo é o sistema de previsão:
def forecast(self, symbol, symbol_data): if len(symbol_data) < 50: return None, None X = symbol_data.drop(columns=['close']) y = symbol_data['close'] X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, shuffle=False) model = CatBoostRegressor(iterations=1000, learning_rate=0.1, depth=8, loss_function='RMSE') model.fit(X_train, y_train, verbose=False)
A escolha do CatBoost como algoritmo de aprendizado de máquina também não foi aleatória. Após testar diversos modelos (de regressão linear simples a redes neurais complexas), descobri que o CatBoost lida melhor com a natureza irregular dos dados econômicos.
A etapa final é a interpretação dos resultados:
def interpret_results(self, symbol): forecast = self.forecasts.get(symbol) importance_df = self.feature_importances.get(symbol) if forecast is None or importance_df is None: return f"Insufficient data for interpretation of {symbol}" trend = "upward" if forecast[-1] > forecast[0] else "downward" volatility = "high" if forecast.std() / forecast.mean() > 0.1 else "low" top_feature = importance_df.iloc[0]['feature']
O cálculo da volatilidade é particularmente interessante. O limiar de 0.1 para definir alta volatilidade foi estabelecido com base na análise de dados históricos. Descobri que esse valor separa bem os períodos de mercado calmo daqueles com turbulência elevada.
Durante o desenvolvimento do módulo, fiz uma observação relevante: os fatores econômicos geralmente atuam com atraso, mas seu impacto é mais duradouro que o dos fatores técnicos. Isso levou à criação de um sistema de pesos, no qual a importância dos sinais econômicos aumenta nos timeframes mais longos.
Claro, o módulo econômico não é uma varinha mágica, ele não prevê todos os movimentos do mercado. Mas quando combinado com análise técnica e de volumes, ele oferece uma dimensão adicional para compreender os processos do mercado. Nas próximas versões do sistema, está prevista a inclusão da análise de fluxos de notícias e seu impacto nos indicadores econômicos — mas isso é tema para outro momento.
Módulo de arbitragem: em busca da verdade no preço
A ideia de criar o módulo de arbitragem surgiu depois de longas observações sobre o mercado cambial. Notei uma regularidade interessante: frequentemente os preços reais dos pares de moedas se desviam de seu valor teórico, calculado por meio de cotações cruzadas. Esses desvios criam oportunidades de arbitragem, mas o mais importante é que podem servir como indicador do movimento futuro dos preços.
Vamos começar pela estrutura básica do módulo:
class ArbitrageModule: def __init__(self, terminal_path: str = "C:/Program Files/RannForex MetaTrader 5/terminal64.exe", max_trades: int = 10): self.terminal_path = terminal_path self.MAX_OPEN_TRADES = max_trades self.symbols = [ "AUDUSD.ecn", "AUDJPY.ecn", "CADJPY.ecn", "AUDCHF.ecn", "AUDNZD.ecn", "USDCAD.ecn", "USDCHF.ecn", "USDJPY.ecn", "NZDUSD.ecn", "GBPUSD.ecn", "EURUSD.ecn", "CADCHF.ecn", "CHFJPY.ecn", "NZDCAD.ecn", "NZDCHF.ecn", "NZDJPY.ecn", "GBPCAD.ecn", "GBPCHF.ecn", "GBPJPY.ecn", "GBPNZD.ecn", "EURCAD.ecn", "EURCHF.ecn", "EURGBP.ecn", "EURJPY.ecn", "EURNZD.ecn" ]
Vale dar atenção especial à estrutura das pares-base usados para o cálculo dos cross-rates:
self.usd_pairs = { "EUR": "EURUSD.ecn", "GBP": "GBPUSD.ecn", "AUD": "AUDUSD.ecn", "NZD": "NZDUSD.ecn", "USD": None, "CAD": ("USDCAD.ecn", True), "CHF": ("USDCHF.ecn", True), "JPY": ("USDJPY.ecn", True) }
Aqui temos um detalhe curioso: algumas pares são marcadas como invertidas (True). Isso não é por acaso, já que para certas moedas como CAD, CHF e JPY, a cotação base é USD/XXX, e não XXX/USD. Esse é um detalhe importante que muitos ignoram ao calcular os cross-rates.
O coração do módulo é a função de cálculo de preços sintéticos.
def calculate_synthetic_prices(self, data: Dict[str, pd.DataFrame]) -> pd.DataFrame: """Calculation of synthetic prices through cross rates""" synthetic_prices = {} try: for symbol in self.symbols: base = symbol[:3] quote = symbol[3:6] # Calculate the synthetic price using cross rates fair_price = self.calculate_cross_rate(base, quote, data) synthetic_prices[f'{symbol}_fair'] = pd.Series([fair_price])
Lembro como me dediquei para otimizar esse código. No começo, tentei calcular todos os caminhos possíveis de conversão e escolher o mais eficiente. Mas no fim, descobri que o cálculo direto via USD oferece resultados mais estáveis, especialmente em ambientes de alta volatilidade.
Interessante também é a função de cálculo da taxa para USD:
def get_usd_rate(self, currency: str, data: dict) -> float: """Get exchange rate to USD""" if currency == "USD": return 1.0 pair_info = self.usd_pairs[currency] if isinstance(pair_info, tuple): pair, inverse = pair_info rate = data[pair]['close'].iloc[-1] return 1 / rate if inverse else rate else: pair = pair_info return data[pair]['close'].iloc[-1]
Essa função surgiu depois de longos experimentos com diferentes métodos de cálculo de cross-rates. O ponto crucial aqui é o tratamento correto dos pares invertidos. Um cálculo incorreto em apenas um par pode levar a uma cascata de erros nos preços sintéticos.
Para trabalhar com dados reais, desenvolvi uma função especial:
def get_mt5_data(self, symbol: str, count: int = 1000) -> Optional[pd.DataFrame]:
try:
timezone = pytz.timezone("Etc/UTC")
utc_from = datetime.now(timezone) - timedelta(days=1)
ticks = mt5.copy_ticks_from(symbol, utc_from, count, mt5.COPY_TICKS_ALL)
if ticks is None:
logger.error(f"Failed to fetch data for {symbol}")
return None
ticks_frame = pd.DataFrame(ticks)
ticks_frame['time'] = pd.to_datetime(ticks_frame['time'], unit='s')
return ticks_frame
A escolha da quantidade de ticks (1000) é um compromisso entre precisão nos cálculos e velocidade de processamento de dados. Na prática, revelou-se suficiente para determinar de forma confiável o preço justo.
Durante o trabalho no módulo, fiz uma observação interessante: discrepâncias entre o preço real e o sintético frequentemente aparecem antes de movimentos significativos de mercado. Como se o dinheiro inteligente começasse a mover alguns pares, criando tensão no sistema de cross-rates, que depois se dissipa em um movimento forte.
Claro, o módulo de arbitragem não é uma varinha mágica, mas, em combinação com análise de volumes e indicadores econômicos, oferece uma dimensão extra para entender o mercado. Nas próximas versões planeja-se adicionar análise de correlações entre desvios em diferentes pares, e isso já é outra história.
Considerações finais
Sabe, quando comecei este projeto, eu nem fazia ideia do que sairia dali. Pensei que bastava conectar Python ao MQL5 e estaria tudo resolvido. Acabou que se transformou em toda uma plataforma de trading! Cada parte dela funciona como um componente de relógio suíço, e este artigo é apenas o primeiro de muitos.
Ao longo do desenvolvimento, aprendi muita coisa. Por exemplo, que não existem atalhos no algotrading. Pegue o cálculo do tamanho da posição: parece simples, mas quando você começa a considerar todos os riscos e o comportamento do mercado, a cabeça começa a girar.
E como a arquitetura modular é incrível! Se um módulo falha, os outros continuam funcionando. Dá para aprimorar cada parte com calma, sem temor de quebrar o sistema inteiro.
E como a arquitetura modular funciona bem! Um módulo busca arbitragem, outro monitora volumes, o terceiro analisa economia e o quarto controla riscos. Juntos, eles veem o mercado de uma forma que nenhum único tipo de análise consegue.
Claro, ainda há espaço para crescer. Quero adicionar análise de notícias, aprimorar o aprendizado de máquina, desenvolver novos modelos de avaliação de riscos. O aspecto mais fascinante de trabalhar com visualização 3D do mercado é a possibilidade de representar preço, volume e tempo em um mesmo espaço.
A principal lição deste projeto é que um sistema de trading deve ser vivo. O mercado não fica parado, e o sistema precisa mudar junto com ele. Aprender com os erros, descobrir novos padrões e abandonar abordagens obsoletas.
Espero que minha experiência seja útil a quem também desenvolve algoritmos de trading. E lembre-se, neste ramo não existe linha de chegada. Há apenas a jornada!
Traduzido do russo pela MetaQuotes Ltd.
Artigo original: https://www.mql5.com/ru/articles/16667
Aviso: Todos os direitos sobre esses materiais pertencem à MetaQuotes Ltd. É proibida a reimpressão total ou parcial.
Esse artigo foi escrito por um usuário do site e reflete seu ponto de vista pessoal. A MetaQuotes Ltd. não se responsabiliza pela precisão das informações apresentadas nem pelas possíveis consequências decorrentes do uso das soluções, estratégias ou recomendações descritas.






- Aplicativos de negociação gratuitos
- 8 000+ sinais para cópia
- Notícias econômicas para análise dos mercados financeiros
Você concorda com a política do site e com os termos de uso