Barfinex

Event Bus Architecture — Redis pub/sub backbone

Redis pub/sub event bus: BaseEvent envelope, 35+ typed channels across all services, source enforcement, Redis Streams dual-write for replay, latency observability.

Overview

The event bus is the nervous system of the Barfinex platform. Every inter-service communication — market data, signals, decisions, risk events, execution outcomes — travels as a typed, enveloped message over Redis pub/sub. No service calls another service's REST API for real-time data; they publish and subscribe through the bus.

Library: libs/event-bus — Redis pub/sub abstraction with exponential-backoff reconnect, dedup subscription, and per-interval latency metrics. Reads EVENT_BUS_REDIS_URL / REDIS_URL / REDIS_HOST+REDIS_PORT.


BaseEvent envelope

Every message on every channel is wrapped in a BaseEvent:

{
  eventId:       string    -- UUID, unique per event instance
  type:          string    -- SubscriptionType enum key
  source:        string    -- service name: 'provider' | 'detector' | 'advisor' | 'inspector'
  timestamp:     number    -- Unix ms
  correlationId: string?   -- links events in one decision cycle
  causationId:   string?   -- the eventId that caused this event
  payload:       T         -- typed payload per EventPayloadMap
}

Source enforcement: assertEventSourceMatch(type, source) is called at publish time — each service can only emit events with its own prefix. Detector cannot publish PROVIDER_*; Advisor cannot publish INSPECTOR_*. Compile-time guard.


Channel categories

All channel names are defined in libs/types/src/subscription.interface.ts as the SubscriptionType enum. Never use string literals — always import from @barfinex/types.

Provider channels — publisher: Provider (8081)

ChannelPayloadDescription
PROVIDER_MARKETDATA_TRADEProviderTradeEventLive trade tick: symbol, price, volume, side, timestamp
PROVIDER_MARKETDATA_ORDERBOOKProviderOrderBookEventOrder book snapshot with bids/asks
PROVIDER_MARKETDATA_CANDLEProviderCandleEventCandle close: OHLCV + symbol + interval
PROVIDER_ACCOUNT_EVENTProviderAccountEventAccount balance/equity update from exchange
PROVIDER_ORDER_CREATEProviderOrderCreateEventOrder acknowledgement (REST response — not a fill)
PROVIDER_ORDER_CLOSEProviderOrderCloseEventOrder closed
PROVIDER_INSTRUMENTSProviderInstrumentsEventInstrument list update
PROVIDER_INSTRUMENT_PRICESProviderInstrumentPricesEventPrice snapshot for all instruments
PROVIDER_MARKET_INTELLIGENCE_EVENTMarketIntelligenceEventBreaking news, macro, on-chain, whale alert

Detector channels — publisher: Detector (8101)

ChannelPayloadDescription
DETECTOR_SIGNAL_GENERATEDDetectorSignalPayloadNew strategy signal: symbol, side, confidence, strategyId
DETECTOR_SIGNAL_UPDATEDDetectorSignalPayloadSignal updated (UI/observability only)
DETECTOR_SIGNAL_INVALIDATEDDetectorSignalPayloadSignal invalidated
DETECTOR_POSITION_OPEN_REQUESTDetectorSignalPayloadRequest to open position
DETECTOR_POSITION_CLOSE_REQUESTDetectorSignalPayloadRequest to close position
DETECTOR_POSITION_REDUCE_REQUESTDetectorSignalPayloadRequest to reduce position
DETECTOR_POSITION_FLIP_REQUESTDetectorSignalPayloadRequest to flip direction

DETECTOR_SIGNAL_GENERATED is the primary trigger for the Advisor pipeline. It carries a strategyId (detector sysname) used by DecisionWindowService to deduplicate per-strategy windows — two detectors on the same symbol each get an independent decision cycle.

Advisor channels — publisher: Advisor (8009)

Execution intents (consumed by Inspector — runtime validated):

ChannelDescription
ADVISOR_EXECUTION_INTENT_OPENIntent to open a new position
ADVISOR_EXECUTION_INTENT_CLOSEIntent to close an existing position
ADVISOR_EXECUTION_INTENT_REDUCEIntent to reduce position size
ADVISOR_EXECUTION_INTENT_FLIPIntent to flip direction (close + open opposite)

Decision events:

ChannelDescription
ADVISOR_DECISION_MADEEvery completed decision cycle
ADVISOR_DECISION_REQUESTManual decision trigger (consumed by Advisor)
ADVISOR_DECISION_RESPONSEDecision response (UI/observability)
ADVISOR_SIGNAL_GENERATEDAdvisor signal: direction, regime, volatilityBucket

Telemetry (28 event types):

ChannelDescription
ADVISOR_CONVICTION_SNAPSHOTConviction score snapshot
ADVISOR_CONVICTION_CALIBRATION_SNAPSHOTCalibration weights (Platt/isotonic)
ADVISOR_PORTFOLIO_ALLOCATION_SNAPSHOTPortfolio allocation per symbol
ADVISOR_PORTFOLIO_CORRELATION_SNAPSHOTCross-symbol correlation adjustment
ADVISOR_REGIME_CAPITAL_ROTATION_SNAPSHOTRegime-based capital rotation
ADVISOR_NET_EXPOSURE_SNAPSHOTNet long/short exposure
ADVISOR_FACTOR_EXPOSURE_SNAPSHOTFactor exposure distribution
ADVISOR_PORTFOLIO_RISK_MODE_SNAPSHOTRisk mode: NORMAL / DEFENSIVE / SURVIVAL
ADVISOR_PERFORMANCE_FEEDBACK_PROCESSEDTrade outcome processed, weights updated
ADVISOR_OUTCOME_ATTRIBUTION_SNAPSHOTOutcome counts by type/symbol/connector
ADVISOR_AUTONOMOUS_CYCLE_STARTEDAutonomous cycle began
ADVISOR_AUTONOMOUS_CYCLE_COMPLETEDAutonomous cycle ended (outcome enum)

