Типобезопасный домен: Гексагональная архитектура на базе Effect Агрегат → Event: события как результат поведения
Глава

Агрегат → Event: события как результат поведения

Паттерн порождения событий из агрегата. Два подхода: State-first и Event-first. Функция apply: (State, Event) → State. Иммутабельное обновление агрегата через события. Возврат кортежа (State, Events). Накопление событий в агрегате. Связь с Event Sourcing. Подготовка к Части IX. Практические примеры с Todo-агрегатом.

Введение: откуда берутся события

В предыдущих статьях мы определили, что такое Domain Event и как его спроектировать. Но кто именно создаёт события? Ответ: агрегат порождает события в результате выполнения бизнес-операций.

Когда команда CompleteTodo поступает в агрегат Todo, агрегат:

  1. Проверяет инварианты (задача существует, не завершена, не архивирована)
  2. Применяет бизнес-правила
  3. Порождает событие TodoCompleted как свидетельство того, что произошло

Событие — это не побочный эффект. Это результат бизнес-операции, выраженный в форме факта.


Два подхода: State-first и Event-first

State-first: сначала состояние, потом событие

В подходе State-first агрегат сначала вычисляет новое состояние, а потом создаёт событие как уведомление о произошедшем изменении:

import { Effect } from "effect"

// State-first: агрегат обновляет состояние, потом порождает событие

interface Todo {
  readonly id: TodoId
  readonly title: TodoTitle
  readonly status: "active" | "completed" | "archived"
  readonly priority: Priority
  readonly version: number
  readonly completedAt: Date | null
}

interface AggregateResult<S> {
  readonly state: S
  readonly events: ReadonlyArray<TodoEvent>
}

// ─── State-first подход ─────────────────────────────────────

const completeTodoStateFirst = (
  todo: Todo,
  completedBy: UserId,
  now: Date,
  ctx: EventContext,
): Effect.Effect<AggregateResult<Todo>, InvalidTransition> => {
  // 1. Проверяем инварианты
  if (todo.status !== "active") {
    return Effect.fail(
      new InvalidTransition({
        from: todo.status,
        to: "completed",
        reason: `Cannot complete todo in status "${todo.status}"`,
      })
    )
  }
  
  // 2. Вычисляем новое состояние
  const updatedTodo: Todo = {
    ...todo,
    status: "completed" as const,
    completedAt: now,
    version: todo.version + 1,
  }
  
  // 3. Порождаем событие как уведомление
  const event = new TodoCompleted({
    eventId: generateEventId(),
    aggregateId: todo.id,
    aggregateVersion: updatedTodo.version,
    occurredAt: now,
    schemaVersion: 1,
    correlationId: ctx.correlationId,
    causationId: ctx.causationId,
    triggeredBy: completedBy,
    todoId: todo.id,
    title: todo.title,
    completedAt: now,
    completedBy,
    wasOverdue: todo.dueDate !== undefined && todo.dueDate < now,
  })
  
  return Effect.succeed({ state: updatedTodo, events: [event] })
}

Характеристики State-first:

  • Состояние вычисляется непосредственно из текущего состояния
  • Событие создаётся как побочный продукт
  • Проще для понимания
  • Подходит для CRUD-систем с добавлением событий

Event-first: сначала событие, потом состояние

В подходе Event-first агрегат сначала решает, какое событие должно произойти, а потом вычисляет новое состояние путём применения этого события к текущему состоянию:

// Event-first: агрегат порождает событие, потом применяет к состоянию

// ─── Функция apply: (State, Event) → State ──────────────────

