Effect Курс Операции над Channel
Глава

Операции над Channel

Полный обзор операций трансформации, композиции, обработки ошибок и управления ресурсами для Channel. Эти операции — строительные блоки любых потоковых пайплайнов.

Теория

Алгебра операций Channel

Операции над Channel образуют алгебру, позволяющую строить сложные потоковые пайплайны из простых компонентов. Эта алгебра включает:

  1. Функториальные операцииmap, mapOut, mapError и др. Они трансформируют значения в каналах, не меняя структуру вычисления.

  2. Монадические операцииflatMap, flatten. Они позволяют строить последовательности Channel, где следующий шаг зависит от результата предыдущего.

  3. Аппликативные операцииzip, zipRight, zipLeft. Они комбинируют результаты нескольких Channel.

  4. Операции обработки ошибокcatchAll, orElse, mapError. Они управляют ошибками.

  5. Категорные операцииpipeTo, compose. Они соединяют Channel в пайплайны.

Уровни композиции:

  map / mapOut          — трансформация значений (Functor)
  flatMap               — последовательная цепочка (Monad)
  zip / zipRight        — параллельная комбинация (Applicative)
  pipeTo                — соединение в пайплайн (Category/Arrow)
  catchAll / orElse     — обработка ошибок (MonadError)

Законы функтора для Channel

Операция map на Channel подчиняется законам функтора:

  1. Тождество: Channel.map(channel, identity) ≡ channel
  2. Композиция: Channel.map(Channel.map(channel, f), g) ≡ Channel.map(channel, x => g(f(x)))

Аналогичные законы действуют для монадических операций (flatMap) и других комбинаторов.


Трансформации

Channel.map — трансформация результата

Channel.map применяет функцию к финальному значению Channel (OutDone). Это трансформация результата, а не элементов.

import { Channel, Effect, pipe } from "effect"

const channel = pipe(
  Channel.succeed(42),
  Channel.map((n) => `Результат: ${n}`)
)
// Channel<never, unknown, never, unknown, string, unknown, never>

const program = Effect.gen(function* () {
  const [_, value] = yield* Channel.runCollect(channel)
  console.log(value) // "Результат: 42"
})

Effect.runPromise(program)

Channel.mapOut — трансформация выходных элементов

Channel.mapOut применяет функцию к каждому эмитируемому элементу. Это аналог Stream.map на уровне Channel.

import { Channel, Chunk, Effect, pipe } from "effect"

const channel = pipe(
  Channel.write(Chunk.make(1, 2, 3)),
  Channel.mapOut((chunk) => Chunk.map(chunk, (n) => n * 10))
)

const program = Effect.gen(function* () {
  const [chunks, _] = yield* Channel.runCollect(channel)
  console.log(Chunk.toReadonlyArray(chunks))
})

Effect.runPromise(program)
// [10, 20, 30]

⚠️ mapOut трансформирует OutElem целиком. Если OutElem — это Chunk<number>, то функция получает весь Chunk, а не отдельные числа.

Channel.mapInput — трансформация входных элементов

Channel.mapInput трансформирует входные элементы перед их обработкой Channel. Это контравариантная операция — она “оборачивает” вход.

import { Channel, Chunk, Effect } from "effect"

// Channel, ожидающий строки
const stringProcessor = Channel.readWith({
  onInput: (chunk: Chunk.Chunk<string>) =>
    Channel.flatMap(
      Channel.write(Chunk.map(chunk, (s) => s.toUpperCase())),
      () => stringProcessor
    ),
  onFailure: (err: never) => Channel.fail(err),
  onDone: (done: void) => Channel.succeed(done)
})

// Адаптируем вход: принимаем числа, конвертируем в строки
const numberToString = Channel.mapInput(
  stringProcessor,
  (chunk: Chunk.Chunk<number>) => Chunk.map(chunk, (n) => String(n))
)
// Теперь Channel принимает Chunk<number> вместо Chunk<string>

Channel.mapError — трансформация ошибок

Channel.mapError трансформирует ошибку Channel. Полезно для унификации типов ошибок при композиции.

import { Channel, Effect, pipe } from "effect"

class RawError {
  readonly _tag = "RawError" as const
  constructor(readonly code: number) {}
}

