Effect Курс Sink структура и основы
Глава

Sink структура и основы

Sink — композируемый потребитель потоковых данных в Effect-ts, превращающий элементы Stream в единственный результат с полной типобезопасностью.

Теория

Что такое Sink

В потоковой обработке данных существует три фундаментальные абстракции: источник (producer), трансформатор (transformer) и потребитель (consumer). В Effect-ts этим ролям соответствуют:

  • Stream — ленивый источник, генерирующий последовательность элементов
  • Pipe/transform — промежуточные трансформации над потоком
  • Sink — терминальный потребитель, поглощающий элементы и вычисляющий финальный результат

Sink — это не просто callback или observer. Это полноценная алгебраическая абстракция с типизацией входов, выходов, ошибок, зависимостей и остатков (leftovers). Sink существует как reified computation — вычисление, описанное как значение первого класса, которое можно передавать, комбинировать, трансформировать и запускать.

Stream<A, E, R>  ──────────>  Sink<B, A, L, E2, R2>  ──────>  Effect<B, E | E2, R | R2>
   │                              │                                │
   │ генерирует                   │ потребляет                     │ результат
   │ элементы A                   │ элементы A                     │ значение B
   │                              │ остаток L                      │
   │                              │ ошибки E2                      │
   └──────────────────────────────┘                                │
          Stream.run(stream, sink)  ───────────────────────────────┘

Зачем нужен Sink как отдельная абстракция

Может возникнуть вопрос: зачем создавать отдельный тип, если можно использовать Stream.runForEach или Stream.runFold? Ответ заключается в композиции.

Функция runFold — это монолитная операция. Она делает одну вещь и не может быть скомбинирована с другими потребителями. Sink же — это значение, которое можно:

  • Комбинировать через zip — запускать два потребителя параллельно на одном потоке
  • Выстраивать в цепочки через flatMap — сначала один потребитель, потом другой
  • Состязать через race — кто первый закончит, тот и выигрывает
  • Трансформировать через map, mapInput, dimap — менять типы входов и выходов
  • Фильтровать через filterInput — принимать только нужные элементы

Это делает Sink по-настоящему алгебраически замкнутым: композиция двух Sink — это снова Sink.

Pull-модель и обратное давление

Effect Streams работают по pull-модели: потребитель (Sink) запрашивает данные у источника (Stream), а не наоборот. Это фундаментальное архитектурное решение обеспечивает:

  • Backpressure (обратное давление) — медленный потребитель автоматически замедляет быстрый продюсер
  • Ленивость — элементы вычисляются только когда потребитель их запрашивает
  • Предсказуемое потребление памяти — в памяти находится ровно столько элементов, сколько обрабатывается
┌──────────┐   pull    ┌──────────┐   pull    ┌──────────┐
│   Sink   │ ───────>  │ Transform│ ───────>  │  Stream  │
│(consumer)│ <───────  │  (pipe)  │ <───────  │(producer)│
│          │  element  │          │  element  │          │
└──────────┘           └──────────┘           └──────────┘

Sink запрашивает ──> Pipe запрашивает ──> Stream генерирует
     <── элемент          <── элемент          элемент

Концепция ФП

Sink как кофунктор и профунктор

В теории категорий Sink обладает свойствами нескольких алгебраических структур:

Functor (ковариантный по выходу):

  • Sink.map трансформирует результат: если Sink<A, In, L, E, R> и f: A → B, то получаем Sink<B, In, L, E, R>
  • Законы функтора: map(id) = id и map(f . g) = map(f) . map(g)

Contravariant Functor (контравариантный по входу):

  • Sink.mapInput трансформирует вход: если Sink<A, In, L, E, R> и f: In2 → In, то получаем Sink<A, In2, L, E, R>
  • Функция идёт в обратном направлении: от нового типа к старому

Profunctor (комбинация):

  • Sink.dimap трансформирует оба одновременно, что делает Sink профунктором — структурой, контравариантной по входу и ковариантной по выходу
Profunctor Sink:

          contramap          map
In2 ──────────> In ──> [Sink] ──> A ──────────> B
     mapInput              dimap           map

