Типобезопасный домен: Гексагональная архитектура на базе Effect Dispatching: PubSub через Effect
Глава

Dispatching: PubSub через Effect

Механизм публикации и подписки на доменные события. EventBus как Driven Port. Effect PubSub для in-process dispatching. Синхронная vs асинхронная доставка. Гарантии доставки: at-most-once, at-least-once, exactly-once. Ordering и partitioning. Обработчики событий: EventHandler. Декларативная регистрация подписчиков. Ошибки обработчиков. Dead Letter Queue. Полная реализация EventBus для Todo-домена.

Введение: проблема доставки событий

Агрегат породил событие TodoCompleted. Что дальше? Кто-то должен доставить это событие всем заинтересованным подписчикам: сервису статистики, системе уведомлений, проекции для дашборда.

Эта задача решается паттерном Publish-Subscribe (PubSub) — издатель публикует событие, не зная о подписчиках, а подписчики получают события, не зная об издателе.

В Hexagonal Architecture механизм доставки событий — это Driven Port. Домен знает контракт (EventBus), но не знает, доставляются ли события через in-memory PubSub, через Kafka, через Redis Streams или через другой транспорт.


EventBus как Driven Port

Определение порта

import { Context, Effect } from "effect"

// ─── Порт: EventBus ─────────────────────────────────────────
// Контракт доставки событий — часть доменного слоя
// Реализация (in-memory, Kafka, Redis) — в инфраструктуре

class EventBus extends Context.Tag("EventBus")<
  EventBus,
  {
    /** Опубликовать одно событие */
    readonly publish: <E extends TodoEvent>(
      event: E
    ) => Effect.Effect<void>
    
    /** Опубликовать несколько событий атомарно */
    readonly publishAll: (
      events: ReadonlyArray<TodoEvent>
    ) => Effect.Effect<void>
    
    /** Подписаться на события определённого типа */
    readonly subscribe: <Tag extends TodoEvent["_tag"]>(
      tag: Tag,
      handler: EventHandler<Extract<TodoEvent, { readonly _tag: Tag }>>
    ) => Effect.Effect<void>
    
    /** Подписаться на все события */
    readonly subscribeAll: (
      handler: EventHandler<TodoEvent>
    ) => Effect.Effect<void>
  }
>() {}

// ─── Тип обработчика событий ────────────────────────────────

type EventHandler<E extends TodoEvent> = (
  event: E
) => Effect.Effect<void, EventHandlerError>

// ─── Ошибка обработчика ─────────────────────────────────────

class EventHandlerError extends Schema.TaggedError<EventHandlerError>()(
  "EventHandlerError",
  {
    handlerName: Schema.String,
    eventTag: Schema.String,
    eventId: Schema.String,
    cause: Schema.Unknown,
  }
) {}

Почему EventBus — Driven Port

EventBus — Driven (Secondary) Port, потому что:

  • Домен использует его для вывода информации (публикации фактов)
  • Домен вызывает EventBus, но не определяет, кто подписан
  • Конкретная реализация — деталь инфраструктуры
             Hexagonal Architecture
           ┌─────────────────────────┐
           │    Application Core     │
           │                         │
 HTTP ────▶│  UseCase ──▶ Aggregate  │
 (Driving) │              │          │
           │              ▼          │
           │        EventBus Port ───│──▶ InMemory PubSub
           │        (Driven Port)    │    (Driven Adapter)
           └─────────────────────────┘

Effect PubSub: in-process реализация

Effect предоставляет PubSub — примитив для in-process publish-subscribe коммуникации:

Основы Effect PubSub

import { PubSub, Queue, Effect, Layer, Chunk } from "effect"

// Создание PubSub с буфером
const program = Effect.gen(function* () {
  // Создаём PubSub с bounded buffer (capacity = 100)
  const pubsub = yield* PubSub.bounded<TodoEvent>(100)
  
  // Публикация
  yield* PubSub.publish(pubsub, someEvent)
  
  // Подписка — создаёт Queue для получения событий
  const subscription = yield* PubSub.subscribe(pubsub)
  
  // Получение событий из очереди подписки
  const event = yield* Queue.take(subscription)
})

Типы PubSub

Effect предоставляет несколько стратегий буферизации:

// Bounded — блокирует публикацию, если буфер полон
const bounded = PubSub.bounded<TodoEvent>(100)

// Unbounded — неограниченный буфер (осторожно с памятью)
const unbounded = PubSub.unbounded<TodoEvent>()