const applyEvent = (state: Todo, event: TodoEvent): Todo => {
  switch (event._tag) {
    case "TodoCreated":
      return {
        id: event.todoId,
        title: event.title,
        status: "active" as const,
        priority: event.priority,
        version: event.aggregateVersion,
        completedAt: null,
      }
    
    case "TodoCompleted":
      return {
        ...state,
        status: "completed" as const,
        completedAt: event.completedAt,
        version: event.aggregateVersion,
      }
    
    case "TodoTitleChanged":
      return {
        ...state,
        title: event.newTitle,
        version: event.aggregateVersion,
      }
    
    case "TodoPriorityChanged":
      return {
        ...state,
        priority: event.newPriority,
        version: event.aggregateVersion,
      }
    
    case "TodoArchived":
      return {
        ...state,
        status: "archived" as const,
        version: event.aggregateVersion,
      }
      
    case "TodoReopened":
      return {
        ...state,
        status: "active" as const,
        completedAt: null,
        version: event.aggregateVersion,
      }
    
    case "TodoDueDateSet":
      return {
        ...state,
        version: event.aggregateVersion,
      }
  }
}

// ─── Event-first подход ─────────────────────────────────────

const completeTodoEventFirst = (
  todo: Todo,
  completedBy: UserId,
  now: Date,
  ctx: EventContext,
): Effect.Effect<AggregateResult<Todo>, InvalidTransition> => {
  // 1. Проверяем инварианты
  if (todo.status !== "active") {
    return Effect.fail(
      new InvalidTransition({
        from: todo.status,
        to: "completed",
        reason: `Cannot complete todo in status "${todo.status}"`,
      })
    )
  }
  
  // 2. Решаем, какое событие должно произойти
  const event = new TodoCompleted({
    eventId: generateEventId(),
    aggregateId: todo.id,
    aggregateVersion: todo.version + 1,
    occurredAt: now,
    schemaVersion: 1,
    correlationId: ctx.correlationId,
    causationId: ctx.causationId,
    triggeredBy: completedBy,
    todoId: todo.id,
    title: todo.title,
    completedAt: now,
    completedBy,
    wasOverdue: todo.dueDate !== undefined && todo.dueDate < now,
  })
  
  // 3. Вычисляем состояние, ПРИМЕНЯЯ событие
  const updatedTodo = applyEvent(todo, event)
  
  return Effect.succeed({ state: updatedTodo, events: [event] })
}

Характеристики Event-first:

  • Событие — первичный артефакт, состояние — производная
  • Функция applyEvent — единственное место, где меняется состояние
  • Обязательна для Event Sourcing
  • Гарантирует консистентность: одно и то же событие всегда даёт одно и то же состояние

Сравнение подходов

АспектState-firstEvent-first
ПорядокState → EventEvent → State
СложностьПрощеСложнее
Event SourcingНе подходитИдеально подходит
КонсистентностьМожет расходитьсяГарантирована
ТестируемостьОбычнаяОтличная (applyEvent — чистая функция)
РекомендацияCRUD + EventsDDD + Event Sourcing

Для нашего курса мы используем Event-first подход, поскольку он:

  • Гарантирует, что состояние и события всегда согласованы
  • Подготавливает к Event Sourcing (Часть IX)
  • Делает функцию applyEvent тестируемой отдельно от бизнес-логики

Функция apply: сердце Event-first подхода

Функция apply (или evolve, fold) — это чистая функция, которая вычисляет новое состояние из текущего состояния и события:

apply: (State, Event) → State

Свойства:

  1. Чистая функция — нет побочных эффектов, только вычисление
  2. Детерминированная — одинаковые входы всегда дают одинаковый выход
  3. Тотальная — определена для всех комбинаций State × Event
  4. Единственное место изменения состояния — больше нигде состояние не меняется
// apply — ЧИСТАЯ функция, без Effect, без побочных эффектов
// Именно поэтому она не возвращает Effect, а возвращает обычное значение

const applyTodoEvent = (state: Todo, event: TodoEvent): Todo => {
  switch (event._tag) {
    case "TodoCreated":
      return {
        id: event.todoId,
        title: event.title,
        status: "active",
        priority: event.priority,
        version: event.aggregateVersion,
        completedAt: null,
      }
    
    case "TodoCompleted":
      return {
        ...state,
        status: "completed",
        completedAt: event.completedAt,
        version: event.aggregateVersion,
      }
    
    case "TodoTitleChanged":
      return {
        ...state,
        title: event.newTitle,
        version: event.aggregateVersion,
      }
    
    case "TodoPriorityChanged":
      return {
        ...state,
        priority: event.newPriority,
        version: event.aggregateVersion,
      }
    
    case "TodoDueDateSet":
      return {
        ...state,
        // dueDate обновляется, если есть в state
        version: event.aggregateVersion,
      }
    
    case "TodoArchived":
      return {
        ...state,
        status: "archived",
        version: event.aggregateVersion,
      }
    
    case "TodoReopened":
      return {
        ...state,
        status: "active",
        completedAt: null,
        version: event.aggregateVersion,
      }
  }
}

