Effect Курс Конструкторы Channel
Глава

Конструкторы Channel

Полный обзор всех способов создания Channel: от простейших примитивов до сложных конструкторов с управлением ресурсами и обработкой upstream-событий.

Теория

Классификация конструкторов

Конструкторы Channel можно разделить на несколько категорий в зависимости от их назначения:

Конструкторы Channel
├── Примитивные (базовые операции)
│   ├── succeed / fail / failCause   — завершение
│   ├── write / writeAll             — эмиссия элементов
│   └── unit / void                  — пустой Channel

├── Из Effect (мост с Effect)
│   ├── fromEffect                   — Effect → Channel
│   ├── unwrap                       — Effect<Channel> → Channel
│   └── scoped                       — с управлением Scope

├── С чтением upstream (двунаправленные)
│   ├── read                         — базовое чтение
│   ├── readWith                     — чтение с обработчиками
│   └── readOrFail                   — чтение или ошибка

├── С управлением ресурсами
│   ├── acquireReleaseOut            — acquire/release для Channel
│   └── ensuring / ensuringWith      — финализаторы

└── Из других абстракций
    ├── Stream.toChannel             — Stream → Channel
    └── Sink.toChannel               — Sink → Channel

Принцип ленивости

Все конструкторы Channel создают описание вычисления, а не выполняют его. Channel — это ленивая структура, которая материализуется только при запуске через Channel.run, Channel.runCollect и подобные функции. Это фундаментальное свойство Effect-ts: отделение описания от исполнения.

import { Channel, Chunk, Effect } from "effect"

// Это НЕ выполняет side-effect — только описывает
const channel = Channel.flatMap(
  Channel.fromEffect(Effect.log("Hello")),
  () => Channel.write(Chunk.make(1, 2, 3))
)

// Выполнение происходит только здесь
Effect.runPromise(Channel.run(channel))

Примитивные конструкторы

Channel.succeed — завершение с результатом

Channel.succeed создаёт Channel, который немедленно завершается с заданным значением, не эмитируя никаких элементов. Это аналог Effect.succeed для Channel.

import { Channel, Effect } from "effect"

// Channel<never, unknown, never, unknown, number, unknown, never>
const done = Channel.succeed(42)

const program = Effect.gen(function* () {
  const [chunks, value] = yield* Channel.runCollect(done)
  console.log("Элементы:", chunks)  // пустой Chunk
  console.log("Результат:", value)  // 42
})

Effect.runPromise(program)
// Элементы: { _id: 'Chunk', values: [] }
// Результат: 42

⚠️ Channel.succeed не эмитирует элементы. Он задаёт финальное значение (OutDone). Для эмиссии элементов используйте Channel.write.

Channel.fail — завершение с ошибкой

Channel.fail создаёт Channel, который немедленно завершается с типизированной ошибкой.

import { Channel, Effect } from "effect"

class DatabaseError {
  readonly _tag = "DatabaseError" as const
  constructor(readonly message: string) {}
}

// Channel<never, unknown, DatabaseError, unknown, never, unknown, never>
const failing = Channel.fail(new DatabaseError("Connection lost"))

const program = Effect.gen(function* () {
  const exit = yield* Effect.exit(Channel.run(failing))
  console.log(exit)
})

Effect.runPromise(program)
// { _id: 'Exit', _tag: 'Failure', cause: { _tag: 'Fail', failure: DatabaseError } }

Channel.failCause — завершение с полной Cause

Channel.failCause позволяет указать полную причину сбоя, включая defects и interruptions.

import { Channel, Cause, Effect } from "effect"

// Channel с дефектом (непредвиденной ошибкой)
const defect = Channel.failCause(
  Cause.die(new Error("Unexpected crash"))
)

// Channel с составной ошибкой
const composite = Channel.failCause(
  Cause.sequential(
    Cause.fail("First error"),
    Cause.fail("Second error")
  )
)

Channel.write — эмиссия одного элемента

