Построение пайплайнов
Пайплайны Channel — это способ соединять producers, transformers и consumers в единые потоковые конвейеры. Операция pipeTo — сердце потоковой архитектуры Effect.
Теория
Что такое пайплайн Channel
Пайплайн — это последовательность Channel, соединённых так, что выход одного становится входом следующего. Это фундаментальный паттерн потоковой обработки в Effect.
┌──────────┐ ┌──────────────┐ ┌──────────┐
│ Producer │────►│ Transformer │────►│ Consumer │
│ (Source) │ │ (Pipe) │ │ (Sink) │
└──────────┘ └──────────────┘ └──────────┘
OutElem ──────► InElem→OutElem ────► InElem
OutErr ──────► InErr→OutErr ────► InErr
OutDone ──────► InDone→OutDone ───► InDone
Пайплайн обладает следующими свойствами:
- Композируемость — любые два совместимых Channel можно соединить
- Ленивость — данные обрабатываются по требованию downstream
- Безопасность ресурсов — финализаторы всех ступеней гарантированно выполняются
- Типобезопасность — несовместимые Channel нельзя соединить (ошибка компиляции)
Направление потока данных
В пайплайне Channel данные текут слева направо (от producer к consumer), а запросы на данные (pull) — справа налево (от consumer к producer):
Поток данных: ──────────────────────────►
Producer → Pipe → Consumer
Запросы (pull): ◄──────────────────────────
Producer ← Pipe ← Consumer
Consumer “тянет” данные: когда ему нужен следующий чанк, он запрашивает его у предшествующего Pipe, который, в свою очередь, запрашивает у Producer. Это обеспечивает автоматический backpressure.
Концепция ФП
Категорная композиция
Channel.pipeTo реализует категорную композицию (composition of arrows). В теории категорий, если:
f: A → B(Channel из A в B)g: B → C(Channel из B в C)
то g ∘ f: A → C (пайплайн из A в C).
В Effect-ts это записывается как:
// Channel.pipeTo(upstream, downstream)
// аналог: downstream ∘ upstream
// или Channel.compose(downstream, upstream)
// аналог: downstream ∘ upstream
Это удовлетворяет законам категории:
- Ассоциативность:
pipeTo(pipeTo(a, b), c) ≡ pipeTo(a, pipeTo(b, c)) - Единица:
pipeTo(a, identity) ≡ a ≡ pipeTo(identity, a)
Где identity — это Channel, который просто пересылает данные без изменений.
Coroutines и кооперативная многозадачность
Пайплайн Channel работает как система сопрограмм (coroutines). Каждый Channel в пайплайне — это coroutine, которая:
- Выполняет работу, пока не нужны данные (emit/read)
- Передаёт управление соседнему Channel
- Получает управление обратно, когда данные готовы
Это форма кооперативной многозадачности без потоков и переключения контекста.
Producer Pipe Consumer
│ │
│◄─── pull ──────────── pull ────│
│ │
├─── emit(Chunk) ───► │
│ ├─── emit(Chunk') ──►
│ │ │
│◄─── pull ──────┤ │
│ │
├─── done ───────► │
│ ├─── done ────►
Channel.pipeTo — основа пайплайнов
Базовое использование
Channel.pipeTo соединяет выход upstream с входом downstream. Типы должны быть совместимы:
OutElemupstream →InElemdownstreamOutErrupstream →InErrdownstreamOutDoneupstream →InDonedownstream
import { Channel, Chunk, Effect, pipe } from "effect"
// Producer: эмитирует чанки чисел
const producer = pipe(
Channel.write(Chunk.make(1, 2, 3)),
Channel.flatMap(() => Channel.write(Chunk.make(4, 5, 6)))
)
// Transformer: удваивает каждый элемент
const doubler: Channel.Channel<
Chunk.Chunk<number>,
Chunk.Chunk<number>,
never,
never,
void,
void
> = Channel.readWith({
onInput: (chunk: Chunk.Chunk<number>) =>
Channel.flatMap(
Channel.write(Chunk.map(chunk, (n) => n * 2)),
() => doubler
),
onFailure: (err) => Channel.fail(err),
onDone: (done) => Channel.succeed(done)
})
// Соединяем в пайплайн
const pipeline = Channel.pipeTo(producer, doubler)
const program = Effect.gen(function* () {
const [chunks, _] = yield* Channel.runCollect(pipeline)
console.log(Chunk.toReadonlyArray(chunks))
})
Effect.runPromise(program)
// [2, 4, 6, 8, 10, 12]
Типы в pipeTo
import { Channel, Chunk } from "effect"
// Producer:
// Channel<Chunk<number>, unknown, string, unknown, boolean, unknown, never>
// OutElem InElem OutErr InErr OutDone InDone Env
// Transformer (downstream):
// Channel<Chunk<string>, Chunk<number>, Error, string, number, boolean, never>
// OutElem InElem OutErr InErr OutDone InDone Env
// ▲ ▲ ▲
// = upstream OutElem = upstream OutErr = upstream OutDone
// Результат pipeTo:
// Channel<Chunk<string>, unknown, Error, unknown, number, unknown, never>
// ▲ ▲ ▲
// downstream OutElem downstream OutErr downstream OutDone
//
// InElem, InErr, InDone берутся от upstream
// OutElem, OutErr, OutDone берутся от downstream
Channel.compose — обратный pipeTo
Channel.compose делает то же, что и pipeTo, но с обратным порядком аргументов.
import { Channel, Chunk, Effect } from "effect"
// compose(downstream, upstream) ≡ pipeTo(upstream, downstream)
const producer = Channel.write(Chunk.make(1, 2, 3))
const transformer: Channel.Channel<
Chunk.Chunk<string>,
Chunk.Chunk<number>,
never,
never,
void,
void
> = Channel.readWith({
onInput: (chunk: Chunk.Chunk<number>) =>
Channel.flatMap(
Channel.write(Chunk.map(chunk, (n) => `item-${n}`)),
() => transformer
),
onFailure: (err) => Channel.fail(err),
onDone: (done) => Channel.succeed(done)
})
// Оба варианта эквивалентны:
const via1 = Channel.pipeTo(producer, transformer)
const via2 = Channel.compose(transformer, producer)
Многоступенчатые пайплайны
Цепочка из нескольких трансформеров
Пайплайны могут состоять из произвольного числа ступеней. Каждая ступень — это Channel-трансформер.
import { Channel, Chunk, Effect, pipe } from "effect"
// Утилита для создания трансформера
const mapChannel = <A, B>(
f: (a: A) => B
): Channel.Channel<
Chunk.Chunk<B>,
Chunk.Chunk<A>,
never,
never,
void,
void
> => {
const go: Channel.Channel<
Chunk.Chunk<B>,
Chunk.Chunk<A>,
never,
never,
void,
void
> = Channel.readWith({
onInput: (chunk: Chunk.Chunk<A>) =>
Channel.flatMap(
Channel.write(Chunk.map(chunk, f)),
() => go
),
onFailure: (err) => Channel.fail(err),
onDone: (done) => Channel.succeed(done)
})
return go
}
const filterChannel = <A>(
predicate: (a: A) => boolean
): Channel.Channel<
Chunk.Chunk<A>,
Chunk.Chunk<A>,
never,
never,
void,
void
> => {
const go: Channel.Channel<
Chunk.Chunk<A>,
Chunk.Chunk<A>,
never,
never,
void,
void
> = Channel.readWith({
onInput: (chunk: Chunk.Chunk<A>) => {
const filtered = Chunk.filter(chunk, predicate)
return Chunk.isEmpty(filtered)
? go
: Channel.flatMap(Channel.write(filtered), () => go)
},
onFailure: (err) => Channel.fail(err),
onDone: (done) => Channel.succeed(done)
})
return go
}
// Producer
const source = pipe(
Channel.write(Chunk.make(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
)
// Пайплайн: source → filter(чётные) → map(*10) → map(toString)
const pipeline = pipe(
source,
(ch) => Channel.pipeTo(ch, filterChannel<number>((n) => n % 2 === 0)),
(ch) => Channel.pipeTo(ch, mapChannel<number, number>((n) => n * 10)),
(ch) => Channel.pipeTo(ch, mapChannel<number, string>((n) => `value: ${n}`))
)
const program = Effect.gen(function* () {
const [chunks, _] = yield* Channel.runCollect(pipeline)
console.log(Chunk.toReadonlyArray(chunks))
})
Effect.runPromise(program)
// ["value: 20", "value: 40", "value: 60", "value: 80", "value: 100"]
Пайплайн с аккумулятором (Consumer)
Consumer — это финальная ступень пайплайна, которая собирает результат.
import { Channel, Chunk, Effect, pipe } from "effect"
// Consumer: подсчитывает сумму всех элементов
const sumConsumer = (
acc: number = 0
): Channel.Channel<
never, // не эмитирует
Chunk.Chunk<number>, // принимает чанки чисел
never,
never,
number, // результат — сумма
void
> =>
Channel.readWith({
onInput: (chunk: Chunk.Chunk<number>) => {
const chunkSum = Chunk.reduce(chunk, 0, (a, b) => a + b)
return sumConsumer(acc + chunkSum)
},
onFailure: (err) => Channel.fail(err),
onDone: (_) => Channel.succeed(acc)
})
// Producer → Consumer
const source = pipe(
Channel.write(Chunk.make(1, 2, 3)),
Channel.flatMap(() => Channel.write(Chunk.make(4, 5, 6)))
)
const pipeline = Channel.pipeTo(source, sumConsumer())
const program = Effect.gen(function* () {
const result = yield* Channel.run(pipeline)
console.log("Сумма:", result)
})
Effect.runPromise(program)
// Сумма: 21
Паттерны построения пайплайнов
Паттерн 1: Identity Channel
Identity Channel пересылает данные без изменений. Полезен как нейтральный элемент в комбинациях.
import { Channel, Chunk } from "effect"
const identity = <A>(): Channel.Channel<
Chunk.Chunk<A>,
Chunk.Chunk<A>,
never,
never,
void,
void
> => {
const go: Channel.Channel<
Chunk.Chunk<A>,
Chunk.Chunk<A>,
never,
never,
void,
void
> = Channel.readWith({
onInput: (chunk: Chunk.Chunk<A>) =>
Channel.flatMap(Channel.write(chunk), () => go),
onFailure: (err) => Channel.fail(err),
onDone: (done) => Channel.succeed(done)
})
return go
}
// pipeTo(source, identity()) ≡ source
Паттерн 2: Tap (наблюдение без изменений)
Tap Channel наблюдает за данными (выполняет побочный эффект), но пересылает их без изменений.
import { Channel, Chunk, Effect } from "effect"
const tap = <A>(
f: (chunk: Chunk.Chunk<A>) => Effect.Effect<void>
): Channel.Channel<
Chunk.Chunk<A>,
Chunk.Chunk<A>,
never,
never,
void,
void
> => {
const go: Channel.Channel<
Chunk.Chunk<A>,
Chunk.Chunk<A>,
never,
never,
void,
void
> = Channel.readWith({
onInput: (chunk: Chunk.Chunk<A>) =>
Channel.flatMap(
Channel.fromEffect(f(chunk)),
() => Channel.flatMap(Channel.write(chunk), () => go)
),
onFailure: (err) => Channel.fail(err),
onDone: (done) => Channel.succeed(done)
})
return go
}
// Использование:
// pipeTo(source, tap((chunk) => Effect.log(`Got ${Chunk.size(chunk)} items`)))
Паттерн 3: Stateful трансформер
Трансформер с внутренним состоянием, передаваемым через рекурсию.
import { Channel, Chunk, Effect } from "effect"
// Нумерация элементов: каждый чанк получает глобальный порядковый номер
const enumerate = <A>(
startIndex: number = 0
): Channel.Channel<
Chunk.Chunk<readonly [number, A]>,
Chunk.Chunk<A>,
never,
never,
void,
void
> => {
const go = (
idx: number
): Channel.Channel<
Chunk.Chunk<readonly [number, A]>,
Chunk.Chunk<A>,
never,
never,
void,
void
> =>
Channel.readWith({
onInput: (chunk: Chunk.Chunk<A>) => {
let currentIdx = idx
const enumerated = Chunk.map(chunk, (a) => {
const pair = [currentIdx, a] as const
currentIdx++
return pair
})
return Channel.flatMap(
Channel.write(enumerated),
() => go(currentIdx)
)
},
onFailure: (err) => Channel.fail(err),
onDone: (done) => Channel.succeed(done)
})
return go(startIndex)
}
Паттерн 4: Buffered Channel
Channel с внутренним буфером, который собирает элементы до определённого размера.
import { Channel, Chunk, Effect } from "effect"
const buffer = <A>(
maxSize: number
): Channel.Channel<
Chunk.Chunk<A>,
Chunk.Chunk<A>,
never,
never,
void,
void
> => {
const go = (
buf: Chunk.Chunk<A>
): Channel.Channel<
Chunk.Chunk<A>,
Chunk.Chunk<A>,
never,
never,
void,
void
> =>
Channel.readWith({
onInput: (chunk: Chunk.Chunk<A>) => {
const combined = Chunk.appendAll(buf, chunk)
if (Chunk.size(combined) >= maxSize) {
const [flush, rest] = Chunk.splitAt(combined, maxSize)
return Channel.flatMap(
Channel.write(flush),
() => go(rest)
)
}
return go(combined)
},
onFailure: (err) => Channel.fail(err),
onDone: (_) =>
Chunk.isEmpty(buf)
? Channel.void
: Channel.write(buf) // flush remaining
})
return go(Chunk.empty())
}
Паттерн 5: Early termination
Channel, который прекращает обработку при выполнении условия.
import { Channel, Chunk, Effect } from "effect"
const takeWhileChannel = <A>(
predicate: (a: A) => boolean
): Channel.Channel<
Chunk.Chunk<A>,
Chunk.Chunk<A>,
never,
never,
void,
void
> =>
Channel.readWith({
onInput: (chunk: Chunk.Chunk<A>) => {
const taken = Chunk.takeWhile(chunk, predicate)
if (Chunk.size(taken) < Chunk.size(chunk)) {
// Нашли элемент, не удовлетворяющий условию — завершаемся
return Chunk.isEmpty(taken)
? Channel.void
: Channel.write(taken)
}
// Весь чанк прошёл — продолжаем
return Channel.flatMap(
Channel.write(taken),
() => takeWhileChannel(predicate)
)
},
onFailure: (err) => Channel.fail(err),
onDone: (done) => Channel.succeed(done)
})
Интеграция с Stream и Sink
Stream → Channel → Stream
Основной сценарий: опускаемся на уровень Channel для реализации кастомной логики, затем возвращаемся на уровень Stream.
import { Channel, Chunk, Effect, Stream, pipe } from "effect"
// Кастомный Stream-оператор через Channel
const deduplicateAdjacent = <A>(
stream: Stream.Stream<A>
): Stream.Stream<A> => {
// Опускаемся на уровень Channel
const sourceChannel = Stream.toChannel(stream)
// Дедупликатор на уровне Channel
const dedup = (
last: A | undefined
): Channel.Channel<
Chunk.Chunk<A>,
Chunk.Chunk<A>,
never,
never,
unknown,
unknown
> =>
Channel.readWith({
onInput: (chunk: Chunk.Chunk<A>) => {
const result: Array<A> = []
let current = last
for (const item of chunk) {
if (item !== current) {
result.push(item)
current = item
}
}
const outChunk = Chunk.fromIterable(result)
return Chunk.isEmpty(outChunk)
? dedup(current)
: Channel.flatMap(
Channel.write(outChunk),
() => dedup(current)
)
},
onFailure: (err) => Channel.fail(err),
onDone: (done) => Channel.succeed(done)
})
// Соединяем и возвращаемся на уровень Stream
return Stream.fromChannel(
Channel.pipeTo(sourceChannel, dedup(undefined))
)
}
// Использование
const stream = Stream.make(1, 1, 2, 2, 2, 3, 1, 1, 4, 4)
const program = Effect.gen(function* () {
const result = yield* Stream.runCollect(
deduplicateAdjacent(stream)
)
console.log(Chunk.toReadonlyArray(result))
})
Effect.runPromise(program)
// [1, 2, 3, 1, 4]
Stream.pipeThroughChannel
Effect предоставляет Stream.pipeThroughChannel для прямого подключения Stream к Channel-трансформеру без ручной конвертации.
import { Channel, Chunk, Effect, Stream } from "effect"
// Channel-трансформер
const toUpperCase: Channel.Channel<
Chunk.Chunk<string>,
Chunk.Chunk<string>,
never,
never,
unknown,
unknown
> = Channel.readWith({
onInput: (chunk: Chunk.Chunk<string>) =>
Channel.flatMap(
Channel.write(Chunk.map(chunk, (s) => s.toUpperCase())),
() => toUpperCase
),
onFailure: (err) => Channel.fail(err),
onDone: (done) => Channel.succeed(done)
})
const stream = Stream.make("hello", "world", "effect")
const result = Stream.pipeThroughChannel(stream, toUpperCase)
const program = Effect.gen(function* () {
const collected = yield* Stream.runCollect(result)
console.log(Chunk.toReadonlyArray(collected))
})
Effect.runPromise(program)
// ["HELLO", "WORLD", "EFFECT"]
Полный пайплайн: Stream → Channel Pipeline → Sink
import { Channel, Chunk, Effect, Sink, Stream, pipe } from "effect"
// 1. Source Stream
const source = Stream.fromIterable([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
// 2. Channel-трансформер (фильтр + маппинг)
const transformer: Channel.Channel<
Chunk.Chunk<string>,
Chunk.Chunk<number>,
never,
never,
unknown,
unknown
> = Channel.readWith({
onInput: (chunk: Chunk.Chunk<number>) => {
const processed = pipe(
chunk,
Chunk.filter((n) => n % 2 === 0),
Chunk.map((n) => `item-${n}`)
)
return Chunk.isEmpty(processed)
? transformer
: Channel.flatMap(Channel.write(processed), () => transformer)
},
onFailure: (err) => Channel.fail(err),
onDone: (done) => Channel.succeed(done)
})
// 3. Подключаем трансформер к Stream
const transformed = Stream.pipeThroughChannel(source, transformer)
// 4. Запускаем через Sink
const program = Effect.gen(function* () {
const result = yield* Stream.run(
transformed,
Sink.collectAll<string>()
)
console.log(Chunk.toReadonlyArray(result))
})
Effect.runPromise(program)
// ["item-2", "item-4", "item-6", "item-8", "item-10"]
Production-паттерны
Rate-limiting пайплайн
import { Channel, Chunk, Duration, Effect, pipe } from "effect"
// Трансформер с rate limiting: задержка между чанками
const rateLimiter = <A>(
delay: Duration.DurationInput
): Channel.Channel<
Chunk.Chunk<A>,
Chunk.Chunk<A>,
never,
never,
void,
void
> => {
const go: Channel.Channel<
Chunk.Chunk<A>,
Chunk.Chunk<A>,
never,
never,
void,
void
> = Channel.readWith({
onInput: (chunk: Chunk.Chunk<A>) =>
Channel.flatMap(
Channel.fromEffect(Effect.sleep(delay)),
() => Channel.flatMap(Channel.write(chunk), () => go)
),
onFailure: (err) => Channel.fail(err),
onDone: (done) => Channel.succeed(done)
})
return go
}
Metrics-collecting пайплайн
import { Channel, Chunk, Effect, Ref, pipe } from "effect"
interface PipelineMetrics {
readonly chunksProcessed: number
readonly elementsProcessed: number
readonly startTime: number
}
// Трансформер, собирающий метрики
const withMetrics = <A>(
metricsRef: Ref.Ref<PipelineMetrics>
): Channel.Channel<
Chunk.Chunk<A>,
Chunk.Chunk<A>,
never,
never,
void,
void
> => {
const go: Channel.Channel<
Chunk.Chunk<A>,
Chunk.Chunk<A>,
never,
never,
void,
void
> = Channel.readWith({
onInput: (chunk: Chunk.Chunk<A>) =>
Channel.flatMap(
Channel.fromEffect(
Ref.update(metricsRef, (m) => ({
...m,
chunksProcessed: m.chunksProcessed + 1,
elementsProcessed: m.elementsProcessed + Chunk.size(chunk)
}))
),
() => Channel.flatMap(Channel.write(chunk), () => go)
),
onFailure: (err) => Channel.fail(err),
onDone: (done) => Channel.succeed(done)
})
return go
}
// Использование
const program = Effect.gen(function* () {
const metrics = yield* Ref.make<PipelineMetrics>({
chunksProcessed: 0,
elementsProcessed: 0,
startTime: Date.now()
})
const source = pipe(
Channel.write(Chunk.make(1, 2, 3)),
Channel.flatMap(() => Channel.write(Chunk.make(4, 5))),
Channel.flatMap(() => Channel.write(Chunk.make(6)))
)
const pipeline = Channel.pipeTo(source, withMetrics<number>(metrics))
yield* Channel.runDrain(pipeline)
const finalMetrics = yield* Ref.get(metrics)
console.log(`Обработано: ${finalMetrics.chunksProcessed} чанков, ${finalMetrics.elementsProcessed} элементов`)
})
Effect.runPromise(program)
// Обработано: 3 чанков, 6 элементов
Circuit breaker пайплайн
import { Channel, Chunk, Effect, Ref, pipe } from "effect"
class CircuitOpen {
readonly _tag = "CircuitOpen" as const
constructor(readonly failureCount: number) {}
}
// Трансформер с circuit breaker
const circuitBreaker = <A, E>(
maxFailures: number,
failureCountRef: Ref.Ref<number>
): Channel.Channel<
Chunk.Chunk<A>,
Chunk.Chunk<A>,
CircuitOpen | E,
E,
void,
void
> => {
const go: Channel.Channel<
Chunk.Chunk<A>,
Chunk.Chunk<A>,
CircuitOpen | E,
E,
void,
void
> = Channel.readWith({
onInput: (chunk: Chunk.Chunk<A>) =>
Channel.flatMap(
// Сбрасываем счётчик при успешном чанке
Channel.fromEffect(Ref.set(failureCountRef, 0)),
() => Channel.flatMap(Channel.write(chunk), () => go)
),
onFailure: (err: E) =>
Channel.unwrap(
Effect.gen(function* () {
const count = yield* Ref.updateAndGet(
failureCountRef,
(n) => n + 1
)
if (count >= maxFailures) {
return Channel.fail(new CircuitOpen(count))
}
// Пропускаем ошибку upstream и продолжаем
return go
})
),
onDone: (done) => Channel.succeed(done)
})
return go
}
API Reference
Операции пайплайнов
| Операция | Описание |
|---|---|
Channel.pipeTo(upstream, downstream) | Соединяет выход upstream со входом downstream |
Channel.compose(downstream, upstream) | Обратный порядок pipeTo |
Channel.pipeToOrFail(upstream, downstream) | pipeTo с возможностью ошибки |
Stream.pipeThroughChannel(stream, channel) | Подключает Stream к Channel-трансформеру |
Stream.toChannel(stream) | Конвертирует Stream в Channel |
Stream.fromChannel(channel) | Конвертирует Channel в Stream |
Таблица совместимости типов для pipeTo
pipeTo(upstream, downstream):
upstream: Channel<OutElem_U, InElem_U, OutErr_U, InErr_U, OutDone_U, InDone_U, Env_U>
downstream: Channel<OutElem_D, OutElem_U, OutErr_D, OutErr_U, OutDone_D, OutDone_U, Env_D>
▲ ▲ ▲
= upstream OutElem = upstream OutErr = upstream OutDone
result: Channel<OutElem_D, InElem_U, OutErr_D, InErr_U, OutDone_D, InDone_U, Env_U | Env_D>
Примеры
💻 Пример 1: ETL-пайплайн
import { Channel, Chunk, Effect, pipe } from "effect"
// Extract: чтение данных из "источника"
const extract = pipe(
Channel.write(Chunk.make(
{ name: "Alice", age: "30" },
{ name: "Bob", age: "invalid" },
{ name: "Charlie", age: "25" }
))
)
// Transform: парсинг и валидация
interface Person {
readonly name: string
readonly age: number
}
const transform: Channel.Channel<
Chunk.Chunk<Person>,
Chunk.Chunk<{ readonly name: string; readonly age: string }>,
never,
never,
void,
void
> = Channel.readWith({
onInput: (chunk) => {
const validated = Chunk.filterMap(chunk, (raw) => {
const age = parseInt(raw.age, 10)
return isNaN(age)
? undefined // пропускаем невалидные
: ({ name: raw.name, age } as Person)
})
return Chunk.isEmpty(validated)
? transform
: Channel.flatMap(Channel.write(validated), () => transform)
},
onFailure: (err) => Channel.fail(err),
onDone: (done) => Channel.succeed(done)
})
// Load: "сохранение" в лог
const load: Channel.Channel<
never,
Chunk.Chunk<Person>,
never,
never,
ReadonlyArray<Person>,
void
> = (() => {
const go = (
acc: Array<Person>
): Channel.Channel<
never,
Chunk.Chunk<Person>,
never,
never,
ReadonlyArray<Person>,
void
> =>
Channel.readWith({
onInput: (chunk: Chunk.Chunk<Person>) => {
for (const p of chunk) {
acc.push(p)
}
return go(acc)
},
onFailure: (err) => Channel.fail(err),
onDone: (_) => Channel.succeed(acc as ReadonlyArray<Person>)
})
return go([])
})()
// Собираем ETL-пайплайн
const etl = pipe(
extract,
(ch) => Channel.pipeTo(ch, transform),
(ch) => Channel.pipeTo(ch, load)
)
const program = Effect.gen(function* () {
const result = yield* Channel.run(etl)
console.log("Загружено:", result)
})
Effect.runPromise(program)
// Загружено: [{ name: "Alice", age: 30 }, { name: "Charlie", age: 25 }]
💻 Пример 2: Кастомный Stream-оператор
import { Channel, Chunk, Effect, Stream, pipe } from "effect"
// Sliding window: группирует элементы в окна скользящим образом
const slidingWindow = <A>(
windowSize: number,
stream: Stream.Stream<A>
): Stream.Stream<ReadonlyArray<A>> => {
const sourceChannel = Stream.toChannel(stream)
const slider = (
buf: ReadonlyArray<A>
): Channel.Channel<
Chunk.Chunk<ReadonlyArray<A>>,
Chunk.Chunk<A>,
never,
never,
unknown,
unknown
> =>
Channel.readWith({
onInput: (chunk: Chunk.Chunk<A>) => {
let currentBuf = [...buf]
const windows: Array<ReadonlyArray<A>> = []
for (const item of chunk) {
currentBuf.push(item)
if (currentBuf.length >= windowSize) {
windows.push([...currentBuf.slice(-windowSize)])
}
}
// Сохраняем только последние (windowSize - 1) элементов
const nextBuf = currentBuf.slice(-(windowSize - 1))
return windows.length > 0
? Channel.flatMap(
Channel.write(Chunk.fromIterable(windows)),
() => slider(nextBuf)
)
: slider(nextBuf)
},
onFailure: (err) => Channel.fail(err),
onDone: (done) => Channel.succeed(done)
})
return Stream.fromChannel(
Channel.pipeTo(sourceChannel, slider([]))
)
}
// Тест
const stream = Stream.fromIterable([1, 2, 3, 4, 5, 6])
const program = Effect.gen(function* () {
const result = yield* Stream.runCollect(slidingWindow(3, stream))
console.log(Chunk.toReadonlyArray(result))
})
Effect.runPromise(program)
// [[1, 2, 3], [2, 3, 4], [3, 4, 5], [4, 5, 6]]
Упражнения
🟢 Basic
Упражнение 1: Простой пайплайн
Создайте пайплайн из трёх ступеней: producer эмитирует числа 1-10, трансформер фильтрует числа больше 5, consumer считает количество элементов.
Решение:
import { Channel, Chunk, Effect, pipe } from "effect"
const source = Channel.write(Chunk.make(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
const filterGt5: Channel.Channel<
Chunk.Chunk<number>, Chunk.Chunk<number>, never, never, void, void
> = Channel.readWith({
onInput: (chunk: Chunk.Chunk<number>) => {
const filtered = Chunk.filter(chunk, (n) => n > 5)
return Chunk.isEmpty(filtered)
? filterGt5
: Channel.flatMap(Channel.write(filtered), () => filterGt5)
},
onFailure: (err) => Channel.fail(err),
onDone: (done) => Channel.succeed(done)
})
const counter = (
count: number = 0
): Channel.Channel<never, Chunk.Chunk<number>, never, never, number, void> =>
Channel.readWith({
onInput: (chunk: Chunk.Chunk<number>) =>
counter(count + Chunk.size(chunk)),
onFailure: (err) => Channel.fail(err),
onDone: (_) => Channel.succeed(count)
})
const pipeline = pipe(
source,
(ch) => Channel.pipeTo(ch, filterGt5),
(ch) => Channel.pipeTo(ch, counter())
)
const program = Effect.gen(function* () {
const result = yield* Channel.run(pipeline)
console.log("Количество чисел > 5:", result)
})
Effect.runPromise(program)
// Количество чисел > 5: 5
🟡 Intermediate
Упражнение 2: Пайплайн с агрегацией
Создайте пайплайн, который:
- Принимает поток строк (логов)
- Группирует их по уровню (INFO, ERROR, WARN)
- Возвращает объект со статистикой
Решение:
import { Channel, Chunk, Effect, pipe } from "effect"
interface LogStats {
readonly info: number
readonly error: number
readonly warn: number
}
const logSource = Channel.write(Chunk.make(
"INFO: Started",
"INFO: Processing",
"WARN: Slow query",
"ERROR: Connection failed",
"INFO: Retrying",
"ERROR: Timeout"
))
const statsCollector = (
stats: LogStats = { info: 0, error: 0, warn: 0 }
): Channel.Channel<never, Chunk.Chunk<string>, never, never, LogStats, void> =>
Channel.readWith({
onInput: (chunk: Chunk.Chunk<string>) => {
let newStats = { ...stats }
for (const line of chunk) {
if (line.startsWith("INFO:")) newStats = { ...newStats, info: newStats.info + 1 }
else if (line.startsWith("ERROR:")) newStats = { ...newStats, error: newStats.error + 1 }
else if (line.startsWith("WARN:")) newStats = { ...newStats, warn: newStats.warn + 1 }
}
return statsCollector(newStats)
},
onFailure: (err) => Channel.fail(err),
onDone: (_) => Channel.succeed(stats)
})
const pipeline = Channel.pipeTo(logSource, statsCollector())
const program = Effect.gen(function* () {
const stats = yield* Channel.run(pipeline)
console.log("Статистика:", stats)
})
Effect.runPromise(program)
// Статистика: { info: 3, error: 2, warn: 1 }
🔴 Advanced
Упражнение 3: Кастомный Stream-оператор через Channel
Реализуйте chunkBy — оператор, который группирует элементы Stream в подгруппы, разделяя их каждый раз, когда предикат возвращает true. Используйте Channel внутри.
Пример: chunkBy([1, 2, 3, 0, 4, 5, 0, 6], (n) => n === 0) → [[1, 2, 3], [4, 5], [6]]
Решение:
import { Channel, Chunk, Effect, Stream, pipe } from "effect"
const chunkBy = <A>(
stream: Stream.Stream<A>,
isSeparator: (a: A) => boolean
): Stream.Stream<ReadonlyArray<A>> => {
const sourceChannel = Stream.toChannel(stream)
const splitter = (
current: Array<A>
): Channel.Channel<
Chunk.Chunk<ReadonlyArray<A>>,
Chunk.Chunk<A>,
never,
never,
unknown,
unknown
> =>
Channel.readWith({
onInput: (chunk: Chunk.Chunk<A>) => {
const groups: Array<ReadonlyArray<A>> = []
let buf = [...current]
for (const item of chunk) {
if (isSeparator(item)) {
if (buf.length > 0) {
groups.push([...buf])
buf = []
}
} else {
buf.push(item)
}
}
return groups.length > 0
? Channel.flatMap(
Channel.write(Chunk.fromIterable(groups)),
() => splitter(buf)
)
: splitter(buf)
},
onFailure: (err) => Channel.fail(err),
onDone: (_) => {
if (current.length > 0) {
return Channel.write(Chunk.of(current as ReadonlyArray<A>))
}
return Channel.void
}
})
return Stream.fromChannel(
Channel.pipeTo(sourceChannel, splitter([]))
)
}
// Тест
const stream = Stream.fromIterable([1, 2, 3, 0, 4, 5, 0, 6])
const program = Effect.gen(function* () {
const result = yield* Stream.runCollect(chunkBy(stream, (n) => n === 0))
console.log(Chunk.toReadonlyArray(result))
})
Effect.runPromise(program)
// [[1, 2, 3], [4, 5], [6]]
🔗 Связанные темы: 03-channel-operations.md, Модуль 10 (Stream), Модуль 11 (Sink), Модуль 08 (Fiber — конкурентность)