// Dropping — отбрасывает новые события при полном буфере
const dropping = PubSub.dropping<TodoEvent>(100)

// Sliding — отбрасывает старые события при полном буфере
const sliding = PubSub.sliding<TodoEvent>(100)

Для доменных событий в типичном приложении:

  • bounded — для production (backpressure при перегрузке)
  • unbounded — для тестов и разработки
  • dropping — если потеря события допустима (метрики, логи)
  • sliding — если важнее свежие события, чем старые

Реализация EventBus адаптера

InMemory EventBus

import { PubSub, Queue, Effect, Layer, Ref, HashMap, Fiber, Chunk } from "effect"

// ─── InMemory реализация EventBus ───────────────────────────

const InMemoryEventBusLive = Layer.effect(
  EventBus,
  Effect.gen(function* () {
    // Основной PubSub канал
    const pubsub = yield* PubSub.bounded<TodoEvent>(1000)
    
    // Хранилище обработчиков для каждого типа события
    const handlersRef = yield* Ref.make<
      HashMap.HashMap<string, ReadonlyArray<EventHandler<any>>>
    >(HashMap.empty())
    
    // Хранилище обработчиков для всех событий
    const globalHandlersRef = yield* Ref.make<
      ReadonlyArray<EventHandler<TodoEvent>>
    >([])
    
    // Запускаем фоновый процесс обработки событий
    yield* Effect.fork(
      Effect.gen(function* () {
        const subscription = yield* PubSub.subscribe(pubsub)
        
        // Бесконечный цикл обработки
        yield* Effect.forever(
          Effect.gen(function* () {
            const event = yield* Queue.take(subscription)
            
            // Получаем обработчики для конкретного типа события
            const handlers = yield* Ref.get(handlersRef)
            const specificHandlers = HashMap.get(handlers, event._tag).pipe(
              // Option → Array
              (opt) => opt._tag === "Some" ? opt.value : []
            )
            
            // Получаем глобальные обработчики
            const globalH = yield* Ref.get(globalHandlersRef)
            
            // Вызываем все обработчики
            const allHandlers = [...specificHandlers, ...globalH]
            
            yield* Effect.forEach(
              allHandlers,
              (handler) =>
                handler(event).pipe(
                  Effect.catchAll((error) =>
                    Effect.logError(
                      `Event handler failed for ${event._tag}: ${error}`
                    )
                  )
                ),
              { concurrency: "unbounded", discard: true }
            )
          })
        )
      })
    )
    
    return {
      publish: (event) =>
        PubSub.publish(pubsub, event).pipe(
          Effect.asVoid,
          Effect.tap(() =>
            Effect.logDebug(`Event published: ${event._tag} (${event.eventId})`)
          )
        ),
      
      publishAll: (events) =>
        PubSub.publishAll(pubsub, events).pipe(
          Effect.asVoid,
          Effect.tap(() =>
            Effect.logDebug(`Published ${events.length} events`)
          )
        ),
      
      subscribe: (tag, handler) =>
        Ref.update(handlersRef, (map) => {
          const existing = HashMap.get(map, tag).pipe(
            (opt) => opt._tag === "Some" ? opt.value : []
          )
          return HashMap.set(map, tag, [...existing, handler as EventHandler<any>])
        }),
      
      subscribeAll: (handler) =>
        Ref.update(globalHandlersRef, (handlers) => [...handlers, handler]),
    }
  })
)

Упрощённый подход: синхронный dispatch

Для начала можно использовать более простой подход без фонового процесса — синхронный dispatch при публикации:

// ─── Упрощённый синхронный EventBus ─────────────────────────