class AppError {
  readonly _tag = "AppError" as const
  constructor(readonly message: string) {}
}

const rawChannel = Channel.fail(new RawError(404))

const mappedChannel = pipe(
  rawChannel,
  Channel.mapError((err) => new AppError(`Error code: ${err.code}`))
)
// Channel<never, unknown, AppError, unknown, never, unknown, never>

Channel.mapInputError — трансформация входных ошибок

Channel.mapInputError трансформирует ошибки, получаемые от upstream.

import { Channel } from "effect"

// Адаптируем входную ошибку
const adapted = Channel.mapInputError(
  Channel.readWith({
    onInput: (chunk: unknown) => Channel.write(chunk),
    onFailure: (err: string) => Channel.fail(`Transformed: ${err}`),
    onDone: (done: unknown) => Channel.succeed(done)
  }),
  (rawErr: number) => `Error #${rawErr}` // number → string
)

Channel.as — замена результата константой

Channel.as заменяет финальное значение Channel на указанную константу.

import { Channel, Chunk, Effect, pipe } from "effect"

const channel = pipe(
  Channel.write(Chunk.make(1, 2, 3)),
  Channel.as("completed" as const)
)

const program = Effect.gen(function* () {
  const [_, done] = yield* Channel.runCollect(channel)
  console.log(done) // "completed"
})

Effect.runPromise(program)

Монадические операции

Channel.flatMap — последовательная композиция

Channel.flatMap — основная монадическая операция. Она принимает результат первого Channel (OutDone) и создаёт следующий Channel на его основе.

import { Channel, Chunk, Effect, pipe } from "effect"

// Первый Channel эмитирует данные и возвращает их количество
const step1 = pipe(
  Channel.write(Chunk.make("a", "b", "c")),
  Channel.as(3)
)

// Второй Channel зависит от результата первого
const pipeline = Channel.flatMap(step1, (count) =>
  Channel.flatMap(
    Channel.fromEffect(
      Effect.log(`Первый шаг обработал ${count} элементов`)
    ),
    () => Channel.write(Chunk.make("d", "e"))
  )
)

const program = Effect.gen(function* () {
  const [chunks, _] = yield* Channel.runCollect(pipeline)
  console.log(Chunk.toReadonlyArray(chunks))
})

Effect.runPromise(program)
// timestamp=... message="Первый шаг обработал 3 элементов"
// ["a", "b", "c", "d", "e"]

Channel.flatten — убирает один уровень вложенности

Channel.flatten разворачивает Channel<..., Channel<...>> в плоский Channel. Это эквивалент Channel.flatMap(channel, identity).

import { Channel, Chunk, Effect } from "effect"

// Channel, результат которого — другой Channel
const nested = Channel.succeed(
  Channel.write(Chunk.make(1, 2, 3))
)
// Channel<never, unknown, never, unknown, Channel<Chunk<number>>, unknown, never>

const flat = Channel.flatten(nested)
// Channel<Chunk<number>, unknown, never, unknown, void, unknown, never>

Channel.zip — комбинация двух Channel

Channel.zip выполняет два Channel последовательно и возвращает кортеж их результатов.

import { Channel, Chunk, Effect, pipe } from "effect"

const ch1 = pipe(
  Channel.write(Chunk.make(1, 2)),
  Channel.as("first" as const)
)

const ch2 = pipe(
  Channel.write(Chunk.make(3, 4)),
  Channel.as("second" as const)
)

const combined = Channel.zip(ch1, ch2)

const program = Effect.gen(function* () {
  const [chunks, [a, b]] = yield* Channel.runCollect(combined)
  console.log("Элементы:", Chunk.toReadonlyArray(chunks))
  console.log("Результаты:", a, b)
})

Effect.runPromise(program)
// Элементы: [1, 2, 3, 4]
// Результаты: "first" "second"

Channel.zipRight / Channel.zipLeft

Channel.zipRight (*>) выполняет оба Channel, но возвращает только результат второго. Channel.zipLeft (<*) — только результат первого.

import { Channel, Chunk, Effect, pipe } from "effect"

// Выполнить оба, вернуть результат второго
const right = Channel.zipRight(
  pipe(Channel.write(Chunk.make(1, 2)), Channel.as("ignored")),
  pipe(Channel.write(Chunk.make(3, 4)), Channel.as("kept"))
)

