Lines
Slide
Slide
Slide
Slide
Slide
Slide
Slide
Slide

INVENTED
WORLDS

Торговый бот для крипты

Запустить на сервере

sudo nano /etc/systemd/system/gateio_1h_bot.service
[Unit]
Description=GateIO Trading Bot Service
After=network-online.target
Wants=network-online.target

[Service]
User=andedali
WorkingDirectory=/media/andedali/Data/bots/gateio_1h_bot
ExecStart=/media/andedali/Data/bots/gateio_1h_bot/.venv/bin/python /media/andedali/Data/bots/gateio_1h_bot/gateio_strategy.py --live --balance 10000
Restart=always
RestartSec=10
StandardOutput=append:/media/andedali/Data/bots/gateio_1h_bot/bot.log
StandardError=append:/media/andedali/Data/bots/gateio_1h_bot/bot_error.log

[Install]
WantedBy=multi-user.target
sudo systemctl daemon-reload
sudo systemctl start gateio-1h-bot
sudo systemctl enable gateio-1h-bot
sudo systemctl status gateio-1h-bot

Сбор данных. MultiTimeframeAnalyzer: загрузка и ресэмплинг.

class MultiTimeframeAnalyzer:

Эти значения ограничивают, как далеко в прошлое можно запросить данные для каждого таймфрейма, чтобы избежать перегрузки API или работы с устаревшими данными.

LOOKBACK_LIMITS: Dict[str, int] = {
        '1h': 416 * 24 * 3600,
        '4h': 993 * 24 * 3600,
        '1d': 1826 * 24 * 3600,
    }
def __init__(self, symbol: str, start: str, end: str):
        self.symbol = symbol
        self.start = start
        self.end = end
        self.data_1h = None
        self.data_4h = None
        self.data_1d = None
        self.full = None

    def _to_unix_ts(self, iso_str: str) -> int:
        dt = pd.to_datetime(iso_str, utc=True)
        return int(dt.timestamp())

    def fetch_data(self):
        cache_dir = 'data_cache'
        os.makedirs(cache_dir, exist_ok=True)

        # Обновляем end_date в JSON, если в live-режиме старый конец
        config_path = 'strategy_params.json'
        try:
            with open(config_path, 'r', encoding='utf-8') as f:
                cfg = json.load(f)
            old_end = cfg.get('data_params', {}).get('end_date')
            now_dt = pd.Timestamp.now(tz='UTC')
            if old_end is None or pd.to_datetime(old_end, utc=True) < now_dt:
                new_end = now_dt.strftime('%Y-%m-%dT%H:%M:%SZ')
                self.end = new_end
                cfg.setdefault('data_params', {})['end_date'] = new_end
                with open(config_path, 'w', encoding='utf-8') as fw:
                    json.dump(cfg, fw, ensure_ascii=False, indent=4)
        except FileNotFoundError:
            logging.warning(f"{config_path} не найден. Продолжаем без обновления end_date.")
        except Exception as e:
            logging.error(f"Ошибка обновления end_date: {e}")

        # Возвращает pd.DataFrame — DataFrame с данными свечей.
        def fetch_in_chunks(tf: str, interval_seconds: int) -> pd.DataFrame:

            cache_dir = 'data_cache'
            os.makedirs(cache_dir, exist_ok=True)
            cache_file = os.path.join(cache_dir, f"{self.symbol}_{tf}.csv")

            # Вспомогательные timestamps
            raw_start_ts = self._to_unix_ts(self.start)
            end_ts       = self._to_unix_ts(self.end)
            max_window   = self.LOOKBACK_LIMITS[tf]

            # Первый from не раньше, чем end_ts - max_window
            first_from = max(raw_start_ts, end_ts - max_window)
            logging.info(f"[{self.symbol}][{tf}] Начинаем с {pd.to_datetime(first_from, unit='s', utc=True)} до {pd.to_datetime(end_ts, unit='s', utc=True)}")

            # Убедимся, что файл есть
            if not os.path.exists(cache_file):
                open(cache_file, 'w').close()

            # Определяет вложенную функцию api_request, которая выполняет запрос к API Gate.io для получения свечей.
            def api_request(from_unix: int) -> pd.DataFrame:
                url = (
                    f"https://api.gateio.ws/api/v4/spot/candlesticks"
                    f"?currency_pair={self.symbol}"
                    f"&interval={tf}"
                    f"&from={from_unix}"
                    f"&limit=1000"
                )
                for attempt in range(3):
                    try:
                        resp = requests.get(url, timeout=10)
                        resp.raise_for_status()
                        data = resp.json()
                        if not data:
                            return pd.DataFrame()
                        df = pd.DataFrame(
                            data,
                            columns=[
                                'timestamp',     # 0
                                'quote_volume',  # 1
                                'close',         # 2
                                'high',          # 3
                                'low',           # 4
                                'open',          # 5
                                'base_volume',   # 6
                                'is_closed'      # 7
                            ]
                        )
                        df['timestamp'] = pd.to_datetime(df['timestamp'].astype(float), unit='s', utc=True)
                        df.set_index('timestamp', inplace=True)
                        ohlcv = df[['open', 'high', 'low', 'close', 'base_volume']].astype(float)
                        return ohlcv
                    except Exception as e:
                        logging.warning(f"[{self.symbol}][{tf}] Попытка {attempt+1}/3 не удалась: {e}")
                        time.sleep(2 ** attempt)
                raise ConnectionError(f"[{self.symbol}][{tf}] Не удалось получить данные после 3 попыток.")

            all_chunks = []
            chunk_idx = 0

            # Если в кеше уже есть данные — докачиваем с места последней свечи
            if os.path.getsize(cache_file) > 0:
                df_cache = pd.read_csv(cache_file, index_col='timestamp', parse_dates=True)
                last_cached = int(df_cache.index[-1].timestamp())
                if last_cached >= end_ts:
                    logging.info(f"[{self.symbol}][{tf}] Кеш полностью покрывает диапазон.")
                    return df_cache
                all_chunks.append(df_cache)
                current = last_cached + interval_seconds
                logging.info(f"[{self.symbol}][{tf}] Докачка с {pd.to_datetime(current, unit='s', utc=True)}")
            else:
                current = first_from


            # Цикл порционной загрузки
            while current < end_ts:
                df_chunk = api_request(current)
                if df_chunk.empty:
                    logging.info(f"[{self.symbol}][{tf}] Пустой чанк при from={pd.to_datetime(current, unit='s', utc=True)}, выходим.")
                    break

                chunk_idx += 1
                t0 = df_chunk.index.min()
                t1 = df_chunk.index.max()
                logging.info(f"[{self.symbol}][{tf}] Чанк #{chunk_idx}: {len(df_chunk)} баров с {t0} по {t1}")

                # Сразу дописываем в CSV
                df_chunk.to_csv(
                    cache_file,
                    mode='a',
                    header=(chunk_idx == 1 and os.path.getsize(cache_file) == 0)
                )

                all_chunks.append(df_chunk)
                current = int(t1.timestamp()) + interval_seconds

            if not all_chunks:
                raise ValueError(f"[{self.symbol}][{tf}] API вернул пустые данные.")

            # Собираем и возвращаем единый DataFrame
            df_full = pd.concat(all_chunks)
            df_full = df_full[~df_full.index.duplicated(keep='first')]
            df_full.sort_index(inplace=True)
            return df_full
        
        # Запросы для каждого таймфрейма
        self.data_1h = fetch_in_chunks('1h', 3600)
        self.data_4h = fetch_in_chunks('4h', 14400)
        self.data_1d = fetch_in_chunks('1d', 86400)

    def preprocess(self):
        """
        Ресэмплинг 4h и 1d на 1h с shift, чтобы исключить утечку.
        Добавлена проверка на пропущенные часовые бары.
        """
        df_1h = self.data_1h.copy()
        expected_index = pd.date_range(
            start=df_1h.index[0],
            end=df_1h.index[-1],
            freq='1h',
            tz='UTC'
        )
        missing = expected_index.difference(df_1h.index)
        if not missing.empty:
            logging.warning(f"[{self.symbol}] Пропущенные часы: {len(missing)} баров: {missing[:5]}...")
            df_1h = df_1h.reindex(expected_index, method='ffill')

        # 4h
        df_4h_shift = self.data_4h.shift(1)
        df_4h_h = df_4h_shift.resample('1h').ffill().reindex(df_1h.index, method='ffill')
        df_4h_h.columns = [f"{col}_4h" for col in df_4h_h.columns]

        # 1d
        df_1d_shift = self.data_1d.shift(1)
        df_1d_h = df_1d_shift.resample('1h').ffill().reindex(df_1h.index, method='ffill')
        df_1d_h.columns = [f"{col}_1d" for col in df_1d_h.columns]

        self.full = df_1h.join(df_4h_h, how='left').join(df_1d_h, how='left')
        self.full.ffill(inplace=True)
        self.full.dropna(inplace=True)