const SimpleEventBusLive = Layer.effect(
  EventBus,
  Effect.gen(function* () {
    // Хранилище обработчиков
    const handlersRef = yield* Ref.make<
      HashMap.HashMap<string, ReadonlyArray<EventHandler<any>>>
    >(HashMap.empty())
    
    const globalHandlersRef = yield* Ref.make<
      ReadonlyArray<EventHandler<TodoEvent>>
    >([])
    
    // Функция dispatch — вызывает всех подписчиков
    const dispatch = (event: TodoEvent): Effect.Effect<void> =>
      Effect.gen(function* () {
        const handlers = yield* Ref.get(handlersRef)
        const specific = HashMap.get(handlers, event._tag).pipe(
          (opt) => opt._tag === "Some" ? opt.value : []
        )
        const global = yield* Ref.get(globalHandlersRef)
        
        // Вызываем обработчики параллельно с изоляцией ошибок
        yield* Effect.forEach(
          [...specific, ...global],
          (handler) =>
            handler(event).pipe(
              Effect.catchAll((error) =>
                Effect.logError(`Handler error: ${JSON.stringify(error)}`)
              )
            ),
          { concurrency: "unbounded", discard: true }
        )
      })
    
    return {
      publish: (event) =>
        dispatch(event).pipe(
          Effect.tap(() =>
            Effect.logDebug(`Event dispatched: ${event._tag}`)
          )
        ),
      
      publishAll: (events) =>
        Effect.forEach(events, dispatch, { discard: true }),
      
      subscribe: (tag, handler) =>
        Ref.update(handlersRef, (map) => {
          const existing = HashMap.get(map, tag).pipe(
            (opt) => opt._tag === "Some" ? opt.value : []
          )
          return HashMap.set(map, tag, [...existing, handler as any])
        }),
      
      subscribeAll: (handler) =>
        Ref.update(globalHandlersRef, (handlers) => [...handlers, handler]),
    }
  })
)

Обработчики событий (Event Handlers)

Определение обработчика

Обработчик — это функция, которая реагирует на конкретный тип события:

// ─── Обработчик: обновление статистики ──────────────────────

const statsHandler: EventHandler<TodoCompleted> = (event) =>
  Effect.gen(function* () {
    const stats = yield* StatsService
    yield* stats.incrementCompleted()
    yield* Effect.logInfo(
      `Stats updated: todo ${event.todoId} completed`
    )
  }).pipe(
    Effect.mapError((cause) =>
      new EventHandlerError({
        handlerName: "StatsHandler",
        eventTag: event._tag,
        eventId: event.eventId,
        cause,
      })
    )
  )

// ─── Обработчик: отправка уведомления ───────────────────────

const notificationHandler: EventHandler<TodoCompleted> = (event) =>
  Effect.gen(function* () {
    const notifications = yield* NotificationService
    yield* notifications.send(
      event.triggeredBy,
      `Task "${event.title}" has been completed!`
    )
  }).pipe(
    Effect.mapError((cause) =>
      new EventHandlerError({
        handlerName: "NotificationHandler",
        eventTag: event._tag,
        eventId: event.eventId,
        cause,
      })
    )
  )

// ─── Обработчик: аудит ─────────────────────────────────────

const auditHandler: EventHandler<TodoEvent> = (event) =>
  Effect.gen(function* () {
    const audit = yield* AuditService
    yield* audit.log({
      action: event._tag,
      aggregateId: event.aggregateId,
      triggeredBy: event.triggeredBy,
      occurredAt: event.occurredAt,
      correlationId: event.correlationId,
    })
  }).pipe(
    Effect.mapError((cause) =>
      new EventHandlerError({
        handlerName: "AuditHandler",
        eventTag: event._tag,
        eventId: event.eventId,
        cause,
      })
    )
  )

Регистрация обработчиков

// ─── Регистрация всех обработчиков при старте ───────────────

const registerEventHandlers = Effect.gen(function* () {
  const eventBus = yield* EventBus
  
  // Конкретные подписки
  yield* eventBus.subscribe("TodoCompleted", statsHandler)
  yield* eventBus.subscribe("TodoCompleted", notificationHandler)
  yield* eventBus.subscribe("TodoCreated", (event) =>
    Effect.gen(function* () {
      const stats = yield* StatsService
      yield* stats.incrementTotal()
    }).pipe(Effect.mapError((cause) =>
      new EventHandlerError({
        handlerName: "CreateStatsHandler",
        eventTag: event._tag,
        eventId: event.eventId,
        cause,
      })
    ))
  )
  
  // Глобальная подписка (все события → аудит)
  yield* eventBus.subscribeAll(auditHandler)
  
  yield* Effect.logInfo("All event handlers registered")
})

Синхронная vs асинхронная доставка

Синхронный dispatch (in-process)

Обработчики вызываются в том же процессе, немедленно после публикации:

// Синхронный: publish блокируется до завершения всех обработчиков
//
// completeTodo → repo.save → eventBus.publish → [handlers run] → return
//                                                    ↑ блокирующий вызов

const publish = (event: TodoEvent) =>
  dispatch(event)  // ждём завершения всех handlers