Восстановление состояния из событий (Preview)

Если у вас есть полная цепочка событий, состояние можно восстановить с нуля:

// Восстановление состояния из цепочки событий
// Это ОСНОВА Event Sourcing (подробно в Модулях 38–42)

const rehydrate = (events: ReadonlyArray<TodoEvent>): Todo | null => {
  if (events.length === 0) return null
  
  // Array.reduce с apply — fold по цепочке событий
  return events.reduce<Todo | null>(
    (state, event) => {
      if (state === null && event._tag !== "TodoCreated") {
        throw new Error("First event must be TodoCreated")
      }
      
      // Для первого события (TodoCreated) state может быть null
      // applyTodoEvent обрабатывает TodoCreated без зависимости от state
      return applyTodoEvent(
        state ?? ({} as Todo),  // начальное состояние для TodoCreated
        event
      )
    },
    null,
  )
}

// Пример использования:
const events: ReadonlyArray<TodoEvent> = [
  new TodoCreated({ /* ... */ todoId: "1", title: "Write article", priority: "medium" }),
  new TodoTitleChanged({ /* ... */ oldTitle: "Write article", newTitle: "Write DDD article" }),
  new TodoPriorityChanged({ /* ... */ oldPriority: "medium", newPriority: "high" }),
  new TodoCompleted({ /* ... */ completedAt: new Date() }),
]

const currentState = rehydrate(events)
// currentState = {
//   id: "1",
//   title: "Write DDD article",   // ← изменён TodoTitleChanged
//   status: "completed",           // ← изменён TodoCompleted
//   priority: "high",              // ← изменён TodoPriorityChanged
//   completedAt: Date,
//   version: 4,
// }

Паттерн: агрегат накапливает события

В продвинутых реализациях агрегат хранит список непубликованных событий:

// ─── Агрегат с накоплением событий ──────────────────────────

interface TodoAggregate {
  readonly state: Todo
  readonly uncommittedEvents: ReadonlyArray<TodoEvent>
}

const createTodoAggregate = (state: Todo): TodoAggregate => ({
  state,
  uncommittedEvents: [],
})

// Функция, которая порождает событие и применяет его к агрегату
const emitEvent = (
  aggregate: TodoAggregate,
  event: TodoEvent,
): TodoAggregate => ({
  state: applyTodoEvent(aggregate.state, event),
  uncommittedEvents: [...aggregate.uncommittedEvents, event],
})

// ─── Операции над агрегатом ─────────────────────────────────

const complete = (
  aggregate: TodoAggregate,
  completedBy: UserId,
  now: Date,
  ctx: EventContext,
): Effect.Effect<TodoAggregate, InvalidTransition> => {
  if (aggregate.state.status !== "active") {
    return Effect.fail(new InvalidTransition({
      from: aggregate.state.status,
      to: "completed",
      reason: "Only active todos can be completed",
    }))
  }
  
  const event = new TodoCompleted({
    eventId: generateEventId(),
    aggregateId: aggregate.state.id,
    aggregateVersion: aggregate.state.version + 1,
    occurredAt: now,
    schemaVersion: 1,
    ...ctx,
    todoId: aggregate.state.id,
    title: aggregate.state.title,
    completedAt: now,
    completedBy,
    wasOverdue: false, // вычислить из dueDate
  })
  
  return Effect.succeed(emitEvent(aggregate, event))
}