Проверить как далеко Gateio предоставляет данные.

import requests
import pandas as pd
from datetime import datetime, timedelta
import pytz

def _to_unix(iso_str: str) -> int:
    dt = pd.to_datetime(iso_str, utc=True)
    return int(dt.timestamp())

def _to_datetime(iso: int) -> int:
    dt = pd.to_datetime(iso, unit='s', utc=True)
    return str(dt)

def get_start_date(days_back: int) -> str:
    """
    Возвращает дату, отняв указанное количество дней от текущей даты,
    в формате 'YYYY-MM-DD HH:MM:SS+00:00'
    
    Args:
        days_back (int): Количество дней для вычитания
        
    Returns:
        str: Дата в формате ISO с UTC
    """
    current_date = datetime.now(pytz.UTC)
    start_date = current_date - timedelta(days=days_back)
    return start_date.strftime('%Y-%m-%d %H:%M:%S+00:00')

start = get_start_date(1826)
start_unix = _to_unix(start)

start_unix_r = _to_datetime(start_unix)
print(start_unix_r)

symbol = 'KAS_USDT'
tf = '1d'
url = f"https://api.gateio.ws/api/v4/spot/candlesticks?from={start_unix}&limit=1000&currency_pair={symbol}&interval={tf}"
resp = requests.get(url, timeout=10)
data = resp.json()

df_chunk = pd.DataFrame(data, columns=[
                                'timestamp', 'open', 'high', 'low', 'close',
                                'volume', 'base_volume', 'trade_count'
                            ]
                        )

get_last = df_chunk['timestamp'].max()
print(get_last)
df_chunk['timestamp'] = pd.to_datetime(df_chunk['timestamp'].astype(float), unit='s', utc=True)
print(df_chunk)
df_chunk = df_chunk[['open', 'high', 'low', 'close', 'volume']].astype(float)
df_chunk.sort_index()
df_chunk.to_csv('123.csv')

Создание признаков (feature engineering) с учетом локальных экстремумов и отбора признаков (feature selection)

