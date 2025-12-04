Specification
"""
Fast Multi-Pair RSI Trading Bot
Supports:
- BTCUSDT
- XAUUSD
- GBPUSD
Opens fast buy or sell trades based on RSI signals
Closes trades after 5, 10, or 15 minutes
"""
import asyncio
import time
from dataclasses import dataclass, field
from typing import Dict, List, Optional
import pandas as pd
import numpy as np
# ===== RSI calculation ===== #
def compute_rsi(close: pd.Series, period: int = 14) -> pd.Series:
delta = close.diff()
gain = delta.clip(lower=0)
loss = -delta.clip(upper=0)
avg_gain = gain.ewm(alpha=1 / period, adjust=False).mean()
avg_loss = loss.ewm(alpha=1 / period, adjust=False).mean()
rs = avg_gain / avg_loss
return 100 - (100 / (1 + rs))
# ===== Position structure ===== #
@dataclass
class Position:
id: str
symbol: str
side: str
entry_price: float
size: float
opened_at: float
duration_min: int
# ===== Config ===== #
@dataclass
class BotConfig:
symbols: List[str] = field(default_factory=lambda: ["BTCUSDT", "XAUUSD", "GBPUSD"])
rsi_period: int = 14
rsi_oversold: int = 30
rsi_overbought: int = 70
durations_min: List[int] = field(default_factory=lambda: [5, 10, 15])
account_equity: float = 2000.0
risk_pct: float = 0.5
lot_size: Optional[float] = None
paper: bool = True
# ===== Trading Bot ===== #
class MultiPairRSIBot:
def __init__(self, cfg: BotConfig):
self.cfg = cfg
self.data: Dict[str, pd.DataFrame] = {sym: pd.DataFrame() for sym in cfg.symbols}
self.positions: Dict[str, Dict[str, Position]] = {sym: {} for sym in cfg.symbols}
self._id = 0
# ========== Fake 1-minute feed for PAPER mode ========== #
def get_fake_ohlcv(self, symbol):
now = int(time.time()) * 1000
df = self.data[symbol]
last_close = df["close"].iloc[-1] if not df.empty else 1000 + np.random.rand() * 10
change = np.random.normal(0, 0.0008)
close = last_close * (1 + change)
high = max(last_close, close)
low = min(last_close, close)
return (now, last_close, high, low, close, 0)
# ========== Append new candle ========== #
def append_candle(self, symbol, ohlc):
ts, o, h, l, c, v = ohlc
row = {"timestamp": pd.to_datetime(ts, unit="ms"),
"open": o, "high": h, "low": l, "close": c, "volume": v}
self.data[symbol] = pd.concat([self.data[symbol], pd.DataFrame([row])], ignore_index=True)
if len(self.data[symbol]) > 2000:
self.data[symbol] = self.data[symbol].iloc[-2000:]
# ========== Timeframe aggregation ========== #
def to_tf(self, symbol, minutes):
df = self.data[symbol]
if df.empty:
return pd.DataFrame()
df["bucket"] = df["timestamp"].dt.floor(f"{minutes}T")
out = df.groupby("bucket").agg({
"open": "first",
"high": "max",
"low": "min",
"close": "last",
"volume": "sum"
}).reset_index().rename(columns={"bucket": "timestamp"})
return out
# ========== Position sizing ========== #
def get_size(self, price):
if self.cfg.lot_size:
return self.cfg.lot_size
risk_amount = self.cfg.account_equity * (self.cfg.risk_pct / 100)
return round(risk_amount / price, 4)
# ========== Check RSI signals and enter trades ========== #
async def process_signals(self, symbol):
for dur in self.cfg.durations_min:
df = self.to_tf(symbol, dur)
if len(df) < self.cfg.rsi_period + 2:
continue
df["rsi"] = compute_rsi(df["close"], self.cfg.rsi_period)
prev = df["rsi"].iloc[-2]
last = df["rsi"].iloc[-1]
price = df["close"].iloc[-1]
# BUY: RSI cross up
if prev <= self.cfg.rsi_oversold and last > prev:
size = self.get_size(price)
await self.open_position(symbol, "buy", price, size, dur)
# SELL: RSI cross down
if prev >= self.cfg.rsi_overbought and last < prev:
size = self.get_size(price)
await self.open_position(symbol, "sell", price, size, dur)
# ========== Open position ========== #
async def open_position(self, symbol, side, price, size, duration):
self._id += 1
pid = f"{symbol}_{self._id}"
print(f"[{symbol}] OPEN {side.upper()} @ {price:.2f} | {duration}m | size {size}")
pos = Position(
id=pid,
symbol=symbol,
side=side,
entry_price=price,
size=size,
opened_at=time.time(),
duration_min=duration
)
self.positions[symbol][pid] = pos
# ========== Close expired trades ========== #
async def close_expired(self, symbol):
now = time.time()
to_close = []
for pid, pos in self.positions[symbol].items():
if (now - pos.opened_at) / 60 >= pos.duration_min:
to_close.append(pid)
for pid in to_close:
await self.close_position(symbol, pid)
# ========== Close position ========== #
async def close_position(self, symbol, pid):
pos = self.positions[symbol][pid]
last_price = self.data[symbol]["close"].iloc[-1]
pnl = (last_price - pos.entry_price) * pos.size if pos.side == "buy" else (pos.entry_price - last_price) * pos.size
print(f"[{symbol}] CLOSE {pos.side.upper()} @ {last_price:.2f} | PnL = {pnl:.3f}")
self.cfg.account_equity += pnl
del self.positions[symbol][pid]
# ========== Main loop ========== #
async def start(self):
print("Starting multi-pair RSI bot...")
print("Symbols:", self.cfg.symbols)
while True:
try:
for symbol in self.cfg.symbols:
# new candle
ohlcv = self.get_fake_ohlcv(symbol)
self.append_candle(symbol, ohlcv)
# signal scan
await self.process_signals(symbol)
# manage trades
await self.close_expired(symbol)
except Exception as e:
print("Error:", e)
await asyncio.sleep(1)
# ========== Launch Example ========== #
async def main():
cfg = BotConfig(
symbols=["BTCUSDT", "XAUUSD", "GBPUSD"],
account_equity=3000.0,
paper=True,
lot_size=None
)
bot = MultiPairRSIBot(cfg)
task = asyncio.create_task(bot.start())
await asyncio.sleep(60 * 5) # run 5 minutes demo
task.cancel()
if __name__ == "__main__":
asyncio.run(main())
