Subscribers and the Art of Not Caring

How our event bus dispatches to subscribers with pattern matching, circuit breakers, and exactly-once delivery via Redis.

An event fires. A leave request was approved, a payroll run processed, an employee onboarded. The event carries a structured payload and request context. Now what?

Something has to receive it. In our system, that’s subscribers - classes that declare which events they care about and what to do when they arrive.

The Bus

The dispatcher is deliberately simple:

module Events::Subscribers
REGISTRY = []
def self.dispatch(event_name, payload:, context: {})
event = { name: event_name, payload:, context:, occurred_at: Time.current }
REGISTRY.each do |subscriber|
dispatch_to_subscriber(subscriber, event)
end
end
def self.dispatch_to_subscriber(subscriber, event)
return unless subscriber.handles?(event)
subscriber.consume(event)
rescue => e
Rails.logger.error "[Events] #{subscriber.name} failed for #{event[:name]}: #{e.message}"
end
end

Iterate registered subscribers, check if they handle this event, call consume.

The critical design choice is that rescue block. Per-subscriber error catching. If Slack is down, it doesn’t prevent PostHog from tracking the event. If analytics fails, emails still send. One failure is isolated. The loop keeps going.

Pattern Matching

Subscribers declare what they care about with a DSL:

module Events::Subscribable
def subscribes_to(*patterns)
@event_patterns = patterns.map(&:to_s).freeze
end
def handles?(event)
return true if @event_patterns.nil?
name = event[:name]
@event_patterns.any? do |pattern|
pattern.end_with?(".") ? name.start_with?(pattern) : name == pattern
end
end
end

Exact match: "payroll.failed" handles only that event. Prefix match: "leave.request." handles every leave request event - submitted, approved, rejected, cancelled, escalated. Omit subscribes_to entirely and the subscriber handles everything.

We considered regex patterns and topic hierarchies. Didn’t need them.

The Subscribers

Every event passes through a set of registered subscribers at boot:

LogSubscriber handles everything. Every event gets a structured log line with the event name, payload summary, and severity - :error for failed events, :info for the rest.

SentrySubscriber also handles everything. It adds breadcrumbs to the current Sentry scope and captures error events directly. When a bug report comes in three days later, the breadcrumb trail shows every event that fired in that request. Worth its weight.

PosthogSubscriber sends analytics to PostHog, but only whitelisted fields:

ALLOWED_PAYLOAD_FIELDS = %i[
employee_id user_id company_id leave_request_id
payroll_run_id invoice_id leave_kind status
previous_status num_days employee_count
].freeze
def build_properties(event)
event[:payload].slice(*ALLOWED_PAYLOAD_FIELDS)
end

No names, no emails, no salaries. Just IDs and status codes. PII doesn’t leave the system.

SlackSubscriber sends formatted messages to team channels. Payroll events go to #payroll, leave events to #leave, billing events to #billing. Each channel only sees what’s relevant.

TodoSubscriber auto-creates tasks. A payroll run gets created? Todo for the approver. An invoice goes overdue? Todo for finance. An employee is onboarded? Todo for HR to verify documents.

NotificationSubscriber bridges events and email delivery - it routes events to the notifier system, which is the subject of the next post.

Plus two domain-specific subscribers for billing and employee lifecycle side effects.

Circuit Breakers

PostHog and Slack are external services. They go down. When they do, you don’t want the subscriber filling the error log with thousands of identical failures, one per event.

Both use a circuit breaker:

class PosthogSubscriber
extend ::CircuitBreaker, Events::Subscribable, Events::Idempotent
def self.consume(event)
return unless claim_event!(event)
return if circuit_open?
POSTHOG_CLIENT.capture(distinct_id: distinct_id_from(event), event: event[:name])
reset_circuit!
rescue => e
record_failure!
end
end

Five consecutive failures open the circuit. No more attempts for 60 seconds. After that, one test request. Success closes the circuit. Failure means another 60 seconds.

Three lines of integration for any subscriber that talks to an external service. The event system keeps running. The external service recovers on its own timeline.

Exactly Once

Background jobs retry. Network hiccups cause duplicate dispatches. Every subscriber with side effects - sending emails, posting to Slack, tracking analytics - needs idempotency.

def claim_event!(event)
cache_key = "#{name}:#{event.dig(:payload, :idempotency_key)}"
redis.setnx_with_expiry(cache_key, Time.current.to_i, ex: 24.hours)
end

Redis SETNX with a 24-hour expiry. Atomic. If the key exists, the event was already processed - return false, skip. If the key doesn’t exist, set it, proceed.

The idempotency key comes from the schema - a UUID v7 generated when the event is emitted. Same event, same key, no matter how many times the job retries.

Async by Default

Most events are dispatched through Events::DispatchJob, a Sidekiq job with two queues:

:events_critical for payroll, leave, billing, and auth events. :events for everything else. A payroll notification shouldn’t wait behind analytics tracking.

Subscribers react. But the most interesting subscriber doesn’t handle events directly - it routes them to notifiers.