Event Bus Architecture — Redis pub/sub backbone
Redis pub/sub event bus: BaseEvent envelope, 35+ typed channels across all services, source enforcement, Redis Streams dual-write for replay, latency observability.
Overview
The event bus is the nervous system of the Barfinex platform. Every inter-service communication — market data, signals, decisions, risk events, execution outcomes — travels as a typed, enveloped message over Redis pub/sub. No service calls another service's REST API for real-time data; they publish and subscribe through the bus.
Library: libs/event-bus, a Redis pub/sub abstraction with exponential-backoff reconnect, subscription deduplication, and per-interval latency metrics. It reads its connection settings from EVENT_BUS_REDIS_URL, REDIS_URL, or REDIS_HOST + REDIS_PORT.
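As a rough illustration of the connection handling described above, the sketch below resolves a Redis URL from the environment and computes a capped exponential-backoff delay. The helper names (`resolveRedisUrl`, `reconnectDelayMs`), the precedence order (assumed to follow the order in which the variables are listed), and the default host, base delay, and cap are all assumptions made for this example, not the actual libs/event-bus API.

```typescript
// Hypothetical helpers; names and defaults are illustrative, not taken from libs/event-bus.
type Env = Record<string, string | undefined>;

// Resolve the Redis URL, ASSUMING the variables are tried in the order the text lists them.
function resolveRedisUrl(env: Env): string {
  if (env.EVENT_BUS_REDIS_URL) return env.EVENT_BUS_REDIS_URL;
  if (env.REDIS_URL) return env.REDIS_URL;
  const host = env.REDIS_HOST ?? "127.0.0.1"; // assumed fallback host
  const port = env.REDIS_PORT ?? "6379"; // standard Redis port
  return `redis://${host}:${port}`;
}

// Exponential backoff: baseMs * 2^attempt, capped at maxMs (both values are assumptions).
function reconnectDelayMs(attempt: number, baseMs = 100, maxMs = 30000): number {
  return Math.min(maxMs, baseMs * Math.pow(2, attempt));
}
```

With these defaults, `reconnectDelayMs` yields 100, 200, 400, 800 ms for the first four attempts and never exceeds 30 seconds.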
BaseEvent envelope
Every message on every channel is wrapped in a BaseEvent:
{
eventId: string -- UUID, unique per event instance
type: string -- SubscriptionType enum key
source: string -- service name: 'provider' | 'detector' | 'advisor' | 'inspector'
timestamp: number -- Unix ms
correlationId: string? -- links events in one decision cycle
causationId: string? -- the eventId that caused this event
payload: T -- typed payload per EventPayloadMap
}
Source enforcement: assertEventSourceMatch(type, source) is called at publish time — each service can only emit events with its own prefix. Detector cannot publish PROVIDER_*; Advisor cannot publish INSPECTOR_*. Compile-time guard.
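The envelope and the publish-time check can be sketched in TypeScript as follows. The `BaseEvent` fields follow the list above. The body of `assertEventSourceMatch` is a guess at one plausible implementation (a prefix comparison), and `publish` is a hypothetical wrapper added only to show where the check would run.

```typescript
type EventSource = "provider" | "detector" | "advisor" | "inspector";

// Envelope fields as listed in the text; T is the typed payload.
interface BaseEvent<T> {
  eventId: string; // UUID, unique per event instance
  type: string; // SubscriptionType enum key
  source: EventSource; // publishing service
  timestamp: number; // Unix ms
  correlationId?: string; // links events in one decision cycle
  causationId?: string; // eventId that caused this event
  payload: T;
}

// ASSUMED implementation: the event type must start with the upper-cased source name.
function assertEventSourceMatch(type: string, source: EventSource): void {
  const expectedPrefix = `${source.toUpperCase()}_`;
  if (type.indexOf(expectedPrefix) !== 0) {
    throw new Error(`Source '${source}' may not publish '${type}'`);
  }
}

// Hypothetical publish wrapper: reject mismatched events before they reach the transport.
function publish<T>(event: BaseEvent<T>, send: (e: BaseEvent<T>) => void): void {
  assertEventSourceMatch(event.type, event.source);
  send(event);
}
```

Under this sketch, a Detector event of type `PROVIDER_ORDER_CREATE` throws before `send` is ever called.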
Channel categories
All channel names are defined in libs/types/src/subscription.interface.ts as the SubscriptionType enum. Never use string literals — always import from @barfinex/types.
Provider channels — publisher: Provider (8081)
| Channel | Payload | Description |
|---|---|---|
| PROVIDER_MARKETDATA_TRADE | ProviderTradeEvent | Live trade tick: symbol, price, volume, side, timestamp |
| PROVIDER_MARKETDATA_ORDERBOOK | ProviderOrderBookEvent | Order book snapshot with bids/asks |
| PROVIDER_MARKETDATA_CANDLE | ProviderCandleEvent | Candle close: OHLCV + symbol + interval |
| PROVIDER_ACCOUNT_EVENT | ProviderAccountEvent | Account balance/equity update from exchange |
| PROVIDER_ORDER_CREATE | ProviderOrderCreateEvent | Order acknowledgement (REST response, not a fill) |
| PROVIDER_ORDER_CLOSE | ProviderOrderCloseEvent | Order closed |
| PROVIDER_INSTRUMENTS | ProviderInstrumentsEvent | Instrument list update |
| PROVIDER_INSTRUMENT_PRICES | ProviderInstrumentPricesEvent | Price snapshot for all instruments |
| PROVIDER_MARKET_INTELLIGENCE_EVENT | MarketIntelligenceEvent | Breaking news, macro, on-chain, whale alert |
Detector channels — publisher: Detector (8101)
| Channel | Payload | Description |
|---|---|---|
| DETECTOR_SIGNAL_GENERATED | DetectorSignalPayload | New strategy signal: symbol, side, confidence, strategyId |
| DETECTOR_SIGNAL_UPDATED | DetectorSignalPayload | Signal updated (UI/observability only) |
| DETECTOR_SIGNAL_INVALIDATED | DetectorSignalPayload | Signal invalidated |
| DETECTOR_POSITION_OPEN_REQUEST | DetectorSignalPayload | Request to open position |
| DETECTOR_POSITION_CLOSE_REQUEST | DetectorSignalPayload | Request to close position |
| DETECTOR_POSITION_REDUCE_REQUEST | DetectorSignalPayload | Request to reduce position |
| DETECTOR_POSITION_FLIP_REQUEST | DetectorSignalPayload | Request to flip direction |
DETECTOR_SIGNAL_GENERATED is the primary trigger for the Advisor pipeline. It carries a strategyId (detector sysname) used by DecisionWindowService to deduplicate per-strategy windows — two detectors on the same symbol each get an independent decision cycle.
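The per-strategy keying can be illustrated with a small in-memory sketch. `DecisionWindows` is a hypothetical stand-in for DecisionWindowService, and the fixed `windowMs` duration is an assumption. Only the idea of keying windows by symbol plus strategyId comes from the text.

```typescript
// Hypothetical stand-in for DecisionWindowService; only the keying idea is shown.
class DecisionWindows {
  // Maps "symbol:strategyId" to the time (Unix ms) at which its window closes.
  private readonly openUntil = new Map<string, number>();

  constructor(private readonly windowMs: number) {}

  // Returns true if this signal opens a new window, false if one is still open for the same key.
  tryOpen(symbol: string, strategyId: string, nowMs: number): boolean {
    const key = `${symbol}:${strategyId}`;
    const until = this.openUntil.get(key);
    if (until !== undefined && nowMs < until) return false;
    this.openUntil.set(key, nowMs + this.windowMs);
    return true;
  }
}
```

Because the key includes strategyId, a second detector firing on the same symbol opens its own window instead of being suppressed by the first.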
Advisor channels — publisher: Advisor (8009)
Execution intents (consumed by Inspector — runtime validated):
| Channel | Description |
|---|---|
| ADVISOR_EXECUTION_INTENT_OPEN | Intent to open a new position |
| ADVISOR_EXECUTION_INTENT_CLOSE | Intent to close an existing position |
| ADVISOR_EXECUTION_INTENT_REDUCE | Intent to reduce position size |
| ADVISOR_EXECUTION_INTENT_FLIP | Intent to flip direction (close + open opposite) |
Decision events:
| Channel | Description |
|---|---|
| ADVISOR_DECISION_MADE | Every completed decision cycle |
| ADVISOR_DECISION_REQUEST | Manual decision trigger (consumed by Advisor) |
| ADVISOR_DECISION_RESPONSE | Decision response (UI/observability) |
| ADVISOR_SIGNAL_GENERATED | Advisor signal: direction, regime, volatilityBucket |
Telemetry (28 event types):
| Channel | Description |
|---|---|
| ADVISOR_CONVICTION_SNAPSHOT | Conviction score snapshot |
| ADVISOR_CONVICTION_CALIBRATION_SNAPSHOT | Calibration weights (Platt/isotonic) |
| ADVISOR_PORTFOLIO_ALLOCATION_SNAPSHOT | Portfolio allocation per symbol |
| ADVISOR_PORTFOLIO_CORRELATION_SNAPSHOT | Cross-symbol correlation adjustment |
| ADVISOR_REGIME_CAPITAL_ROTATION_SNAPSHOT | Regime-based capital rotation |
| ADVISOR_NET_EXPOSURE_SNAPSHOT | Net long/short exposure |
| ADVISOR_FACTOR_EXPOSURE_SNAPSHOT | Factor exposure distribution |
| ADVISOR_PORTFOLIO_RISK_MODE_SNAPSHOT | Risk mode: NORMAL / DEFENSIVE / SURVIVAL |
| ADVISOR_PERFORMANCE_FEEDBACK_PROCESSED | Trade outcome processed, weights updated |
| ADVISOR_OUTCOME_ATTRIBUTION_SNAPSHOT | Outcome counts by type/symbol/connector |
| ADVISOR_AUTONOMOUS_CYCLE_STARTED | Autonomous cycle began |
| ADVISOR_AUTONOMOUS_CYCLE_COMPLETED | Autonomous cycle ended (outcome enum) |
Alert channels:
| Channel | Description |
|---|---|
| ADVISOR_CONFIDENCE_LOW | Conviction below threshold (default 0.3) → Inspector tightens risk to 50% for 10 min |
| ADVISOR_MODEL_SWITCHED | LLM model fallback triggered |
| ADVISOR_HALLUCINATION_DETECTED | Decision anomaly → Inspector tightens risk to 30% for 15 min |
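The two tightening rules in the table can be modelled as time-limited risk multipliers. This is a sketch under stated assumptions: `RiskTightener` is a hypothetical class, "tightens risk to 50%" is read as a 0.5 multiplier on risk limits, and when both alerts are active the most restrictive multiplier is assumed to win. The Inspector's real behavior may differ on each of these points.

```typescript
type AlertType = "ADVISOR_CONFIDENCE_LOW" | "ADVISOR_HALLUCINATION_DETECTED";

// Factors and durations taken from the alert table; reading them as multipliers is an assumption.
const TIGHTENING: Record<AlertType, { factor: number; durationMs: number }> = {
  ADVISOR_CONFIDENCE_LOW: { factor: 0.5, durationMs: 10 * 60 * 1000 },
  ADVISOR_HALLUCINATION_DETECTED: { factor: 0.3, durationMs: 15 * 60 * 1000 },
};

// Hypothetical tracker of active tightenings.
class RiskTightener {
  private active: { factor: number; expiresAt: number }[] = [];

  onAlert(type: AlertType, nowMs: number): void {
    const rule = TIGHTENING[type];
    this.active.push({ factor: rule.factor, expiresAt: nowMs + rule.durationMs });
  }

  // Drops expired entries, then returns the smallest active factor; 1 means no tightening.
  riskFactor(nowMs: number): number {
    this.active = this.active.filter((t) => nowMs < t.expiresAt);
    return this.active.reduce((min, t) => Math.min(min, t.factor), 1);
  }
}
```

A consumer would multiply its configured exposure caps by `riskFactor(now)` before validating an execution intent.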
Inspector channels — publisher: Inspector (8008)
Market regime events:
| Channel | Description |
|---|---|
| INSPECTOR_TREND_DETECTED | Trend regime identified from price/MA analysis |
| INSPECTOR_RANGE_DETECTED | Range-bound regime identified |
| INSPECTOR_VOLATILITY_SPIKE | ATR/volatility exceeds threshold |
| INSPECTOR_LIQUIDITY_DROPPED | Liquidity score drops below threshold |
| INSPECTOR_DATA_GAP_DETECTED | Gap in market data stream |
| INSPECTOR_DATA_OUTLIER_DETECTED | Statistical outlier (z-score) |
| INSPECTOR_DATA_MATURITY_UPDATE | Data maturity flag changed |
Execution events:
| Channel | Description |
|---|---|
| INSPECTOR_TRADE_OUTCOME | Trade result: PnL, slippage, latency, fees |
| INSPECTOR_RISK_LIMIT_BREACH | Exposure cap breached |
| INSPECTOR_RISK_MARGIN_UPDATE | Margin requirement changed |
| INSPECTOR_RISK_KILL_SWITCH | Kill switch triggered |
Alert events (auto-forwarded to Telegram):
| Channel | Description |
|---|---|
| INSPECTOR_ORDER_REJECTED | Order rejected by exchange |
| INSPECTOR_LIQUIDATION_WARNING | Position approaching liquidation |
| INSPECTOR_EXECUTION_LATENCY_ALERT | Execution latency exceeds threshold |
| INSPECTOR_RISK_LIMIT_APPROACHING | Exposure approaching cap (pre-breach) |
| INSPECTOR_CORRELATION_SPIKE | Cross-symbol correlation spike |
| INSPECTOR_FUNDING_RATE_ALERT | Funding rate exceeds threshold |
Note: INSPECTOR_RISK_GOVERNOR_DECISION is NOT a Redis channel. It is an audit log label used with auditService.log(). Do not add it to SubscriptionType.
Execution-critical channels: Redis Streams dual-write
Four execution-critical channels dual-write to Redis Streams for bounded replay:
- ADVISOR_EXECUTION_INTENT_OPEN
- ADVISOR_EXECUTION_INTENT_CLOSE
- ADVISOR_EXECUTION_INTENT_REDUCE
- ADVISOR_EXECUTION_INTENT_FLIP
Also runtime-validated at publish/subscribe boundaries:
- DETECTOR_SIGNAL_GENERATED
- DETECTOR_POSITION_*_REQUEST
- PROVIDER_ORDER_FILL
- PROVIDER_ORDER_TERMINAL
Redis Streams provide bounded replay via stable subscriptionId — consumers can re-read the last N messages after reconnect without losing execution intents.
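The dual-write and bounded-replay idea can be sketched with an in-memory stand-in. `DualWriteChannel` is hypothetical: it fans each message out to live listeners (the pub/sub half) and appends it to a capped log (the stream half). A Redis-backed version would presumably pair PUBLISH with XADD using a MAXLEN cap, but the source does not show the library's actual calls, so none are reproduced here.

```typescript
interface StreamEntry<T> {
  id: number; // monotonically increasing, standing in for a stream entry ID
  message: T;
}

// In-memory illustration only; not the libs/event-bus implementation.
class DualWriteChannel<T> {
  private readonly log: StreamEntry<T>[] = [];
  private readonly listeners: Array<(m: T) => void> = [];
  private nextId = 1;

  constructor(private readonly maxLen: number) {}

  subscribe(listener: (m: T) => void): void {
    this.listeners.push(listener);
  }

  // Dual-write: append to the bounded log, then deliver to live subscribers.
  publish(message: T): number {
    const id = this.nextId++;
    this.log.push({ id, message });
    if (this.log.length > this.maxLen) this.log.shift(); // keep only the last maxLen entries
    this.listeners.forEach((l) => l(message));
    return id;
  }

  // Replay retained entries newer than the last ID the consumer processed.
  replayAfter(lastSeenId: number): StreamEntry<T>[] {
    return this.log.filter((e) => e.id > lastSeenId);
  }
}
```

A consumer that tracks the last ID it handled can call `replayAfter` on reconnect. Entries older than the cap are gone, which is the "bounded" part of the guarantee.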
Cross-service subscription matrix
| Consumer → | PROVIDER_* | DETECTOR_* | ADVISOR_* | INSPECTOR_* |
|---|---|---|---|---|
| Detector | TRADE, ORDERBOOK, CANDLE | — | — | TREND, RANGE, VOLATILITY, LIQUIDITY, DATA_GAP, DATA_OUTLIER |
| Advisor | TRADE, ORDERBOOK, CANDLE, MARKET_INTEL | SIGNAL_GENERATED | DECISION_REQUEST, CONFIDENCE_LOW, HALLUCINATION | TREND, RANGE, VOLATILITY, LIQUIDITY, RISK_LIMIT, CORRELATION, FUNDING |
| Inspector | ALL market data + account | POSITION_*_REQUEST | EXECUTION_INTENT_* | — |
| Studio | ALL (via WS bridge) | SIGNAL_* | DECISION_*, CONVICTION_*, PORTFOLIO_* | ALL |
Latency observability
All services record event bus latency via @barfinex/utils:
- event_bus_latency_ms: round-trip pub→sub latency
- event_trace_latency_ms: end-to-end trace latency from event metadata
- p50 / p95 / p99 / p999 quantile gauges, exposed on the Prometheus /metrics endpoint
Provider publishes these for market/order events. Detector and Advisor record from event metadata. Inspector records on execution/risk ingress.
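To make the quantile gauges concrete, here is a minimal sketch that derives a latency from the envelope timestamp and computes nearest-rank quantiles over a window of samples. The function names are hypothetical and the nearest-rank method is an assumption. The helpers in @barfinex/utils may use a different estimator or a streaming histogram.

```typescript
// Latency of one event: envelope timestamp (Unix ms) to receipt, clamped at 0 to absorb clock skew.
function eventLatencyMs(eventTimestampMs: number, receivedAtMs: number): number {
  return Math.max(0, receivedAtMs - eventTimestampMs);
}

// Nearest-rank quantile: the smallest sample such that at least a fraction q of samples are <= it.
function quantile(samples: number[], q: number): number {
  if (samples.length === 0) return NaN;
  const sorted = samples.slice().sort((a, b) => a - b);
  const rank = Math.max(1, Math.ceil(q * sorted.length));
  return sorted[rank - 1];
}

// Summary matching the gauge names mentioned in the text.
function latencySummary(samples: number[]): { p50: number; p95: number; p99: number; p999: number } {
  return {
    p50: quantile(samples, 0.5),
    p95: quantile(samples, 0.95),
    p99: quantile(samples, 0.99),
    p999: quantile(samples, 0.999),
  };
}
```

With only a handful of samples per interval, the upper quantiles collapse onto the maximum, so p99 and p999 are only informative on high-volume channels.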
Deprecated aliases (never use in new code)
| Deprecated | Maps to |
|---|---|
| EXECUTION_INTENT_OPEN | ADVISOR_EXECUTION_INTENT_OPEN |
| EXECUTION_INTENT_CLOSE | ADVISOR_EXECUTION_INTENT_CLOSE |
| EXECUTION_INTENT_REDUCE | ADVISOR_EXECUTION_INTENT_REDUCE |
| EXECUTION_INTENT_FLIP | ADVISOR_EXECUTION_INTENT_FLIP |
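When migrating older code, the alias table can be applied mechanically. The sketch below is a hypothetical helper (`canonicalChannel` is not part of @barfinex/types) that maps a deprecated name to its canonical channel and passes every other name through unchanged.

```typescript
// Deprecated alias -> canonical channel, copied from the table above.
const DEPRECATED_ALIASES = new Map<string, string>([
  ["EXECUTION_INTENT_OPEN", "ADVISOR_EXECUTION_INTENT_OPEN"],
  ["EXECUTION_INTENT_CLOSE", "ADVISOR_EXECUTION_INTENT_CLOSE"],
  ["EXECUTION_INTENT_REDUCE", "ADVISOR_EXECUTION_INTENT_REDUCE"],
  ["EXECUTION_INTENT_FLIP", "ADVISOR_EXECUTION_INTENT_FLIP"],
]);

// Hypothetical helper: returns the canonical name, or the input if it is not a deprecated alias.
function canonicalChannel(name: string): string {
  return DEPRECATED_ALIASES.get(name) ?? name;
}
```

New code should still import the canonical names from the SubscriptionType enum directly. A helper like this is only useful for rewriting persisted or legacy channel names.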