# Copyright 2026, MetaQuotes Ltd.
# https://www.mql5.com

# Full pipeline: data retrieval (get_mt5_data), outlier filtering
# (apply_hampel_filter), log-return validation, and SQLite upload.
import MetaTrader5 as mt5
import polars as pl
import numpy as np
import sqlite3
from datetime import datetime


mt5.initialize()

# you code here
# 

# --- CONFIGURATION ---
SYMBOL = "NVDA"
TIMEFRAME = mt5.TIMEFRAME_H1
COUNT = 2000
DB_PATH = "C:/Users/ADMIN P/AppData/Roaming/MetaQuotes/Terminal/Common/Files/market_data.sqlite"

def get_mt5_data(symbol, timeframe, count):
    """Initialize MT5, fetch `count` bars of rate history, and shut down.

    Parameters
    ----------
    symbol : str
        Broker symbol name (e.g. "NVDA").
    timeframe : int
        One of the ``mt5.TIMEFRAME_*`` constants.
    count : int
        Number of bars to request, starting from the most recent.

    Returns
    -------
    pl.DataFrame | None
        Rates as a Polars DataFrame, or None on any failure.
    """
    if not mt5.initialize():
        print(f"MT5 Initialize failed, error code: {mt5.last_error()}")
        return None

    try:
        rates = mt5.copy_rates_from_pos(symbol, timeframe, 0, count)
    finally:
        # Release the terminal connection even if the copy call raises.
        mt5.shutdown()

    # copy_rates_from_pos returns None on error, and may return an empty
    # array (unknown symbol / no history); treat both as failure.
    if rates is None or len(rates) == 0:
        return None

    # Convert to Polars DataFrame immediately for O(N) performance [cite: 4]
    return pl.from_numpy(rates)

def apply_hampel_filter(df, window_size=10, n_sigmas=3):
    """
    Flag outliers in the "close" series with a Hampel filter and replace
    them by interpolation.

    An observation is an outlier when
        |x_i - rolling_median| > n_sigmas * 1.4826 * rolling_MAD
    where 1.4826 scales the MAD to a Gaussian standard deviation.
    """
    close = pl.col("close")

    # Rolling median of the close price over the window.
    df = df.with_columns(
        close.rolling_median(window_size=window_size).alias("median")
    )

    # Rolling MAD: median of |x_i - median| over the same window.
    deviation = (close - pl.col("median")).abs()
    df = df.with_columns(
        deviation.rolling_median(window_size=window_size).alias("mad")
    )

    # Null out any point whose deviation exceeds the scaled MAD threshold. [cite: 21]
    threshold = n_sigmas * 1.4826 * pl.col("mad")
    df = df.with_columns(
        pl.when(deviation > threshold)
        .then(pl.lit(None))
        .otherwise(close)
        .alias("cleaned_close")
    )

    # Fill the holes left by outliers (and any gaps) via interpolation.
    return df.with_columns(pl.col("cleaned_close").interpolate())


def validate_stationarity(df):
    """
    Append log returns and sanity-check the series for stationarity.

    Log return: r_t = ln(P_t / P_{t-1}), computed on "cleaned_close".
    Prints a warning when the mean log-return drifts away from zero,
    which suggests the data may be non-stationary.
    """
    price = pl.col("cleaned_close")

    # r_t = ln(P_t / P_{t-1})
    df = df.with_columns(
        (price / price.shift(1)).ln().alias("log_return")
    )

    # A stationary return series should average out near zero; a large
    # drift is flagged for the downstream consumer.
    avg_return = df["log_return"].mean()
    if avg_return is not None and abs(avg_return) > 0.01:
        print(f"Warning: Data may be non-stationary. Mean log-return: {avg_return}")

    return df

def upload_to_sqlite(df, db_path):
    """Bulk import the refined and validated Polars output into SQLite.

    Drops and recreates the ``MarketData`` table, then inserts time/OHLC
    rows where the Hampel-cleaned close replaces the raw close.

    Parameters
    ----------
    df : pl.DataFrame
        Must contain time, open, high, low, cleaned_close, log_return.
    db_path : str
        Path to the SQLite database file.
    """
    # We now include the log_return column for potential use in the RMQ Gatekeeper
    data_to_save = df.select([
        pl.col("time"),
        pl.col("open"),
        pl.col("high"),
        pl.col("low"),
        pl.col("cleaned_close").alias("close"),
        pl.col("log_return").fill_null(0)  # Ensure no NULLs enter the SQLite bridge
    ]).to_dicts()

    conn = sqlite3.connect(db_path)
    try:
        cursor = conn.cursor()
        cursor.execute("DROP TABLE IF EXISTS MarketData")
        cursor.execute("""
            CREATE TABLE MarketData (
                time INTEGER PRIMARY KEY,
                open REAL,
                high REAL,
                low REAL,
                close REAL,
                log_return REAL
            )
        """)

        cursor.executemany("""
            INSERT INTO MarketData (time, open, high, low, close, log_return) 
            VALUES (:time, :open, :high, :low, :close, :log_return)
        """, data_to_save)

        conn.commit()
    finally:
        # Close the connection even if the DDL/DML fails part-way through,
        # so the database file is never left locked.
        conn.close()

if __name__ == "__main__":
    raw_df = get_mt5_data(SYMBOL, TIMEFRAME, COUNT)
    if raw_df is not None:
        # Step 1: Cleaning
        refined_df = apply_hampel_filter(raw_df)
        # Step 2: Log-Return Validation (The missing piece)
        validated_df = validate_stationarity(refined_df)
        # Step 3: Upload
        upload_to_sqlite(validated_df, DB_PATH)

    # Release the terminal connection opened at import time.  Kept inside
    # the guard so merely importing this module no longer shuts down the
    # terminal (previously this ran unconditionally at module level).
    mt5.shutdown()