const changeTitle = (
  aggregate: TodoAggregate,
  newTitle: TodoTitle,
  changedBy: UserId,
  now: Date,
  ctx: EventContext,
): Effect.Effect<TodoAggregate, InvalidTransition> => {
  if (aggregate.state.status === "archived") {
    return Effect.fail(new InvalidTransition({
      from: "archived",
      to: "archived",
      reason: "Cannot change title of archived todo",
    }))
  }
  
  if (aggregate.state.title === newTitle) {
    // Нет изменения — нет события
    return Effect.succeed(aggregate)
  }
  
  const event = new TodoTitleChanged({
    eventId: generateEventId(),
    aggregateId: aggregate.state.id,
    aggregateVersion: aggregate.state.version + aggregate.uncommittedEvents.length + 1,
    occurredAt: now,
    schemaVersion: 1,
    ...ctx,
    todoId: aggregate.state.id,
    oldTitle: aggregate.state.title,
    newTitle,
    changedBy,
  })
  
  return Effect.succeed(emitEvent(aggregate, event))
}

Сохранение и очистка событий

// После сохранения в репозиторий — очищаем uncommitted events
const markEventsAsCommitted = (aggregate: TodoAggregate): TodoAggregate => ({
  state: aggregate.state,
  uncommittedEvents: [],  // очищаем после persist
})

// ─── Использование в Application Service ────────────────────

const completeTodoUseCase = (
  todoId: TodoId,
  userId: UserId,
) =>
  Effect.gen(function* () {
    const repo = yield* TodoRepository
    const eventBus = yield* EventBus
    const now = new Date()
    const ctx = yield* createEventContext()
    
    // 1. Загрузить агрегат
    const todo = yield* repo.findById(todoId)
    const aggregate = createTodoAggregate(todo)
    
    // 2. Выполнить бизнес-операцию (порождает события)
    const updated = yield* complete(aggregate, userId, now, ctx)
    
    // 3. Сохранить новое состояние
    yield* repo.save(updated.state)
    
    // 4. Опубликовать накопленные события
    yield* eventBus.publishAll(updated.uncommittedEvents)
    
    // 5. Очистить uncommitted events
    return markEventsAsCommitted(updated)
  })

Множественные события за одну операцию

Иногда одна бизнес-операция порождает несколько событий:

// Пример: создание Todo с высоким приоритетом и дедлайном
const createTodoWithDeadline = (
  params: {
    readonly todoId: TodoId
    readonly title: TodoTitle
    readonly priority: Priority
    readonly dueDate: Date
    readonly createdBy: UserId
  },
  now: Date,
  ctx: EventContext,
): TodoAggregate => {
  let aggregate: TodoAggregate = {
    state: {} as Todo,
    uncommittedEvents: [],
  }
  
  // Событие 1: TodoCreated
  const createdEvent = new TodoCreated({
    eventId: generateEventId(),
    aggregateId: params.todoId,
    aggregateVersion: 1,
    occurredAt: now,
    schemaVersion: 1,
    ...ctx,
    todoId: params.todoId,
    title: params.title,
    priority: params.priority,
    createdBy: params.createdBy,
  })
  aggregate = emitEvent(aggregate, createdEvent)
  
  // Событие 2: TodoDueDateSet
  const dueDateEvent = new TodoDueDateSet({
    eventId: generateEventId(),
    aggregateId: params.todoId,
    aggregateVersion: 2,
    occurredAt: now,
    schemaVersion: 1,
    ...ctx,
    todoId: params.todoId,
    dueDate: params.dueDate,
    setBy: params.createdBy,
  })
  aggregate = emitEvent(aggregate, dueDateEvent)
  
  return aggregate
  // aggregate.uncommittedEvents = [TodoCreated, TodoDueDateSet]
}

Важное правило: нет изменения — нет события

Если операция не приводит к изменению состояния, событие не порождается:

const changeTitle = (
  aggregate: TodoAggregate,
  newTitle: TodoTitle,
  changedBy: UserId,
  now: Date,
  ctx: EventContext,
): Effect.Effect<TodoAggregate, InvalidTransition> => {
  // Если заголовок не изменился — событие НЕ порождается
  if (aggregate.state.title === newTitle) {
    return Effect.succeed(aggregate)  // без изменений
  }
  
  // Заголовок изменился — порождаем событие
  const event = new TodoTitleChanged({ /* ... */ })
  return Effect.succeed(emitEvent(aggregate, event))
}

