Channel: структура и назначение
Channel — низкоуровневый примитив двунаправленного ввода-вывода, лежащий в основе Stream и Sink. Понимание Channel раскрывает внутреннее устройство всей потоковой подсистемы Effect.
Теория
Зачем нужен Channel
При работе с потоковыми данными мы оперируем двумя высокоуровневыми абстракциями: Stream (источник данных) и Sink (потребитель данных). Однако обе эти абстракции построены поверх единого низкоуровневого примитива — Channel.
Channel — это “узел ввода-вывода” (nexus of I/O), который поддерживает одновременно чтение и запись. Он может читать входные элементы, обрабатывать upstream-ошибки, записывать выходные элементы, завершаться с результатом или ошибкой. Эта двунаправленная природа делает Channel универсальным строительным блоком для любых потоковых операций.
┌─────────────────────────────────────────────┐
│ Channel │
│ │
│ InElem ──────►┌──────────┐──────► OutElem │
│ │ │ │
│ InErr ──────►│ Логика │──────► OutErr │
│ │ │ │
│ InDone ──────►│ │──────► OutDone │
│ └──────────┘ │
│ │ │
│ Env │
└─────────────────────────────────────────────┘
Ключевая идея: Channel — это не просто «поток с двумя концами». Это декларативное описание программы, которая обрабатывает данные пошагово, поддерживая при этом полный контроль над ошибками, ресурсами и зависимостями.
Историческая перспектива
Channel в Effect-ts унаследован от ZIO ZChannel — абстракции, разработанной командой ZIO (Scala). В свою очередь, ZChannel вдохновлён концепцией conduit из экосистемы Haskell (библиотеки conduit, pipes, machines). Все эти абстракции решают одну фундаментальную проблему: как описать потоковую обработку данных, сохраняя композируемость, безопасность ресурсов и строгую типизацию.
Три роли Channel
Channel может выступать в трёх ролях в зависимости от того, как используются его входные и выходные каналы:
-
Источник (Producer) — Channel, который не читает входные данные, а только производит выходные элементы. Это основа
Stream. -
Потребитель (Consumer) — Channel, который читает входные элементы и производит финальный результат, но не эмитирует выходных элементов. Это основа
Sink. -
Трансформер (Pipe/Transducer) — Channel, который и читает, и записывает, преобразуя входные элементы в выходные. Это основа операторов Stream (map, filter, take и т.д.).
Producer: Channel<OutElem, unknown, OutErr, unknown, OutDone, unknown, Env>
▲ ▲ ▲
эмитирует может ошибка финальный результат
Consumer: Channel<never, InElem, never, InErr, OutDone, InDone, Env>
▲ ▲ ▲ ▲
не эмитирует читает без ошибки обрабатывает
Pipe: Channel<OutElem, InElem, OutErr, InErr, OutDone, InDone, Env>
▲ ▲ ▲ ▲ ▲ ▲
эмитирует читает ошибка ошибка результат результат
Концепция ФП
Channel как свободная монада над протоколом ввода-вывода
С точки зрения теории функционального программирования, Channel можно рассматривать как свободную монаду (free monad) над алгеброй операций ввода-вывода. Каждый Channel описывает последовательность шагов, где каждый шаг — это одна из примитивных операций:
- Emit(elem) — записать элемент в выходной канал
- Read(onElem, onDone) — прочитать элемент из входного канала (или обработать завершение)
- Done(value) — завершиться с результатом
- Fail(error) — завершиться с ошибкой
- Effect(effect) — выполнить побочный эффект
Эта алгебра формирует AST (абстрактное синтаксическое дерево), которое интерпретируется исполнителем (ChannelExecutor) лениво и пошагово.
Arrow-подобная композиция
Channel поддерживает два вида композиции:
-
Последовательная композиция (
flatMap,>>=) — монадическая: результат одного Channel передаётся следующему. -
Горизонтальная композиция (
pipeTo,>>>) — arrow-подобная: выход одного Channel подключается ко входу другого. Это напоминает категорную композицию стрелок (arrows) из теории категорий.
Последовательная (flatMap):
Channel_A ──OutDone──► f(OutDone) ──► Channel_B
Горизонтальная (pipeTo):
Channel_A ──OutElem──► Channel_B
Channel_A ──OutErr───► Channel_B
Channel_A ──OutDone──► Channel_B
Корекурсия и ленивые вычисления
Channel описывает потенциально бесконечные потоки данных через корекурсию (corecursion). В отличие от рекурсии (разбирает конечную структуру), корекурсия строит потенциально бесконечную структуру лениво — по одному шагу за раз. Каждый вызов read или emit — это один корекурсивный шаг.
Анатомия типа Channel
Полная сигнатура типа
import { Channel } from "effect"
// Полная сигнатура:
// Channel<out OutElem, in InElem, out OutErr, in InErr, out OutDone, in InDone, out Env>
Семь параметров типа Channel описывают полный протокол двунаправленного взаимодействия:
Выходные параметры (Output — что Channel производит)
| Параметр | Описание | Аналогия |
|---|---|---|
OutElem | Тип элементов, которые Channel эмитирует downstream | Элементы Stream |
OutErr | Тип ошибки, с которой Channel может завершиться | Ошибка Effect |
OutDone | Тип финального значения при успешном завершении | Результат Effect |
Входные параметры (Input — что Channel потребляет)
| Параметр | Описание | Аналогия |
|---|---|---|
InElem | Тип элементов, которые Channel читает из upstream | Входные данные Sink |
InErr | Тип ошибки, которую Channel может получить от upstream | Ошибка upstream |
InDone | Тип финального значения от upstream при его завершении | Результат upstream |
Контекст
| Параметр | Описание | Аналогия |
|---|---|---|
Env | Тип зависимостей, необходимых для работы Channel | Requirements Effect |
Ковариантность и контравариантность
Обратите внимание на модификаторы out и in в сигнатуре типа:
-
Выходные параметры (
OutElem,OutErr,OutDone,Env) — ковариантные (out). Channel, производящийstring, является подтипом Channel, производящегоstring | number. -
Входные параметры (
InElem,InErr,InDone) — контравариантные (in). Channel, принимающийstring | number, является подтипом Channel, принимающегоstring.
Это следует из принципа подстановки Лисков: если что-то принимает более широкий тип, его можно использовать там, где ожидается приём узкого типа.
Мнемоника для запоминания параметров
Channel<OutElem, InElem, OutErr, InErr, OutDone, InDone, Env>
▲ ▲ ▲ ▲ ▲ ▲ ▲
│ │ │ │ │ │ │
│ │ │ │ │ │ └── Зависимости (R)
│ │ │ │ │ └── Входной результат
│ │ │ │ └── Выходной результат
│ │ │ └── Входная ошибка
│ │ └── Выходная ошибка
│ └── Входные элементы
└── Выходные элементы
Правило: параметры чередуются «выход-вход» парами: (OutElem, InElem), (OutErr, InErr), (OutDone, InDone), и завершается Env.
Связь с Stream и Sink
Stream как Channel
Stream<A, E, R> — это, по сути, Channel, который:
- Эмитирует
Chunk<A>(выходные элементы — чанки) - Не читает входные данные (
InElem = unknown) - Может завершиться с ошибкой типа
E - Не обрабатывает входных ошибок (
InErr = unknown) - Завершается без значимого результата (
OutDone = unknown) - Не обрабатывает входного завершения (
InDone = unknown) - Работает в контексте
R
import { Channel, Chunk, Stream } from "effect"
// Stream<A, E, R> ≡ Channel<Chunk<A>, unknown, E, unknown, unknown, unknown, R>
// Конвертация Stream → Channel
const streamChannel = Stream.toChannel(
Stream.make(1, 2, 3)
)
// Тип: Channel<Chunk<number>, unknown, never, unknown, unknown, unknown, never>
// Конвертация Channel → Stream
const stream = Stream.fromChannel(streamChannel)
// Тип: Stream<number, never, never>
Sink как Channel
Sink<A, In, L, E, R> — это Channel, который:
- Не эмитирует промежуточных элементов (
OutElem = never) - Читает
Chunk<In>(входные элементы — чанки) - Может завершиться с ошибкой типа
E - Не обрабатывает входных ошибок (
InErr = unknown) - Завершается с результатом типа
A - Имеет “остаток” (leftover) типа
L
import { Sink } from "effect"
// Sink<A, In, L, E, R> ≡ Channel<never, Chunk<In>, E, unknown, A, unknown, R>
// ▲ ▲ ▲ ▲
// не эмитирует читает ошибка результат
Операторы Stream как Channel-трансформеры
Когда вы пишете Stream.map(stream, f), под капотом происходит следующее:
streamконвертируется в Channel (Producer)- Создаётся Channel-трансформер, который читает чанки и применяет
fк каждому элементу - Producer и трансформер соединяются через
pipeTo - Результат конвертируется обратно в Stream
Stream.map(stream, f):
stream.toChannel() ──pipeTo──► mapChannel(f) ──► Stream.fromChannel()
Channel<Chunk<A>> ──────► Channel<Chunk<B>> ──────► Stream<B>
Таблица соответствий
| Абстракция | Channel-эквивалент |
|---|---|
Stream<A, E, R> | Channel<Chunk<A>, unknown, E, unknown, unknown, unknown, R> |
Sink<A, In, L, E, R> | Channel<never, Chunk<In>, E, unknown, A, unknown, R> |
Stream.map | Channel.mapOut + pipeTo |
Stream.filter | Channel-трансформер с условным emit |
Stream.take(n) | Channel с состоянием и Done после n элементов |
Stream.concat | Channel.flatMap (последовательная композиция) |
Stream.run(sink) | stream.toChannel().pipeTo(sink.toChannel()) |
Модель исполнения
Pull-based модель
Channel использует pull-based модель исполнения. Это означает, что downstream-потребитель управляет скоростью обработки: он “тянет” данные из upstream, когда готов их обработать. Автоматический backpressure встроен в модель.
pull pull pull
Downstream ◄──────────── Pipe ◄──────────── Producer
(Sink) Chunk<B> Chunk<A>
"Дай мне данные" → Pipe спрашивает Producer → Producer эмитирует
ChannelExecutor
Интерпретация (исполнение) Channel происходит через ChannelExecutor — специализированный runtime для потоковых вычислений. Он:
- Поддерживает стек текущих Channel-продолжений
- Управляет подключением upstream и downstream
- Корректно обрабатывает завершение (done), ошибки (fail) и прерывание (interrupt)
- Обеспечивает безопасное управление ресурсами (finalizers)
┌─────────────────────────────────────┐
│ ChannelExecutor │
│ │
│ ┌─────────┐ ┌──────┐ ┌────────┐ │
│ │ Producer │──│ Pipe │──│ Consumer│ │
│ └─────────┘ └──────┘ └────────┘ │
│ ▲ ▲ ▲ │
│ │ │ │ │
│ Continuation Continuation Done │
│ Stack Stack Value │
└─────────────────────────────────────┘
Чанковая обработка
Channel оперирует чанками (Chunk<A>) вместо отдельных элементов. Это критически важная оптимизация: накладные расходы на каждый шаг Channel амортизируются по всему чанку. Вместо того чтобы выполнять полный цикл pull-emit для каждого элемента, Channel передаёт сразу группу элементов.
import { Channel, Chunk } from "effect"
// Один шаг Channel передаёт целый чанк
// а не один элемент — это амортизация O(1) на элемент
const efficientChannel = Channel.write(Chunk.make(1, 2, 3, 4, 5))
// Один write, пять элементов
Финализаторы и безопасность ресурсов
Channel поддерживает acquireRelease-семантику для безопасного управления ресурсами. При завершении Channel (успешном, ошибочном или прерванном) все зарегистрированные финализаторы выполняются в обратном порядке.
import { Channel, Effect } from "effect"
const resourceChannel = Channel.acquireReleaseOut(
Effect.succeed("resource"), // acquire
(resource) => Effect.log(`Released: ${resource}`) // release
)
// Ресурс будет освобождён при любом завершении Channel
API Reference
Основные типы
import { Channel } from "effect"
// Основной тип
type Channel<
out OutElem,
in InElem = unknown,
out OutErr = never,
in InErr = unknown,
out OutDone = void,
in InDone = unknown,
out Env = never
>
Конвертация между абстракциями
| Функция | Сигнатура | Описание |
|---|---|---|
Stream.toChannel | Stream<A, E, R> → Channel<Chunk<A>, unknown, E, unknown, unknown, unknown, R> | Stream в Channel |
Stream.fromChannel | Channel<Chunk<A>, unknown, E, unknown, unknown, unknown, R> → Stream<A, E, R> | Channel в Stream |
Sink.toChannel | Sink<A, In, L, E, R> → Channel<never, Chunk<In>, E, unknown, A, unknown, R> | Sink в Channel |
Sink.fromChannel | Channel → Sink | Channel в Sink |
Channel.toStream | Аналог Stream.fromChannel | Channel в Stream |
Запуск Channel
| Функция | Описание |
|---|---|
Channel.run | Запустить Channel и вернуть Effect<OutDone, OutErr, Env> |
Channel.runCollect | Запустить и собрать все элементы в Chunk |
Channel.runDrain | Запустить, игнорируя все элементы |
import { Channel, Effect } from "effect"
// Запуск Channel, возвращает Effect
const result: Effect.Effect<void, never, never> = Channel.run(
Channel.write(1)
)
// Запуск со сбором элементов
const collected = Channel.runCollect(
Channel.write(1)
)
// Effect<[Chunk<number>, void], never, never>
Примеры
💻 Пример 1: Простейший Channel
import { Channel, Chunk, Effect, pipe } from "effect"
// Channel, который эмитирует три числа
const producer = pipe(
Channel.write(Chunk.make(1, 2, 3)),
Channel.flatMap(() => Channel.write(Chunk.make(4, 5)))
)
// Запускаем и собираем результаты
const program = Effect.gen(function* () {
const [chunks, done] = yield* Channel.runCollect(producer)
console.log("Элементы:", Chunk.toReadonlyArray(chunks))
console.log("Результат:", done)
})
Effect.runPromise(program)
// Элементы: [1, 2, 3, 4, 5]
// Результат: undefined
💻 Пример 2: Channel как трансформер
import { Channel, Chunk, Effect, pipe, Stream } from "effect"
// Создаём Stream из Channel
const numbersStream = Stream.fromChannel(
pipe(
Channel.write(Chunk.make(1, 2, 3)),
Channel.flatMap(() => Channel.write(Chunk.make(4, 5, 6)))
)
)
const program = Effect.gen(function* () {
const result = yield* Stream.runCollect(numbersStream)
console.log(Chunk.toReadonlyArray(result))
})
Effect.runPromise(program)
// [1, 2, 3, 4, 5, 6]
💻 Пример 3: Двунаправленный Channel
import { Channel, Chunk, Effect, Option } from "effect"
// Channel, который читает входные элементы и удваивает их
const doubler = Channel.readWith({
onInput: (chunk: Chunk.Chunk<number>) =>
Channel.flatMap(
Channel.write(Chunk.map(chunk, (n) => n * 2)),
() => doubler // рекурсивно продолжаем читать
),
onFailure: (err) => Channel.fail(err),
onDone: (done) => Channel.succeed(done)
})
// Соединяем producer и doubler
const producer = Channel.flatMap(
Channel.write(Chunk.make(1, 2, 3)),
() => Channel.write(Chunk.make(10, 20))
)
const pipeline = Channel.pipeTo(producer, doubler)
const program = Effect.gen(function* () {
const [chunks, _] = yield* Channel.runCollect(pipeline)
console.log(Chunk.toReadonlyArray(chunks))
})
Effect.runPromise(program)
// [2, 4, 6, 20, 40]
Упражнения
🟢 Basic
Упражнение 1: Первый Channel
Создайте Channel, который эмитирует чанки [1, 2, 3] и [4, 5, 6], а затем завершается со значением "done". Запустите его с помощью Channel.runCollect и выведите результат.
import { Channel, Chunk, Effect } from "effect"
const myChannel = // ваш код здесь
const program = Effect.gen(function* () {
const [elements, done] = yield* Channel.runCollect(myChannel)
console.log("Элементы:", Chunk.toReadonlyArray(elements))
console.log("Финальное значение:", done)
})
Effect.runPromise(program)
// Элементы: [1, 2, 3, 4, 5, 6]
// Финальное значение: "done"
Решение:
import { Channel, Chunk, Effect, pipe } from "effect"
const myChannel = pipe(
Channel.write(Chunk.make(1, 2, 3)),
Channel.flatMap(() => Channel.write(Chunk.make(4, 5, 6))),
Channel.flatMap(() => Channel.succeed("done" as const))
)
const program = Effect.gen(function* () {
const [elements, done] = yield* Channel.runCollect(myChannel)
console.log("Элементы:", Chunk.toReadonlyArray(elements))
console.log("Финальное значение:", done)
})
Effect.runPromise(program)
Упражнение 2: Channel → Stream
Создайте Stream из Channel, который эмитирует числа от 1 до 10 в чанках по 3 элемента. Используйте Stream.fromChannel и Stream.runCollect.
Решение:
import { Channel, Chunk, Effect, Stream, pipe } from "effect"
const channel = pipe(
Channel.write(Chunk.make(1, 2, 3)),
Channel.flatMap(() => Channel.write(Chunk.make(4, 5, 6))),
Channel.flatMap(() => Channel.write(Chunk.make(7, 8, 9))),
Channel.flatMap(() => Channel.write(Chunk.make(10)))
)
const stream = Stream.fromChannel(channel)
const program = Effect.gen(function* () {
const result = yield* Stream.runCollect(stream)
console.log(Chunk.toReadonlyArray(result))
})
Effect.runPromise(program)
// [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
🟡 Intermediate
Упражнение 3: Трансформирующий Channel
Реализуйте Channel-трансформер filterChannel, который читает чанки чисел из upstream и пропускает только чётные числа. Соедините его с producer через Channel.pipeTo.
Решение:
import { Channel, Chunk, Effect, pipe } from "effect"
const filterEven: Channel.Channel<
Chunk.Chunk<number>,
Chunk.Chunk<number>,
never,
never,
void,
void
> = Channel.readWith({
onInput: (chunk: Chunk.Chunk<number>) => {
const filtered = Chunk.filter(chunk, (n) => n % 2 === 0)
return Chunk.isEmpty(filtered)
? filterEven
: Channel.flatMap(
Channel.write(filtered),
() => filterEven
)
},
onFailure: (err) => Channel.fail(err),
onDone: (done) => Channel.succeed(done)
})
const producer = pipe(
Channel.write(Chunk.make(1, 2, 3, 4, 5)),
Channel.flatMap(() => Channel.write(Chunk.make(6, 7, 8, 9, 10)))
)
const pipeline = Channel.pipeTo(producer, filterEven)
const program = Effect.gen(function* () {
const [chunks, _] = yield* Channel.runCollect(pipeline)
console.log(Chunk.toReadonlyArray(chunks))
})
Effect.runPromise(program)
// [2, 4, 6, 8, 10]
🔴 Advanced
Упражнение 4: Stateful Channel с аккумулятором
Реализуйте Channel, который работает как scan (running total): принимает чанки чисел и эмитирует чанки с накопленными суммами. Например, вход [1, 2, 3], [4, 5] → выход [1, 3, 6], [10, 15].
Решение:
import { Channel, Chunk, Effect, pipe } from "effect"
const scanChannel = (
initial: number
): Channel.Channel<
Chunk.Chunk<number>,
Chunk.Chunk<number>,
never,
never,
void,
void
> =>
Channel.readWith({
onInput: (chunk: Chunk.Chunk<number>) => {
let acc = initial
const scanned = Chunk.map(chunk, (n) => {
acc = acc + n
return acc
})
return Channel.flatMap(
Channel.write(scanned),
() => scanChannel(acc)
)
},
onFailure: (err) => Channel.fail(err),
onDone: (done) => Channel.succeed(done)
})
const producer = pipe(
Channel.write(Chunk.make(1, 2, 3)),
Channel.flatMap(() => Channel.write(Chunk.make(4, 5)))
)
const pipeline = Channel.pipeTo(producer, scanChannel(0))
const program = Effect.gen(function* () {
const [chunks, _] = yield* Channel.runCollect(pipeline)
console.log(Chunk.toReadonlyArray(chunks))
})
Effect.runPromise(program)
// [1, 3, 6, 10, 15]
🔗 Связанные темы: Модуль 10 (Stream), Модуль 11 (Sink), статья 02-creating-channels.md