const program = Effect.gen(function* () {
  const [chunks, done] = yield* Channel.runCollect(right)
  console.log("Элементы:", Chunk.toReadonlyArray(chunks))
  console.log("Результат:", done) // "kept"
})

Effect.runPromise(program)

Обработка ошибок

Channel.catchAll — перехват всех ошибок

Channel.catchAll перехватывает ошибку Channel и переключается на альтернативный Channel.

import { Channel, Chunk, Effect, pipe } from "effect"

const failing = pipe(
  Channel.write(Chunk.make(1, 2)),
  Channel.flatMap(() => Channel.fail("Oops!" as const))
)

const recovered = Channel.catchAll(failing, (error) =>
  pipe(
    Channel.fromEffect(Effect.log(`Ошибка: ${error}, переключаемся`)),
    Channel.flatMap(() => Channel.write(Chunk.make(99)))
  )
)

const program = Effect.gen(function* () {
  const [chunks, _] = yield* Channel.runCollect(recovered)
  console.log(Chunk.toReadonlyArray(chunks))
})

Effect.runPromise(program)
// Ошибка: Oops!, переключаемся
// [1, 2, 99]

Channel.catchAllCause — перехват с полной Cause

Channel.catchAllCause даёт доступ к полной причине сбоя, включая дефекты и прерывания.

import { Channel, Cause, Chunk, Effect } from "effect"

const withDefect = Channel.flatMap(
  Channel.write(Chunk.make(1)),
  () => Channel.failCause(Cause.die(new Error("Unexpected")))
)

const recovered = Channel.catchAllCause(withDefect, (cause) =>
  Channel.flatMap(
    Channel.fromEffect(
      Effect.log(`Cause: ${Cause.pretty(cause)}`)
    ),
    () => Channel.write(Chunk.make(-1))
  )
)

Channel.orElse — альтернативный Channel

Channel.orElse предоставляет альтернативный Channel на случай ошибки. В отличие от catchAll, ошибка не передаётся в обработчик.

import { Channel, Chunk, Effect } from "effect"

const primary = Channel.fail("Primary failed" as const)
const fallback = Channel.write(Chunk.make("fallback-data"))

const withFallback = Channel.orElse(primary, () => fallback)

const program = Effect.gen(function* () {
  const [chunks, _] = yield* Channel.runCollect(withFallback)
  console.log(Chunk.toReadonlyArray(chunks))
})

Effect.runPromise(program)
// ["fallback-data"]

Channel.orDie — конвертация ошибки в дефект

Channel.orDie преобразует типизированную ошибку в дефект (непредвиденную ошибку). Это удаляет ошибку из типа Channel, но при её возникновении программа аварийно завершится.

import { Channel, Effect, pipe } from "effect"

const mayFail: Channel.Channel<never, unknown, string, unknown, number> =
  Channel.fail("Something failed")

const neverFails = pipe(mayFail, Channel.orDie)
// Channel<never, unknown, never, unknown, number, unknown, never>
//                          ▲
//                    OutErr теперь never

Конкурентные операции

Channel.mergeWith — слияние двух Channel

Channel.mergeWith запускает два Channel параллельно и объединяет их результаты. Элементы от обоих Channel чередуются в зависимости от того, кто первым произвёл данные.

import { Channel, Chunk, Effect } from "effect"

const ch1 = Channel.write(Chunk.make("a", "b"))
const ch2 = Channel.write(Chunk.make(1, 2))

const merged = Channel.mergeWith(ch1, {
  other: ch2,
  onSelfDone: (exit) => (fiber) => fiber,
  onOtherDone: (exit) => (fiber) => fiber
})

Channel.concatMap — последовательное развёртывание

Channel.concatMap применяет функцию к каждому элементу Channel, получая новый Channel для каждого элемента, и конкатенирует их последовательно.

import { Channel, Chunk, Effect, pipe } from "effect"

const source = Channel.writeAll(
  Chunk.of(1),
  Chunk.of(2),
  Chunk.of(3)
)

// Для каждого элемента создаём подканал
const expanded = Channel.concatMap(source, (chunk) => {
  const n = Chunk.unsafeHead(chunk)
  return Channel.write(Chunk.make(n, n * 10, n * 100))
})