Channel.write создаёт Channel, который эмитирует один элемент и завершается.

import { Channel, Chunk, Effect } from "effect"

// Эмиссия одного чанка
const singleChunk = Channel.write(Chunk.make(1, 2, 3))

// Эмиссия одного скалярного значения
const singleValue = Channel.write("hello")

const program = Effect.gen(function* () {
  const [chunks, _] = yield* Channel.runCollect(singleChunk)
  console.log(Chunk.toReadonlyArray(chunks))
})

Effect.runPromise(program)
// [1, 2, 3]

💡 В контексте Stream Channel обычно эмитирует Chunk<A>, но сам по себе Channel.write может эмитировать любой тип.

Channel.writeAll — эмиссия нескольких элементов

Channel.writeAll создаёт Channel, эмитирующий несколько элементов последовательно.

import { Channel, Chunk, Effect } from "effect"

// Эмиссия нескольких чанков
const multipleChunks = Channel.writeAll(
  Chunk.make(1, 2),
  Chunk.make(3, 4),
  Chunk.make(5, 6)
)

const program = Effect.gen(function* () {
  const [chunks, _] = yield* Channel.runCollect(multipleChunks)
  console.log(Chunk.toReadonlyArray(chunks))
})

Effect.runPromise(program)
// [1, 2, 3, 4, 5, 6]

Channel.unit / Channel.void — пустой Channel

Channel.unit создаёт Channel, который немедленно завершается с void, не эмитируя элементов. Полезен как “нейтральный” элемент в композициях.

import { Channel, Effect } from "effect"

const empty = Channel.void

const program = Effect.gen(function* () {
  const [chunks, value] = yield* Channel.runCollect(empty)
  console.log("Элементы:", chunks)   // пустой Chunk
  console.log("Результат:", value)   // undefined
})

Effect.runPromise(program)

Channel.suspend — отложенное создание

Channel.suspend создаёт Channel лениво — фабричная функция вызывается при каждом запуске. Это полезно для рекурсивных определений и предотвращения eager evaluation.

import { Channel, Chunk, Effect } from "effect"

// Рекурсивный Channel, который эмитирует числа 0, 1, 2, ..., n
const countTo = (n: number, current: number = 0): Channel.Channel<
  Chunk.Chunk<number>,
  unknown,
  never,
  unknown,
  void,
  unknown
> =>
  current > n
    ? Channel.void
    : Channel.suspend(() =>
        Channel.flatMap(
          Channel.write(Chunk.of(current)),
          () => countTo(n, current + 1)
        )
      )

const program = Effect.gen(function* () {
  const [chunks, _] = yield* Channel.runCollect(countTo(5))
  console.log(Chunk.toReadonlyArray(chunks))
})

Effect.runPromise(program)
// [0, 1, 2, 3, 4, 5]

⚠️ Без Channel.suspend рекурсивное определение может вызвать stack overflow, потому что Channel будет раскрыт сразу.


Конструкторы из Effect

Channel.fromEffect — мост Effect → Channel

Channel.fromEffect поднимает произвольный Effect в Channel. Результат Effect становится финальным значением Channel (OutDone), а не эмитируемым элементом.

import { Channel, Effect } from "effect"

// Effect → Channel
const fromEffect = Channel.fromEffect(
  Effect.succeed(42)
)
// Channel<never, unknown, never, unknown, number, unknown, never>
//                                         ▲
//                                    OutDone = 42

const program = Effect.gen(function* () {
  const [_, value] = yield* Channel.runCollect(fromEffect)
  console.log("Результат:", value) // 42
})

Effect.runPromise(program)

⚠️ Типичная ошибка: Channel.fromEffect помещает результат в OutDone, а не в OutElem. Чтобы эмитировать результат Effect как элемент:

import { Channel, Chunk, Effect, pipe } from "effect"

