Barfinex

Indicator System — real-time pipeline for RSI, MACD, ADX and all technical indicators

How Barfinex computes, stores and exposes technical indicator values across Detector and Advisor services: 3-tier storage, QuestDB time-series, REST history, and WebSocket push.

Why this module exists

Before the Indicator Publisher, every indicator value — RSI, ADX, MACD, Bollinger Bands — existed only inside the internal IndicatorEngine of each service. No external consumer could read current values without a full dependency on the engine's private state.

This created three concrete problems:

  • Studio could not display indicators — no way to build an RSI mini-chart or show ADX on the signal dashboard
  • Advisor had no structured indicator context — when calling the LLM, Advisor could not attach a current indicator snapshot as part of the reasoning input
  • No historical record — no systematic log of what RSI or ATR was three hours ago

The IndicatorPublisherModule closes all three gaps in a single shared module without restructuring existing services.


Data flow

Exchange (Binance / Bybit / …)
        │
        ▼
   [ Provider ]   — normalizes market data, publishes candle-close events
        │
        ▼
   [ Redis event bus ]   — CANDLE_CLOSE typed events
        │
        ├──────────────────────────┐
        ▼                          ▼
   [ Detector ]              [ Advisor ]
   FollowTrendService       AdvisorIndicatorService
   onCandleClose()          update(event) — only on candle type
        │                          │
        ▼                          ▼
   IndicatorEngine.update()  IndicatorEngine.update()
   engine.getSnapshot()      engine.getSnapshot()
        │                          │
        └──────────┬───────────────┘
                   ▼
       IndicatorSnapshotStoreService
          │
          ├── [1] Current snapshot Map   → REST /snapshot endpoints
          ├── [2] Recent buffer (10 bars) → REST /recent endpoints
          ├── [3] updates$ Observable    → WebSocket /indicators namespace
          └── [4] IndicatorQuestDbSink   → QuestDB indicator_values table
                          │
                          ▼
                   [ Provider ]
            GET /indicator-history/:scope/:symbol

Every indicator update follows this single path. Services do not have separate indicator logging — there is one store, one sink, one socket namespace.
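
The single-path idea can be illustrated with a minimal sketch. The class and method names below (SnapshotStoreSketch, publish, onUpdate) are hypothetical and are not the real IndicatorSnapshotStoreService API, which this page does not show. Plain callbacks stand in for the updates$ Observable and the QuestDB sink.

```typescript
// Hypothetical types; the real store's API is not documented on this page.
type IndicatorValues = Record<string, { value: number; [field: string]: number }>;

interface SnapshotEntry {
  indicators: IndicatorValues;
  timestamp: number; // epoch milliseconds
}

type UpdateListener = (scope: string, symbol: string, entry: SnapshotEntry) => void;

class SnapshotStoreSketch {
  // scope -> symbol -> latest entry (the "current snapshot" map)
  private readonly current = new Map<string, Map<string, SnapshotEntry>>();
  private readonly listeners: UpdateListener[] = [];

  // Register a consumer, e.g. a socket gateway or a database sink.
  onUpdate(listener: UpdateListener): void {
    this.listeners.push(listener);
  }

  // Single entry point: every producer publishes through this one method.
  publish(scope: string, symbol: string, indicators: IndicatorValues, timestamp: number): void {
    const entry: SnapshotEntry = { indicators, timestamp };
    let bySymbol = this.current.get(scope);
    if (!bySymbol) {
      bySymbol = new Map<string, SnapshotEntry>();
      this.current.set(scope, bySymbol);
    }
    bySymbol.set(symbol, entry);
    for (const listener of this.listeners) listener(scope, symbol, entry);
  }

  getSnapshot(scope: string, symbol: string): SnapshotEntry | undefined {
    return this.current.get(scope)?.get(symbol);
  }
}
```

Because every consumer hangs off the same publish call, adding a new output means registering one more listener rather than touching the producing services.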


Two scopes

The system separates indicator context into two independent scopes:

| Scope | Symbol format | Source service | Update cadence |
|---|---|---|---|
| detector | BTCUSDT:1h, ETHUSDT:5min | FollowTrendService | Per candle-close per interval |
| advisor | BTCUSDT, ETHUSDT | AdvisorIndicatorService | Per candle-close only |

Advisor intentionally publishes only on candle events. Trades and order book updates fire thousands of times per second, but RSI, MACD, and similar indicators are recalculated only when a candle closes — publishing more frequently would flood the store with identical values.
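
As a rough illustration of the two key formats and the candle-only rule, the helpers below build and parse symbol keys and decide whether an Advisor event should publish. All function names and the "candle" event-type string are assumptions made for this sketch.

```typescript
type Scope = "detector" | "advisor";

// Detector keys carry an interval suffix ("BTCUSDT:1h"); advisor keys are the bare symbol.
function buildSymbolKey(scope: Scope, symbol: string, interval?: string): string {
  if (scope === "detector") {
    if (!interval) throw new Error("detector keys need an interval");
    return `${symbol}:${interval}`;
  }
  return symbol;
}

// Split a key back into its parts; interval is null for advisor-style keys.
function parseSymbolKey(key: string): { symbol: string; interval: string | null } {
  const idx = key.indexOf(":");
  return idx === -1
    ? { symbol: key, interval: null }
    : { symbol: key.slice(0, idx), interval: key.slice(idx + 1) };
}

// Hypothetical event-type name: only candle events trigger an advisor publish.
function shouldAdvisorPublish(eventType: string): boolean {
  return eventType === "candle";
}
```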


Three-tier storage

Tier 1 — current snapshot (in-memory Map)

Map<scope → Map<symbol → { indicators, timestamp }>>
  • Latency: in-process O(1) lookup, no I/O or network hop
  • Lifetime: until service restart
  • Used by: GET /indicators/:scope/snapshot/:symbol, WebSocket reconnect (client gets current snapshot immediately on subscribe, without waiting for the next candle)

Tier 2 — recent buffer (10-bar ring)

Map<scope → Map<symbol → SnapshotEntry[]>>   // max INDICATOR_RECENT_SIZE bars
  • Default depth: 10 bars (env: INDICATOR_RECENT_SIZE, hard cap 50)
  • Memory footprint: 10 bars × 13 indicators × 6 symbols × ~100 bytes ≈ 78 KB
  • Used by: GET /indicators/:scope/recent/:symbol — the last few bars for sparklines and mini-charts without a database round-trip
  • Eviction: oldest bar is dropped on each new write (shift + push)
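
The eviction rule can be sketched as a small bounded buffer. RecentBuffer is a hypothetical name; the default of 10 mirrors INDICATOR_RECENT_SIZE, and how the real service handles the limit query parameter is an assumption.

```typescript
// Bounded buffer: push a new bar, then drop the oldest once the cap is exceeded.
class RecentBuffer<T> {
  private readonly bars: T[] = [];
  private readonly maxSize: number;

  constructor(maxSize: number = 10) {
    this.maxSize = maxSize;
  }

  push(bar: T): void {
    this.bars.push(bar);
    if (this.bars.length > this.maxSize) this.bars.shift();
  }

  // Return up to `limit` most recent bars, oldest first.
  last(limit: number = this.maxSize): T[] {
    if (limit <= 0) return []; // slice(-0) would return the whole array
    return this.bars.slice(-limit);
  }
}
```

Array.prototype.shift is O(n), which is negligible at a depth of 10 to 50 bars; a true circular index would only matter for much larger buffers.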

Tier 3 — QuestDB (persistent, arbitrary time-range)

QuestDB stores the full history in a narrow-format table with dictionary-encoded SYMBOL columns:

CREATE TABLE indicator_values (
  ts             TIMESTAMP,
  scope          SYMBOL CAPACITY 8   CACHE,   -- 'detector' | 'advisor'
  symbol         SYMBOL CAPACITY 200 CACHE,   -- 'BTCUSDT:1h'
  indicator_key  SYMBOL CAPACITY 200 CACHE,   -- 'rsi14', 'macd', 'bands20'
  value          DOUBLE,                      -- primary numeric value
  extra_json     STRING                       -- secondary fields: histogram, upper/lower, etc.
) timestamp(ts) PARTITION BY DAY WAL;

Why narrow format instead of a JSON blob per row:

| Property | Narrow (chosen) | JSON blob |
|---|---|---|
| Query a single indicator | WHERE indicator_key = 'rsi14': filter on a dictionary-encoded column (integer comparison) | Parse JSON for every row |
| Symbol column storage | Dictionary-encoded: 'rsi14' is stored once, rows hold an int32 ID | Full string repeated |
| Latest-value query | LATEST ON ts PARTITION BY indicator_key | Full scan + parse |
| Extra fields (MACD histogram) | extra_json STRING column | Embedded in blob |

At 78 writes/min across all detector symbols, the annual volume is ~41 million rows, roughly 8 GB/year. QuestDB handles 4+ million rows/sec via ILP; at about 1.3 rows/sec, the actual load is roughly three million times below that limit.
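
The volume figures can be reproduced with a few lines of arithmetic. The 78 rows/min input is the number quoted above (it matches 13 indicators × 6 symbols from the Tier 2 estimate, presumably with one candle close per symbol per minute); treating 8 GB as 8e9 bytes is an assumption.

```typescript
const WRITES_PER_MINUTE = 78; // quoted above; equals 13 indicators x 6 symbols

// Rows written in one year at a constant rate.
const rowsPerYear = WRITES_PER_MINUTE * 60 * 24 * 365; // 40,996,800, i.e. ~41 million

// Sustained ingest rate in rows per second.
const rowsPerSecond = WRITES_PER_MINUTE / 60; // 1.3

// Storage cost per row implied by the ~8 GB/year figure (assuming 8e9 bytes).
const impliedBytesPerRow = 8e9 / rowsPerYear; // about 195 bytes
```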


Write pipeline: batching and circuit breaker

The IndicatorQuestDbSink never writes row-by-row. It queues ILP lines and flushes them in controlled batches:

Flush triggers:

  1. Queue reaches INDICATOR_SINK_BATCH_SIZE rows (default 100)
  2. Timer fires every INDICATOR_SINK_FLUSH_MS milliseconds (default 5 000)
  3. onModuleDestroy() — drains the remaining queue on graceful shutdown

Backpressure: when the queue reaches INDICATOR_SINK_QUEUE_MAX (default 2 000 rows), the oldest entries are dropped — not the newest. Current indicator values matter more than values from five seconds ago.
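
A minimal sketch of the queueing rules described above, assuming a synchronous write callback that returns true on success. BatchQueueSketch and its methods are hypothetical names, and re-queueing a failed batch is an assumption; this page does not say what the real sink does with a batch that fails to write.

```typescript
type WriteBatch = (lines: string[]) => boolean; // true when the batch was accepted

class BatchQueueSketch {
  private queue: string[] = [];
  private readonly write: WriteBatch;
  private readonly batchSize: number;
  private readonly queueMax: number;

  constructor(write: WriteBatch, batchSize: number = 100, queueMax: number = 2000) {
    this.write = write;
    this.batchSize = batchSize;
    this.queueMax = queueMax;
  }

  enqueue(line: string): void {
    this.queue.push(line);
    this.trim();
    // Trigger 1: the queue has reached the batch size.
    if (this.queue.length >= this.batchSize) this.flush();
  }

  // Triggers 2 and 3: call this from the flush timer and from the shutdown hook.
  flush(): boolean {
    if (this.queue.length === 0) return true;
    const batch = this.queue;
    this.queue = [];
    if (this.write(batch)) return true;
    // Assumption: a failed batch goes back to the front of the queue.
    this.queue = batch.concat(this.queue);
    this.trim();
    return false;
  }

  pending(): string[] {
    return this.queue.slice();
  }

  // Backpressure: drop the oldest lines so the newest values survive.
  private trim(): void {
    const overflow = this.queue.length - this.queueMax;
    if (overflow > 0) this.queue.splice(0, overflow);
  }
}
```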

Circuit breaker: after 5 consecutive write failures, the sink enters a 30-second cooldown and stops attempting writes. This ensures a QuestDB outage does not add latency to the indicator pipeline running at candle frequency.
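
The breaker logic can be sketched independently of the sink. CircuitBreakerSketch is a hypothetical name; the threshold of 5 and the 30-second cooldown come from the paragraph above, while resetting the failure counter when the breaker opens is an assumption.

```typescript
class CircuitBreakerSketch {
  private failures = 0;
  private openUntil = 0;
  private readonly threshold: number;
  private readonly cooldownMs: number;
  private readonly now: () => number;

  // The clock is injectable so the behaviour can be tested without waiting.
  constructor(threshold: number = 5, cooldownMs: number = 30000, now: () => number = () => Date.now()) {
    this.threshold = threshold;
    this.cooldownMs = cooldownMs;
    this.now = now;
  }

  // False while the cooldown is active; the caller skips the write entirely.
  canAttempt(): boolean {
    return this.now() >= this.openUntil;
  }

  recordSuccess(): void {
    this.failures = 0;
  }

  recordFailure(): void {
    this.failures += 1;
    if (this.failures >= this.threshold) {
      this.openUntil = this.now() + this.cooldownMs;
      this.failures = 0; // assumption: counting starts afresh after the cooldown
    }
  }
}
```

A caller would check canAttempt() before each flush and report the outcome with recordSuccess() or recordFailure(), so a database outage costs one cheap boolean check per flush instead of a blocked write.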

Result: at most one TCP message every 5 seconds (12 per minute) instead of 78+ per minute, roughly an 85% reduction in the number of writes.


REST API

All Detector and Advisor indicator endpoints are also accessible via Provider proxy at /api/detector-proxy/indicators/* and /api/advisor-proxy/indicators/*.

Snapshot endpoints (current values)

| Method | Path | Description |
|---|---|---|
| GET | /api/indicators/health | Health check, returns store stats |
| GET | /api/indicators/scopes | List of active scopes |
| GET | /api/indicators/snapshot | All scopes and symbols |
| GET | /api/indicators/:scope/snapshot | All symbols in scope |
| GET | /api/indicators/:scope/snapshot/:symbol | Current indicator snapshot for one symbol |

Recent-buffer endpoints (last N bars, no DB)

| Method | Path | Description |
|---|---|---|
| GET | /api/indicators/recent | Buffer metadata (depth, oldest/newest ts per scope) |
| GET | /api/indicators/:scope/recent | Buffer metadata for one scope |
| GET | /api/indicators/:scope/recent/:symbol?limit=N | Last N bars (default: INDICATOR_RECENT_SIZE) |

History endpoints (QuestDB, via Provider only)

| Method | Path | Description |
|---|---|---|
| GET | /api/indicator-history/:scope/:symbol?from=ms&to=ms&limit=200&indicators=rsi14,macd | Time-range history, pivoted by timestamp |
| GET | /api/indicator-history/:scope/:symbol/latest | Most recent value for all indicators (LATEST ON) |

History endpoints live in Provider — Provider is the only service with QuestDBQueryService. Detector and Advisor have no direct QuestDB read access.
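
For illustration, the sketch below builds a history URL from the documented path and query parameters, and shows one way narrow rows could be pivoted by timestamp. The function names and the HistoryRow and PivotedRow shapes are assumptions; the actual response format of the endpoint is not shown on this page.

```typescript
interface HistoryQuery {
  from?: number; // epoch milliseconds
  to?: number; // epoch milliseconds
  limit?: number;
  indicators?: string[];
}

// Build the request URL; path and parameter names follow the table above.
function historyUrl(baseUrl: string, scope: string, symbol: string, query: HistoryQuery = {}): string {
  const params: string[] = [];
  if (query.from !== undefined) params.push(`from=${query.from}`);
  if (query.to !== undefined) params.push(`to=${query.to}`);
  if (query.limit !== undefined) params.push(`limit=${query.limit}`);
  if (query.indicators && query.indicators.length > 0) {
    params.push(`indicators=${encodeURIComponent(query.indicators.join(","))}`);
  }
  const path = `${baseUrl}/api/indicator-history/${encodeURIComponent(scope)}/${encodeURIComponent(symbol)}`;
  return params.length > 0 ? `${path}?${params.join("&")}` : path;
}

// Hypothetical row shapes for the pivot step.
interface HistoryRow { ts: number; indicatorKey: string; value: number }
type PivotedRow = { ts: number; [indicatorKey: string]: number };

// Collapse one-row-per-indicator into one object per timestamp, oldest first.
function pivotByTimestamp(rows: HistoryRow[]): PivotedRow[] {
  const byTs = new Map<number, PivotedRow>();
  for (const row of rows) {
    let pivoted = byTs.get(row.ts);
    if (!pivoted) {
      pivoted = { ts: row.ts };
      byTs.set(row.ts, pivoted);
    }
    pivoted[row.indicatorKey] = row.value;
  }
  return Array.from(byTs.values()).sort((a, b) => a.ts - b.ts);
}
```

Note that encodeURIComponent escapes the colon in detector keys, so BTCUSDT:1h travels as BTCUSDT%3A1h and is decoded on the server side.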


WebSocket (real-time push)

Socket.io namespace /indicators on each app (Detector port 8101, Advisor port 8009).

Client → server:

indicators:subscribe   { scope: 'detector', symbol: 'BTCUSDT:1h' }
indicators:unsubscribe { scope: 'detector', symbol: 'BTCUSDT:1h' }
indicators:ping

Server → client:

indicators:update  {
  scope:      'detector',
  symbol:     'BTCUSDT:1h',
  indicators: { rsi14: { value: 54.2 }, macd: { value: -1.2, histogram: -1.5 }, ... },
  timestamp:  1716000000000
}
indicators:pong    { ts: number }

On subscribe, the client immediately receives the current snapshot (Tier 1) if one exists — no waiting for the next candle. Every subsequent candle-close fires a new indicators:update event to all subscribed rooms.
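
The replay-on-subscribe behaviour can be modelled without any socket library. IndicatorRoomsSketch is a hypothetical in-memory stand-in for the Socket.io rooms; only the payload fields (scope, symbol, indicators, timestamp) come from the message format above.

```typescript
interface IndicatorsUpdate {
  scope: string;
  symbol: string;
  indicators: Record<string, { value: number; [field: string]: number }>;
  timestamp: number;
}

type UpdateHandler = (update: IndicatorsUpdate) => void;

class IndicatorRoomsSketch {
  private readonly latest = new Map<string, IndicatorsUpdate>();
  private readonly rooms = new Map<string, Set<UpdateHandler>>();

  private static roomKey(scope: string, symbol: string): string {
    return `${scope}|${symbol}`;
  }

  subscribe(scope: string, symbol: string, handler: UpdateHandler): void {
    const key = IndicatorRoomsSketch.roomKey(scope, symbol);
    let room = this.rooms.get(key);
    if (!room) {
      room = new Set<UpdateHandler>();
      this.rooms.set(key, room);
    }
    room.add(handler);
    // Replay the current snapshot right away, if one exists.
    const snapshot = this.latest.get(key);
    if (snapshot) handler(snapshot);
  }

  unsubscribe(scope: string, symbol: string, handler: UpdateHandler): void {
    this.rooms.get(IndicatorRoomsSketch.roomKey(scope, symbol))?.delete(handler);
  }

  // Called once per candle close: remember the update and fan it out to the room.
  publish(update: IndicatorsUpdate): void {
    const key = IndicatorRoomsSketch.roomKey(update.scope, update.symbol);
    this.latest.set(key, update);
    const room = this.rooms.get(key);
    if (room) room.forEach((handler) => handler(update));
  }
}
```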

WebSocket connections go directly to Detector or Advisor; they do not route through Provider. Provider's /ws endpoint is a Redis pub/sub bridge designed for low-frequency events, so routing candle-frequency indicator updates through it would only add an extra hop and latency.


Key QuestDB queries

Last 100 RSI values (time series):

SELECT ts, value
FROM indicator_values
WHERE scope = 'detector' AND symbol = 'BTCUSDT:1h' AND indicator_key = 'rsi14'
ORDER BY ts DESC LIMIT 100;

Latest snapshot for all indicators (no gaps, one row per indicator):

SELECT ts, indicator_key, value, extra_json
FROM indicator_values
WHERE scope = 'detector' AND symbol = 'BTCUSDT:1h'
LATEST ON ts PARTITION BY indicator_key;

MACD histogram over the last 24 hours:

SELECT ts, extra_json
FROM indicator_values
WHERE scope = 'detector' AND symbol = 'BTCUSDT:1h'
  AND indicator_key = 'macd'
  AND ts >= dateadd('h', -24, now())
ORDER BY ts ASC;
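
Because the histogram lives inside extra_json, the client has to parse it out of each row. A minimal sketch, assuming rows arrive as objects with ts and extra_json fields (the row shape and function name are hypothetical):

```typescript
interface MacdRow {
  ts: string;
  extra_json: string | null;
}

// Extract the numeric "histogram" field from each row, skipping rows that lack it.
function histogramSeries(rows: MacdRow[]): { ts: string; histogram: number }[] {
  const series: { ts: string; histogram: number }[] = [];
  for (const row of rows) {
    if (!row.extra_json) continue;
    try {
      const extra: unknown = JSON.parse(row.extra_json);
      if (typeof extra === "object" && extra !== null) {
        const histogram = (extra as Record<string, unknown>)["histogram"];
        if (typeof histogram === "number") series.push({ ts: row.ts, histogram });
      }
    } catch {
      // Malformed JSON: skip the row rather than fail the whole series.
    }
  }
  return series;
}
```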

Optional data retention (drop partitions older than 90 days):

ALTER TABLE indicator_values DROP PARTITION WHERE ts < dateadd('d', -90, now());


Environment variables

| Variable | Default | Description |
|---|---|---|
| INDICATOR_RECENT_SIZE | 10 | Depth of in-memory recent buffer (max 50) |
| INDICATOR_SINK_BATCH_SIZE | 100 | Rows per QuestDB ILP batch flush |
| INDICATOR_SINK_FLUSH_MS | 5000 | Maximum flush interval in milliseconds |
| INDICATOR_SINK_QUEUE_MAX | 2000 | Maximum queued rows before oldest are dropped |

The QuestDB sink is optional — if INDICATOR_HISTORY_SINK is not provided, indicator data is only stored in Tiers 1 and 2.
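
A sketch of how these variables could be read with their defaults and the hard cap applied. The function and field names are hypothetical, and falling back to the default on a missing or invalid value is an assumption about behaviour this page does not specify.

```typescript
interface IndicatorConfig {
  recentSize: number;
  sinkBatchSize: number;
  sinkFlushMs: number;
  sinkQueueMax: number;
}

// Parse a positive integer, falling back to the default when missing or invalid.
function readPositiveInt(raw: string | undefined, fallback: number): number {
  const parsed = Number.parseInt(raw ?? "", 10);
  return Number.isFinite(parsed) && parsed > 0 ? parsed : fallback;
}

function loadIndicatorConfig(env: Record<string, string | undefined>): IndicatorConfig {
  return {
    // Documented hard cap of 50 bars for the recent buffer.
    recentSize: Math.min(readPositiveInt(env["INDICATOR_RECENT_SIZE"], 10), 50),
    sinkBatchSize: readPositiveInt(env["INDICATOR_SINK_BATCH_SIZE"], 100),
    sinkFlushMs: readPositiveInt(env["INDICATOR_SINK_FLUSH_MS"], 5000),
    sinkQueueMax: readPositiveInt(env["INDICATOR_SINK_QUEUE_MAX"], 2000),
  };
}
```

In a Node.js service the function would typically be called as loadIndicatorConfig(process.env).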


Indicators published per scope

Detector (FollowTrend strategy)

rsi14, adx14, macd, ema20, ema50, bands20, atr14, dc20, stoch, cci20, mfi14, obv, vwap

Advisor

rsi14, macd, adx14, ema20, ema50, atr14, bands20, obv, mfi14, stoch, cci20, dc20, vwap, ac

Multi-value indicators store their primary field in value and secondary fields in extra_json. For example, MACD: value = macd_line, extra_json = {"histogram": -1.5, "signal": 0.3}. Bollinger Bands: value = middle, extra_json = {"upper": 51200, "lower": 49800}.
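
The mapping from a snapshot to narrow rows can be sketched as follows. The function name and the NarrowRow field names are assumptions chosen to mirror the indicator_values columns; the snapshot shape follows the indicators:update payload shown earlier.

```typescript
type IndicatorFields = { value: number; [field: string]: number };

interface NarrowRow {
  ts: number; // epoch milliseconds
  scope: string;
  symbol: string;
  indicatorKey: string;
  value: number;
  extraJson: string | null;
}

// One row per indicator: the primary field goes to value, the rest to extra_json.
function toNarrowRows(
  scope: string,
  symbol: string,
  indicators: Record<string, IndicatorFields>,
  ts: number,
): NarrowRow[] {
  return Object.entries(indicators).map(([indicatorKey, fields]) => {
    const { value, ...extra } = fields;
    const extraJson = Object.keys(extra).length > 0 ? JSON.stringify(extra) : null;
    return { ts, scope, symbol, indicatorKey, value, extraJson };
  });
}
```

Single-value indicators such as rsi14 produce a row with extraJson set to null, so the extra column only carries data for indicators like MACD or Bollinger Bands.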


Service ports

| Service | Port | Indicator namespace |
|---|---|---|
| Provider | 8081 | /api/indicator-history/* (QuestDB read) |
| Detector | 8101 | /api/indicators/*, WebSocket /indicators |
| Advisor | 8009 | /api/indicators/*, WebSocket /indicators |