const program = Effect.gen(function* () {
  const [chunks, _] = yield* Channel.runCollect(expanded)
  console.log(Chunk.toReadonlyArray(chunks))
})

Effect.runPromise(program)
// [1, 10, 100, 2, 20, 200, 3, 30, 300]

Запуск и исполнение

Channel.run — запуск без сбора

Channel.run запускает Channel, игнорируя все эмитируемые элементы, и возвращает финальное значение.

import { Channel, Chunk, Effect, pipe } from "effect"

const channel = pipe(
  Channel.write(Chunk.make(1, 2, 3)),  // эти элементы игнорируются
  Channel.as("done" as const)
)

const program = Effect.gen(function* () {
  const result = yield* Channel.run(channel)
  console.log(result) // "done"
})

Effect.runPromise(program)

Channel.runCollect — запуск со сбором

Channel.runCollect запускает Channel и собирает все эмитируемые элементы в Chunk, а также возвращает финальное значение.

import { Channel, Chunk, Effect, pipe } from "effect"

const channel = pipe(
  Channel.write(Chunk.make(1, 2)),
  Channel.flatMap(() => Channel.write(Chunk.make(3))),
  Channel.as("complete")
)

const program = Effect.gen(function* () {
  const [elements, done] = yield* Channel.runCollect(channel)
  console.log("Элементы:", Chunk.toReadonlyArray(elements)) // [1, 2, 3]
  console.log("Результат:", done) // "complete"
})

Effect.runPromise(program)

Channel.runDrain — запуск с отбрасыванием

Channel.runDrain запускает Channel, отбрасывая все элементы и результат. Полезно, когда важны только побочные эффекты.

import { Channel, Chunk, Effect, pipe } from "effect"

const channel = pipe(
  Channel.fromEffect(Effect.log("Шаг 1")),
  Channel.flatMap(() => Channel.write(Chunk.make(1, 2, 3))),
  Channel.flatMap(() => Channel.fromEffect(Effect.log("Шаг 2")))
)

Effect.runPromise(Channel.runDrain(channel))
// timestamp=... message="Шаг 1"
// timestamp=... message="Шаг 2"

API Reference

Сводная таблица операций

ОперацияКатегорияОписание
map(f)ТрансформацияТрансформирует OutDone
mapOut(f)ТрансформацияТрансформирует OutElem
mapInput(f)ТрансформацияТрансформирует InElem
mapError(f)ТрансформацияТрансформирует OutErr
mapInputError(f)ТрансформацияТрансформирует InErr
as(value)ТрансформацияЗаменяет OutDone на константу
flatMap(f)МонадаПоследовательная цепочка
flattenМонадаУбирает вложенность
zip(ch2)АппликативКомбинирует результаты в кортеж
zipRight(ch2)АппликативРезультат правого
zipLeft(ch2)АппликативРезультат левого
pipeTo(downstream)КатегорияСоединяет output → input
compose(upstream)КатегорияОбратный pipeTo
catchAll(f)ОшибкиПерехват ошибок
catchAllCause(f)ОшибкиПерехват с Cause
orElse(alt)ОшибкиАльтернативный Channel
orDieОшибкиОшибка → дефект
ensuring(fin)РесурсыФинализатор
ensuringWith(f)РесурсыФинализатор с Exit
mergeWith(other)КонкурентностьПараллельное слияние
concatMap(f)КомпозицияРазвёртывание элементов
runИсполнениеЗапуск, игнорируя элементы
runCollectИсполнениеЗапуск, собирая элементы
runDrainИсполнениеЗапуск, отбрасывая всё

Примеры

💻 Пример 1: Цепочка трансформаций

import { Channel, Chunk, Effect, pipe } from "effect"

// Создаём пайплайн с несколькими трансформациями
const pipeline = pipe(
  // Эмитируем числа
  Channel.writeAll(
    Chunk.make(1, 2, 3),
    Chunk.make(4, 5, 6)
  ),
  // Трансформируем выходные элементы
  Channel.mapOut((chunk) =>
    Chunk.map(chunk, (n) => n * 2)
  ),
  // Добавляем строковое представление в результат
  Channel.map(() => "Processing complete")
)