dimap(f, g) = mapInput(f) >>> map(g)

Sink как Monad

Sink поддерживает flatMap, что делает его монадой:

// Монадические операции для Sink
Sink.succeed(a)                      // return / pure
Sink.flatMap(sink, f: A → Sink<B>)   // bind / >>= / chain

Монадические законы:

  1. Левая единица: Sink.flatMap(Sink.succeed(a), f) ≡ f(a)
  2. Правая единица: Sink.flatMap(sink, Sink.succeed) ≡ sink
  3. Ассоциативность: Sink.flatMap(Sink.flatMap(m, f), g) ≡ Sink.flatMap(m, x => Sink.flatMap(f(x), g))

Монадичность Sink позволяет строить последовательные пайплайны потребления: первый Sink потребляет часть данных, на основе результата конструируется второй Sink, который потребляет оставшиеся данные.

Связь с категорией Kleisli

Sink можно рассматривать как стрелку в категории Kleisli для эффективных вычислений. Каждый Sink — это морфизм из потока элементов In в эффективный результат Effect<A, E, R>:

Sink: Stream<In> ──> Effect<A, E, R>

Композиция (через flatMap):
f: Stream<In> ──> Effect<A, E, R>
g: A → (Stream<In> ──> Effect<B, E, R>)
─────────────────────────────────────────
f >=> g: Stream<In> ──> Effect<B, E, R>

Анатомия типа Sink

Сигнатура типа

Sink<A, In, L, E, R>

Каждый параметр несёт конкретную семантику:

     ┌─── A:  Тип результата — что Sink возвращает после завершения
     │  ┌─── In: Тип входных элементов — что Sink потребляет из Stream
     │  │   ┌─── L: Тип остатков (leftovers) — непотреблённые элементы
     │  │   │  ┌─── E: Тип ошибок — какие ошибки может генерировать
     │  │   │  │  ┌─── R: Тип зависимостей — какие сервисы требует
     ▼  ▼   ▼  ▼  ▼
Sink<A, In, L, E, R>

Подробнее о каждом параметре:

A (Result) — тип значения, которое Sink вычисляет. Аналогично возвращаемому типу Effect<A, E, R>. Это может быть:

  • Аккумулированное значение (сумма, среднее, максимум)
  • Коллекция (Chunk<A>, HashMap<K, V>, HashSet<A>)
  • Единичный элемент (Option<A> для head/last)
  • Void (void для побочных эффектов)

In (Input) — тип элементов, потребляемых из Stream. Должен совпадать с типом A в Stream<A, E, R>.

L (Leftover) — тип элементов, которые Sink не потребил. Если Sink потребляет только часть потока (например, take(3) из потока длины 10), оставшиеся 7 элементов являются leftovers. Обычно L = In, но может отличаться.

E (Error) — тип ошибок, которые может генерировать Sink в процессе потребления. never означает, что Sink не может завершиться ошибкой.

R (Requirements) — тип зависимостей, необходимых для работы Sink (сервисы из Context). never означает отсутствие зависимостей.

Конкретные примеры типов

import { Sink, Chunk, Option } from "effect"

// Sink.sum: потребляет числа, возвращает сумму, без остатков и ошибок
// Sink<number, number, never, never, never>

// Sink.head(): берёт первый элемент, возвращает Option
// Sink<Option<A>, A, A, never, never>
//                       ^ leftover = остальные элементы

// Sink.take(3): берёт 3 элемента, возвращает Chunk
// Sink<Chunk<A>, A, A, never, never>
//                   ^ leftover = непотреблённые элементы

// Sink.collectAll(): собирает все элементы
// Sink<Chunk<A>, A, never, never, never>
//                   ^ leftover = never (всё потреблено)

// Sink.fail("error"): немедленно завершается ошибкой
// Sink<never, unknown, never, string, never>

Stream.run — точка соединения

Функция Stream.run — это мост между Stream и Sink. Она принимает поток и потребитель и возвращает Effect с результатом:

import { Stream, Sink, Effect } from "effect"

// Сигнатура:
// Stream.run: <A, E, R, B, In, L, E2, R2>(
//   stream: Stream<A, E, R>,
//   sink: Sink<B, A, L, E2, R2>
// ) => Effect<B, E | E2, R | R2>

