"""
model_development.py
--------------------
Production model development pipeline with optional Optuna HPO integration.

This module serves as the central orchestrator for the Machine Learning Blueprint,
integrating high-performance data processing, advanced labeling techniques,
and rigorous cross-validation protocols designed for non-IID financial data.

The pipeline now supports two training paths controlled by
`model_params['use_optuna']`:

    False (default): clf_hyper_fit_cached via GridSearchCV / RandomizedSearchCV.
    True:            optimize_trading_model via Optuna + HyperbandPruner.

When use_optuna=True the following changes apply:
  - Weight computation for HPO is handled internally by _WeightedEstimator;
    get_optimal_sample_weight is still run for meta-features and reporting.
  - train_model dispatches to _train_model_optuna.
  - self.study is populated with the completed Optuna study for visualization.

When calibrate=True (run() parameter):
  - CalibratorCV is fitted after train_model(), wrapping best_model.
  - The calibrator uses PurgedKFold with the same n_splits and pct_embargo
    as the HPO step to prevent temporal leakage.
  - self.calibrator_ is populated with the fitted CalibratorCV.
  - At inference time, best_model.predict_proba() returns calibrated
    probabilities directly — no additional step required.
  - ONNX export unwraps the calibrator and exports the inner estimator only.
    Apply calibrator_.calibrator_.predict() as a post-processing step on
    the ONNX model's raw probabilities in the deployed environment.

Architecture Overview:
This pipeline represents a production-grade implementation of the "Advances in
Financial Machine Learning" (AFML) framework. It orchestrates the complex
interaction between labeling, sample weighting, and cross-validation while
maintaining rigorous data integrity through time-aware caching.

Key AFML Methodologies Implemented:
----------------------------------
1. Triple-Barrier Method (TBM):
   Moves beyond fixed-horizon labeling by utilizing dynamic profit-taking,
   stop-loss, and time-exhaustion barriers. This captures the path-dependency
   essential for realistic trading strategy modeling.

2. Sample Weighting & Time Decay:
   Addresses the issue of overlapping outcomes in financial time series.
   The pipeline searches for optimal weights using:
     - Uniqueness (tW): Weights inverse to the concurrency of labels.
     - Return (w): Weights based on the absolute magnitude of the price move.
     - Time Decay: Both linear and exponential decay to prioritize recent data.

3. Purged & Embargoed Cross-Validation:
   Prevents information leakage by removing training observations that overlap
   with the test set (purging) and adding a buffer following the test set
   (embargo) to account for serial correlation.

4. Meta-Feature Engineering:
   The pipeline calculates rolling performance metrics (Accuracy, Precision,
   Recall, F1) using Numba-accelerated functions. These "self-referential"
   metrics are fed back into the model, allowing it to adapt to changing
   market regimes and its own recent performance.

Pipeline Workflow:
-----------------
1. Data Loading: Fetches tick data and constructs specialized bars.
2. Feature Engineering: Generates primary indicators and time-based features.
3. Label Generation: Applies the Triple-Barrier Method to define 'bin' targets.
4. Weight Optimization: Evaluates multiple weighting schemes to find the best
   fit for the current market environment.
5. Meta-Feature Integration: Joins rolling performance metrics to the feature set.
6. Training/HPO: Executes either Scikit-learn or Optuna-based hyperparameter
   optimization with Purged-KFold validation.
7. Calibration (optional): Wraps best_model in CalibratorCV to correct
   systematic overconfidence before position sizing.
8. Reporting: Generates HTML summaries and hyperparameter importance reports.

Meta-Labeling Flow:
------------------
Primary models (is_primary=True or auto-detected) can hand off to a secondary
pipeline via prepare_meta_labeling_inputs(), which returns the events DataFrame
annotated with the primary model's predicted side column. Pass the result
directly as the `events` argument when constructing the secondary pipeline.

When strategy=None the pipeline uses every bar as a potential entry point with
no directional side signal, producing a symmetric triple-barrier label space
(bin ∈ {-1, 0, 1}). This is the correct default for a purely ML-driven pipeline
with no pre-defined entry logic.
"""

import inspect
import json
import sys
import time
import warnings
from datetime import datetime
from pathlib import Path
from pprint import pformat
from typing import Dict, Optional, Tuple, Union

import numpy as np
import pandas as pd
from feature_engine.selection import DropConstantFeatures, DropDuplicateFeatures
from loguru import logger
from sklearn import clone
from sklearn.ensemble import BaggingClassifier, RandomForestClassifier
from sklearn.pipeline import Pipeline
from sklearn.tree import DecisionTreeClassifier
from tqdm import tqdm

from ..cache import (
    IncrementalBarCache,
    cacheable,
    get_cache_monitor,
    log_data_access,
    print_contamination_report,
)
from ..cross_validation.cross_validation import PurgedKFold, ml_cross_val_score
from ..data_structures import _INFO_BAR_TYPES, calculate_ticks_per_period, make_bars
from ..ensemble.sb_bagging import SequentiallyBootstrappedBaggingClassifier
from ..features.meta_label_features import (
    add_meta_label_features,
)
from ..features.microstructure import compute_all_microfeatures
from ..features.trading_session import get_time_features
from ..labeling.trend_scanning import trend_scanning_labels
from ..labeling.triple_barrier import add_vertical_barrier, get_event_weights, triple_barrier_labels
from ..mt5.tick_data_loader import tick_data_loader as loader
from ..sample_weights.optimized_attribution import get_weights_by_time_decay_optimized
from ..strategies.signal_processing import get_entries
from ..strategies.trading_strategies import BaseStrategy
from ..util.pipelines import MyPipeline, make_custom_pipeline, set_pipeline_params

# Bagging classifiers subsample X to NumPy internally, so their inner estimators
# are fitted without feature names. Passing a named DataFrame at inference time
# then triggers a benign "X has feature names, but ... was fitted without feature
# names" warning. Column order is preserved, so predictions are unaffected.
#
# NOTE: this filter is installed at import time and is process-wide — any
# UserWarning whose message matches the regex below is silenced for every
# module, not only for the estimators built in this file.
warnings.filterwarnings(
    "ignore",
    message="X has feature names, but .* was fitted without feature names",
    category=UserWarning,
)

# ============================================================================
# Cached data helpers
# ============================================================================

# Module-level bar cache instance. load_and_prepare_training_data() builds all
# of its bars through bar_cache.get_or_extend(...), so every call in this
# process shares this single object.
bar_cache = IncrementalBarCache()


def load_and_prepare_training_data(
    symbol: str,
    start_date: str,
    end_date: str,
    account_name: str,
    bar_type: str,
    bar_size: Union[int, str] = 100,
    price: str = "mid_price",
    path: Optional[Path] = None,
    # --- Information bar calibration (preferred entry point) ---
    target_timeframe: Optional[str] = None,
    ewm_span: int = 100,
    # --- Escape hatches (mutually exclusive with target_timeframe) ---
    exp_ticks_init: Optional[Union[int, float]] = None,
    exp_imbalance_init: Optional[float] = None,
    verbose: bool = True,
) -> pd.DataFrame:
    """
    Load tick data and construct OHLC bars for training or analysis.

    This wrapper encapsulates the full pipeline: validation of information
    bar calibration parameters, tick data retrieval, bar construction through
    the module-level ``bar_cache``, and audit logging. It surfaces the
    target-timeframe API as the default entry point for information bars,
    reserving raw parameter control for advanced users.

    Parameters
    ----------
    symbol : str
        Trading instrument symbol (e.g., 'EURUSD', 'AAPL', 'BTCUSD').
    start_date : str
        Training window start date, ISO format ('YYYY-MM-DD').
    end_date : str
        Training window end date, ISO format ('YYYY-MM-DD').
    account_name : str
        MT5 account identifier for tick data source (e.g., 'live', 'icmarkets').
    bar_type : str
        Specifies the bar sampling method. Valid values:

        Standard bars (clock or tick-based):
            'tick'    : fixed number of ticks per bar
            'time'    : fixed time interval per bar
            'volume'  : fixed cumulative volume per bar
            'dollar'  : fixed cumulative dollar per bar

        Information bars (information-driven sampling):
            'tick_imbalance'   : sample on cumulative tick imbalance
            'volume_imbalance' : sample on cumulative volume imbalance
            'dollar_imbalance' : sample on cumulative dollar imbalance
            'tick_runs'        : sample on dominant directional run (ticks)
            'volume_runs'      : sample on dominant directional run (volume)
            'dollar_runs'      : sample on dominant directional run (dollar)

    bar_size : int or str, default=100
        Standard bars only. For information bars the value is not used for
        sizing; it is replaced by ``target_timeframe`` (when given) and only
        feeds the cache call and the audit-log dataset name.

            For 'time' bars: MT5 timeframe string (e.g., 'M5', 'M15', 'H1').
            For 'tick', 'volume', 'dollar' bars: numeric count.
            For 'tick' bars, a timeframe string is auto-converted via
            calculate_ticks_per_period.

    price : str, default='mid_price'
        Price column used for OHLC construction. One of:
            'mid_price' : (bid + ask) / 2
            'bid'       : bid prices
            'ask'       : ask prices
            'bid_ask'   : separate OHLC for bid and ask (not supported
                          for information bars)

    path : Path, optional
        Override the loader's default tick data directory. If None,
        uses the configured path. NOTE: the override is written to the
        shared ``loader`` object and therefore persists for later calls.

    target_timeframe : str, optional
        Information bars only. MT5 timeframe string representing the target
        bar cadence. Examples: 'M1', 'M5', 'M15', 'H1', 'D1'.
        Mutually exclusive with ``exp_ticks_init`` / ``exp_imbalance_init``.

    ewm_span : int, default=100
        Information bars only. Exponential-weighted-moving average span
        (in bars) controlling how quickly the threshold adapts after each
        bar closes. Smaller values (5–10) adapt faster to changing tick
        rates; larger values (30–50) provide more stable transient behavior.

        The adaptation primarily affects the first ``ewm_span`` bars;
        after that, the threshold converges to its data-driven equilibrium.

    exp_ticks_init : int, float, or None
        Information bars only. Initial E_0[T] (expected ticks per bar)
        for manual calibration. Must be a positive number; typical range
        is 100–10,000 depending on instrument tick rate and target cadence.

        Mutually exclusive with ``target_timeframe``. If neither
        ``target_timeframe`` nor ``exp_ticks_init`` is provided for an
        information bar type, raises ``ValueError``.

    exp_imbalance_init : float, optional
        Information bars only. Initial E_0[|imbalance per tick|] for
        manual calibration.

        For tick imbalance/runs bars: represents expected directional bias,
        bounded above by 1.0 (since b_t ∈ {-1, +1}). Typical range: 0.01–0.1.
        For volume/dollar variants: scaled by average tick volume or
        dollar value, so ranges are higher. See make_bars docstring.

        If ``exp_ticks_init`` is provided but this is omitted, the value is
        forwarded as None; the documented fallback of 0.1 is expected to be
        applied by the downstream bar builder (not enforced here).
        Cannot be combined with ``target_timeframe``.

    verbose : bool, default=True
        Log bar construction diagnostics, bar/tick counts, and DataFrame
        structure. Set to False to suppress logging for batch processing.
        The flag is forwarded to the bar cache as well as controlling the
        summary line emitted by this function.

    Returns
    -------
    pd.DataFrame
        OHLC bars indexed by bar-close time (DatetimeIndex, UTC).
        Columns (in order):

            open, high, low, close    : float64, OHLC prices
            spread                    : float64, mean bid-ask spread per bar
            spread_bps                : float64, mean spread in basis points
            tick_volume               : float64, number of ticks per bar
            volume                    : float64 (if present in source), sum
                                        of cumulative tick volumes per bar
            tick_num                  : int64 (always), 1-based global tick
                                        index at which the bar closed

    Raises
    ------
    ValueError
        If ``bar_type`` is an information bar and ``target_timeframe`` is
        combined with ``exp_ticks_init`` and/or ``exp_imbalance_init``.
        Pass one path or the other.

    ValueError
        If ``bar_type`` is an information bar but neither
        ``target_timeframe`` nor ``exp_ticks_init`` is provided.

    KeyError
        If ``bar_type`` requires 'volume' column (volume_* or dollar_* variants)
        but tick data lacks it.

    TypeError
        If tick DataFrame cannot be indexed by datetime.

    Notes
    -----
    **Information bar calibration.** For information bar types
    ('*_imbalance', '*_runs'), calibration of the initial threshold
    parameters is required. Choose exactly one path:

    *Path A: Auto-calibration (recommended).* Pass ``target_timeframe``
    (e.g., 'M15'). E_0[T] and E_0[|imbalance|] are derived from the tick data
    so that bars close at roughly the target clock-time cadence on average.
    The realized cadence fluctuates with volatility and information flow
    — it is an anchor, not a hard guarantee. After the first ~``ewm_span``
    bars, the threshold becomes data-driven and independent of initial values.

    *Path B: Manual calibration (advanced).* Pass ``exp_ticks_init`` and
    optionally ``exp_imbalance_init``. Both are the initial values for the
    AFML threshold:

        |θ_T| ≥ E_0[T] · |E_0[imbalance per tick]|

    The mutual exclusivity enforces a clear contract: either specify the
    desired cadence (Path A) or the threshold parameters directly (Path B),
    not both. Calibration arguments are validated *before* any tick data is
    loaded so that a misconfiguration fails fast.

    For non-information bar types, ``target_timeframe``, ``ewm_span``,
    ``exp_ticks_init`` and ``exp_imbalance_init`` are all forwarded as None.

    1. **Caching & reproducibility**: Tick data is loaded once and cached;
       repeated calls with identical (symbol, date range, account) retrieve
       from cache. The cache is time-aware to prevent forward-looking bias
       in walk-forward backtests.

    2. **Audit logging**: Each call logs the dataset name, the first and
       last bar timestamps, the shape, and ``purpose="train"`` to an access
       log. This trail enables contamination detection if the same data is
       used in multiple roles.

    3. **EWM transient**: The initial E_0[T] and E_0[|imbalance|] only
       influence bar closure for the first ~``ewm_span`` bars. After that,
       the threshold is entirely determined by the empirical tick dynamics.
       This is why auto-calibration is safe — poor initial guesses only
       cause a brief transient, not systematic miscalibration.

    4. **Information bars are adaptive**: Unlike standard bars, which have
       a fixed closure rule, information bars' closure thresholds adapt
       per-bar. This makes them robust to changing market microstructure
       but also means the realized bar count will fluctuate even with
       identical initial parameters across different time windows.

    Examples
    --------
    **Standard tick bars (no calibration needed):**

    >>> bars = load_and_prepare_training_data(
    ...     symbol="AAPL",
    ...     start_date="2023-01-01",
    ...     end_date="2023-12-31",
    ...     account_name="tws",
    ...     bar_type="tick",
    ...     bar_size=500,
    ... )

    **Dollar imbalance bars, auto-calibrated to M15 cadence:**

    >>> bars = load_and_prepare_training_data(
    ...     symbol="EURUSD",
    ...     start_date="2022-01-01",
    ...     end_date="2023-12-31",
    ...     account_name="live",
    ...     bar_type="dollar_imbalance",
    ...     target_timeframe="M15",
    ... )

    **Tick imbalance bars, manual calibration:**

    >>> bars = load_and_prepare_training_data(
    ...     symbol="BTC/USDT",
    ...     start_date="2023-06-01",
    ...     end_date="2023-12-31",
    ...     account_name="binance",
    ...     bar_type="tick_imbalance",
    ...     exp_ticks_init=300,
    ...     exp_imbalance_init=0.01,
    ... )

    **Volume runs bars with custom EWM adaptation:**

    >>> bars = load_and_prepare_training_data(
    ...     symbol="ES",
    ...     start_date="2024-01-01",
    ...     end_date="2024-03-31",
    ...     account_name="icmarkets",
    ...     bar_type="volume_runs",
    ...     target_timeframe="M5",
    ...     ewm_span=30,  # slower adaptation
    ... )
    """
    # -----------------------------------------------------------------------
    # Validate calibration parameters for information bars.
    # Done first: the checks need no tick data, so an invalid configuration
    # is rejected before the (potentially very expensive) tick load.
    # -----------------------------------------------------------------------

    is_info_bar = bar_type in _INFO_BAR_TYPES

    if is_info_bar:
        raw_params_given = (exp_ticks_init is not None) or (exp_imbalance_init is not None)

        if target_timeframe is not None and raw_params_given:
            raise ValueError(
                f"For information bar type '{bar_type}': pass either "
                f"'target_timeframe' (recommended, auto-calibrated) OR "
                f"'exp_ticks_init'/'exp_imbalance_init' (advanced, manual), "
                f"not both. See docstring for examples."
            )

        if target_timeframe is None and exp_ticks_init is None:
            raise ValueError(
                f"Information bar type '{bar_type}' requires calibration. "
                f"Pass either 'target_timeframe' (e.g., 'M15') for "
                f"auto-calibration or 'exp_ticks_init' for manual control. "
                f"See docstring for examples."
            )

    # -----------------------------------------------------------------------
    # Load tick data
    # -----------------------------------------------------------------------

    if path is not None:
        loader.path = Path(path)

    tick_df = loader.get_tick_data(symbol, start_date, end_date, account_name)

    # -----------------------------------------------------------------------
    # Resolve bar_size for standard bars (e.g., 'M5' → numeric tick count)
    # -----------------------------------------------------------------------

    if bar_type == "tick" and isinstance(bar_size, str):
        bar_size = calculate_ticks_per_period(tick_df, bar_size)

    if is_info_bar:
        # For info bars, bar_size is effectively the target_timeframe; it is
        # what ends up in the cache call and the audit-log dataset name.
        if target_timeframe is not None:
            bar_size = target_timeframe
    else:
        # Calibration arguments are meaningless for standard bars; null them
        # so they cannot influence the cached result.
        target_timeframe = None
        ewm_span = None
        exp_ticks_init = None
        exp_imbalance_init = None

    # -----------------------------------------------------------------------
    # Construct bars via the caching layer
    # -----------------------------------------------------------------------

    data = bar_cache.get_or_extend(
        tick_df=tick_df,
        bar_type=bar_type,
        bar_size=bar_size,
        price=price,
        target_timeframe=target_timeframe,
        ewm_span=ewm_span,
        exp_ticks_init=exp_ticks_init,
        exp_imbalance_init=exp_imbalance_init,
        # Honour the caller's flag (previously hard-coded to True, which made
        # ``verbose=False`` unable to silence bar-construction diagnostics).
        verbose=verbose,
    )

    # -----------------------------------------------------------------------
    # Audit log for contamination tracking
    # -----------------------------------------------------------------------

    log_data_access(
        dataset_name=f"{symbol}_{bar_type}_{bar_size}_{price}".lower(),
        start_date=data.index[0],
        end_date=data.index[-1],
        purpose="train",
        data_shape=data.shape,
    )

    if verbose:
        logger.info(
            f"Loaded {data.shape[0]:,} {bar_type} bars for {symbol} "
            f"({start_date} to {end_date}, {price})"
        )

    return data