const program = Effect.gen(function* () {
  const [chunks, done] = yield* Channel.runCollect(pipeline)
  console.log("Данные:", Chunk.toReadonlyArray(chunks))
  console.log("Результат:", done)
})

Effect.runPromise(program)
// Данные: [2, 4, 6, 8, 10, 12]
// Результат: "Processing complete"

💻 Пример 2: Обработка ошибок в пайплайне

import { Channel, Chunk, Effect, pipe } from "effect"

class ValidationError {
  readonly _tag = "ValidationError" as const
  constructor(readonly field: string, readonly message: string) {}
}

class ProcessingError {
  readonly _tag = "ProcessingError" as const
  constructor(readonly step: string, readonly cause: string) {}
}

// Channel, который может завершиться с ValidationError
const validateData = pipe(
  Channel.write(Chunk.make("valid-1", "valid-2")),
  Channel.flatMap(() =>
    Channel.fail(new ValidationError("email", "Invalid format"))
  )
)

// Оборачиваем ошибку в ProcessingError
const withMappedError = pipe(
  validateData,
  Channel.mapError(
    (err) => new ProcessingError("validation", `${err.field}: ${err.message}`)
  )
)

// Добавляем fallback
const withFallback = Channel.catchAll(withMappedError, (err) =>
  pipe(
    Channel.fromEffect(
      Effect.log(`Ошибка на шаге ${err.step}: ${err.cause}`)
    ),
    Channel.flatMap(() =>
      Channel.write(Chunk.make("default-value"))
    )
  )
)

const program = Effect.gen(function* () {
  const [chunks, _] = yield* Channel.runCollect(withFallback)
  console.log(Chunk.toReadonlyArray(chunks))
})

Effect.runPromise(program)
// Ошибка на шаге validation: email: Invalid format
// ["valid-1", "valid-2", "default-value"]

💻 Пример 3: flatMap для зависимых шагов

import { Channel, Chunk, Effect, pipe } from "effect"

// Симуляция: загрузка конфигурации, затем обработка данных
const configStep = pipe(
  Channel.fromEffect(
    Effect.succeed({ maxItems: 3, format: "json" as const })
  )
)

const dataProcessing = Channel.flatMap(configStep, (config) =>
  pipe(
    Channel.fromEffect(
      Effect.log(`Конфиг: maxItems=${config.maxItems}, format=${config.format}`)
    ),
    Channel.flatMap(() => {
      const items = Array.from(
        { length: config.maxItems },
        (_, i) => ({ id: i, format: config.format })
      )
      return Channel.write(Chunk.fromIterable(items))
    }),
    Channel.as(`Обработано ${config.maxItems} элементов`)
  )
)

const program = Effect.gen(function* () {
  const [chunks, summary] = yield* Channel.runCollect(dataProcessing)
  console.log("Данные:", Chunk.toReadonlyArray(chunks))
  console.log("Итог:", summary)
})

Effect.runPromise(program)
// Конфиг: maxItems=3, format=json
// Данные: [{ id: 0, format: "json" }, { id: 1, format: "json" }, { id: 2, format: "json" }]
// Итог: "Обработано 3 элементов"

Упражнения

🟢 Basic

Упражнение 1: Цепочка трансформаций

Создайте Channel, который эмитирует числа от 1 до 5 и используйте Channel.mapOut для их удвоения, а затем Channel.map для добавления суммы в результат.

Решение:

import { Channel, Chunk, Effect, pipe } from "effect"

const channel = pipe(
  Channel.write(Chunk.make(1, 2, 3, 4, 5)),
  Channel.mapOut((chunk) => Chunk.map(chunk, (n) => n * 2)),
  Channel.map(() => {
    const sum = 2 + 4 + 6 + 8 + 10
    return `Сумма удвоенных: ${sum}`
  })
)

const program = Effect.gen(function* () {
  const [chunks, summary] = yield* Channel.runCollect(channel)
  console.log("Элементы:", Chunk.toReadonlyArray(chunks))
  console.log("Итог:", summary)
})

Effect.runPromise(program)
// Элементы: [2, 4, 6, 8, 10]
// Итог: "Сумма удвоенных: 30"

Упражнение 2: Обработка ошибок