const stream = Stream.make(1, 2, 3, 4, 5)

// Stream<number> + Sink<number, number> = Effect<number>
const result: Effect.Effect<number> = Stream.run(stream, Sink.sum)

// Запуск через Bun
const main = Effect.runPromise(result).then(console.log)
// Output: 15

Обратите внимание на объединение типов:

  • Ошибки: E | E2 — ошибки потока и потребителя объединяются
  • Зависимости: R | R2 — зависимости объединяются
  • Результат: B — определяется Sink

Связь Stream и Sink

Базовые конструкторы Sink

Effect предоставляет два фундаментальных конструктора для создания тривиальных Sink:

import { Stream, Sink, Effect } from "effect"

// ── Sink.succeed ────────────────────────────────────────────
// Создаёт Sink, который не потребляет элементов
// и немедленно возвращает указанное значение

const succeedSink = Sink.succeed(42)
// Sink<number, unknown, never, never, never>

const stream = Stream.make(1, 2, 3)

Effect.runPromise(Stream.run(stream, succeedSink)).then(console.log)
// Output: 42
// Поток проигнорирован, Sink сразу вернул значение


// ── Sink.fail ───────────────────────────────────────────────
// Создаёт Sink, который не потребляет элементов
// и немедленно завершается ошибкой

const failSink = Sink.fail("Something went wrong" as const)
// Sink<never, unknown, never, "Something went wrong", never>

Effect.runPromiseExit(Stream.run(stream, failSink)).then(console.log)
// Output:
// {
//   _id: 'Exit',
//   _tag: 'Failure',
//   cause: { _id: 'Cause', _tag: 'Fail', failure: 'Something went wrong' }
// }

Sink.drain — потребление без результата

Sink.drain — это Sink, который потребляет все элементы потока и отбрасывает их, возвращая void. Полезен когда побочные эффекты выполняются в самом потоке через Stream.tap:

import { Stream, Sink, Effect, Console } from "effect"

const stream = Stream.make(1, 2, 3, 4).pipe(
  Stream.tap((n) => Console.log(`Processing: ${n}`))
)

// drain потребляет все элементы, запуская побочные эффекты в tap
Effect.runPromise(Stream.run(stream, Sink.drain)).then(console.log)
// Output:
// Processing: 1
// Processing: 2
// Processing: 3
// Processing: 4
// undefined

Sink.forEach — побочный эффект на каждый элемент

Sink.forEach выполняет эффективную функцию для каждого элемента потока:

import { Stream, Sink, Effect, Console } from "effect"

const stream = Stream.make(1, 2, 3, 4)

// forEach принимает функцию (a: A) => Effect<void, E, R>
const sink = Sink.forEach((n: number) =>
  Console.log(`Element: ${n}`)
)
// Sink<void, number, never, never, never>

Effect.runPromise(Stream.run(stream, sink)).then(console.log)
// Output:
// Element: 1
// Element: 2
// Element: 3
// Element: 4
// undefined

Sink.timed — измерение времени потребления

Sink.timed потребляет весь поток и измеряет время его выполнения, возвращая Duration:

import { Stream, Sink, Effect, Schedule } from "effect"

const stream = Stream.make(1, 2, 3, 4).pipe(
  Stream.schedule(Schedule.spaced("100 millis"))
)

Effect.runPromise(Stream.run(stream, Sink.timed)).then(console.log)
// Output: { _id: 'Duration', _tag: 'Millis', millis: ~408 }

Leftovers — остатки обработки

Концепция Leftovers

Не каждый Sink потребляет весь поток. Например, Sink.head() берёт только первый элемент, Sink.take(3) — три первых. Остальные элементы называются leftovers (остатки). Тип L в Sink<A, In, L, E, R> описывает тип этих остатков.

Stream:  [1, 2, 3, 4, 5]
Sink.take(3):
  consumed: [1, 2, 3]    → результат: Chunk(1, 2, 3)
  leftover: [4, 5]       → остатки, доступные через collectLeftover

Sink.collectLeftover

