Агрегат → Event: события как результат поведения
Паттерн порождения событий из агрегата. Два подхода: State-first и Event-first. Функция apply: (State, Event) → State. Иммутабельное обновление агрегата через события. Возврат кортежа (State, Events). Накопление событий в агрегате. Связь с Event Sourcing. Подготовка к Части IX. Практические примеры с Todo-агрегатом.
Введение: откуда берутся события
В предыдущих статьях мы определили, что такое Domain Event и как его спроектировать. Но кто именно создаёт события? Ответ: агрегат порождает события в результате выполнения бизнес-операций.
Когда команда CompleteTodo поступает в агрегат Todo, агрегат:
- Проверяет инварианты (задача существует, не завершена, не архивирована)
- Применяет бизнес-правила
- Порождает событие
TodoCompletedкак свидетельство того, что произошло
Событие — это не побочный эффект. Это результат бизнес-операции, выраженный в форме факта.
Два подхода: State-first и Event-first
State-first: сначала состояние, потом событие
В подходе State-first агрегат сначала вычисляет новое состояние, а потом создаёт событие как уведомление о произошедшем изменении:
import { Effect } from "effect"
// State-first: агрегат обновляет состояние, потом порождает событие
interface Todo {
readonly id: TodoId
readonly title: TodoTitle
readonly status: "active" | "completed" | "archived"
readonly priority: Priority
readonly version: number
readonly completedAt: Date | null
}
interface AggregateResult<S> {
readonly state: S
readonly events: ReadonlyArray<TodoEvent>
}
// ─── State-first подход ─────────────────────────────────────
const completeTodoStateFirst = (
todo: Todo,
completedBy: UserId,
now: Date,
ctx: EventContext,
): Effect.Effect<AggregateResult<Todo>, InvalidTransition> => {
// 1. Проверяем инварианты
if (todo.status !== "active") {
return Effect.fail(
new InvalidTransition({
from: todo.status,
to: "completed",
reason: `Cannot complete todo in status "${todo.status}"`,
})
)
}
// 2. Вычисляем новое состояние
const updatedTodo: Todo = {
...todo,
status: "completed" as const,
completedAt: now,
version: todo.version + 1,
}
// 3. Порождаем событие как уведомление
const event = new TodoCompleted({
eventId: generateEventId(),
aggregateId: todo.id,
aggregateVersion: updatedTodo.version,
occurredAt: now,
schemaVersion: 1,
correlationId: ctx.correlationId,
causationId: ctx.causationId,
triggeredBy: completedBy,
todoId: todo.id,
title: todo.title,
completedAt: now,
completedBy,
wasOverdue: todo.dueDate !== undefined && todo.dueDate < now,
})
return Effect.succeed({ state: updatedTodo, events: [event] })
}
Характеристики State-first:
- Состояние вычисляется непосредственно из текущего состояния
- Событие создаётся как побочный продукт
- Проще для понимания
- Подходит для CRUD-систем с добавлением событий
Event-first: сначала событие, потом состояние
В подходе Event-first агрегат сначала решает, какое событие должно произойти, а потом вычисляет новое состояние путём применения этого события к текущему состоянию:
// Event-first: агрегат порождает событие, потом применяет к состоянию
// ─── Функция apply: (State, Event) → State ──────────────────
const applyEvent = (state: Todo, event: TodoEvent): Todo => {
switch (event._tag) {
case "TodoCreated":
return {
id: event.todoId,
title: event.title,
status: "active" as const,
priority: event.priority,
version: event.aggregateVersion,
completedAt: null,
}
case "TodoCompleted":
return {
...state,
status: "completed" as const,
completedAt: event.completedAt,
version: event.aggregateVersion,
}
case "TodoTitleChanged":
return {
...state,
title: event.newTitle,
version: event.aggregateVersion,
}
case "TodoPriorityChanged":
return {
...state,
priority: event.newPriority,
version: event.aggregateVersion,
}
case "TodoArchived":
return {
...state,
status: "archived" as const,
version: event.aggregateVersion,
}
case "TodoReopened":
return {
...state,
status: "active" as const,
completedAt: null,
version: event.aggregateVersion,
}
case "TodoDueDateSet":
return {
...state,
version: event.aggregateVersion,
}
}
}
// ─── Event-first подход ─────────────────────────────────────
const completeTodoEventFirst = (
todo: Todo,
completedBy: UserId,
now: Date,
ctx: EventContext,
): Effect.Effect<AggregateResult<Todo>, InvalidTransition> => {
// 1. Проверяем инварианты
if (todo.status !== "active") {
return Effect.fail(
new InvalidTransition({
from: todo.status,
to: "completed",
reason: `Cannot complete todo in status "${todo.status}"`,
})
)
}
// 2. Решаем, какое событие должно произойти
const event = new TodoCompleted({
eventId: generateEventId(),
aggregateId: todo.id,
aggregateVersion: todo.version + 1,
occurredAt: now,
schemaVersion: 1,
correlationId: ctx.correlationId,
causationId: ctx.causationId,
triggeredBy: completedBy,
todoId: todo.id,
title: todo.title,
completedAt: now,
completedBy,
wasOverdue: todo.dueDate !== undefined && todo.dueDate < now,
})
// 3. Вычисляем состояние, ПРИМЕНЯЯ событие
const updatedTodo = applyEvent(todo, event)
return Effect.succeed({ state: updatedTodo, events: [event] })
}
Характеристики Event-first:
- Событие — первичный артефакт, состояние — производная
- Функция
applyEvent— единственное место, где меняется состояние - Обязательна для Event Sourcing
- Гарантирует консистентность: одно и то же событие всегда даёт одно и то же состояние
Сравнение подходов
| Аспект | State-first | Event-first |
|---|---|---|
| Порядок | State → Event | Event → State |
| Сложность | Проще | Сложнее |
| Event Sourcing | Не подходит | Идеально подходит |
| Консистентность | Может расходиться | Гарантирована |
| Тестируемость | Обычная | Отличная (applyEvent — чистая функция) |
| Рекомендация | CRUD + Events | DDD + Event Sourcing |
Для нашего курса мы используем Event-first подход, поскольку он:
- Гарантирует, что состояние и события всегда согласованы
- Подготавливает к Event Sourcing (Часть IX)
- Делает функцию
applyEventтестируемой отдельно от бизнес-логики
Функция apply: сердце Event-first подхода
Функция apply (или evolve, fold) — это чистая функция, которая вычисляет новое состояние из текущего состояния и события:
apply: (State, Event) → State
Свойства:
- Чистая функция — нет побочных эффектов, только вычисление
- Детерминированная — одинаковые входы всегда дают одинаковый выход
- Тотальная — определена для всех комбинаций State × Event
- Единственное место изменения состояния — больше нигде состояние не меняется
// apply — ЧИСТАЯ функция, без Effect, без побочных эффектов
// Именно поэтому она не возвращает Effect, а возвращает обычное значение
const applyTodoEvent = (state: Todo, event: TodoEvent): Todo => {
switch (event._tag) {
case "TodoCreated":
return {
id: event.todoId,
title: event.title,
status: "active",
priority: event.priority,
version: event.aggregateVersion,
completedAt: null,
}
case "TodoCompleted":
return {
...state,
status: "completed",
completedAt: event.completedAt,
version: event.aggregateVersion,
}
case "TodoTitleChanged":
return {
...state,
title: event.newTitle,
version: event.aggregateVersion,
}
case "TodoPriorityChanged":
return {
...state,
priority: event.newPriority,
version: event.aggregateVersion,
}
case "TodoDueDateSet":
return {
...state,
// dueDate обновляется, если есть в state
version: event.aggregateVersion,
}
case "TodoArchived":
return {
...state,
status: "archived",
version: event.aggregateVersion,
}
case "TodoReopened":
return {
...state,
status: "active",
completedAt: null,
version: event.aggregateVersion,
}
}
}
Восстановление состояния из событий (Preview)
Если у вас есть полная цепочка событий, состояние можно восстановить с нуля:
// Восстановление состояния из цепочки событий
// Это ОСНОВА Event Sourcing (подробно в Модулях 38–42)
const rehydrate = (events: ReadonlyArray<TodoEvent>): Todo | null => {
if (events.length === 0) return null
// Array.reduce с apply — fold по цепочке событий
return events.reduce<Todo | null>(
(state, event) => {
if (state === null && event._tag !== "TodoCreated") {
throw new Error("First event must be TodoCreated")
}
// Для первого события (TodoCreated) state может быть null
// applyTodoEvent обрабатывает TodoCreated без зависимости от state
return applyTodoEvent(
state ?? ({} as Todo), // начальное состояние для TodoCreated
event
)
},
null,
)
}
// Пример использования:
const events: ReadonlyArray<TodoEvent> = [
new TodoCreated({ /* ... */ todoId: "1", title: "Write article", priority: "medium" }),
new TodoTitleChanged({ /* ... */ oldTitle: "Write article", newTitle: "Write DDD article" }),
new TodoPriorityChanged({ /* ... */ oldPriority: "medium", newPriority: "high" }),
new TodoCompleted({ /* ... */ completedAt: new Date() }),
]
const currentState = rehydrate(events)
// currentState = {
// id: "1",
// title: "Write DDD article", // ← изменён TodoTitleChanged
// status: "completed", // ← изменён TodoCompleted
// priority: "high", // ← изменён TodoPriorityChanged
// completedAt: Date,
// version: 4,
// }
Паттерн: агрегат накапливает события
В продвинутых реализациях агрегат хранит список непубликованных событий:
// ─── Агрегат с накоплением событий ──────────────────────────
interface TodoAggregate {
readonly state: Todo
readonly uncommittedEvents: ReadonlyArray<TodoEvent>
}
const createTodoAggregate = (state: Todo): TodoAggregate => ({
state,
uncommittedEvents: [],
})
// Функция, которая порождает событие и применяет его к агрегату
const emitEvent = (
aggregate: TodoAggregate,
event: TodoEvent,
): TodoAggregate => ({
state: applyTodoEvent(aggregate.state, event),
uncommittedEvents: [...aggregate.uncommittedEvents, event],
})
// ─── Операции над агрегатом ─────────────────────────────────
const complete = (
aggregate: TodoAggregate,
completedBy: UserId,
now: Date,
ctx: EventContext,
): Effect.Effect<TodoAggregate, InvalidTransition> => {
if (aggregate.state.status !== "active") {
return Effect.fail(new InvalidTransition({
from: aggregate.state.status,
to: "completed",
reason: "Only active todos can be completed",
}))
}
const event = new TodoCompleted({
eventId: generateEventId(),
aggregateId: aggregate.state.id,
aggregateVersion: aggregate.state.version + 1,
occurredAt: now,
schemaVersion: 1,
...ctx,
todoId: aggregate.state.id,
title: aggregate.state.title,
completedAt: now,
completedBy,
wasOverdue: false, // вычислить из dueDate
})
return Effect.succeed(emitEvent(aggregate, event))
}
const changeTitle = (
aggregate: TodoAggregate,
newTitle: TodoTitle,
changedBy: UserId,
now: Date,
ctx: EventContext,
): Effect.Effect<TodoAggregate, InvalidTransition> => {
if (aggregate.state.status === "archived") {
return Effect.fail(new InvalidTransition({
from: "archived",
to: "archived",
reason: "Cannot change title of archived todo",
}))
}
if (aggregate.state.title === newTitle) {
// Нет изменения — нет события
return Effect.succeed(aggregate)
}
const event = new TodoTitleChanged({
eventId: generateEventId(),
aggregateId: aggregate.state.id,
aggregateVersion: aggregate.state.version + aggregate.uncommittedEvents.length + 1,
occurredAt: now,
schemaVersion: 1,
...ctx,
todoId: aggregate.state.id,
oldTitle: aggregate.state.title,
newTitle,
changedBy,
})
return Effect.succeed(emitEvent(aggregate, event))
}
Сохранение и очистка событий
// После сохранения в репозиторий — очищаем uncommitted events
const markEventsAsCommitted = (aggregate: TodoAggregate): TodoAggregate => ({
state: aggregate.state,
uncommittedEvents: [], // очищаем после persist
})
// ─── Использование в Application Service ────────────────────
const completeTodoUseCase = (
todoId: TodoId,
userId: UserId,
) =>
Effect.gen(function* () {
const repo = yield* TodoRepository
const eventBus = yield* EventBus
const now = new Date()
const ctx = yield* createEventContext()
// 1. Загрузить агрегат
const todo = yield* repo.findById(todoId)
const aggregate = createTodoAggregate(todo)
// 2. Выполнить бизнес-операцию (порождает события)
const updated = yield* complete(aggregate, userId, now, ctx)
// 3. Сохранить новое состояние
yield* repo.save(updated.state)
// 4. Опубликовать накопленные события
yield* eventBus.publishAll(updated.uncommittedEvents)
// 5. Очистить uncommitted events
return markEventsAsCommitted(updated)
})
Множественные события за одну операцию
Иногда одна бизнес-операция порождает несколько событий:
// Пример: создание Todo с высоким приоритетом и дедлайном
const createTodoWithDeadline = (
params: {
readonly todoId: TodoId
readonly title: TodoTitle
readonly priority: Priority
readonly dueDate: Date
readonly createdBy: UserId
},
now: Date,
ctx: EventContext,
): TodoAggregate => {
let aggregate: TodoAggregate = {
state: {} as Todo,
uncommittedEvents: [],
}
// Событие 1: TodoCreated
const createdEvent = new TodoCreated({
eventId: generateEventId(),
aggregateId: params.todoId,
aggregateVersion: 1,
occurredAt: now,
schemaVersion: 1,
...ctx,
todoId: params.todoId,
title: params.title,
priority: params.priority,
createdBy: params.createdBy,
})
aggregate = emitEvent(aggregate, createdEvent)
// Событие 2: TodoDueDateSet
const dueDateEvent = new TodoDueDateSet({
eventId: generateEventId(),
aggregateId: params.todoId,
aggregateVersion: 2,
occurredAt: now,
schemaVersion: 1,
...ctx,
todoId: params.todoId,
dueDate: params.dueDate,
setBy: params.createdBy,
})
aggregate = emitEvent(aggregate, dueDateEvent)
return aggregate
// aggregate.uncommittedEvents = [TodoCreated, TodoDueDateSet]
}
Важное правило: нет изменения — нет события
Если операция не приводит к изменению состояния, событие не порождается:
const changeTitle = (
aggregate: TodoAggregate,
newTitle: TodoTitle,
changedBy: UserId,
now: Date,
ctx: EventContext,
): Effect.Effect<TodoAggregate, InvalidTransition> => {
// Если заголовок не изменился — событие НЕ порождается
if (aggregate.state.title === newTitle) {
return Effect.succeed(aggregate) // без изменений
}
// Заголовок изменился — порождаем событие
const event = new TodoTitleChanged({ /* ... */ })
return Effect.succeed(emitEvent(aggregate, event))
}
Это важно, потому что:
- Подписчики не должны получать «пустые» уведомления
- Event Store не должен засоряться бессмысленными записями
- Каждое событие должно отражать реальное изменение
Тестирование Event-first подхода
Функция applyEvent — чистая, и тестировать её тривиально:
import { describe, it, expect } from "bun:test"
describe("applyTodoEvent", () => {
it("TodoCreated creates initial state", () => {
const event = new TodoCreated({
eventId: "evt-1" as EventId,
aggregateId: "todo-1",
aggregateVersion: 1,
occurredAt: new Date("2025-01-15"),
schemaVersion: 1,
correlationId: "corr-1",
causationId: "cmd-1",
triggeredBy: "user-1",
todoId: "todo-1" as TodoId,
title: "Test" as TodoTitle,
priority: "medium",
createdBy: "user-1" as UserId,
})
const state = applyTodoEvent({} as Todo, event)
expect(state.id).toBe("todo-1")
expect(state.title).toBe("Test")
expect(state.status).toBe("active")
expect(state.priority).toBe("medium")
expect(state.version).toBe(1)
})
it("TodoCompleted changes status", () => {
const state: Todo = {
id: "todo-1" as TodoId,
title: "Test" as TodoTitle,
status: "active",
priority: "medium",
version: 1,
completedAt: null,
}
const event = new TodoCompleted({
// ...base fields...
aggregateVersion: 2,
todoId: "todo-1" as TodoId,
title: "Test" as TodoTitle,
completedAt: new Date("2025-01-16"),
completedBy: "user-1" as UserId,
wasOverdue: false,
})
const newState = applyTodoEvent(state, event)
expect(newState.status).toBe("completed")
expect(newState.completedAt).toEqual(new Date("2025-01-16"))
expect(newState.version).toBe(2)
})
it("rehydrate produces correct state from event chain", () => {
const events = [
new TodoCreated({ /* ... */ priority: "low" }),
new TodoPriorityChanged({ /* ... */ newPriority: "high" }),
new TodoCompleted({ /* ... */ }),
]
const state = rehydrate(events)
expect(state?.status).toBe("completed")
expect(state?.priority).toBe("high")
})
})
Связь с Event Sourcing
Паттерн, который мы описали в этой статье, — это фундамент Event Sourcing. В классической CRUD-системе:
Запись: Command → State (сохраняется в БД)
Event (публикуется как уведомление)
Чтение: SELECT * FROM todos WHERE id = ?
В Event Sourcing:
Запись: Command → Event (сохраняется в Event Store)
State (вычисляется из событий, может кешироваться)
Чтение: SELECT events FROM event_store WHERE aggregate_id = ?
→ reduce(events, initialState, applyEvent)
Подробно Event Sourcing рассматривается в Части IX (Модули 38–42). Сейчас важно понять: мы уже готовы к нему, потому что:
- У нас есть типизированные события (
Schema.TaggedClass) - У нас есть функция
applyEvent(чистая, тестируемая) - Агрегат порождает события как результат поведения
Итоги
- Агрегат порождает события — они не создаются снаружи
- Event-first подход — сначала событие, потом состояние через
applyEvent - applyEvent — чистая функция —
(State, Event) → State, без побочных эффектов - Uncommitted events — агрегат накапливает события до момента persist
- Нет изменения → нет события — только реальные изменения порождают факты
- Множественные события — одна операция может породить несколько событий
- Rehydration — состояние можно восстановить из цепочки событий (preview Event Sourcing)
Далее: 05-event-dispatching.md — Dispatching: PubSub через Effect