Effect Курс Построение пайплайнов
Глава

Построение пайплайнов

Пайплайны Channel — это способ соединять producers, transformers и consumers в единые потоковые конвейеры. Операция pipeTo — сердце потоковой архитектуры Effect.

Теория

Что такое пайплайн Channel

Пайплайн — это последовательность Channel, соединённых так, что выход одного становится входом следующего. Это фундаментальный паттерн потоковой обработки в Effect.

┌──────────┐     ┌──────────────┐     ┌──────────┐
│ Producer  │────►│ Transformer  │────►│ Consumer  │
│ (Source)  │     │ (Pipe)       │     │ (Sink)    │
└──────────┘     └──────────────┘     └──────────┘
  OutElem ──────► InElem→OutElem ────► InElem
  OutErr  ──────► InErr→OutErr  ────► InErr
  OutDone ──────► InDone→OutDone ───► InDone

Пайплайн обладает следующими свойствами:

  • Композируемость — любые два совместимых Channel можно соединить
  • Ленивость — данные обрабатываются по требованию downstream
  • Безопасность ресурсов — финализаторы всех ступеней гарантированно выполняются
  • Типобезопасность — несовместимые Channel нельзя соединить (ошибка компиляции)

Направление потока данных

В пайплайне Channel данные текут слева направо (от producer к consumer), а запросы на данные (pull) — справа налево (от consumer к producer):

Поток данных:         ──────────────────────────►
                      Producer  → Pipe → Consumer

Запросы (pull):       ◄──────────────────────────
                      Producer ← Pipe ← Consumer

Consumer “тянет” данные: когда ему нужен следующий чанк, он запрашивает его у предшествующего Pipe, который, в свою очередь, запрашивает у Producer. Это обеспечивает автоматический backpressure.


Концепция ФП

Категорная композиция

Channel.pipeTo реализует категорную композицию (composition of arrows). В теории категорий, если:

  • f: A → B (Channel из A в B)
  • g: B → C (Channel из B в C)

то g ∘ f: A → C (пайплайн из A в C).

В Effect-ts это записывается как:

// Channel.pipeTo(upstream, downstream)
// аналог: downstream ∘ upstream

// или Channel.compose(downstream, upstream)
// аналог: downstream ∘ upstream

Это удовлетворяет законам категории:

  1. Ассоциативность: pipeTo(pipeTo(a, b), c) ≡ pipeTo(a, pipeTo(b, c))
  2. Единица: pipeTo(a, identity) ≡ a ≡ pipeTo(identity, a)

Где identity — это Channel, который просто пересылает данные без изменений.

Coroutines и кооперативная многозадачность

Пайплайн Channel работает как система сопрограмм (coroutines). Каждый Channel в пайплайне — это coroutine, которая:

  1. Выполняет работу, пока не нужны данные (emit/read)
  2. Передаёт управление соседнему Channel
  3. Получает управление обратно, когда данные готовы

Это форма кооперативной многозадачности без потоков и переключения контекста.