// НЕПРАВИЛЬНО: результат попадает в OutDone, а не в OutElem
const wrong = Channel.fromEffect(Effect.succeed(42))
// Channel<never, unknown, never, unknown, number, unknown, never>
//          ▲                               ▲
//       OutElem=never                 OutDone=number

// ПРАВИЛЬНО: маппим результат Effect и записываем как элемент
const correct = pipe(
  Channel.fromEffect(Effect.succeed(42)),
  Channel.flatMap((value) => Channel.write(Chunk.of(value)))
)
// Channel<Chunk<number>, unknown, never, unknown, void, unknown, never>
//          ▲
//       OutElem=Chunk<number>

Channel.fromEffect с ошибкой

import { Channel, Effect } from "effect"

class AppError {
  readonly _tag = "AppError" as const
  constructor(readonly reason: string) {}
}

const mayFail = Channel.fromEffect(
  Effect.fail(new AppError("Something went wrong"))
)
// Channel<never, unknown, AppError, unknown, never, unknown, never>

const withRecovery = Channel.fromEffect(
  Effect.tryPromise({
    try: () => fetch("https://api.example.com/data"),
    catch: () => new AppError("Network error")
  })
)

Channel.unwrap — разворачивание Effect

Channel.unwrap позволяет создать Channel, определение которого зависит от результата Effect. Это аналог Effect.flatMap для перехода из мира Effect в мир Channel.

import { Channel, Chunk, Effect } from "effect"

// Конфигурация определяет, какой Channel создать
const dynamicChannel = Channel.unwrap(
  Effect.gen(function* () {
    const config = yield* Effect.succeed({ batchSize: 3 })
    return Channel.write(
      Chunk.fromIterable(
        Array.from({ length: config.batchSize }, (_, i) => i + 1)
      )
    )
  })
)

const program = Effect.gen(function* () {
  const [chunks, _] = yield* Channel.runCollect(dynamicChannel)
  console.log(Chunk.toReadonlyArray(chunks))
})

Effect.runPromise(program)
// [1, 2, 3]

Channel.scoped — Channel с Scope

Channel.scoped создаёт Channel из scoped Effect. Ресурсы, приобретённые в Scope, будут освобождены при завершении Channel.

import { Channel, Effect, Scope } from "effect"

const scopedChannel = Channel.scoped(
  Effect.gen(function* () {
    yield* Effect.addFinalizer(() =>
      Effect.log("Ресурс освобождён")
    )
    yield* Effect.log("Ресурс приобретён")
    return "resource-handle"
  })
)
// Ресурс будет освобождён когда Channel завершится

Конструкторы с чтением upstream

Channel.read — базовое чтение

Channel.read создаёт Channel, который читает один элемент из upstream. Возвращает Either<InDone, InElem> для различения элемента и завершения.

import { Channel, Effect } from "effect"

// Прочитать один элемент из upstream
const reader = Channel.read<number>()

Channel.readWith — чтение с тремя обработчиками

Channel.readWith — самый мощный конструктор для двунаправленных Channel. Он принимает три обработчика:

  • onInput — обработка входного элемента
  • onFailure — обработка ошибки upstream
  • onDone — обработка завершения upstream
import { Channel, Chunk, Effect, pipe } from "effect"

// Channel, который логирует каждый входящий чанк и пересылает дальше
const logger = <A>(): Channel.Channel<
  Chunk.Chunk<A>,
  Chunk.Chunk<A>,
  never,
  never,
  void,
  void
> =>
  Channel.readWith({
    onInput: (chunk: Chunk.Chunk<A>) =>
      Channel.flatMap(
        Channel.fromEffect(
          Effect.log(`Получен чанк размером ${Chunk.size(chunk)}`)
        ),
        () => Channel.flatMap(
          Channel.write(chunk),
          () => logger<A>() // рекурсивно продолжаем
        )
      ),
    onFailure: (err) => Channel.fail(err),
    onDone: (done) => Channel.succeed(done)
  })

Паттерн рекурсивного Channel