class EnhancedFeatureEngineer:
    def __init__(self, params: dict):
        self.params = params

    @staticmethod
    # Индекс Относительной Силы
    # Измеряет скорость и силу изменения цен. Он показывает, насколько сильно и быстро актив 
    # рос или падал за заданный период, и помогает понять, перекуплен ли рынок или перепродан.
    # 70 и выше → перекупленность
    # 30 и ниже → перепроданность
    def calculate_rsi(series: pd.Series, period: int = 14) -> pd.Series:
        delta = series.diff()
        up = delta.clip(lower=0)
        down = -delta.clip(upper=0)
        ma_up = up.rolling(window=period, min_periods=period).mean()
        ma_down = down.rolling(window=period, min_periods=period).mean()
        rs = ma_up / ma_down
        rsi = 100 - (100 / (1 + rs))
        return rsi

    @staticmethod
    # Текущий уровень волатильности
    # Он не показывает направление тренда, только силу движений
    # Если ATR растёт → увеличивается шанс сильных движений → ищем пробои, трендовые входы
    # Если ATR низкий → рынок в боковике → применяем контртренд/флет-стратегии
    # Размер стопа можно задавать как 1.5 * ATR, чтобы быть адаптивным
    def calculate_atr(df: pd.DataFrame, period: int = 14) -> pd.Series:
        high_low = df['high'] - df['low']
        high_prev = (df['high'] - df['close'].shift(1)).abs()
        low_prev = (df['low'] - df['close'].shift(1)).abs()
        tr = pd.concat([high_low, high_prev, low_prev], axis=1).max(axis=1)
        atr = tr.ewm(alpha=1 / period, adjust=False).mean()
        return atr

    @staticmethod
    # Индикатор силы тренда
    # ADX растёт — тренд усиливается.
    # ADX падает — тренд теряет силу (вплоть до флэта).

    # 0–20 Нет тренда / боковик
    # 20–25	Начало возможного тренда
    # 25–50	Сильный тренд
    # 50–75	Очень сильный тренд
    # 75–100 Крайне редкий, экстремальный

    # Если ADX растёт и выше 25 → рынок трендовый → ищем пробой, тренд
    # Если ADX падает и ниже 20 → рынок в боковике → работаем во флэт-режиме
    def calculate_adx(df: pd.DataFrame, period: int = 14) -> pd.Series:
        high = df['high']
        low = df['low']
        close = df['close']
        up_move = high.diff()
        down_move = low.shift(1) - low
        plus_dm = up_move.where((up_move > down_move) & (up_move > 0), 0.0)
        minus_dm = down_move.where((down_move > up_move) & (down_move > 0), 0.0)
        tr = EnhancedFeatureEngineer.calculate_atr(df, period)
        plus_di = 100 * (plus_dm.ewm(alpha=1 / period, adjust=False).mean() / tr)
        minus_di = 100 * (minus_dm.ewm(alpha=1 / period, adjust=False).mean() / tr)
        dx = (abs(plus_di - minus_di) / (plus_di + minus_di)) * 100
        adx = dx.ewm(alpha=1 / period, adjust=False).mean()
        return adx

    @staticmethod
    # Расчета полос Боллинджера
    # Волатильностный канал вокруг скользящей средней.
    # Он показывает, когда цена далеко отклонилась от "нормы"

    # Цена у верхней полосы	- Возможная перекупленность
    # Цена у нижней полосы - Возможная перепроданность
    # Полосы сужаются (узкие) - Волатильность упала, ожидается пробой
    # Полосы расширяются - Началось сильное движение
    def calculate_bollinger_bands(series: pd.Series, window: int = 20, n_std: int = 2) -> Tuple[pd.Series, pd.Series]:
        sma = series.rolling(window=window, min_periods=window).mean()
        std = series.rolling(window=window, min_periods=window).std()
        upper = sma + (std * n_std)
        lower = sma - (std * n_std)
        return upper, lower

    @staticmethod
    # Метод для расчета стохастического осциллятора
    # > 80	Перекупленность
    # < 20	Перепроданность
    # %K пересекает %D	Потенциальный разворот
    def calculate_stochastic_oscillator(df: pd.DataFrame, k_period: int = 14, d_period: int = 3) -> Tuple[pd.Series, pd.Series]:
        low_min = df['low'].rolling(window=k_period, min_periods=k_period).min()
        high_max = df['high'].rolling(window=k_period, min_periods=k_period).max()
        k = 100 * (df['close'] - low_min) / (high_max - low_min)
        d = k.rolling(window=d_period, min_periods=d_period).mean()
        return k, d

    @staticmethod
    # Метод для расчета индикатора Ишимоку
    # Tenkan пересекает Kijun снизу вверх - Потенциальный сигнал на покупку
    # Цена выше обоих Span → облако снизу - Бычий тренд
    # Цена ниже облака - Медвежий тренд
    # Chikou пересекает цену вверх - Подтверждение лонга
    # Senkou Span A > Span B - Облако зелёное → бычье
    def calculate_ichimoku(df: pd.DataFrame) -> Tuple[pd.Series, pd.Series, pd.Series, pd.Series, pd.Series]:
        high = df['high']
        low = df['low']
        close = df['close']
        high9 = high.rolling(window=9, min_periods=9).max()
        low9 = low.rolling(window=9, min_periods=9).min()
        tenkan = (high9 + low9) / 2
        high26 = high.rolling(window=26, min_periods=26).max()
        low26 = low.rolling(window=26, min_periods=26).min()
        kijun = (high26 + low26) / 2
        span_a = ((tenkan + kijun) / 2).shift(26)
        high52 = high.rolling(window=52, min_periods=52).max()
        low52 = low.rolling(window=52, min_periods=52).min()
        span_b = ((high52 + low52) / 2).shift(26)
        chikou = close.shift(-26)
        return tenkan, kijun, span_a, span_b, chikou

    def add_features(self, full: pd.DataFrame) -> pd.DataFrame:
        df = full.copy()

        # 1) Простые лаговые признаки: цены и объёмы
        df['close_raw'] = df['close']
        df['volume_raw'] = df['base_volume']

        # 2) Локальные экстремумы на 1h
        df['is_local_min'], df['is_local_max'] = find_local_extrema(df['close'], left=5, right=5)

        # 3) Локальные экстремумы на 4h и перенос на 1h
        df_4h = full[['close_4h']].copy().rename(columns={'close_4h': 'close'})
        df_4h['is_min_4h'], df_4h['is_max_4h'] = find_local_extrema(df_4h['close'], left=3, right=3)
        df_4h_ext = df_4h[['is_min_4h', 'is_max_4h']].resample('1h').ffill().reindex(df.index, method='ffill')
        df = df.join(df_4h_ext)

        # 4) Возвратные серии и волатильность
        df['returns_1h'] = df['close'].pct_change()
        df['volatility_1h'] = df['returns_1h'].rolling(window=24, min_periods=24).std()
        df['returns_4h'] = df['close_4h'].pct_change()
        df['volatility_4h'] = df['returns_4h'].rolling(window=10, min_periods=10).std()

        # 5) ATR и ADX
        df['atr_1h'] = EnhancedFeatureEngineer.calculate_atr(df[['high', 'low', 'close']], period=14)
        df['atr_4h'] = EnhancedFeatureEngineer.calculate_atr(
            df[['high_4h', 'low_4h', 'close_4h']].rename(
                columns={'high_4h': 'high', 'low_4h': 'low', 'close_4h': 'close'}
            ), period=14
        )
        adx_df_4h = df[['high_4h', 'low_4h', 'close_4h']].rename(
            columns={'high_4h': 'high', 'low_4h': 'low', 'close_4h': 'close'}
        )
        df['adx_4h'] = EnhancedFeatureEngineer.calculate_adx(adx_df_4h, period=14)

        # 6) RSI, BB, Stochastic
        df['rsi_1h'] = EnhancedFeatureEngineer.calculate_rsi(df['close'], period=14)
        bb_upper, bb_lower = EnhancedFeatureEngineer.calculate_bollinger_bands(df['close'], window=20, n_std=2)
        df['bb_upper_1h'] = bb_upper
        df['bb_lower_1h'] = bb_lower
        stoch_k, stoch_d = EnhancedFeatureEngineer.calculate_stochastic_oscillator(
            df[['high', 'low', 'close']], k_period=14, d_period=3
        )
        df['stoch_k'] = stoch_k
        df['stoch_d'] = stoch_d

        # 7) Ichimoku (4h)
        ichimoku_df_4h = df[['high_4h', 'low_4h', 'close_4h']].rename(
            columns={'high_4h': 'high', 'low_4h': 'low', 'close_4h': 'close'}
        )
        tenkan, kijun, span_a, span_b, chikou = EnhancedFeatureEngineer.calculate_ichimoku(ichimoku_df_4h)
        df['ichimoku_tenkan'] = tenkan.resample('1h').ffill().reindex(df.index, method='ffill')
        df['ichimoku_kijun'] = kijun.resample('1h').ffill().reindex(df.index, method='ffill')
        df['ichimoku_span_a'] = span_a.resample('1h').ffill().reindex(df.index, method='ffill')
        df['ichimoku_span_b'] = span_b.resample('1h').ffill().reindex(df.index, method='ffill')
        df['ichimoku_chikou'] = chikou.resample('1h').ffill().reindex(df.index, method='ffill')

        # 8) MA на 1d и тренд-выравнивание
        df['ma20_1d'] = df['close_1d'].rolling(window=20, min_periods=20).mean()
        df['ma50_1d'] = df['close_1d'].rolling(window=50, min_periods=50).mean()
        df['trend_alignment'] = (df['ma20_1d'] > df['ma50_1d']).astype(int)
        df['momentum_ratio'] = df['close'] / df['close_4h']

        # Удаляем промежуточные столбцы
        df.drop(columns=['returns_1h', 'returns_4h'], inplace=True)
        df.dropna(inplace=True)

        # 9) Feature Selection: LightGBM Importance (если включено)
        # "feature_selection": {
        #     "enabled": true,
        #     "n_estimators": 100,
        #     "percentile_drop": 25
        # }
        # Можно добавить для сортировки важных параметров
        if self.params.get('feature_selection', {}).get('enabled', False):
            target = self.params['feature_selection']['target_series']
            if target is not None and target.nunique() > 1:
                model_fs = lgb.LGBMClassifier(
                    n_estimators=self.params['feature_selection']['n_estimators'],
                    random_state=42,
                    **self.params.get('lgb_params', {})
                )
                common_idx = df.index.intersection(target.index)
                X_fs = df.loc[common_idx].drop(columns=['close_raw', 'volume_raw'], errors='ignore')
                y_fs = target.loc[common_idx]
                model_fs.fit(X_fs, y_fs)
                importances = pd.Series(model_fs.feature_importances_, index=X_fs.columns)
                important = importances[importances > np.percentile(
                    importances, self.params['feature_selection']['percentile_drop']
                )].index
                df = df[important]
            else:
                logging.info("Feature selection пропущен — нет достаточных классов в целевой серии.")
        return df

