Effect Курс Стандартные Sink
Глава

Стандартные Sink

Полный каталог встроенных потребителей потоков — от простых аккумуляторов до сложных fold-операций с весами и условиями.

Теория

Классификация встроенных Sink

Встроенные Sink в Effect-ts можно разделить на несколько категорий по характеру потребления:

Встроенные Sink
├── Элементные (единичный элемент)
│   ├── head      — первый элемент
│   ├── last      — последний элемент
│   └── take      — первые N элементов

├── Collecting (аккумуляция в коллекцию)
│   ├── collectAll           — все элементы в Chunk
│   ├── collectAllN          — первые N в Chunk
│   ├── collectAllWhile      — пока предикат true
│   ├── collectAllToSet      — уникальные в HashSet
│   ├── collectAllToSetN     — уникальные с лимитом
│   ├── collectAllToMap      — группировка в HashMap
│   └── collectAllToMapN     — группировка с лимитом ключей

├── Folding (свёртка)
│   ├── foldLeft             — полная свёртка без остановки
│   ├── fold                 — свёртка с условием остановки
│   ├── foldUntil            — свёртка до N элементов
│   ├── foldWeighted         — свёртка по весам
│   └── foldEffect           — эффективная свёртка

├── Агрегационные (числовые)
│   ├── sum       — сумма
│   └── count     — количество

└── Утилитарные
    ├── drain     — потребить и отбросить
    ├── forEach   — побочный эффект на каждый элемент
    ├── timed     — измерить время
    ├── succeed   — немедленный успех
    └── fail      — немедленная ошибка

Принцип полноты потребления

Каждый Sink можно охарактеризовать по полноте потребления:

  • Полное потребление (L = never): Sink обрабатывает все элементы потока. Примеры: collectAll, foldLeft, sum, count, drain, forEach.
  • Частичное потребление (L = In): Sink обрабатывает только часть элементов, оставляя leftovers. Примеры: head, last, take, collectAllN, fold, foldUntil.

Это различие критически важно при последовательной композиции Sink через flatMap — leftovers первого Sink передаются второму.


Концепция ФП

Catamorphism — свёртка как уничтожение структуры

Операция fold является катаморфизмом — фундаментальным понятием теории категорий, означающим “разрушение” структуры данных с извлечением значения:

Catamorphism (fold):
  F-алгебра: F(A) → A

Для списка [a₁, a₂, a₃, ...]:
  foldLeft(z)(f) = f(f(f(z, a₁), a₂), a₃)

     f
    / \
   f   a₃
  / \
 f   a₂
/ \
z  a₁

В контексте Sink катаморфизм работает инкрементально: элементы поступают по одному из потока, и аккумулятор обновляется на каждом шаге, не требуя всех элементов в памяти одновременно.

Гомоморфизм списков

Многие Sink реализуют гомоморфизм списков — преобразование, сохраняющее алгебраическую структуру:

h: List<A> → B

h([])     = e           // начальное значение
h(x :: xs) = f(x, h(xs)) // комбинирование

Это в точности fold!

Свойства гомоморфизма позволяют параллелизовать операции: если f ассоциативна и e — нейтральный элемент, свёртку можно разбить на независимые части и объединить.


Элементные Sink

Sink.head — первый элемент

Sink.head потребляет только первый элемент потока, оборачивая его в Option.Some. Для пустого потока возвращает Option.None. Все остальные элементы остаются как leftovers:

import { Stream, Sink, Effect } from "effect"

// ── Непустой поток ──────────────────────────────────────
const nonEmpty = Stream.make(1, 2, 3, 4)

Effect.runPromise(Stream.run(nonEmpty, Sink.head())).then(console.log)
// Output: { _id: 'Option', _tag: 'Some', value: 1 }


// ── Пустой поток ────────────────────────────────────────
const empty = Stream.empty

Effect.runPromise(Stream.run(empty, Sink.head())).then(console.log)
// Output: { _id: 'Option', _tag: 'None' }

