Indicator System — real-time pipeline for RSI, MACD, ADX and all technical indicators
How Barfinex computes, stores and exposes technical indicator values across Detector and Advisor services: 3-tier storage, QuestDB time-series, REST history, and WebSocket push.
Why this module exists
Before the Indicator Publisher, every indicator value — RSI, ADX, MACD, Bollinger Bands — existed only inside the internal IndicatorEngine of each service. No external consumer could read current values without a full dependency on the engine's private state.
This created three concrete problems:
- Studio could not display indicators — no way to build an RSI mini-chart or show ADX on the signal dashboard
- Advisor had no structured indicator context — when calling the LLM, Advisor could not attach a current indicator snapshot as part of the reasoning input
- No historical record — no systematic log of what RSI or ATR was three hours ago
The IndicatorPublisherModule closes all three gaps in a single shared module without restructuring existing services.
Data flow
Exchange (Binance / Bybit / …)
        │
        ▼
[ Provider ] — normalizes market data, publishes candle-close events
        │
        ▼
[ Redis event bus ] — CANDLE_CLOSE typed events
        │
        ├──────────────────────────┐
        ▼                          ▼
[ Detector ]                 [ Advisor ]
FollowTrendService           AdvisorIndicatorService
onCandleClose()              update(event) — only on candle type
        │                          │
        ▼                          ▼
IndicatorEngine.update()     IndicatorEngine.update()
engine.getSnapshot()         engine.getSnapshot()
        │                          │
        └──────────┬───────────────┘
                   ▼
IndicatorSnapshotStoreService
        │
        ├── [1] Current snapshot Map    → REST /snapshot endpoints
        ├── [2] Recent buffer (10 bars) → REST /recent endpoints
        ├── [3] updates$ Observable     → WebSocket /indicators namespace
        └── [4] IndicatorQuestDbSink    → QuestDB indicator_values table
                        │
                        ▼
                  [ Provider ]
                  GET /indicator-history/:scope/:symbol
Every indicator update follows this single path. Services do not have separate indicator logging — there is one store, one sink, one socket namespace.
Two scopes
The system separates indicator context into two independent scopes:
| Scope | Symbol format | Source service | Update cadence |
|---|---|---|---|
| detector | BTCUSDT:1h, ETHUSDT:5min | FollowTrendService | Per candle-close per interval |
| advisor | BTCUSDT, ETHUSDT | AdvisorIndicatorService | Per candle-close only |
Advisor intentionally publishes only on candle events. Trades and order book updates fire thousands of times per second, but RSI, MACD, and similar indicators are recalculated only when a candle closes — publishing more frequently would flood the store with identical values.
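The two symbol formats in the table can be told apart with a small parser. The following is a minimal sketch, not code from the Barfinex codebase: `parseIndicatorSymbol`, `IndicatorScope`, and `ParsedIndicatorSymbol` are hypothetical names, and it assumes a detector key is always `MARKET:interval` while an advisor key never contains a colon.

```typescript
type IndicatorScope = 'detector' | 'advisor';

interface ParsedIndicatorSymbol {
  scope: IndicatorScope;
  market: string;          // e.g. 'BTCUSDT'
  interval: string | null; // e.g. '1h' for detector, null for advisor
}

// Hypothetical helper: splits a store key into market and interval.
function parseIndicatorSymbol(scope: IndicatorScope, key: string): ParsedIndicatorSymbol {
  const parts = key.split(':');
  if (scope === 'detector') {
    // Assumed format: exactly one colon, both halves non-empty.
    if (parts.length !== 2 || parts[0] === '' || parts[1] === '') {
      throw new Error(`detector symbol must look like BTCUSDT:1h, got "${key}"`);
    }
    return { scope, market: parts[0], interval: parts[1] };
  }
  // Assumed format: bare market symbol without an interval suffix.
  if (parts.length !== 1 || key === '') {
    throw new Error(`advisor symbol must not carry an interval, got "${key}"`);
  }
  return { scope, market: key, interval: null };
}
```

Rejecting a detector-style key in the advisor scope early keeps the two scopes from silently sharing entries under mismatched keys.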
Three-tier storage
Tier 1 — current snapshot (in-memory Map)
Map<scope → Map<symbol → { indicators, timestamp }>>
- Latency: 0 ms, O(1) lookup
- Lifetime: until service restart
- Used by: GET /indicators/:scope/snapshot/:symbol, and WebSocket reconnect (the client gets the current snapshot immediately on subscribe, without waiting for the next candle)
Tier 2 — recent buffer (10-bar ring)
Map<scope → Map<symbol → SnapshotEntry[]>> // max INDICATOR_RECENT_SIZE bars
- Default depth: 10 bars (env: INDICATOR_RECENT_SIZE, hard cap 50)
- Memory footprint: 10 bars × 13 indicators × 6 symbols × ~100 bytes ≈ 78 KB
- Used by: GET /indicators/:scope/recent/:symbol, which serves the last few bars for sparklines and mini-charts without a database round-trip
- Eviction: oldest bar is dropped on each new write (shift + push)
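Tiers 1 and 2 can be pictured as two nested maps that are updated together on every publish. The sketch below is an illustration under assumptions, not the actual IndicatorSnapshotStoreService: the class name `SnapshotStoreSketch`, its method names, and the entry shape are hypothetical, and the `updates$` stream and the QuestDB sink are left out.

```typescript
interface IndicatorSnapshotEntry {
  indicators: Record<string, { value: number }>;
  timestamp: number; // epoch milliseconds
}

const RECENT_HARD_CAP = 50; // hard cap stated in the text

class SnapshotStoreSketch {
  // Tier 1: scope -> symbol -> latest entry
  private current = new Map<string, Map<string, IndicatorSnapshotEntry>>();
  // Tier 2: scope -> symbol -> bounded list of recent entries (oldest first)
  private recent = new Map<string, Map<string, IndicatorSnapshotEntry[]>>();
  private depth: number;

  constructor(requestedDepth: number = 10) {
    // Clamp the requested depth into [1, RECENT_HARD_CAP].
    this.depth = Math.max(1, Math.min(requestedDepth, RECENT_HARD_CAP));
  }

  publish(scope: string, symbol: string, entry: IndicatorSnapshotEntry): void {
    let bySymbol = this.current.get(scope);
    if (!bySymbol) { bySymbol = new Map(); this.current.set(scope, bySymbol); }
    bySymbol.set(symbol, entry); // Tier 1 overwrite

    let recentBySymbol = this.recent.get(scope);
    if (!recentBySymbol) { recentBySymbol = new Map(); this.recent.set(scope, recentBySymbol); }
    const buffer = recentBySymbol.get(symbol) || [];
    buffer.push(entry);
    while (buffer.length > this.depth) buffer.shift(); // evict the oldest bar
    recentBySymbol.set(symbol, buffer);
  }

  getSnapshot(scope: string, symbol: string): IndicatorSnapshotEntry | undefined {
    const bySymbol = this.current.get(scope);
    return bySymbol ? bySymbol.get(symbol) : undefined;
  }

  // Returns the newest `limit` bars, oldest first; all bars when limit is omitted.
  getRecent(scope: string, symbol: string, limit?: number): IndicatorSnapshotEntry[] {
    const recentBySymbol = this.recent.get(scope);
    const buffer = (recentBySymbol && recentBySymbol.get(symbol)) || [];
    const n = limit === undefined ? buffer.length : Math.max(0, Math.min(limit, buffer.length));
    return buffer.slice(buffer.length - n);
  }
}
```

With a depth of 10 the `shift()` call touches at most a handful of elements, so a plain array is a reasonable stand-in for a true ring buffer at this size.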
Tier 3 — QuestDB (persistent, arbitrary time-range)
QuestDB stores the full history in a narrow-format table with dictionary-encoded SYMBOL columns:
CREATE TABLE indicator_values (
  ts TIMESTAMP,
  scope SYMBOL CAPACITY 8 CACHE,            -- 'detector' | 'advisor'
  symbol SYMBOL CAPACITY 200 CACHE,         -- 'BTCUSDT:1h'
  indicator_key SYMBOL CAPACITY 200 CACHE,  -- 'rsi14', 'macd', 'bands20'
  value DOUBLE,                             -- primary numeric value
  extra_json STRING                         -- secondary fields: histogram, upper/lower, etc.
) timestamp(ts) PARTITION BY DAY WAL;
Why narrow format instead of a JSON blob per row:
| Property | Narrow (chosen) | JSON blob |
|---|---|---|
| Query a single indicator | WHERE indicator_key = 'rsi14' filters on the dictionary-encoded key | Parse JSON for every row |
| Symbol column storage | Dictionary-encoded: 'rsi14' stored once, rows hold an integer ID | Full string repeated |
| Latest-value query | LATEST ON ts PARTITION BY indicator_key | Full scan + parse |
| Extra fields (MACD histogram) | extra_json STRING column | Embedded in blob |
At 78 writes/min across all detector symbols, the annual volume is ~41 million rows (78 × 525,600 minutes), roughly 8 GB/year. QuestDB handles 4+ million rows/sec via ILP; the actual load of about 1.3 rows/sec is roughly six orders of magnitude below that limit.
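The volume estimate can be checked with a few lines of arithmetic. This is a worked calculation only; the constant names are made up for the example, and the per-row size is simply what the 8 GB/year figure implies, not a measured value.

```typescript
const WRITES_PER_MINUTE = 78;            // figure quoted in the text
const MINUTES_PER_YEAR = 60 * 24 * 365;  // 525,600

// Rows written per year at a constant 78 rows per minute: 40,996,800.
const rowsPerYear = WRITES_PER_MINUTE * MINUTES_PER_YEAR;

// Sustained write rate in rows per second: 1.3.
const rowsPerSecond = WRITES_PER_MINUTE / 60;

// Per-row size implied by the ~8 GB/year estimate: about 195 bytes.
const impliedBytesPerRow = 8e9 / rowsPerYear;
```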
Write pipeline: batching and circuit breaker
The IndicatorQuestDbSink never writes row-by-row. It queues ILP lines and flushes them in controlled batches:
Flush triggers:
- Queue reaches INDICATOR_SINK_BATCH_SIZE rows (default 100)
- Timer fires every INDICATOR_SINK_FLUSH_MS milliseconds (default 5 000)
- onModuleDestroy() drains the remaining queue on graceful shutdown
Backpressure: when the queue reaches INDICATOR_SINK_QUEUE_MAX (default 2 000 rows), the oldest entries are dropped — not the newest. Current indicator values matter more than values from five seconds ago.
Circuit breaker: after 5 consecutive write failures, the sink enters a 30-second cooldown and stops attempting writes. This ensures a QuestDB outage does not add latency to the indicator pipeline running at candle frequency.
Result: at most one TCP message every 5 seconds (12 per minute) instead of 78+ per minute, a reduction of roughly 85% in write operations.
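The interplay of batching, drop-oldest backpressure, and the circuit breaker can be sketched as a small queue class. This is a simplified illustration, not the actual IndicatorQuestDbSink: `BatchingSinkSketch`, `BatchWriter`, and `pendingCount` are hypothetical names, the writer is synchronous and reports failure by returning `false` (an assumed contract), and the clock is passed in explicitly instead of using a real timer.

```typescript
// Assumed contract: returns true when the batch was written, false on failure.
type BatchWriter = (lines: string[]) => boolean;

class BatchingSinkSketch {
  private queue: string[] = [];
  private consecutiveFailures = 0;
  private cooldownUntil = 0; // epoch ms; the breaker is open while now < cooldownUntil
  private writeBatch: BatchWriter;
  private batchSize: number;
  private queueMax: number;
  private failureLimit: number;
  private cooldownMs: number;

  constructor(
    writeBatch: BatchWriter,
    batchSize: number = 100,    // INDICATOR_SINK_BATCH_SIZE
    queueMax: number = 2000,    // INDICATOR_SINK_QUEUE_MAX
    failureLimit: number = 5,   // consecutive failures before cooldown
    cooldownMs: number = 30000, // 30-second cooldown
  ) {
    this.writeBatch = writeBatch;
    this.batchSize = batchSize;
    this.queueMax = queueMax;
    this.failureLimit = failureLimit;
    this.cooldownMs = cooldownMs;
  }

  enqueue(line: string, now: number): void {
    this.queue.push(line);
    while (this.queue.length > this.queueMax) this.queue.shift(); // drop the oldest rows
    if (this.queue.length >= this.batchSize) this.flush(now);     // size trigger
  }

  // Also meant to be called by a periodic timer and on shutdown.
  flush(now: number): boolean {
    if (this.queue.length === 0 || now < this.cooldownUntil) return false;
    const batch = this.queue.slice(0, this.batchSize);
    if (this.writeBatch(batch)) {
      this.queue.splice(0, batch.length); // remove only what was written
      this.consecutiveFailures = 0;
      return true;
    }
    this.consecutiveFailures += 1;
    if (this.consecutiveFailures >= this.failureLimit) {
      this.cooldownUntil = now + this.cooldownMs; // open the breaker
      this.consecutiveFailures = 0;
    }
    return false;
  }

  pendingCount(): number {
    return this.queue.length;
  }
}
```

Because a failed batch stays in the queue, rows survive a short outage until the queue limit forces the oldest ones out.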
REST API
All Detector and Advisor indicator endpoints are also accessible via Provider proxy at /api/detector-proxy/indicators/* and /api/advisor-proxy/indicators/*.
Snapshot endpoints (current values)
| Method | Path | Description |
|---|---|---|
| GET | /api/indicators/health | Health check — returns store stats |
| GET | /api/indicators/scopes | List of active scopes |
| GET | /api/indicators/snapshot | All scopes and symbols |
| GET | /api/indicators/:scope/snapshot | All symbols in scope |
| GET | /api/indicators/:scope/snapshot/:symbol | Current indicator snapshot for one symbol |
Recent-buffer endpoints (last N bars, no DB)
| Method | Path | Description |
|---|---|---|
| GET | /api/indicators/recent | Buffer metadata (depth, oldest/newest ts per scope) |
| GET | /api/indicators/:scope/recent | Buffer metadata for one scope |
| GET | /api/indicators/:scope/recent/:symbol?limit=N | Last N bars (default: INDICATOR_RECENT_SIZE) |
History endpoints (QuestDB, via Provider only)
| Method | Path | Description |
|---|---|---|
| GET | /api/indicator-history/:scope/:symbol?from=ms&to=ms&limit=200&indicators=rsi14,macd | Time-range history, pivoted by timestamp |
| GET | /api/indicator-history/:scope/:symbol/latest | Most recent value for all indicators (LATEST ON) |
History endpoints live in Provider — Provider is the only service with QuestDBQueryService. Detector and Advisor have no direct QuestDB read access.
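Since the table is stored in narrow format, "pivoted by timestamp" means grouping rows that share a `ts` into one bar. The sketch below shows one way such a pivot could work; `pivotByTimestamp`, `NarrowIndicatorRow`, and `PivotedIndicatorBar` are hypothetical names, and the actual response shape of the history endpoint is not specified here.

```typescript
interface NarrowIndicatorRow {
  ts: number;            // epoch milliseconds
  indicator_key: string; // e.g. 'rsi14'
  value: number;
}

interface PivotedIndicatorBar {
  ts: number;
  values: Record<string, number>; // indicator_key -> value
}

// Groups narrow rows by timestamp and returns bars in ascending time order.
function pivotByTimestamp(rows: NarrowIndicatorRow[]): PivotedIndicatorBar[] {
  const byTs = new Map<number, PivotedIndicatorBar>();
  for (const row of rows) {
    let bar = byTs.get(row.ts);
    if (!bar) {
      bar = { ts: row.ts, values: {} };
      byTs.set(row.ts, bar);
    }
    bar.values[row.indicator_key] = row.value;
  }
  const bars: PivotedIndicatorBar[] = [];
  byTs.forEach((bar) => { bars.push(bar); });
  return bars.sort((a, b) => a.ts - b.ts);
}
```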
WebSocket (real-time push)
Socket.io namespace /indicators on each app (Detector port 8101, Advisor port 8009).
Client → server:
indicators:subscribe { scope: 'detector', symbol: 'BTCUSDT:1h' }
indicators:unsubscribe { scope: 'detector', symbol: 'BTCUSDT:1h' }
indicators:ping
Server → client:
indicators:update {
scope: 'detector',
symbol: 'BTCUSDT:1h',
indicators: { rsi14: { value: 54.2 }, macd: { value: -1.2, histogram: -1.5 }, ... },
timestamp: 1716000000000
}
indicators:pong { ts: number }
On subscribe, the client immediately receives the current snapshot (Tier 1) if one exists — no waiting for the next candle. Every subsequent candle-close fires a new indicators:update event to all subscribed rooms.
WebSocket connections go directly to Detector or Advisor; they do not route through Provider. Provider's /ws endpoint is a Redis pub/sub bridge designed for low-frequency events, so routing indicator updates through it would add an unnecessary hop and extra latency.
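A client-side subscription using the events above could look roughly like this. The sketch is written against a minimal `SocketLike` interface (a hypothetical name) so that it stays self-contained; a socket.io-client socket connected to the /indicators namespace exposes compatible `emit` and `on` methods. `subscribeToIndicators` and `IndicatorUpdate` are likewise names chosen for this example.

```typescript
interface IndicatorUpdate {
  scope: string;
  symbol: string;
  indicators: Record<string, Record<string, number>>;
  timestamp: number;
}

// Minimal subset of a Socket.IO client socket used by this sketch.
interface SocketLike {
  emit(eventName: string, payload?: unknown): void;
  on(eventName: string, handler: (payload: any) => void): void;
}

// Subscribes to one scope/symbol pair and returns an unsubscribe function.
function subscribeToIndicators(
  socket: SocketLike,
  scope: string,
  symbol: string,
  onUpdate: (update: IndicatorUpdate) => void,
): () => void {
  // Register the listener first so the immediate Tier 1 snapshot is not missed.
  socket.on('indicators:update', (update: IndicatorUpdate) => {
    if (update.scope === scope && update.symbol === symbol) onUpdate(update);
  });
  socket.emit('indicators:subscribe', { scope, symbol });
  return () => socket.emit('indicators:unsubscribe', { scope, symbol });
}
```

Filtering on scope and symbol inside the listener keeps one socket usable for several subscriptions without mixing their updates.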
Key QuestDB queries
Last 100 RSI values (time series):
SELECT ts, value
FROM indicator_values
WHERE scope = 'detector' AND symbol = 'BTCUSDT:1h'
AND indicator_key = 'rsi14'
ORDER BY ts DESC
LIMIT 100;
Latest snapshot for all indicators (no gaps, one row per indicator):
SELECT ts, indicator_key, value, extra_json
FROM indicator_values
WHERE scope = 'detector' AND symbol = 'BTCUSDT:1h'
LATEST ON ts PARTITION BY indicator_key;
MACD histogram over the last 24 hours:
SELECT ts, extra_json
FROM indicator_values
WHERE scope = 'detector' AND symbol = 'BTCUSDT:1h'
AND indicator_key = 'macd'
AND ts >= dateadd('h', -24, now())
ORDER BY ts ASC;
Optional data retention (drop partitions older than 90 days):
ALTER TABLE indicator_values DROP PARTITION WHERE ts < dateadd('d', -90, now());
Environment variables
| Variable | Default | Description |
|---|---|---|
| INDICATOR_RECENT_SIZE | 10 | Depth of in-memory recent buffer (max 50) |
| INDICATOR_SINK_BATCH_SIZE | 100 | Rows per QuestDB ILP batch flush |
| INDICATOR_SINK_FLUSH_MS | 5000 | Maximum flush interval in milliseconds |
| INDICATOR_SINK_QUEUE_MAX | 2000 | Maximum queued rows before oldest are dropped |
The QuestDB sink is optional — if INDICATOR_HISTORY_SINK is not provided, indicator data is only stored in Tiers 1 and 2.
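Reading these variables with their defaults and the cap on the recent buffer might be done as follows. This is a sketch under assumptions: `loadIndicatorConfig`, `readPositiveInt`, and `IndicatorPublisherConfig` are hypothetical names, and falling back to the default on invalid or non-positive input is a choice made for this example rather than documented behavior.

```typescript
interface IndicatorPublisherConfig {
  recentSize: number;
  sinkBatchSize: number;
  sinkFlushMs: number;
  sinkQueueMax: number;
}

// Parses a positive integer, falling back to the default on missing or invalid input.
function readPositiveInt(raw: string | undefined, fallback: number): number {
  const parsed = raw === undefined ? NaN : parseInt(raw, 10);
  return isFinite(parsed) && parsed > 0 ? parsed : fallback;
}

function loadIndicatorConfig(env: Record<string, string | undefined>): IndicatorPublisherConfig {
  return {
    // Hard cap of 50 bars, as stated in the table above.
    recentSize: Math.min(readPositiveInt(env['INDICATOR_RECENT_SIZE'], 10), 50),
    sinkBatchSize: readPositiveInt(env['INDICATOR_SINK_BATCH_SIZE'], 100),
    sinkFlushMs: readPositiveInt(env['INDICATOR_SINK_FLUSH_MS'], 5000),
    sinkQueueMax: readPositiveInt(env['INDICATOR_SINK_QUEUE_MAX'], 2000),
  };
}
```

In a Node.js service the function would typically be called as `loadIndicatorConfig(process.env)`.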
Indicators published per scope
Detector (FollowTrend strategy)
rsi14, adx14, macd, ema20, ema50, bands20, atr14, dc20, stoch, cci20, mfi14, obv, vwap
Advisor
rsi14, macd, adx14, ema20, ema50, atr14, bands20, obv, mfi14, stoch, cci20, dc20, vwap, ac
Multi-value indicators store their primary field in value and secondary fields in extra_json. For example, MACD: value = macd_line, extra_json = {"histogram": -1.5, "signal": 0.3}. Bollinger Bands: value = middle, extra_json = {"upper": 51200, "lower": 49800}.
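The split between `value` and `extra_json` can be illustrated by flattening one snapshot into narrow rows. The sketch assumes every indicator object carries its primary field under the key `value`, as in the WebSocket payload; `toNarrowRows` and `NarrowRow` are hypothetical names, and storing `null` when there are no secondary fields is a choice made for this example.

```typescript
interface NarrowRow {
  scope: string;
  symbol: string;
  indicatorKey: string;
  value: number;            // primary field
  extraJson: string | null; // secondary fields as JSON, null when there are none
  ts: number;               // epoch milliseconds
}

// Turns one snapshot into one narrow row per indicator.
function toNarrowRows(
  scope: string,
  symbol: string,
  indicators: Record<string, Record<string, number>>,
  ts: number,
): NarrowRow[] {
  return Object.keys(indicators).map((indicatorKey): NarrowRow => {
    const fields = indicators[indicatorKey];
    const extra: Record<string, number> = {};
    let hasExtra = false;
    for (const field of Object.keys(fields)) {
      if (field !== 'value') {
        extra[field] = fields[field];
        hasExtra = true;
      }
    }
    return {
      scope,
      symbol,
      indicatorKey,
      value: fields['value'],
      extraJson: hasExtra ? JSON.stringify(extra) : null,
      ts,
    };
  });
}
```

For the MACD example above, the row would carry `value = -1.2` when the snapshot holds `{ value: -1.2, histogram: -1.5, signal: 0.3 }`, with the remaining two fields serialized into `extraJson`.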
Service ports
| Service | Port | Indicator namespace |
|---|---|---|
| Provider | 8081 | /api/indicator-history/* (QuestDB read) |
| Detector | 8101 | /api/indicators/*, WebSocket /indicators |
| Advisor | 8009 | /api/indicators/*, WebSocket /indicators |