Sink структура и основы
Sink — композируемый потребитель потоковых данных в Effect-ts, превращающий элементы Stream в единственный результат с полной типобезопасностью.
Теория
Что такое Sink
В потоковой обработке данных существует три фундаментальные абстракции: источник (producer), трансформатор (transformer) и потребитель (consumer). В Effect-ts этим ролям соответствуют:
- Stream — ленивый источник, генерирующий последовательность элементов
- Pipe/transform — промежуточные трансформации над потоком
- Sink — терминальный потребитель, поглощающий элементы и вычисляющий финальный результат
Sink — это не просто callback или observer. Это полноценная алгебраическая абстракция с типизацией входов, выходов, ошибок, зависимостей и остатков (leftovers). Sink существует как reified computation — вычисление, описанное как значение первого класса, которое можно передавать, комбинировать, трансформировать и запускать.
Stream<A, E, R> ──────────> Sink<B, A, L, E2, R2> ──────> Effect<B, E | E2, R | R2>
│ │ │
│ генерирует │ потребляет │ результат
│ элементы A │ элементы A │ значение B
│ │ остаток L │
│ │ ошибки E2 │
└──────────────────────────────┘ │
Stream.run(stream, sink) ───────────────────────────────┘
Зачем нужен Sink как отдельная абстракция
Может возникнуть вопрос: зачем создавать отдельный тип, если можно использовать Stream.runForEach или Stream.runFold? Ответ заключается в композиции.
Функция runFold — это монолитная операция. Она делает одну вещь и не может быть скомбинирована с другими потребителями. Sink же — это значение, которое можно:
- Комбинировать через
zip— запускать два потребителя параллельно на одном потоке - Выстраивать в цепочки через
flatMap— сначала один потребитель, потом другой - Состязать через
race— кто первый закончит, тот и выигрывает - Трансформировать через
map,mapInput,dimap— менять типы входов и выходов - Фильтровать через
filterInput— принимать только нужные элементы
Это делает Sink по-настоящему алгебраически замкнутым: композиция двух Sink — это снова Sink.
Pull-модель и обратное давление
Effect Streams работают по pull-модели: потребитель (Sink) запрашивает данные у источника (Stream), а не наоборот. Это фундаментальное архитектурное решение обеспечивает:
- Backpressure (обратное давление) — медленный потребитель автоматически замедляет быстрый продюсер
- Ленивость — элементы вычисляются только когда потребитель их запрашивает
- Предсказуемое потребление памяти — в памяти находится ровно столько элементов, сколько обрабатывается
┌──────────┐ pull ┌──────────┐ pull ┌──────────┐
│ Sink │ ───────> │ Transform│ ───────> │ Stream │
│(consumer)│ <─────── │ (pipe) │ <─────── │(producer)│
│ │ element │ │ element │ │
└──────────┘ └──────────┘ └──────────┘
Sink запрашивает ──> Pipe запрашивает ──> Stream генерирует
<── элемент <── элемент элемент
Концепция ФП
Sink как кофунктор и профунктор
В теории категорий Sink обладает свойствами нескольких алгебраических структур:
Functor (ковариантный по выходу):
Sink.mapтрансформирует результат: еслиSink<A, In, L, E, R>иf: A → B, то получаемSink<B, In, L, E, R>- Законы функтора:
map(id) = idиmap(f . g) = map(f) . map(g)
Contravariant Functor (контравариантный по входу):
Sink.mapInputтрансформирует вход: еслиSink<A, In, L, E, R>иf: In2 → In, то получаемSink<A, In2, L, E, R>- Функция идёт в обратном направлении: от нового типа к старому
Profunctor (комбинация):
Sink.dimapтрансформирует оба одновременно, что делает Sink профунктором — структурой, контравариантной по входу и ковариантной по выходу
Profunctor Sink:
contramap map
In2 ──────────> In ──> [Sink] ──> A ──────────> B
mapInput dimap map
dimap(f, g) = mapInput(f) >>> map(g)
Sink как Monad
Sink поддерживает flatMap, что делает его монадой:
// Монадические операции для Sink
Sink.succeed(a) // return / pure
Sink.flatMap(sink, f: A → Sink<B>) // bind / >>= / chain
Монадические законы:
- Левая единица:
Sink.flatMap(Sink.succeed(a), f) ≡ f(a) - Правая единица:
Sink.flatMap(sink, Sink.succeed) ≡ sink - Ассоциативность:
Sink.flatMap(Sink.flatMap(m, f), g) ≡ Sink.flatMap(m, x => Sink.flatMap(f(x), g))
Монадичность Sink позволяет строить последовательные пайплайны потребления: первый Sink потребляет часть данных, на основе результата конструируется второй Sink, который потребляет оставшиеся данные.
Связь с категорией Kleisli
Sink можно рассматривать как стрелку в категории Kleisli для эффективных вычислений. Каждый Sink — это морфизм из потока элементов In в эффективный результат Effect<A, E, R>:
Sink: Stream<In> ──> Effect<A, E, R>
Композиция (через flatMap):
f: Stream<In> ──> Effect<A, E, R>
g: A → (Stream<In> ──> Effect<B, E, R>)
─────────────────────────────────────────
f >=> g: Stream<In> ──> Effect<B, E, R>
Анатомия типа Sink
Сигнатура типа
Sink<A, In, L, E, R>
Каждый параметр несёт конкретную семантику:
┌─── A: Тип результата — что Sink возвращает после завершения
│ ┌─── In: Тип входных элементов — что Sink потребляет из Stream
│ │ ┌─── L: Тип остатков (leftovers) — непотреблённые элементы
│ │ │ ┌─── E: Тип ошибок — какие ошибки может генерировать
│ │ │ │ ┌─── R: Тип зависимостей — какие сервисы требует
▼ ▼ ▼ ▼ ▼
Sink<A, In, L, E, R>
Подробнее о каждом параметре:
A (Result) — тип значения, которое Sink вычисляет. Аналогично возвращаемому типу Effect<A, E, R>. Это может быть:
- Аккумулированное значение (сумма, среднее, максимум)
- Коллекция (
Chunk<A>,HashMap<K, V>,HashSet<A>) - Единичный элемент (
Option<A>для head/last) - Void (
voidдля побочных эффектов)
In (Input) — тип элементов, потребляемых из Stream. Должен совпадать с типом A в Stream<A, E, R>.
L (Leftover) — тип элементов, которые Sink не потребил. Если Sink потребляет только часть потока (например, take(3) из потока длины 10), оставшиеся 7 элементов являются leftovers. Обычно L = In, но может отличаться.
E (Error) — тип ошибок, которые может генерировать Sink в процессе потребления. never означает, что Sink не может завершиться ошибкой.
R (Requirements) — тип зависимостей, необходимых для работы Sink (сервисы из Context). never означает отсутствие зависимостей.
Конкретные примеры типов
import { Sink, Chunk, Option } from "effect"
// Sink.sum: потребляет числа, возвращает сумму, без остатков и ошибок
// Sink<number, number, never, never, never>
// Sink.head(): берёт первый элемент, возвращает Option
// Sink<Option<A>, A, A, never, never>
// ^ leftover = остальные элементы
// Sink.take(3): берёт 3 элемента, возвращает Chunk
// Sink<Chunk<A>, A, A, never, never>
// ^ leftover = непотреблённые элементы
// Sink.collectAll(): собирает все элементы
// Sink<Chunk<A>, A, never, never, never>
// ^ leftover = never (всё потреблено)
// Sink.fail("error"): немедленно завершается ошибкой
// Sink<never, unknown, never, string, never>
Stream.run — точка соединения
Функция Stream.run — это мост между Stream и Sink. Она принимает поток и потребитель и возвращает Effect с результатом:
import { Stream, Sink, Effect } from "effect"
// Сигнатура:
// Stream.run: <A, E, R, B, In, L, E2, R2>(
// stream: Stream<A, E, R>,
// sink: Sink<B, A, L, E2, R2>
// ) => Effect<B, E | E2, R | R2>
const stream = Stream.make(1, 2, 3, 4, 5)
// Stream<number> + Sink<number, number> = Effect<number>
const result: Effect.Effect<number> = Stream.run(stream, Sink.sum)
// Запуск через Bun
const main = Effect.runPromise(result).then(console.log)
// Output: 15
Обратите внимание на объединение типов:
- Ошибки:
E | E2— ошибки потока и потребителя объединяются - Зависимости:
R | R2— зависимости объединяются - Результат:
B— определяется Sink
Связь Stream и Sink
Базовые конструкторы Sink
Effect предоставляет два фундаментальных конструктора для создания тривиальных Sink:
import { Stream, Sink, Effect } from "effect"
// ── Sink.succeed ────────────────────────────────────────────
// Создаёт Sink, который не потребляет элементов
// и немедленно возвращает указанное значение
const succeedSink = Sink.succeed(42)
// Sink<number, unknown, never, never, never>
const stream = Stream.make(1, 2, 3)
Effect.runPromise(Stream.run(stream, succeedSink)).then(console.log)
// Output: 42
// Поток проигнорирован, Sink сразу вернул значение
// ── Sink.fail ───────────────────────────────────────────────
// Создаёт Sink, который не потребляет элементов
// и немедленно завершается ошибкой
const failSink = Sink.fail("Something went wrong" as const)
// Sink<never, unknown, never, "Something went wrong", never>
Effect.runPromiseExit(Stream.run(stream, failSink)).then(console.log)
// Output:
// {
// _id: 'Exit',
// _tag: 'Failure',
// cause: { _id: 'Cause', _tag: 'Fail', failure: 'Something went wrong' }
// }
Sink.drain — потребление без результата
Sink.drain — это Sink, который потребляет все элементы потока и отбрасывает их, возвращая void. Полезен когда побочные эффекты выполняются в самом потоке через Stream.tap:
import { Stream, Sink, Effect, Console } from "effect"
const stream = Stream.make(1, 2, 3, 4).pipe(
Stream.tap((n) => Console.log(`Processing: ${n}`))
)
// drain потребляет все элементы, запуская побочные эффекты в tap
Effect.runPromise(Stream.run(stream, Sink.drain)).then(console.log)
// Output:
// Processing: 1
// Processing: 2
// Processing: 3
// Processing: 4
// undefined
Sink.forEach — побочный эффект на каждый элемент
Sink.forEach выполняет эффективную функцию для каждого элемента потока:
import { Stream, Sink, Effect, Console } from "effect"
const stream = Stream.make(1, 2, 3, 4)
// forEach принимает функцию (a: A) => Effect<void, E, R>
const sink = Sink.forEach((n: number) =>
Console.log(`Element: ${n}`)
)
// Sink<void, number, never, never, never>
Effect.runPromise(Stream.run(stream, sink)).then(console.log)
// Output:
// Element: 1
// Element: 2
// Element: 3
// Element: 4
// undefined
Sink.timed — измерение времени потребления
Sink.timed потребляет весь поток и измеряет время его выполнения, возвращая Duration:
import { Stream, Sink, Effect, Schedule } from "effect"
const stream = Stream.make(1, 2, 3, 4).pipe(
Stream.schedule(Schedule.spaced("100 millis"))
)
Effect.runPromise(Stream.run(stream, Sink.timed)).then(console.log)
// Output: { _id: 'Duration', _tag: 'Millis', millis: ~408 }
Leftovers — остатки обработки
Концепция Leftovers
Не каждый Sink потребляет весь поток. Например, Sink.head() берёт только первый элемент, Sink.take(3) — три первых. Остальные элементы называются leftovers (остатки). Тип L в Sink<A, In, L, E, R> описывает тип этих остатков.
Stream: [1, 2, 3, 4, 5]
Sink.take(3):
consumed: [1, 2, 3] → результат: Chunk(1, 2, 3)
leftover: [4, 5] → остатки, доступные через collectLeftover
Sink.collectLeftover
Оператор Sink.collectLeftover оборачивает Sink, чтобы вместе с результатом вернуть и непотреблённые элементы:
import { Stream, Sink, Effect } from "effect"
const stream = Stream.make(1, 2, 3, 4, 5)
// ── Пример 1: take(3) оставляет 2 элемента ────────────
const sink1 = Sink.take<number>(3).pipe(Sink.collectLeftover)
Effect.runPromise(Stream.run(stream, sink1)).then(console.log)
// Output:
// [
// { _id: 'Chunk', values: [ 1, 2, 3 ] }, ← результат
// { _id: 'Chunk', values: [ 4, 5 ] } ← leftovers
// ]
// ── Пример 2: head() оставляет все кроме первого ──────
const sink2 = Sink.head<number>().pipe(Sink.collectLeftover)
Effect.runPromise(Stream.run(stream, sink2)).then(console.log)
// Output:
// [
// { _id: 'Option', _tag: 'Some', value: 1 }, ← первый элемент
// { _id: 'Chunk', values: [ 2, 3, 4, 5 ] } ← leftovers
// ]
Sink.ignoreLeftover
Если остатки не нужны, их можно явно отбросить с помощью Sink.ignoreLeftover. Это полезно для документирования намерения и предотвращения утечек:
import { Stream, Sink, Effect } from "effect"
const stream = Stream.make(1, 2, 3, 4, 5)
// Берём 3 элемента и игнорируем остаток
const sink = Sink.take<number>(3).pipe(
Sink.ignoreLeftover,
Sink.collectLeftover // проверяем — leftovers теперь пустые
)
Effect.runPromise(Stream.run(stream, sink)).then(console.log)
// Output:
// [ { _id: 'Chunk', values: [ 1, 2, 3 ] }, { _id: 'Chunk', values: [] } ]
Роль leftovers в композиции
Leftovers критически важны при последовательной композиции Sink через flatMap. Когда первый Sink завершается, его остатки передаются следующему Sink:
Stream: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
flatMap:
Sink1: take(3)
consumed: [1, 2, 3] → result: Chunk(1, 2, 3)
leftover: [4, 5, 6, 7, 8, 9, 10]
│
▼
Sink2 (из flatMap): take(2)
consumed: [4, 5] → result: Chunk(4, 5)
leftover: [6, 7, 8, 9, 10]
Без механизма leftovers последовательная композиция была бы невозможна — второй Sink не знал бы, откуда продолжать чтение.
API Reference
Базовые конструкторы
| Функция | Сигнатура | Описание |
|---|---|---|
Sink.succeed | <A>(value: A) => Sink<A, unknown, never, never, never> | Sink, немедленно возвращающий значение |
Sink.fail | <E>(error: E) => Sink<never, unknown, never, E, never> | Sink, немедленно завершающийся ошибкой |
Sink.drain | Sink<void, unknown, never, never, never> | Потребляет все, возвращает void |
Sink.forEach | <In, E, R>(f: (input: In) => Effect<void, E, R>) => Sink<void, In, never, E, R> | Эффект на каждый элемент |
Sink.timed | Sink<Duration, unknown, never, never, never> | Измеряет время потребления |
Получение элементов
| Функция | Сигнатура | Описание |
|---|---|---|
Sink.head | <In>() => Sink<Option<In>, In, In, never, never> | Первый элемент |
Sink.last | <In>() => Sink<Option<In>, In, In, never, never> | Последний элемент |
Sink.take | <In>(n: number) => Sink<Chunk<In>, In, In, never, never> | Первые n элементов |
Sink.count | Sink<number, unknown, never, never, never> | Количество элементов |
Sink.sum | Sink<number, number, never, never, never> | Сумма числовых элементов |
Операции с leftovers
| Функция | Описание |
|---|---|
Sink.collectLeftover | Возвращает кортеж [result, leftovers] |
Sink.ignoreLeftover | Отбрасывает остатки, меняя L на never |
Запуск
| Функция | Описание |
|---|---|
Stream.run(stream, sink) | Запускает поток через Sink, возвращает Effect |
Примеры
💻 Пример 1: Мониторинг HTTP-запросов
import { Stream, Sink, Effect, Console, pipe } from "effect"
// Модель HTTP-запроса
interface HttpRequest {
readonly method: string
readonly path: string
readonly statusCode: number
readonly durationMs: number
}
// Поток входящих запросов
const requestStream: Stream.Stream<HttpRequest> = Stream.make(
{ method: "GET", path: "/api/users", statusCode: 200, durationMs: 45 },
{ method: "POST", path: "/api/orders", statusCode: 201, durationMs: 120 },
{ method: "GET", path: "/api/products", statusCode: 500, durationMs: 2300 },
{ method: "DELETE", path: "/api/users/1", statusCode: 404, durationMs: 12 },
{ method: "GET", path: "/api/health", statusCode: 200, durationMs: 3 }
)
// Sink: логирование каждого запроса с побочным эффектом
const loggingSink = Sink.forEach((req: HttpRequest) =>
Console.log(`[${req.method}] ${req.path} → ${req.statusCode} (${req.durationMs}ms)`)
)
const program = Stream.run(requestStream, loggingSink)
Effect.runPromise(program)
// Output:
// [GET] /api/users → 200 (45ms)
// [POST] /api/orders → 201 (120ms)
// [GET] /api/products → 500 (2300ms)
// [DELETE] /api/users/1 → 404 (12ms)
// [GET] /api/health → 200 (3ms)
💻 Пример 2: Peek at first and process rest
import { Stream, Sink, Effect, Option, Chunk, pipe } from "effect"
// Поток данных
const dataStream = Stream.make("header", "row1", "row2", "row3", "row4")
// Берём первый элемент и смотрим остатки
const peekSink = Sink.head<string>().pipe(Sink.collectLeftover)
const program = Effect.gen(function* () {
const [header, rest] = yield* Stream.run(dataStream, peekSink)
if (Option.isSome(header)) {
yield* Effect.log(`Header: ${header.value}`)
yield* Effect.log(`Remaining rows: ${Chunk.toReadonlyArray(rest).length}`)
}
})
Effect.runPromise(program)
// Output:
// Header: header
// Remaining rows: 4
💻 Пример 3: Sink как значение первого класса
import { Stream, Sink, Effect, pipe } from "effect"
// Sink — это значение, его можно хранить в переменных, массивах, передавать
const sinks = {
total: Sink.sum,
count: Sink.count,
first: Sink.head<number>(),
last: Sink.last<number>(),
all: Sink.collectAll<number>(),
} as const
const stream = Stream.make(10, 20, 30, 40, 50)
const program = Effect.gen(function* () {
const total = yield* Stream.run(stream, sinks.total)
const count = yield* Stream.run(stream, sinks.count)
const first = yield* Stream.run(stream, sinks.first)
const last = yield* Stream.run(stream, sinks.last)
const all = yield* Stream.run(stream, sinks.all)
yield* Effect.log(`Total: ${total}`) // 150
yield* Effect.log(`Count: ${count}`) // 5
yield* Effect.log(`First: ${first}`) // Some(10)
yield* Effect.log(`Last: ${last}`) // Some(50)
yield* Effect.log(`All: ${all}`) // Chunk(10, 20, 30, 40, 50)
})
Effect.runPromise(program)
Упражнения
🟢 Basic
Упражнение 1: Первый и последний
Дан поток чисел от 1 до 100. Используя Sink.head и Sink.last, получите первый и последний элементы. Выведите оба значения.
import { Stream, Sink, Effect, Option } from "effect"
const stream = Stream.range(1, 101)
const program = Effect.gen(function* () {
// Ваш код здесь
})
Effect.runPromise(program)
Решение:
import { Stream, Sink, Effect, Option } from "effect"
const stream = Stream.range(1, 101)
const program = Effect.gen(function* () {
const first = yield* Stream.run(stream, Sink.head<number>())
const last = yield* Stream.run(stream, Sink.last<number>())
const firstValue = Option.getOrThrow(first)
const lastValue = Option.getOrThrow(last)
yield* Effect.log(`First: ${firstValue}, Last: ${lastValue}`)
})
Effect.runPromise(program)
// Output: First: 1, Last: 100
Упражнение 2: Drain с побочными эффектами
Создайте поток из 5 строк. Используйте Stream.tap для логирования каждой строки и Sink.drain для запуска потока. Убедитесь, что все строки выведены.
import { Stream, Sink, Effect, Console } from "effect"
const messages = Stream.make("Hello", "World", "Effect", "is", "great")
const program = Effect.gen(function* () {
// Ваш код здесь
})
Effect.runPromise(program)
Решение:
import { Stream, Sink, Effect, Console } from "effect"
const messages = Stream.make("Hello", "World", "Effect", "is", "great")
const program = Effect.gen(function* () {
yield* messages.pipe(
Stream.tap((msg) => Console.log(`>> ${msg}`)),
Stream.run(Sink.drain)
)
})
Effect.runPromise(program)
// Output:
// >> Hello
// >> World
// >> Effect
// >> is
// >> great
🟡 Intermediate
Упражнение 3: Leftovers inspector
Дан поток чисел от 1 до 10. Используя Sink.take(4) и Sink.collectLeftover, получите первые 4 элемента и остатки. Затем посчитайте сумму остатков, создав из них новый поток.
import { Stream, Sink, Effect, Chunk } from "effect"
const stream = Stream.range(1, 11)
const program = Effect.gen(function* () {
// Ваш код здесь
})
Effect.runPromise(program)
Решение:
import { Stream, Sink, Effect, Chunk } from "effect"
const stream = Stream.range(1, 11)
const program = Effect.gen(function* () {
const [taken, leftovers] = yield* Stream.run(
stream,
Sink.take<number>(4).pipe(Sink.collectLeftover)
)
yield* Effect.log(`Taken: ${Chunk.toReadonlyArray(taken)}`)
yield* Effect.log(`Leftovers: ${Chunk.toReadonlyArray(leftovers)}`)
// Создаём новый поток из leftovers и считаем сумму
const leftoverSum = yield* Stream.fromChunk(leftovers).pipe(
Stream.run(Sink.sum)
)
yield* Effect.log(`Sum of leftovers: ${leftoverSum}`)
})
Effect.runPromise(program)
// Output:
// Taken: 1,2,3,4
// Leftovers: 5,6,7,8,9,10
// Sum of leftovers: 45
🔴 Advanced
Упражнение 4: Generic stats collector
Создайте функцию collectStats, которая для потока чисел возвращает объект { count, sum, first, last }, используя несколько Sink. Функция должна быть полностью generic по типу ошибок и зависимостей потока.
import { Stream, Sink, Effect, Option } from "effect"
interface StreamStats {
readonly count: number
readonly sum: number
readonly first: Option.Option<number>
readonly last: Option.Option<number>
}
const collectStats = <E, R>(
stream: Stream.Stream<number, E, R>
): Effect.Effect<StreamStats, E, R> => {
// Ваш код здесь
}
Решение:
import { Stream, Sink, Effect, Option } from "effect"
interface StreamStats {
readonly count: number
readonly sum: number
readonly first: Option.Option<number>
readonly last: Option.Option<number>
}
const collectStats = <E, R>(
stream: Stream.Stream<number, E, R>
): Effect.Effect<StreamStats, E, R> =>
Effect.gen(function* () {
const count = yield* Stream.run(stream, Sink.count)
const sum = yield* Stream.run(stream, Sink.sum)
const first = yield* Stream.run(stream, Sink.head<number>())
const last = yield* Stream.run(stream, Sink.last<number>())
return { count, sum, first, last } as const
})
// Тест
const program = Effect.gen(function* () {
const stats = yield* collectStats(Stream.range(1, 101))
yield* Effect.log(`Count: ${stats.count}`)
yield* Effect.log(`Sum: ${stats.sum}`)
yield* Effect.log(`First: ${stats.first}`)
yield* Effect.log(`Last: ${stats.last}`)
})
Effect.runPromise(program)
// Output:
// Count: 100
// Sum: 5050
// First: { _id: 'Option', _tag: 'Some', value: 1 }
// Last: { _id: 'Option', _tag: 'Some', value: 100 }
🔗 Далее: Стандартные Sink →