Оператор Sink.collectLeftover оборачивает Sink, чтобы вместе с результатом вернуть и непотреблённые элементы:

import { Stream, Sink, Effect } from "effect"

const stream = Stream.make(1, 2, 3, 4, 5)

// ── Пример 1: take(3) оставляет 2 элемента ────────────
const sink1 = Sink.take<number>(3).pipe(Sink.collectLeftover)

Effect.runPromise(Stream.run(stream, sink1)).then(console.log)
// Output:
// [
//   { _id: 'Chunk', values: [ 1, 2, 3 ] },   ← результат
//   { _id: 'Chunk', values: [ 4, 5 ] }        ← leftovers
// ]


// ── Пример 2: head() оставляет все кроме первого ──────
const sink2 = Sink.head<number>().pipe(Sink.collectLeftover)

Effect.runPromise(Stream.run(stream, sink2)).then(console.log)
// Output:
// [
//   { _id: 'Option', _tag: 'Some', value: 1 },  ← первый элемент
//   { _id: 'Chunk', values: [ 2, 3, 4, 5 ] }    ← leftovers
// ]

Sink.ignoreLeftover

Если остатки не нужны, их можно явно отбросить с помощью Sink.ignoreLeftover. Это полезно для документирования намерения и предотвращения утечек:

import { Stream, Sink, Effect } from "effect"

const stream = Stream.make(1, 2, 3, 4, 5)

// Берём 3 элемента и игнорируем остаток
const sink = Sink.take<number>(3).pipe(
  Sink.ignoreLeftover,
  Sink.collectLeftover // проверяем — leftovers теперь пустые
)

Effect.runPromise(Stream.run(stream, sink)).then(console.log)
// Output:
// [ { _id: 'Chunk', values: [ 1, 2, 3 ] }, { _id: 'Chunk', values: [] } ]

Роль leftovers в композиции

Leftovers критически важны при последовательной композиции Sink через flatMap. Когда первый Sink завершается, его остатки передаются следующему Sink:

Stream: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

flatMap:
  Sink1: take(3)
    consumed: [1, 2, 3]  → result: Chunk(1, 2, 3)
    leftover: [4, 5, 6, 7, 8, 9, 10]


  Sink2 (из flatMap): take(2)
    consumed: [4, 5]  → result: Chunk(4, 5)
    leftover: [6, 7, 8, 9, 10]

Без механизма leftovers последовательная композиция была бы невозможна — второй Sink не знал бы, откуда продолжать чтение.


API Reference

Базовые конструкторы

ФункцияСигнатураОписание
Sink.succeed<A>(value: A) => Sink<A, unknown, never, never, never>Sink, немедленно возвращающий значение
Sink.fail<E>(error: E) => Sink<never, unknown, never, E, never>Sink, немедленно завершающийся ошибкой
Sink.drainSink<void, unknown, never, never, never>Потребляет все, возвращает void
Sink.forEach<In, E, R>(f: (input: In) => Effect<void, E, R>) => Sink<void, In, never, E, R>Эффект на каждый элемент
Sink.timedSink<Duration, unknown, never, never, never>Измеряет время потребления

Получение элементов

ФункцияСигнатураОписание
Sink.head<In>() => Sink<Option<In>, In, In, never, never>Первый элемент
Sink.last<In>() => Sink<Option<In>, In, In, never, never>Последний элемент
Sink.take<In>(n: number) => Sink<Chunk<In>, In, In, never, never>Первые n элементов
Sink.countSink<number, unknown, never, never, never>Количество элементов
Sink.sumSink<number, number, never, never, never>Сумма числовых элементов

Операции с leftovers

ФункцияОписание
Sink.collectLeftoverВозвращает кортеж [result, leftovers]
Sink.ignoreLeftoverОтбрасывает остатки, меняя L на never

Запуск

ФункцияОписание
Stream.run(stream, sink)Запускает поток через Sink, возвращает Effect

Примеры

💻 Пример 1: Мониторинг HTTP-запросов

import { Stream, Sink, Effect, Console, pipe } from "effect"

// Модель HTTP-запроса
interface HttpRequest {
  readonly method: string
  readonly path: string
  readonly statusCode: number
  readonly durationMs: number
}