@cacheable(time_aware=True)
def _make_bars(
    tick_df,
    bar_type,
    bar_size,
    price,
    target_timeframe,
    ewm_span,
    exp_ticks_init,
    exp_imbalance_init,
    exp_runs_buy_init,
    exp_runs_sell_init,
    add_tick_sign,
):
    """
    Cached pass-through to ``make_bars``.

    Every argument is forwarded by keyword under the same name; ``verbose``
    is always forced to True. The ``cacheable`` decorator (time-aware mode)
    wraps the call.

    Returns
    -------
    Whatever ``make_bars`` returns for the supplied arguments.
    """
    bar_kwargs = {
        "tick_df": tick_df,
        "bar_type": bar_type,
        "bar_size": bar_size,
        "price": price,
        "verbose": True,
        "target_timeframe": target_timeframe,
        "ewm_span": ewm_span,
        "exp_ticks_init": exp_ticks_init,
        "exp_imbalance_init": exp_imbalance_init,
        "exp_runs_buy_init": exp_runs_buy_init,
        "exp_runs_sell_init": exp_runs_sell_init,
        "add_tick_sign": add_tick_sign,
    }
    return make_bars(**bar_kwargs)


@cacheable(time_aware=True)
def create_feature_engineering_pipeline(
    data: pd.DataFrame, ticks: pd.DataFrame, feature_config: Dict, data_config: Dict
) -> pd.DataFrame:
    """
    Assemble the full feature matrix for a set of bars (cached, time-aware).

    Four feature groups are computed and concatenated column-wise, in this
    order: user-defined features, time/session features, microstructure
    features (forward-filled) and trend-scanning features.

    Parameters
    ----------
    data : pd.DataFrame
        Bar data. Must expose a 'close' column (used for trend scanning).
    ticks : pd.DataFrame
        Tick-level data handed to ``compute_all_microfeatures``. Bar-level
        microstructure features are requested only when it has a 'volume'
        column.
    feature_config : dict
        User feature configuration.
        Expected keys:
        - func : callable
            Called as ``func(data, **params)``.
        - params : dict
            Keyword arguments passed to `func`.
    data_config : dict
        Data configuration.
        Expected keys:
        - bar_size : str
            Bar size using MT5 naming conventions, e.g., M1, H1, D1.
        - bar_type : str
            Bar type, forwarded to ``get_time_features``.
        - symbol : str, optional
            Instrument symbol. If it contains any major currency code
            (USD, EUR, JPY, GBP, AUD, NZD, CAD, CHF) the instrument is
            treated as forex when building time features. Defaults to "".

    Returns
    -------
    pd.DataFrame
        Column-wise concatenation of all feature groups.

    Notes
    -----
    - Trend-scanning output has its 't1', 'ret' and 'bin' columns removed and
      the remaining columns prefixed with 'trend_'.
    """
    user_features = feature_config["func"](data, **feature_config["params"])

    # Substring heuristic: any major currency code inside the symbol marks
    # the instrument as forex for the session/time features.
    forex_flag = False
    for code in ["USD", "EUR", "JPY", "GBP", "AUD", "NZD", "CAD", "CHF"]:
        if code in data_config.get("symbol", ""):
            forex_flag = True
            break

    session_features = get_time_features(
        data,
        timeframe=data_config["bar_size"],
        bar_type=data_config["bar_type"],
        forex=forex_flag,
    )

    with_bar_features = 1 if "volume" in ticks.columns else 0
    micro_features = compute_all_microfeatures(
        data, ticks, include_bar_features=with_bar_features
    )

    def _trend_name(column):
        return f"trend_{column}"

    raw_trend = trend_scanning_labels(data["close"], span=(5, 100), lookforward=False)
    trimmed_trend = raw_trend.drop(columns=["t1", "ret", "bin"])
    trend_features = trimmed_trend.rename(columns=_trend_name)

    blocks = [user_features, session_features, micro_features.ffill(), trend_features]
    return pd.concat(blocks, axis=1)


@cacheable(time_aware=True)
def generate_events_triple_barrier(
    data: pd.DataFrame,
    strategy: Optional[BaseStrategy],
    target_config: dict,
    profit_target: float = 1,
    stop_loss: float = 1,
    max_holding_period: Dict[str, int] = dict(days=1),
    min_ret: float = 0.0,
    vertical_barrier_zero: bool = True,
    filter_as_series: bool = True,
    logger=None,
) -> pd.DataFrame:
    """
    Generate trading events using the triple-barrier method.

    Parameters
    ----------
    data : pd.DataFrame
        Price bars with 'open', 'high', 'low' and 'close' columns.
    strategy : BaseStrategy or None
        Strategy instance implementing `generate_signals()`. When None, every
        bar in `data` is treated as a potential entry point and no directional
        side signal is applied, producing a symmetric label space
        (bin ∈ {-1, 0, 1}).
    target_config : dict
        Volatility target configuration.
        - func: Volatility target function
        - params: Function parameters (copied, never mutated)

        Parameters of ``func`` that are missing from ``params`` are
        auto-injected by name from the bar data: ``open``, ``high``, ``low``,
        ``close`` (the respective columns) and ``df`` / ``data`` / ``prices``
        (the whole DataFrame). Any other missing parameter keeps the default
        declared by ``func``; only a *required* parameter that cannot be
        resolved is passed as None (legacy behaviour).
    profit_target : float, default=1
        Profit-taking threshold multiplier.
    stop_loss : float, default=1
        Stop-loss threshold multiplier.
    max_holding_period : dict, default={'days': 1}
        Maximum holding period for vertical barrier; unpacked as keyword
        arguments into ``add_vertical_barrier``. The default dict is shared
        across calls but is only read, never modified.
    min_ret : float, default=0.0
        Minimum return threshold.
    vertical_barrier_zero : bool, default=True
        Set label to zero if vertical barrier is reached.
    filter_as_series : bool or None, default=True
        Only used when ``strategy`` is not None.
        - True  : pass the volatility target series as the entry filter.
        - False : pass its scalar mean as the entry filter.
        - None  : skip the filter entirely; use ``strategy.generate_signals``
          directly and take every bar with a non-zero signal as an event.
    logger : optional
        Logger used to report a failing target function. When None, the
        loguru logger is imported lazily at the point of failure.

    Returns
    -------
    pd.DataFrame
        Event labels with columns:
        - 'bin' : {-1, 0, 1} for primary; {0, 1} for secondary (meta-label)
        - 't1'  : vertical barrier timestamps
        - 'w'   : sample weights
        - 'tW'  : uniqueness weights
        - 'side': directional signal (only present for secondary models)

    Raises
    ------
    Exception
        Whatever the target function raises; the error is logged and
        re-raised unchanged.

    Notes
    -----
    - Prevents data leakage via time-aware caching.
    """
    # Values that may be injected into the target function by parameter name.
    data_dict = dict(
        open=data["open"],
        high=data["high"],
        low=data["low"],
        close=data["close"],
        df=data,
        data=data,
        prices=data,
    )
    close = data["close"]
    target_func = target_config["func"]
    target_params = target_config["params"].copy()

    variadic_kinds = (inspect.Parameter.VAR_POSITIONAL, inspect.Parameter.VAR_KEYWORD)
    for key, param in inspect.signature(target_func).parameters.items():
        if key in target_params:
            continue  # explicit user value always wins
        if param.kind in variadic_kinds:
            # *args / **kwargs are not real named arguments; injecting
            # `args=None` / `kwargs=None` would pollute the call.
            continue
        if key in data_dict:
            target_params[key] = data_dict[key]
        elif param.default is inspect.Parameter.empty:
            # Unresolvable required parameter: keep the legacy None so the
            # target function (not this wrapper) decides how to react.
            target_params[key] = None
        # Otherwise leave the parameter out so the function's own default
        # applies instead of being overridden with None.

    try:
        target = target_func(**target_params)
    except Exception as e:
        if logger is None:
            from loguru import logger
        logger.error(f"Target function failed: {e}")
        raise

    # ── Entry generation ──────────────────────────────────────────────────────
    if strategy is None:
        # No pre-defined signal: use every bar as a candidate entry.
        # side=None → symmetric triple-barrier (bin ∈ {-1, 0, 1}).
        side = None
        t_events = close.index
    elif filter_as_series is None:
        # Unfiltered: every non-zero strategy signal becomes an event.
        side = strategy.generate_signals(data)
        t_events = side[side != 0].index
    else:
        filter_threshold = target if filter_as_series else target.mean()
        side, t_events = get_entries(strategy, data, filter_threshold)

    vb = add_vertical_barrier(t_events, close, **max_holding_period)
    events = triple_barrier_labels(
        close,
        target,
        t_events,
        vertical_barrier_times=vb,
        side_prediction=side,
        pt_sl=[profit_target, stop_loss],
        min_ret=min_ret,
        min_pct=0.05,
        vertical_barrier_zero=vertical_barrier_zero,
        drop=True,
        verbose=False,
    )
    return get_event_weights(events, close)


# ============================================================================
# Pipeline Components
# ============================================================================


