Barfinex

Система индикаторов — конвейер RSI, MACD, ADX и технических индикаторов в реальном времени

Как Barfinex вычисляет, хранит и публикует значения технических индикаторов из Detector и Advisor: трёхуровневое хранение, QuestDB time-series, REST-история и WebSocket push.

Зачем это нужно

До введения Indicator Publisher каждое значение индикатора — RSI, ADX, MACD, Bollinger Bands — существовало только внутри IndicatorEngine каждого сервиса. Ни один внешний потребитель не мог прочитать текущие значения без прямой зависимости от внутреннего состояния движка.

Это создавало три конкретных проблемы:

  • Studio не видела индикаторы — невозможно построить mini-chart по RSI или показать ADX на сигнальном дашборде
  • У Advisor не было структурированного контекста — при LLM-вызове Advisor не мог передать текущий снапшот индикаторов как часть входных данных
  • Нет исторической записи — никакого системного лога о том, каким был RSI или ATR три часа назад

IndicatorPublisherModule закрывает все три проблемы через единый разделяемый модуль без реструктуризации существующих сервисов.


Поток данных

Биржа (Binance / Bybit / …)
        │
        ▼
   [ Provider ]   — нормализует рыночные данные, публикует события закрытия свечи
        │
        ▼
   [ Шина событий Redis ]   — типизированные события CANDLE_CLOSE
        │
        ├──────────────────────────┐
        ▼                          ▼
   [ Detector ]              [ Advisor ]
   FollowTrendService       AdvisorIndicatorService
   onCandleClose()          update(event) — только на событии candle
        │                          │
        ▼                          ▼
   IndicatorEngine.update()  IndicatorEngine.update()
   engine.getSnapshot()      engine.getSnapshot()
        │                          │
        └──────────┬───────────────┘
                   ▼
       IndicatorSnapshotStoreService
          │
          ├── [1] Текущий снапшот Map   → REST /snapshot эндпоинты
          ├── [2] Буфер 10 баров        → REST /recent эндпоинты
          ├── [3] updates$ Observable   → WebSocket пространство /indicators
          └── [4] IndicatorQuestDbSink  → таблица indicator_values в QuestDB
                          │
                          ▼
                   [ Provider ]
            GET /indicator-history/:scope/:symbol

Каждое обновление индикатора проходит по этому единственному пути. Сервисы не ведут отдельного логирования индикаторов — один store, один sink, одно пространство сокетов.


Два scope

Система разделяет контекст индикаторов на два независимых scope:

ScopeФормат символаСервис-источникЧастота обновления
detectorBTCUSDT:1h, ETHUSDT:5minFollowTrendServicePer candle-close per interval
advisorBTCUSDT, ETHUSDTAdvisorIndicatorServicePer candle-close only

Advisor намеренно публикует только на событиях свечи. Сделки и обновления стакана приходят тысячи раз в секунду, но RSI, MACD и аналогичные индикаторы пересчитываются только при закрытии свечи — более частая публикация заполнила бы store идентичными значениями.


Трёхуровневое хранение

Уровень 1 — текущий снапшот (in-memory Map)

Map<scope → Map<symbol → { indicators, timestamp }>>
  • Latency: 0 мс, поиск O(1)
  • Время жизни: до перезапуска сервиса
  • Используется: GET /indicators/:scope/snapshot/:symbol, переподключение по WebSocket (клиент мгновенно получает текущий снапшот при subscribe — без ожидания следующей свечи)

Уровень 2 — буфер последних баров (кольцевой, 10 баров)

Map<scope → Map<symbol → SnapshotEntry[]>>   // макс. INDICATOR_RECENT_SIZE баров
  • Глубина по умолчанию: 10 баров (env: INDICATOR_RECENT_SIZE, жёсткий предел 50)
  • Потребление памяти: 10 баров × 13 индикаторов × 6 символов × ~100 байт ≈ 78 КБ
  • Используется: GET /indicators/:scope/recent/:symbol — последние бары для спарклайнов и mini-chart без запроса в базу данных
  • Вытеснение: самый старый бар удаляется при каждой новой записи (shift + push)

