Операции над Channel
Полный обзор операций трансформации, композиции, обработки ошибок и управления ресурсами для Channel. Эти операции — строительные блоки любых потоковых пайплайнов.
Теория
Алгебра операций Channel
Операции над Channel образуют алгебру, позволяющую строить сложные потоковые пайплайны из простых компонентов. Эта алгебра включает:
-
Функториальные операции —
map,mapOut,mapErrorи др. Они трансформируют значения в каналах, не меняя структуру вычисления. -
Монадические операции —
flatMap,flatten. Они позволяют строить последовательности Channel, где следующий шаг зависит от результата предыдущего. -
Аппликативные операции —
zip,zipRight,zipLeft. Они комбинируют результаты нескольких Channel. -
Операции обработки ошибок —
catchAll,orElse,mapError. Они управляют ошибками. -
Категорные операции —
pipeTo,compose. Они соединяют Channel в пайплайны.
Уровни композиции:
map / mapOut — трансформация значений (Functor)
flatMap — последовательная цепочка (Monad)
zip / zipRight — параллельная комбинация (Applicative)
pipeTo — соединение в пайплайн (Category/Arrow)
catchAll / orElse — обработка ошибок (MonadError)
Законы функтора для Channel
Операция map на Channel подчиняется законам функтора:
- Тождество:
Channel.map(channel, identity) ≡ channel - Композиция:
Channel.map(Channel.map(channel, f), g) ≡ Channel.map(channel, x => g(f(x)))
Аналогичные законы действуют для монадических операций (flatMap) и других комбинаторов.
Трансформации
Channel.map — трансформация результата
Channel.map применяет функцию к финальному значению Channel (OutDone). Это трансформация результата, а не элементов.
import { Channel, Effect, pipe } from "effect"
const channel = pipe(
Channel.succeed(42),
Channel.map((n) => `Результат: ${n}`)
)
// Channel<never, unknown, never, unknown, string, unknown, never>
const program = Effect.gen(function* () {
const [_, value] = yield* Channel.runCollect(channel)
console.log(value) // "Результат: 42"
})
Effect.runPromise(program)
Channel.mapOut — трансформация выходных элементов
Channel.mapOut применяет функцию к каждому эмитируемому элементу. Это аналог Stream.map на уровне Channel.
import { Channel, Chunk, Effect, pipe } from "effect"
const channel = pipe(
Channel.write(Chunk.make(1, 2, 3)),
Channel.mapOut((chunk) => Chunk.map(chunk, (n) => n * 10))
)
const program = Effect.gen(function* () {
const [chunks, _] = yield* Channel.runCollect(channel)
console.log(Chunk.toReadonlyArray(chunks))
})
Effect.runPromise(program)
// [10, 20, 30]
⚠️ mapOut трансформирует OutElem целиком. Если OutElem — это Chunk<number>, то функция получает весь Chunk, а не отдельные числа.
Channel.mapInput — трансформация входных элементов
Channel.mapInput трансформирует входные элементы перед их обработкой Channel. Это контравариантная операция — она “оборачивает” вход.
import { Channel, Chunk, Effect } from "effect"
// Channel, ожидающий строки
const stringProcessor = Channel.readWith({
onInput: (chunk: Chunk.Chunk<string>) =>
Channel.flatMap(
Channel.write(Chunk.map(chunk, (s) => s.toUpperCase())),
() => stringProcessor
),
onFailure: (err: never) => Channel.fail(err),
onDone: (done: void) => Channel.succeed(done)
})
// Адаптируем вход: принимаем числа, конвертируем в строки
const numberToString = Channel.mapInput(
stringProcessor,
(chunk: Chunk.Chunk<number>) => Chunk.map(chunk, (n) => String(n))
)
// Теперь Channel принимает Chunk<number> вместо Chunk<string>
Channel.mapError — трансформация ошибок
Channel.mapError трансформирует ошибку Channel. Полезно для унификации типов ошибок при композиции.
import { Channel, Effect, pipe } from "effect"
class RawError {
readonly _tag = "RawError" as const
constructor(readonly code: number) {}
}
class AppError {
readonly _tag = "AppError" as const
constructor(readonly message: string) {}
}
const rawChannel = Channel.fail(new RawError(404))
const mappedChannel = pipe(
rawChannel,
Channel.mapError((err) => new AppError(`Error code: ${err.code}`))
)
// Channel<never, unknown, AppError, unknown, never, unknown, never>
Channel.mapInputError — трансформация входных ошибок
Channel.mapInputError трансформирует ошибки, получаемые от upstream.
import { Channel } from "effect"
// Адаптируем входную ошибку
const adapted = Channel.mapInputError(
Channel.readWith({
onInput: (chunk: unknown) => Channel.write(chunk),
onFailure: (err: string) => Channel.fail(`Transformed: ${err}`),
onDone: (done: unknown) => Channel.succeed(done)
}),
(rawErr: number) => `Error #${rawErr}` // number → string
)
Channel.as — замена результата константой
Channel.as заменяет финальное значение Channel на указанную константу.
import { Channel, Chunk, Effect, pipe } from "effect"
const channel = pipe(
Channel.write(Chunk.make(1, 2, 3)),
Channel.as("completed" as const)
)
const program = Effect.gen(function* () {
const [_, done] = yield* Channel.runCollect(channel)
console.log(done) // "completed"
})
Effect.runPromise(program)
Монадические операции
Channel.flatMap — последовательная композиция
Channel.flatMap — основная монадическая операция. Она принимает результат первого Channel (OutDone) и создаёт следующий Channel на его основе.
import { Channel, Chunk, Effect, pipe } from "effect"
// Первый Channel эмитирует данные и возвращает их количество
const step1 = pipe(
Channel.write(Chunk.make("a", "b", "c")),
Channel.as(3)
)
// Второй Channel зависит от результата первого
const pipeline = Channel.flatMap(step1, (count) =>
Channel.flatMap(
Channel.fromEffect(
Effect.log(`Первый шаг обработал ${count} элементов`)
),
() => Channel.write(Chunk.make("d", "e"))
)
)
const program = Effect.gen(function* () {
const [chunks, _] = yield* Channel.runCollect(pipeline)
console.log(Chunk.toReadonlyArray(chunks))
})
Effect.runPromise(program)
// timestamp=... message="Первый шаг обработал 3 элементов"
// ["a", "b", "c", "d", "e"]
Channel.flatten — убирает один уровень вложенности
Channel.flatten разворачивает Channel<..., Channel<...>> в плоский Channel. Это эквивалент Channel.flatMap(channel, identity).
import { Channel, Chunk, Effect } from "effect"
// Channel, результат которого — другой Channel
const nested = Channel.succeed(
Channel.write(Chunk.make(1, 2, 3))
)
// Channel<never, unknown, never, unknown, Channel<Chunk<number>>, unknown, never>
const flat = Channel.flatten(nested)
// Channel<Chunk<number>, unknown, never, unknown, void, unknown, never>
Channel.zip — комбинация двух Channel
Channel.zip выполняет два Channel последовательно и возвращает кортеж их результатов.
import { Channel, Chunk, Effect, pipe } from "effect"
const ch1 = pipe(
Channel.write(Chunk.make(1, 2)),
Channel.as("first" as const)
)
const ch2 = pipe(
Channel.write(Chunk.make(3, 4)),
Channel.as("second" as const)
)
const combined = Channel.zip(ch1, ch2)
const program = Effect.gen(function* () {
const [chunks, [a, b]] = yield* Channel.runCollect(combined)
console.log("Элементы:", Chunk.toReadonlyArray(chunks))
console.log("Результаты:", a, b)
})
Effect.runPromise(program)
// Элементы: [1, 2, 3, 4]
// Результаты: "first" "second"
Channel.zipRight / Channel.zipLeft
Channel.zipRight (*>) выполняет оба Channel, но возвращает только результат второго. Channel.zipLeft (<*) — только результат первого.
import { Channel, Chunk, Effect, pipe } from "effect"
// Выполнить оба, вернуть результат второго
const right = Channel.zipRight(
pipe(Channel.write(Chunk.make(1, 2)), Channel.as("ignored")),
pipe(Channel.write(Chunk.make(3, 4)), Channel.as("kept"))
)
const program = Effect.gen(function* () {
const [chunks, done] = yield* Channel.runCollect(right)
console.log("Элементы:", Chunk.toReadonlyArray(chunks))
console.log("Результат:", done) // "kept"
})
Effect.runPromise(program)
Обработка ошибок
Channel.catchAll — перехват всех ошибок
Channel.catchAll перехватывает ошибку Channel и переключается на альтернативный Channel.
import { Channel, Chunk, Effect, pipe } from "effect"
const failing = pipe(
Channel.write(Chunk.make(1, 2)),
Channel.flatMap(() => Channel.fail("Oops!" as const))
)
const recovered = Channel.catchAll(failing, (error) =>
pipe(
Channel.fromEffect(Effect.log(`Ошибка: ${error}, переключаемся`)),
Channel.flatMap(() => Channel.write(Chunk.make(99)))
)
)
const program = Effect.gen(function* () {
const [chunks, _] = yield* Channel.runCollect(recovered)
console.log(Chunk.toReadonlyArray(chunks))
})
Effect.runPromise(program)
// Ошибка: Oops!, переключаемся
// [1, 2, 99]
Channel.catchAllCause — перехват с полной Cause
Channel.catchAllCause даёт доступ к полной причине сбоя, включая дефекты и прерывания.
import { Channel, Cause, Chunk, Effect } from "effect"
const withDefect = Channel.flatMap(
Channel.write(Chunk.make(1)),
() => Channel.failCause(Cause.die(new Error("Unexpected")))
)
const recovered = Channel.catchAllCause(withDefect, (cause) =>
Channel.flatMap(
Channel.fromEffect(
Effect.log(`Cause: ${Cause.pretty(cause)}`)
),
() => Channel.write(Chunk.make(-1))
)
)
Channel.orElse — альтернативный Channel
Channel.orElse предоставляет альтернативный Channel на случай ошибки. В отличие от catchAll, ошибка не передаётся в обработчик.
import { Channel, Chunk, Effect } from "effect"
const primary = Channel.fail("Primary failed" as const)
const fallback = Channel.write(Chunk.make("fallback-data"))
const withFallback = Channel.orElse(primary, () => fallback)
const program = Effect.gen(function* () {
const [chunks, _] = yield* Channel.runCollect(withFallback)
console.log(Chunk.toReadonlyArray(chunks))
})
Effect.runPromise(program)
// ["fallback-data"]
Channel.orDie — конвертация ошибки в дефект
Channel.orDie преобразует типизированную ошибку в дефект (непредвиденную ошибку). Это удаляет ошибку из типа Channel, но при её возникновении программа аварийно завершится.
import { Channel, Effect, pipe } from "effect"
const mayFail: Channel.Channel<never, unknown, string, unknown, number> =
Channel.fail("Something failed")
const neverFails = pipe(mayFail, Channel.orDie)
// Channel<never, unknown, never, unknown, number, unknown, never>
// ▲
// OutErr теперь never
Конкурентные операции
Channel.mergeWith — слияние двух Channel
Channel.mergeWith запускает два Channel параллельно и объединяет их результаты. Элементы от обоих Channel чередуются в зависимости от того, кто первым произвёл данные.
import { Channel, Chunk, Effect } from "effect"
const ch1 = Channel.write(Chunk.make("a", "b"))
const ch2 = Channel.write(Chunk.make(1, 2))
const merged = Channel.mergeWith(ch1, {
other: ch2,
onSelfDone: (exit) => (fiber) => fiber,
onOtherDone: (exit) => (fiber) => fiber
})
Channel.concatMap — последовательное развёртывание
Channel.concatMap применяет функцию к каждому элементу Channel, получая новый Channel для каждого элемента, и конкатенирует их последовательно.
import { Channel, Chunk, Effect, pipe } from "effect"
const source = Channel.writeAll(
Chunk.of(1),
Chunk.of(2),
Chunk.of(3)
)
// Для каждого элемента создаём подканал
const expanded = Channel.concatMap(source, (chunk) => {
const n = Chunk.unsafeHead(chunk)
return Channel.write(Chunk.make(n, n * 10, n * 100))
})
const program = Effect.gen(function* () {
const [chunks, _] = yield* Channel.runCollect(expanded)
console.log(Chunk.toReadonlyArray(chunks))
})
Effect.runPromise(program)
// [1, 10, 100, 2, 20, 200, 3, 30, 300]
Запуск и исполнение
Channel.run — запуск без сбора
Channel.run запускает Channel, игнорируя все эмитируемые элементы, и возвращает финальное значение.
import { Channel, Chunk, Effect, pipe } from "effect"
const channel = pipe(
Channel.write(Chunk.make(1, 2, 3)), // эти элементы игнорируются
Channel.as("done" as const)
)
const program = Effect.gen(function* () {
const result = yield* Channel.run(channel)
console.log(result) // "done"
})
Effect.runPromise(program)
Channel.runCollect — запуск со сбором
Channel.runCollect запускает Channel и собирает все эмитируемые элементы в Chunk, а также возвращает финальное значение.
import { Channel, Chunk, Effect, pipe } from "effect"
const channel = pipe(
Channel.write(Chunk.make(1, 2)),
Channel.flatMap(() => Channel.write(Chunk.make(3))),
Channel.as("complete")
)
const program = Effect.gen(function* () {
const [elements, done] = yield* Channel.runCollect(channel)
console.log("Элементы:", Chunk.toReadonlyArray(elements)) // [1, 2, 3]
console.log("Результат:", done) // "complete"
})
Effect.runPromise(program)
Channel.runDrain — запуск с отбрасыванием
Channel.runDrain запускает Channel, отбрасывая все элементы и результат. Полезно, когда важны только побочные эффекты.
import { Channel, Chunk, Effect, pipe } from "effect"
const channel = pipe(
Channel.fromEffect(Effect.log("Шаг 1")),
Channel.flatMap(() => Channel.write(Chunk.make(1, 2, 3))),
Channel.flatMap(() => Channel.fromEffect(Effect.log("Шаг 2")))
)
Effect.runPromise(Channel.runDrain(channel))
// timestamp=... message="Шаг 1"
// timestamp=... message="Шаг 2"
API Reference
Сводная таблица операций
| Операция | Категория | Описание |
|---|---|---|
map(f) | Трансформация | Трансформирует OutDone |
mapOut(f) | Трансформация | Трансформирует OutElem |
mapInput(f) | Трансформация | Трансформирует InElem |
mapError(f) | Трансформация | Трансформирует OutErr |
mapInputError(f) | Трансформация | Трансформирует InErr |
as(value) | Трансформация | Заменяет OutDone на константу |
flatMap(f) | Монада | Последовательная цепочка |
flatten | Монада | Убирает вложенность |
zip(ch2) | Аппликатив | Комбинирует результаты в кортеж |
zipRight(ch2) | Аппликатив | Результат правого |
zipLeft(ch2) | Аппликатив | Результат левого |
pipeTo(downstream) | Категория | Соединяет output → input |
compose(upstream) | Категория | Обратный pipeTo |
catchAll(f) | Ошибки | Перехват ошибок |
catchAllCause(f) | Ошибки | Перехват с Cause |
orElse(alt) | Ошибки | Альтернативный Channel |
orDie | Ошибки | Ошибка → дефект |
ensuring(fin) | Ресурсы | Финализатор |
ensuringWith(f) | Ресурсы | Финализатор с Exit |
mergeWith(other) | Конкурентность | Параллельное слияние |
concatMap(f) | Композиция | Развёртывание элементов |
run | Исполнение | Запуск, игнорируя элементы |
runCollect | Исполнение | Запуск, собирая элементы |
runDrain | Исполнение | Запуск, отбрасывая всё |
Примеры
💻 Пример 1: Цепочка трансформаций
import { Channel, Chunk, Effect, pipe } from "effect"
// Создаём пайплайн с несколькими трансформациями
const pipeline = pipe(
// Эмитируем числа
Channel.writeAll(
Chunk.make(1, 2, 3),
Chunk.make(4, 5, 6)
),
// Трансформируем выходные элементы
Channel.mapOut((chunk) =>
Chunk.map(chunk, (n) => n * 2)
),
// Добавляем строковое представление в результат
Channel.map(() => "Processing complete")
)
const program = Effect.gen(function* () {
const [chunks, done] = yield* Channel.runCollect(pipeline)
console.log("Данные:", Chunk.toReadonlyArray(chunks))
console.log("Результат:", done)
})
Effect.runPromise(program)
// Данные: [2, 4, 6, 8, 10, 12]
// Результат: "Processing complete"
💻 Пример 2: Обработка ошибок в пайплайне
import { Channel, Chunk, Effect, pipe } from "effect"
class ValidationError {
readonly _tag = "ValidationError" as const
constructor(readonly field: string, readonly message: string) {}
}
class ProcessingError {
readonly _tag = "ProcessingError" as const
constructor(readonly step: string, readonly cause: string) {}
}
// Channel, который может завершиться с ValidationError
const validateData = pipe(
Channel.write(Chunk.make("valid-1", "valid-2")),
Channel.flatMap(() =>
Channel.fail(new ValidationError("email", "Invalid format"))
)
)
// Оборачиваем ошибку в ProcessingError
const withMappedError = pipe(
validateData,
Channel.mapError(
(err) => new ProcessingError("validation", `${err.field}: ${err.message}`)
)
)
// Добавляем fallback
const withFallback = Channel.catchAll(withMappedError, (err) =>
pipe(
Channel.fromEffect(
Effect.log(`Ошибка на шаге ${err.step}: ${err.cause}`)
),
Channel.flatMap(() =>
Channel.write(Chunk.make("default-value"))
)
)
)
const program = Effect.gen(function* () {
const [chunks, _] = yield* Channel.runCollect(withFallback)
console.log(Chunk.toReadonlyArray(chunks))
})
Effect.runPromise(program)
// Ошибка на шаге validation: email: Invalid format
// ["valid-1", "valid-2", "default-value"]
💻 Пример 3: flatMap для зависимых шагов
import { Channel, Chunk, Effect, pipe } from "effect"
// Симуляция: загрузка конфигурации, затем обработка данных
const configStep = pipe(
Channel.fromEffect(
Effect.succeed({ maxItems: 3, format: "json" as const })
)
)
const dataProcessing = Channel.flatMap(configStep, (config) =>
pipe(
Channel.fromEffect(
Effect.log(`Конфиг: maxItems=${config.maxItems}, format=${config.format}`)
),
Channel.flatMap(() => {
const items = Array.from(
{ length: config.maxItems },
(_, i) => ({ id: i, format: config.format })
)
return Channel.write(Chunk.fromIterable(items))
}),
Channel.as(`Обработано ${config.maxItems} элементов`)
)
)
const program = Effect.gen(function* () {
const [chunks, summary] = yield* Channel.runCollect(dataProcessing)
console.log("Данные:", Chunk.toReadonlyArray(chunks))
console.log("Итог:", summary)
})
Effect.runPromise(program)
// Конфиг: maxItems=3, format=json
// Данные: [{ id: 0, format: "json" }, { id: 1, format: "json" }, { id: 2, format: "json" }]
// Итог: "Обработано 3 элементов"
Упражнения
🟢 Basic
Упражнение 1: Цепочка трансформаций
Создайте Channel, который эмитирует числа от 1 до 5 и используйте Channel.mapOut для их удвоения, а затем Channel.map для добавления суммы в результат.
Решение:
import { Channel, Chunk, Effect, pipe } from "effect"
const channel = pipe(
Channel.write(Chunk.make(1, 2, 3, 4, 5)),
Channel.mapOut((chunk) => Chunk.map(chunk, (n) => n * 2)),
Channel.map(() => {
const sum = 2 + 4 + 6 + 8 + 10
return `Сумма удвоенных: ${sum}`
})
)
const program = Effect.gen(function* () {
const [chunks, summary] = yield* Channel.runCollect(channel)
console.log("Элементы:", Chunk.toReadonlyArray(chunks))
console.log("Итог:", summary)
})
Effect.runPromise(program)
// Элементы: [2, 4, 6, 8, 10]
// Итог: "Сумма удвоенных: 30"
Упражнение 2: Обработка ошибок
Создайте Channel, который эмитирует [1, 2], затем завершается с ошибкой "timeout". Используйте catchAll для перехвата ошибки и эмиссии [0] как fallback.
Решение:
import { Channel, Chunk, Effect, pipe } from "effect"
const failing = pipe(
Channel.write(Chunk.make(1, 2)),
Channel.flatMap(() => Channel.fail("timeout" as const))
)
const recovered = Channel.catchAll(failing, (error) =>
pipe(
Channel.fromEffect(Effect.log(`Error: ${error}`)),
Channel.flatMap(() => Channel.write(Chunk.make(0)))
)
)
const program = Effect.gen(function* () {
const [chunks, _] = yield* Channel.runCollect(recovered)
console.log(Chunk.toReadonlyArray(chunks))
})
Effect.runPromise(program)
// Error: timeout
// [1, 2, 0]
🟡 Intermediate
Упражнение 3: Zip и комбинирование
Создайте два Channel: один эмитирует заголовки ["Name", "Age"], другой — данные ["Alice", "30"]. Используйте Channel.zip для их объединения. Результат — общее количество ячеек.
Решение:
import { Channel, Chunk, Effect, pipe } from "effect"
const headers = pipe(
Channel.write(Chunk.make("Name", "Age")),
Channel.as(2)
)
const data = pipe(
Channel.write(Chunk.make("Alice", "30")),
Channel.as(2)
)
const table = pipe(
Channel.zip(headers, data),
Channel.map(([headerCount, dataCount]) =>
`Таблица: ${headerCount} заголовков, ${dataCount} данных`
)
)
const program = Effect.gen(function* () {
const [chunks, summary] = yield* Channel.runCollect(table)
console.log("Ячейки:", Chunk.toReadonlyArray(chunks))
console.log("Итог:", summary)
})
Effect.runPromise(program)
// Ячейки: ["Name", "Age", "Alice", "30"]
// Итог: "Таблица: 2 заголовков, 2 данных"
🔴 Advanced
Упражнение 4: Retry с экспоненциальным backoff
Реализуйте Channel retryChannel, который пытается выполнить Effect до 3 раз с экспоненциальным backoff (100ms, 200ms, 400ms). При успехе — эмитирует результат. При исчерпании попыток — завершается с ошибкой.
Решение:
import { Channel, Chunk, Duration, Effect, pipe } from "effect"
const retryChannel = <A, E>(
effect: Effect.Effect<A, E>,
maxRetries: number = 3,
baseDelay: number = 100,
attempt: number = 0
): Channel.Channel<Chunk.Chunk<A>, unknown, E, unknown, void> =>
Channel.unwrap(
Effect.matchEffect(effect, {
onSuccess: (value) =>
Effect.succeed(Channel.write(Chunk.of(value))),
onFailure: (error) => {
if (attempt >= maxRetries) {
return Effect.succeed(Channel.fail(error))
}
const delay = baseDelay * Math.pow(2, attempt)
return pipe(
Effect.log(`Попытка ${attempt + 1} неудачна, ждём ${delay}ms`),
Effect.flatMap(() => Effect.sleep(Duration.millis(delay))),
Effect.map(() =>
retryChannel(effect, maxRetries, baseDelay, attempt + 1)
)
)
}
})
)
// Тест: Effect, который успешен на 3-й попытке
let counter = 0
const unstableEffect = Effect.sync(() => {
counter++
if (counter < 3) throw new Error(`Attempt ${counter} failed`)
return `Success on attempt ${counter}`
}).pipe(
Effect.catchAllDefect((defect) =>
Effect.fail(defect instanceof Error ? defect.message : "Unknown error")
)
)
const program = Effect.gen(function* () {
const [chunks, _] = yield* Channel.runCollect(
retryChannel(unstableEffect)
)
console.log(Chunk.toReadonlyArray(chunks))
})
Effect.runPromise(program)
// Попытка 1 неудачна, ждём 100ms
// Попытка 2 неудачна, ждём 200ms
// ["Success on attempt 3"]
🔗 Связанные темы: 02-creating-channels.md, 04-pipelines.md, Модуль 02 (Error Handling)