@cacheable(time_aware=True)
def _add_meta_label_features(events, prices, sample_weight):
    """Cached (time-aware) pass-through to ``add_meta_label_features``.

    The three arguments are forwarded positionally, unchanged, and the
    callee's result is returned as-is.
    """
    meta_features = add_meta_label_features(events, prices, sample_weight)
    return meta_features


@cacheable(time_aware=True)
def best_weighting_scheme(
    classifier,
    X,
    y,
    cv_gen,
    scoring,
    sample_weight_train,
    sample_weight_score,
    scheme=None,
    best_score=0,
    best_scheme=None,
    cv_results=None,
):
    """
    Cross-validate one weighting scheme and update the running best.

    Parameters
    ----------
    classifier : estimator
        Estimator passed to ``ml_cross_val_score``.
    X, y :
        Features and labels.
    cv_gen :
        Cross-validation splitter.
    scoring : str
        Scoring name forwarded to ``ml_cross_val_score``.
    sample_weight_train, sample_weight_score :
        Sample weights used for fitting and for scoring respectively.
    scheme : hashable, optional
        Name of the scheme being evaluated; used as the column label in
        ``cv_results``.
    best_score : float, default=0
        Best mean score seen so far. The candidate only replaces it when its
        mean score is strictly greater (and not infinite).
    best_scheme : hashable, optional
        Name of the scheme that achieved ``best_score``.
    cv_results : pd.DataFrame, optional
        Per-fold scores of previously evaluated schemes. When omitted a fresh
        empty frame is created. The input frame is copied, never modified.

    Returns
    -------
    best_score, best_scheme, cv_results : tuple
        Updated running best and a frame with one extra column ``scheme``
        holding this candidate's per-fold scores.
    """
    # A DataFrame default would be created once and shared (and mutated) by
    # every call; build it per call instead and never touch the caller's frame.
    cv_results = pd.DataFrame() if cv_results is None else cv_results.copy()

    scores = ml_cross_val_score(
        classifier,
        X,
        y,
        cv_gen,
        sample_weight_train=sample_weight_train,
        sample_weight_score=sample_weight_score,
        scoring=scoring,
    )
    score = scores.mean()
    cv_results[scheme] = scores

    # NaN never compares greater, so it is rejected together with +/-inf.
    if not np.isinf(score) and score > best_score:
        best_score = score
        best_scheme = scheme

    return best_score, best_scheme, cv_results


def _evaluate_weight_candidates(
    candidates,
    desc,
    classifier,
    X,
    y,
    cv_gen,
    scoring,
    score_weight,
    best_score,
    best_scheme,
    cv_results,
):
    """Score every candidate weighting in ``candidates`` and return the updated running best."""
    progress = tqdm(
        candidates.items(),
        desc=desc,
        total=len(candidates),
        mininterval=1.0,
        leave=False,
        file=sys.stdout,
    )
    for scheme, weight in progress:
        best_score, best_scheme, cv_results = best_weighting_scheme(
            clone(classifier),
            X,
            y,
            cv_gen,
            scoring,
            weight,
            score_weight,
            scheme,
            best_score,
            best_scheme,
            cv_results,
        )
        progress.set_postfix(
            {"scheme": scheme, "scoring": scoring, "score": f"{cv_results[scheme].mean():.4f}"}
        )
    return best_score, best_scheme, cv_results


def get_optimal_sample_weight(
    data_index: pd.DatetimeIndex,
    events: pd.DataFrame,
    features: pd.DataFrame,
    n_splits: int = 5,
    linear: Optional[bool] = None,
    decay_factors: Union[list, np.ndarray] = np.arange(0.1, 1, 0.1),
) -> Tuple[pd.Series, dict]:
    """
    Search-based optimization for sample weighting schemes.

    Financial Rationale:
    Financial observations are rarely IID. This function conducts a systematic
    search to find the weighting scheme that yields the highest cross-validated
    performance, effectively 'de-noising' the dataset.

    Evaluated Schemes:
    1. Uniqueness (tW): Weights samples based on how little they overlap with
       other concurrent labels.
    2. Return (w): Weights samples by the absolute log-return of the outcome.
    3. Unweighted: All samples weighted 1.0.
    4. Time-Decay: Applies linear or exponential decay on top of the best of
       the schemes above, to prioritize recent market structure over distant
       history.

    The search runs in two stages: the base schemes are scored first, then
    every (decay factor, linear/exponential) variant of the winning base
    scheme. Scoring always uses the return weights ('w'); only the training
    weights vary between candidates.

    Parameters
    ----------
    data_index: pd.DatetimeIndex
        Price data index.
    events : pd.DataFrame
        Event labels; the columns 'bin', 't1', 'w' and 'tW' are read.
    features: pd.DataFrame
        Training features. Rows containing NaN are dropped, and only rows
        whose index also appears in `events` are used.
    n_splits : int, optional
        Number of cross-validation splits (default: 5).
    linear : bool, optional
        Default is None, which searches both linear and exponential time-decay.
        If True, use linear time-decay, if False, exponential.
    decay_factors: Union[list, np.ndarray]
        Time-decay factors to apply to best sample weight.

    Returns
    -------
    weights : pd.Series
        Sample weights of the best-scoring scheme.
    cv_results : dict
        Cross-validation summary with keys 'best_score' (float),
        'cv_results' (per-fold scores, one column per scheme), 'scoring'
        and 'best_scheme'.

    Notes
    -----
    - Scoring is 'f1' for {0, 1} labels and 'neg_log_loss' otherwise. Because
      'neg_log_loss' scores are negative, the running best starts at -inf
      rather than 0 so that a scheme can actually be selected.
    - If no base scheme yields a finite score, a warning is logged and the
      'unweighted' scheme is used as the basis for the time-decay search.
    """
    valid_index = features.dropna().index.intersection(events.index)
    cont = events.loc[valid_index]
    X = features.loc[valid_index]
    y = cont["bin"]

    classifier = RandomForestClassifier(
        criterion="entropy",
        class_weight="balanced_subsample",
        n_estimators=100,
        max_depth=4,
        min_weight_fraction_leaf=0.05,
        max_samples=cont["tW"].mean(),
        random_state=42,
    )

    cv_gen = PurgedKFold(n_splits=n_splits, t1=cont["t1"], pct_embargo=0.02)

    weights = {
        "return": cont["w"],
        "unweighted": pd.Series(1.0, index=cont.index),
        "uniqueness": cont["tW"],
    }

    # Start from -inf, not 0: with a negative-valued metric (neg_log_loss) a
    # zero baseline can never be beaten, leaving best_scheme as None and
    # crashing on the weights[best_scheme] lookup below.
    best_score, best_scheme = -np.inf, None
    cv_results = pd.DataFrame()
    scoring = "f1" if set(y.unique()) == {0, 1} else "neg_log_loss"

    # ── Stage 1: base weighting schemes ──────────────────────────────────────
    best_score, best_scheme, cv_results = _evaluate_weight_candidates(
        weights,
        "Analyzing weighting schemes",
        classifier,
        X,
        y,
        cv_gen,
        scoring,
        weights["return"],
        best_score,
        best_scheme,
        cv_results,
    )

    if best_scheme is None:
        # Every base score was NaN/inf; keep the pipeline alive with the
        # neutral scheme instead of failing on a None dictionary key.
        logger.warning(
            "No weighting scheme produced a finite CV score; "
            "falling back to 'unweighted'."
        )
        best_scheme = "unweighted"

    best_weight = weights[best_scheme]
    linear_search = [1, 0] if linear is None else ([1] if linear else [0])

    # ── Stage 2: time-decay variants of the winning base scheme ──────────────
    time_decay_weights = {}
    for decay in decay_factors:
        for lin in linear_search:
            decay_vec = get_weights_by_time_decay_optimized(
                triple_barrier_events=cont,
                close_index=data_index,
                last_weight=decay,
                linear=lin,
                av_uniqueness=cont["tW"],
            )
            scheme = f"{best_scheme}_{'linear' if lin else 'exp'}_{decay}"
            time_decay_weights[scheme] = best_weight * decay_vec

    best_score, best_scheme, cv_results = _evaluate_weight_candidates(
        time_decay_weights,
        "Analyzing time-decay",
        classifier,
        X,
        y,
        cv_gen,
        scoring,
        weights["return"],
        best_score,
        best_scheme,
        cv_results,
    )

    # np.round works for numpy scalars and plain floats alike (incl. -inf).
    print(
        pformat(
            {"best": best_scheme, "scoring": scoring, "score": float(np.round(best_score, 4))}
        )
    )

    weights.update(time_decay_weights)
    cv_results_dict = {
        "best_score": float(best_score),
        "cv_results": cv_results,
        "scoring": scoring,
        "best_scheme": best_scheme,
    }
    return weights[best_scheme], cv_results_dict


@cacheable(time_aware=True)
def fit_calibrated_model(
    base_pipeline,
    X: pd.DataFrame,
    y: pd.Series,
    sample_weight: pd.Series,
    sample_weight_score: pd.Series,
    t1: pd.Series,
    n_splits: int,
    pct_embargo: float,
    calibration_diagnostic: bool = False,
):
    """
    Fit the deployment ``CalibratorCV`` and, on request, the raw-vs-calibrated
    cross-validation diagnostic.

    The function is wrapped by ``cacheable(time_aware=True)``; the caching
    behaviour itself is defined by that decorator.  One ``PurgedKFold``
    instance, built from ``n_splits``, ``t1`` and ``pct_embargo``, is shared by
    the calibrator and by the diagnostic (as both its outer ``cv_gen`` and its
    ``calibrator_cv``).

    Parameters
    ----------
    base_pipeline : estimator
        Base pipeline; it is cloned once for the calibrator and once more for
        the diagnostic, so the object passed in is never fitted here.
    X : pd.DataFrame
        Feature matrix passed to ``CalibratorCV.fit``.
    y : pd.Series
        Labels; ``y.values`` is compared against the out-of-fold
        probabilities when computing the Brier score.
    sample_weight : pd.Series
        Training weights forwarded to the calibrator fit and to the
        diagnostic (``sample_weight_train``).
    sample_weight_score : pd.Series
        Scoring weights; only the diagnostic receives them.
    t1 : pd.Series
        Event end times handed to ``PurgedKFold``.
    n_splits : int
        Number of folds for ``PurgedKFold``.
    pct_embargo : float
        Embargo fraction for ``PurgedKFold``.
    calibration_diagnostic : bool, optional
        Run ``analyze_calibrated_cross_val_scores`` when True (default False).

    Returns
    -------
    calibrator : CalibratorCV
        The fitted calibrator.
    oof_brier : float
        Unweighted mean squared difference between ``calibrator.oof_probs_``
        and ``y``, taken over the entries whose out-of-fold probability is not
        NaN.
    calibration_scores : pd.DataFrame or None
        Second element of the diagnostic's return value, or None when the
        diagnostic is disabled or raised (the exception is logged as a
        warning and swallowed).
    """
    from ..calibration.calibration import CalibratorCV, analyze_calibrated_cross_val_scores

    purged_cv = PurgedKFold(n_splits=n_splits, t1=t1, pct_embargo=pct_embargo)

    calibrator = CalibratorCV(estimator=clone(base_pipeline), cv=purged_cv)
    calibrator.fit(X, y, sample_weight=sample_weight)

    # Score only the rows that actually received an out-of-fold probability.
    has_oof = ~np.isnan(calibrator.oof_probs_)
    squared_error = (calibrator.oof_probs_[has_oof] - y.values[has_oof]) ** 2
    oof_brier = float(np.mean(squared_error))

    if not calibration_diagnostic:
        return calibrator, oof_brier, None

    calibration_scores = None
    try:
        # The diagnostic is best-effort: any failure is downgraded to a warning.
        _, calibration_scores, _ = analyze_calibrated_cross_val_scores(
            base_estimator=clone(base_pipeline),
            X=X,
            y=y,
            cv_gen=purged_cv,
            sample_weight_train=sample_weight,
            sample_weight_score=sample_weight_score,
            calibrator_cv=purged_cv,
        )
    except Exception as e:
        logger.warning(f"Calibration diagnostic (analyze_calibrated_*) skipped: {e}")

    return calibrator, oof_brier, calibration_scores


# ============================================================================
# ModelDevelopmentPipeline
# ============================================================================