Сбор данных с bybit и выбор признаков

from pybit.unified_trading import HTTP
import os
import logging
import time
import pandas as pd
import json
from typing import Dict, Tuple
import numpy as np

import importlib

try:
    import lightgbm as lgb
    import sklearn
    import matplotlib.pyplot as plt
    import seaborn as sns
    SKLEARN_AVAILABLE = True
except ImportError:
    SKLEARN_AVAILABLE = False
    logging.warning(
        "scikit-learn, matplotlib или seaborn не установлены. "
        "Установите: pip install scikit-learn matplotlib seaborn"
    )
    print(
        "scikit-learn, matplotlib или seaborn не установлены. "
        "Установите: pip install scikit-learn matplotlib seaborn"
    )

from dotenv import load_dotenv

# Настройка логирования
logging.basicConfig(
    filename="bybit_data.log",
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(message)s",
    encoding='utf-8'
)

# Загружаем переменные окружения из .env
load_dotenv()

# Конфигурация API
API_KEY = os.getenv('API_Key2')
API_SECRET = os.getenv('API_Secret2')
TESTNET = False  # Основная сеть

if not API_KEY or not API_SECRET:
    raise ValueError("API_Key или API_Secret не найдены в .env файле.")

# Инициализация HTTP клиента
session = HTTP(
    testnet=TESTNET,
    api_key=API_KEY,
    api_secret=API_SECRET
)