// Поток входящих запросов
const requestStream: Stream.Stream<HttpRequest> = Stream.make(
  { method: "GET", path: "/api/users", statusCode: 200, durationMs: 45 },
  { method: "POST", path: "/api/orders", statusCode: 201, durationMs: 120 },
  { method: "GET", path: "/api/products", statusCode: 500, durationMs: 2300 },
  { method: "DELETE", path: "/api/users/1", statusCode: 404, durationMs: 12 },
  { method: "GET", path: "/api/health", statusCode: 200, durationMs: 3 }
)

// Sink: логирование каждого запроса с побочным эффектом
const loggingSink = Sink.forEach((req: HttpRequest) =>
  Console.log(`[${req.method}] ${req.path} → ${req.statusCode} (${req.durationMs}ms)`)
)

const program = Stream.run(requestStream, loggingSink)

Effect.runPromise(program)
// Output:
// [GET] /api/users → 200 (45ms)
// [POST] /api/orders → 201 (120ms)
// [GET] /api/products → 500 (2300ms)
// [DELETE] /api/users/1 → 404 (12ms)
// [GET] /api/health → 200 (3ms)

💻 Пример 2: Peek at first and process rest

import { Stream, Sink, Effect, Option, Chunk, pipe } from "effect"

// Поток данных
const dataStream = Stream.make("header", "row1", "row2", "row3", "row4")

// Берём первый элемент и смотрим остатки
const peekSink = Sink.head<string>().pipe(Sink.collectLeftover)

const program = Effect.gen(function* () {
  const [header, rest] = yield* Stream.run(dataStream, peekSink)

  if (Option.isSome(header)) {
    yield* Effect.log(`Header: ${header.value}`)
    yield* Effect.log(`Remaining rows: ${Chunk.toReadonlyArray(rest).length}`)
  }
})

Effect.runPromise(program)
// Output:
// Header: header
// Remaining rows: 4

💻 Пример 3: Sink как значение первого класса

import { Stream, Sink, Effect, pipe } from "effect"

// Sink — это значение, его можно хранить в переменных, массивах, передавать
const sinks = {
  total: Sink.sum,
  count: Sink.count,
  first: Sink.head<number>(),
  last: Sink.last<number>(),
  all: Sink.collectAll<number>(),
} as const

const stream = Stream.make(10, 20, 30, 40, 50)

const program = Effect.gen(function* () {
  const total = yield* Stream.run(stream, sinks.total)
  const count = yield* Stream.run(stream, sinks.count)
  const first = yield* Stream.run(stream, sinks.first)
  const last = yield* Stream.run(stream, sinks.last)
  const all = yield* Stream.run(stream, sinks.all)

  yield* Effect.log(`Total: ${total}`)      // 150
  yield* Effect.log(`Count: ${count}`)      // 5
  yield* Effect.log(`First: ${first}`)      // Some(10)
  yield* Effect.log(`Last: ${last}`)        // Some(50)
  yield* Effect.log(`All: ${all}`)          // Chunk(10, 20, 30, 40, 50)
})

Effect.runPromise(program)

Упражнения

🟢 Basic

Упражнение 1: Первый и последний

Дан поток чисел от 1 до 100. Используя Sink.head и Sink.last, получите первый и последний элементы. Выведите оба значения.

import { Stream, Sink, Effect, Option } from "effect"

const stream = Stream.range(1, 101)

const program = Effect.gen(function* () {
  // Ваш код здесь
})

Effect.runPromise(program)

Решение:

import { Stream, Sink, Effect, Option } from "effect"

const stream = Stream.range(1, 101)

const program = Effect.gen(function* () {
  const first = yield* Stream.run(stream, Sink.head<number>())
  const last = yield* Stream.run(stream, Sink.last<number>())

  const firstValue = Option.getOrThrow(first)
  const lastValue = Option.getOrThrow(last)

  yield* Effect.log(`First: ${firstValue}, Last: ${lastValue}`)
})

Effect.runPromise(program)
// Output: First: 1, Last: 100

Упражнение 2: Drain с побочными эффектами