readWith часто используется рекурсивно: после обработки одного элемента Channel вызывает сам себя для обработки следующего. Это идиоматический паттерн для Channel-трансформеров:

import { Channel, Chunk, Effect } from "effect"

// Паттерн: рекурсивный трансформер
const transform = <A, B>(
  f: (a: A) => B
): Channel.Channel<
  Chunk.Chunk<B>,
  Chunk.Chunk<A>,
  never,
  never,
  void,
  void
> =>
  Channel.readWith({
    onInput: (chunk: Chunk.Chunk<A>) =>
      Channel.flatMap(
        Channel.write(Chunk.map(chunk, f)),
        () => transform(f) // рекурсивный вызов
      ),
    onFailure: (err) => Channel.fail(err),
    onDone: (done) => Channel.succeed(done)
  })

Channel.readOrFail — чтение или ошибка

Channel.readOrFail читает элемент из upstream. Если upstream завершился (нет элементов), Channel завершается с указанной ошибкой.

import { Channel, Chunk, Effect, Option } from "effect"

// Прочитать элемент или завершиться с ошибкой
const readOrError = Channel.readOrFail("No more data")
// Если upstream завершён, ошибка "No more data"

Конструкторы с управлением ресурсами

Channel.acquireReleaseOut — безопасное управление ресурсами

Channel.acquireReleaseOut создаёт Channel, который:

  1. Приобретает ресурс (acquire)
  2. Эмитирует его как элемент
  3. Освобождает при завершении (release)
import { Channel, Effect } from "effect"

// Пример: канал с файловым ресурсом
const fileChannel = Channel.acquireReleaseOut(
  Effect.gen(function* () {
    yield* Effect.log("Открываем файл")
    return { fd: 42, path: "/data/log.txt" } as const
  }),
  (file) => Effect.log(`Закрываем файл ${file.path}`)
)

const program = Effect.gen(function* () {
  const [chunks, _] = yield* Channel.runCollect(
    Effect.scoped(fileChannel)
  )
  console.log("Файл:", chunks)
})

// Вывод:
// Открываем файл
// Закрываем файл /data/log.txt

Channel.ensuring — добавление финализатора

Channel.ensuring добавляет финализатор к существующему Channel. Финализатор выполнится при любом завершении Channel.

import { Channel, Chunk, Effect, pipe } from "effect"

const withCleanup = pipe(
  Channel.write(Chunk.make(1, 2, 3)),
  Channel.ensuring(
    Effect.log("Channel завершился, выполняем cleanup")
  )
)

const program = Channel.run(withCleanup)

Effect.runPromise(program)
// Channel завершился, выполняем cleanup

Channel.ensuringWith — финализатор с информацией о завершении

Channel.ensuringWith даёт доступ к Exit-значению в финализаторе, позволяя реагировать на способ завершения.

import { Channel, Chunk, Effect, Exit, pipe } from "effect"

const withDetailedCleanup = pipe(
  Channel.write(Chunk.make(1, 2, 3)),
  Channel.ensuringWith((exit) =>
    Exit.isSuccess(exit)
      ? Effect.log("Успешное завершение")
      : Effect.log("Ошибка при завершении")
  )
)

Конструкторы из других абстракций

Stream.toChannel — Stream → Channel

import { Channel, Chunk, Effect, Stream } from "effect"

const stream = Stream.make(1, 2, 3, 4, 5)

// Конвертируем Stream в Channel
const channel = Stream.toChannel(stream)
// Channel<Chunk<number>, unknown, never, unknown, unknown, unknown, never>

// Теперь можно соединить с другими Channel
const program = Effect.gen(function* () {
  const [chunks, _] = yield* Channel.runCollect(channel)
  console.log(Chunk.toReadonlyArray(chunks))
})

Effect.runPromise(program)
// [1, 2, 3, 4, 5]

Sink.toChannel — Sink → Channel

import { Chunk, Effect, Sink, Stream } from "effect"

const sink = Sink.collectAll<number>()