class BybitData:
    LOOKBACK_LIMITS: Dict[str, int] = {
        '1m': 30 * 24 * 3600,
        '5m': 30 * 24 * 3600,
        '15m': 30 * 24 * 3600
    }
    INTERVAL_SECONDS: Dict[str, int] = {
        '1m': 60,
        '5m': 300,
        '15m': 900
    }

    def __init__(self, symbol: str, start: str, end: str):
        self.symbol = symbol
        self.start = start
        self.end = end
        self.data_1m = None
        self.data_5m = None
        self.data_15m = None
        self.full = None

    def _to_unix_ts(self, iso_str: str) -> int:
        dt = pd.to_datetime(iso_str, utc=True)
        return int(dt.timestamp() * 1000)

    def fetch_data(self):
        cache_dir = 'data_cache'
        os.makedirs(cache_dir, exist_ok=True)

        # Обновляем end_date в конфиге
        config_path = 'bybit.json'
        try:
            with open(config_path, 'r', encoding='utf-8') as f:
                cfg = json.load(f)
            now_dt = pd.Timestamp.now(tz='UTC')
            self.end = now_dt.strftime('%Y-%m-%dT%H:%M:%SZ')
            cfg['data_params']['end_date'] = self.end
            with open(config_path, 'w', encoding='utf-8') as fw:
                json.dump(cfg, fw, ensure_ascii=False, indent=4)
            logging.info(f"[{self.symbol}] Обновили end_date в {config_path} до {self.end}")
        except FileNotFoundError:
            logging.warning(f"[{self.symbol}] {config_path} не найден. Используем заданный end_date: {self.end}")
        except Exception as e:
            logging.error(f"[{self.symbol}] Ошибка обновления end_date: {e}")

        def fetch_in_chunks(tf: str, interval_seconds: int) -> pd.DataFrame:
            cache_file = os.path.join(cache_dir, f"{self.symbol}_{tf}.csv")
            raw_start_ts = self._to_unix_ts(self.start)
            end_ts = self._to_unix_ts(self.end)
            max_window = self.LOOKBACK_LIMITS[tf] * 1000
            first_from = max(raw_start_ts, end_ts - max_window)
            logging.info(
                f"[{self.symbol}][{tf}] Начинаем с "
                f"{pd.to_datetime(first_from // 1000, unit='s', utc=True)} "
                f"до {pd.to_datetime(end_ts // 1000, unit='s', utc=True)}"
            )

            if not os.path.exists(cache_file):
                open(cache_file, 'w').close()

            def api_request(from_unix: int, to_unix: int) -> pd.DataFrame:
                for attempt in range(5):
                    try:
                        response = session.get_kline(
                            category="spot",
                            symbol=self.symbol,
                            interval=tf.replace('m', ''),
                            start=from_unix,
                            end=to_unix,
                            limit=1000
                        )
                        if response.get("retCode") == 0:
                            data = response["result"]["list"]
                            if not data:
                                logging.warning(
                                    f"[{self.symbol}][{tf}] Пустой чанк при "
                                    f"from={pd.to_datetime(from_unix // 1000, unit='s', utc=True)} "
                                    f"до {pd.to_datetime(to_unix // 1000, unit='s', utc=True)}"
                                )
                                return pd.DataFrame()
                            df = pd.DataFrame(
                                data,
                                columns=['timestamp', 'open', 'high', 'low', 'close', 'base_volume', 'quote_volume']
                            )
                            df['timestamp'] = pd.to_datetime(df['timestamp'].astype(float), unit='ms', utc=True)
                            df.set_index('timestamp', inplace=True)
                            df = df[['open', 'high', 'low', 'close', 'base_volume']].astype(float)
                            df.sort_index(inplace=True)
                            return df
                        else:
                            logging.warning(f"[{self.symbol}][{tf}] Ошибка API: {response.get('retMsg')}")
                            time.sleep(2 ** attempt)
                    except Exception as e:
                        logging.warning(f"[{self.symbol}][{tf}] Попытка {attempt+1}/5 не удалась: {e}")
                        time.sleep(2 ** attempt)
                raise ConnectionError(f"[{self.symbol}][{tf}] Не удалось получить данные после 5 попыток.")

            all_chunks = []
            chunk_idx = 0

            # Если есть кеш, докачиваем
            if os.path.getsize(cache_file) > 0:
                df_cache = pd.read_csv(cache_file, index_col='timestamp', parse_dates=True)
                last_cached = int(df_cache.index[-1].timestamp() * 1000)
                if last_cached >= end_ts:
                    logging.info(f"[{self.symbol}][{tf}] Кеш полностью покрывает диапазон.")
                    return df_cache
                all_chunks.append(df_cache)
                current = last_cached + interval_seconds * 1000
                logging.info(
                    f"[{self.symbol}][{tf}] Докачка с "
                    f"{pd.to_datetime(current // 1000, unit='s', utc=True)}"
                )
            else:
                current = first_from

            # Основной цикл загрузки чанков
            while current < end_ts:
                to_unix = min(current + interval_seconds * 1000 * 1000, end_ts)
                df_chunk = api_request(current, to_unix)
                time.sleep(0.1)
                if df_chunk.empty:
                    current = to_unix
                    continue

                chunk_idx += 1
                t0, t1 = df_chunk.index.min(), df_chunk.index.max()
                logging.info(
                    f"[{self.symbol}][{tf}] Чанк #{chunk_idx}: "
                    f"{len(df_chunk)} баров с {t0} по {t1}"
                )
                df_chunk.to_csv(
                    cache_file,
                    mode='a',
                    header=(chunk_idx == 1 and os.path.getsize(cache_file) == 0)
                )
                all_chunks.append(df_chunk)

                # Если мы дошли до конца запрошенного диапазона — выходим
                if to_unix >= end_ts:
                    logging.info(f"[{self.symbol}][{tf}] Достигнут конец диапазона, выходим из цикла.")
                    break

                # Старт следующего чанка = timestamp последнего бара
                current = int(df_chunk.index.max().timestamp() * 1000)

            if not all_chunks:
                raise ValueError(f"[{self.symbol}][{tf}] API вернул пустые данные.")

            df_full = pd.concat(all_chunks)
            df_full = df_full[~df_full.index.duplicated(keep='first')]
            df_full.sort_index(inplace=True)
            df_full.to_csv(cache_file)
            return df_full

        self.data_1m = fetch_in_chunks('1m', self.INTERVAL_SECONDS['1m'])
        self.data_5m = fetch_in_chunks('5m', self.INTERVAL_SECONDS['5m'])
        self.data_15m = fetch_in_chunks('15m', self.INTERVAL_SECONDS['15m'])

    def preprocess(self):
        df_1m = self.data_1m.copy()
        expected_index = pd.date_range(
            start=df_1m.index[0],
            end=df_1m.index[-1],
            freq='1min',
            tz='UTC'
        )

        # Проверка и заполнение пропусков
        time_diffs = df_1m.index.to_series().diff().dt.total_seconds() / 60
        if time_diffs.max() > 1:
            large_gaps = time_diffs[time_diffs > 1]
            logging.warning(
                f"[{self.symbol}][1m] Обнаружены пропуски: {len(large_gaps)} разрывов, макс {large_gaps.max()} мин"
            )
            if large_gaps.max() > 60:
                raise ValueError(f"[{self.symbol}][1m] Разрыв > 60 мин")
        df_1m = df_1m.reindex(expected_index, method='ffill')

        # Обработка нулевых объёмов
        zeros = df_1m['base_volume'] == 0
        if zeros.any():
            logging.warning(f"[{self.symbol}] {zeros.sum()} баров с нулевым объёмом — заполняем средним")
            df_1m.loc[zeros, 'base_volume'] = df_1m['base_volume'].rolling(window=3, min_periods=1).mean()

        if (df_1m['close'] <= 0).any():
            raise ValueError(f"[{self.symbol}] Обнаружены некорректные цены: <= 0")

        # Проверки 5m и 15m аналогично…

        # Объединение в единый DataFrame
        df_5m_m = self.data_5m.shift(1).resample('1min').ffill().reindex(df_1m.index, method='ffill')
        df_5m_m.columns = [f"{col}_5m" for col in df_5m_m.columns]
        df_15m_m = self.data_15m.shift(1).resample('1min').ffill().reindex(df_1m.index, method='ffill')
        df_15m_m.columns = [f"{col}_15m" for col in df_15m_m.columns]

        self.full = df_1m.join(df_5m_m).join(df_15m_m)
        self.full.ffill(inplace=True)
        self.full.dropna(inplace=True)
        self.full.to_csv(f"data_cache/{self.symbol}_full.csv", index_label='timestamp')
        logging.info(f"[{self.symbol}] Объединённые данные сохранены")

        return self.full