Плюсы: простота, консистентность (если handler упал — вы узнаете сразу). Минусы: медленнее (ждём обработчиков), один медленный handler тормозит всё.

Асинхронный dispatch (fire-and-forget)

Обработчики вызываются в фоновых Fiber’ах:

// Асинхронный: publish возвращается немедленно
//
// completeTodo → repo.save → eventBus.publish → return
//                                ↓ (fork)
//                            [handlers run in background]

const publishAsync = (event: TodoEvent) =>
  Effect.fork(dispatch(event)).pipe(Effect.asVoid)
  // publish возвращается сразу, handlers работают в фоне

Плюсы: быстрый ответ пользователю, медленные handlers не блокируют. Минусы: ошибки handlers теряются (нужен мониторинг), eventual consistency.

Гибридный подход

// Критичные обработчики — синхронно, некритичные — асинхронно
const publishHybrid = (event: TodoEvent) =>
  Effect.gen(function* () {
    // Синхронно: обновление Read Model (критично)
    yield* readModelHandler(event)
    
    // Асинхронно: уведомления, логи (некритично)
    yield* Effect.fork(notificationHandler(event))
    yield* Effect.fork(auditHandler(event))
  })

Гарантии доставки

At-Most-Once

Событие доставляется не более одного раза. Если обработчик упал — повторной доставки нет:

// At-most-once: простейший подход
const atMostOnce = (event: TodoEvent) =>
  dispatch(event).pipe(
    Effect.catchAll(() => Effect.void)  // ошибка? проигнорировали
  )

At-Least-Once

Событие доставляется хотя бы один раз. При ошибке — retry:

import { Schedule } from "effect"

// At-least-once: retry при ошибке
const atLeastOnce = (event: TodoEvent) =>
  dispatch(event).pipe(
    Effect.retry(
      Schedule.exponential("100 millis").pipe(
        Schedule.compose(Schedule.recurs(3))  // максимум 3 попытки
      )
    )
  )

При at-least-once обработчик может получить событие дважды, поэтому он должен быть идемпотентным:

// Идемпотентный обработчик
const idempotentHandler: EventHandler<TodoCompleted> = (event) =>
  Effect.gen(function* () {
    const processed = yield* ProcessedEventsStore
    const alreadyDone = yield* processed.has(event.eventId)
    
    if (alreadyDone) {
      yield* Effect.logDebug(`Event ${event.eventId} already processed`)
      return
    }
    
    yield* updateStats(event)
    yield* processed.add(event.eventId)
  })

Exactly-Once

Событие доставляется ровно один раз. Технически невозможно в распределённых системах, но можно приблизиться через at-least-once + идемпотентность.


Обработка ошибок в handlers

Изоляция ошибок

Ошибка одного обработчика не должна блокировать другие:

const dispatchWithIsolation = (event: TodoEvent) =>
  Effect.gen(function* () {
    const handlers = yield* getHandlersForEvent(event)
    
    // Каждый handler изолирован — ошибка одного не влияет на других
    const results = yield* Effect.forEach(
      handlers,
      (handler) =>
        handler(event).pipe(
          Effect.either  // Either<void, EventHandlerError>
        ),
      { concurrency: "unbounded" }
    )
    
    // Логируем ошибки, но не прерываем процесс
    const failures = results.filter((r) => r._tag === "Left")
    if (failures.length > 0) {
      yield* Effect.logWarning(
        `${failures.length} handler(s) failed for event ${event._tag}`
      )
    }
  })

Dead Letter Queue (DLQ)

Неудачно обработанные события можно сохранять в «очередь мёртвых писем» для позднейшего анализа:

class DeadLetterQueue extends Context.Tag("DeadLetterQueue")<
  DeadLetterQueue,
  {
    readonly enqueue: (entry: {
      readonly event: TodoEvent
      readonly error: EventHandlerError
      readonly attemptedAt: Date
      readonly retryCount: number
    }) => Effect.Effect<void>
  }
>() {}

const dispatchWithDLQ = (event: TodoEvent, handler: EventHandler<TodoEvent>) =>
  handler(event).pipe(
    Effect.retry(
      Schedule.exponential("100 millis").pipe(
        Schedule.compose(Schedule.recurs(3))
      )
    ),
    Effect.catchAll((error) =>
      Effect.gen(function* () {
        const dlq = yield* DeadLetterQueue
        yield* dlq.enqueue({
          event,
          error,
          attemptedAt: new Date(),
          retryCount: 3,
        })
        yield* Effect.logError(
          `Event ${event.eventId} sent to DLQ after 3 retries`
        )
      })
    )
  )