// Sink как Channel — потребляет элементы, производит результат
const sinkChannel = Sink.toChannel(sink)
// Channel<never, Chunk<number>, never, unknown, Chunk<number>, unknown, never>

API Reference

Сводная таблица конструкторов

КонструкторТипОписание
Channel.succeed(value)[STABLE]Завершается с value, не эмитирует
Channel.fail(error)[STABLE]Завершается с ошибкой
Channel.failCause(cause)[STABLE]Завершается с полной Cause
Channel.write(elem)[STABLE]Эмитирует один элемент
Channel.writeAll(...elems)[STABLE]Эмитирует несколько элементов
Channel.void[STABLE]Пустой Channel (void)
Channel.suspend(f)[STABLE]Ленивое создание
Channel.fromEffect(effect)[STABLE]Effect → Channel (результат в OutDone)
Channel.unwrap(effect)[STABLE]Effect → Channel
Channel.scoped(effect)[STABLE]Scoped Effect → Channel
Channel.read()[STABLE]Чтение из upstream
Channel.readWith({...})[STABLE]Чтение с обработчиками
Channel.readOrFail(error)[STABLE]Чтение или ошибка
Channel.acquireReleaseOut(acq, rel)[STABLE]Ресурс как элемент
Channel.ensuring(finalizer)[STABLE]Финализатор
Channel.ensuringWith(f)[STABLE]Финализатор с Exit
Stream.toChannel(stream)[STABLE]Stream → Channel
Sink.toChannel(sink)[STABLE]Sink → Channel

Примеры

💻 Пример 1: Генератор последовательности Фибоначчи

import { Channel, Chunk, Effect, Stream } from "effect"

// Channel-генератор чисел Фибоначчи
const fibonacciChannel = (
  limit: number,
  a: number = 0,
  b: number = 1,
  count: number = 0
): Channel.Channel<Chunk.Chunk<number>> =>
  count >= limit
    ? Channel.void
    : Channel.suspend(() =>
        Channel.flatMap(
          Channel.write(Chunk.of(a)),
          () => fibonacciChannel(limit, b, a + b, count + 1)
        )
      )

// Используем через Stream
const fibStream = Stream.fromChannel(fibonacciChannel(10))

const program = Effect.gen(function* () {
  const result = yield* Stream.runCollect(fibStream)
  console.log(Chunk.toReadonlyArray(result))
})

Effect.runPromise(program)
// [0, 1, 1, 2, 3, 5, 8, 13, 21, 34]

💻 Пример 2: Channel с зависимостями

import { Channel, Chunk, Context, Effect, Layer, Stream } from "effect"

// Определяем сервис
class Config extends Context.Tag("Config")<Config, {
  readonly batchSize: number
  readonly prefix: string
}>() {}

// Channel, использующий зависимость
const configuredChannel = Channel.unwrap(
  Effect.gen(function* () {
    const { batchSize, prefix } = yield* Config
    const items = Array.from(
      { length: batchSize },
      (_, i) => `${prefix}-${i}`
    )
    return Channel.write(Chunk.fromIterable(items))
  })
)

// Используем через Stream с provide
const stream = Stream.fromChannel(configuredChannel)

const ConfigLive = Layer.succeed(Config, {
  batchSize: 5,
  prefix: "item"
})

const program = Effect.gen(function* () {
  const result = yield* Stream.runCollect(stream)
  console.log(Chunk.toReadonlyArray(result))
}).pipe(Effect.provide(ConfigLive))

Effect.runPromise(program)
// ["item-0", "item-1", "item-2", "item-3", "item-4"]

💻 Пример 3: Batching Channel

import { Channel, Chunk, Effect, pipe } from "effect"

// Channel-трансформер, который группирует элементы в батчи
const batcher = <A>(
  batchSize: number,
  buffer: Chunk.Chunk<A> = Chunk.empty()
): Channel.Channel<
  Chunk.Chunk<A>,
  Chunk.Chunk<A>,
  never,
  never,
  void,
  void
