Торговый бот для крипты
Запустить на сервере
sudo nano /etc/systemd/system/gateio_1h_bot.service
[Unit]
Description=GateIO Trading Bot Service
After=network-online.target
Wants=network-online.target
[Service]
User=andedali
WorkingDirectory=/media/andedali/Data/bots/gateio_1h_bot
ExecStart=/media/andedali/Data/bots/gateio_1h_bot/.venv/bin/python /media/andedali/Data/bots/gateio_1h_bot/gateio_strategy.py --live --balance 10000
Restart=always
RestartSec=10
StandardOutput=append:/media/andedali/Data/bots/gateio_1h_bot/bot.log
StandardError=append:/media/andedali/Data/bots/gateio_1h_bot/bot_error.log
[Install]
WantedBy=multi-user.target
sudo systemctl daemon-reload
sudo systemctl start gateio-1h-bot
sudo systemctl enable gateio-1h-bot
sudo systemctl status gateio-1h-bot
Сбор данных. MultiTimeframeAnalyzer: загрузка и ресэмплинг.
class MultiTimeframeAnalyzer:
Эти значения ограничивают, как далеко в прошлое можно запросить данные для каждого таймфрейма, чтобы избежать перегрузки API или работы с устаревшими данными.
LOOKBACK_LIMITS: Dict[str, int] = {
'1h': 416 * 24 * 3600,
'4h': 993 * 24 * 3600,
'1d': 1826 * 24 * 3600,
}
def __init__(self, symbol: str, start: str, end: str):
self.symbol = symbol
self.start = start
self.end = end
self.data_1h = None
self.data_4h = None
self.data_1d = None
self.full = None
def _to_unix_ts(self, iso_str: str) -> int:
dt = pd.to_datetime(iso_str, utc=True)
return int(dt.timestamp())
def fetch_data(self):
cache_dir = 'data_cache'
os.makedirs(cache_dir, exist_ok=True)
# Обновляем end_date в JSON, если в live-режиме старый конец
config_path = 'strategy_params.json'
try:
with open(config_path, 'r', encoding='utf-8') as f:
cfg = json.load(f)
old_end = cfg.get('data_params', {}).get('end_date')
now_dt = pd.Timestamp.now(tz='UTC')
if old_end is None or pd.to_datetime(old_end, utc=True) < now_dt:
new_end = now_dt.strftime('%Y-%m-%dT%H:%M:%SZ')
self.end = new_end
cfg.setdefault('data_params', {})['end_date'] = new_end
with open(config_path, 'w', encoding='utf-8') as fw:
json.dump(cfg, fw, ensure_ascii=False, indent=4)
except FileNotFoundError:
logging.warning(f"{config_path} не найден. Продолжаем без обновления end_date.")
except Exception as e:
logging.error(f"Ошибка обновления end_date: {e}")
# Возвращает pd.DataFrame — DataFrame с данными свечей.
def fetch_in_chunks(tf: str, interval_seconds: int) -> pd.DataFrame:
cache_dir = 'data_cache'
os.makedirs(cache_dir, exist_ok=True)
cache_file = os.path.join(cache_dir, f"{self.symbol}_{tf}.csv")
# Вспомогательные timestamps
raw_start_ts = self._to_unix_ts(self.start)
end_ts = self._to_unix_ts(self.end)
max_window = self.LOOKBACK_LIMITS[tf]
# Первый from не раньше, чем end_ts - max_window
first_from = max(raw_start_ts, end_ts - max_window)
logging.info(f"[{self.symbol}][{tf}] Начинаем с {pd.to_datetime(first_from, unit='s', utc=True)} до {pd.to_datetime(end_ts, unit='s', utc=True)}")
# Убедимся, что файл есть
if not os.path.exists(cache_file):
open(cache_file, 'w').close()
# Определяет вложенную функцию api_request, которая выполняет запрос к API Gate.io для получения свечей.
def api_request(from_unix: int) -> pd.DataFrame:
url = (
f"https://api.gateio.ws/api/v4/spot/candlesticks"
f"?currency_pair={self.symbol}"
f"&interval={tf}"
f"&from={from_unix}"
f"&limit=1000"
)
for attempt in range(3):
try:
resp = requests.get(url, timeout=10)
resp.raise_for_status()
data = resp.json()
if not data:
return pd.DataFrame()
df = pd.DataFrame(
data,
columns=[
'timestamp', # 0
'quote_volume', # 1
'close', # 2
'high', # 3
'low', # 4
'open', # 5
'base_volume', # 6
'is_closed' # 7
]
)
df['timestamp'] = pd.to_datetime(df['timestamp'].astype(float), unit='s', utc=True)
df.set_index('timestamp', inplace=True)
ohlcv = df[['open', 'high', 'low', 'close', 'base_volume']].astype(float)
return ohlcv
except Exception as e:
logging.warning(f"[{self.symbol}][{tf}] Попытка {attempt+1}/3 не удалась: {e}")
time.sleep(2 ** attempt)
raise ConnectionError(f"[{self.symbol}][{tf}] Не удалось получить данные после 3 попыток.")
all_chunks = []
chunk_idx = 0
# Если в кеше уже есть данные — докачиваем с места последней свечи
if os.path.getsize(cache_file) > 0:
df_cache = pd.read_csv(cache_file, index_col='timestamp', parse_dates=True)
last_cached = int(df_cache.index[-1].timestamp())
if last_cached >= end_ts:
logging.info(f"[{self.symbol}][{tf}] Кеш полностью покрывает диапазон.")
return df_cache
all_chunks.append(df_cache)
current = last_cached + interval_seconds
logging.info(f"[{self.symbol}][{tf}] Докачка с {pd.to_datetime(current, unit='s', utc=True)}")
else:
current = first_from
# Цикл порционной загрузки
while current < end_ts:
df_chunk = api_request(current)
if df_chunk.empty:
logging.info(f"[{self.symbol}][{tf}] Пустой чанк при from={pd.to_datetime(current, unit='s', utc=True)}, выходим.")
break
chunk_idx += 1
t0 = df_chunk.index.min()
t1 = df_chunk.index.max()
logging.info(f"[{self.symbol}][{tf}] Чанк #{chunk_idx}: {len(df_chunk)} баров с {t0} по {t1}")
# Сразу дописываем в CSV
df_chunk.to_csv(
cache_file,
mode='a',
header=(chunk_idx == 1 and os.path.getsize(cache_file) == 0)
)
all_chunks.append(df_chunk)
current = int(t1.timestamp()) + interval_seconds
if not all_chunks:
raise ValueError(f"[{self.symbol}][{tf}] API вернул пустые данные.")
# Собираем и возвращаем единый DataFrame
df_full = pd.concat(all_chunks)
df_full = df_full[~df_full.index.duplicated(keep='first')]
df_full.sort_index(inplace=True)
return df_full
# Запросы для каждого таймфрейма
self.data_1h = fetch_in_chunks('1h', 3600)
self.data_4h = fetch_in_chunks('4h', 14400)
self.data_1d = fetch_in_chunks('1d', 86400)
def preprocess(self):
"""
Ресэмплинг 4h и 1d на 1h с shift, чтобы исключить утечку.
Добавлена проверка на пропущенные часовые бары.
"""
df_1h = self.data_1h.copy()
expected_index = pd.date_range(
start=df_1h.index[0],
end=df_1h.index[-1],
freq='1h',
tz='UTC'
)
missing = expected_index.difference(df_1h.index)
if not missing.empty:
logging.warning(f"[{self.symbol}] Пропущенные часы: {len(missing)} баров: {missing[:5]}...")
df_1h = df_1h.reindex(expected_index, method='ffill')
# 4h
df_4h_shift = self.data_4h.shift(1)
df_4h_h = df_4h_shift.resample('1h').ffill().reindex(df_1h.index, method='ffill')
df_4h_h.columns = [f"{col}_4h" for col in df_4h_h.columns]
# 1d
df_1d_shift = self.data_1d.shift(1)
df_1d_h = df_1d_shift.resample('1h').ffill().reindex(df_1h.index, method='ffill')
df_1d_h.columns = [f"{col}_1d" for col in df_1d_h.columns]
self.full = df_1h.join(df_4h_h, how='left').join(df_1d_h, how='left')
self.full.ffill(inplace=True)
self.full.dropna(inplace=True)
Проверить как далеко Gateio предоставляет данные.
import requests
import pandas as pd
from datetime import datetime, timedelta
import pytz
def _to_unix(iso_str: str) -> int:
dt = pd.to_datetime(iso_str, utc=True)
return int(dt.timestamp())
def _to_datetime(iso: int) -> int:
dt = pd.to_datetime(iso, unit='s', utc=True)
return str(dt)
def get_start_date(days_back: int) -> str:
"""
Возвращает дату, отняв указанное количество дней от текущей даты,
в формате 'YYYY-MM-DD HH:MM:SS+00:00'
Args:
days_back (int): Количество дней для вычитания
Returns:
str: Дата в формате ISO с UTC
"""
current_date = datetime.now(pytz.UTC)
start_date = current_date - timedelta(days=days_back)
return start_date.strftime('%Y-%m-%d %H:%M:%S+00:00')
start = get_start_date(1826)
start_unix = _to_unix(start)
start_unix_r = _to_datetime(start_unix)
print(start_unix_r)
symbol = 'KAS_USDT'
tf = '1d'
url = f"https://api.gateio.ws/api/v4/spot/candlesticks?from={start_unix}&limit=1000¤cy_pair={symbol}&interval={tf}"
resp = requests.get(url, timeout=10)
data = resp.json()
df_chunk = pd.DataFrame(data, columns=[
'timestamp', 'open', 'high', 'low', 'close',
'volume', 'base_volume', 'trade_count'
]
)
get_last = df_chunk['timestamp'].max()
print(get_last)
df_chunk['timestamp'] = pd.to_datetime(df_chunk['timestamp'].astype(float), unit='s', utc=True)
print(df_chunk)
df_chunk = df_chunk[['open', 'high', 'low', 'close', 'volume']].astype(float)
df_chunk.sort_index()
df_chunk.to_csv('123.csv')
Создание признаков (feature engineering) с учетом локальных экстремумов и отбора признаков (feature selection)
class EnhancedFeatureEngineer:
def __init__(self, params: dict):
self.params = params
@staticmethod
# Индекс Относительной Силы
# Измеряет скорость и силу изменения цен. Он показывает, насколько сильно и быстро актив
# рос или падал за заданный период, и помогает понять, перекуплен ли рынок или перепродан.
# 70 и выше → перекупленность
# 30 и ниже → перепроданность
def calculate_rsi(series: pd.Series, period: int = 14) -> pd.Series:
delta = series.diff()
up = delta.clip(lower=0)
down = -delta.clip(upper=0)
ma_up = up.rolling(window=period, min_periods=period).mean()
ma_down = down.rolling(window=period, min_periods=period).mean()
rs = ma_up / ma_down
rsi = 100 - (100 / (1 + rs))
return rsi
@staticmethod
# Текущий уровень волатильности
# Он не показывает направление тренда, только силу движений
# Если ATR растёт → увеличивается шанс сильных движений → ищем пробои, трендовые входы
# Если ATR низкий → рынок в боковике → применяем контртренд/флет-стратегии
# Размер стопа можно задавать как 1.5 * ATR, чтобы быть адаптивным
def calculate_atr(df: pd.DataFrame, period: int = 14) -> pd.Series:
high_low = df['high'] - df['low']
high_prev = (df['high'] - df['close'].shift(1)).abs()
low_prev = (df['low'] - df['close'].shift(1)).abs()
tr = pd.concat([high_low, high_prev, low_prev], axis=1).max(axis=1)
atr = tr.ewm(alpha=1 / period, adjust=False).mean()
return atr
@staticmethod
# Индикатор силы тренда
# ADX растёт — тренд усиливается.
# ADX падает — тренд теряет силу (вплоть до флэта).
# 0–20 Нет тренда / боковик
# 20–25 Начало возможного тренда
# 25–50 Сильный тренд
# 50–75 Очень сильный тренд
# 75–100 Крайне редкий, экстремальный
# Если ADX растёт и выше 25 → рынок трендовый → ищем пробой, тренд
# Если ADX падает и ниже 20 → рынок в боковике → работаем во флэт-режиме
def calculate_adx(df: pd.DataFrame, period: int = 14) -> pd.Series:
high = df['high']
low = df['low']
close = df['close']
up_move = high.diff()
down_move = low.shift(1) - low
plus_dm = up_move.where((up_move > down_move) & (up_move > 0), 0.0)
minus_dm = down_move.where((down_move > up_move) & (down_move > 0), 0.0)
tr = EnhancedFeatureEngineer.calculate_atr(df, period)
plus_di = 100 * (plus_dm.ewm(alpha=1 / period, adjust=False).mean() / tr)
minus_di = 100 * (minus_dm.ewm(alpha=1 / period, adjust=False).mean() / tr)
dx = (abs(plus_di - minus_di) / (plus_di + minus_di)) * 100
adx = dx.ewm(alpha=1 / period, adjust=False).mean()
return adx
@staticmethod
# Расчета полос Боллинджера
# Волатильностный канал вокруг скользящей средней.
# Он показывает, когда цена далеко отклонилась от "нормы"
# Цена у верхней полосы - Возможная перекупленность
# Цена у нижней полосы - Возможная перепроданность
# Полосы сужаются (узкие) - Волатильность упала, ожидается пробой
# Полосы расширяются - Началось сильное движение
def calculate_bollinger_bands(series: pd.Series, window: int = 20, n_std: int = 2) -> Tuple[pd.Series, pd.Series]:
sma = series.rolling(window=window, min_periods=window).mean()
std = series.rolling(window=window, min_periods=window).std()
upper = sma + (std * n_std)
lower = sma - (std * n_std)
return upper, lower
@staticmethod
# Метод для расчета стохастического осциллятора
# > 80 Перекупленность
# < 20 Перепроданность
# %K пересекает %D Потенциальный разворот
def calculate_stochastic_oscillator(df: pd.DataFrame, k_period: int = 14, d_period: int = 3) -> Tuple[pd.Series, pd.Series]:
low_min = df['low'].rolling(window=k_period, min_periods=k_period).min()
high_max = df['high'].rolling(window=k_period, min_periods=k_period).max()
k = 100 * (df['close'] - low_min) / (high_max - low_min)
d = k.rolling(window=d_period, min_periods=d_period).mean()
return k, d
@staticmethod
# Метод для расчета индикатора Ишимоку
# Tenkan пересекает Kijun снизу вверх - Потенциальный сигнал на покупку
# Цена выше обоих Span → облако снизу - Бычий тренд
# Цена ниже облака - Медвежий тренд
# Chikou пересекает цену вверх - Подтверждение лонга
# Senkou Span A > Span B - Облако зелёное → бычье
def calculate_ichimoku(df: pd.DataFrame) -> Tuple[pd.Series, pd.Series, pd.Series, pd.Series, pd.Series]:
high = df['high']
low = df['low']
close = df['close']
high9 = high.rolling(window=9, min_periods=9).max()
low9 = low.rolling(window=9, min_periods=9).min()
tenkan = (high9 + low9) / 2
high26 = high.rolling(window=26, min_periods=26).max()
low26 = low.rolling(window=26, min_periods=26).min()
kijun = (high26 + low26) / 2
span_a = ((tenkan + kijun) / 2).shift(26)
high52 = high.rolling(window=52, min_periods=52).max()
low52 = low.rolling(window=52, min_periods=52).min()
span_b = ((high52 + low52) / 2).shift(26)
chikou = close.shift(-26)
return tenkan, kijun, span_a, span_b, chikou
def add_features(self, full: pd.DataFrame) -> pd.DataFrame:
df = full.copy()
# 1) Простые лаговые признаки: цены и объёмы
df['close_raw'] = df['close']
df['volume_raw'] = df['base_volume']
# 2) Локальные экстремумы на 1h
df['is_local_min'], df['is_local_max'] = find_local_extrema(df['close'], left=5, right=5)
# 3) Локальные экстремумы на 4h и перенос на 1h
df_4h = full[['close_4h']].copy().rename(columns={'close_4h': 'close'})
df_4h['is_min_4h'], df_4h['is_max_4h'] = find_local_extrema(df_4h['close'], left=3, right=3)
df_4h_ext = df_4h[['is_min_4h', 'is_max_4h']].resample('1h').ffill().reindex(df.index, method='ffill')
df = df.join(df_4h_ext)
# 4) Возвратные серии и волатильность
df['returns_1h'] = df['close'].pct_change()
df['volatility_1h'] = df['returns_1h'].rolling(window=24, min_periods=24).std()
df['returns_4h'] = df['close_4h'].pct_change()
df['volatility_4h'] = df['returns_4h'].rolling(window=10, min_periods=10).std()
# 5) ATR и ADX
df['atr_1h'] = EnhancedFeatureEngineer.calculate_atr(df[['high', 'low', 'close']], period=14)
df['atr_4h'] = EnhancedFeatureEngineer.calculate_atr(
df[['high_4h', 'low_4h', 'close_4h']].rename(
columns={'high_4h': 'high', 'low_4h': 'low', 'close_4h': 'close'}
), period=14
)
adx_df_4h = df[['high_4h', 'low_4h', 'close_4h']].rename(
columns={'high_4h': 'high', 'low_4h': 'low', 'close_4h': 'close'}
)
df['adx_4h'] = EnhancedFeatureEngineer.calculate_adx(adx_df_4h, period=14)
# 6) RSI, BB, Stochastic
df['rsi_1h'] = EnhancedFeatureEngineer.calculate_rsi(df['close'], period=14)
bb_upper, bb_lower = EnhancedFeatureEngineer.calculate_bollinger_bands(df['close'], window=20, n_std=2)
df['bb_upper_1h'] = bb_upper
df['bb_lower_1h'] = bb_lower
stoch_k, stoch_d = EnhancedFeatureEngineer.calculate_stochastic_oscillator(
df[['high', 'low', 'close']], k_period=14, d_period=3
)
df['stoch_k'] = stoch_k
df['stoch_d'] = stoch_d
# 7) Ichimoku (4h)
ichimoku_df_4h = df[['high_4h', 'low_4h', 'close_4h']].rename(
columns={'high_4h': 'high', 'low_4h': 'low', 'close_4h': 'close'}
)
tenkan, kijun, span_a, span_b, chikou = EnhancedFeatureEngineer.calculate_ichimoku(ichimoku_df_4h)
df['ichimoku_tenkan'] = tenkan.resample('1h').ffill().reindex(df.index, method='ffill')
df['ichimoku_kijun'] = kijun.resample('1h').ffill().reindex(df.index, method='ffill')
df['ichimoku_span_a'] = span_a.resample('1h').ffill().reindex(df.index, method='ffill')
df['ichimoku_span_b'] = span_b.resample('1h').ffill().reindex(df.index, method='ffill')
df['ichimoku_chikou'] = chikou.resample('1h').ffill().reindex(df.index, method='ffill')
# 8) MA на 1d и тренд-выравнивание
df['ma20_1d'] = df['close_1d'].rolling(window=20, min_periods=20).mean()
df['ma50_1d'] = df['close_1d'].rolling(window=50, min_periods=50).mean()
df['trend_alignment'] = (df['ma20_1d'] > df['ma50_1d']).astype(int)
df['momentum_ratio'] = df['close'] / df['close_4h']
# Удаляем промежуточные столбцы
df.drop(columns=['returns_1h', 'returns_4h'], inplace=True)
df.dropna(inplace=True)
# 9) Feature Selection: LightGBM Importance (если включено)
# "feature_selection": {
# "enabled": true,
# "n_estimators": 100,
# "percentile_drop": 25
# }
# Можно добавить для сортировки важных параметров
if self.params.get('feature_selection', {}).get('enabled', False):
target = self.params['feature_selection']['target_series']
if target is not None and target.nunique() > 1:
model_fs = lgb.LGBMClassifier(
n_estimators=self.params['feature_selection']['n_estimators'],
random_state=42,
**self.params.get('lgb_params', {})
)
common_idx = df.index.intersection(target.index)
X_fs = df.loc[common_idx].drop(columns=['close_raw', 'volume_raw'], errors='ignore')
y_fs = target.loc[common_idx]
model_fs.fit(X_fs, y_fs)
importances = pd.Series(model_fs.feature_importances_, index=X_fs.columns)
important = importances[importances > np.percentile(
importances, self.params['feature_selection']['percentile_drop']
)].index
df = df[important]
else:
logging.info("Feature selection пропущен — нет достаточных классов в целевой серии.")
return df
Сбор данных с bybit и выбор признаков
from pybit.unified_trading import HTTP
import os
import logging
import time
import pandas as pd
import json
from typing import Dict, Tuple
import numpy as np
import importlib
try:
import lightgbm as lgb
import sklearn
import matplotlib.pyplot as plt
import seaborn as sns
SKLEARN_AVAILABLE = True
except ImportError:
SKLEARN_AVAILABLE = False
logging.warning(
"scikit-learn, matplotlib или seaborn не установлены. "
"Установите: pip install scikit-learn matplotlib seaborn"
)
print(
"scikit-learn, matplotlib или seaborn не установлены. "
"Установите: pip install scikit-learn matplotlib seaborn"
)
from dotenv import load_dotenv
# Настройка логирования
logging.basicConfig(
filename="bybit_data.log",
level=logging.INFO,
format="%(asctime)s %(levelname)s %(message)s",
encoding='utf-8'
)
# Загружаем переменные окружения из .env
load_dotenv()
# Конфигурация API
API_KEY = os.getenv('API_Key2')
API_SECRET = os.getenv('API_Secret2')
TESTNET = False # Основная сеть
if not API_KEY or not API_SECRET:
raise ValueError("API_Key или API_Secret не найдены в .env файле.")
# Инициализация HTTP клиента
session = HTTP(
testnet=TESTNET,
api_key=API_KEY,
api_secret=API_SECRET
)
class BybitData:
LOOKBACK_LIMITS: Dict[str, int] = {
'1m': 30 * 24 * 3600,
'5m': 30 * 24 * 3600,
'15m': 30 * 24 * 3600
}
INTERVAL_SECONDS: Dict[str, int] = {
'1m': 60,
'5m': 300,
'15m': 900
}
def __init__(self, symbol: str, start: str, end: str):
self.symbol = symbol
self.start = start
self.end = end
self.data_1m = None
self.data_5m = None
self.data_15m = None
self.full = None
def _to_unix_ts(self, iso_str: str) -> int:
dt = pd.to_datetime(iso_str, utc=True)
return int(dt.timestamp() * 1000)
def fetch_data(self):
cache_dir = 'data_cache'
os.makedirs(cache_dir, exist_ok=True)
# Обновляем end_date в конфиге
config_path = 'bybit.json'
try:
with open(config_path, 'r', encoding='utf-8') as f:
cfg = json.load(f)
now_dt = pd.Timestamp.now(tz='UTC')
self.end = now_dt.strftime('%Y-%m-%dT%H:%M:%SZ')
cfg['data_params']['end_date'] = self.end
with open(config_path, 'w', encoding='utf-8') as fw:
json.dump(cfg, fw, ensure_ascii=False, indent=4)
logging.info(f"[{self.symbol}] Обновили end_date в {config_path} до {self.end}")
except FileNotFoundError:
logging.warning(f"[{self.symbol}] {config_path} не найден. Используем заданный end_date: {self.end}")
except Exception as e:
logging.error(f"[{self.symbol}] Ошибка обновления end_date: {e}")
def fetch_in_chunks(tf: str, interval_seconds: int) -> pd.DataFrame:
cache_file = os.path.join(cache_dir, f"{self.symbol}_{tf}.csv")
raw_start_ts = self._to_unix_ts(self.start)
end_ts = self._to_unix_ts(self.end)
max_window = self.LOOKBACK_LIMITS[tf] * 1000
first_from = max(raw_start_ts, end_ts - max_window)
logging.info(
f"[{self.symbol}][{tf}] Начинаем с "
f"{pd.to_datetime(first_from // 1000, unit='s', utc=True)} "
f"до {pd.to_datetime(end_ts // 1000, unit='s', utc=True)}"
)
if not os.path.exists(cache_file):
open(cache_file, 'w').close()
def api_request(from_unix: int, to_unix: int) -> pd.DataFrame:
for attempt in range(5):
try:
response = session.get_kline(
category="spot",
symbol=self.symbol,
interval=tf.replace('m', ''),
start=from_unix,
end=to_unix,
limit=1000
)
if response.get("retCode") == 0:
data = response["result"]["list"]
if not data:
logging.warning(
f"[{self.symbol}][{tf}] Пустой чанк при "
f"from={pd.to_datetime(from_unix // 1000, unit='s', utc=True)} "
f"до {pd.to_datetime(to_unix // 1000, unit='s', utc=True)}"
)
return pd.DataFrame()
df = pd.DataFrame(
data,
columns=['timestamp', 'open', 'high', 'low', 'close', 'base_volume', 'quote_volume']
)
df['timestamp'] = pd.to_datetime(df['timestamp'].astype(float), unit='ms', utc=True)
df.set_index('timestamp', inplace=True)
df = df[['open', 'high', 'low', 'close', 'base_volume']].astype(float)
df.sort_index(inplace=True)
return df
else:
logging.warning(f"[{self.symbol}][{tf}] Ошибка API: {response.get('retMsg')}")
time.sleep(2 ** attempt)
except Exception as e:
logging.warning(f"[{self.symbol}][{tf}] Попытка {attempt+1}/5 не удалась: {e}")
time.sleep(2 ** attempt)
raise ConnectionError(f"[{self.symbol}][{tf}] Не удалось получить данные после 5 попыток.")
all_chunks = []
chunk_idx = 0
# Если есть кеш, докачиваем
if os.path.getsize(cache_file) > 0:
df_cache = pd.read_csv(cache_file, index_col='timestamp', parse_dates=True)
last_cached = int(df_cache.index[-1].timestamp() * 1000)
if last_cached >= end_ts:
logging.info(f"[{self.symbol}][{tf}] Кеш полностью покрывает диапазон.")
return df_cache
all_chunks.append(df_cache)
current = last_cached + interval_seconds * 1000
logging.info(
f"[{self.symbol}][{tf}] Докачка с "
f"{pd.to_datetime(current // 1000, unit='s', utc=True)}"
)
else:
current = first_from
# Основной цикл загрузки чанков
while current < end_ts:
to_unix = min(current + interval_seconds * 1000 * 1000, end_ts)
df_chunk = api_request(current, to_unix)
time.sleep(0.1)
if df_chunk.empty:
current = to_unix
continue
chunk_idx += 1
t0, t1 = df_chunk.index.min(), df_chunk.index.max()
logging.info(
f"[{self.symbol}][{tf}] Чанк #{chunk_idx}: "
f"{len(df_chunk)} баров с {t0} по {t1}"
)
df_chunk.to_csv(
cache_file,
mode='a',
header=(chunk_idx == 1 and os.path.getsize(cache_file) == 0)
)
all_chunks.append(df_chunk)
# Если мы дошли до конца запрошенного диапазона — выходим
if to_unix >= end_ts:
logging.info(f"[{self.symbol}][{tf}] Достигнут конец диапазона, выходим из цикла.")
break
# Старт следующего чанка = timestamp последнего бара
current = int(df_chunk.index.max().timestamp() * 1000)
if not all_chunks:
raise ValueError(f"[{self.symbol}][{tf}] API вернул пустые данные.")
df_full = pd.concat(all_chunks)
df_full = df_full[~df_full.index.duplicated(keep='first')]
df_full.sort_index(inplace=True)
df_full.to_csv(cache_file)
return df_full
self.data_1m = fetch_in_chunks('1m', self.INTERVAL_SECONDS['1m'])
self.data_5m = fetch_in_chunks('5m', self.INTERVAL_SECONDS['5m'])
self.data_15m = fetch_in_chunks('15m', self.INTERVAL_SECONDS['15m'])
def preprocess(self):
df_1m = self.data_1m.copy()
expected_index = pd.date_range(
start=df_1m.index[0],
end=df_1m.index[-1],
freq='1min',
tz='UTC'
)
# Проверка и заполнение пропусков
time_diffs = df_1m.index.to_series().diff().dt.total_seconds() / 60
if time_diffs.max() > 1:
large_gaps = time_diffs[time_diffs > 1]
logging.warning(
f"[{self.symbol}][1m] Обнаружены пропуски: {len(large_gaps)} разрывов, макс {large_gaps.max()} мин"
)
if large_gaps.max() > 60:
raise ValueError(f"[{self.symbol}][1m] Разрыв > 60 мин")
df_1m = df_1m.reindex(expected_index, method='ffill')
# Обработка нулевых объёмов
zeros = df_1m['base_volume'] == 0
if zeros.any():
logging.warning(f"[{self.symbol}] {zeros.sum()} баров с нулевым объёмом — заполняем средним")
df_1m.loc[zeros, 'base_volume'] = df_1m['base_volume'].rolling(window=3, min_periods=1).mean()
if (df_1m['close'] <= 0).any():
raise ValueError(f"[{self.symbol}] Обнаружены некорректные цены: <= 0")
# Проверки 5m и 15m аналогично…
# Объединение в единый DataFrame
df_5m_m = self.data_5m.shift(1).resample('1min').ffill().reindex(df_1m.index, method='ffill')
df_5m_m.columns = [f"{col}_5m" for col in df_5m_m.columns]
df_15m_m = self.data_15m.shift(1).resample('1min').ffill().reindex(df_1m.index, method='ffill')
df_15m_m.columns = [f"{col}_15m" for col in df_15m_m.columns]
self.full = df_1m.join(df_5m_m).join(df_15m_m)
self.full.ffill(inplace=True)
self.full.dropna(inplace=True)
self.full.to_csv(f"data_cache/{self.symbol}_full.csv", index_label='timestamp')
logging.info(f"[{self.symbol}] Объединённые данные сохранены")
return self.full
class EnhancedFeatureEngineer:
def __init__(self, params: dict):
self.params = params
@staticmethod
def calculate_rsi(series: pd.Series, period: int = 7) -> pd.Series:
delta = series.diff()
up = delta.clip(lower=0)
down = -delta.clip(upper=0)
ma_up = up.rolling(window=period, min_periods=period).mean()
ma_down = down.rolling(window=period, min_periods=period).mean()
rs = ma_up / ma_down
rsi = 100 - (100 / (1 + rs))
return rsi
@staticmethod
def calculate_atr(df: pd.DataFrame, period: int = 7) -> pd.Series:
high_low = df['high'] - df['low']
high_prev = (df['high'] - df['close'].shift(1)).abs()
low_prev = (df['low'] - df['close'].shift(1)).abs()
tr = pd.concat([high_low, high_prev, low_prev], axis=1).max(axis=1)
atr = tr.ewm(alpha=1 / period, adjust=False).mean()
return atr
@staticmethod
def calculate_bollinger_bands(series: pd.Series, window: int = 14, n_std: int = 2) -> Tuple[pd.Series, pd.Series]:
sma = series.rolling(window=window, min_periods=window).mean()
std = series.rolling(window=window, min_periods=window).std()
upper = sma + (std * n_std)
lower = sma - (std * n_std)
return upper, lower
@staticmethod
def calculate_ichimoku(df: pd.DataFrame, tenkan_period: int = 5, kijun_period: int = 13, senkou_period: int = 26) -> Tuple[pd.Series, pd.Series, pd.Series, pd.Series, pd.Series]:
high = df['high']
low = df['low']
close = df['close']
high_tenkan = high.rolling(window=tenkan_period, min_periods=tenkan_period).max()
low_tenkan = low.rolling(window=tenkan_period, min_periods=tenkan_period).min()
tenkan = (high_tenkan + low_tenkan) / 2
high_kijun = high.rolling(window=kijun_period, min_periods=kijun_period).max()
low_kijun = low.rolling(window=kijun_period, min_periods=kijun_period).min()
kijun = (high_kijun + low_kijun) / 2
span_a = ((tenkan + kijun) / 2).shift(kijun_period)
high_senkou = high.rolling(window=senkou_period, min_periods=senkou_period).max()
low_senkou = low.rolling(window=senkou_period, min_periods=senkou_period).min()
span_b = ((high_senkou + low_senkou) / 2).shift(kijun_period)
chikou = close.shift(kijun_period)
return tenkan, kijun, span_a, span_b, chikou
@staticmethod
def find_local_extrema(series: pd.Series, left: int = 3, right: int = 3) -> Tuple[pd.Series, pd.Series]:
is_min = (series == series.rolling(window=left + right + 1, center=True).min())
is_max = (series == series.rolling(window=left + right + 1, center=True).max())
return is_min.astype(int), is_max.astype(int)
@staticmethod
def calculate_vwap(df: pd.DataFrame, window: int = 14) -> pd.Series:
typical_price = (df['high'] + df['low'] + df['close']) / 3
vwap = (typical_price * df['base_volume']).rolling(window=window).sum() / df['base_volume'].rolling(window=window).sum()
return vwap
@staticmethod
def calculate_volume_spike(df: pd.DataFrame, window: int = 10) -> pd.Series:
volume_ma = df['base_volume'].rolling(window=window, min_periods=window).mean()
return df['base_volume'] / volume_ma
@staticmethod
def calculate_price_momentum(df: pd.DataFrame, period: int = 5) -> pd.Series:
return df['close'].pct_change(periods=period)
@staticmethod
def calculate_spread(df: pd.DataFrame) -> pd.Series:
return df['high'] - df['low']
@staticmethod
def remove_correlated_features(corr_matrix, threshold=0.8):
to_drop = set()
for i in range(len(corr_matrix.columns)):
for j in range(i + 1, len(corr_matrix.columns)):
if abs(corr_matrix.iloc[i, j]) > threshold:
to_drop.add(corr_matrix.columns[j])
return to_drop
def add_features(self, full: pd.DataFrame, symbol: str) -> pd.DataFrame:
df = full.copy()
df['close_raw'] = df['close']
df['volume_raw'] = df['base_volume']
df['spread_1m'] = self.calculate_spread(df[['high', 'low']])
df['is_local_min_1m'], df['is_local_max_1m'] = self.find_local_extrema(df['close'], left=3, right=3)
df_5m = df[['close_5m']].rename(columns={'close_5m': 'close'})
df['is_local_min_5m'], df['is_local_max_5m'] = self.find_local_extrema(df_5m['close'], left=3, right=3)
df['momentum_1m'] = self.calculate_price_momentum(df, period=5)
df['momentum_5m'] = self.calculate_price_momentum(df[['close_5m']].rename(columns={'close_5m': 'close'}), period=5)
df['volume_spike_1m'] = self.calculate_volume_spike(df, window=10)
df['volume_spike_5m'] = self.calculate_volume_spike(df[['base_volume_5m']].rename(columns={'base_volume_5m': 'base_volume'}), window=10)
df['atr_1m'] = self.calculate_atr(df[['high', 'low', 'close']], period=7)
df['rsi_1m'] = self.calculate_rsi(df['close'], period=7)
df['rsi_5m'] = self.calculate_rsi(df['close_5m'], period=7)
bb_upper, bb_lower = self.calculate_bollinger_bands(df['close'], window=14, n_std=2)
df['bb_upper_1m'] = bb_upper
df['bb_lower_1m'] = bb_lower
df['vwap_1m'] = self.calculate_vwap(df[['high', 'low', 'close', 'base_volume']], window=14)
ichimoku_df_5m = df[['high_5m', 'low_5m', 'close_5m']].rename(columns={'high_5m': 'high', 'low_5m': 'low', 'close_5m': 'close'})
tenkan, _, _, _, _ = self.calculate_ichimoku(ichimoku_df_5m, tenkan_period=5, kijun_period=13, senkou_period=26)
df['ichimoku_tenkan_5m'] = tenkan.reindex(df.index, method='ffill')
df['ma20_15m'] = df['close_15m'].rolling(window=20, min_periods=20).mean()
df['trend_alignment_15m'] = (df['ma20_15m'] > df['close_15m'].rolling(window=50, min_periods=50).mean()).astype(int)
df['rsi_divergence_1m'] = (df['close'] / df['close'].shift(1) - 1) - (df['rsi_1m'] / df['rsi_1m'].shift(1) - 1)
df['target'] = (df['close'].shift(-5) > df['close']).astype(int)
if self.params.get('feature_selection', {}).get('enabled', False) and SKLEARN_AVAILABLE:
logging.info(f"[{full.index[-1]}] Проверка корреляции признаков")
feature_columns = [col for col in df.columns if col not in [
'open', 'high', 'low', 'close', 'base_volume',
'open_5m', 'high_5m', 'low_5m', 'close_5m', 'base_volume_5m',
'open_15m', 'high_15m', 'low_15m', 'close_15m', 'base_volume_15m',
'close_raw', 'volume_raw', 'target'
]]
corr_matrix = df[feature_columns].corr()
to_drop = self.remove_correlated_features(corr_matrix, threshold=0.8)
feature_columns = [col for col in feature_columns if col not in to_drop]
logging.info(f"[{full.index[-1]}] Исключены коррелирующие признаки: {to_drop}")
high_corr = corr_matrix.abs().stack().reset_index().rename(columns={0: 'correlation'})
high_corr = high_corr.loc[(high_corr['level_0'] != high_corr['level_1']) & (high_corr['correlation'] > 0.8)]
if not high_corr.empty:
logging.info(f"[{full.index[-1]}] Высокая корреляция между признаками:\n{high_corr}")
if importlib.util.find_spec('seaborn'):
plt.figure(figsize=(12, 10))
sns.heatmap(corr_matrix, annot=True, cmap='coolwarm', vmin=-1, vmax=1, annot_kws={"size": 8})
plt.title(f"Correlation Matrix for {symbol}")
plt.tight_layout()
plt.savefig(f"data_cache/{symbol}_correlation_heatmap.png")
plt.close()
logging.info(f"[{full.index[-1]}] Корреляционная матрица сохранена в data_cache/{symbol}_correlation_heatmap.png")
logging.info(f"[{full.index[-1]}] Запуск Feature Selection с LightGBM")
X = df[feature_columns].dropna()
y = df['target'].loc[X.index]
if len(X) < 100 or y.nunique() <= 1:
logging.warning(f"[{full.index[-1]}] Недостаточно данных или классов для Feature Selection. Пропускаем.")
else:
model_fs = lgb.LGBMClassifier(
n_estimators=self.params['feature_selection']['n_estimators'],
**self.params['feature_selection'].get('lgb_params', {})
)
model_fs.fit(X, y)
importances = pd.Series(model_fs.feature_importances_, index=X.columns)
logging.info(f"[{full.index[-1]}] Важность признаков:\n{importances.sort_values(ascending=False)}")
important = importances[importances > np.percentile(importances, self.params['feature_selection']['percentile_drop'])].index
df = df[list(important) + ['close_raw', 'volume_raw', 'target']]
logging.info(f"[{full.index[-1]}] Отобраны признаки: {list(important)}")
if importlib.util.find_spec('matplotlib'):
plt.figure(figsize=(10, 6))
importances.sort_values().plot(kind='barh')
plt.title(f"Feature Importance for {symbol}")
plt.xlabel("Importance")
plt.tight_layout()
plt.savefig(f"data_cache/{symbol}_feature_importance.png")
plt.close()
logging.info(f"[{full.index[-1]}] График важности сохранён в data_cache/{symbol}_feature_importance.png")
df.dropna(inplace=True)
return df
if __name__ == "__main__":
config_path = 'bybit.json'
try:
with open(config_path, 'r', encoding='utf-8') as f:
params = json.load(f)
except FileNotFoundError:
logging.error(f"{config_path} не найден. Используем значения по умолчанию.")
params = {
"symbols": ["KASUSDT", "SOLUSDT"],
"data_params": {
"start_date": "2024-01-01T00:00:00Z",
"end_date": "2025-07-09T17:23:00Z"
},
"feature_selection": {
"enabled": True,
"n_estimators": 100,
"percentile_drop": 25,
"lgb_params": {
"learning_rate": 0.05,
"max_depth": 10,
"random_state": 42,
"class_weight": "balanced",
"min_child_samples": 30,
"reg_lambda": 0.2
}
}
}
for symbol in params['symbols']:
logging.info(f"Обработка {symbol}...")
try:
data_fetcher = BybitData(
symbol=symbol,
start=params['data_params']['start_date'],
end=params['data_params']['end_date']
)
data_fetcher.fetch_data()
data_fetcher.preprocess()
logging.info(f"Данные для {symbol} (последние 5 строк):\n{data_fetcher.full.tail()}")
engineer = EnhancedFeatureEngineer(params)
df_with_features = engineer.add_features(data_fetcher.full, symbol)
df_with_features.to_csv(f"data_cache/{symbol}_features.csv")
logging.info(f"Признаки сохранены в data_cache/{symbol}_features.csv")
logging.info(f"Признаки для {symbol} (последние 5 строк):\n{df_with_features.tail()}")
logging.info(f"Список признаков: {list(df_with_features.columns)}")
except ValueError as e:
logging.error(f"[{symbol}] Ошибка обработки: {e}")
continue
Комментарии
kaban1106 (09.07.2025, 04:06:47):
<script>alert("XSS")</script>