class EnhancedFeatureEngineer:
    def __init__(self, params: dict):
        self.params = params

    @staticmethod
    def calculate_rsi(series: pd.Series, period: int = 7) -> pd.Series:
        delta = series.diff()
        up = delta.clip(lower=0)
        down = -delta.clip(upper=0)
        ma_up = up.rolling(window=period, min_periods=period).mean()
        ma_down = down.rolling(window=period, min_periods=period).mean()
        rs = ma_up / ma_down
        rsi = 100 - (100 / (1 + rs))
        return rsi

    @staticmethod
    def calculate_atr(df: pd.DataFrame, period: int = 7) -> pd.Series:
        high_low = df['high'] - df['low']
        high_prev = (df['high'] - df['close'].shift(1)).abs()
        low_prev = (df['low'] - df['close'].shift(1)).abs()
        tr = pd.concat([high_low, high_prev, low_prev], axis=1).max(axis=1)
        atr = tr.ewm(alpha=1 / period, adjust=False).mean()
        return atr

    @staticmethod
    def calculate_bollinger_bands(series: pd.Series, window: int = 14, n_std: int = 2) -> Tuple[pd.Series, pd.Series]:
        sma = series.rolling(window=window, min_periods=window).mean()
        std = series.rolling(window=window, min_periods=window).std()
        upper = sma + (std * n_std)
        lower = sma - (std * n_std)
        return upper, lower

    @staticmethod
    def calculate_ichimoku(df: pd.DataFrame, tenkan_period: int = 5, kijun_period: int = 13, senkou_period: int = 26) -> Tuple[pd.Series, pd.Series, pd.Series, pd.Series, pd.Series]:
        high = df['high']
        low = df['low']
        close = df['close']
        high_tenkan = high.rolling(window=tenkan_period, min_periods=tenkan_period).max()
        low_tenkan = low.rolling(window=tenkan_period, min_periods=tenkan_period).min()
        tenkan = (high_tenkan + low_tenkan) / 2
        high_kijun = high.rolling(window=kijun_period, min_periods=kijun_period).max()
        low_kijun = low.rolling(window=kijun_period, min_periods=kijun_period).min()
        kijun = (high_kijun + low_kijun) / 2
        span_a = ((tenkan + kijun) / 2).shift(kijun_period)
        high_senkou = high.rolling(window=senkou_period, min_periods=senkou_period).max()
        low_senkou = low.rolling(window=senkou_period, min_periods=senkou_period).min()
        span_b = ((high_senkou + low_senkou) / 2).shift(kijun_period)
        chikou = close.shift(kijun_period)
        return tenkan, kijun, span_a, span_b, chikou

    @staticmethod
    def find_local_extrema(series: pd.Series, left: int = 3, right: int = 3) -> Tuple[pd.Series, pd.Series]:
        is_min = (series == series.rolling(window=left + right + 1, center=True).min())
        is_max = (series == series.rolling(window=left + right + 1, center=True).max())
        return is_min.astype(int), is_max.astype(int)

    @staticmethod
    def calculate_vwap(df: pd.DataFrame, window: int = 14) -> pd.Series:
        typical_price = (df['high'] + df['low'] + df['close']) / 3
        vwap = (typical_price * df['base_volume']).rolling(window=window).sum() / df['base_volume'].rolling(window=window).sum()
        return vwap

    @staticmethod
    def calculate_volume_spike(df: pd.DataFrame, window: int = 10) -> pd.Series:
        volume_ma = df['base_volume'].rolling(window=window, min_periods=window).mean()
        return df['base_volume'] / volume_ma

    @staticmethod
    def calculate_price_momentum(df: pd.DataFrame, period: int = 5) -> pd.Series:
        return df['close'].pct_change(periods=period)

    @staticmethod
    def calculate_spread(df: pd.DataFrame) -> pd.Series:
        return df['high'] - df['low']

    @staticmethod
    def remove_correlated_features(corr_matrix, threshold=0.8):
        to_drop = set()
        for i in range(len(corr_matrix.columns)):
            for j in range(i + 1, len(corr_matrix.columns)):
                if abs(corr_matrix.iloc[i, j]) > threshold:
                    to_drop.add(corr_matrix.columns[j])
        return to_drop

    def add_features(self, full: pd.DataFrame, symbol: str) -> pd.DataFrame:
        df = full.copy()
        df['close_raw'] = df['close']
        df['volume_raw'] = df['base_volume']
        df['spread_1m'] = self.calculate_spread(df[['high', 'low']])
        df['is_local_min_1m'], df['is_local_max_1m'] = self.find_local_extrema(df['close'], left=3, right=3)
        df_5m = df[['close_5m']].rename(columns={'close_5m': 'close'})
        df['is_local_min_5m'], df['is_local_max_5m'] = self.find_local_extrema(df_5m['close'], left=3, right=3)
        df['momentum_1m'] = self.calculate_price_momentum(df, period=5)
        df['momentum_5m'] = self.calculate_price_momentum(df[['close_5m']].rename(columns={'close_5m': 'close'}), period=5)
        df['volume_spike_1m'] = self.calculate_volume_spike(df, window=10)
        df['volume_spike_5m'] = self.calculate_volume_spike(df[['base_volume_5m']].rename(columns={'base_volume_5m': 'base_volume'}), window=10)
        df['atr_1m'] = self.calculate_atr(df[['high', 'low', 'close']], period=7)
        df['rsi_1m'] = self.calculate_rsi(df['close'], period=7)
        df['rsi_5m'] = self.calculate_rsi(df['close_5m'], period=7)
        bb_upper, bb_lower = self.calculate_bollinger_bands(df['close'], window=14, n_std=2)
        df['bb_upper_1m'] = bb_upper
        df['bb_lower_1m'] = bb_lower
        df['vwap_1m'] = self.calculate_vwap(df[['high', 'low', 'close', 'base_volume']], window=14)
        ichimoku_df_5m = df[['high_5m', 'low_5m', 'close_5m']].rename(columns={'high_5m': 'high', 'low_5m': 'low', 'close_5m': 'close'})
        tenkan, _, _, _, _ = self.calculate_ichimoku(ichimoku_df_5m, tenkan_period=5, kijun_period=13, senkou_period=26)
        df['ichimoku_tenkan_5m'] = tenkan.reindex(df.index, method='ffill')
        df['ma20_15m'] = df['close_15m'].rolling(window=20, min_periods=20).mean()
        df['trend_alignment_15m'] = (df['ma20_15m'] > df['close_15m'].rolling(window=50, min_periods=50).mean()).astype(int)
        df['rsi_divergence_1m'] = (df['close'] / df['close'].shift(1) - 1) - (df['rsi_1m'] / df['rsi_1m'].shift(1) - 1)
        df['target'] = (df['close'].shift(-5) > df['close']).astype(int)

        if self.params.get('feature_selection', {}).get('enabled', False) and SKLEARN_AVAILABLE:
            logging.info(f"[{full.index[-1]}] Проверка корреляции признаков")
            feature_columns = [col for col in df.columns if col not in [
                'open', 'high', 'low', 'close', 'base_volume',
                'open_5m', 'high_5m', 'low_5m', 'close_5m', 'base_volume_5m',
                'open_15m', 'high_15m', 'low_15m', 'close_15m', 'base_volume_15m',
                'close_raw', 'volume_raw', 'target'
            ]]
            corr_matrix = df[feature_columns].corr()
            to_drop = self.remove_correlated_features(corr_matrix, threshold=0.8)
            feature_columns = [col for col in feature_columns if col not in to_drop]
            logging.info(f"[{full.index[-1]}] Исключены коррелирующие признаки: {to_drop}")
            high_corr = corr_matrix.abs().stack().reset_index().rename(columns={0: 'correlation'})
            high_corr = high_corr.loc[(high_corr['level_0'] != high_corr['level_1']) & (high_corr['correlation'] > 0.8)]
            
            if not high_corr.empty:
                logging.info(f"[{full.index[-1]}] Высокая корреляция между признаками:\n{high_corr}")

            if importlib.util.find_spec('seaborn'):
                plt.figure(figsize=(12, 10))
                sns.heatmap(corr_matrix, annot=True, cmap='coolwarm', vmin=-1, vmax=1, annot_kws={"size": 8})
                plt.title(f"Correlation Matrix for {symbol}")
                plt.tight_layout()
                plt.savefig(f"data_cache/{symbol}_correlation_heatmap.png")
                plt.close()
                logging.info(f"[{full.index[-1]}] Корреляционная матрица сохранена в data_cache/{symbol}_correlation_heatmap.png")

            logging.info(f"[{full.index[-1]}] Запуск Feature Selection с LightGBM")
            X = df[feature_columns].dropna()
            y = df['target'].loc[X.index]
            if len(X) < 100 or y.nunique() <= 1:
                logging.warning(f"[{full.index[-1]}] Недостаточно данных или классов для Feature Selection. Пропускаем.")
            else:
                model_fs = lgb.LGBMClassifier(
                    n_estimators=self.params['feature_selection']['n_estimators'],
                    **self.params['feature_selection'].get('lgb_params', {})
                )
                model_fs.fit(X, y)
                importances = pd.Series(model_fs.feature_importances_, index=X.columns)
                logging.info(f"[{full.index[-1]}] Важность признаков:\n{importances.sort_values(ascending=False)}")
                important = importances[importances > np.percentile(importances, self.params['feature_selection']['percentile_drop'])].index
                df = df[list(important) + ['close_raw', 'volume_raw', 'target']]
                logging.info(f"[{full.index[-1]}] Отобраны признаки: {list(important)}")

                if importlib.util.find_spec('matplotlib'):
                    plt.figure(figsize=(10, 6))
                    importances.sort_values().plot(kind='barh')
                    plt.title(f"Feature Importance for {symbol}")
                    plt.xlabel("Importance")
                    plt.tight_layout()
                    plt.savefig(f"data_cache/{symbol}_feature_importance.png")
                    plt.close()
                    logging.info(f"[{full.index[-1]}] График важности сохранён в data_cache/{symbol}_feature_importance.png")

        df.dropna(inplace=True)
        return df