class ModelDevelopmentPipeline:
    """
    The central production controller for Model Training and HPO.

    This class encapsulates the state of the model development lifecycle,
    ensuring that hyperparameters, feature names, and evaluation metrics
    are kept in sync with the physical artifacts saved to disk.

    Core Responsibilities:
    - Pre-processing: Removes constant and duplicate features to reduce
      model variance and training time.
    - Backend Switching: Transparently toggles between Scikit-learn (Grid)
      and Optuna (Bayesian) optimization based on 'use_optuna' config.
    - Artifact Management: Automatically organizes models, parquet data,
      and HTML reports into a versioned directory structure. The model
      filename is prefixed with the bagging wrapper type (sbag/bag/plain)
      so artifacts are immediately distinguishable in the file system.
    - Calibration (optional): Wraps the trained model in CalibratorCV
      to correct systematic overconfidence before downstream bet sizing.
    - Analysis: Triggers feature importance calculation and automated
      contamination reports after every successful run.

    Meta-Labeling:
    - Primary models (is_primary=True or auto-detected) can hand off to a
      secondary pipeline via prepare_meta_labeling_inputs().
    - strategy=None instructs the pipeline to use every bar as a candidate
      entry point with no directional signal (symmetric barriers).
    """

    def __init__(
        self,
        data_config: dict,
        feature_config: dict,
        target_config: dict,
        label_config: dict,
        model_params: dict,
        strategy: Optional[BaseStrategy] = None,
        is_primary: Optional[bool] = None,
        base_dir: str = "Models",
    ):
        """
        Initialize the pipeline with configuration parameters.

        Besides storing the configuration, the constructor builds the flat
        ``self.config`` dictionary, derives ``self.model_type``, asks
        ``ModelFileManager`` to set up the artifact directory and attaches the
        per-run log sink.

        Parameters
        ----------
        data_config : dict
            Bar construction configuration.
            - symbol : str
            - start_date : str  ('YYYY-MM-DD')
            - end_date : str    ('YYYY-MM-DD')
            - account_name : str  (optional here; ``self.account_name`` falls
              back to 'default' when absent)
            - bar_type : str    ('tick', 'volume', 'time')
            - bar_size : str    (e.g. 'M1', 'M5')
            - price : str       ('bid', 'ask', 'mid_price', 'bid_ask')
            - path : Union[str, Path] = None
            - target_timeframe : str or None  (required key; pass None when
              unused)
            NOTE: this dict is modified in place.  When ``target_timeframe``
            is not None and ``bar_type`` is one of ``_INFO_BAR_TYPES``,
            ``bar_size`` is overwritten with ``target_timeframe``; for any
            other bar type ``target_timeframe`` is reset to None.
        strategy : BaseStrategy or None, optional
            Signal-generating strategy.  When None every bar is treated as a
            potential entry point and no directional side signal is applied,
            producing a symmetric label space (bin ∈ {-1, 0, 1}).  Pass a
            concrete strategy only when entry timing or direction comes from a
            rule-based signal (e.g. moving-average crossover for meta-labeling).
        feature_config : dict
            - func: Feature engineering function
            - params: Function parameters
        target_config : dict
            - func: Volatility target function
            - params: Function parameters
        label_config : dict
            - profit_target : float
            - stop_loss : float
            - max_holding_period : dict
            - min_ret : float
            - vertical_barrier_zero : bool
            - filter_as_series : bool
        model_params : dict
            - pipe_clf : BaseEstimator or Pipeline or MyPipeline  (required)
            - param_grid : dict or list of dicts
            - n_splits : int, default=5
            - bagging_n_estimators : int, default=0
            - bagging_sequential : bool, default=False
            - bagging_max_samples : float or int, default=1.0
            - bagging_max_features : float or int, default=1.0
            - rnd_search_iter : int, default=0
            - n_jobs : int, default=-1
            - pct_embargo : float, default=0.02
            - random_state : int or None, default=None
            - use_optuna : bool, default=False
            - n_trials : int  (Optuna only)
            - timeout : int   (Optuna only)
            - pruner_type : str, default='hyperband'
            - verbose : int, default=0
        is_primary : bool or None, optional
            Explicit role override.
            - True  → treat as primary model regardless of events content.
            - False → treat as secondary (meta-labeling) model.
            - None  → auto-detect: primary if 'side' not in events.columns
                       (default, preserves original behaviour).
        base_dir : str
            Root directory for saved artifacts.

        Raises
        ------
        KeyError
            If a required key (e.g. ``data_config['target_timeframe']``,
            ``data_config['symbol']``, ``model_params['pipe_clf']``) is missing.
        """
        from .file_manager import ModelFileManager

        if data_config["target_timeframe"] is not None:
            if data_config["bar_type"] in _INFO_BAR_TYPES:
                # For info bars, bar_size is effectively the target_timeframe
                # for logging and feature engineering purposes,
                # even though the actual bar construction is adaptive.
                data_config["bar_size"] = data_config["target_timeframe"]
            else:
                data_config["target_timeframe"] = None

        self.data_config = data_config
        self.symbol = data_config["symbol"]
        self.train_start = data_config["start_date"]
        self.train_end = data_config["end_date"]
        self.strategy = strategy
        self.feature_config = feature_config
        self.label_config = label_config
        self.target_config = target_config
        self.account_name = data_config.get("account_name", "default")
        self.pipeline_version = "4.4"
        self.model_params = model_params

        # Explicit is_primary override; None means auto-detect in generate_labels.
        self._is_primary_override = is_primary

        # Flat, serialisable view of the run configuration.  It starts as a
        # shallow copy of data_config with the date keys renamed and the
        # filesystem path dropped.
        self.config = data_config.copy()
        self.config.pop("path", None)
        self.config["training_start"] = self.config.pop("start_date")
        self.config["training_end"] = self.config.pop("end_date")
        self.config["strategy"] = (
            strategy.get_strategy_name() if strategy is not None else "ml_driven"
        )
        self.config["feature_func"] = feature_config["func"].__name__
        self.config["feature_params"] = feature_config["params"]
        self.config["target_func"] = target_config["func"].__name__
        self.config["target_params"] = target_config["params"]
        self.config.update(label_config)

        # ── State ─────────────────────────────────────────────────────────────
        self.bar_data = None
        self.features = None
        self.events = None
        self.sample_weight = None
        self.best_weighting_scheme = None
        self.meta_features = None
        self.preprocessed_features = None
        self.preprocessor = None
        self.best_model = None
        self.calibrator_ = None
        self.calibration_scores_ = None
        self.cv_results = None
        self.weight_cv_results = None
        self.feature_importance = None
        self.metrics = None
        self.study = None
        self.is_primary = None  # resolved in generate_labels()
        self.display = None
        self.calibrate = None

        # ── Model type: bagging wrapper prefix + base estimator token ─────────
        # The wrapper prefix is computed here — it depends only on model_params
        # and is therefore stable for the lifetime of this instance. It is
        # prepended to the base estimator token so the model filename in the
        # file system immediately identifies the ensemble strategy used.
        #
        # Examples:  sbag_rf  |  bag_rf  |  plain_rf
        #
        if isinstance(model_params["pipe_clf"], Pipeline):
            _base_model = model_params["pipe_clf"].steps[-1][1]
        else:
            _base_model = model_params["pipe_clf"]

        self.model_type = f"{self._bagging_wrapper}_{get_model_type(_base_model)}"

        self.file_manager = ModelFileManager(base_dir)
        self.file_paths = self.file_manager.setup_model_directory(self.config, self.model_type)

        self.decay_factors = [0.001, 0.1, 0.25, 0.5, 0.75, 0.9]

        self.completed_steps = {
            "data_loading": False,
            "feature_engineering": False,
            "label_generation": False,
            "weight_computation": False,
            "meta_features": False,
            "model_training": False,
            "calibration": False,
            "analysis": False,
        }

        self.log_file = self.file_paths["logs"] / "pipeline.log"
        self._setup_logging()
        # Honour the documented default (n_splits=5) instead of raising
        # KeyError when the caller leaves the key out of model_params.
        self.n_splits = model_params.get("n_splits", 5)

    # ── Properties ────────────────────────────────────────────────────────────

    @property
    def _bagging_wrapper(self) -> str:
        """
        Canonical three-state label for the bagging wrapper in use.

        Computed purely from model_params so it is available from __init__
        onward.  Used as a prefix on model_type (and therefore on the saved
        model filename) so artifacts are immediately distinguishable without
        opening them.

        Returns
        -------
        str
            'sbag'  — SequentiallyBootstrappedBaggingClassifier
            'bag'   — standard BaggingClassifier
            'plain' — no bagging; base estimator used directly
        """
        sequential = self.model_params.get("bagging_sequential", False)
        n_estimators = self.model_params.get("bagging_n_estimators", 0)
        if sequential and n_estimators > 0:
            return "sbag"
        if n_estimators > 0:
            return "bag"
        return "plain"

    # ── Logging ───────────────────────────────────────────────────────────────

    def _setup_logging(self):
        """
        Bind a context-tagged logger and attach the per-run file sink.

        ``self.logger`` is the module logger bound with ``context`` set to the
        concrete class name.  A DEBUG-level sink writing to
        ``<logs>/pipeline.log`` (rotated at 50 MB) is added and its handler id
        is kept in ``self._log_handler_id``.

        The sink filter compares against the same context value that is bound
        on ``self.logger``.  Previously the filter was hard-coded to
        ``"ModelDevelopmentPipeline"``, so a subclass (bound under its own
        class name) would have had every record rejected by its own file sink.

        NOTE(review): the filter keys on the class name only, so records from
        any other live instance of the same class also pass this sink.
        """
        context = self.__class__.__name__
        self.logger = logger.bind(context=context)
        # Per-run file sink.  The handler id is stored so the sink can be
        # detached later — presumably done elsewhere in the class; not visible
        # from this method.
        self._log_handler_id = logger.add(
            self.file_paths["logs"] / "pipeline.log",
            level="DEBUG",
            rotation="50 MB",
            filter=lambda record: record["extra"].get("context") == context,
        )

    # ── run() ─────────────────────────────────────────────────────────────────

    def run(
        self,
        generate_reports: bool = True,
        cache_reports: bool = False,
        save: bool = True,
        export_onnx: bool = False,
        calibrate: bool = False,
        display: bool = False,
        continue_study: bool = False,
        verbose: bool = True,
    ) -> Tuple:
        """
        Execute every pipeline stage in order and return the trained model.

        Stage order: data loading, feature engineering, label generation,
        sample-weight computation, meta-features, preprocessing, training,
        optional calibration, feature analysis, metric compilation, then the
        optional reporting / cache-report / artifact-saving steps.

        Parameters
        ----------
        generate_reports : bool, optional
            Call ``_generate_analysis_reports`` after training (default: True).
        cache_reports : bool, optional
            Call ``_display_cache_reports`` (default: False).
        save : bool, optional
            Save model and artifacts (default: True).
        export_onnx : bool, optional
            Export model to ONNX format (default: False).  When either ``save``
            or ``export_onnx`` is true and a model exists, the flag is stored
            on ``self.export_onnx`` and ``_save_all_artifacts`` is invoked.
        calibrate : bool, optional
            Fit CalibratorCV on OOF predictions after training.
            When True, self.best_model is replaced with the fitted calibrator,
            so all downstream calls to best_model.predict_proba() return
            calibrated probabilities.  Default: False.
        display : bool, optional
            Forwarded to ``_generate_analysis_reports`` to display the
            hyperparameter analysis report inline in a Jupyter notebook.
            Default: False.
        continue_study : bool, optional
            Forwarded to ``train_model``; only the Optuna backend receives it.
            Default: False.
        verbose : bool, optional
            Print the banner and the elapsed time (default: True).

        Returns
        -------
        tuple
            (best_model, feature_columns, metrics, config)

        Raises
        ------
        Exception
            Any exception raised by a stage is logged through ``self.logger``
            and re-raised unchanged.
        """
        started_at = time.time()
        backend_label = "Optuna" if self.model_params.get("use_optuna", False) else "sklearn"

        if verbose:
            rule = "=" * 70
            print("\n" + rule)
            print(f"PRODUCTION MODEL DEVELOPMENT PIPELINE (Backend: {backend_label})")
            print(rule)

        try:
            # Data preparation stages take no arguments and must run in this order.
            preparation_stages = (
                self.load_training_data,
                self.engineer_features,
                self.generate_labels,
                self.compute_sample_weights,
                self.add_meta_features,
                self.preprocess_features,
            )
            for stage in preparation_stages:
                stage()
            self.train_model(continue_study)

            # ── Calibration (optional) ────────────────────────────────────────
            self.calibrate = calibrate
            if calibrate:
                self.calibrate_model()

            self.analyze_features()
            self._compile_metrics()

            if generate_reports:
                self._generate_analysis_reports(display=display)
            if cache_reports:
                self._display_cache_reports()

            wants_artifacts = save or export_onnx
            if wants_artifacts and self.best_model is not None:
                self.export_onnx = export_onnx
                try:
                    self._save_all_artifacts()
                except Exception as e:
                    self.logger.error(f"Failed to save artifacts: {e}")
                    raise

            if verbose:
                duration = pd.Timedelta(seconds=time.time() - started_at).round("1s")
                # Drop the day component pandas prints for sub-day durations.
                elapsed = str(duration).replace("0 days ", "")
                print(f"\n✓ Completed in {elapsed}")

            return self.best_model, self._get_feature_names(), self.metrics, self.config

        except Exception as e:
            self.logger.error(f"Pipeline failed: {e}")
            raise

    # ── Pipeline steps ────────────────────────────────────────────────────────

    def load_training_data(self):
        """
        Load the training bars and mark the ``data_loading`` step complete.

        Every entry of ``self.data_config`` is passed as a keyword argument to
        ``load_and_prepare_training_data``; the result is stored on
        ``self.bar_data``.
        """
        bars = load_and_prepare_training_data(**self.data_config)
        self.bar_data = bars
        self.completed_steps["data_loading"] = True

    def engineer_features(self):
        """
        Fetch tick data and build the feature matrix.

        Ticks are requested from ``loader.get_tick_data`` for the configured
        symbol, date range and account, then handed together with
        ``self.bar_data`` to ``create_feature_engineering_pipeline``.  The
        result is stored on ``self.features`` and the ``feature_engineering``
        step is marked complete.

        ``account_name`` is optional in ``data_config``: ``__init__`` already
        resolves a missing key to ``self.account_name`` ('default').  The same
        fallback is applied here so that an omitted key no longer raises
        ``KeyError`` at this stage.
        """
        cfg = self.data_config
        # Prefer the live config value; fall back to the name resolved in __init__.
        account_name = cfg.get("account_name", self.account_name)
        ticks = loader.get_tick_data(
            cfg["symbol"],
            cfg["start_date"],
            cfg["end_date"],
            account_name,
        )
        self.features = create_feature_engineering_pipeline(
            self.bar_data, ticks, self.feature_config, cfg
        )
        self.completed_steps["feature_engineering"] = True

    def generate_labels(self):
        """
        Build the triple-barrier events and decide whether this model is
        primary or secondary.

        ``generate_events_triple_barrier`` receives the bars, the strategy, the
        target configuration, every ``label_config`` entry as a keyword
        argument, and the bound logger.

        Role resolution:
        1. A non-None ``_is_primary_override`` (set at construction) wins.
        2. Otherwise the model is primary exactly when the events frame has no
           'side' column.

        The resolved role is written to ``self.config['model_role']`` and the
        label space and mean of the 'tW' column are logged.
        """
        events = generate_events_triple_barrier(
            self.bar_data,
            self.strategy,
            self.target_config,
            **self.label_config,
            logger=self.logger,
        )
        self.events = events

        override = self._is_primary_override
        self.is_primary = ("side" not in events.columns) if override is None else override

        role = "primary" if self.is_primary else "secondary"
        self.config["model_role"] = role

        label_space = np.unique(events["bin"]).tolist()
        self.logger.info(f"Model role: {role} | Label space: {label_space}")
        self.logger.info(f"Average uniqueness: ({events['tW'].mean():.4f})")
        self.completed_steps["label_generation"] = True

    def compute_sample_weights(self):
        """
        Select the sample-weighting scheme and store the resulting weights.

        ``get_optimal_sample_weight`` is called positionally with the bar
        index, the events, the features, ``self.n_splits``, ``None`` and
        ``self.decay_factors``.  It returns the chosen weights plus a results
        dict from which ``best_scheme``, ``scoring`` and ``best_score`` are
        read for logging.
        """
        # NOTE(review): the fifth positional argument (None) is presumably the
        # `linear` switch of get_optimal_sample_weight — confirm against its
        # signature.
        weights, search_results = get_optimal_sample_weight(
            self.bar_data.index,
            self.events,
            self.features,
            self.n_splits,
            None,
            self.decay_factors,
        )
        self.sample_weight = weights
        self.weight_cv_results = search_results
        self.best_weighting_scheme = search_results["best_scheme"]

        scoring = search_results["scoring"]
        best_score = search_results["best_score"]
        self.logger.info(
            f"best_weighting_scheme: {self.best_weighting_scheme} ({scoring}={best_score:.4f})"
        )
        self.completed_steps["weight_computation"] = True

    def add_meta_features(self):
        """
        Append rolling performance meta-features for secondary models.

        Primary models skip this step — they have no prior predictions to
        reference.  Secondary (meta-labeling) models receive rolling accuracy,
        precision, recall, and F1 metrics derived from the primary model's
        label outcomes, allowing the secondary to adapt to changing regimes.
        """
        if self.is_primary:
            self.meta_features = pd.DataFrame(index=self.events.index)
            self.logger.info("Primary model — rolling meta-features skipped.")
        else:
            self.meta_features = _add_meta_label_features(
                events=self.events,
                prices=self.bar_data.loc[self.events.index],
                sample_weight=self.sample_weight,
            )
        self.completed_steps["meta_features"] = True

    def preprocess_features(self):
        """
        Merge meta-features, drop incomplete rows and fit the preprocessor.

        When ``self.meta_features`` is empty the raw features are used as-is;
        otherwise they are inner-joined with the meta-features.  Rows with any
        NaN are dropped, then a two-step ``Pipeline`` (``DropConstantFeatures``
        followed by ``DropDuplicateFeatures``) is fitted and applied.  Finally
        ``self.events`` is re-indexed to the rows that survived.
        """
        joined = (
            self.features
            if self.meta_features.empty
            else self.features.join(self.meta_features, how="inner")
        )
        combined = joined.dropna()

        cleaning_steps = [
            ("dcf", DropConstantFeatures()),
            ("ddf", DropDuplicateFeatures()),
        ]
        self.preprocessor = Pipeline(cleaning_steps)
        self.preprocessed_features = self.preprocessor.fit_transform(combined)

        # Keep the events aligned with the rows that remain after cleaning.
        surviving_index = self.preprocessed_features.index
        self.events = self.events.loc[surviving_index]

    def train_model(self, continue_study):
        """
        Prepare the classifier pipeline, run the selected HPO backend and
        prepend the fitted preprocessor to the winning model.

        Steps:
        1. ``model_params['pipe_clf']`` is normalised through
           ``make_custom_pipeline`` and replaced by a clone of the result.
        2. For standard (non-sequential) bagging with no explicit
           ``bagging_max_samples``, the value is set to the mean of
           ``events['tW']`` rounded to two decimals.
        3. ``_train_model_optuna(continue_study)`` or ``_train_model_sklearn()``
           is called depending on ``model_params['use_optuna']``; both are
           expected to set ``self.best_model``.
        4. ``self.best_model`` becomes a new ``Pipeline`` whose first step is
           the fitted preprocessor, followed by the steps of the tuned model,
           and is passed through ``set_pipeline_params(..., n_jobs=-1)``.

        Parameters
        ----------
        continue_study :
            Forwarded to the Optuna backend only.

        Notes
        -----
        The Optuna study optimises the base estimator's hyperparameters only;
        the bagging wrapper (if any) is applied after the study.  For ONNX
        export the preprocessor step is stripped before conversion, so
        ``self.preprocessor.transform()`` must be applied separately ahead of
        the deployed ONNX model.
        """
        params = self.model_params
        params["pipe_clf"] = make_custom_pipeline(params["pipe_clf"])
        working_pipe = clone(params["pipe_clf"])

        n_bagged = params.get("bagging_n_estimators", 0)
        is_sequential = params.get("bagging_sequential", False)
        # avgU-as-max_samples (AFML §6.2) only applies to the standard
        # BaggingClassifier; the sequential wrapper draws full-size samples and
        # corrects for overlap internally, so skip the auto-set there.
        needs_auto_max_samples = (
            n_bagged > 0
            and not is_sequential
            and params.get("bagging_max_samples") is None
        )
        if needs_auto_max_samples:
            av_uniqueness = float(self.events["tW"].mean().round(2))
            params["bagging_max_samples"] = av_uniqueness
            self.logger.info(
                f"bagging_max_samples set to average uniqueness ({av_uniqueness:.4f})"
            )

        params["pipe_clf"] = working_pipe

        if params.get("use_optuna", False):
            self._train_model_optuna(continue_study)
        else:
            self._train_model_sklearn()

        # Make sklearn inference self-contained: preprocessor first, then the
        # tuned model's own steps.
        combined_steps = [("preprocessor", self.preprocessor), *self.best_model.steps]
        self.best_model = Pipeline(combined_steps)
        self.best_model = set_pipeline_params(self.best_model, n_jobs=-1)
        self.completed_steps["model_training"] = True

    def calibrate_model(self) -> None:
        """
        Wrap ``best_model`` in a fitted CalibratorCV and make it the active model.

        Inputs are all aligned to ``self.events.index``: the preprocessed
        features, the 'bin' labels, the training sample weights and the 'w'
        column used as scoring weights.  The actual fitting is delegated to the
        cached helper ``fit_calibrated_model``, which builds a PurgedKFold from
        ``self.n_splits``, ``events['t1']`` and ``pct_embargo``.

        After this method returns:
        - ``self.calibrator_`` holds the fitted CalibratorCV.
        - ``self.calibration_scores_`` holds the raw-vs-calibrated CV score
          frame when ``model_params["calibration_diagnostic"]`` is True
          (default False) and the diagnostic succeeded, else None.
        - ``self.best_model`` is replaced by the calibrator, so downstream
          ``best_model.predict_proba()`` calls go through it.
        - The ``calibration`` step is marked complete.

        Notes
        -----
        The diagnostic adds a nested CV of refits and is therefore opt-in.

        ONNX export: CalibratorCV is not ONNX-compatible.  When export_onnx=True,
        _save_all_artifacts() unwraps the calibrator and exports the inner
        estimator (self.calibrator_.estimator_).

        NOTE(review): the ``pct_embargo`` fallback used here is 0.01, while the
        constructor docstring lists ``default=0.02`` for the same key.  If the
        HPO step really defaults to 0.02 the two CV protocols differ whenever
        the key is omitted — confirm and align.
        """
        event_index = self.events.index
        X = self.preprocessed_features.loc[event_index]
        y = self.events["bin"]
        train_weights = self.sample_weight.loc[event_index]
        score_weights = self.events["w"].loc[event_index]
        embargo = self.model_params.get("pct_embargo", 0.01)
        run_diagnostic = self.model_params.get("calibration_diagnostic", False)

        # Snapshot of the current model as a custom pipeline; the helper clones
        # it, so the instance held in self.best_model is not refitted here.
        base_pipeline = make_custom_pipeline(self.best_model)

        # Calibrator fit and the optional diagnostic both live in the cached
        # helper.  A diagnostic failure there is non-fatal and comes back as
        # None in the third slot.
        fitted_calibrator, oof_brier, diagnostic_scores = fit_calibrated_model(
            base_pipeline=base_pipeline,
            X=X,
            y=y,
            sample_weight=train_weights,
            sample_weight_score=score_weights,
            t1=self.events["t1"],
            n_splits=self.n_splits,
            pct_embargo=embargo,
            calibration_diagnostic=run_diagnostic,
        )
        self.calibrator_ = fitted_calibrator
        self.calibration_scores_ = diagnostic_scores
        self.logger.info(f"CalibratorCV fitted.  OOF Brier score: {oof_brier:.4f}")

        if diagnostic_scores is not None:
            raw_brier = diagnostic_scores.loc["raw_brier", "mean"]
            cal_brier = diagnostic_scores.loc["cal_brier", "mean"]
            self.logger.info(
                f"[{self._bagging_wrapper}] CV Brier — raw: {raw_brier:.4f}, "
                f"calibrated: {cal_brier:.4f}"
            )

        self.best_model = self.calibrator_
        self.completed_steps["calibration"] = True

    # ── Meta-labeling handoff ─────────────────────────────────────────────────

    def prepare_meta_labeling_inputs(self) -> pd.DataFrame:
        """
        Produce the events DataFrame annotated with the primary model's
        predicted side, ready for consumption by a secondary pipeline.

        The secondary pipeline detects 'side' in events.columns and switches
        to binary meta-labeling (bin ∈ {0, 1}).  The side signal is derived
        from the primary model's calibrated (or raw) probabilities:

            side = +1  where P(positive class) >= 0.5
            side = -1  otherwise

        Usage
        -----
        >>> meta_events = primary_pipeline.prepare_meta_labeling_inputs()
        >>> secondary = ModelDevelopmentPipeline(
        ...     data_config    = data_config,
        ...     feature_config = feature_config,
        ...     target_config  = target_config,
        ...     label_config   = {**label_config, "events": meta_events},
        ...     model_params   = secondary_model_params,
        ...     is_primary     = False,
        ... )

        Returns
        -------
        pd.DataFrame
            Copy of self.events with an additional 'side' column
            (values ∈ {-1, +1}).

        Raises
        ------
        RuntimeError
            If the pipeline has not completed training, or if called on a
            secondary model.
        """
        if self.best_model is None:
            raise RuntimeError(
                "Pipeline must complete training before calling prepare_meta_labeling_inputs()."
            )
        if not self.is_primary:
            raise RuntimeError("prepare_meta_labeling_inputs() is only valid for primary models.")

        X = self.preprocessed_features.loc[self.events.index]
        proba = self.best_model.predict_proba(X)

        # The last column is always P(positive class) for both binary and
        # ternary label spaces under sklearn's sorted-classes convention.
        side = np.where(proba[:, -1] >= 0.5, 1, -1)

        meta_events = self.events.copy()
        meta_events["side"] = side
        return meta_events

    # ── Private training backends ─────────────────────────────────────────────

    def _train_model_sklearn(self):
        """
        Run the sklearn search backend and set ``best_model`` / ``cv_results``.

        Only the ``model_params`` entries whose names appear in the signature
        of ``clf_hyper_fit_cached`` are forwarded.  Training weights are
        ``self.sample_weight`` aligned to the events; scoring weights are the
        events' 'w' column on the same index.

        When sequential bagging is requested (``bagging_sequential`` truthy and
        ``bagging_n_estimators`` > 0) the search runs with
        ``bagging_n_estimators`` forced to 0 and the tuned pipeline is then
        wrapped by ``_apply_sequential_bagging``.  Otherwise the estimator
        returned by the search is used directly.
        """
        from ..cross_validation.hyper_fit import clf_hyper_fit_cached

        wants_sequential = self.model_params.get("bagging_sequential", False)
        n_bagged = self.model_params.get("bagging_n_estimators", 0)
        train_weights = self.sample_weight.loc[self.events.index]
        score_weights = self.events["w"].loc[train_weights.index]

        accepted = inspect.signature(clf_hyper_fit_cached).parameters.keys()
        search_kwargs = {
            name: value for name, value in self.model_params.items() if name in accepted
        }

        use_sequential = wants_sequential and n_bagged > 0
        if use_sequential:
            # Bagging is applied after the search, so the search itself must
            # tune the bare estimator.
            search_kwargs["bagging_n_estimators"] = 0

        tuned, self.cv_results = clf_hyper_fit_cached(
            features=self.preprocessed_features,
            labels=self.events["bin"],
            t1=self.events["t1"],
            **search_kwargs,
            sample_weight_train=train_weights,
            sample_weight_score=score_weights,
        )

        if use_sequential:
            tuned = self._apply_sequential_bagging(
                self.preprocessed_features,
                self.events["bin"],
                tuned,
                sample_weight=train_weights,
            )
        self.best_model = tuned

    def _train_model_optuna(self, continue_study):
        """
        Run Optuna hyper-parameter optimisation and assemble ``self.best_model``.

        Steps, in order:
          1. Take the final step of ``model_params['pipe_clf']`` as the base
             classifier and pick the metric: ``'f1'`` when the labels are
             exactly ``{0, 1}``, otherwise ``'neg_log_loss'``.
          2. Forward every ``model_params`` entry whose key is a parameter of
             ``optimize_trading_model``; ``param_grid`` is passed on as
             ``param_distributions``.
          3. Derive the study name, the SQLite storage URL and the trial
             reports path.
          4. Try to launch the Optuna dashboard (best effort: any failure is
             logged and otherwise ignored).
          5. Run ``optimize_trading_model`` and keep the study in
             ``self.study``.
          6. Re-wrap the tuned base estimator with ``make_custom_pipeline``
             and, depending on the bagging settings, wrap it in sequential
             bagging, a standard ``BaggingClassifier``, or leave it unwrapped.
          7. Summarise the search in ``self.cv_results``.

        Parameters
        ----------
        continue_study
            Forwarded unchanged to ``optimize_trading_model`` as
            ``continue_study``.  NOTE(review): presumably a flag that resumes
            an existing study of the same name — confirm against
            ``optimize_trading_model``.
        """
        from ..cross_validation.optuna_hyper_fit import (
            check_for_overfitting,
            optimize_trading_model,
            print_best_trial,
        )

        X, y = self.preprocessed_features, self.events["bin"]
        # Only the final pipeline step (the classifier) is handed to Optuna.
        base_clf = self.model_params["pipe_clf"].steps[-1][1]
        metric = "f1" if set(y.unique()) == {0, 1} else "neg_log_loss"

        # Forward only the settings optimize_trading_model actually accepts.
        included = inspect.signature(optimize_trading_model).parameters.keys()
        opt_params = {"metric": metric, "continue_study": continue_study}
        for k, v in self.model_params.items():
            if k == "param_grid":
                opt_params["param_distributions"] = v
            elif k in included:
                opt_params[k] = v

        # ── Study name ────────────────────────────────────────────────────────
        # Tokens that change the Optuna optimization surface:
        #   strategy  — determines entry set and label distribution
        #   symbol    — different instruments have different dynamics
        #   bar_type/size — sampling frequency changes autocorrelation structure
        #   role      — primary (ternary) vs secondary (binary) label space
        #   config hash — catches all remaining surface dimensions (CV protocol,
        #                 search space shape, barrier params, target function)
        #
        # The bagging wrapper is intentionally omitted: HPO optimises the base
        # estimator only; bagging is applied post-study.  A study's trials are
        # valid priors regardless of which wrapper is subsequently applied.
        _role = "pri" if self.is_primary else "sec"
        _config_hash = self._get_study_config_hash(metric=metric)

        opt_params["study_name"] = (
            f"{self.config['strategy']}"
            f"_{self.symbol}"
            f"_{self.data_config.get('bar_type', 'unk')}"
            f"{self.data_config.get('bar_size', 'unk')}"
            f"_{_role}"
            f"_s{_config_hash}"
        )

        # Make sure the directory for the SQLite file exists before it is
        # handed over as a storage URL.
        db_path: Path = self.file_paths["db_path"]
        db_path.parent.mkdir(parents=True, exist_ok=True)
        opt_params["db_path"] = f"sqlite:///{db_path.resolve()}"
        opt_params["reports_path"] = self.file_paths["reports"] / "trials"
        callbacks = [check_for_overfitting, print_best_trial]

        # The dashboard is optional: a failed import or launch is logged and
        # must not stop the optimisation.
        try:
            from .optuna_dashboard import launch_optuna_dashboard

            launch_optuna_dashboard(storage=opt_params["db_path"], timeout=60, force_restart=True)
        except Exception as e:
            self.logger.error(e)

        self.study, cv_results_df = optimize_trading_model(
            classifier=base_clf,
            X=X,
            y=y,
            events=self.events,
            data_index=self.bar_data.index,
            refit=True,
            callbacks=callbacks,
            **opt_params,
        )

        self.logger.info(
            f"Optuna complete.\nBest score: {self.study.best_value:.4f}"
            f"\nBest params: {pformat(self.study.best_params)}"
        )

        # best_estimator_ is read from the returned study; its base_estimator
        # is re-wrapped in the project's pipeline type before any bagging.
        best_estimator = make_custom_pipeline(self.study.best_estimator_.base_estimator)
        bagging_sequential = self.model_params.get("bagging_sequential", False)
        bagging_n_estimators = self.model_params.get("bagging_n_estimators", 0)
        bagging_max_samples = self.model_params.get("bagging_max_samples", 1.0)
        bagging_max_features = self.model_params.get("bagging_max_features", 1.0)
        n_jobs = self.model_params.get("n_jobs", -1)
        random_state = self.model_params.get("random_state", None)

        if bagging_sequential and bagging_n_estimators > 0:
            # Sequentially bootstrapped bagging, reusing the sample weights
            # stored on the study's best estimator.
            self.best_model = self._apply_sequential_bagging(
                X,
                y,
                best_estimator,
                sample_weight=self.study.best_estimator_.sample_weight_,
            )
        elif bagging_n_estimators > 0:
            # Standard bagging: members are forced to n_jobs=1 while the
            # ensemble itself uses the configured n_jobs.
            time0 = time.time()
            base_est = set_pipeline_params(best_estimator, n_jobs=1)
            bag = BaggingClassifier(
                estimator=MyPipeline(base_est.steps),
                n_estimators=int(bagging_n_estimators),
                max_samples=bagging_max_samples,
                max_features=bagging_max_features,
                n_jobs=n_jobs,
                random_state=random_state,
            )
            bag.fit(X, y, sample_weight=self.study.best_estimator_.sample_weight_)
            self.best_model = Pipeline([("bag", bag)])
            elapsed = str(pd.Timedelta(seconds=time.time() - time0).round("1s")).replace(
                "0 days ", ""
            )
            self.logger.info(f"\n✓ BaggingClassifier fitted in {elapsed}")
        else:
            # No bagging requested: the tuned pipeline is the final model.
            self.best_model = best_estimator

        pruner_type = self.model_params.get("pruner_type", "hyperband")
        self.cv_results = {
            "best_params": self.study.best_params,
            "best_score": self.study.best_value,
            "cv_results": cv_results_df,
            "scoring": metric,
            "search_method": "optuna",
            "pruner_type": pruner_type,
            "n_trials_completed": len([t for t in self.study.trials if t.state.name == "COMPLETE"]),
            "n_trials_pruned": len([t for t in self.study.trials if t.state.name == "PRUNED"]),
        }

    # ── Config hash ───────────────────────────────────────────────────────────

    def _get_study_config_hash(self, metric: str = "") -> str:
        """
        Return an 8-character SHA-256 prefix uniquely identifying the
        combination of parameters that determines the Optuna study's
        optimisation surface.

        Resuming an existing study is only correct when the optimisation
        surface is identical to the one those trials explored.  Every
        dimension that, if changed, should produce a fresh study is included.

        Covered dimensions
        ------------------
        model
            Base classifier type and constructor parameters.
        search
            Sorted list of search-space keys.  Adding or removing a
            hyperparameter changes the dimensionality of the space; existing
            trials become incomplete records and corrupt the TPE surrogate.
        cv
            n_splits and pct_embargo.
        metric
            'f1' (binary) or 'neg_log_loss' (ternary).
        role
            'primary' vs 'secondary'.
        label
            All label_config fields.
        target
            Volatility target function name and parameters.

        Note: bagging configuration is intentionally excluded.  HPO optimises
        the base estimator only; the wrapper applied afterwards does not change
        the optimisation surface.

        Parameters
        ----------
        metric : str
            Scoring metric passed to optimize_trading_model.

        Returns
        -------
        str
            8-character lowercase hex string (SHA-256 prefix).
        """
        import hashlib

        # ── 1. Base classifier ────────────────────────────────────────────────
        pipe = self.model_params["pipe_clf"]
        base_clf = pipe.steps[-1][1] if hasattr(pipe, "steps") else pipe

        # ── 2. Search space shape ─────────────────────────────────────────────
        param_grid = self.model_params.get("param_grid", {})
        search_space = sorted(param_grid.keys())

        # ── 3. CV protocol ────────────────────────────────────────────────────
        cv_config = {
            "n_splits": self.model_params.get("n_splits", 5),
            "pct_embargo": self.model_params.get("pct_embargo", 0.02),
        }

        # ── 4. Combine and hash ───────────────────────────────────────────────
        combined = {
            "model": type(base_clf).__name__,
            "search": search_space,
            "cv": cv_config,
            "metric": metric,
            "role": "primary" if self.is_primary else "secondary",
            "config": self.config,
        }

        digest = hashlib.sha256(json.dumps(combined, sort_keys=True).encode()).hexdigest()

        return digest[:8]

    # ── Bagging helpers ───────────────────────────────────────────────────────

    def _apply_sequential_bagging(
        self,
        X: pd.DataFrame,
        y: pd.Series,
        tuned_pipeline,
        sample_weight: pd.Series = None,
    ) -> Pipeline:
        """
        Fit a sequentially bootstrapped bagging ensemble around a tuned pipeline.

        The tuned pipeline is switched to ``n_jobs=1``, copied into a
        ``MyPipeline`` and handed to ``apply_seq_bootstrap`` together with the
        event end times (``self.events["t1"]``) and the price-bar index.  The
        object returned by ``apply_seq_bootstrap`` is placed, unchanged, as the
        single ``"seq_bag"`` step of a new ``Pipeline``; it is NOT converted to
        a plain ``BaggingClassifier`` here.

        Parameters
        ----------
        X : pd.DataFrame
            Training features.
        y : pd.Series
            Training labels.
        tuned_pipeline
            Pipeline carrying the tuned hyperparameters.
        sample_weight : pd.Series, optional
            Sample weights forwarded to ``apply_seq_bootstrap``.

        Returns
        -------
        Pipeline
            One-step pipeline ``[("seq_bag", <fitted ensemble>)]``.
        """
        started = time.time()
        self.logger.info("\nSequential bootstrap is being fitted...")

        settings = self.model_params
        n_estimators_cfg = settings.get("bagging_n_estimators", 0)
        # A full-size draw is always used on this path: per the original
        # rationale, the sequential sampler already accounts for overlapping
        # samples, and bagging_max_samples is only read by the standard
        # BaggingClassifier path.
        full_sample = 1.0
        max_features_cfg = settings.get("bagging_max_features", 1.0)
        seed = settings.get("random_state", 1)

        # Members run single-threaded.
        single_threaded = set_pipeline_params(tuned_pipeline, n_jobs=1)

        bootstrap_kwargs = {
            "X": X,
            "y": y,
            "estimator": MyPipeline(single_threaded.steps),
            "n_estimators": int(n_estimators_cfg),
            "max_samples": full_sample,
            "max_features": max_features_cfg,
            "samples_info_sets": self.events["t1"],
            "price_bars_index": self.bar_data.index,
            "random_state": seed,
            "sample_weight": sample_weight,
        }
        fitted_bag = apply_seq_bootstrap(**bootstrap_kwargs)

        # The genuine ensemble class is kept in the returned pipeline.  Per the
        # original rationale this lets calibration detect the sequential
        # sampler and reproduce it per fold; the down-cast to a standard
        # BaggingClassifier happens only at ONNX export time.
        duration = pd.Timedelta(seconds=time.time() - started).round("1s")
        elapsed = str(duration).replace("0 days ", "")
        self.logger.info(f"\n✓ Sequential bootstrap fitted in {elapsed}")

        return Pipeline([("seq_bag", fitted_bag)])

    # ── Analysis ──────────────────────────────────────────────────────────────

    def analyze_features(self):
        """
        Compute per-feature importances of the fitted model.

        The classifier is located by unwrapping ``self.best_model``: the
        calibrator's ``estimator_`` is used when ``self.calibrate`` is set,
        then the final pipeline step is taken.  Importances come from:

        * a bagging ensemble — the mean of the members' importances, each
          mapped back to the full feature set via ``estimators_features_``;
        * a ``_WeightedEstimator`` — its ``base_estimator``;
        * anything else — the classifier's own ``feature_importances_``.

        If importances cannot be obtained (for example the estimator exposes
        no ``feature_importances_``), the error is logged and all importances
        are reported as zero.  This now applies to the bagging branch as well,
        which previously raised.

        Side effects
        ------------
        Sets ``self.feature_importance`` (DataFrame with ``feature`` and
        ``importance`` columns, sorted by descending importance) and marks
        ``self.completed_steps["analysis"]`` as done.
        """
        from .weighted_estimator import _WeightedEstimator

        clf = self.best_model
        if self.calibrate:
            clf = clf.estimator_

        if hasattr(clf, "steps"):
            clf = clf.steps[-1][1]

        feat_names = self._get_feature_names()

        try:
            if isinstance(clf, (SequentiallyBootstrappedBaggingClassifier, BaggingClassifier)):
                importances = self._bagged_feature_importances(clf, len(feat_names))
            elif isinstance(clf, _WeightedEstimator):
                importances = clf.base_estimator.feature_importances_
            else:
                importances = clf.feature_importances_
        except Exception as e:
            # Best effort: report zeros rather than abort the analysis step.
            importances = np.zeros(len(feat_names))
            self.logger.error(e)

        self.feature_importance = pd.DataFrame(
            {
                "feature": feat_names,
                "importance": importances,
            }
        ).sort_values("importance", ascending=False)
        self.completed_steps["analysis"] = True

    @staticmethod
    def _bagged_feature_importances(bag, n_features):
        """
        Average the members' importances of a fitted bagging ensemble.

        Each member is trained on the columns listed in the matching entry of
        ``bag.estimators_features_``, so its importances are indexed by that
        subset.  They are scattered into a vector of length ``n_features``
        before averaging; features a member never saw contribute zero for
        that member.  When every member saw all columns in their original
        order the result equals a plain mean of the members' importances.
        """
        members = bag.estimators_
        subsets = getattr(bag, "estimators_features_", None)
        if subsets is None:
            subsets = [None] * len(members)

        aligned = []
        for member, cols in zip(members, subsets):
            final = member.steps[-1][1] if hasattr(member, "steps") else member
            scores = np.asarray(final.feature_importances_, dtype=float)
            # Only re-map when the importances line up one-to-one with the
            # member's column subset; otherwise keep them as reported.
            if cols is not None and len(cols) == len(scores):
                full = np.zeros(n_features)
                # add.at accumulates correctly when a column was drawn more
                # than once (bootstrap_features=True).
                np.add.at(full, np.asarray(cols), scores)
                scores = full
            aligned.append(scores)

        return np.mean(aligned, axis=0)

    def _compile_metrics(self):
        self.metrics = {
            "cv_results": self.cv_results,
            "feature_importance": self.feature_importance,
            "training_samples": len(self.bar_data),
            "feature_count": len(self._get_feature_names()),
            "best_weighting_scheme": self.best_weighting_scheme,
            "average_uniqueness": self.events["tW"].mean(),
            "completed_steps": self.completed_steps,
            "calibrated": self.calibrator_ is not None,
            "bagging_wrapper": self._bagging_wrapper,
            "model_role": self.config.get("model_role"),
        }

        if self.calibrator_ is not None:
            try:
                from ..calibration.calibration import calibration_report

                y_true, raw, calibrated = self._calibration_oof_arrays()
                report_df = calibration_report(y_true, raw, calibrated)
                self.metrics["calibration"] = report_df["value"].to_dict()
            except Exception as e:
                self.logger.warning(f"Calibration metrics compilation failed: {e}")

    def _get_feature_names(self):
        if self.preprocessed_features is None:
            return []
        return self.preprocessed_features.columns.tolist()

    # ── Persistence ───────────────────────────────────────────────────────────

    @staticmethod
    def _seq_bagging_to_standard(
        sb: SequentiallyBootstrappedBaggingClassifier,
    ) -> BaggingClassifier:
        """
        Build a fitted, standard BaggingClassifier from a fitted
        SequentiallyBootstrappedBaggingClassifier.

        A new BaggingClassifier is constructed from the source's
        hyperparameters (``max_samples`` is not copied, so the constructor
        default applies) and the fitted attributes are then attached to it by
        reference.  The source object itself is not modified.

        It is called while preparing a model for ONNX export.  Rationale
        recorded by the original author: sklearn-onnx can convert
        BaggingClassifier but not the custom subclass, and the sequential
        sampler only affects training, not inference.

        Parameters
        ----------
        sb : SequentiallyBootstrappedBaggingClassifier
            Fitted sequential-bootstrap ensemble.

        Returns
        -------
        BaggingClassifier
            Standard ensemble sharing the source's fitted estimators.
        """
        copied_params = {
            name: getattr(sb, name)
            for name in (
                "estimator",
                "max_features",
                "bootstrap",
                "bootstrap_features",
                "random_state",
                "n_jobs",
            )
        }
        shell = BaggingClassifier(n_estimators=len(sb.estimators_), **copied_params)

        # Fitted state that is always transferred.
        for fitted_attr in (
            "estimators_",
            "estimators_features_",
            "classes_",
            "n_classes_",
            "n_features_in_",
        ):
            setattr(shell, fitted_attr, getattr(sb, fitted_attr))

        # estimator_ is transferred only when the source carries it.
        if hasattr(sb, "estimator_"):
            shell.estimator_ = sb.estimator_

        return shell

    @staticmethod
    def _convert_mypipeline_for_onnx(pipeline: Pipeline) -> None:
        """
        Rewrite the steps of ``pipeline`` in place into ONNX-friendly types.

        For every step of ``pipeline.steps``:

        * a SequentiallyBootstrappedBaggingClassifier is first replaced by its
          standard BaggingClassifier equivalent;
        * a MyPipeline step is replaced by a plain sklearn Pipeline built from
          the same steps;
        * for a BaggingClassifier step, its ``estimator`` and each entry of
          ``estimators_`` that is a MyPipeline is swapped for a plain Pipeline.

        Only the top-level steps are visited.  Bagging steps that were not
        down-cast are modified on the step object itself.

        Parameters
        ----------
        pipeline : Pipeline
            Pipeline whose ``steps`` list is updated in place.
        """
        for pos, (label, stage) in enumerate(pipeline.steps):
            # Down-cast first so the bagging handling below also covers it.
            if isinstance(stage, SequentiallyBootstrappedBaggingClassifier):
                stage = ModelDevelopmentPipeline._seq_bagging_to_standard(stage)
                pipeline.steps[pos] = (label, stage)

            if isinstance(stage, MyPipeline):
                pipeline.steps[pos] = (label, Pipeline(stage.steps))
                continue

            if not isinstance(stage, BaggingClassifier):
                continue

            template = stage.estimator
            if isinstance(template, MyPipeline):
                stage.estimator = Pipeline(template.steps)

            if hasattr(stage, "estimators_"):
                converted = []
                for member in stage.estimators_:
                    if isinstance(member, MyPipeline):
                        converted.append(Pipeline(member.steps))
                    else:
                        converted.append(member)
                stage.estimators_ = converted

    def _save_all_artifacts(self):
        """
        Persist the trained model and every supporting artifact.

        Saved through ``self.file_manager``, in this order: the model with its
        metadata, the strategy (if any), features, events, sample weights (if
        any), metrics, feature config (if any), feature names (if non-empty),
        preprocessor (if any), target config (if any) and the inner calibrator
        (if a calibrator is fitted).

        When ``self.export_onnx`` is set, an ONNX model is exported as well.
        The source is the calibrator's wrapped estimator when calibrated,
        otherwise ``self.best_model``.  The leading pipeline step is left out
        of the export, except when it is the only step: bagged models are
        one-step pipelines, and dropping that step would export an empty
        pipeline and never reach the bagging conversion.
        """
        feature_names = self._get_feature_names()
        metadata = {
            "strategy": self.config["strategy"],
            "feature_names": feature_names,
            "use_optuna": self.model_params.get("use_optuna", False),
            "pipeline_version": self.pipeline_version,
            "calibrated": self.calibrator_ is not None,
            "bagging_wrapper": self._bagging_wrapper,
            "model_role": self.config.get("model_role"),
        }
        self.file_manager.save_model(self.best_model, metadata)

        if self.strategy is not None:
            self.file_manager.save_object(self.strategy, "strategy")

        self.file_manager.save_dataframe(self.preprocessed_features, "features")
        self.file_manager.save_dataframe(self.events, "events")

        if self.sample_weight is not None:
            self.file_manager.save_dataframe(self.sample_weight.to_frame("weights"), "weights")

        self.file_manager.save_object(self.metrics, "metrics")

        if self.feature_config is not None:
            self.file_manager.save_object(self.feature_config, "feature_config")

        if feature_names:
            self.file_manager.save_object(feature_names, "feature_names")

        if self.preprocessor is not None:
            self.file_manager.save_object(self.preprocessor, "preprocessor")

        if self.target_config is not None:
            self.file_manager.save_object(self.target_config, "target_config")

        if self.calibrator_ is not None:
            self.file_manager.save_object(self.calibrator_.calibrator_, "calibrator")

        if self.export_onnx:
            if self.calibrator_ is not None:
                fitted = self.calibrator_.estimator_
            else:
                fitted = self.best_model

            all_steps = list(fitted.steps)
            # NOTE(review): the first step was always skipped for ONNX —
            # presumably a stage sklearn-onnx cannot convert; confirm.  It is
            # kept when it is the sole step so the export is never empty.
            onnx_steps = all_steps[1:] if len(all_steps) > 1 else all_steps
            onnx_source = Pipeline(onnx_steps)

            self._convert_mypipeline_for_onnx(onnx_source)
            self.file_manager.save_model_as_onnx(onnx_source, feature_names, metadata)

        self.logger.info(f"Saved all artifacts to {self.file_paths['base_dir']}")

    # ── Reporting ─────────────────────────────────────────────────────────────

    def _generate_analysis_reports(self, display: bool = False):
        """
        Generates hyperparameter analysis, importance plots, and HTML summary.

        Any column in cv_results["cv_results"] that is 100% NaN is dropped
        before calling generate_complete_hyperparameter_report to prevent the
        matplotlib "autodetected range of [nan, nan]" error when param_grid
        contains heterogeneous search spaces.
        """
        from ..cross_validation.hyper_fit_analysis import generate_complete_hyperparameter_report

        try:
            if self.cv_results and "cv_results" in self.cv_results:
                cv_df = pd.DataFrame(self.cv_results["cv_results"])
                nan_cols = cv_df.columns[cv_df.isna().all()].tolist()
                if nan_cols:
                    cv_df = cv_df.dropna(axis=1, how="all")
                    self.logger.info(
                        f"Hyperparameter report: dropped {len(nan_cols)} fully-NaN "
                        f"column(s): {nan_cols}"
                    )
                generate_complete_hyperparameter_report(
                    cv_results=cv_df,
                    strategy_config=self.config,
                    output_dir=self.file_paths["reports"],
                    display_in_notebook=display,
                )

            self._generate_training_summary_html()
            if self.study is not None:
                self._generate_optuna_report(display=display)
            if self.calibrator_ is not None:
                self._generate_calibration_report()

        except Exception as e:
            self.logger.warning(f"Report generation failed: {e}")

    def _generate_optuna_report(self, display: bool = False):
        """
        Save a self-contained HTML report with Optuna visualisation plots.
        """
        import matplotlib.pyplot as plt
        import optuna.visualization as vis
        from plotly.io import to_html

        report_path = self.file_paths["reports"] / "optuna_study_report.html"
        study = self.study

        n_complete = len([t for t in study.trials if t.state.name == "COMPLETE"])
        n_pruned = len([t for t in study.trials if t.state.name == "PRUNED"])

        plot_specs = [
            (
                "Optimization History",
                "Objective value per trial. Dashed line shows the running best.",
                lambda: vis.plot_optimization_history(study),
            ),
            (
                "Fold Scores per Trial (Intermediate Values)",
                "Each line is one trial. Trials that end early were pruned.",
                lambda: vis.plot_intermediate_values(study),
            ),
            (
                "Hyperparameter Importances",
                "fANOVA estimate of each parameter's contribution to score variance.",
                lambda: vis.plot_param_importances(study),
            ),
            (
                "Parallel Coordinates",
                "One line per completed trial, coloured by score.",
                lambda: vis.plot_parallel_coordinate(study),
            ),
        ]

        html_parts = [
            f"""<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Optuna Study Report — {study.study_name}</title>
    <style>
        body {{ font-family: 'Segoe UI', Arial, sans-serif; background: #0f172a;
               color: #f1f5f9; padding: 40px; line-height: 1.6; }}
        h1   {{ color: #38bdf8; border-bottom: 2px solid #334155; padding-bottom: 10px; }}
        h2   {{ color: #7dd3fc; margin-top: 48px; }}
        p.caption {{ color: #64748b; font-size: 0.88rem; margin-top: 4px; }}
        .meta {{ color: #94a3b8; font-size: 0.9rem; margin-bottom: 32px; }}
        .meta span {{ color: #22c55e; font-weight: 600; }}
        .plot {{ margin-bottom: 56px; }}
    </style>
</head>
<body>
<h1>Optuna Study Report</h1>
<p class="meta">
    Study: <b>{study.study_name}</b><br>
    Completed: <span>{n_complete}</span> trials &nbsp;|&nbsp;
    Pruned: <span style="color:#f97316">{n_pruned}</span> trials &nbsp;|&nbsp;
    Best score: <span>{study.best_value:.4f}</span>
</p>"""
        ]

        for title, caption, plot_fn in plot_specs:
            try:
                fig = plot_fn()
                html = to_html(fig, full_html=False, include_plotlyjs="cdn")
                html_parts.append(
                    f'<div class="plot"><h2>{title}</h2>'
                    f'<p class="caption">{caption}</p>{html}</div>'
                )
            except Exception as e:
                self.logger.warning(f"Optuna plot '{title}' failed: {e}")

        try:
            from ..cross_validation.optuna_hyper_fit import plot_model_vs_baseline

            original_backend = plt.get_backend()
            plt.switch_backend("agg")
            with plt.ioff():
                plot_model_vs_baseline(
                    study,
                    self.events["bin"],
                    self.events,
                    show=display,
                )
                baseline_path = self.file_paths["reports"] / "optuna_baseline_comparison.png"
                plt.savefig(
                    baseline_path,
                    dpi=150,
                    bbox_inches="tight",
                    facecolor="#0f172a",
                    edgecolor="none",
                )
                plt.close("all")
            plt.switch_backend(original_backend)
            self.logger.info(f"Baseline comparison plot saved: {baseline_path}")
        except Exception as e:
            self.logger.warning(f"Baseline plot failed: {e}")

        html_parts.append("</body></html>")

        report_path.write_text("\n".join(html_parts), encoding="utf-8")
        self.logger.info(f"Optuna study report saved: {report_path}")

    def _generate_training_summary_html(self):
        """Constructs a comprehensive HTML training report."""
        try:
            report_path = self.file_paths["reports"] / "training_summary.html"

            best_score = self.cv_results.get("best_score", 0)
            search_method = "Optuna" if self.study is not None else "Scikit-Learn"
            calibrated = self.calibrator_ is not None

            html_content = f"""
            <html>
            <head>
                <title>Training Report - {self.symbol}</title>
                <style>
                    body {{ font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif;
                            background-color: #0f172a; color: #f1f5f9; padding: 40px;
                            line-height: 1.6; }}
                    .container {{ max-width: 900px; margin: auto; }}
                    h1 {{ color: #38bdf8; border-bottom: 2px solid #334155; padding-bottom: 10px; }}
                    .card {{ background-color: #1e293b; border-radius: 12px; padding: 24px;
                             margin-bottom: 24px; border: 1px solid #334155; }}
                    table {{ width: 100%; border-collapse: collapse; margin-top: 10px; }}
                    th, td {{ text-align: left; padding: 12px; border-bottom: 1px solid #334155; }}
                    th {{ color: #94a3b8; font-weight: 600; text-transform: uppercase;
                          font-size: 0.8rem; }}
                    .metric {{ font-size: 1.5rem; font-weight: 700; color: #22c55e; }}
                    .label {{ color: #94a3b8; font-size: 0.9rem; }}
                    .badge {{ display: inline-block; padding: 2px 10px; border-radius: 999px;
                              font-size: 0.78rem; font-weight: 600; }}
                    .badge-on  {{ background: #14532d; color: #4ade80; }}
                    .badge-off {{ background: #1e293b; color: #64748b; border: 1px solid #334155; }}
                </style>
            </head>
            <body>
                <div class="container">
                    <h1>Training Summary: {self.symbol}</h1>
                    <p class="label">Report Generated: {datetime.now().strftime("%Y-%m-%d %H:%M")}</p>

                    <div class="card">
                        <h2>Performance Snapshot</h2>
                        <table>
                            <tr>
                                <td>
                                    <span class="label">
                                        Primary Metric ({self.cv_results.get("scoring", "F1")})
                                    </span><br>
                                    <span class="metric">{best_score:.4f}</span>
                                </td>
                                <td>
                                    <span class="label">Backend</span><br>
                                    <strong>{search_method}</strong>
                                </td>
                            </tr>
                            <tr>
                                <td>
                                    <span class="label">Training Samples</span><br>
                                    <strong>{len(self.events)}</strong>
                                </td>
                                <td>
                                    <span class="label">Average Uniqueness</span><br>
                                    <strong>{self.events["tW"].mean():.4f}</strong>
                                </td>
                            </tr>
                            <tr>
                                <td>
                                    <span class="label">Model Role</span><br>
                                    <strong>{self.config.get("model_role", "N/A").capitalize()}</strong>
                                </td>
                                <td>
                                    <span class="label">Bagging Wrapper</span><br>
                                    <strong>{self._bagging_wrapper}</strong>
                                </td>
                            </tr>
                            <tr>
                                <td>
                                    <span class="label">Calibrated</span><br>
                                    <span class="badge {"badge-on" if calibrated else "badge-off"}">
                                        {"CalibratorCV" if calibrated else "No calibration"}
                                    </span>
                                    {'<br><a href="calibration_report.html" style="color:#38bdf8;font-size:0.8rem;">View reliability report</a>' if calibrated else ""}
                                </td>
                                <td></td>
                            </tr>
                        </table>
                    </div>

                    <div class="card">
                        <h2>Weighting Logic</h2>
                        <p><strong>Selected Scheme:</strong>
                           {self.best_weighting_scheme or "Standard/Time-Decay"}</p>
                        <p class="label">
                            Weights were optimized via Purged-KFold to minimize
                            serial correlation leakage.
                        </p>
                    </div>
                </div>
            </body>
            </html>
            """

            report_path.write_text(html_content, encoding="utf-8")
            self.logger.info(f"Generated HTML summary report: {report_path}")

        except Exception as e:
            self.logger.error(f"HTML report generation failed: {e}")

    def _calibration_oof_arrays(self):
        """
        Recover aligned (y_true, raw, calibrated) out-of-fold probability arrays
        from the fitted CalibratorCV, restricted to valid (non-NaN) OOF rows.

        The raw probabilities are the calibrator's stored out-of-fold
        positive-class predictions (CalibratorCV.oof_probs_); the calibrated
        probabilities are obtained by replaying the fitted calibration map
        exactly as CalibratorCV.predict_proba does.

        Returns
        -------
        (y_true, raw, calibrated) : tuple of np.ndarray
            Binary labels and raw/calibrated positive-class probabilities.

        Raises
        ------
        ValueError
            If no calibrator is fitted.
        """
        if self.calibrator_ is None:
            raise ValueError("No fitted calibrator available.")

        raw = np.asarray(self.calibrator_.oof_probs_, dtype=float)
        y_true = self.events["bin"].to_numpy()
        valid = ~np.isnan(raw)
        raw, y_true = raw[valid], y_true[valid]

        calibrator = self.calibrator_.calibrator_
        if self.calibrator_.method == "isotonic":
            calibrated = np.clip(calibrator.predict(raw), 0.0, 1.0)
        else:  # 'platt'
            calibrated = calibrator.predict_proba(raw.reshape(-1, 1))[:, 1]

        return y_true, raw, calibrated

    def _generate_calibration_report(self):
        """
        Save a self-contained HTML calibration report with a raw-vs-calibrated
        reliability diagram and a Brier/ECE/MCE metrics table.

        Only produced when a CalibratorCV has been fitted (``calibrate=True``),
        since the raw and calibrated out-of-fold probabilities required for the
        reliability curves exist only then.  When the opt-in per-fold diagnostic
        is enabled (``model_params["calibration_diagnostic"]=True``), the
        per-fold raw-vs-calibrated CV scores are appended as a second table.

        A failure here is non-fatal: it is logged as a warning and leaves the
        rest of the report generation unaffected.
        """
        if self.calibrator_ is None:
            return

        try:
            import base64
            from io import BytesIO

            import matplotlib.pyplot as plt

            from ..calibration.calibration import calibration_report, plot_reliability_with_ci

            y_true, raw, calibrated = self._calibration_oof_arrays()
            report_df = calibration_report(y_true, raw, calibrated)

            # ── Reliability diagram (raw vs calibrated) ───────────────────────
            original_backend = plt.get_backend()
            plt.switch_backend("agg")
            png_path = self.file_paths["reports"] / "calibration_reliability.png"
            with plt.ioff():
                fig, axes = plt.subplots(1, 2, figsize=(14, 7))
                plot_reliability_with_ci(y_true, raw, ax=axes[0], title="Raw (uncalibrated)")
                plot_reliability_with_ci(y_true, calibrated, ax=axes[1], title="Calibrated")
                fig.tight_layout()
                fig.savefig(
                    png_path,
                    dpi=150,
                    bbox_inches="tight",
                    facecolor="#0f172a",
                    edgecolor="none",
                )
                buf = BytesIO()
                fig.savefig(
                    buf,
                    format="png",
                    dpi=150,
                    bbox_inches="tight",
                    facecolor="#0f172a",
                    edgecolor="none",
                )
                plt.close("all")
            plt.switch_backend(original_backend)
            buf.seek(0)
            img_b64 = base64.b64encode(buf.read()).decode("utf-8")

            # ── Metrics tables ────────────────────────────────────────────────
            # Pivot the flat report into original / calibrated / improvement
            # columns with one row per metric (Brier, ECE, MCE).
            vals = report_df["value"].to_dict()
            metric_labels = {"brier": "Brier", "ece": "ECE", "mce": "MCE"}
            metrics_pivot = pd.DataFrame(
                {
                    "original": {
                        label: vals.get(f"original_{key}") for key, label in metric_labels.items()
                    },
                    "calibrated": {
                        label: vals.get(f"calibrated_{key}") for key, label in metric_labels.items()
                    },
                }
            )
            metrics_pivot["improvement"] = metrics_pivot["original"] - metrics_pivot["calibrated"]
            metrics_table = metrics_pivot.to_html(
                classes="cal-table", border=0, float_format=lambda v: f"{v:.4f}"
            )

            diagnostic_section = ""
            if self.calibration_scores_ is not None:
                diag_table = self.calibration_scores_.to_html(
                    classes="cal-table", border=0, float_format=lambda v: f"{v:.4f}"
                )
                diagnostic_section = f"""
                    <div class="card">
                        <h2>Per-fold CV Diagnostic (raw vs calibrated)</h2>
                        <p class="label">Mean and std across PurgedKFold folds from
                           analyze_calibrated_cross_val_scores.</p>
                        {diag_table}
                    </div>"""

            html_content = f"""<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Calibration Report - {self.symbol}</title>
    <style>
        body {{ font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif;
                background-color: #0f172a; color: #f1f5f9; padding: 40px;
                line-height: 1.6; }}
        .container {{ max-width: 1100px; margin: auto; }}
        h1 {{ color: #38bdf8; border-bottom: 2px solid #334155; padding-bottom: 10px; }}
        h2 {{ color: #7dd3fc; margin-top: 8px; }}
        .card {{ background-color: #1e293b; border-radius: 12px; padding: 24px;
                 margin-bottom: 24px; border: 1px solid #334155; }}
        table.cal-table {{ width: 100%; border-collapse: collapse; margin-top: 10px; }}
        table.cal-table th, table.cal-table td {{ text-align: left; padding: 10px;
                 border-bottom: 1px solid #334155; }}
        table.cal-table th {{ color: #94a3b8; font-weight: 600; }}
        .label {{ color: #94a3b8; font-size: 0.9rem; }}
        img {{ width: 100%; max-width: 1100px; border-radius: 8px; }}
    </style>
</head>
<body>
    <div class="container">
        <h1>Calibration Report: {self.symbol}</h1>
        <p class="label">Report Generated: {datetime.now().strftime("%Y-%m-%d %H:%M")}
           &nbsp;|&nbsp; Method: {self.calibrator_.method}
           &nbsp;|&nbsp; OOF samples: {len(y_true)}</p>

        <div class="card">
            <h2>Reliability Diagram</h2>
            <p class="label">Predicted probability vs observed frequency with bootstrap
               95% confidence intervals. Points on the dashed diagonal are perfectly
               calibrated.</p>
            <img src="data:image/png;base64,{img_b64}" alt="Reliability diagram">
        </div>

        <div class="card">
            <h2>Calibration Metrics</h2>
            <p class="label">Lower Brier, Expected Calibration Error (ECE) and
               Maximum Calibration Error (MCE) indicate better calibration;
               positive improvement values mean calibration helped.</p>
            {metrics_table}
        </div>
        {diagnostic_section}
    </div>
</body>
</html>"""

            report_path = self.file_paths["reports"] / "calibration_report.html"
            report_path.write_text(html_content, encoding="utf-8")
            self.logger.info(f"Generated calibration report: {report_path}")

        except Exception as e:
            self.logger.warning(f"Calibration report generation failed: {e}")

    def _display_cache_reports(self):
        """
        Print the cache performance banner to stdout and, when
        ``get_cache_monitor()`` returns a truthy monitor, its report.
        """
        rule = "=" * 70
        # Leading empty string yields the blank line above the banner.
        print("", rule, "CACHE PERFORMANCE REPORT", rule, sep="\n")

        cache_monitor = get_cache_monitor()
        if not cache_monitor:
            return
        cache_monitor.print_report()

    # ── Diagnostics ───────────────────────────────────────────────────────────

    def check_contamination(self):
        """
        Print the data contamination banner to stdout, then delegate to
        ``print_contamination_report()`` for the report body.
        """
        rule = "=" * 70
        # Leading empty string yields the blank line above the banner.
        print("", rule, "DATA CONTAMINATION CHECK", rule, sep="\n")
        print_contamination_report()

    def get_data_summary(self) -> pd.DataFrame:
        """
        Summarize the pipeline's populated data artifacts in one table.

        Inspects ``bar_data``, ``features``, ``preprocessed_features``,
        ``events``, ``meta_features`` and ``sample_weight``; any component that
        is still ``None`` is skipped.

        Returns
        -------
        pd.DataFrame
            One row per populated component with the columns ``Component``,
            ``Type``, ``Rows``, ``Columns`` and ``Memory (MB)``.  Fields that do
            not apply to a component (e.g. the column count of a Series, or the
            memory of an object without ``memory_usage``) hold ``"N/A"``.  The
            columns are present even when no component is populated.
        """
        column_names = ["Component", "Type", "Rows", "Columns", "Memory (MB)"]
        components = [
            ("bar_data", self.bar_data),
            ("features", self.features),
            ("preprocessed_features", self.preprocessed_features),
            ("events", self.events),
            ("meta_features", self.meta_features),
            ("sample_weight", self.sample_weight),
        ]
        bytes_per_mb = 1024**2

        summary_data = []
        for name, data in components:
            if data is None:
                continue

            if isinstance(data, pd.DataFrame):
                kind, n_rows, n_cols = "DataFrame", data.shape[0], data.shape[1]
            elif isinstance(data, pd.Series):
                kind, n_rows, n_cols = "Series", len(data), "N/A"
            else:
                kind, n_rows, n_cols = type(data).__name__, "N/A", "N/A"

            if hasattr(data, "memory_usage"):
                usage = data.memory_usage(deep=True)
                # DataFrame.memory_usage returns a per-column Series, whereas
                # Series.memory_usage returns a plain int with no ``.sum()``.
                total_bytes = usage.sum() if hasattr(usage, "sum") else usage
                memory_mb = total_bytes / bytes_per_mb
            else:
                memory_mb = "N/A"

            summary_data.append(
                {
                    "Component": name,
                    "Type": kind,
                    "Rows": n_rows,
                    "Columns": n_cols,
                    "Memory (MB)": memory_mb,
                }
            )

        # Explicit columns keep the schema stable when nothing is populated yet.
        return pd.DataFrame(summary_data, columns=column_names)


