One Event, Start to Finish
Tracing a single leave approval from emit_async through schema extraction, Sidekiq dispatch, subscriber routing, and email delivery. The whole chain, no gaps.
Five posts of layers. Schemas, emitters, subscribers, notifiers - each one explained in isolation. But the system only clicks when you see every layer fire in sequence for one real event.
So let’s trace a leave request approval. From the moment a manager clicks “Approve” to the moment Grace gets an email telling her she’s cleared for time off. Every hop, every class, every line that matters.
The Setup
Grace Wambui, a software engineer in the Engineering department, submitted a leave request for April 10-14. Three days of annual leave. Her department manager, David Mwangi, opens the pending request and clicks Approve.
The controller calls the model. The model delegates to the operation.
Step 1: The Operation
```ruby
module Leave
  class ApproveRequest
    include Events::Emitter

    def initialize(request:, approver:, comments: nil)
      @request = request
      @approver = approver
      @comments = comments
    end

    def call
      return authorization_error unless authorized?

      balance = current_balance
      balance_before = balance&.available

      @request.transaction do
        @request.approve_by!(approver: @approver, comments: @comments)

        track_event(balance_before, balance&.reload&.available)
        emit_async(:leave_request_approved, @request, actor: @approver, comments: @comments)
      end

      Result.success(request: @request, balance_before:, balance_after: balance&.reload&.available)
    end
  end
end
```

Three things inside the transaction: approve the request, track an audit event, emit. That last line is the one we’re following:
```ruby
emit_async(:leave_request_approved, @request, actor: @approver, comments: @comments)
```

A symbol key (:leave_request_approved), the subject (the leave request), and two keyword arguments (the approver and optional comments). The operation doesn’t know what happens next. It doesn’t know that emails, Slack messages, or analytics events exist. Its job is done.
Step 2: The Emitter Builds the Schema
emit_async comes from Events::Emitter, a concern included in the operation:
```ruby
def emit_async(event_key, subject = nil, **params)
  schema = build_schema(event_key, subject, params)
  return unless schema.valid?

  captured_context = current_event_context

  Events::DispatchJob.perform_later(schema.full_name, schema.extract, captured_context)
end
```

Three things happen, and each matters.
build_schema looks up :leave_request_approved in the schema registry, which maps it to Events::Schema::Leave::RequestApproved - auto-registered when the class declared event_name "leave.request.approved". If the schema doesn’t exist, a NullSchema returns and valid? is false. The event is dropped and an error is logged. No crash.
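The lookup-with-fallback behavior can be sketched in a few lines. This is only an illustration of the null-object idea: SchemaRegistry, its register and lookup methods, and RequestApprovedSchema are hypothetical names, and the real registry may derive keys and log differently.

```ruby
require "logger"

# Null object returned for unknown event keys. Only the name NullSchema and
# the valid? method come from the post; everything else is an assumption.
class NullSchema
  def initialize(event_key)
    @event_key = event_key
  end

  def valid?
    false
  end
end

# Hypothetical stand-in for a real schema class.
class RequestApprovedSchema
  def valid?
    true
  end

  def full_name
    "leave.request.approved"
  end
end

# Hypothetical registry: maps symbol keys to schema classes.
module SchemaRegistry
  SCHEMAS = {}
  LOGGER = Logger.new($stderr)

  def self.register(event_key, schema_class)
    SCHEMAS[event_key] = schema_class
  end

  # Returns a schema instance, or logs and returns a NullSchema for unknown keys.
  def self.lookup(event_key)
    schema_class = SCHEMAS[event_key]
    return schema_class.new if schema_class

    LOGGER.error("[Events] No schema registered for #{event_key.inspect}")
    NullSchema.new(event_key)
  end
end

SchemaRegistry.register(:leave_request_approved, RequestApprovedSchema)
```

Because the caller only ever asks valid?, a typo in an event key costs one log line instead of an exception inside the operation's transaction.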
current_event_context captures who triggered this event, right now, while Current.user and the request middleware context still exist:
```ruby
def current_event_context
  current_context = {
    user_id: Current.user&.id,
    company_id: Current.company&.id
  }.compact

  Events::Context.current.merge(current_context)
end
```

Events::Context.current was populated by middleware earlier in the request: request_id, ip_address, user_agent, browser_name, device_type. The emitter merges in fresh Current values. This combined context hash is critical - three seconds from now, when a Sidekiq job processes this event, Current.user will be nil. The context has to travel with the event.
schema.extract calls the schema to build the payload.
Step 3: Schema Extraction
```ruby
module Events::Schema::Leave
  class RequestApproved < Events::Schema::Base
    event_name "leave.request.approved"

    from_subject do |request|
      {
        leave_request_id: request.id,
        leave_request_reference: request.reference,
        company_id: request.employee&.company_id,
        company_name: request.employee&.company&.name,
        employee_id: request.employee_id,
        employee_reference: request.employee&.reference,
        leave_kind: request.leave_kind&.code,
        start_date: request.start_date,
        end_date: request.end_date,
        num_days: request.num_days
      }
    end

    params do
      required :actor
      optional :comments
    end
  end
end
```

When extract is called:
- A UUID v7 idempotency key is generated: "019505a3-7b2f-7d4e-8a1c-...". This key follows the event through every downstream layer.
- from_subject runs with Grace’s leave request. IDs, references, leave kind code, dates, day count. Note: request.employee&.company_id, not request.employee.company. The schema extracts scalar values only. No ActiveRecord objects survive - the base class converts any AR instance to its ID.
- The actor param (David) gets special handling. The base class extracts actor_id, actor_type, and actor_reference automatically. David the Employee becomes { actor_id: 7, actor_type: "Employee" }.
- comments passes through as a plain string, truncated at 10,000 characters.
The resulting payload:
```ruby
{
  schema_version: 1,
  idempotency_key: "019505a3-7b2f-7d4e-8a1c-d3f2a1b4e5c6",
  leave_request_id: 42,
  leave_request_reference: "LR-2026-0042",
  company_id: 1,
  company_name: "Kazisafi Payroll",
  employee_id: 15,
  employee_reference: "EMP050",
  leave_kind: "annual",
  start_date: "2026-04-10",
  end_date: "2026-04-14",
  num_days: 3,
  actor_id: 7,
  actor_type: "Employee",
  comments: "Approved, enjoy your break"
}
```

Plain data. IDs, strings, dates. No objects, no associations, no surprises.
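To make the normalization rules concrete, here is a rough sketch of what the base class is described as doing: records reduced to IDs, the actor split into actor_id and actor_type, comments truncated. The extract_payload helper and the Struct stand-in are hypothetical, and a random UUID stands in for the UUID v7 key.

```ruby
require "securerandom"

MAX_COMMENT_LENGTH = 10_000

# Stand-in for an ActiveRecord model; only #id matters for this sketch.
Employee = Struct.new(:id)

# Hypothetical helper, not the real Events::Schema::Base API.
# Builds a scalar-only payload from subject data and the declared params.
def extract_payload(subject_data, actor:, comments: nil)
  # Assumption: SecureRandom.uuid stands in for the UUID v7 key from the post.
  payload = { schema_version: 1, idempotency_key: SecureRandom.uuid }

  subject_data.each do |key, value|
    # Anything record-like (responds to #id) is reduced to its ID.
    payload[key] = value.respond_to?(:id) ? value.id : value
  end

  payload[:actor_id] = actor.id
  payload[:actor_type] = actor.class.name
  payload[:comments] = comments.to_s[0, MAX_COMMENT_LENGTH] if comments
  payload
end

payload = extract_payload(
  { leave_request_id: 42, employee: Employee.new(15) },
  actor: Employee.new(7),
  comments: "Approved, enjoy your break"
)
```

The point of the exercise is that everything in the result can be serialized for the job queue without dragging a database connection or a stale object graph along.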
Step 4: The Job Queue
```ruby
Events::DispatchJob.perform_later("leave.request.approved", payload, captured_context)
```

The dispatch job decides which queue:
```ruby
class DispatchJob < ApplicationJob
  CRITICAL_PREFIXES = %w[leave. payroll. billing. invoice. auth.].freeze

  queue_as do
    event_name = arguments.first.to_s
    (CRITICAL_PREFIXES.any? { event_name.start_with?(it) }) ? :events_critical : :events
  end

  sidekiq_options retry: 3, dead: true
end
```

"leave.request.approved" starts with "leave." - critical prefix. The job goes to :events_critical. Grace’s approval notification won’t wait behind a batch of analytics events from an employee import.
Three retries, then dead letter queue. Sentry gets a structured error with the event name and idempotency key.
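The queue decision is a pure function of the event name, which makes it easy to check outside a job. A small sketch, where queue_for is a hypothetical helper and "employee.imported" is an invented event name used only for contrast:

```ruby
CRITICAL_PREFIXES = %w[leave. payroll. billing. invoice. auth.].freeze

# Returns the queue for an event, mirroring the prefix check in the job.
# String#start_with? accepts several prefixes and returns true if any matches.
def queue_for(event_name)
  event_name.to_s.start_with?(*CRITICAL_PREFIXES) ? :events_critical : :events
end

queue_for("leave.request.approved") # => :events_critical
queue_for("employee.imported")      # => :events
```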
Step 5: The Job Fires
Sidekiq picks up the job:
```ruby
def perform(event_name, payload, context = {})
  Events::Context.clear

  key = payload["idempotency_key"] || payload[:idempotency_key]
  if key && !claim_event(key)
    log_info "Skipping duplicate event", event_name:, idempotency_key: key
    return
  end

  Events::Subscribers.dispatch(event_name, payload:, context:)
ensure
  Events::Context.clear
end
```

First: clear stale context from a previous job in this thread. Sidekiq reuses threads.
Second: idempotency check. claim_event calls Redis SETNX with a 24-hour TTL:
```ruby
def claim_event(key)
  redis = FastRedis.for("events:dispatch")
  redis.setnx_with_expiry("dispatched:#{key}", Time.current.to_i, ex: 24.hours.to_i)
end
```

If this is a retry and the event was already dispatched, SETNX returns false. Log, return. No double dispatch.
This is the first of three idempotency checks in the chain. The dispatch job checks once. Individual subscribers check again. Notifiers check a third time. Belt, suspenders, backup belt.
Third: dispatch to all subscribers.
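The claim step in the idempotency check boils down to "set this key only if nobody has set it yet, and let it expire". Here is a minimal in-memory sketch of those semantics; MemoryClaimStore is a hypothetical stand-in for illustration, not the FastRedis wrapper. Against a real Redis, the same effect comes from a single SET with the NX and EX options.

```ruby
# In-memory stand-in for Redis "SET key value NX EX ttl". Illustration only:
# it lives in one process, so it cannot deduplicate across Sidekiq workers.
class MemoryClaimStore
  # clock is injectable so expiry can be exercised without sleeping.
  def initialize(clock: -> { Time.now.to_i })
    @clock = clock
    @expires_at = {}
    @mutex = Mutex.new
  end

  # Returns true if the key was claimed, false if a live claim already exists.
  def claim(key, ttl:)
    @mutex.synchronize do
      now = @clock.call
      expiry = @expires_at[key]
      return false if expiry && expiry > now

      @expires_at[key] = now + ttl
      true
    end
  end
end

store = MemoryClaimStore.new
store.claim("dispatched:019505a3", ttl: 86_400) # first attempt wins the claim
store.claim("dispatched:019505a3", ttl: 86_400) # a retry is told to skip
```

The check and the write have to be one atomic step. A separate "exists?" followed by "set" would let two concurrent retries both pass the check.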
Step 6: The Bus
```ruby
module Events::Subscribers
  REGISTRY = []

  def self.dispatch(event_name, payload:, context:)
    event = { name: event_name, payload:, context:, occurred_at: Time.current }

    REGISTRY.each do |subscriber|
      dispatch_to_subscriber(subscriber, event)
    end
  end

  def self.dispatch_to_subscriber(subscriber, event)
    return unless subscriber.handles?(event)
    subscriber.consume(event)
  rescue => e
    Rails.logger.error "[Events] #{subscriber.name} failed for #{event[:name]}: #{e.message}"
  end
end
```

Build event hash, iterate all registered subscribers. For each: check, consume, rescue. That rescue block is everything. PostHog down? Logged, but the loop continues. Slack, the notifier, the logger - all still run.
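The isolation is easy to demonstrate with two toy subscribers, one of which always raises. MiniBus, FailingSubscriber, and RecordingSubscriber are hypothetical names for this sketch; errors are collected in an array instead of going to Rails.logger so the effect is visible.

```ruby
# Always fails, like an analytics client whose upstream is down.
class FailingSubscriber
  def self.handles?(_event)
    true
  end

  def self.consume(_event)
    raise "analytics is down"
  end
end

# Records the names of the events it consumed.
class RecordingSubscriber
  def self.received
    @received ||= []
  end

  def self.handles?(event)
    event[:name].start_with?("leave.")
  end

  def self.consume(event)
    received << event[:name]
  end
end

module MiniBus
  # Dispatches to every subscriber; one failure never stops the loop.
  # Returns the collected error messages.
  def self.dispatch(subscribers, event, errors: [])
    subscribers.each do |subscriber|
      begin
        subscriber.consume(event) if subscriber.handles?(event)
      rescue StandardError => e
        errors << "#{subscriber.name} failed for #{event[:name]}: #{e.message}"
      end
    end
    errors
  end
end

errors = MiniBus.dispatch(
  [FailingSubscriber, RecordingSubscriber],
  { name: "leave.request.approved" }
)
```

After the call, errors holds one message from FailingSubscriber and RecordingSubscriber.received still contains the event, even though the failing subscriber ran first.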
Now let’s walk through each subscriber for "leave.request.approved":
LogSubscriber
Handles everything. Writes a structured log line:
```
[Event] leave.request.approved | {leave_request_id: 42, company_id: 1, employee_id: 15, ...}
```

Severity :info. Would be :error if the event name contained .failed.
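The severity rule is small enough to sketch directly. severity_for is a hypothetical helper, and "payroll.run.failed" is an invented event name used only to show the other branch:

```ruby
# Picks a log severity from the event name: anything containing ".failed"
# is logged as an error, everything else as info.
def severity_for(event_name)
  event_name.to_s.include?(".failed") ? :error : :info
end

severity_for("leave.request.approved") # => :info
severity_for("payroll.run.failed")     # => :error
```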
SentrySubscriber
Also handles everything. Adds a breadcrumb to the Sentry scope:
```ruby
Sentry.add_breadcrumb(Sentry::Breadcrumb.new(
  category: "event",
  message: "leave.request.approved",
  data: sanitized_payload,
  level: "info"
))
```

If something breaks later in this job, the Sentry report shows this event in the breadcrumb trail.
PosthogSubscriber
Subscribes to "leave.". Pattern match succeeds. Two checks before sending:
```ruby
def self.consume(event)
  return unless claim_event!(event)
  return if circuit_open?

  POSTHOG_CLIENT.capture(
    distinct_id: distinct_id_from(event),
    event: event[:name],
    properties: build_properties(event)
  )
  reset_circuit!
end
```

claim_event! - subscriber-level idempotency. Same Redis pattern as the dispatch job, but scoped to "PosthogSubscriber:019505a3-...". Even if the dispatch job somehow runs twice, PostHog sees the event once.
circuit_open? - five consecutive failures open the circuit. Skip for 60 seconds. Grace’s approval doesn’t depend on analytics working.
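A circuit breaker with those numbers (five failures, 60 seconds) might look roughly like this. It is a sketch under assumptions: the real subscriber presumably keeps its counters somewhere shared such as Redis, while this CircuitBreaker class is hypothetical, in-memory, and not thread-safe. It also closes fully once the cooldown has elapsed instead of using a half-open probe.

```ruby
class CircuitBreaker
  # clock is injectable so the cooldown can be exercised without sleeping.
  def initialize(threshold: 5, cooldown: 60, clock: -> { Time.now.to_f })
    @threshold = threshold
    @cooldown = cooldown
    @clock = clock
    @failures = 0
    @opened_at = nil
  end

  # True while the cooldown window after the threshold-th failure is running.
  def open?
    return false unless @opened_at
    return true if @clock.call - @opened_at < @cooldown

    reset! # cooldown elapsed: allow calls again
    false
  end

  # Call after a failed delivery; opens the circuit at the threshold.
  def record_failure!
    @failures += 1
    @opened_at = @clock.call if @failures >= @threshold
  end

  # Call after a successful delivery; clears the failure streak.
  def reset!
    @failures = 0
    @opened_at = nil
  end
end
```

A caller checks open? before attempting delivery, calls record_failure! in its rescue path, and calls reset! after a success, which is where reset_circuit! sits in consume above.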
Properties go through a whitelist:
```ruby
ALLOWED_PAYLOAD_FIELDS = %i[
  employee_id user_id company_id leave_request_id
  leave_kind status num_days employee_count
].freeze
```

No names, no emails, no comments. PII stays in the database.
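Applying the list is a one-liner with Hash#slice. In this sketch, analytics_properties is a hypothetical name (the post only mentions build_properties), and the key normalization is an assumption: payloads that have been through a job queue often come back with string keys.

```ruby
ALLOWED_PAYLOAD_FIELDS = %i[
  employee_id user_id company_id leave_request_id
  leave_kind status num_days employee_count
].freeze

# Keeps only allowlisted keys. Keys are symbolized first so that both
# string-keyed and symbol-keyed payloads are filtered the same way.
def analytics_properties(payload)
  payload.transform_keys(&:to_sym).slice(*ALLOWED_PAYLOAD_FIELDS)
end

analytics_properties(
  "leave_request_id" => 42,
  "company_name" => "Kazisafi Payroll",
  "comments" => "Approved, enjoy your break",
  "num_days" => 3
)
# => { leave_request_id: 42, num_days: 3 }
```

An allowlist fails safe: a new field added to the schema later stays out of analytics until someone deliberately lists it.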
SlackSubscriber
Subscribes to "leave.request.". Same idempotency and circuit breaker pattern. Posts a formatted message to #leave:
Leave request approved: Grace Wambui (Engineering) - Annual Leave, Apr 10-14 (3 days). Approved by David Mwangi.
TodoSubscriber
Subscribes to specific names: "leave.request.submitted", "payroll.created", "billing.invoice.overdue". "leave.request.approved" isn’t in the list. handles? returns false. Skip.
The normal case. Most subscribers skip most events. A string prefix check, nothing more.
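That prefix check can be sketched with String#start_with?, which accepts any number of candidate prefixes. The handles? signature here is hypothetical (the real subscribers take an event hash and keep their patterns in a subscribes_to declaration); the pattern lists are the ones quoted above.

```ruby
# True if the event name starts with any of the subscriber's patterns.
# A full event name acts as a pattern that only matches itself (or longer
# names that extend it).
def handles?(patterns, event_name)
  event_name.start_with?(*patterns)
end

POSTHOG_PATTERNS = ["leave."].freeze
SLACK_PATTERNS = ["leave.request."].freeze
TODO_PATTERNS = [
  "leave.request.submitted", "payroll.created", "billing.invoice.overdue"
].freeze

handles?(POSTHOG_PATTERNS, "leave.request.approved") # => true
handles?(SLACK_PATTERNS, "leave.request.approved")   # => true
handles?(TODO_PATTERNS, "leave.request.approved")    # => false
```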
EmployeeSubscriber and BillingSubscriber
Domain-specific. Neither handles leave events. Both skip.
NotificationSubscriber
The interesting one. No subscribes_to patterns:
```ruby
class NotificationSubscriber
  def self.handles?(event)
    Notifier::Registry.handles?(event[:name])
  end

  def self.consume(event)
    Notifier::Registry.route(event)
  end
end
```

It delegates entirely to the notifier registry. Does any notifier handle "leave.request.approved"? The registry checks every registered notifier:
```ruby
def self.handles?(event_name)
  notifiers.any? { |notifier| notifier.handles_event?(event_name) }
end
```

Leave::RequestNotifier has subscribe_to "leave.request.approved" => :approved. Match. consume calls Notifier::Registry.route.
Step 7: The Notifier Registry Routes
```ruby
class Notifier::Registry
  def self.route(event)
    notifiers.each do |notifier_class|
      next unless notifier_class.handles_event?(event[:name])

      notifier_class.deliver_for_event(event)
    rescue => e
      Rails.logger.error "[Notifier::Registry] #{notifier_class.name} failed: #{e.message}"
    end
  end
end
```

Same isolation pattern as the subscriber bus. One notifier fails, others still deliver. The registry finds Leave::RequestNotifier and calls deliver_for_event.
Step 8: The Notifier Delivers
deliver_for_event on Notifier::Base:
```ruby
def self.deliver_for_event(event)
  return unless claim_event!(event)

  action = event_mappings[event[:name]]
  return unless action

  subject = extract_subject(event)
  return unless subject

  notifier = new(subject)
  context = extract_context(event)

  notifier.send(action, **context).deliver
end
```

Third idempotency check: claim_event! scoped to "Leave::RequestNotifier:019505a3-...". Three different Redis keys for the same event - dispatch, PostHog subscriber, notifier. Each layer guarantees its own exactly-once.
action maps "leave.request.approved" to :approved via the subscribe_to hash.
extract_subject is where the payload comes back to life. The event carries leave_request_id: 42. The extractor looks up the model:
```ruby
SUBJECT_MODELS = {
  leave_request: "Leave::Request",
  employee: "Employee",
  payroll_run: "PayrollRun",
  # ...
}
```

Finds the Leave::Request with ID 42. Then the multi-tenant safety check: does request.employee.company_id match payload[:company_id]? If not, log and bail. A misconfigured event can’t trigger a notification for the wrong company.
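The tenant check itself is a comparison plus a refusal. A sketch with plain structs in place of ActiveRecord models: tenant_safe_subject and the struct names are hypothetical, and treating a payload without company_id as a mismatch is an assumption of this sketch, not something the post states.

```ruby
# Plain structs standing in for ActiveRecord models.
TenantEmployee = Struct.new(:id, :company_id)
TenantLeaveRequest = Struct.new(:id, :employee)

# Returns the record only when its company matches the payload's company_id.
# Logs to stderr and returns nil otherwise.
def tenant_safe_subject(record, payload)
  return nil if record.nil?

  expected = payload[:company_id] || payload["company_id"]
  actual = record.employee&.company_id
  # Assumption: a missing company_id in the payload counts as a mismatch.
  return record if !expected.nil? && actual == expected

  warn "[Notifier] company mismatch for record #{record.id}: " \
       "expected #{expected.inspect}, got #{actual.inspect}"
  nil
end

request = TenantLeaveRequest.new(42, TenantEmployee.new(15, 1))
tenant_safe_subject(request, { company_id: 1 }) # returns the request
tenant_safe_subject(request, { company_id: 2 }) # logs and returns nil
```

Returning nil plugs straight into the "return unless subject" guard in deliver_for_event, so a mismatch ends the delivery quietly instead of raising.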
extract_context rehydrates the actor. actor_id: 7, actor_type: "Employee" becomes the actual Employee record - David Mwangi. Passed as keyword arguments to the action method.
The notifier is instantiated with Grace’s leave request as source, and :approved is called:
```ruby
class Leave::RequestNotifier < Notifier::Base
  subscribe_to "leave.request.approved" => :approved

  def recipients
    case current_action
    when :submitted, :reminder
      pending_approvers
    when :approved, :rejected, :cancelled
      [source.employee]
    when :escalated
      hr_admins
    end
  end

  def approved(**)
    @current_action = :approved
    email { LeaveMailer.request_approved(source) }
  end
end
```

approved sets @current_action and captures the email block. Then deliver:
```ruby
def deliver
  Array(recipients).each do |recipient|
    @recipient = recipient
    deliver_email if recipient_has_email?
  end
end
```

current_action is :approved, so recipients returns [source.employee]. Grace.
deliver_email evaluates the block: LeaveMailer.request_approved(source). Since delivery_method defaults to :later, it calls .deliver_later. The email gets queued to Sidekiq’s mailer queue.
Step 9: Grace Gets Her Email
The mailer renders the template:
Your leave request for April 10-14 (3 days, Annual Leave) has been approved by David Mwangi.
The Full Chain
David clicked Approve. Every stop that event made:
| # | Layer | Class | What Happened |
|---|---|---|---|
| 1 | Operation | Leave::ApproveRequest | Approved the request, called emit_async |
| 2 | Emitter | Events::Emitter | Looked up schema, captured context |
| 3 | Schema | Events::Schema::Leave::RequestApproved | Extracted payload from the leave request |
| 4 | Job queue | Events::DispatchJob | Enqueued to :events_critical |
| 5 | Job | Events::DispatchJob#perform | Idempotency check, dispatched to bus |
| 6 | Bus | Events::Subscribers | Routed to all subscribers |
| 7 | Subscriber | LogSubscriber | Wrote structured log line |
| 8 | Subscriber | SentrySubscriber | Added breadcrumb |
| 9 | Subscriber | PosthogSubscriber | Sent analytics (whitelisted fields only) |
| 10 | Subscriber | SlackSubscriber | Posted to #leave channel |
| 11 | Subscriber | NotificationSubscriber | Delegated to Notifier::Registry |
| 12 | Registry | Notifier::Registry | Found Leave::RequestNotifier |
| 13 | Notifier | Leave::RequestNotifier | Extracted subject, resolved recipients |
| 14 | Mailer | LeaveMailer | Queued email to Grace |
Fourteen hops. One emit_async call in the operation. Five side effects - log, breadcrumb, analytics, Slack, email - and none of them coupled to each other or to the operation that triggered them.
PostHog down? Circuit breaker absorbs it. Slack API slow? Email still sends. Mailer fails? Analytics still track. Each subscriber catches its own errors. Each layer has its own idempotency check.
The operation’s entire awareness of this chain:
```ruby
emit_async(:leave_request_approved, @request, actor: @approver, comments: @comments)
```

That’s the system.
Part 6 of a series on event-driven development at Kazisafi. Start from Part 1 if you haven’t already.