Уровень 3 — QuestDB (постоянное хранение, произвольный диапазон)

QuestDB хранит полную историю в narrow-format таблице со словарно-кодированными SYMBOL-колонками:

CREATE TABLE indicator_values (
  ts             TIMESTAMP,
  scope          SYMBOL CAPACITY 8   CACHE,   -- 'detector' | 'advisor'
  symbol         SYMBOL CAPACITY 200 CACHE,   -- 'BTCUSDT:1h'
  indicator_key  SYMBOL CAPACITY 200 CACHE,   -- 'rsi14', 'macd', 'bands20'
  value          DOUBLE,                      -- основное числовое значение
  extra_json     STRING                       -- доп. поля: histogram, upper/lower и т.д.
) timestamp(ts) PARTITION BY DAY WAL;

Почему narrow-формат, а не JSON-блоб на строку:

СвойствоNarrow (выбрано)JSON blob
Запрос по одному индикаторуWHERE indicator_key = 'rsi14' — index scanРазбор JSON в каждой строке
Хранение колонки символовDictionary-encoded: 'rsi14' → int16 один разПовторяющаяся строка
Запрос последнего значенияLATEST ON ts PARTITION BY indicator_keyПолный скан + разбор
Доп. поля (histogram MACD)Колонка extra_json STRINGВстроено в блоб

При 78 записях/мин по всем символам детектора годовой объём составит ~41 миллион строк — около 8 ГБ/год. QuestDB обрабатывает 4+ миллиона строк/сек через ILP; реальная нагрузка в 5 000 раз ниже предела.


Конвейер записи: батчинг и circuit breaker

IndicatorQuestDbSink никогда не пишет строки по одной. Он ставит ILP-строки в очередь и сбрасывает их контролируемыми батчами:

Триггеры flush:

  1. Очередь достигает INDICATOR_SINK_BATCH_SIZE строк (по умолчанию 100)
  2. Таймер срабатывает каждые INDICATOR_SINK_FLUSH_MS миллисекунд (по умолчанию 5 000)
  3. onModuleDestroy() — дренирует остаток очереди при штатном завершении

Обратное давление: когда очередь достигает INDICATOR_SINK_QUEUE_MAX (по умолчанию 2 000 строк), удаляются старые записи, а не новые. Актуальные значения индикаторов важнее значений пятисекундной давности.

Circuit breaker: после 5 последовательных ошибок записи sink переходит в cooldown на 30 секунд и прекращает попытки. Это гарантирует, что недоступность QuestDB не добавит latency в индикаторный pipeline, работающий с частотой свечи.

Результат: одно TCP-сообщение каждые 5 секунд вместо 78+ в минуту — снижение overhead записи на 94%.


REST API

