Dispatching: PubSub через Effect
Механизм публикации и подписки на доменные события. EventBus как Driven Port. Effect PubSub для in-process dispatching. Синхронная vs асинхронная доставка. Гарантии доставки: at-most-once, at-least-once, exactly-once. Ordering и partitioning. Обработчики событий: EventHandler. Декларативная регистрация подписчиков. Ошибки обработчиков. Dead Letter Queue. Полная реализация EventBus для Todo-домена.
Введение: проблема доставки событий
Агрегат породил событие TodoCompleted. Что дальше? Кто-то должен доставить это событие всем заинтересованным подписчикам: сервису статистики, системе уведомлений, проекции для дашборда.
Эта задача решается паттерном Publish-Subscribe (PubSub) — издатель публикует событие, не зная о подписчиках, а подписчики получают события, не зная об издателе.
В Hexagonal Architecture механизм доставки событий — это Driven Port. Домен знает контракт (EventBus), но не знает, доставляются ли события через in-memory PubSub, через Kafka, через Redis Streams или через другой транспорт.
EventBus как Driven Port
Определение порта
import { Context, Effect } from "effect"
// ─── Порт: EventBus ─────────────────────────────────────────
// Контракт доставки событий — часть доменного слоя
// Реализация (in-memory, Kafka, Redis) — в инфраструктуре
class EventBus extends Context.Tag("EventBus")<
EventBus,
{
/** Опубликовать одно событие */
readonly publish: <E extends TodoEvent>(
event: E
) => Effect.Effect<void>
/** Опубликовать несколько событий атомарно */
readonly publishAll: (
events: ReadonlyArray<TodoEvent>
) => Effect.Effect<void>
/** Подписаться на события определённого типа */
readonly subscribe: <Tag extends TodoEvent["_tag"]>(
tag: Tag,
handler: EventHandler<Extract<TodoEvent, { readonly _tag: Tag }>>
) => Effect.Effect<void>
/** Подписаться на все события */
readonly subscribeAll: (
handler: EventHandler<TodoEvent>
) => Effect.Effect<void>
}
>() {}
// ─── Тип обработчика событий ────────────────────────────────
type EventHandler<E extends TodoEvent> = (
event: E
) => Effect.Effect<void, EventHandlerError>
// ─── Ошибка обработчика ─────────────────────────────────────
class EventHandlerError extends Schema.TaggedError<EventHandlerError>()(
"EventHandlerError",
{
handlerName: Schema.String,
eventTag: Schema.String,
eventId: Schema.String,
cause: Schema.Unknown,
}
) {}
Почему EventBus — Driven Port
EventBus — Driven (Secondary) Port, потому что:
- Домен использует его для вывода информации (публикации фактов)
- Домен вызывает EventBus, но не определяет, кто подписан
- Конкретная реализация — деталь инфраструктуры
Hexagonal Architecture
┌─────────────────────────┐
│ Application Core │
│ │
HTTP ────▶│ UseCase ──▶ Aggregate │
(Driving) │ │ │
│ ▼ │
│ EventBus Port ───│──▶ InMemory PubSub
│ (Driven Port) │ (Driven Adapter)
└─────────────────────────┘
Effect PubSub: in-process реализация
Effect предоставляет PubSub — примитив для in-process publish-subscribe коммуникации:
Основы Effect PubSub
import { PubSub, Queue, Effect, Layer, Chunk } from "effect"
// Создание PubSub с буфером
const program = Effect.gen(function* () {
// Создаём PubSub с bounded buffer (capacity = 100)
const pubsub = yield* PubSub.bounded<TodoEvent>(100)
// Публикация
yield* PubSub.publish(pubsub, someEvent)
// Подписка — создаёт Queue для получения событий
const subscription = yield* PubSub.subscribe(pubsub)
// Получение событий из очереди подписки
const event = yield* Queue.take(subscription)
})
Типы PubSub
Effect предоставляет несколько стратегий буферизации:
// Bounded — блокирует публикацию, если буфер полон
const bounded = PubSub.bounded<TodoEvent>(100)
// Unbounded — неограниченный буфер (осторожно с памятью)
const unbounded = PubSub.unbounded<TodoEvent>()
// Dropping — отбрасывает новые события при полном буфере
const dropping = PubSub.dropping<TodoEvent>(100)
// Sliding — отбрасывает старые события при полном буфере
const sliding = PubSub.sliding<TodoEvent>(100)
Для доменных событий в типичном приложении:
- bounded — для production (backpressure при перегрузке)
- unbounded — для тестов и разработки
- dropping — если потеря события допустима (метрики, логи)
- sliding — если важнее свежие события, чем старые
Реализация EventBus адаптера
InMemory EventBus
import { PubSub, Queue, Effect, Layer, Ref, HashMap, Fiber, Chunk } from "effect"
// ─── InMemory реализация EventBus ───────────────────────────
const InMemoryEventBusLive = Layer.effect(
EventBus,
Effect.gen(function* () {
// Основной PubSub канал
const pubsub = yield* PubSub.bounded<TodoEvent>(1000)
// Хранилище обработчиков для каждого типа события
const handlersRef = yield* Ref.make<
HashMap.HashMap<string, ReadonlyArray<EventHandler<any>>>
>(HashMap.empty())
// Хранилище обработчиков для всех событий
const globalHandlersRef = yield* Ref.make<
ReadonlyArray<EventHandler<TodoEvent>>
>([])
// Запускаем фоновый процесс обработки событий
yield* Effect.fork(
Effect.gen(function* () {
const subscription = yield* PubSub.subscribe(pubsub)
// Бесконечный цикл обработки
yield* Effect.forever(
Effect.gen(function* () {
const event = yield* Queue.take(subscription)
// Получаем обработчики для конкретного типа события
const handlers = yield* Ref.get(handlersRef)
const specificHandlers = HashMap.get(handlers, event._tag).pipe(
// Option → Array
(opt) => opt._tag === "Some" ? opt.value : []
)
// Получаем глобальные обработчики
const globalH = yield* Ref.get(globalHandlersRef)
// Вызываем все обработчики
const allHandlers = [...specificHandlers, ...globalH]
yield* Effect.forEach(
allHandlers,
(handler) =>
handler(event).pipe(
Effect.catchAll((error) =>
Effect.logError(
`Event handler failed for ${event._tag}: ${error}`
)
)
),
{ concurrency: "unbounded", discard: true }
)
})
)
})
)
return {
publish: (event) =>
PubSub.publish(pubsub, event).pipe(
Effect.asVoid,
Effect.tap(() =>
Effect.logDebug(`Event published: ${event._tag} (${event.eventId})`)
)
),
publishAll: (events) =>
PubSub.publishAll(pubsub, events).pipe(
Effect.asVoid,
Effect.tap(() =>
Effect.logDebug(`Published ${events.length} events`)
)
),
subscribe: (tag, handler) =>
Ref.update(handlersRef, (map) => {
const existing = HashMap.get(map, tag).pipe(
(opt) => opt._tag === "Some" ? opt.value : []
)
return HashMap.set(map, tag, [...existing, handler as EventHandler<any>])
}),
subscribeAll: (handler) =>
Ref.update(globalHandlersRef, (handlers) => [...handlers, handler]),
}
})
)
Упрощённый подход: синхронный dispatch
Для начала можно использовать более простой подход без фонового процесса — синхронный dispatch при публикации:
// ─── Упрощённый синхронный EventBus ─────────────────────────
const SimpleEventBusLive = Layer.effect(
EventBus,
Effect.gen(function* () {
// Хранилище обработчиков
const handlersRef = yield* Ref.make<
HashMap.HashMap<string, ReadonlyArray<EventHandler<any>>>
>(HashMap.empty())
const globalHandlersRef = yield* Ref.make<
ReadonlyArray<EventHandler<TodoEvent>>
>([])
// Функция dispatch — вызывает всех подписчиков
const dispatch = (event: TodoEvent): Effect.Effect<void> =>
Effect.gen(function* () {
const handlers = yield* Ref.get(handlersRef)
const specific = HashMap.get(handlers, event._tag).pipe(
(opt) => opt._tag === "Some" ? opt.value : []
)
const global = yield* Ref.get(globalHandlersRef)
// Вызываем обработчики параллельно с изоляцией ошибок
yield* Effect.forEach(
[...specific, ...global],
(handler) =>
handler(event).pipe(
Effect.catchAll((error) =>
Effect.logError(`Handler error: ${JSON.stringify(error)}`)
)
),
{ concurrency: "unbounded", discard: true }
)
})
return {
publish: (event) =>
dispatch(event).pipe(
Effect.tap(() =>
Effect.logDebug(`Event dispatched: ${event._tag}`)
)
),
publishAll: (events) =>
Effect.forEach(events, dispatch, { discard: true }),
subscribe: (tag, handler) =>
Ref.update(handlersRef, (map) => {
const existing = HashMap.get(map, tag).pipe(
(opt) => opt._tag === "Some" ? opt.value : []
)
return HashMap.set(map, tag, [...existing, handler as any])
}),
subscribeAll: (handler) =>
Ref.update(globalHandlersRef, (handlers) => [...handlers, handler]),
}
})
)
Обработчики событий (Event Handlers)
Определение обработчика
Обработчик — это функция, которая реагирует на конкретный тип события:
// ─── Обработчик: обновление статистики ──────────────────────
const statsHandler: EventHandler<TodoCompleted> = (event) =>
Effect.gen(function* () {
const stats = yield* StatsService
yield* stats.incrementCompleted()
yield* Effect.logInfo(
`Stats updated: todo ${event.todoId} completed`
)
}).pipe(
Effect.mapError((cause) =>
new EventHandlerError({
handlerName: "StatsHandler",
eventTag: event._tag,
eventId: event.eventId,
cause,
})
)
)
// ─── Обработчик: отправка уведомления ───────────────────────
const notificationHandler: EventHandler<TodoCompleted> = (event) =>
Effect.gen(function* () {
const notifications = yield* NotificationService
yield* notifications.send(
event.triggeredBy,
`Task "${event.title}" has been completed!`
)
}).pipe(
Effect.mapError((cause) =>
new EventHandlerError({
handlerName: "NotificationHandler",
eventTag: event._tag,
eventId: event.eventId,
cause,
})
)
)
// ─── Обработчик: аудит ─────────────────────────────────────
const auditHandler: EventHandler<TodoEvent> = (event) =>
Effect.gen(function* () {
const audit = yield* AuditService
yield* audit.log({
action: event._tag,
aggregateId: event.aggregateId,
triggeredBy: event.triggeredBy,
occurredAt: event.occurredAt,
correlationId: event.correlationId,
})
}).pipe(
Effect.mapError((cause) =>
new EventHandlerError({
handlerName: "AuditHandler",
eventTag: event._tag,
eventId: event.eventId,
cause,
})
)
)
Регистрация обработчиков
// ─── Регистрация всех обработчиков при старте ───────────────
const registerEventHandlers = Effect.gen(function* () {
const eventBus = yield* EventBus
// Конкретные подписки
yield* eventBus.subscribe("TodoCompleted", statsHandler)
yield* eventBus.subscribe("TodoCompleted", notificationHandler)
yield* eventBus.subscribe("TodoCreated", (event) =>
Effect.gen(function* () {
const stats = yield* StatsService
yield* stats.incrementTotal()
}).pipe(Effect.mapError((cause) =>
new EventHandlerError({
handlerName: "CreateStatsHandler",
eventTag: event._tag,
eventId: event.eventId,
cause,
})
))
)
// Глобальная подписка (все события → аудит)
yield* eventBus.subscribeAll(auditHandler)
yield* Effect.logInfo("All event handlers registered")
})
Синхронная vs асинхронная доставка
Синхронный dispatch (in-process)
Обработчики вызываются в том же процессе, немедленно после публикации:
// Синхронный: publish блокируется до завершения всех обработчиков
//
// completeTodo → repo.save → eventBus.publish → [handlers run] → return
// ↑ блокирующий вызов
const publish = (event: TodoEvent) =>
dispatch(event) // ждём завершения всех handlers
Плюсы: простота, консистентность (если handler упал — вы узнаете сразу). Минусы: медленнее (ждём обработчиков), один медленный handler тормозит всё.
Асинхронный dispatch (fire-and-forget)
Обработчики вызываются в фоновых Fiber’ах:
// Асинхронный: publish возвращается немедленно
//
// completeTodo → repo.save → eventBus.publish → return
// ↓ (fork)
// [handlers run in background]
const publishAsync = (event: TodoEvent) =>
Effect.fork(dispatch(event)).pipe(Effect.asVoid)
// publish возвращается сразу, handlers работают в фоне
Плюсы: быстрый ответ пользователю, медленные handlers не блокируют. Минусы: ошибки handlers теряются (нужен мониторинг), eventual consistency.
Гибридный подход
// Критичные обработчики — синхронно, некритичные — асинхронно
const publishHybrid = (event: TodoEvent) =>
Effect.gen(function* () {
// Синхронно: обновление Read Model (критично)
yield* readModelHandler(event)
// Асинхронно: уведомления, логи (некритично)
yield* Effect.fork(notificationHandler(event))
yield* Effect.fork(auditHandler(event))
})
Гарантии доставки
At-Most-Once
Событие доставляется не более одного раза. Если обработчик упал — повторной доставки нет:
// At-most-once: простейший подход
const atMostOnce = (event: TodoEvent) =>
dispatch(event).pipe(
Effect.catchAll(() => Effect.void) // ошибка? проигнорировали
)
At-Least-Once
Событие доставляется хотя бы один раз. При ошибке — retry:
import { Schedule } from "effect"
// At-least-once: retry при ошибке
const atLeastOnce = (event: TodoEvent) =>
dispatch(event).pipe(
Effect.retry(
Schedule.exponential("100 millis").pipe(
Schedule.compose(Schedule.recurs(3)) // максимум 3 попытки
)
)
)
При at-least-once обработчик может получить событие дважды, поэтому он должен быть идемпотентным:
// Идемпотентный обработчик
const idempotentHandler: EventHandler<TodoCompleted> = (event) =>
Effect.gen(function* () {
const processed = yield* ProcessedEventsStore
const alreadyDone = yield* processed.has(event.eventId)
if (alreadyDone) {
yield* Effect.logDebug(`Event ${event.eventId} already processed`)
return
}
yield* updateStats(event)
yield* processed.add(event.eventId)
})
Exactly-Once
Событие доставляется ровно один раз. Технически невозможно в распределённых системах, но можно приблизиться через at-least-once + идемпотентность.
Обработка ошибок в handlers
Изоляция ошибок
Ошибка одного обработчика не должна блокировать другие:
const dispatchWithIsolation = (event: TodoEvent) =>
Effect.gen(function* () {
const handlers = yield* getHandlersForEvent(event)
// Каждый handler изолирован — ошибка одного не влияет на других
const results = yield* Effect.forEach(
handlers,
(handler) =>
handler(event).pipe(
Effect.either // Either<void, EventHandlerError>
),
{ concurrency: "unbounded" }
)
// Логируем ошибки, но не прерываем процесс
const failures = results.filter((r) => r._tag === "Left")
if (failures.length > 0) {
yield* Effect.logWarning(
`${failures.length} handler(s) failed for event ${event._tag}`
)
}
})
Dead Letter Queue (DLQ)
Неудачно обработанные события можно сохранять в «очередь мёртвых писем» для позднейшего анализа:
class DeadLetterQueue extends Context.Tag("DeadLetterQueue")<
DeadLetterQueue,
{
readonly enqueue: (entry: {
readonly event: TodoEvent
readonly error: EventHandlerError
readonly attemptedAt: Date
readonly retryCount: number
}) => Effect.Effect<void>
}
>() {}
const dispatchWithDLQ = (event: TodoEvent, handler: EventHandler<TodoEvent>) =>
handler(event).pipe(
Effect.retry(
Schedule.exponential("100 millis").pipe(
Schedule.compose(Schedule.recurs(3))
)
),
Effect.catchAll((error) =>
Effect.gen(function* () {
const dlq = yield* DeadLetterQueue
yield* dlq.enqueue({
event,
error,
attemptedAt: new Date(),
retryCount: 3,
})
yield* Effect.logError(
`Event ${event.eventId} sent to DLQ after 3 retries`
)
})
)
)
Ordering и последовательность
Порядок внутри агрегата
События одного агрегата должны обрабатываться в порядке их aggregateVersion:
// Публикация списка событий должна сохранять порядок
const publishAll = (events: ReadonlyArray<TodoEvent>) =>
Effect.forEach(events, publish, {
concurrency: 1, // последовательно — сохраняем порядок
discard: true
})
Порядок между агрегатами
События разных агрегатов могут обрабатываться в любом порядке (параллельно):
// Группировка событий по агрегату
const groupByAggregate = (events: ReadonlyArray<TodoEvent>) =>
events.reduce<ReadonlyMap<string, ReadonlyArray<TodoEvent>>>(
(groups, event) => {
const existing = groups.get(event.aggregateId) ?? []
return new Map([...groups, [event.aggregateId, [...existing, event]]])
},
new Map()
)
// Параллельная обработка между агрегатами,
// последовательная — внутри одного агрегата
const publishWithOrdering = (events: ReadonlyArray<TodoEvent>) => {
const grouped = groupByAggregate(events)
return Effect.forEach(
[...grouped.values()],
(aggregateEvents) =>
Effect.forEach(aggregateEvents, publish, {
concurrency: 1, // последовательно внутри агрегата
discard: true
}),
{ concurrency: "unbounded", discard: true } // параллельно между агрегатами
)
}
Использование в Application Service
Полный пример: Application Service публикует события после бизнес-операции:
// ─── Application Service: CompleteTodo ──────────────────────
const completeTodoUseCase = (
todoId: TodoId,
userId: UserId,
): Effect.Effect<
void,
TodoNotFound | InvalidTransition,
TodoRepository | EventBus
> =>
Effect.gen(function* () {
const repo = yield* TodoRepository
const eventBus = yield* EventBus
const now = new Date()
const ctx: EventContext = {
correlationId: generateEventId(),
causationId: generateEventId(),
triggeredBy: userId,
}
// 1. Загрузить агрегат
const todo = yield* repo.findById(todoId)
const aggregate = createTodoAggregate(todo)
// 2. Выполнить бизнес-операцию
const updated = yield* complete(aggregate, userId, now, ctx)
// 3. Сохранить состояние
yield* repo.save(updated.state)
// 4. Опубликовать события
yield* eventBus.publishAll(updated.uncommittedEvents)
yield* Effect.logInfo(`Todo ${todoId} completed by ${userId}`)
})
Тестирование EventBus
Test EventBus — накопление событий для проверки
// ─── Тестовый EventBus: накапливает события ─────────────────
const TestEventBusLive = Layer.effect(
EventBus,
Effect.gen(function* () {
const eventsRef = yield* Ref.make<ReadonlyArray<TodoEvent>>([])
return {
publish: (event) =>
Ref.update(eventsRef, (events) => [...events, event]),
publishAll: (newEvents) =>
Ref.update(eventsRef, (events) => [...events, ...newEvents]),
subscribe: (_tag, _handler) => Effect.void,
subscribeAll: (_handler) => Effect.void,
// Дополнительные методы для тестов (не в порту)
// Доступны через cast: yield* (eventBus as TestEventBus)
}
})
)
// В тестах:
// const events = yield* Ref.get(eventsRef)
// expect(events).toHaveLength(1)
// expect(events[0]._tag).toBe("TodoCompleted")
Итоги
- EventBus — Driven Port — домен знает контракт, не реализацию
- Effect PubSub — встроенный примитив для in-process pub/sub
- Sync vs Async — синхронный для критичных обработчиков, асинхронный для некритичных
- Изоляция ошибок — ошибка одного handler не блокирует другие
- Идемпотентность — обработчики должны корректно обрабатывать повторные события
- Ordering — последовательность внутри агрегата, параллелизм между агрегатами
- DLQ — необработанные события сохраняются для анализа
- Test EventBus — накапливает события для assertion’ов в тестах
Далее: 06-todo-events.md — События Todo: Created, Completed, Archived, TitleChanged