Effect Курс Channel: структура и назначение
Глава

Channel: структура и назначение

Channel — низкоуровневый примитив двунаправленного ввода-вывода, лежащий в основе Stream и Sink. Понимание Channel раскрывает внутреннее устройство всей потоковой подсистемы Effect.

Теория

Зачем нужен Channel

При работе с потоковыми данными мы оперируем двумя высокоуровневыми абстракциями: Stream (источник данных) и Sink (потребитель данных). Однако обе эти абстракции построены поверх единого низкоуровневого примитива — Channel.

Channel — это “узел ввода-вывода” (nexus of I/O), который поддерживает одновременно чтение и запись. Он может читать входные элементы, обрабатывать upstream-ошибки, записывать выходные элементы, завершаться с результатом или ошибкой. Эта двунаправленная природа делает Channel универсальным строительным блоком для любых потоковых операций.

┌─────────────────────────────────────────────┐
│                  Channel                     │
│                                              │
│   InElem ──────►┌──────────┐──────► OutElem  │
│                 │          │                  │
│   InErr  ──────►│  Логика  │──────► OutErr   │
│                 │          │                  │
│   InDone ──────►│          │──────► OutDone   │
│                 └──────────┘                  │
│                      │                        │
│                     Env                       │
└─────────────────────────────────────────────┘

Ключевая идея: Channel — это не просто «поток с двумя концами». Это декларативное описание программы, которая обрабатывает данные пошагово, поддерживая при этом полный контроль над ошибками, ресурсами и зависимостями.

Историческая перспектива

Channel в Effect-ts унаследован от ZIO ZChannel — абстракции, разработанной командой ZIO (Scala). В свою очередь, ZChannel вдохновлён концепцией conduit из экосистемы Haskell (библиотеки conduit, pipes, machines). Все эти абстракции решают одну фундаментальную проблему: как описать потоковую обработку данных, сохраняя композируемость, безопасность ресурсов и строгую типизацию.

Три роли Channel

Channel может выступать в трёх ролях в зависимости от того, как используются его входные и выходные каналы:

  1. Источник (Producer) — Channel, который не читает входные данные, а только производит выходные элементы. Это основа Stream.

  2. Потребитель (Consumer) — Channel, который читает входные элементы и производит финальный результат, но не эмитирует выходных элементов. Это основа Sink.

  3. Трансформер (Pipe/Transducer) — Channel, который и читает, и записывает, преобразуя входные элементы в выходные. Это основа операторов Stream (map, filter, take и т.д.).

Producer:   Channel<OutElem, unknown, OutErr, unknown, OutDone, unknown, Env>
                     ▲                  ▲                ▲
                  эмитирует         может ошибка     финальный результат

Consumer:   Channel<never, InElem, never, InErr, OutDone, InDone, Env>
                     ▲       ▲              ▲       ▲
                  не эмитирует  читает    без ошибки  обрабатывает

Pipe:       Channel<OutElem, InElem, OutErr, InErr, OutDone, InDone, Env>
                     ▲         ▲       ▲      ▲       ▲        ▲
                  эмитирует  читает  ошибка  ошибка  результат  результат

Концепция ФП

Channel как свободная монада над протоколом ввода-вывода

С точки зрения теории функционального программирования, Channel можно рассматривать как свободную монаду (free monad) над алгеброй операций ввода-вывода. Каждый Channel описывает последовательность шагов, где каждый шаг — это одна из примитивных операций:

  • Emit(elem) — записать элемент в выходной канал
  • Read(onElem, onDone) — прочитать элемент из входного канала (или обработать завершение)
  • Done(value) — завершиться с результатом
  • Fail(error) — завершиться с ошибкой
  • Effect(effect) — выполнить побочный эффект

Эта алгебра формирует AST (абстрактное синтаксическое дерево), которое интерпретируется исполнителем (ChannelExecutor) лениво и пошагово.

Arrow-подобная композиция

Channel поддерживает два вида композиции:

  1. Последовательная композиция (flatMap, >>=) — монадическая: результат одного Channel передаётся следующему.

  2. Горизонтальная композиция (pipeTo, >>>) — arrow-подобная: выход одного Channel подключается ко входу другого. Это напоминает категорную композицию стрелок (arrows) из теории категорий.