> =>
  Channel.readWith({
    onInput: (chunk: Chunk.Chunk<A>) => {
      const combined = Chunk.appendAll(buffer, chunk)
      if (Chunk.size(combined) >= batchSize) {
        const [batch, rest] = Chunk.splitAt(combined, batchSize)
        return Channel.flatMap(
          Channel.write(batch),
          () => batcher(batchSize, rest)
        )
      }
      return batcher(batchSize, combined)
    },
    onFailure: (err) => Channel.fail(err),
    onDone: (_) => {
      // Эмитируем оставшиеся элементы
      if (Chunk.isEmpty(buffer)) return Channel.void
      return Channel.write(buffer)
    }
  })

// Producer: элементы по одному
const producer = pipe(
  Channel.write(Chunk.make(1)),
  Channel.flatMap(() => Channel.write(Chunk.make(2))),
  Channel.flatMap(() => Channel.write(Chunk.make(3))),
  Channel.flatMap(() => Channel.write(Chunk.make(4))),
  Channel.flatMap(() => Channel.write(Chunk.make(5)))
)

const pipeline = Channel.pipeTo(producer, batcher<number>(3))

const program = Effect.gen(function* () {
  const [chunks, _] = yield* Channel.runCollect(pipeline)
  console.log(Chunk.toReadonlyArray(chunks))
})

Effect.runPromise(program)
// [1, 2, 3, 4, 5] (батч из 3, затем остаток из 2)

Упражнения

🟢 Basic

Упражнение 1: Цепочка конструкторов

Создайте Channel, который:

  1. Эмитирует чанк ["hello", "world"]
  2. Выполняет Effect.log("Emitted first batch")
  3. Эмитирует чанк ["foo", "bar"]
  4. Завершается со значением 4 (общее количество элементов)

Решение:

import { Channel, Chunk, Effect, pipe } from "effect"

const channel = pipe(
  Channel.write(Chunk.make("hello", "world")),
  Channel.flatMap(() =>
    Channel.fromEffect(Effect.log("Emitted first batch"))
  ),
  Channel.flatMap(() => Channel.write(Chunk.make("foo", "bar"))),
  Channel.flatMap(() => Channel.succeed(4))
)

const program = Effect.gen(function* () {
  const [chunks, count] = yield* Channel.runCollect(channel)
  console.log("Элементы:", Chunk.toReadonlyArray(chunks))
  console.log("Количество:", count)
})

Effect.runPromise(program)
// timestamp=... level=INFO message="Emitted first batch"
// Элементы: ["hello", "world", "foo", "bar"]
// Количество: 4

Упражнение 2: Channel из Effect

Создайте Channel, который выполняет Effect, генерирующий случайное число, и эмитирует его как элемент. Повторите 5 раз.

Решение:

import { Channel, Chunk, Effect, Random } from "effect"

const randomElement = Channel.flatMap(
  Channel.fromEffect(Random.nextIntBetween(1, 100)),
  (n) => Channel.write(Chunk.of(n))
)

const fiveRandoms = (
  remaining: number = 5
): Channel.Channel<Chunk.Chunk<number>, unknown, never, unknown, void> =>
  remaining <= 0
    ? Channel.void
    : Channel.flatMap(
        randomElement,
        () => fiveRandoms(remaining - 1)
      )

const program = Effect.gen(function* () {
  const [chunks, _] = yield* Channel.runCollect(fiveRandoms())
  console.log(Chunk.toReadonlyArray(chunks))
})

Effect.runPromise(program)
// Пример: [42, 17, 83, 5, 61]

🟡 Intermediate

Упражнение 3: readWith-трансформер

Реализуйте Channel-трансформер takeChannel(n), который пропускает первые n элементов из upstream и завершается. Используйте Channel.readWith и рекурсию.

Решение:

import { Channel, Chunk, Effect, pipe } from "effect"

