Система индикаторов — конвейер RSI, MACD, ADX и технических индикаторов в реальном времени
Как Barfinex вычисляет, хранит и публикует значения технических индикаторов из Detector и Advisor: трёхуровневое хранение, QuestDB time-series, REST-история и WebSocket push.
Зачем это нужно
До введения Indicator Publisher каждое значение индикатора — RSI, ADX, MACD, Bollinger Bands — существовало только внутри IndicatorEngine каждого сервиса. Ни один внешний потребитель не мог прочитать текущие значения без прямой зависимости от внутреннего состояния движка.
Это создавало три конкретных проблемы:
- Studio не видела индикаторы — невозможно построить mini-chart по RSI или показать ADX на сигнальном дашборде
- У Advisor не было структурированного контекста — при LLM-вызове Advisor не мог передать текущий снапшот индикаторов как часть входных данных
- Нет исторической записи — никакого системного лога о том, каким был RSI или ATR три часа назад
IndicatorPublisherModule закрывает все три проблемы через единый разделяемый модуль без реструктуризации существующих сервисов.
Поток данных
Биржа (Binance / Bybit / …)
│
▼
[ Provider ] — нормализует рыночные данные, публикует события закрытия свечи
│
▼
[ Шина событий Redis ] — типизированные события CANDLE_CLOSE
│
├──────────────────────────┐
▼ ▼
[ Detector ] [ Advisor ]
FollowTrendService AdvisorIndicatorService
onCandleClose() update(event) — только на событии candle
│ │
▼ ▼
IndicatorEngine.update() IndicatorEngine.update()
engine.getSnapshot() engine.getSnapshot()
│ │
└──────────┬───────────────┘
▼
IndicatorSnapshotStoreService
│
├── [1] Текущий снапшот Map → REST /snapshot эндпоинты
├── [2] Буфер 10 баров → REST /recent эндпоинты
├── [3] updates$ Observable → WebSocket пространство /indicators
└── [4] IndicatorQuestDbSink → таблица indicator_values в QuestDB
│
▼
[ Provider ]
GET /indicator-history/:scope/:symbol
Каждое обновление индикатора проходит по этому единственному пути. Сервисы не ведут отдельного логирования индикаторов — один store, один sink, одно пространство сокетов.
Два scope
Система разделяет контекст индикаторов на два независимых scope:
| Scope | Формат символа | Сервис-источник | Частота обновления |
|---|---|---|---|
detector | BTCUSDT:1h, ETHUSDT:5min | FollowTrendService | Per candle-close per interval |
advisor | BTCUSDT, ETHUSDT | AdvisorIndicatorService | Per candle-close only |
Advisor намеренно публикует только на событиях свечи. Сделки и обновления стакана приходят тысячи раз в секунду, но RSI, MACD и аналогичные индикаторы пересчитываются только при закрытии свечи — более частая публикация заполнила бы store идентичными значениями.
Трёхуровневое хранение
Уровень 1 — текущий снапшот (in-memory Map)
Map<scope → Map<symbol → { indicators, timestamp }>>
- Latency: 0 мс, поиск O(1)
- Время жизни: до перезапуска сервиса
- Используется:
GET /indicators/:scope/snapshot/:symbol, переподключение по WebSocket (клиент мгновенно получает текущий снапшот при subscribe — без ожидания следующей свечи)
Уровень 2 — буфер последних баров (кольцевой, 10 баров)
Map<scope → Map<symbol → SnapshotEntry[]>> // макс. INDICATOR_RECENT_SIZE баров
- Глубина по умолчанию: 10 баров (env:
INDICATOR_RECENT_SIZE, жёсткий предел 50) - Потребление памяти: 10 баров × 13 индикаторов × 6 символов × ~100 байт ≈ 78 КБ
- Используется:
GET /indicators/:scope/recent/:symbol— последние бары для спарклайнов и mini-chart без запроса в базу данных - Вытеснение: самый старый бар удаляется при каждой новой записи (shift + push)
Уровень 3 — QuestDB (постоянное хранение, произвольный диапазон)
QuestDB хранит полную историю в narrow-format таблице со словарно-кодированными SYMBOL-колонками:
CREATE TABLE indicator_values ( ts TIMESTAMP, scope SYMBOL CAPACITY 8 CACHE, -- 'detector' | 'advisor' symbol SYMBOL CAPACITY 200 CACHE, -- 'BTCUSDT:1h' indicator_key SYMBOL CAPACITY 200 CACHE, -- 'rsi14', 'macd', 'bands20' value DOUBLE, -- основное числовое значение extra_json STRING -- доп. поля: histogram, upper/lower и т.д. ) timestamp(ts) PARTITION BY DAY WAL;
Почему narrow-формат, а не JSON-блоб на строку:
| Свойство | Narrow (выбрано) | JSON blob |
|---|---|---|
| Запрос по одному индикатору | WHERE indicator_key = 'rsi14' — index scan | Разбор JSON в каждой строке |
| Хранение колонки символов | Dictionary-encoded: 'rsi14' → int16 один раз | Повторяющаяся строка |
| Запрос последнего значения | LATEST ON ts PARTITION BY indicator_key | Полный скан + разбор |
| Доп. поля (histogram MACD) | Колонка extra_json STRING | Встроено в блоб |
При 78 записях/мин по всем символам детектора годовой объём составит ~41 миллион строк — около 8 ГБ/год. QuestDB обрабатывает 4+ миллиона строк/сек через ILP; реальная нагрузка в 5 000 раз ниже предела.
Конвейер записи: батчинг и circuit breaker
IndicatorQuestDbSink никогда не пишет строки по одной. Он ставит ILP-строки в очередь и сбрасывает их контролируемыми батчами:
Триггеры flush:
- Очередь достигает
INDICATOR_SINK_BATCH_SIZEстрок (по умолчанию 100) - Таймер срабатывает каждые
INDICATOR_SINK_FLUSH_MSмиллисекунд (по умолчанию 5 000) onModuleDestroy()— дренирует остаток очереди при штатном завершении
Обратное давление: когда очередь достигает INDICATOR_SINK_QUEUE_MAX (по умолчанию 2 000 строк), удаляются старые записи, а не новые. Актуальные значения индикаторов важнее значений пятисекундной давности.
Circuit breaker: после 5 последовательных ошибок записи sink переходит в cooldown на 30 секунд и прекращает попытки. Это гарантирует, что недоступность QuestDB не добавит latency в индикаторный pipeline, работающий с частотой свечи.
Результат: одно TCP-сообщение каждые 5 секунд вместо 78+ в минуту — снижение overhead записи на 94%.
REST API
Все эндпоинты индикаторов Detector и Advisor также доступны через прокси Provider по путям /api/detector-proxy/indicators/* и /api/advisor-proxy/indicators/*.
Snapshot эндпоинты (текущие значения)
| Метод | Путь | Описание |
|---|---|---|
| GET | /api/indicators/health | Health check — возвращает статистику store |
| GET | /api/indicators/scopes | Список активных scope |
| GET | /api/indicators/snapshot | Все scope и символы |
| GET | /api/indicators/:scope/snapshot | Все символы в scope |
| GET | /api/indicators/:scope/snapshot/:symbol | Текущий снапшот индикаторов для одного символа |
Recent-буфер эндпоинты (последние N баров, без DB)
| Метод | Путь | Описание |
|---|---|---|
| GET | /api/indicators/recent | Метаданные буфера (глубина, oldest/newest по scope) |
| GET | /api/indicators/:scope/recent | Метаданные буфера для одного scope |
| GET | /api/indicators/:scope/recent/:symbol?limit=N | Последние N баров (по умолчанию: INDICATOR_RECENT_SIZE) |
History эндпоинты (QuestDB, только через Provider)
| Метод | Путь | Описание |
|---|---|---|
| GET | /api/indicator-history/:scope/:symbol?from=ms&to=ms&limit=200&indicators=rsi14,macd | История за диапазон, сведённая по timestamp |
| GET | /api/indicator-history/:scope/:symbol/latest | Последнее значение всех индикаторов (LATEST ON) |
History эндпоинты находятся в Provider — только у него есть QuestDBQueryService. У Detector и Advisor нет прямого доступа к QuestDB на чтение.
WebSocket (push в реальном времени)
Пространство Socket.io /indicators на каждом приложении (Detector порт 8101, Advisor порт 8009).
Клиент → сервер:
indicators:subscribe { scope: 'detector', symbol: 'BTCUSDT:1h' }
indicators:unsubscribe { scope: 'detector', symbol: 'BTCUSDT:1h' }
indicators:ping
Сервер → клиент:
indicators:update {
scope: 'detector',
symbol: 'BTCUSDT:1h',
indicators: { rsi14: { value: 54.2 }, macd: { value: -1.2, histogram: -1.5 }, ... },
timestamp: 1716000000000
}
indicators:pong { ts: number }
При subscribe клиент немедленно получает текущий снапшот (Уровень 1) если он уже есть — без ожидания следующей свечи. Каждое закрытие свечи отправляет событие indicators:update всем комнатам-подписчикам.
WebSocket-соединения идут напрямую к Detector или Advisor — не через Provider. Endpoint /ws Provider — это Redis pub/sub мост для низкочастотных событий; индикаторные обновления с частотой свечи добавили бы лишний hop и latency.
Ключевые запросы к QuestDB
Последние 100 значений RSI (временной ряд):
SELECT ts, value FROM indicator_values WHERE scope = 'detector' AND symbol = 'BTCUSDT:1h' AND indicator_key = 'rsi14' ORDER BY ts DESC LIMIT 100;
Последний снапшот всех индикаторов (без пробелов, одна строка на индикатор):
SELECT ts, indicator_key, value, extra_json FROM indicator_values WHERE scope = 'detector' AND symbol = 'BTCUSDT:1h' LATEST ON ts PARTITION BY indicator_key;
Histogram MACD за последние 24 часа:
SELECT ts, extra_json
FROM indicator_values
WHERE scope = 'detector' AND symbol = 'BTCUSDT:1h'
AND indicator_key = 'macd'
AND ts >= dateadd('h', -24, now())
ORDER BY ts ASC;
Опциональное хранение (удаление партиций старше 90 дней):
ALTER TABLE indicator_values DROP PARTITION WHERE ts < dateadd('d', -90, now());
Переменные окружения
| Переменная | По умолчанию | Описание |
|---|---|---|
INDICATOR_RECENT_SIZE | 10 | Глубина in-memory буфера (макс. 50) |
INDICATOR_SINK_BATCH_SIZE | 100 | Строк на один ILP-батч |
INDICATOR_SINK_FLUSH_MS | 5000 | Максимальный интервал flush в миллисекундах |
INDICATOR_SINK_QUEUE_MAX | 2000 | Макс. строк в очереди до вытеснения старых |
QuestDB sink необязателен — если INDICATOR_HISTORY_SINK не задан, данные хранятся только на уровнях 1 и 2.
Публикуемые индикаторы
Detector (стратегия FollowTrend)
rsi14, adx14, macd, ema20, ema50, bands20, atr14, dc20, stoch, cci20, mfi14, obv, vwap
Advisor
rsi14, macd, adx14, ema20, ema50, atr14, bands20, obv, mfi14, stoch, cci20, dc20, vwap, ac
Многозначные индикаторы хранят основное поле в value, дополнительные поля — в extra_json. Например, MACD: value = macd_line, extra_json = {"histogram": -1.5, "signal": 0.3}. Bollinger Bands: value = middle, extra_json = {"upper": 51200, "lower": 49800}.
Порты сервисов
| Сервис | Порт | Эндпоинты индикаторов |
|---|---|---|
| Provider | 8081 | /api/indicator-history/* (чтение из QuestDB) |
| Detector | 8101 | /api/indicators/*, WebSocket /indicators |
| Advisor | 8009 | /api/indicators/*, WebSocket /indicators |