Последовательная (flatMap):

  Channel_A ──OutDone──► f(OutDone) ──► Channel_B

Горизонтальная (pipeTo):

  Channel_A ──OutElem──► Channel_B
  Channel_A ──OutErr───► Channel_B
  Channel_A ──OutDone──► Channel_B

Корекурсия и ленивые вычисления

Channel описывает потенциально бесконечные потоки данных через корекурсию (corecursion). В отличие от рекурсии (разбирает конечную структуру), корекурсия строит потенциально бесконечную структуру лениво — по одному шагу за раз. Каждый вызов read или emit — это один корекурсивный шаг.


Анатомия типа Channel

Полная сигнатура типа

import { Channel } from "effect"

// Полная сигнатура:
// Channel<out OutElem, in InElem, out OutErr, in InErr, out OutDone, in InDone, out Env>

Семь параметров типа Channel описывают полный протокол двунаправленного взаимодействия:

Выходные параметры (Output — что Channel производит)

ПараметрОписаниеАналогия
OutElemТип элементов, которые Channel эмитирует downstreamЭлементы Stream
OutErrТип ошибки, с которой Channel может завершитьсяОшибка Effect
OutDoneТип финального значения при успешном завершенииРезультат Effect

Входные параметры (Input — что Channel потребляет)

ПараметрОписаниеАналогия
InElemТип элементов, которые Channel читает из upstreamВходные данные Sink
InErrТип ошибки, которую Channel может получить от upstreamОшибка upstream
InDoneТип финального значения от upstream при его завершенииРезультат upstream

Контекст

ПараметрОписаниеАналогия
EnvТип зависимостей, необходимых для работы ChannelRequirements Effect

Ковариантность и контравариантность

Обратите внимание на модификаторы out и in в сигнатуре типа:

  • Выходные параметры (OutElem, OutErr, OutDone, Env) — ковариантные (out). Channel, производящий string, является подтипом Channel, производящего string | number.

  • Входные параметры (InElem, InErr, InDone) — контравариантные (in). Channel, принимающий string | number, является подтипом Channel, принимающего string.

Это следует из принципа подстановки Лисков: если что-то принимает более широкий тип, его можно использовать там, где ожидается приём узкого типа.

Мнемоника для запоминания параметров

Channel<OutElem, InElem, OutErr, InErr, OutDone, InDone, Env>
         ▲        ▲       ▲      ▲       ▲        ▲      ▲
         │        │       │      │       │        │      │
         │        │       │      │       │        │      └── Зависимости (R)
         │        │       │      │       │        └── Входной результат
         │        │       │      │       └── Выходной результат
         │        │       │      └── Входная ошибка
         │        │       └── Выходная ошибка
         │        └── Входные элементы
         └── Выходные элементы

Правило: параметры чередуются «выход-вход» парами: (OutElem, InElem), (OutErr, InErr), (OutDone, InDone), и завершается Env.


Связь с Stream и Sink

Stream как Channel

Stream<A, E, R> — это, по сути, Channel, который:

  • Эмитирует Chunk<A> (выходные элементы — чанки)
  • Не читает входные данные (InElem = unknown)
  • Может завершиться с ошибкой типа E
  • Не обрабатывает входных ошибок (InErr = unknown)
  • Завершается без значимого результата (OutDone = unknown)
  • Не обрабатывает входного завершения (InDone = unknown)
  • Работает в контексте R
import { Channel, Chunk, Stream } from "effect"

// Stream<A, E, R> ≡ Channel<Chunk<A>, unknown, E, unknown, unknown, unknown, R>

// Конвертация Stream → Channel
const streamChannel = Stream.toChannel(
  Stream.make(1, 2, 3)
)
// Тип: Channel<Chunk<number>, unknown, never, unknown, unknown, unknown, never>

// Конвертация Channel → Stream
const stream = Stream.fromChannel(streamChannel)
// Тип: Stream<number, never, never>

Sink как Channel

Sink<A, In, L, E, R> — это Channel, который:

  • Не эмитирует промежуточных элементов (OutElem = never)
  • Читает Chunk<In> (входные элементы — чанки)
  • Может завершиться с ошибкой типа E
  • Не обрабатывает входных ошибок (InErr = unknown)
  • Завершается с результатом типа A
  • Имеет “остаток” (leftover) типа L