Создайте Channel, который эмитирует [1, 2], затем завершается с ошибкой "timeout". Используйте catchAll для перехвата ошибки и эмиссии [0] как fallback.

Решение:

import { Channel, Chunk, Effect, pipe } from "effect"

const failing = pipe(
  Channel.write(Chunk.make(1, 2)),
  Channel.flatMap(() => Channel.fail("timeout" as const))
)

const recovered = Channel.catchAll(failing, (error) =>
  pipe(
    Channel.fromEffect(Effect.log(`Error: ${error}`)),
    Channel.flatMap(() => Channel.write(Chunk.make(0)))
  )
)

const program = Effect.gen(function* () {
  const [chunks, _] = yield* Channel.runCollect(recovered)
  console.log(Chunk.toReadonlyArray(chunks))
})

Effect.runPromise(program)
// Error: timeout
// [1, 2, 0]

🟡 Intermediate

Упражнение 3: Zip и комбинирование

Создайте два Channel: один эмитирует заголовки ["Name", "Age"], другой — данные ["Alice", "30"]. Используйте Channel.zip для их объединения. Результат — общее количество ячеек.

Решение:

import { Channel, Chunk, Effect, pipe } from "effect"

const headers = pipe(
  Channel.write(Chunk.make("Name", "Age")),
  Channel.as(2)
)

const data = pipe(
  Channel.write(Chunk.make("Alice", "30")),
  Channel.as(2)
)

const table = pipe(
  Channel.zip(headers, data),
  Channel.map(([headerCount, dataCount]) =>
    `Таблица: ${headerCount} заголовков, ${dataCount} данных`
  )
)

const program = Effect.gen(function* () {
  const [chunks, summary] = yield* Channel.runCollect(table)
  console.log("Ячейки:", Chunk.toReadonlyArray(chunks))
  console.log("Итог:", summary)
})

Effect.runPromise(program)
// Ячейки: ["Name", "Age", "Alice", "30"]
// Итог: "Таблица: 2 заголовков, 2 данных"

🔴 Advanced

Упражнение 4: Retry с экспоненциальным backoff

Реализуйте Channel retryChannel, который пытается выполнить Effect до 3 раз с экспоненциальным backoff (100ms, 200ms, 400ms). При успехе — эмитирует результат. При исчерпании попыток — завершается с ошибкой.

Решение:

import { Channel, Chunk, Duration, Effect, pipe } from "effect"

const retryChannel = <A, E>(
  effect: Effect.Effect<A, E>,
  maxRetries: number = 3,
  baseDelay: number = 100,
  attempt: number = 0
): Channel.Channel<Chunk.Chunk<A>, unknown, E, unknown, void> =>
  Channel.unwrap(
    Effect.matchEffect(effect, {
      onSuccess: (value) =>
        Effect.succeed(Channel.write(Chunk.of(value))),
      onFailure: (error) => {
        if (attempt >= maxRetries) {
          return Effect.succeed(Channel.fail(error))
        }
        const delay = baseDelay * Math.pow(2, attempt)
        return pipe(
          Effect.log(`Попытка ${attempt + 1} неудачна, ждём ${delay}ms`),
          Effect.flatMap(() => Effect.sleep(Duration.millis(delay))),
          Effect.map(() =>
            retryChannel(effect, maxRetries, baseDelay, attempt + 1)
          )
        )
      }
    })
  )

// Тест: Effect, который успешен на 3-й попытке
let counter = 0
const unstableEffect = Effect.sync(() => {
  counter++
  if (counter < 3) throw new Error(`Attempt ${counter} failed`)
  return `Success on attempt ${counter}`
}).pipe(
  Effect.catchAllDefect((defect) =>
    Effect.fail(defect instanceof Error ? defect.message : "Unknown error")
  )
)

const program = Effect.gen(function* () {
  const [chunks, _] = yield* Channel.runCollect(
    retryChannel(unstableEffect)
  )
  console.log(Chunk.toReadonlyArray(chunks))
})

Effect.runPromise(program)
// Попытка 1 неудачна, ждём 100ms
// Попытка 2 неудачна, ждём 200ms
// ["Success on attempt 3"]

🔗 Связанные темы: 02-creating-channels.md, 04-pipelines.md, Модуль 02 (Error Handling)