# ============================================================================
# Helpers
# ============================================================================


def get_model_type(model) -> str:
    """
    Return a short lowercase tag for the class of ``model``.

    A few class names map to fixed aliases (``rf``, ``seq_rf``, ``dt``); any
    other class name has the substring ``"Classifier"`` removed and is
    lower-cased.
    """
    class_name = type(model).__name__
    known_aliases = {
        "RandomForestClassifier": "rf",
        "SequentiallyBootstrappedBaggingClassifier": "seq_rf",
        "DecisionTreeClassifier": "dt",
    }
    if class_name in known_aliases:
        return known_aliases[class_name]
    # Fallback: derive the tag from the class name itself.
    return class_name.replace("Classifier", "").lower()


def is_tree(estimator) -> bool:
    """
    Return True when ``estimator`` is an instance of RandomForestClassifier
    or DecisionTreeClassifier (subclasses included), False otherwise.
    """
    for tree_cls in (RandomForestClassifier, DecisionTreeClassifier):
        if isinstance(estimator, tree_cls):
            return True
    return False


@cacheable(time_aware=True)
def apply_seq_bootstrap(
    X,
    y,
    estimator,
    n_estimators,
    max_samples,
    max_features,
    samples_info_sets,
    price_bars_index,
    random_state,
    sample_weight,
):
    """
    Build and fit a SequentiallyBootstrappedBaggingClassifier.

    ``estimator``, ``n_estimators``, ``max_samples``, ``max_features``,
    ``samples_info_sets``, ``price_bars_index`` and ``random_state`` are
    forwarded unchanged to the classifier's constructor.  The ensemble is then
    fitted on ``X`` / ``y``; ``sample_weight`` is passed to ``fit`` only when it
    is not ``None``.

    The call is wrapped by ``cacheable(time_aware=True)``; the caching
    behaviour itself is defined by that decorator.

    Returns
    -------
    SequentiallyBootstrappedBaggingClassifier
        The fitted ensemble.
    """
    ensemble = SequentiallyBootstrappedBaggingClassifier(
        estimator=estimator,
        n_estimators=n_estimators,
        max_samples=max_samples,
        max_features=max_features,
        samples_info_sets=samples_info_sets,
        price_bars_index=price_bars_index,
        random_state=random_state,
    )

    # Omit the keyword entirely when no weights were supplied, so ``fit`` is
    # called with exactly two positional arguments in that case.
    fit_kwargs = {} if sample_weight is None else {"sample_weight": sample_weight}
    ensemble.fit(X, y, **fit_kwargs)
    return ensemble