Создайте поток из 5 строк. Используйте Stream.tap для логирования каждой строки и Sink.drain для запуска потока. Убедитесь, что все строки выведены.

import { Stream, Sink, Effect, Console } from "effect"

const messages = Stream.make("Hello", "World", "Effect", "is", "great")

const program = Effect.gen(function* () {
  // Ваш код здесь
})

Effect.runPromise(program)

Решение:

import { Stream, Sink, Effect, Console } from "effect"

const messages = Stream.make("Hello", "World", "Effect", "is", "great")

const program = Effect.gen(function* () {
  yield* messages.pipe(
    Stream.tap((msg) => Console.log(`>> ${msg}`)),
    Stream.run(Sink.drain)
  )
})

Effect.runPromise(program)
// Output:
// >> Hello
// >> World
// >> Effect
// >> is
// >> great

🟡 Intermediate

Упражнение 3: Leftovers inspector

Дан поток чисел от 1 до 10. Используя Sink.take(4) и Sink.collectLeftover, получите первые 4 элемента и остатки. Затем посчитайте сумму остатков, создав из них новый поток.

import { Stream, Sink, Effect, Chunk } from "effect"

const stream = Stream.range(1, 11)

const program = Effect.gen(function* () {
  // Ваш код здесь
})

Effect.runPromise(program)

Решение:

import { Stream, Sink, Effect, Chunk } from "effect"

const stream = Stream.range(1, 11)

const program = Effect.gen(function* () {
  const [taken, leftovers] = yield* Stream.run(
    stream,
    Sink.take<number>(4).pipe(Sink.collectLeftover)
  )

  yield* Effect.log(`Taken: ${Chunk.toReadonlyArray(taken)}`)
  yield* Effect.log(`Leftovers: ${Chunk.toReadonlyArray(leftovers)}`)

  // Создаём новый поток из leftovers и считаем сумму
  const leftoverSum = yield* Stream.fromChunk(leftovers).pipe(
    Stream.run(Sink.sum)
  )

  yield* Effect.log(`Sum of leftovers: ${leftoverSum}`)
})

Effect.runPromise(program)
// Output:
// Taken: 1,2,3,4
// Leftovers: 5,6,7,8,9,10
// Sum of leftovers: 45

🔴 Advanced

Упражнение 4: Generic stats collector

Создайте функцию collectStats, которая для потока чисел возвращает объект { count, sum, first, last }, используя несколько Sink. Функция должна быть полностью generic по типу ошибок и зависимостей потока.

import { Stream, Sink, Effect, Option } from "effect"

interface StreamStats {
  readonly count: number
  readonly sum: number
  readonly first: Option.Option<number>
  readonly last: Option.Option<number>
}

const collectStats = <E, R>(
  stream: Stream.Stream<number, E, R>
): Effect.Effect<StreamStats, E, R> => {
  // Ваш код здесь
}

Решение:

import { Stream, Sink, Effect, Option } from "effect"

interface StreamStats {
  readonly count: number
  readonly sum: number
  readonly first: Option.Option<number>
  readonly last: Option.Option<number>
}

const collectStats = <E, R>(
  stream: Stream.Stream<number, E, R>
): Effect.Effect<StreamStats, E, R> =>
  Effect.gen(function* () {
    const count = yield* Stream.run(stream, Sink.count)
    const sum = yield* Stream.run(stream, Sink.sum)
    const first = yield* Stream.run(stream, Sink.head<number>())
    const last = yield* Stream.run(stream, Sink.last<number>())

    return { count, sum, first, last } as const
  })

// Тест
const program = Effect.gen(function* () {
  const stats = yield* collectStats(Stream.range(1, 101))

  yield* Effect.log(`Count: ${stats.count}`)
  yield* Effect.log(`Sum: ${stats.sum}`)
  yield* Effect.log(`First: ${stats.first}`)
  yield* Effect.log(`Last: ${stats.last}`)
})

Effect.runPromise(program)
// Output:
// Count: 100
// Sum: 5050
// First: { _id: 'Option', _tag: 'Some', value: 1 }
// Last: { _id: 'Option', _tag: 'Some', value: 100 }

🔗 Далее: Стандартные Sink →