Тип: Sink<Option<A>, A, A, never, never> — обратите внимание, что L = A, потому что непотреблённые элементы того же типа, что и вход.

Sink.last — последний элемент

Sink.last должен потребить все элементы потока, чтобы определить последний. Возвращает Option:

import { Stream, Sink, Effect } from "effect"

const stream = Stream.make(1, 2, 3, 4)

Effect.runPromise(Stream.run(stream, Sink.last())).then(console.log)
// Output: { _id: 'Option', _tag: 'Some', value: 4 }


const empty = Stream.empty

Effect.runPromise(Stream.run(empty, Sink.last())).then(console.log)
// Output: { _id: 'Option', _tag: 'None' }

⚠️ Важно: несмотря на то что Sink.last потребляет все элементы, его тип всё равно показывает L = A для leftovers. Однако на практике leftovers будут пустыми после полного потребления.

Sink.take — первые N элементов

Sink.take(n) собирает первые n элементов в Chunk:

import { Stream, Sink, Effect } from "effect"

const stream = Stream.make(1, 2, 3, 4, 5, 6)

Effect.runPromise(Stream.run(stream, Sink.take(3))).then(console.log)
// Output: { _id: 'Chunk', values: [ 1, 2, 3 ] }

Тип: Sink<Chunk<A>, A, A, never, never> — leftovers содержат элементы после первых N.


Collecting — сбор элементов

Sink.collectAll — все элементы в Chunk

Собирает абсолютно все элементы потока в иммутабельный Chunk:

import { Stream, Sink, Effect } from "effect"

const stream = Stream.make(1, 2, 3, 4)

Effect.runPromise(Stream.run(stream, Sink.collectAll())).then(console.log)
// Output: { _id: 'Chunk', values: [ 1, 2, 3, 4 ] }

⚠️ Внимание к памяти: collectAll хранит все элементы в памяти. Для потоков неизвестной длины или потенциально бесконечных потоков используйте collectAllN или потоковые операции (fold, forEach).

Sink.collectAllN — первые N элементов в Chunk

Ограниченный сбор — останавливается после N элементов:

import { Stream, Sink, Effect } from "effect"

const stream = Stream.make(1, 2, 3, 4, 5)

Effect.runPromise(
  Stream.run(stream, Sink.collectAllN(3))
).then(console.log)
// Output: { _id: 'Chunk', values: [ 1, 2, 3 ] }

Sink.collectAllWhile — сбор по условию

Собирает элементы, пока предикат возвращает true. При первом false останавливается:

import { Stream, Sink, Effect } from "effect"

const stream = Stream.make(1, 2, 0, 4, 0, 6, 7)

Effect.runPromise(
  Stream.run(stream, Sink.collectAllWhile((n) => n !== 0))
).then(console.log)
// Output: { _id: 'Chunk', values: [ 1, 2 ] }

Это работает как takeWhile — после первого несовпадения сбор прекращается, даже если дальше есть подходящие элементы.

Sink.collectAllToSet — уникальные элементы в HashSet

Собирает все элементы в HashSet, автоматически убирая дубликаты:

import { Stream, Sink, Effect } from "effect"

const stream = Stream.make(1, 2, 2, 3, 4, 4)

Effect.runPromise(Stream.run(stream, Sink.collectAllToSet())).then(
  console.log
)
// Output: { _id: 'HashSet', values: [ 1, 2, 3, 4 ] }

Sink.collectAllToSetN — уникальные с лимитом

Собирает уникальные элементы, ограничивая размер множества:

import { Stream, Sink, Effect } from "effect"

const stream = Stream.make(1, 2, 2, 3, 4, 4)

Effect.runPromise(
  Stream.run(stream, Sink.collectAllToSetN(3))
).then(console.log)
// Output: { _id: 'HashSet', values: [ 1, 2, 3 ] }

Sink.collectAllToMap — группировка в HashMap

Самый мощный коллектор — собирает элементы в HashMap с произвольной стратегией ключей и слияния:

import { Stream, Sink, Effect } from "effect"

const stream = Stream.make(1, 3, 2, 3, 1, 5, 1)

Effect.runPromise(
  Stream.run(
    stream,
    Sink.collectAllToMap(
      (n) => n % 3,          // Ключ: остаток от деления на 3
      (a, b) => a + b        // Слияние: сумма значений с одним ключом
    )
  )
).then(console.log)
// Output: { _id: 'HashMap', values: [ [ 0, 6 ], [ 1, 3 ], [ 2, 7 ] ] }

Разберём результат:

  • Ключ 0 (n % 3 === 0): элементы 3, 3 → 3 + 3 = 6
  • Ключ 1 (n % 3 === 1): элементы 1, 1, 1 → 1 + 1 + 1 = 3
  • Ключ 2 (n % 3 === 2): элементы 2, 5 → 2 + 5 = 7

Sink.collectAllToMapN — группировка с лимитом ключей

Ограничивает количество уникальных ключей в HashMap:

import { Stream, Sink, Effect } from "effect"

const stream = Stream.make(1, 3, 2, 3, 1, 5, 1)

Effect.runPromise(
  Stream.run(
    stream,
    Sink.collectAllToMapN(
      3,                      // Максимум 3 ключа
      (n) => n,               // Ключ: само значение
      (a, b) => a + b         // Слияние: сумма
    )
  )
).then(console.log)
// Output: { _id: 'HashMap', values: [ [ 1, 2 ], [ 2, 2 ], [ 3, 6 ] ] }

Folding — свёртка

Sink.foldLeft — полная левая свёртка

foldLeft — это классический левый fold без условия остановки. Потребляет все элементы потока:

import { Stream, Sink, Effect } from "effect"

const stream = Stream.make(1, 2, 3, 4)

// foldLeft<S, In>(initial: S, f: (state: S, input: In) => S)
Effect.runPromise(
  Stream.run(
    stream,
    Sink.foldLeft(0, (acc, n) => acc + n)
  )
).then(console.log)
// Output: 10

Семантика:

foldLeft(0, +) на [1, 2, 3, 4]:
  step 0: acc = 0
  step 1: acc = 0 + 1 = 1
  step 2: acc = 1 + 2 = 3
  step 3: acc = 3 + 3 = 6
  step 4: acc = 6 + 4 = 10
  result: 10

foldLeft — чистая функция, не поддерживает побочные эффекты в аккумуляторе. Для эффективных операций используйте Sink.foldEffect.

Sink.fold — свёртка с условием остановки

Sink.fold добавляет предикат продолжения: свёртка продолжается, пока предикат возвращает true. При первом false Sink завершается с текущим аккумулятором:

import { Stream, Sink, Effect } from "effect"

// Бесконечный поток натуральных чисел
const naturals = Stream.iterate(0, (n) => n + 1)

// fold<S, In>(initial: S, contFn: (s: S) => boolean, body: (s: S, in: In) => S)
Effect.runPromise(
  Stream.run(
    naturals,
    Sink.fold(
      0,                        // Начальное значение
      (sum) => sum <= 10,       // Продолжать пока сумма ≤ 10
      (acc, n) => acc + n       // Аккумулировать
    )
  )
).then(console.log)
// Output: 15

Разберём выполнение:

fold(0, sum <= 10, +) на [0, 1, 2, 3, 4, 5, ...]:
  step 0: acc = 0,  continue? 0 <= 10 = true,   acc = 0 + 0 = 0
  step 1: acc = 0,  continue? 0 <= 10 = true,   acc = 0 + 1 = 1
  step 2: acc = 1,  continue? 1 <= 10 = true,   acc = 1 + 2 = 3
  step 3: acc = 3,  continue? 3 <= 10 = true,   acc = 3 + 3 = 6
  step 4: acc = 6,  continue? 6 <= 10 = true,   acc = 6 + 4 = 10
  step 5: acc = 10, continue? 10 <= 10 = true,  acc = 10 + 5 = 15
  step 6: acc = 15, continue? 15 <= 10 = false,  STOP
  result: 15

