One Event, Start to Finish

Tracing a single leave approval from emit_async through schema extraction, Sidekiq dispatch, subscriber routing, and email delivery. The whole chain, no gaps.

Five posts of layers. Schemas, emitters, subscribers, notifiers - each one explained in isolation. But the system only clicks when you see every layer fire in sequence for one real event.

So let’s trace a leave request approval. From the moment a manager clicks “Approve” to the moment Grace gets an email telling her she’s cleared for time off. Every hop, every class, every line that matters.

The Setup

Grace Wambui, a software engineer in the Engineering department, submitted a leave request for April 10-14. Three days of annual leave. Her department manager, David Mwangi, opens the pending request and clicks Approve.

The controller calls the model. The model delegates to the operation.

Step 1: The Operation

module Leave
class ApproveRequest
include Events::Emitter
def initialize(request:, approver:, comments: nil)
@request = request
@approver = approver
@comments = comments
end
def call
return authorization_error unless authorized?
balance = current_balance
balance_before = balance&.available
@request.transaction do
@request.approve_by!(approver: @approver, comments: @comments)
track_event(balance_before, balance&.reload&.available)
emit_async(:leave_request_approved, @request, actor: @approver, comments: @comments)
end
Result.success(request: @request, balance_before:, balance_after: balance&.reload&.available)
end
end
end

Three things inside the transaction: approve the request, track an audit event, emit. That last line is the one we’re following:

emit_async(:leave_request_approved, @request, actor: @approver, comments: @comments)

A symbol key (:leave_request_approved), the subject (the leave request), and two keyword arguments (the approver and optional comments). The operation doesn’t know what happens next. It doesn’t know that emails, Slack messages, or analytics events exist. Its job is done.

Step 2: The Emitter Builds the Schema

emit_async comes from Events::Emitter, a concern included in the operation:

def emit_async(event_key, subject = nil, **params)
schema = build_schema(event_key, subject, params)
return unless schema.valid?
captured_context = current_event_context
Events::DispatchJob.perform_later(schema.full_name, schema.extract, captured_context)
end

Three things happen, and each matters.

build_schema looks up :leave_request_approved in the schema registry, which maps it to Events::Schema::Leave::RequestApproved - auto-registered when the class declared event_name "leave.request.approved". If the schema doesn’t exist, a NullSchema returns and valid? is false. The event silently disappears with an error log. No crash.

current_event_context captures who triggered this event, right now, while Current.user and the request middleware context still exist:

def current_event_context
current_context = {
user_id: Current.user&.id,
company_id: Current.company&.id
}.compact
Events::Context.current.merge(current_context)
end

Events::Context.current was populated by middleware earlier in the request: request_id, ip_address, user_agent, browser_name, device_type. The emitter merges in fresh Current values. This combined context hash is critical - three seconds from now, when a Sidekiq job processes this event, Current.user will be nil. The context has to travel with the event.

schema.extract calls the schema to build the payload.

Step 3: Schema Extraction

module Events::Schema::Leave
class RequestApproved < Events::Schema::Base
event_name "leave.request.approved"
from_subject do |request|
{
leave_request_id: request.id,
leave_request_reference: request.reference,
company_id: request.employee&.company_id,
company_name: request.employee&.company&.name,
employee_id: request.employee_id,
employee_reference: request.employee&.reference,
leave_kind: request.leave_kind&.code,
start_date: request.start_date,
end_date: request.end_date,
num_days: request.num_days
}
end
params do
required :actor
optional :comments
end
end
end

When extract is called:

  1. A UUID v7 idempotency key is generated: "019505a3-7b2f-7d4e-8a1c-...". This key follows the event through every downstream layer.

  2. from_subject runs with Grace’s leave request. IDs, references, leave kind code, dates, day count. Note: request.employee&.company_id, not request.employee.company. The schema extracts scalar values only. No ActiveRecord objects survive - the base class converts any AR instance to its ID.

  3. The actor param (David) gets special handling. The base class extracts actor_id, actor_type, and actor_reference automatically. David the Employee becomes { actor_id: 7, actor_type: "Employee" }.

  4. comments passes through as a plain string, truncated at 10,000 characters.

The resulting payload:

{
schema_version: 1,
idempotency_key: "019505a3-7b2f-7d4e-8a1c-d3f2a1b4e5c6",
leave_request_id: 42,
leave_request_reference: "LR-2026-0042",
company_id: 1,
company_name: "Kazisafi Payroll",
employee_id: 15,
employee_reference: "EMP050",
leave_kind: "annual",
start_date: "2026-04-10",
end_date: "2026-04-14",
num_days: 3,
actor_id: 7,
actor_type: "Employee",
comments: "Approved, enjoy your break"
}

Plain data. IDs, strings, dates. No objects, no associations, no surprises.