import { Sink } from "effect"

// Sink<A, In, L, E, R> ≡ Channel<never, Chunk<In>, E, unknown, A, unknown, R>
//                                   ▲       ▲       ▲              ▲
//                            не эмитирует  читает  ошибка     результат

Операторы Stream как Channel-трансформеры

Когда вы пишете Stream.map(stream, f), под капотом происходит следующее:

  1. stream конвертируется в Channel (Producer)
  2. Создаётся Channel-трансформер, который читает чанки и применяет f к каждому элементу
  3. Producer и трансформер соединяются через pipeTo
  4. Результат конвертируется обратно в Stream
Stream.map(stream, f):

  stream.toChannel() ──pipeTo──► mapChannel(f) ──► Stream.fromChannel()

  Channel<Chunk<A>>   ──────►   Channel<Chunk<B>>  ──────►  Stream<B>

Таблица соответствий

АбстракцияChannel-эквивалент
Stream<A, E, R>Channel<Chunk<A>, unknown, E, unknown, unknown, unknown, R>
Sink<A, In, L, E, R>Channel<never, Chunk<In>, E, unknown, A, unknown, R>
Stream.mapChannel.mapOut + pipeTo
Stream.filterChannel-трансформер с условным emit
Stream.take(n)Channel с состоянием и Done после n элементов
Stream.concatChannel.flatMap (последовательная композиция)
Stream.run(sink)stream.toChannel().pipeTo(sink.toChannel())

Модель исполнения

Pull-based модель

Channel использует pull-based модель исполнения. Это означает, что downstream-потребитель управляет скоростью обработки: он “тянет” данные из upstream, когда готов их обработать. Автоматический backpressure встроен в модель.

                    pull          pull          pull
  Downstream ◄──────────── Pipe ◄──────────── Producer
    (Sink)         Chunk<B>       Chunk<A>

  "Дай мне данные" → Pipe спрашивает Producer → Producer эмитирует

ChannelExecutor

Интерпретация (исполнение) Channel происходит через ChannelExecutor — специализированный runtime для потоковых вычислений. Он:

  1. Поддерживает стек текущих Channel-продолжений
  2. Управляет подключением upstream и downstream
  3. Корректно обрабатывает завершение (done), ошибки (fail) и прерывание (interrupt)
  4. Обеспечивает безопасное управление ресурсами (finalizers)
┌─────────────────────────────────────┐
│         ChannelExecutor              │
│                                      │
│  ┌─────────┐  ┌──────┐  ┌────────┐  │
│  │ Producer │──│ Pipe │──│ Consumer│  │
│  └─────────┘  └──────┘  └────────┘  │
│       ▲           ▲          ▲       │
│       │           │          │       │
│  Continuation  Continuation  Done    │
│     Stack        Stack       Value   │
└─────────────────────────────────────┘

Чанковая обработка

Channel оперирует чанками (Chunk<A>) вместо отдельных элементов. Это критически важная оптимизация: накладные расходы на каждый шаг Channel амортизируются по всему чанку. Вместо того чтобы выполнять полный цикл pull-emit для каждого элемента, Channel передаёт сразу группу элементов.

import { Channel, Chunk } from "effect"

// Один шаг Channel передаёт целый чанк
// а не один элемент — это амортизация O(1) на элемент
const efficientChannel = Channel.write(Chunk.make(1, 2, 3, 4, 5))
// Один write, пять элементов

Финализаторы и безопасность ресурсов

Channel поддерживает acquireRelease-семантику для безопасного управления ресурсами. При завершении Channel (успешном, ошибочном или прерванном) все зарегистрированные финализаторы выполняются в обратном порядке.

import { Channel, Effect } from "effect"

const resourceChannel = Channel.acquireReleaseOut(
  Effect.succeed("resource"),        // acquire
  (resource) => Effect.log(`Released: ${resource}`) // release
)
// Ресурс будет освобождён при любом завершении Channel

API Reference

Основные типы

import { Channel } from "effect"

// Основной тип
type Channel<
  out OutElem,
  in InElem = unknown,
  out OutErr = never,
  in InErr = unknown,
  out OutDone = void,
  in InDone = unknown,
  out Env = never
>

Конвертация между абстракциями