Alert channels:

ChannelDescription
ADVISOR_CONFIDENCE_LOWConviction below threshold (default 0.3) → Inspector tightens risk to 50% for 10 min
ADVISOR_MODEL_SWITCHEDLLM model fallback triggered
ADVISOR_HALLUCINATION_DETECTEDDecision anomaly → Inspector tightens risk to 30% for 15 min

Inspector channels — publisher: Inspector (8008)

Market regime events:

ChannelDescription
INSPECTOR_TREND_DETECTEDTrend regime identified from price/MA analysis
INSPECTOR_RANGE_DETECTEDRange-bound regime identified
INSPECTOR_VOLATILITY_SPIKEATR/volatility exceeds threshold
INSPECTOR_LIQUIDITY_DROPPEDLiquidity score drops below threshold
INSPECTOR_DATA_GAP_DETECTEDGap in market data stream
INSPECTOR_DATA_OUTLIER_DETECTEDStatistical outlier (z-score)
INSPECTOR_DATA_MATURITY_UPDATEData maturity flag changed

Execution events:

ChannelDescription
INSPECTOR_TRADE_OUTCOMETrade result: PnL, slippage, latency, fees
INSPECTOR_RISK_LIMIT_BREACHExposure cap breached
INSPECTOR_RISK_MARGIN_UPDATEMargin requirement changed
INSPECTOR_RISK_KILL_SWITCHKill switch triggered

Alert events (auto-forwarded to Telegram):

ChannelDescription
INSPECTOR_ORDER_REJECTEDOrder rejected by exchange
INSPECTOR_LIQUIDATION_WARNINGPosition approaching liquidation
INSPECTOR_EXECUTION_LATENCY_ALERTExecution latency exceeds threshold
INSPECTOR_RISK_LIMIT_APPROACHINGExposure approaching cap (pre-breach)
INSPECTOR_CORRELATION_SPIKECross-symbol correlation spike
INSPECTOR_FUNDING_RATE_ALERTFunding rate exceeds threshold

Note: INSPECTOR_RISK_GOVERNOR_DECISION is NOT a Redis channel. It is an audit log label used with auditService.log(). Do not add to SubscriptionType.


Execution-critical channels: Redis Streams dual-write

Four execution-critical channels dual-write to Redis Streams for bounded replay:

  • ADVISOR_EXECUTION_INTENT_OPEN
  • ADVISOR_EXECUTION_INTENT_CLOSE
  • ADVISOR_EXECUTION_INTENT_REDUCE
  • ADVISOR_EXECUTION_INTENT_FLIP

Also runtime-validated at publish/subscribe boundaries:

  • DETECTOR_SIGNAL_GENERATED
  • DETECTOR_POSITION_*_REQUEST
  • PROVIDER_ORDER_FILL
  • PROVIDER_ORDER_TERMINAL

Redis Streams provide bounded replay via stable subscriptionId — consumers can re-read the last N messages after reconnect without losing execution intents.


Cross-service subscription matrix

Consumer →PROVIDER_*DETECTOR_*ADVISOR_*INSPECTOR_*
DetectorTRADE, ORDERBOOK, CANDLETREND, RANGE, VOLATILITY, LIQUIDITY, DATA_GAP, DATA_OUTLIER
AdvisorTRADE, ORDERBOOK, CANDLE, MARKET_INTELSIGNAL_GENERATEDDECISION_REQUEST, CONFIDENCE_LOW, HALLUCINATIONTREND, RANGE, VOLATILITY, LIQUIDITY, RISK_LIMIT, CORRELATION, FUNDING
InspectorALL market data + accountPOSITION_*_REQUESTEXECUTION_INTENT_*
StudioALL (via WS bridge)SIGNAL_*DECISION_, CONVICTION_, PORTFOLIO_*ALL

Latency observability

All services record event bus latency via @barfinex/utils:

  • event_bus_latency_ms — round-trip pub→sub latency
  • event_trace_latency_ms — end-to-end trace latency from event metadata
  • p50 / p95 / p99 / p999 quantile gauges — exposed on Prometheus /metrics endpoint

Provider publishes these for market/order events. Detector and Advisor record from event metadata. Inspector records on execution/risk ingress.


Deprecated aliases (never use in new code)

DeprecatedMaps to
EXECUTION_INTENT_OPENADVISOR_EXECUTION_INTENT_OPEN
EXECUTION_INTENT_CLOSEADVISOR_EXECUTION_INTENT_CLOSE
EXECUTION_INTENT_REDUCEADVISOR_EXECUTION_INTENT_REDUCE
EXECUTION_INTENT_FLIPADVISOR_EXECUTION_INTENT_FLIP

Let’s Get in Touch

Have questions or want to explore Barfinex? Send us a message.