Ordering и последовательность

Порядок внутри агрегата

События одного агрегата должны обрабатываться в порядке их aggregateVersion:

// Публикация списка событий должна сохранять порядок
const publishAll = (events: ReadonlyArray<TodoEvent>) =>
  Effect.forEach(events, publish, { 
    concurrency: 1,  // последовательно — сохраняем порядок
    discard: true 
  })

Порядок между агрегатами

События разных агрегатов могут обрабатываться в любом порядке (параллельно):

// Группировка событий по агрегату
const groupByAggregate = (events: ReadonlyArray<TodoEvent>) =>
  events.reduce<ReadonlyMap<string, ReadonlyArray<TodoEvent>>>(
    (groups, event) => {
      const existing = groups.get(event.aggregateId) ?? []
      return new Map([...groups, [event.aggregateId, [...existing, event]]])
    },
    new Map()
  )

// Параллельная обработка между агрегатами,
// последовательная — внутри одного агрегата
const publishWithOrdering = (events: ReadonlyArray<TodoEvent>) => {
  const grouped = groupByAggregate(events)
  return Effect.forEach(
    [...grouped.values()],
    (aggregateEvents) =>
      Effect.forEach(aggregateEvents, publish, { 
        concurrency: 1,   // последовательно внутри агрегата
        discard: true 
      }),
    { concurrency: "unbounded", discard: true }  // параллельно между агрегатами
  )
}

Использование в Application Service

Полный пример: Application Service публикует события после бизнес-операции:

// ─── Application Service: CompleteTodo ──────────────────────

const completeTodoUseCase = (
  todoId: TodoId,
  userId: UserId,
): Effect.Effect<
  void,
  TodoNotFound | InvalidTransition,
  TodoRepository | EventBus
> =>
  Effect.gen(function* () {
    const repo = yield* TodoRepository
    const eventBus = yield* EventBus
    const now = new Date()
    const ctx: EventContext = {
      correlationId: generateEventId(),
      causationId: generateEventId(),
      triggeredBy: userId,
    }
    
    // 1. Загрузить агрегат
    const todo = yield* repo.findById(todoId)
    const aggregate = createTodoAggregate(todo)
    
    // 2. Выполнить бизнес-операцию
    const updated = yield* complete(aggregate, userId, now, ctx)
    
    // 3. Сохранить состояние
    yield* repo.save(updated.state)
    
    // 4. Опубликовать события
    yield* eventBus.publishAll(updated.uncommittedEvents)
    
    yield* Effect.logInfo(`Todo ${todoId} completed by ${userId}`)
  })

Тестирование EventBus

Test EventBus — накопление событий для проверки

// ─── Тестовый EventBus: накапливает события ─────────────────

const TestEventBusLive = Layer.effect(
  EventBus,
  Effect.gen(function* () {
    const eventsRef = yield* Ref.make<ReadonlyArray<TodoEvent>>([])
    
    return {
      publish: (event) =>
        Ref.update(eventsRef, (events) => [...events, event]),
      
      publishAll: (newEvents) =>
        Ref.update(eventsRef, (events) => [...events, ...newEvents]),
      
      subscribe: (_tag, _handler) => Effect.void,
      subscribeAll: (_handler) => Effect.void,
      
      // Дополнительные методы для тестов (не в порту)
      // Доступны через cast: yield* (eventBus as TestEventBus)
    }
  })
)

// В тестах:
// const events = yield* Ref.get(eventsRef)
// expect(events).toHaveLength(1)
// expect(events[0]._tag).toBe("TodoCompleted")

Итоги

  1. EventBus — Driven Port — домен знает контракт, не реализацию
  2. Effect PubSub — встроенный примитив для in-process pub/sub
  3. Sync vs Async — синхронный для критичных обработчиков, асинхронный для некритичных
  4. Изоляция ошибок — ошибка одного handler не блокирует другие
  5. Идемпотентность — обработчики должны корректно обрабатывать повторные события
  6. Ordering — последовательность внутри агрегата, параллелизм между агрегатами
  7. DLQ — необработанные события сохраняются для анализа
  8. Test EventBus — накапливает события для assertion’ов в тестах

Далее: 06-todo-events.md — События Todo: Created, Completed, Archived, TitleChanged