ФункцияСигнатураОписание
Stream.toChannelStream<A, E, R> → Channel<Chunk<A>, unknown, E, unknown, unknown, unknown, R>Stream в Channel
Stream.fromChannelChannel<Chunk<A>, unknown, E, unknown, unknown, unknown, R> → Stream<A, E, R>Channel в Stream
Sink.toChannelSink<A, In, L, E, R> → Channel<never, Chunk<In>, E, unknown, A, unknown, R>Sink в Channel
Sink.fromChannelChannel → SinkChannel в Sink
Channel.toStreamАналог Stream.fromChannelChannel в Stream

Запуск Channel

ФункцияОписание
Channel.runЗапустить Channel и вернуть Effect<OutDone, OutErr, Env>
Channel.runCollectЗапустить и собрать все элементы в Chunk
Channel.runDrainЗапустить, игнорируя все элементы
import { Channel, Effect } from "effect"

// Запуск Channel, возвращает Effect
const result: Effect.Effect<void, never, never> = Channel.run(
  Channel.write(1)
)

// Запуск со сбором элементов
const collected = Channel.runCollect(
  Channel.write(1)
)
// Effect<[Chunk<number>, void], never, never>

Примеры

💻 Пример 1: Простейший Channel

import { Channel, Chunk, Effect, pipe } from "effect"

// Channel, который эмитирует три числа
const producer = pipe(
  Channel.write(Chunk.make(1, 2, 3)),
  Channel.flatMap(() => Channel.write(Chunk.make(4, 5)))
)

// Запускаем и собираем результаты
const program = Effect.gen(function* () {
  const [chunks, done] = yield* Channel.runCollect(producer)
  console.log("Элементы:", Chunk.toReadonlyArray(chunks))
  console.log("Результат:", done)
})

Effect.runPromise(program)
// Элементы: [1, 2, 3, 4, 5]
// Результат: undefined

💻 Пример 2: Channel как трансформер

import { Channel, Chunk, Effect, pipe, Stream } from "effect"

// Создаём Stream из Channel
const numbersStream = Stream.fromChannel(
  pipe(
    Channel.write(Chunk.make(1, 2, 3)),
    Channel.flatMap(() => Channel.write(Chunk.make(4, 5, 6)))
  )
)

const program = Effect.gen(function* () {
  const result = yield* Stream.runCollect(numbersStream)
  console.log(Chunk.toReadonlyArray(result))
})

Effect.runPromise(program)
// [1, 2, 3, 4, 5, 6]

💻 Пример 3: Двунаправленный Channel

import { Channel, Chunk, Effect, Option } from "effect"

// Channel, который читает входные элементы и удваивает их
const doubler = Channel.readWith({
  onInput: (chunk: Chunk.Chunk<number>) =>
    Channel.flatMap(
      Channel.write(Chunk.map(chunk, (n) => n * 2)),
      () => doubler // рекурсивно продолжаем читать
    ),
  onFailure: (err) => Channel.fail(err),
  onDone: (done) => Channel.succeed(done)
})

// Соединяем producer и doubler
const producer = Channel.flatMap(
  Channel.write(Chunk.make(1, 2, 3)),
  () => Channel.write(Chunk.make(10, 20))
)

const pipeline = Channel.pipeTo(producer, doubler)

const program = Effect.gen(function* () {
  const [chunks, _] = yield* Channel.runCollect(pipeline)
  console.log(Chunk.toReadonlyArray(chunks))
})

Effect.runPromise(program)
// [2, 4, 6, 20, 40]

Упражнения

🟢 Basic

Упражнение 1: Первый Channel

Создайте Channel, который эмитирует чанки [1, 2, 3] и [4, 5, 6], а затем завершается со значением "done". Запустите его с помощью Channel.runCollect и выведите результат.

import { Channel, Chunk, Effect } from "effect"

const myChannel = // ваш код здесь

const program = Effect.gen(function* () {
  const [elements, done] = yield* Channel.runCollect(myChannel)
  console.log("Элементы:", Chunk.toReadonlyArray(elements))
  console.log("Финальное значение:", done)
})

Effect.runPromise(program)
// Элементы: [1, 2, 3, 4, 5, 6]
// Финальное значение: "done"

Решение:

import { Channel, Chunk, Effect, pipe } from "effect"