⚠️ Порядок проверки: предикат проверяется перед потреблением следующего элемента. Поэтому результат 15, а не 10 — на шаге 5 аккумулятор ещё был ≤ 10, поэтому элемент 5 был потреблён.

Sink.foldUntil — свёртка до N элементов

Свёртка с ограничением по количеству потреблённых элементов:

import { Stream, Sink, Effect } from "effect"

const stream = Stream.make(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

// foldUntil<S, In>(initial: S, maxElements: number, body: (s: S, in: In) => S)
Effect.runPromise(
  Stream.run(
    stream,
    Sink.foldUntil(0, 3, (acc, n) => acc + n)
  )
).then(console.log)
// Output: 6   (1 + 2 + 3, остановились после 3 элементов)

Sink.foldWeighted — свёртка по весам

Самый гибкий вариант fold — позволяет назначать вес каждому элементу и аккумулировать, пока суммарный вес не превысит порог:

import { Stream, Sink, Chunk, Effect } from "effect"

const stream = Stream.make(3, 2, 4, 1, 5, 6, 2, 1, 3, 5, 6).pipe(
  // transduce применяет Sink повторно, собирая результаты
  Stream.transduce(
    Sink.foldWeighted({
      initial: Chunk.empty<number>(),     // Начальный аккумулятор
      maxCost: 3,                          // Максимальный суммарный вес
      cost: () => 1,                       // Вес каждого элемента = 1
      body: (acc, el) => Chunk.append(acc, el) // Добавляем в Chunk
    })
  )
)

Effect.runPromise(Stream.runCollect(stream)).then((chunk) =>
  console.log("%o", chunk)
)
// Output:
// {
//   _id: 'Chunk',
//   values: [
//     { _id: 'Chunk', values: [ 3, 2, 4 ] },    ← первая группа (3 элемента)
//     { _id: 'Chunk', values: [ 1, 5, 6 ] },    ← вторая группа
//     { _id: 'Chunk', values: [ 2, 1, 3 ] },    ← третья группа
//     { _id: 'Chunk', values: [ 5, 6 ] }         ← остаток (< 3 элементов)
//   ]
// }

Разберём параметры foldWeighted:

  • initial — начальное значение аккумулятора для каждой группы
  • maxCost — максимальный допустимый суммарный вес группы
  • cost: (state: S, element: In) => number — функция вычисления веса элемента (может зависеть от текущего состояния)
  • body: (state: S, element: In) => S — функция обновления аккумулятора

Практический пример: группировка по размеру батча:

import { Stream, Sink, Chunk, Effect } from "effect"

interface LogEntry {
  readonly message: string
  readonly sizeBytes: number
}

const logs: Stream.Stream<LogEntry> = Stream.make(
  { message: "Start", sizeBytes: 100 },
  { message: "Process A", sizeBytes: 500 },
  { message: "Process B", sizeBytes: 300 },
  { message: "Large payload", sizeBytes: 2000 },
  { message: "End", sizeBytes: 50 }
)

// Группируем логи в батчи по 1KB
const batched = logs.pipe(
  Stream.transduce(
    Sink.foldWeighted({
      initial: Chunk.empty<LogEntry>(),
      maxCost: 1024,                                 // 1KB на батч
      cost: (_state, entry) => entry.sizeBytes,      // Вес = размер в байтах
      body: (acc, entry) => Chunk.append(acc, entry)
    })
  )
)

Effect.runPromise(Stream.runCollect(batched)).then((chunks) => {
  console.log(`Total batches: ${Chunk.size(chunks)}`)
})

Stream.transduce — повторное применение Sink

Stream.transduce — ключевая функция для работы с Sink, которые потребляют частично. Она повторно применяет Sink к потоку, собирая результаты каждого применения в новый поток:

import { Stream, Sink, Effect } from "effect"

// Поток из 10 чисел
const stream = Stream.range(1, 11)

// transduce: применяет foldUntil(0, 3, +) повторно
const batched = stream.pipe(
  Stream.transduce(
    Sink.foldUntil(0, 3, (acc: number, n: number) => acc + n)
  )
)

Effect.runPromise(Stream.runCollect(batched)).then(console.log)
// Output: Chunk(6, 15, 24, 10)
// Группа 1: 1+2+3 = 6
// Группа 2: 4+5+6 = 15
// Группа 3: 7+8+9 = 24
// Группа 4: 10 = 10 (неполная группа)

Агрегационные Sink

Sink.sum — сумма

import { Stream, Sink, Effect } from "effect"

const stream = Stream.make(1, 2, 3, 4)

Effect.runPromise(Stream.run(stream, Sink.sum)).then(console.log)
// Output: 10

⚠️ Sink.sum ожидает number на входе. Для строковых чисел используйте Sink.mapInput:

import { Stream, Sink, Effect } from "effect"

const stringStream = Stream.make("10", "20", "30")

const numericSum = Sink.sum.pipe(
  Sink.mapInput((s: string) => Number.parseFloat(s))
)

Effect.runPromise(Stream.run(stringStream, numericSum)).then(console.log)
// Output: 60

Sink.count — подсчёт

import { Stream, Sink, Effect } from "effect"

const stream = Stream.make(1, 2, 3, 4)

Effect.runPromise(Stream.run(stream, Sink.count)).then(console.log)
// Output: 4

Трансформация и фильтрация

Sink.map — трансформация выхода

Sink.map преобразует результат Sink, не меняя поведение потребления:

import { Stream, Sink, Effect } from "effect"

const stream = Stream.make(1, 2, 3, 4)

// sum возвращает number → map преобразует в string
const formattedSum = Sink.sum.pipe(
  Sink.map((total) => `Total: ${total}`)
)

Effect.runPromise(Stream.run(stream, formattedSum)).then(console.log)
// Output: "Total: 10"

Sink.as — замена выхода константой

Sink.as заменяет результат Sink на фиксированное значение:

import { Stream, Sink, Effect, Console } from "effect"

// forEach выполняет эффект, as(42) подменяет результат void на 42
const sink = Sink.forEach((n: number) =>
  Console.log(`Processing: ${n}`)
).pipe(Sink.as(42))

const stream = Stream.make(1, 2, 3)

Effect.runPromise(Stream.run(stream, sink)).then(console.log)
// Output:
// Processing: 1
// Processing: 2
// Processing: 3
// 42

Sink.mapInput — трансформация входа

Sink.mapInput адаптирует Sink к другому типу входных данных. Функция преобразования идёт в обратном направлении (контравариантность):

import { Stream, Sink, Effect } from "effect"

// Поток строк
const stream = Stream.make("1", "2", "3", "4", "5")

// Адаптируем числовой sum для строковых входов
const stringSum = Sink.sum.pipe(
  Sink.mapInput((s: string) => Number.parseFloat(s))
)

Effect.runPromise(Stream.run(stream, stringSum)).then(console.log)
// Output: 15

Sink.dimap — трансформация входа и выхода

Sink.dimap комбинирует mapInput и map в одну операцию:

import { Stream, Sink, Effect } from "effect"

const stream = Stream.make("1", "2", "3", "4", "5")

// string → (mapInput) → number → [sum] → number → (map) → string
const sumSink = Sink.dimap(Sink.sum, {
  onInput: (s: string) => Number.parseFloat(s),
  onDone: (n) => `Sum = ${n}`
})

Effect.runPromise(Stream.run(stream, sumSink)).then(console.log)
// Output: "Sum = 15"

Sink.filterInput — фильтрация входящих элементов

Sink.filterInput пропускает к Sink только элементы, удовлетворяющие предикату:

import { Stream, Sink, Effect } from "effect"

const stream = Stream.make(1, -2, 3, -4, 5, -6)

// Считаем сумму только положительных чисел
const positiveSum = Sink.sum.pipe(
  Sink.filterInput((n: number) => n > 0)
)

Effect.runPromise(Stream.run(stream, positiveSum)).then(console.log)
// Output: 9   (1 + 3 + 5)

⚠️ Важно: filterInput не создаёт leftovers — отфильтрованные элементы просто отбрасываются. Это отличается от collectAllWhile, который останавливается при первом несовпадении.


API Reference

Элементные Sink

ФункцияТип результатаПолнотаОписание
head<A>()Option<A>ЧастичныйПервый элемент
last<A>()Option<A>ПолныйПоследний элемент
take<A>(n)Chunk<A>ЧастичныйПервые n элементов

Collecting Sink

ФункцияТип результатаОписание
collectAll<A>()Chunk<A>Все элементы
collectAllN<A>(n)Chunk<A>Первые n элементов
collectAllWhile<A>(pred)Chunk<A>Пока предикат true
collectAllToSet<A>()HashSet<A>Уникальные элементы
collectAllToSetN<A>(n)HashSet<A>Уникальные с лимитом
collectAllToMap<A>(keyFn, mergeFn)HashMap<K, A>Группировка
collectAllToMapN<A>(n, keyFn, mergeFn)HashMap<K, A>Группировка с лимитом

Folding Sink

ФункцияУсловие остановкиОписание
foldLeft(init, f)НетПолная свёртка
fold(init, contFn, f)Предикат на аккумуляторС условием
foldUntil(init, max, f)Количество элементовДо N элементов
foldWeighted({...})Суммарный весПо весам

Трансформации

ФункцияНаправлениеОписание
map(f)ВыходТрансформация результата
as(value)ВыходЗамена результата константой
mapInput(f)ВходТрансформация входных элементов
dimap({onInput, onDone})ОбаТрансформация входа и выхода
filterInput(pred)ВходФильтрация входных элементов

Примеры

💻 Пример 1: Анализ лог-файла

import { Stream, Sink, Effect, pipe } from "effect"

// Модель записи лога
interface LogEntry {
  readonly level: "INFO" | "WARN" | "ERROR"
  readonly message: string
  readonly timestamp: number
}

const logStream: Stream.Stream<LogEntry> = Stream.make(
  { level: "INFO", message: "Server started", timestamp: 1000 },
  { level: "INFO", message: "Request received", timestamp: 1100 },
  { level: "WARN", message: "Slow query", timestamp: 1200 },
  { level: "ERROR", message: "Connection timeout", timestamp: 1300 },
  { level: "INFO", message: "Request completed", timestamp: 1400 },
  { level: "ERROR", message: "Disk full", timestamp: 1500 },
  { level: "WARN", message: "Memory high", timestamp: 1600 }
)

// Подсчёт ошибок с фильтрацией
const errorCount = Sink.count.pipe(
  Sink.filterInput((entry: LogEntry) => entry.level === "ERROR")
)

// Группировка по уровню
const byLevel = Sink.collectAllToMap<LogEntry>(
  (entry) => entry.level,
  (a, _b) => a  // берём первый элемент (можно считать)
)

const program = Effect.gen(function* () {
  const errors = yield* Stream.run(logStream, errorCount)
  yield* Effect.log(`Error count: ${errors}`)
  // Output: Error count: 2
})

Effect.runPromise(program)

💻 Пример 2: Пакетная обработка с foldWeighted

import { Stream, Sink, Chunk, Effect, Console } from "effect"

interface DataPacket {
  readonly id: string
  readonly payload: string
  readonly sizeKB: number
}

const packets: Stream.Stream<DataPacket> = Stream.make(
  { id: "p1", payload: "data1", sizeKB: 300 },
  { id: "p2", payload: "data2", sizeKB: 500 },
  { id: "p3", payload: "data3", sizeKB: 400 },
  { id: "p4", payload: "data4", sizeKB: 200 },
  { id: "p5", payload: "data5", sizeKB: 800 },
  { id: "p6", payload: "data6", sizeKB: 100 }
)

// Группируем пакеты в батчи до 1MB (1024KB)
const batchedPackets = packets.pipe(
  Stream.transduce(
    Sink.foldWeighted({
      initial: Chunk.empty<DataPacket>(),
      maxCost: 1024,
      cost: (_acc, packet) => packet.sizeKB,
      body: (acc, packet) => Chunk.append(acc, packet)
    })
  )
)

const program = batchedPackets.pipe(
  Stream.mapEffect((batch, index) =>
    Effect.gen(function* () {
      const ids = Chunk.toReadonlyArray(batch).map((p) => p.id)
      const totalSize = Chunk.toReadonlyArray(batch).reduce(
        (sum, p) => sum + p.sizeKB, 0
      )
      yield* Console.log(
        `Batch ${index}: [${ids.join(", ")}] (${totalSize}KB)`
      )
    })
  ),
  Stream.run(Sink.drain)
)

Effect.runPromise(program)
// Output:
// Batch 0: [p1, p2] (800KB)
// Batch 1: [p3, p4] (600KB)
// Batch 2: [p5] (800KB)
// Batch 3: [p6] (100KB)

💻 Пример 3: CSV парсер с collectAllWhile

import { Stream, Sink, Effect, Chunk } from "effect"

// Поток строк CSV, где пустая строка разделяет секции
const csvLines = Stream.make(
  "name,age",
  "Alice,30",
  "Bob,25",
  "",            // разделитель секций
  "city,country",
  "NYC,US",
  "London,UK"
)

// Собираем строки до пустой строки
const sectionSink = Sink.collectAllWhile<string>((line) => line !== "")

// Используем transduce для обработки каждой секции
const sections = csvLines.pipe(
  Stream.transduce(sectionSink)
)

Effect.runPromise(Stream.runCollect(sections)).then((result) => {
  Chunk.toReadonlyArray(result).forEach((section, i) => {
    console.log(`Section ${i}: ${Chunk.toReadonlyArray(section)}`)
  })
})
// Output:
// Section 0: name,age,Alice,30,Bob,25
// Section 1: city,country,NYC,US,London,UK

Упражнения

🟢 Basic

Упражнение 1: Базовые агрегации

Дан поток оценок студентов (числа от 1 до 5). Используйте Sink.sum, Sink.count и Sink.map для вычисления средней оценки.

import { Stream, Sink, Effect } from "effect"

const grades = Stream.make(5, 4, 3, 5, 4, 2, 5, 3, 4, 5)

const program = Effect.gen(function* () {
  // Ваш код здесь
})

Effect.runPromise(program)

Решение:

import { Stream, Sink, Effect } from "effect"

const grades = Stream.make(5, 4, 3, 5, 4, 2, 5, 3, 4, 5)

const program = Effect.gen(function* () {
  const total = yield* Stream.run(grades, Sink.sum)
  const count = yield* Stream.run(grades, Sink.count)
  const average = count > 0 ? total / count : 0

  yield* Effect.log(`Total: ${total}, Count: ${count}, Average: ${average}`)
})

Effect.runPromise(program)
// Output: Total: 40, Count: 10, Average: 4

Упражнение 2: Уникальные слова

Дан поток слов с повторениями. Соберите уникальные слова в HashSet.

Решение:

import { Stream, Sink, Effect } from "effect"

const words = Stream.make(
  "hello", "world", "hello", "effect", "world", "sink", "effect"
)

const program = Effect.gen(function* () {
  const unique = yield* Stream.run(words, Sink.collectAllToSet())
  yield* Effect.log(`Unique words: ${unique}`)
})

Effect.runPromise(program)
// Output: Unique words: { _id: 'HashSet', values: [ 'hello', 'world', 'effect', 'sink' ] }

🟡 Intermediate

Упражнение 3: Группировка транзакций

Дан поток финансовых транзакций. Сгруппируйте их по категориям и посчитайте суммарные расходы в каждой категории.

import { Stream, Sink, Effect } from "effect"

interface Transaction {
  readonly category: string
  readonly amount: number
}

const transactions = Stream.make(
  { category: "food", amount: 50 },
  { category: "transport", amount: 30 },
  { category: "food", amount: 75 },
  { category: "entertainment", amount: 100 },
  { category: "transport", amount: 25 },
  { category: "food", amount: 40 }
)

Решение:

import { Stream, Sink, Effect } from "effect"

interface Transaction {
  readonly category: string
  readonly amount: number
}

const transactions: Stream.Stream<Transaction> = Stream.make(
  { category: "food", amount: 50 },
  { category: "transport", amount: 30 },
  { category: "food", amount: 75 },
  { category: "entertainment", amount: 100 },
  { category: "transport", amount: 25 },
  { category: "food", amount: 40 }
)

const program = Effect.gen(function* () {
  const byCategory = yield* Stream.run(
    transactions,
    Sink.collectAllToMap<Transaction>(
      (t) => t.category,
      (a, b) => ({ ...a, amount: a.amount + b.amount })
    )
  )
  yield* Effect.log(`Expenses by category: ${byCategory}`)
})

Effect.runPromise(program)
// Output: HashMap with food=165, transport=55, entertainment=100

Упражнение 4: Fold с short-circuit

Найдите сумму натуральных чисел, первую превышающую 1000. Используйте Sink.fold с бесконечным потоком.

Решение:

import { Stream, Sink, Effect } from "effect"

const naturals = Stream.iterate(1, (n) => n + 1)

const program = Effect.gen(function* () {
  const result = yield* Stream.run(
    naturals,
    Sink.fold(
      0,
      (sum) => sum <= 1000,
      (acc, n) => acc + n
    )
  )
  yield* Effect.log(`First sum exceeding 1000: ${result}`)
})

Effect.runPromise(program)
// Output: First sum exceeding 1000: 1035

🔴 Advanced

Упражнение 5: Adaptive batching с foldWeighted

Реализуйте систему пакетной отправки сообщений, где каждый пакет ограничен:

  • Максимум 5 сообщений ИЛИ
  • Максимум 1000 символов суммарной длины

Используйте foldWeighted с пользовательской функцией веса.

Решение:

import { Stream, Sink, Chunk, Effect, Console } from "effect"

interface Message {
  readonly id: number
  readonly text: string
}

const messages: Stream.Stream<Message> = Stream.make(
  { id: 1, text: "Hi" },
  { id: 2, text: "How are you?" },
  { id: 3, text: "A".repeat(400) },
  { id: 4, text: "B".repeat(500) },
  { id: 5, text: "Short" },
  { id: 6, text: "Also short" },
  { id: 7, text: "C".repeat(300) },
  { id: 8, text: "Tiny" },
  { id: 9, text: "D".repeat(200) },
  { id: 10, text: "End" }
)

interface BatchState {
  readonly messages: Chunk.Chunk<Message>
  readonly totalChars: number
  readonly count: number
}

const batchSink = Sink.foldWeighted({
  initial: {
    messages: Chunk.empty<Message>(),
    totalChars: 0,
    count: 0
  } satisfies BatchState,
  maxCost: 1,  // Используем бинарный вес: 0 = ok, 1 = стоп
  cost: (state, msg: Message) => {
    const newCount = state.count + 1
    const newChars = state.totalChars + msg.text.length
    // Вес 1 (= maxCost) если лимиты превышены
    return newCount > 5 || newChars > 1000 ? 1 : 0
  },
  body: (state, msg) => ({
    messages: Chunk.append(state.messages, msg),
    totalChars: state.totalChars + msg.text.length,
    count: state.count + 1
  })
})

const program = messages.pipe(
  Stream.transduce(batchSink),
  Stream.mapEffect((batch, index) =>
    Console.log(
      `Batch ${index}: ${batch.count} messages, ${batch.totalChars} chars`
    )
  ),
  Stream.run(Sink.drain)
)

Effect.runPromise(program)

🔗 Далее: Комбинаторы Sink →