Producer         Pipe            Consumer
   │                                │
   │◄─── pull ──────────── pull ────│
   │                                │
   ├─── emit(Chunk) ───►            │
   │                 ├─── emit(Chunk') ──►
   │                 │              │
   │◄─── pull ──────┤              │
   │                                │
   ├─── done ───────►              │
   │                 ├─── done ────►

Channel.pipeTo — основа пайплайнов

Базовое использование

Channel.pipeTo соединяет выход upstream с входом downstream. Типы должны быть совместимы:

  • OutElem upstream → InElem downstream
  • OutErr upstream → InErr downstream
  • OutDone upstream → InDone downstream
import { Channel, Chunk, Effect, pipe } from "effect"

// Producer: эмитирует чанки чисел
const producer = pipe(
  Channel.write(Chunk.make(1, 2, 3)),
  Channel.flatMap(() => Channel.write(Chunk.make(4, 5, 6)))
)

// Transformer: удваивает каждый элемент
const doubler: Channel.Channel<
  Chunk.Chunk<number>,
  Chunk.Chunk<number>,
  never,
  never,
  void,
  void
> = Channel.readWith({
  onInput: (chunk: Chunk.Chunk<number>) =>
    Channel.flatMap(
      Channel.write(Chunk.map(chunk, (n) => n * 2)),
      () => doubler
    ),
  onFailure: (err) => Channel.fail(err),
  onDone: (done) => Channel.succeed(done)
})

// Соединяем в пайплайн
const pipeline = Channel.pipeTo(producer, doubler)

const program = Effect.gen(function* () {
  const [chunks, _] = yield* Channel.runCollect(pipeline)
  console.log(Chunk.toReadonlyArray(chunks))
})

Effect.runPromise(program)
// [2, 4, 6, 8, 10, 12]

Типы в pipeTo

import { Channel, Chunk } from "effect"

// Producer:
// Channel<Chunk<number>, unknown, string, unknown, boolean, unknown, never>
//          OutElem        InElem   OutErr  InErr   OutDone  InDone   Env

// Transformer (downstream):
// Channel<Chunk<string>, Chunk<number>, Error, string, number, boolean, never>
//          OutElem        InElem         OutErr InErr   OutDone InDone   Env
//                         ▲               ▲             ▲
//                    = upstream OutElem  = upstream OutErr = upstream OutDone

// Результат pipeTo:
// Channel<Chunk<string>, unknown, Error, unknown, number, unknown, never>
//          ▲                       ▲               ▲
//     downstream OutElem   downstream OutErr  downstream OutDone
//
// InElem, InErr, InDone берутся от upstream
// OutElem, OutErr, OutDone берутся от downstream

Channel.compose — обратный pipeTo

Channel.compose делает то же, что и pipeTo, но с обратным порядком аргументов.

import { Channel, Chunk, Effect } from "effect"

// compose(downstream, upstream) ≡ pipeTo(upstream, downstream)
const producer = Channel.write(Chunk.make(1, 2, 3))

const transformer: Channel.Channel<
  Chunk.Chunk<string>,
  Chunk.Chunk<number>,
  never,
  never,
  void,
  void
> = Channel.readWith({
  onInput: (chunk: Chunk.Chunk<number>) =>
    Channel.flatMap(
      Channel.write(Chunk.map(chunk, (n) => `item-${n}`)),
      () => transformer
    ),
  onFailure: (err) => Channel.fail(err),
  onDone: (done) => Channel.succeed(done)
})

// Оба варианта эквивалентны:
const via1 = Channel.pipeTo(producer, transformer)
const via2 = Channel.compose(transformer, producer)

Многоступенчатые пайплайны

Цепочка из нескольких трансформеров

Пайплайны могут состоять из произвольного числа ступеней. Каждая ступень — это Channel-трансформер.

import { Channel, Chunk, Effect, pipe } from "effect"

// Утилита для создания трансформера
const mapChannel = <A, B>(
  f: (a: A) => B
): Channel.Channel<
  Chunk.Chunk<B>,
  Chunk.Chunk<A>,
  never,
  never,
  void,
  void
> => {
  const go: Channel.Channel<
    Chunk.Chunk<B>,
    Chunk.Chunk<A>,
    never,
    never,
    void,
    void
  > = Channel.readWith({
    onInput: (chunk: Chunk.Chunk<A>) =>
      Channel.flatMap(
        Channel.write(Chunk.map(chunk, f)),
        () => go
      ),
    onFailure: (err) => Channel.fail(err),
    onDone: (done) => Channel.succeed(done)
  })
  return go
}

const filterChannel = <A>(
  predicate: (a: A) => boolean
): Channel.Channel<
  Chunk.Chunk<A>,
  Chunk.Chunk<A>,
  never,
  never,
  void,
  void
> => {
  const go: Channel.Channel<
    Chunk.Chunk<A>,
    Chunk.Chunk<A>,
    never,
    never,
    void,
    void
  > = Channel.readWith({
    onInput: (chunk: Chunk.Chunk<A>) => {
      const filtered = Chunk.filter(chunk, predicate)
      return Chunk.isEmpty(filtered)
        ? go
        : Channel.flatMap(Channel.write(filtered), () => go)
    },
    onFailure: (err) => Channel.fail(err),
    onDone: (done) => Channel.succeed(done)
  })
  return go
}

// Producer
const source = pipe(
  Channel.write(Chunk.make(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
)

// Пайплайн: source → filter(чётные) → map(*10) → map(toString)
const pipeline = pipe(
  source,
  (ch) => Channel.pipeTo(ch, filterChannel<number>((n) => n % 2 === 0)),
  (ch) => Channel.pipeTo(ch, mapChannel<number, number>((n) => n * 10)),
  (ch) => Channel.pipeTo(ch, mapChannel<number, string>((n) => `value: ${n}`))
)

const program = Effect.gen(function* () {
  const [chunks, _] = yield* Channel.runCollect(pipeline)
  console.log(Chunk.toReadonlyArray(chunks))
})

Effect.runPromise(program)
// ["value: 20", "value: 40", "value: 60", "value: 80", "value: 100"]

Пайплайн с аккумулятором (Consumer)

Consumer — это финальная ступень пайплайна, которая собирает результат.

import { Channel, Chunk, Effect, pipe } from "effect"

// Consumer: подсчитывает сумму всех элементов
const sumConsumer = (
  acc: number = 0
): Channel.Channel<
  never,             // не эмитирует
  Chunk.Chunk<number>,  // принимает чанки чисел
  never,
  never,
  number,            // результат — сумма
  void
> =>
  Channel.readWith({
    onInput: (chunk: Chunk.Chunk<number>) => {
      const chunkSum = Chunk.reduce(chunk, 0, (a, b) => a + b)
      return sumConsumer(acc + chunkSum)
    },
    onFailure: (err) => Channel.fail(err),
    onDone: (_) => Channel.succeed(acc)
  })

// Producer → Consumer
const source = pipe(
  Channel.write(Chunk.make(1, 2, 3)),
  Channel.flatMap(() => Channel.write(Chunk.make(4, 5, 6)))
)

const pipeline = Channel.pipeTo(source, sumConsumer())

const program = Effect.gen(function* () {
  const result = yield* Channel.run(pipeline)
  console.log("Сумма:", result)
})

Effect.runPromise(program)
// Сумма: 21

Паттерны построения пайплайнов

Паттерн 1: Identity Channel

Identity Channel пересылает данные без изменений. Полезен как нейтральный элемент в комбинациях.

import { Channel, Chunk } from "effect"

const identity = <A>(): Channel.Channel<
  Chunk.Chunk<A>,
  Chunk.Chunk<A>,
  never,
  never,
  void,
  void
> => {
  const go: Channel.Channel<
    Chunk.Chunk<A>,
    Chunk.Chunk<A>,
    never,
    never,
    void,
    void
  > = Channel.readWith({
    onInput: (chunk: Chunk.Chunk<A>) =>
      Channel.flatMap(Channel.write(chunk), () => go),
    onFailure: (err) => Channel.fail(err),
    onDone: (done) => Channel.succeed(done)
  })
  return go
}

// pipeTo(source, identity()) ≡ source

Паттерн 2: Tap (наблюдение без изменений)

Tap Channel наблюдает за данными (выполняет побочный эффект), но пересылает их без изменений.

import { Channel, Chunk, Effect } from "effect"

const tap = <A>(
  f: (chunk: Chunk.Chunk<A>) => Effect.Effect<void>
): Channel.Channel<
  Chunk.Chunk<A>,
  Chunk.Chunk<A>,
  never,
  never,
  void,
  void
> => {
  const go: Channel.Channel<
    Chunk.Chunk<A>,
    Chunk.Chunk<A>,
    never,
    never,
    void,
    void
  > = Channel.readWith({
    onInput: (chunk: Chunk.Chunk<A>) =>
      Channel.flatMap(
        Channel.fromEffect(f(chunk)),
        () => Channel.flatMap(Channel.write(chunk), () => go)
      ),
    onFailure: (err) => Channel.fail(err),
    onDone: (done) => Channel.succeed(done)
  })
  return go
}

// Использование:
// pipeTo(source, tap((chunk) => Effect.log(`Got ${Chunk.size(chunk)} items`)))

Паттерн 3: Stateful трансформер

Трансформер с внутренним состоянием, передаваемым через рекурсию.

import { Channel, Chunk, Effect } from "effect"

// Нумерация элементов: каждый чанк получает глобальный порядковый номер
const enumerate = <A>(
  startIndex: number = 0
): Channel.Channel<
  Chunk.Chunk<readonly [number, A]>,
  Chunk.Chunk<A>,
  never,
  never,
  void,
  void
> => {
  const go = (
    idx: number
  ): Channel.Channel<
    Chunk.Chunk<readonly [number, A]>,
    Chunk.Chunk<A>,
    never,
    never,
    void,
    void
  > =>
    Channel.readWith({
      onInput: (chunk: Chunk.Chunk<A>) => {
        let currentIdx = idx
        const enumerated = Chunk.map(chunk, (a) => {
          const pair = [currentIdx, a] as const
          currentIdx++
          return pair
        })
        return Channel.flatMap(
          Channel.write(enumerated),
          () => go(currentIdx)
        )
      },
      onFailure: (err) => Channel.fail(err),
      onDone: (done) => Channel.succeed(done)
    })
  return go(startIndex)
}

Паттерн 4: Buffered Channel

Channel с внутренним буфером, который собирает элементы до определённого размера.

import { Channel, Chunk, Effect } from "effect"

const buffer = <A>(
  maxSize: number
): Channel.Channel<
  Chunk.Chunk<A>,
  Chunk.Chunk<A>,
  never,
  never,
  void,
  void
> => {
  const go = (
    buf: Chunk.Chunk<A>
  ): Channel.Channel<
    Chunk.Chunk<A>,
    Chunk.Chunk<A>,
    never,
    never,
    void,
    void
  > =>
    Channel.readWith({
      onInput: (chunk: Chunk.Chunk<A>) => {
        const combined = Chunk.appendAll(buf, chunk)
        if (Chunk.size(combined) >= maxSize) {
          const [flush, rest] = Chunk.splitAt(combined, maxSize)
          return Channel.flatMap(
            Channel.write(flush),
            () => go(rest)
          )
        }
        return go(combined)
      },
      onFailure: (err) => Channel.fail(err),
      onDone: (_) =>
        Chunk.isEmpty(buf)
          ? Channel.void
          : Channel.write(buf) // flush remaining
    })
  return go(Chunk.empty())
}

Паттерн 5: Early termination

Channel, который прекращает обработку при выполнении условия.

import { Channel, Chunk, Effect } from "effect"

const takeWhileChannel = <A>(
  predicate: (a: A) => boolean
): Channel.Channel<
  Chunk.Chunk<A>,
  Chunk.Chunk<A>,
  never,
  never,
  void,
  void
> =>
  Channel.readWith({
    onInput: (chunk: Chunk.Chunk<A>) => {
      const taken = Chunk.takeWhile(chunk, predicate)
      if (Chunk.size(taken) < Chunk.size(chunk)) {
        // Нашли элемент, не удовлетворяющий условию — завершаемся
        return Chunk.isEmpty(taken)
          ? Channel.void
          : Channel.write(taken)
      }
      // Весь чанк прошёл — продолжаем
      return Channel.flatMap(
        Channel.write(taken),
        () => takeWhileChannel(predicate)
      )
    },
    onFailure: (err) => Channel.fail(err),
    onDone: (done) => Channel.succeed(done)
  })

Интеграция с Stream и Sink

Stream → Channel → Stream

Основной сценарий: опускаемся на уровень Channel для реализации кастомной логики, затем возвращаемся на уровень Stream.

import { Channel, Chunk, Effect, Stream, pipe } from "effect"

// Кастомный Stream-оператор через Channel
const deduplicateAdjacent = <A>(
  stream: Stream.Stream<A>
): Stream.Stream<A> => {
  // Опускаемся на уровень Channel
  const sourceChannel = Stream.toChannel(stream)

  // Дедупликатор на уровне Channel
  const dedup = (
    last: A | undefined
  ): Channel.Channel<
    Chunk.Chunk<A>,
    Chunk.Chunk<A>,
    never,
    never,
    unknown,
    unknown
  > =>
    Channel.readWith({
      onInput: (chunk: Chunk.Chunk<A>) => {
        const result: Array<A> = []
        let current = last
        for (const item of chunk) {
          if (item !== current) {
            result.push(item)
            current = item
          }
        }
        const outChunk = Chunk.fromIterable(result)
        return Chunk.isEmpty(outChunk)
          ? dedup(current)
          : Channel.flatMap(
              Channel.write(outChunk),
              () => dedup(current)
            )
      },
      onFailure: (err) => Channel.fail(err),
      onDone: (done) => Channel.succeed(done)
    })

  // Соединяем и возвращаемся на уровень Stream
  return Stream.fromChannel(
    Channel.pipeTo(sourceChannel, dedup(undefined))
  )
}

// Использование
const stream = Stream.make(1, 1, 2, 2, 2, 3, 1, 1, 4, 4)

const program = Effect.gen(function* () {
  const result = yield* Stream.runCollect(
    deduplicateAdjacent(stream)
  )
  console.log(Chunk.toReadonlyArray(result))
})

Effect.runPromise(program)
// [1, 2, 3, 1, 4]

Stream.pipeThroughChannel

Effect предоставляет Stream.pipeThroughChannel для прямого подключения Stream к Channel-трансформеру без ручной конвертации.

import { Channel, Chunk, Effect, Stream } from "effect"

// Channel-трансформер
const toUpperCase: Channel.Channel<
  Chunk.Chunk<string>,
  Chunk.Chunk<string>,
  never,
  never,
  unknown,
  unknown
> = Channel.readWith({
  onInput: (chunk: Chunk.Chunk<string>) =>
    Channel.flatMap(
      Channel.write(Chunk.map(chunk, (s) => s.toUpperCase())),
      () => toUpperCase
    ),
  onFailure: (err) => Channel.fail(err),
  onDone: (done) => Channel.succeed(done)
})

const stream = Stream.make("hello", "world", "effect")

const result = Stream.pipeThroughChannel(stream, toUpperCase)

const program = Effect.gen(function* () {
  const collected = yield* Stream.runCollect(result)
  console.log(Chunk.toReadonlyArray(collected))
})

Effect.runPromise(program)
// ["HELLO", "WORLD", "EFFECT"]

Полный пайплайн: Stream → Channel Pipeline → Sink

import { Channel, Chunk, Effect, Sink, Stream, pipe } from "effect"

// 1. Source Stream
const source = Stream.fromIterable([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])

// 2. Channel-трансформер (фильтр + маппинг)
const transformer: Channel.Channel<
  Chunk.Chunk<string>,
  Chunk.Chunk<number>,
  never,
  never,
  unknown,
  unknown
> = Channel.readWith({
  onInput: (chunk: Chunk.Chunk<number>) => {
    const processed = pipe(
      chunk,
      Chunk.filter((n) => n % 2 === 0),
      Chunk.map((n) => `item-${n}`)
    )
    return Chunk.isEmpty(processed)
      ? transformer
      : Channel.flatMap(Channel.write(processed), () => transformer)
  },
  onFailure: (err) => Channel.fail(err),
  onDone: (done) => Channel.succeed(done)
})

// 3. Подключаем трансформер к Stream
const transformed = Stream.pipeThroughChannel(source, transformer)

// 4. Запускаем через Sink
const program = Effect.gen(function* () {
  const result = yield* Stream.run(
    transformed,
    Sink.collectAll<string>()
  )
  console.log(Chunk.toReadonlyArray(result))
})

Effect.runPromise(program)
// ["item-2", "item-4", "item-6", "item-8", "item-10"]

Production-паттерны

Rate-limiting пайплайн

import { Channel, Chunk, Duration, Effect, pipe } from "effect"

// Трансформер с rate limiting: задержка между чанками
const rateLimiter = <A>(
  delay: Duration.DurationInput
): Channel.Channel<
  Chunk.Chunk<A>,
  Chunk.Chunk<A>,
  never,
  never,
  void,
  void
> => {
  const go: Channel.Channel<
    Chunk.Chunk<A>,
    Chunk.Chunk<A>,
    never,
    never,
    void,
    void
  > = Channel.readWith({
    onInput: (chunk: Chunk.Chunk<A>) =>
      Channel.flatMap(
        Channel.fromEffect(Effect.sleep(delay)),
        () => Channel.flatMap(Channel.write(chunk), () => go)
      ),
    onFailure: (err) => Channel.fail(err),
    onDone: (done) => Channel.succeed(done)
  })
  return go
}

Metrics-collecting пайплайн

import { Channel, Chunk, Effect, Ref, pipe } from "effect"

interface PipelineMetrics {
  readonly chunksProcessed: number
  readonly elementsProcessed: number
  readonly startTime: number
}

// Трансформер, собирающий метрики
const withMetrics = <A>(
  metricsRef: Ref.Ref<PipelineMetrics>
): Channel.Channel<
  Chunk.Chunk<A>,
  Chunk.Chunk<A>,
  never,
  never,
  void,
  void
> => {
  const go: Channel.Channel<
    Chunk.Chunk<A>,
    Chunk.Chunk<A>,
    never,
    never,
    void,
    void
  > = Channel.readWith({
    onInput: (chunk: Chunk.Chunk<A>) =>
      Channel.flatMap(
        Channel.fromEffect(
          Ref.update(metricsRef, (m) => ({
            ...m,
            chunksProcessed: m.chunksProcessed + 1,
            elementsProcessed: m.elementsProcessed + Chunk.size(chunk)
          }))
        ),
        () => Channel.flatMap(Channel.write(chunk), () => go)
      ),
    onFailure: (err) => Channel.fail(err),
    onDone: (done) => Channel.succeed(done)
  })
  return go
}

// Использование
const program = Effect.gen(function* () {
  const metrics = yield* Ref.make<PipelineMetrics>({
    chunksProcessed: 0,
    elementsProcessed: 0,
    startTime: Date.now()
  })

  const source = pipe(
    Channel.write(Chunk.make(1, 2, 3)),
    Channel.flatMap(() => Channel.write(Chunk.make(4, 5))),
    Channel.flatMap(() => Channel.write(Chunk.make(6)))
  )

  const pipeline = Channel.pipeTo(source, withMetrics<number>(metrics))
  yield* Channel.runDrain(pipeline)

  const finalMetrics = yield* Ref.get(metrics)
  console.log(`Обработано: ${finalMetrics.chunksProcessed} чанков, ${finalMetrics.elementsProcessed} элементов`)
})

Effect.runPromise(program)
// Обработано: 3 чанков, 6 элементов

Circuit breaker пайплайн

import { Channel, Chunk, Effect, Ref, pipe } from "effect"

class CircuitOpen {
  readonly _tag = "CircuitOpen" as const
  constructor(readonly failureCount: number) {}
}

// Трансформер с circuit breaker
const circuitBreaker = <A, E>(
  maxFailures: number,
  failureCountRef: Ref.Ref<number>
): Channel.Channel<
  Chunk.Chunk<A>,
  Chunk.Chunk<A>,
  CircuitOpen | E,
  E,
  void,
  void
> => {
  const go: Channel.Channel<
    Chunk.Chunk<A>,
    Chunk.Chunk<A>,
    CircuitOpen | E,
    E,
    void,
    void
  > = Channel.readWith({
    onInput: (chunk: Chunk.Chunk<A>) =>
      Channel.flatMap(
        // Сбрасываем счётчик при успешном чанке
        Channel.fromEffect(Ref.set(failureCountRef, 0)),
        () => Channel.flatMap(Channel.write(chunk), () => go)
      ),
    onFailure: (err: E) =>
      Channel.unwrap(
        Effect.gen(function* () {
          const count = yield* Ref.updateAndGet(
            failureCountRef,
            (n) => n + 1
          )
          if (count >= maxFailures) {
            return Channel.fail(new CircuitOpen(count))
          }
          // Пропускаем ошибку upstream и продолжаем
          return go
        })
      ),
    onDone: (done) => Channel.succeed(done)
  })
  return go
}

API Reference

Операции пайплайнов

ОперацияОписание
Channel.pipeTo(upstream, downstream)Соединяет выход upstream со входом downstream
Channel.compose(downstream, upstream)Обратный порядок pipeTo
Channel.pipeToOrFail(upstream, downstream)pipeTo с возможностью ошибки
Stream.pipeThroughChannel(stream, channel)Подключает Stream к Channel-трансформеру
Stream.toChannel(stream)Конвертирует Stream в Channel
Stream.fromChannel(channel)Конвертирует Channel в Stream

Таблица совместимости типов для pipeTo

pipeTo(upstream, downstream):

upstream:   Channel<OutElem_U, InElem_U, OutErr_U, InErr_U, OutDone_U, InDone_U, Env_U>
downstream: Channel<OutElem_D, OutElem_U, OutErr_D, OutErr_U, OutDone_D, OutDone_U, Env_D>
                                ▲                    ▲                     ▲
                           = upstream OutElem    = upstream OutErr    = upstream OutDone

result:     Channel<OutElem_D, InElem_U, OutErr_D, InErr_U, OutDone_D, InDone_U, Env_U | Env_D>

Примеры

💻 Пример 1: ETL-пайплайн

import { Channel, Chunk, Effect, pipe } from "effect"

// Extract: чтение данных из "источника"
const extract = pipe(
  Channel.write(Chunk.make(
    { name: "Alice", age: "30" },
    { name: "Bob", age: "invalid" },
    { name: "Charlie", age: "25" }
  ))
)

// Transform: парсинг и валидация
interface Person {
  readonly name: string
  readonly age: number
}

const transform: Channel.Channel<
  Chunk.Chunk<Person>,
  Chunk.Chunk<{ readonly name: string; readonly age: string }>,
  never,
  never,
  void,
  void
> = Channel.readWith({
  onInput: (chunk) => {
    const validated = Chunk.filterMap(chunk, (raw) => {
      const age = parseInt(raw.age, 10)
      return isNaN(age)
        ? undefined  // пропускаем невалидные
        : ({ name: raw.name, age } as Person)
    })
    return Chunk.isEmpty(validated)
      ? transform
      : Channel.flatMap(Channel.write(validated), () => transform)
  },
  onFailure: (err) => Channel.fail(err),
  onDone: (done) => Channel.succeed(done)
})

// Load: "сохранение" в лог
const load: Channel.Channel<
  never,
  Chunk.Chunk<Person>,
  never,
  never,
  ReadonlyArray<Person>,
  void
> = (() => {
  const go = (
    acc: Array<Person>
  ): Channel.Channel<
    never,
    Chunk.Chunk<Person>,
    never,
    never,
    ReadonlyArray<Person>,
    void
  > =>
    Channel.readWith({
      onInput: (chunk: Chunk.Chunk<Person>) => {
        for (const p of chunk) {
          acc.push(p)
        }
        return go(acc)
      },
      onFailure: (err) => Channel.fail(err),
      onDone: (_) => Channel.succeed(acc as ReadonlyArray<Person>)
    })
  return go([])
})()

// Собираем ETL-пайплайн
const etl = pipe(
  extract,
  (ch) => Channel.pipeTo(ch, transform),
  (ch) => Channel.pipeTo(ch, load)
)

const program = Effect.gen(function* () {
  const result = yield* Channel.run(etl)
  console.log("Загружено:", result)
})

Effect.runPromise(program)
// Загружено: [{ name: "Alice", age: 30 }, { name: "Charlie", age: 25 }]

💻 Пример 2: Кастомный Stream-оператор

import { Channel, Chunk, Effect, Stream, pipe } from "effect"

// Sliding window: группирует элементы в окна скользящим образом
const slidingWindow = <A>(
  windowSize: number,
  stream: Stream.Stream<A>
): Stream.Stream<ReadonlyArray<A>> => {
  const sourceChannel = Stream.toChannel(stream)

  const slider = (
    buf: ReadonlyArray<A>
  ): Channel.Channel<
    Chunk.Chunk<ReadonlyArray<A>>,
    Chunk.Chunk<A>,
    never,
    never,
    unknown,
    unknown
  > =>
    Channel.readWith({
      onInput: (chunk: Chunk.Chunk<A>) => {
        let currentBuf = [...buf]
        const windows: Array<ReadonlyArray<A>> = []

        for (const item of chunk) {
          currentBuf.push(item)
          if (currentBuf.length >= windowSize) {
            windows.push([...currentBuf.slice(-windowSize)])
          }
        }

        // Сохраняем только последние (windowSize - 1) элементов
        const nextBuf = currentBuf.slice(-(windowSize - 1))

        return windows.length > 0
          ? Channel.flatMap(
              Channel.write(Chunk.fromIterable(windows)),
              () => slider(nextBuf)
            )
          : slider(nextBuf)
      },
      onFailure: (err) => Channel.fail(err),
      onDone: (done) => Channel.succeed(done)
    })

  return Stream.fromChannel(
    Channel.pipeTo(sourceChannel, slider([]))
  )
}

// Тест
const stream = Stream.fromIterable([1, 2, 3, 4, 5, 6])

const program = Effect.gen(function* () {
  const result = yield* Stream.runCollect(slidingWindow(3, stream))
  console.log(Chunk.toReadonlyArray(result))
})

Effect.runPromise(program)
// [[1, 2, 3], [2, 3, 4], [3, 4, 5], [4, 5, 6]]

Упражнения

🟢 Basic

Упражнение 1: Простой пайплайн

Создайте пайплайн из трёх ступеней: producer эмитирует числа 1-10, трансформер фильтрует числа больше 5, consumer считает количество элементов.

Решение:

import { Channel, Chunk, Effect, pipe } from "effect"

const source = Channel.write(Chunk.make(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))

const filterGt5: Channel.Channel<
  Chunk.Chunk<number>, Chunk.Chunk<number>, never, never, void, void
> = Channel.readWith({
  onInput: (chunk: Chunk.Chunk<number>) => {
    const filtered = Chunk.filter(chunk, (n) => n > 5)
    return Chunk.isEmpty(filtered)
      ? filterGt5
      : Channel.flatMap(Channel.write(filtered), () => filterGt5)
  },
  onFailure: (err) => Channel.fail(err),
  onDone: (done) => Channel.succeed(done)
})

const counter = (
  count: number = 0
): Channel.Channel<never, Chunk.Chunk<number>, never, never, number, void> =>
  Channel.readWith({
    onInput: (chunk: Chunk.Chunk<number>) =>
      counter(count + Chunk.size(chunk)),
    onFailure: (err) => Channel.fail(err),
    onDone: (_) => Channel.succeed(count)
  })

const pipeline = pipe(
  source,
  (ch) => Channel.pipeTo(ch, filterGt5),
  (ch) => Channel.pipeTo(ch, counter())
)

const program = Effect.gen(function* () {
  const result = yield* Channel.run(pipeline)
  console.log("Количество чисел > 5:", result)
})

Effect.runPromise(program)
// Количество чисел > 5: 5

🟡 Intermediate

Упражнение 2: Пайплайн с агрегацией

Создайте пайплайн, который:

  1. Принимает поток строк (логов)
  2. Группирует их по уровню (INFO, ERROR, WARN)
  3. Возвращает объект со статистикой

Решение:

import { Channel, Chunk, Effect, pipe } from "effect"

interface LogStats {
  readonly info: number
  readonly error: number
  readonly warn: number
}

const logSource = Channel.write(Chunk.make(
  "INFO: Started",
  "INFO: Processing",
  "WARN: Slow query",
  "ERROR: Connection failed",
  "INFO: Retrying",
  "ERROR: Timeout"
))

const statsCollector = (
  stats: LogStats = { info: 0, error: 0, warn: 0 }
): Channel.Channel<never, Chunk.Chunk<string>, never, never, LogStats, void> =>
  Channel.readWith({
    onInput: (chunk: Chunk.Chunk<string>) => {
      let newStats = { ...stats }
      for (const line of chunk) {
        if (line.startsWith("INFO:")) newStats = { ...newStats, info: newStats.info + 1 }
        else if (line.startsWith("ERROR:")) newStats = { ...newStats, error: newStats.error + 1 }
        else if (line.startsWith("WARN:")) newStats = { ...newStats, warn: newStats.warn + 1 }
      }
      return statsCollector(newStats)
    },
    onFailure: (err) => Channel.fail(err),
    onDone: (_) => Channel.succeed(stats)
  })

const pipeline = Channel.pipeTo(logSource, statsCollector())

const program = Effect.gen(function* () {
  const stats = yield* Channel.run(pipeline)
  console.log("Статистика:", stats)
})

Effect.runPromise(program)
// Статистика: { info: 3, error: 2, warn: 1 }

🔴 Advanced

Упражнение 3: Кастомный Stream-оператор через Channel

Реализуйте chunkBy — оператор, который группирует элементы Stream в подгруппы, разделяя их каждый раз, когда предикат возвращает true. Используйте Channel внутри.

Пример: chunkBy([1, 2, 3, 0, 4, 5, 0, 6], (n) => n === 0)[[1, 2, 3], [4, 5], [6]]

Решение:

import { Channel, Chunk, Effect, Stream, pipe } from "effect"

const chunkBy = <A>(
  stream: Stream.Stream<A>,
  isSeparator: (a: A) => boolean
): Stream.Stream<ReadonlyArray<A>> => {
  const sourceChannel = Stream.toChannel(stream)

  const splitter = (
    current: Array<A>
  ): Channel.Channel<
    Chunk.Chunk<ReadonlyArray<A>>,
    Chunk.Chunk<A>,
    never,
    never,
    unknown,
    unknown
  > =>
    Channel.readWith({
      onInput: (chunk: Chunk.Chunk<A>) => {
        const groups: Array<ReadonlyArray<A>> = []
        let buf = [...current]

        for (const item of chunk) {
          if (isSeparator(item)) {
            if (buf.length > 0) {
              groups.push([...buf])
              buf = []
            }
          } else {
            buf.push(item)
          }
        }

        return groups.length > 0
          ? Channel.flatMap(
              Channel.write(Chunk.fromIterable(groups)),
              () => splitter(buf)
            )
          : splitter(buf)
      },
      onFailure: (err) => Channel.fail(err),
      onDone: (_) => {
        if (current.length > 0) {
          return Channel.write(Chunk.of(current as ReadonlyArray<A>))
        }
        return Channel.void
      }
    })

  return Stream.fromChannel(
    Channel.pipeTo(sourceChannel, splitter([]))
  )
}

// Тест
const stream = Stream.fromIterable([1, 2, 3, 0, 4, 5, 0, 6])

const program = Effect.gen(function* () {
  const result = yield* Stream.runCollect(chunkBy(stream, (n) => n === 0))
  console.log(Chunk.toReadonlyArray(result))
})

Effect.runPromise(program)
// [[1, 2, 3], [4, 5], [6]]

🔗 Связанные темы: 03-channel-operations.md, Модуль 10 (Stream), Модуль 11 (Sink), Модуль 08 (Fiber — конкурентность)