Это важно, потому что:

  • Подписчики не должны получать «пустые» уведомления
  • Event Store не должен засоряться бессмысленными записями
  • Каждое событие должно отражать реальное изменение

Тестирование Event-first подхода

Функция applyEvent — чистая, и тестировать её тривиально:

import { describe, it, expect } from "bun:test"

describe("applyTodoEvent", () => {
  it("TodoCreated creates initial state", () => {
    const event = new TodoCreated({
      eventId: "evt-1" as EventId,
      aggregateId: "todo-1",
      aggregateVersion: 1,
      occurredAt: new Date("2025-01-15"),
      schemaVersion: 1,
      correlationId: "corr-1",
      causationId: "cmd-1",
      triggeredBy: "user-1",
      todoId: "todo-1" as TodoId,
      title: "Test" as TodoTitle,
      priority: "medium",
      createdBy: "user-1" as UserId,
    })
    
    const state = applyTodoEvent({} as Todo, event)
    
    expect(state.id).toBe("todo-1")
    expect(state.title).toBe("Test")
    expect(state.status).toBe("active")
    expect(state.priority).toBe("medium")
    expect(state.version).toBe(1)
  })
  
  it("TodoCompleted changes status", () => {
    const state: Todo = {
      id: "todo-1" as TodoId,
      title: "Test" as TodoTitle,
      status: "active",
      priority: "medium",
      version: 1,
      completedAt: null,
    }
    
    const event = new TodoCompleted({
      // ...base fields...
      aggregateVersion: 2,
      todoId: "todo-1" as TodoId,
      title: "Test" as TodoTitle,
      completedAt: new Date("2025-01-16"),
      completedBy: "user-1" as UserId,
      wasOverdue: false,
    })
    
    const newState = applyTodoEvent(state, event)
    
    expect(newState.status).toBe("completed")
    expect(newState.completedAt).toEqual(new Date("2025-01-16"))
    expect(newState.version).toBe(2)
  })
  
  it("rehydrate produces correct state from event chain", () => {
    const events = [
      new TodoCreated({ /* ... */ priority: "low" }),
      new TodoPriorityChanged({ /* ... */ newPriority: "high" }),
      new TodoCompleted({ /* ... */ }),
    ]
    
    const state = rehydrate(events)
    
    expect(state?.status).toBe("completed")
    expect(state?.priority).toBe("high")
  })
})

Связь с Event Sourcing

Паттерн, который мы описали в этой статье, — это фундамент Event Sourcing. В классической CRUD-системе:

Запись:  Command → State (сохраняется в БД)
                   Event (публикуется как уведомление)

Чтение: SELECT * FROM todos WHERE id = ?

В Event Sourcing:

Запись:  Command → Event (сохраняется в Event Store)
                   State (вычисляется из событий, может кешироваться)

Чтение: SELECT events FROM event_store WHERE aggregate_id = ?
        → reduce(events, initialState, applyEvent)

Подробно Event Sourcing рассматривается в Части IX (Модули 38–42). Сейчас важно понять: мы уже готовы к нему, потому что:

  1. У нас есть типизированные события (Schema.TaggedClass)
  2. У нас есть функция applyEvent (чистая, тестируемая)
  3. Агрегат порождает события как результат поведения

Итоги

  1. Агрегат порождает события — они не создаются снаружи
  2. Event-first подход — сначала событие, потом состояние через applyEvent
  3. applyEvent — чистая функция(State, Event) → State, без побочных эффектов
  4. Uncommitted events — агрегат накапливает события до момента persist
  5. Нет изменения → нет события — только реальные изменения порождают факты
  6. Множественные события — одна операция может породить несколько событий
  7. Rehydration — состояние можно восстановить из цепочки событий (preview Event Sourcing)

Далее: 05-event-dispatching.md — Dispatching: PubSub через Effect