Step 4: The Job Queue

Events::DispatchJob.perform_later("leave.request.approved", payload, captured_context)

The dispatch job decides which queue:

class DispatchJob < ApplicationJob
CRITICAL_PREFIXES = %w[leave. payroll. billing. invoice. auth.].freeze
queue_as do
event_name = arguments.first.to_s
(CRITICAL_PREFIXES.any? { event_name.start_with?(it) }) ? :events_critical : :events
end
sidekiq_options retry: 3, dead: true
end

"leave.request.approved" starts with "leave." - critical prefix. The job goes to :events_critical. Grace’s approval notification won’t wait behind a batch of analytics events from an employee import.

Three retries, then dead letter queue. Sentry gets a structured error with the event name and idempotency key.

Step 5: The Job Fires

Sidekiq picks up the job:

def perform(event_name, payload, context = {})
Events::Context.clear
key = payload["idempotency_key"] || payload[:idempotency_key]
if key && !claim_event(key)
log_info "Skipping duplicate event", event_name:, idempotency_key: key
return
end
Events::Subscribers.dispatch(event_name, payload:, context:)
ensure
Events::Context.clear
end

First: clear stale context from a previous job in this thread. Sidekiq reuses threads.

Second: idempotency check. claim_event calls Redis SETNX with a 24-hour TTL:

def claim_event(key)
redis = FastRedis.for("events:dispatch")
redis.setnx_with_expiry("dispatched:#{key}", Time.current.to_i, ex: 24.hours.to_i)
end

If this is a retry and the event was already dispatched, SETNX returns false. Log, return. No double dispatch.

This is the first of three idempotency checks in the chain. The dispatch job checks once. Individual subscribers check again. Notifiers check a third time. Belt, suspenders, backup belt.

Third: dispatch to all subscribers.

Step 6: The Bus

module Events::Subscribers
REGISTRY = []
def self.dispatch(event_name, payload:, context:)
event = { name: event_name, payload:, context:, occurred_at: Time.current }
REGISTRY.each do |subscriber|
dispatch_to_subscriber(subscriber, event)
end
end
def self.dispatch_to_subscriber(subscriber, event)
return unless subscriber.handles?(event)
subscriber.consume(event)
rescue => e
Rails.logger.error "[Events] #{subscriber.name} failed for #{event[:name]}: #{e.message}"
end
end

Build event hash, iterate all registered subscribers. For each: check, consume, rescue. That rescue block is everything. PostHog down? Logged, but the loop continues. Slack, the notifier, the logger - all still run.

Now let’s walk through each subscriber for "leave.request.approved":

LogSubscriber

Handles everything. Writes a structured log line:

[Event] leave.request.approved | {leave_request_id: 42, company_id: 1, employee_id: 15, ...}

Severity :info. Would be :error if the event name contained .failed.

SentrySubscriber

Also handles everything. Adds a breadcrumb to the Sentry scope:

Sentry.add_breadcrumb(Sentry::Breadcrumb.new(
category: "event",
message: "leave.request.approved",
data: sanitized_payload,
level: "info"
))

If something breaks later in this job, the Sentry report shows this event in the breadcrumb trail.

PosthogSubscriber

Subscribes to "leave.". Pattern match succeeds. Two checks before sending:

def self.consume(event)
return unless claim_event!(event)
return if circuit_open?
POSTHOG_CLIENT.capture(
distinct_id: distinct_id_from(event),
event: event[:name],
properties: build_properties(event)
)
reset_circuit!
end

claim_event! - subscriber-level idempotency. Same Redis pattern as the dispatch job, but scoped to "PosthogSubscriber:019505a3-...". Even if the dispatch job somehow runs twice, PostHog sees the event once.

circuit_open? - five consecutive failures open the circuit. Skip for 60 seconds. Grace’s approval doesn’t depend on analytics working.

Properties go through a whitelist:

ALLOWED_PAYLOAD_FIELDS = %i[
employee_id user_id company_id leave_request_id
leave_kind status num_days employee_count
].freeze

No names, no emails, no comments. PII stays in the database.

SlackSubscriber

Subscribes to "leave.request.". Same idempotency and circuit breaker pattern. Posts a formatted message to #leave:

Leave request approved: Grace Wambui (Engineering) - Annual Leave, Apr 10-14 (3 days). Approved by David Mwangi.

TodoSubscriber

Subscribes to specific names: "leave.request.submitted", "payroll.created", "billing.invoice.overdue". "leave.request.approved" isn’t in the list. handles? returns false. Skip.

The normal case. Most subscribers skip most events. A string prefix check, nothing more.

EmployeeSubscriber and BillingSubscriber

Domain-specific. Neither handles leave events. Both skip.

NotificationSubscriber

The interesting one. No subscribes_to patterns:

class NotificationSubscriber
def self.handles?(event)
Notifier::Registry.handles?(event[:name])
end
def self.consume(event)
Notifier::Registry.route(event)
end
end

It delegates entirely to the notifier registry. Does any notifier handle "leave.request.approved"? The registry checks every registered notifier:

def self.handles?(event_name)
notifiers.any? { |notifier| notifier.handles_event?(event_name) }
end

Leave::RequestNotifier has subscribe_to "leave.request.approved" => :approved. Match. consume calls Notifier::Registry.route.

Step 7: The Notifier Registry Routes

class Notifier::Registry
def self.route(event)
notifiers.each do |notifier_class|
next unless notifier_class.handles_event?(event[:name])
notifier_class.deliver_for_event(event)
rescue => e
Rails.logger.error "[Notifier::Registry] #{notifier_class.name} failed: #{e.message}"
end
end
end

Same isolation pattern as the subscriber bus. One notifier fails, others still deliver. The registry finds Leave::RequestNotifier and calls deliver_for_event.

Step 8: The Notifier Delivers

deliver_for_event on Notifier::Base:

def self.deliver_for_event(event)
return unless claim_event!(event)
action = event_mappings[event[:name]]
return unless action
subject = extract_subject(event)
return unless subject
notifier = new(subject)
context = extract_context(event)
notifier.send(action, **context).deliver
end

Third idempotency check: claim_event! scoped to "Leave::RequestNotifier:019505a3-...". Three different Redis keys for the same event - dispatch, PostHog subscriber, notifier. Each layer guarantees its own exactly-once.

action maps "leave.request.approved" to :approved via the subscribe_to hash.

extract_subject is where the payload comes back to life. The event carries leave_request_id: 42. The extractor looks up the model:

SUBJECT_MODELS = {
leave_request: "Leave::Request",
employee: "Employee",
payroll_run: "PayrollRun",
# ...
}

Finds the Leave::Request with ID 42. Then the multi-tenant safety check: does request.employee.company_id match payload[:company_id]? If not, log and bail. A misconfigured event can’t trigger a notification for the wrong company.

extract_context rehydrates the actor. actor_id: 7, actor_type: "Employee" becomes the actual Employee record - David Mwangi. Passed as keyword arguments to the action method.

The notifier is instantiated with Grace’s leave request as source, and :approved is called:

class Leave::RequestNotifier < Notifier::Base
subscribe_to "leave.request.approved" => :approved
def recipients
case current_action
when :submitted, :reminder
pending_approvers
when :approved, :rejected, :cancelled
[source.employee]
when :escalated
hr_admins
end
end
def approved(**)
@current_action = :approved
email { LeaveMailer.request_approved(source) }
end
end

approved sets @current_action and captures the email block. Then deliver:

def deliver
Array(recipients).each do |recipient|
@recipient = recipient
deliver_email if recipient_has_email?
end
end

current_action is :approved, so recipients returns [source.employee]. Grace.

deliver_email evaluates the block: LeaveMailer.request_approved(source). Since delivery_method defaults to :later, it calls .deliver_later. The email gets queued to Sidekiq’s mailer queue.

Step 9: Grace Gets Her Email

The mailer renders the template:

Your leave request for April 10-14 (3 days, Annual Leave) has been approved by David Mwangi.


The Full Chain

David clicked Approve. Every stop that event made:

#LayerClassWhat Happened
1OperationLeave::ApproveRequestApproved the request, called emit_async
2EmitterEvents::EmitterLooked up schema, captured context
3SchemaEvents::Schema::Leave::RequestApprovedExtracted payload from the leave request
4Job queueEvents::DispatchJobEnqueued to :events_critical
5JobEvents::DispatchJob#performIdempotency check, dispatched to bus
6BusEvents::SubscribersRouted to all subscribers
7LogSubscriberWrote structured log line
8SentrySubscriberAdded breadcrumb
9PosthogSubscriberSent analytics (whitelisted fields only)
10SlackSubscriberPosted to #leave channel
11NotificationSubscriberDelegated to Notifier::Registry
12RegistryNotifier::RegistryFound Leave::RequestNotifier
13NotifierLeave::RequestNotifierExtracted subject, resolved recipients
14MailerLeaveMailerQueued email to Grace

Fourteen hops. One emit_async call in the operation. Five side effects - log, breadcrumb, analytics, Slack, email - and none of them coupled to each other or to the operation that triggered them.

PostHog down? Circuit breaker absorbs it. Slack API slow? Email still sends. Mailer fails? Analytics still track. Each subscriber catches its own errors. Each layer has its own idempotency check.

The operation’s entire awareness of this chain:

emit_async(:leave_request_approved, @request, actor: @approver, comments: @comments)

That’s the system.


Part 6 of a series on event-driven development at Kazisafi. Start from Part 1 if you haven’t already.