const myChannel = pipe(
  Channel.write(Chunk.make(1, 2, 3)),
  Channel.flatMap(() => Channel.write(Chunk.make(4, 5, 6))),
  Channel.flatMap(() => Channel.succeed("done" as const))
)

const program = Effect.gen(function* () {
  const [elements, done] = yield* Channel.runCollect(myChannel)
  console.log("Элементы:", Chunk.toReadonlyArray(elements))
  console.log("Финальное значение:", done)
})

Effect.runPromise(program)

Упражнение 2: Channel → Stream

Создайте Stream из Channel, который эмитирует числа от 1 до 10 в чанках по 3 элемента. Используйте Stream.fromChannel и Stream.runCollect.

Решение:

import { Channel, Chunk, Effect, Stream, pipe } from "effect"

const channel = pipe(
  Channel.write(Chunk.make(1, 2, 3)),
  Channel.flatMap(() => Channel.write(Chunk.make(4, 5, 6))),
  Channel.flatMap(() => Channel.write(Chunk.make(7, 8, 9))),
  Channel.flatMap(() => Channel.write(Chunk.make(10)))
)

const stream = Stream.fromChannel(channel)

const program = Effect.gen(function* () {
  const result = yield* Stream.runCollect(stream)
  console.log(Chunk.toReadonlyArray(result))
})

Effect.runPromise(program)
// [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

🟡 Intermediate

Упражнение 3: Трансформирующий Channel

Реализуйте Channel-трансформер filterChannel, который читает чанки чисел из upstream и пропускает только чётные числа. Соедините его с producer через Channel.pipeTo.

Решение:

import { Channel, Chunk, Effect, pipe } from "effect"

const filterEven: Channel.Channel<
  Chunk.Chunk<number>,
  Chunk.Chunk<number>,
  never,
  never,
  void,
  void
> = Channel.readWith({
  onInput: (chunk: Chunk.Chunk<number>) => {
    const filtered = Chunk.filter(chunk, (n) => n % 2 === 0)
    return Chunk.isEmpty(filtered)
      ? filterEven
      : Channel.flatMap(
          Channel.write(filtered),
          () => filterEven
        )
  },
  onFailure: (err) => Channel.fail(err),
  onDone: (done) => Channel.succeed(done)
})

const producer = pipe(
  Channel.write(Chunk.make(1, 2, 3, 4, 5)),
  Channel.flatMap(() => Channel.write(Chunk.make(6, 7, 8, 9, 10)))
)

const pipeline = Channel.pipeTo(producer, filterEven)

const program = Effect.gen(function* () {
  const [chunks, _] = yield* Channel.runCollect(pipeline)
  console.log(Chunk.toReadonlyArray(chunks))
})

Effect.runPromise(program)
// [2, 4, 6, 8, 10]

🔴 Advanced

Упражнение 4: Stateful Channel с аккумулятором

Реализуйте Channel, который работает как scan (running total): принимает чанки чисел и эмитирует чанки с накопленными суммами. Например, вход [1, 2, 3], [4, 5] → выход [1, 3, 6], [10, 15].

Решение:

import { Channel, Chunk, Effect, pipe } from "effect"

const scanChannel = (
  initial: number
): Channel.Channel<
  Chunk.Chunk<number>,
  Chunk.Chunk<number>,
  never,
  never,
  void,
  void
> =>
  Channel.readWith({
    onInput: (chunk: Chunk.Chunk<number>) => {
      let acc = initial
      const scanned = Chunk.map(chunk, (n) => {
        acc = acc + n
        return acc
      })
      return Channel.flatMap(
        Channel.write(scanned),
        () => scanChannel(acc)
      )
    },
    onFailure: (err) => Channel.fail(err),
    onDone: (done) => Channel.succeed(done)
  })

const producer = pipe(
  Channel.write(Chunk.make(1, 2, 3)),
  Channel.flatMap(() => Channel.write(Chunk.make(4, 5)))
)

const pipeline = Channel.pipeTo(producer, scanChannel(0))

const program = Effect.gen(function* () {
  const [chunks, _] = yield* Channel.runCollect(pipeline)
  console.log(Chunk.toReadonlyArray(chunks))
})

Effect.runPromise(program)
// [1, 3, 6, 10, 15]

🔗 Связанные темы: Модуль 10 (Stream), Модуль 11 (Sink), статья 02-creating-channels.md