Все эндпоинты индикаторов Detector и Advisor также доступны через прокси Provider по путям /api/detector-proxy/indicators/* и /api/advisor-proxy/indicators/*.

Snapshot эндпоинты (текущие значения)

МетодПутьОписание
GET/api/indicators/healthHealth check — возвращает статистику store
GET/api/indicators/scopesСписок активных scope
GET/api/indicators/snapshotВсе scope и символы
GET/api/indicators/:scope/snapshotВсе символы в scope
GET/api/indicators/:scope/snapshot/:symbolТекущий снапшот индикаторов для одного символа

Recent-буфер эндпоинты (последние N баров, без DB)

МетодПутьОписание
GET/api/indicators/recentМетаданные буфера (глубина, oldest/newest по scope)
GET/api/indicators/:scope/recentМетаданные буфера для одного scope
GET/api/indicators/:scope/recent/:symbol?limit=NПоследние N баров (по умолчанию: INDICATOR_RECENT_SIZE)

History эндпоинты (QuestDB, только через Provider)

МетодПутьОписание
GET/api/indicator-history/:scope/:symbol?from=ms&to=ms&limit=200&indicators=rsi14,macdИстория за диапазон, сведённая по timestamp
GET/api/indicator-history/:scope/:symbol/latestПоследнее значение всех индикаторов (LATEST ON)

History эндпоинты находятся в Provider — только у него есть QuestDBQueryService. У Detector и Advisor нет прямого доступа к QuestDB на чтение.


WebSocket (push в реальном времени)

Пространство Socket.io /indicators на каждом приложении (Detector порт 8101, Advisor порт 8009).

Клиент → сервер:

indicators:subscribe   { scope: 'detector', symbol: 'BTCUSDT:1h' }
indicators:unsubscribe { scope: 'detector', symbol: 'BTCUSDT:1h' }
indicators:ping

Сервер → клиент:

indicators:update  {
  scope:      'detector',
  symbol:     'BTCUSDT:1h',
  indicators: { rsi14: { value: 54.2 }, macd: { value: -1.2, histogram: -1.5 }, ... },
  timestamp:  1716000000000
}
indicators:pong    { ts: number }

При subscribe клиент немедленно получает текущий снапшот (Уровень 1) если он уже есть — без ожидания следующей свечи. Каждое закрытие свечи отправляет событие indicators:update всем комнатам-подписчикам.

WebSocket-соединения идут напрямую к Detector или Advisor — не через Provider. Endpoint /ws Provider — это Redis pub/sub мост для низкочастотных событий; индикаторные обновления с частотой свечи добавили бы лишний hop и latency.


Ключевые запросы к QuestDB

Последние 100 значений RSI (временной ряд):

SELECT ts, value
FROM indicator_values
WHERE scope = 'detector' AND symbol = 'BTCUSDT:1h' AND indicator_key = 'rsi14'
ORDER BY ts DESC LIMIT 100;

Последний снапшот всех индикаторов (без пробелов, одна строка на индикатор):

SELECT ts, indicator_key, value, extra_json
FROM indicator_values
WHERE scope = 'detector' AND symbol = 'BTCUSDT:1h'
LATEST ON ts PARTITION BY indicator_key;

Histogram MACD за последние 24 часа:

SELECT ts, extra_json
FROM indicator_values
WHERE scope = 'detector' AND symbol = 'BTCUSDT:1h'
  AND indicator_key = 'macd'
  AND ts >= dateadd('h', -24, now())
ORDER BY ts ASC;

Опциональное хранение (удаление партиций старше 90 дней):

ALTER TABLE indicator_values DROP PARTITION WHERE ts < dateadd('d', -90, now());

Переменные окружения

ПеременнаяПо умолчаниюОписание
INDICATOR_RECENT_SIZE10Глубина in-memory буфера (макс. 50)
INDICATOR_SINK_BATCH_SIZE100Строк на один ILP-батч
INDICATOR_SINK_FLUSH_MS5000Максимальный интервал flush в миллисекундах
INDICATOR_SINK_QUEUE_MAX2000Макс. строк в очереди до вытеснения старых

QuestDB sink необязателен — если INDICATOR_HISTORY_SINK не задан, данные хранятся только на уровнях 1 и 2.


Публикуемые индикаторы

Detector (стратегия FollowTrend)

rsi14, adx14, macd, ema20, ema50, bands20, atr14, dc20, stoch, cci20, mfi14, obv, vwap

Advisor

rsi14, macd, adx14, ema20, ema50, atr14, bands20, obv, mfi14, stoch, cci20, dc20, vwap, ac

Многозначные индикаторы хранят основное поле в value, дополнительные поля — в extra_json. Например, MACD: value = macd_line, extra_json = {"histogram": -1.5, "signal": 0.3}. Bollinger Bands: value = middle, extra_json = {"upper": 51200, "lower": 49800}.


Порты сервисов

СервисПортЭндпоинты индикаторов
Provider8081/api/indicator-history/* (чтение из QuestDB)
Detector8101/api/indicators/*, WebSocket /indicators
Advisor8009/api/indicators/*, WebSocket /indicators

Давайте свяжемся

Есть вопросы или хотите узнать больше о Barfinex? Напишите нам.