if __name__ == "__main__":
    config_path = 'bybit.json'
    try:
        with open(config_path, 'r', encoding='utf-8') as f:
            params = json.load(f)
    except FileNotFoundError:
        logging.error(f"{config_path} не найден. Используем значения по умолчанию.")
        params = {
            "symbols": ["KASUSDT", "SOLUSDT"],
            "data_params": {
                "start_date": "2024-01-01T00:00:00Z",
                "end_date": "2025-07-09T17:23:00Z"
            },
            "feature_selection": {
                "enabled": True,
                "n_estimators": 100,
                "percentile_drop": 25,
                "lgb_params": {
                    "learning_rate": 0.05,
                    "max_depth": 10,
                    "random_state": 42,
                    "class_weight": "balanced",
                    "min_child_samples": 30,
                    "reg_lambda": 0.2
                }
            }
        }

    for symbol in params['symbols']:
        logging.info(f"Обработка {symbol}...")
        try:
            data_fetcher = BybitData(
                symbol=symbol,
                start=params['data_params']['start_date'],
                end=params['data_params']['end_date']
            )
            data_fetcher.fetch_data()
            data_fetcher.preprocess()
            logging.info(f"Данные для {symbol} (последние 5 строк):\n{data_fetcher.full.tail()}")
            engineer = EnhancedFeatureEngineer(params)
            df_with_features = engineer.add_features(data_fetcher.full, symbol)
            df_with_features.to_csv(f"data_cache/{symbol}_features.csv")
            logging.info(f"Признаки сохранены в data_cache/{symbol}_features.csv")
            logging.info(f"Признаки для {symbol} (последние 5 строк):\n{df_with_features.tail()}")
            logging.info(f"Список признаков: {list(df_with_features.columns)}")
        except ValueError as e:
            logging.error(f"[{symbol}] Ошибка обработки: {e}")
            continue

Комментарии

kaban1106 (09.07.2025, 04:06:47):

<script>alert("XSS")</script>