const takeChannel = <A>(
  remaining: number
): Channel.Channel<
  Chunk.Chunk<A>,
  Chunk.Chunk<A>,
  never,
  never,
  void,
  void
> => {
  if (remaining <= 0) return Channel.void

  return Channel.readWith({
    onInput: (chunk: Chunk.Chunk<A>) => {
      const size = Chunk.size(chunk)
      if (size <= remaining) {
        return Channel.flatMap(
          Channel.write(chunk),
          () => takeChannel(remaining - size)
        )
      }
      const taken = Chunk.take(chunk, remaining)
      return Channel.write(taken)
    },
    onFailure: (err) => Channel.fail(err),
    onDone: (done) => Channel.succeed(done)
  })
}

const producer = pipe(
  Channel.write(Chunk.make(1, 2, 3)),
  Channel.flatMap(() => Channel.write(Chunk.make(4, 5, 6))),
  Channel.flatMap(() => Channel.write(Chunk.make(7, 8, 9)))
)

const pipeline = Channel.pipeTo(producer, takeChannel<number>(5))

const program = Effect.gen(function* () {
  const [chunks, _] = yield* Channel.runCollect(pipeline)
  console.log(Chunk.toReadonlyArray(chunks))
})

Effect.runPromise(program)
// [1, 2, 3, 4, 5]

🔴 Advanced

Упражнение 4: Channel с управлением ресурсами

Создайте Channel, который:

  1. Приобретает «подключение к БД» (мок)
  2. Читает чанки запросов из upstream
  3. Для каждого запроса эмитирует «результат»
  4. Корректно освобождает подключение при завершении (успешном или ошибочном)

Решение:

import { Channel, Chunk, Effect, pipe } from "effect"

interface DbConnection {
  readonly id: string
  readonly query: (sql: string) => Effect.Effect<string>
}

const dbChannel = pipe(
  // Acquire: создаём подключение
  Channel.fromEffect(
    Effect.gen(function* () {
      yield* Effect.log("Подключаемся к БД")
      const conn: DbConnection = {
        id: "conn-001",
        query: (sql) => Effect.succeed(`Result for: ${sql}`)
      }
      return conn
    })
  ),
  // Используем подключение для обработки запросов
  Channel.flatMap((conn) => {
    const processQueries: Channel.Channel<
      Chunk.Chunk<string>,
      Chunk.Chunk<string>,
      never,
      never,
      void,
      void
    > = Channel.readWith({
      onInput: (queries: Chunk.Chunk<string>) =>
        Channel.flatMap(
          Channel.fromEffect(
            Effect.gen(function* () {
              const results: ReadonlyArray<string> = []
              for (const q of queries) {
                const r = yield* conn.query(q)
                ;(results as Array<string>).push(r)
              }
              return Chunk.fromIterable(results)
            })
          ),
          (results) =>
            Channel.flatMap(
              Channel.write(results),
              () => processQueries
            )
        ),
      onFailure: (err) => Channel.fail(err),
      onDone: (_) => Channel.void
    })
    return processQueries
  }),
  // Release: финализатор
  Channel.ensuring(
    Effect.log("Отключаемся от БД")
  )
)

// Producer: запросы
const queries = pipe(
  Channel.write(Chunk.make("SELECT 1", "SELECT 2")),
  Channel.flatMap(() =>
    Channel.write(Chunk.make("SELECT 3"))
  )
)

const pipeline = Channel.pipeTo(queries, dbChannel)

const program = Effect.gen(function* () {
  const [chunks, _] = yield* Channel.runCollect(pipeline)
  console.log(Chunk.toReadonlyArray(chunks))
})

Effect.runPromise(program)
// Подключаемся к БД
// ["Result for: SELECT 1", "Result for: SELECT 2", "Result for: SELECT 3"]
// Отключаемся от БД

🔗 Связанные темы: 01-channel-concept.md, 03-channel-operations.md, Модуль 05 (Scope & Resources)