Стандартные Sink
Полный каталог встроенных потребителей потоков — от простых аккумуляторов до сложных fold-операций с весами и условиями.
Теория
Классификация встроенных Sink
Встроенные Sink в Effect-ts можно разделить на несколько категорий по характеру потребления:
Встроенные Sink
├── Элементные (единичный элемент)
│ ├── head — первый элемент
│ ├── last — последний элемент
│ └── take — первые N элементов
│
├── Collecting (аккумуляция в коллекцию)
│ ├── collectAll — все элементы в Chunk
│ ├── collectAllN — первые N в Chunk
│ ├── collectAllWhile — пока предикат true
│ ├── collectAllToSet — уникальные в HashSet
│ ├── collectAllToSetN — уникальные с лимитом
│ ├── collectAllToMap — группировка в HashMap
│ └── collectAllToMapN — группировка с лимитом ключей
│
├── Folding (свёртка)
│ ├── foldLeft — полная свёртка без остановки
│ ├── fold — свёртка с условием остановки
│ ├── foldUntil — свёртка до N элементов
│ ├── foldWeighted — свёртка по весам
│ └── foldEffect — эффективная свёртка
│
├── Агрегационные (числовые)
│ ├── sum — сумма
│ └── count — количество
│
└── Утилитарные
├── drain — потребить и отбросить
├── forEach — побочный эффект на каждый элемент
├── timed — измерить время
├── succeed — немедленный успех
└── fail — немедленная ошибка
Принцип полноты потребления
Каждый Sink можно охарактеризовать по полноте потребления:
- Полное потребление (
L = never): Sink обрабатывает все элементы потока. Примеры:collectAll,foldLeft,sum,count,drain,forEach. - Частичное потребление (
L = In): Sink обрабатывает только часть элементов, оставляя leftovers. Примеры:head,last,take,collectAllN,fold,foldUntil.
Это различие критически важно при последовательной композиции Sink через flatMap — leftovers первого Sink передаются второму.
Концепция ФП
Catamorphism — свёртка как уничтожение структуры
Операция fold является катаморфизмом — фундаментальным понятием теории категорий, означающим “разрушение” структуры данных с извлечением значения:
Catamorphism (fold):
F-алгебра: F(A) → A
Для списка [a₁, a₂, a₃, ...]:
foldLeft(z)(f) = f(f(f(z, a₁), a₂), a₃)
f
/ \
f a₃
/ \
f a₂
/ \
z a₁
В контексте Sink катаморфизм работает инкрементально: элементы поступают по одному из потока, и аккумулятор обновляется на каждом шаге, не требуя всех элементов в памяти одновременно.
Гомоморфизм списков
Многие Sink реализуют гомоморфизм списков — преобразование, сохраняющее алгебраическую структуру:
h: List<A> → B
h([]) = e // начальное значение
h(x :: xs) = f(x, h(xs)) // комбинирование
Это в точности fold!
Свойства гомоморфизма позволяют параллелизовать операции: если f ассоциативна и e — нейтральный элемент, свёртку можно разбить на независимые части и объединить.
Элементные Sink
Sink.head — первый элемент
Sink.head потребляет только первый элемент потока, оборачивая его в Option.Some. Для пустого потока возвращает Option.None. Все остальные элементы остаются как leftovers:
import { Stream, Sink, Effect } from "effect"
// ── Непустой поток ──────────────────────────────────────
const nonEmpty = Stream.make(1, 2, 3, 4)
Effect.runPromise(Stream.run(nonEmpty, Sink.head())).then(console.log)
// Output: { _id: 'Option', _tag: 'Some', value: 1 }
// ── Пустой поток ────────────────────────────────────────
const empty = Stream.empty
Effect.runPromise(Stream.run(empty, Sink.head())).then(console.log)
// Output: { _id: 'Option', _tag: 'None' }
Тип: Sink<Option<A>, A, A, never, never> — обратите внимание, что L = A, потому что непотреблённые элементы того же типа, что и вход.
Sink.last — последний элемент
Sink.last должен потребить все элементы потока, чтобы определить последний. Возвращает Option:
import { Stream, Sink, Effect } from "effect"
const stream = Stream.make(1, 2, 3, 4)
Effect.runPromise(Stream.run(stream, Sink.last())).then(console.log)
// Output: { _id: 'Option', _tag: 'Some', value: 4 }
const empty = Stream.empty
Effect.runPromise(Stream.run(empty, Sink.last())).then(console.log)
// Output: { _id: 'Option', _tag: 'None' }
⚠️ Важно: несмотря на то что Sink.last потребляет все элементы, его тип всё равно показывает L = A для leftovers. Однако на практике leftovers будут пустыми после полного потребления.
Sink.take — первые N элементов
Sink.take(n) собирает первые n элементов в Chunk:
import { Stream, Sink, Effect } from "effect"
const stream = Stream.make(1, 2, 3, 4, 5, 6)
Effect.runPromise(Stream.run(stream, Sink.take(3))).then(console.log)
// Output: { _id: 'Chunk', values: [ 1, 2, 3 ] }
Тип: Sink<Chunk<A>, A, A, never, never> — leftovers содержат элементы после первых N.
Collecting — сбор элементов
Sink.collectAll — все элементы в Chunk
Собирает абсолютно все элементы потока в иммутабельный Chunk:
import { Stream, Sink, Effect } from "effect"
const stream = Stream.make(1, 2, 3, 4)
Effect.runPromise(Stream.run(stream, Sink.collectAll())).then(console.log)
// Output: { _id: 'Chunk', values: [ 1, 2, 3, 4 ] }
⚠️ Внимание к памяти: collectAll хранит все элементы в памяти. Для потоков неизвестной длины или потенциально бесконечных потоков используйте collectAllN или потоковые операции (fold, forEach).
Sink.collectAllN — первые N элементов в Chunk
Ограниченный сбор — останавливается после N элементов:
import { Stream, Sink, Effect } from "effect"
const stream = Stream.make(1, 2, 3, 4, 5)
Effect.runPromise(
Stream.run(stream, Sink.collectAllN(3))
).then(console.log)
// Output: { _id: 'Chunk', values: [ 1, 2, 3 ] }
Sink.collectAllWhile — сбор по условию
Собирает элементы, пока предикат возвращает true. При первом false останавливается:
import { Stream, Sink, Effect } from "effect"
const stream = Stream.make(1, 2, 0, 4, 0, 6, 7)
Effect.runPromise(
Stream.run(stream, Sink.collectAllWhile((n) => n !== 0))
).then(console.log)
// Output: { _id: 'Chunk', values: [ 1, 2 ] }
Это работает как takeWhile — после первого несовпадения сбор прекращается, даже если дальше есть подходящие элементы.
Sink.collectAllToSet — уникальные элементы в HashSet
Собирает все элементы в HashSet, автоматически убирая дубликаты:
import { Stream, Sink, Effect } from "effect"
const stream = Stream.make(1, 2, 2, 3, 4, 4)
Effect.runPromise(Stream.run(stream, Sink.collectAllToSet())).then(
console.log
)
// Output: { _id: 'HashSet', values: [ 1, 2, 3, 4 ] }
Sink.collectAllToSetN — уникальные с лимитом
Собирает уникальные элементы, ограничивая размер множества:
import { Stream, Sink, Effect } from "effect"
const stream = Stream.make(1, 2, 2, 3, 4, 4)
Effect.runPromise(
Stream.run(stream, Sink.collectAllToSetN(3))
).then(console.log)
// Output: { _id: 'HashSet', values: [ 1, 2, 3 ] }
Sink.collectAllToMap — группировка в HashMap
Самый мощный коллектор — собирает элементы в HashMap с произвольной стратегией ключей и слияния:
import { Stream, Sink, Effect } from "effect"
const stream = Stream.make(1, 3, 2, 3, 1, 5, 1)
Effect.runPromise(
Stream.run(
stream,
Sink.collectAllToMap(
(n) => n % 3, // Ключ: остаток от деления на 3
(a, b) => a + b // Слияние: сумма значений с одним ключом
)
)
).then(console.log)
// Output: { _id: 'HashMap', values: [ [ 0, 6 ], [ 1, 3 ], [ 2, 7 ] ] }
Разберём результат:
- Ключ
0(n % 3 === 0): элементы 3, 3 → 3 + 3 = 6 - Ключ
1(n % 3 === 1): элементы 1, 1, 1 → 1 + 1 + 1 = 3 - Ключ
2(n % 3 === 2): элементы 2, 5 → 2 + 5 = 7
Sink.collectAllToMapN — группировка с лимитом ключей
Ограничивает количество уникальных ключей в HashMap:
import { Stream, Sink, Effect } from "effect"
const stream = Stream.make(1, 3, 2, 3, 1, 5, 1)
Effect.runPromise(
Stream.run(
stream,
Sink.collectAllToMapN(
3, // Максимум 3 ключа
(n) => n, // Ключ: само значение
(a, b) => a + b // Слияние: сумма
)
)
).then(console.log)
// Output: { _id: 'HashMap', values: [ [ 1, 2 ], [ 2, 2 ], [ 3, 6 ] ] }
Folding — свёртка
Sink.foldLeft — полная левая свёртка
foldLeft — это классический левый fold без условия остановки. Потребляет все элементы потока:
import { Stream, Sink, Effect } from "effect"
const stream = Stream.make(1, 2, 3, 4)
// foldLeft<S, In>(initial: S, f: (state: S, input: In) => S)
Effect.runPromise(
Stream.run(
stream,
Sink.foldLeft(0, (acc, n) => acc + n)
)
).then(console.log)
// Output: 10
Семантика:
foldLeft(0, +) на [1, 2, 3, 4]:
step 0: acc = 0
step 1: acc = 0 + 1 = 1
step 2: acc = 1 + 2 = 3
step 3: acc = 3 + 3 = 6
step 4: acc = 6 + 4 = 10
result: 10
foldLeft — чистая функция, не поддерживает побочные эффекты в аккумуляторе. Для эффективных операций используйте Sink.foldEffect.
Sink.fold — свёртка с условием остановки
Sink.fold добавляет предикат продолжения: свёртка продолжается, пока предикат возвращает true. При первом false Sink завершается с текущим аккумулятором:
import { Stream, Sink, Effect } from "effect"
// Бесконечный поток натуральных чисел
const naturals = Stream.iterate(0, (n) => n + 1)
// fold<S, In>(initial: S, contFn: (s: S) => boolean, body: (s: S, in: In) => S)
Effect.runPromise(
Stream.run(
naturals,
Sink.fold(
0, // Начальное значение
(sum) => sum <= 10, // Продолжать пока сумма ≤ 10
(acc, n) => acc + n // Аккумулировать
)
)
).then(console.log)
// Output: 15
Разберём выполнение:
fold(0, sum <= 10, +) на [0, 1, 2, 3, 4, 5, ...]:
step 0: acc = 0, continue? 0 <= 10 = true, acc = 0 + 0 = 0
step 1: acc = 0, continue? 0 <= 10 = true, acc = 0 + 1 = 1
step 2: acc = 1, continue? 1 <= 10 = true, acc = 1 + 2 = 3
step 3: acc = 3, continue? 3 <= 10 = true, acc = 3 + 3 = 6
step 4: acc = 6, continue? 6 <= 10 = true, acc = 6 + 4 = 10
step 5: acc = 10, continue? 10 <= 10 = true, acc = 10 + 5 = 15
step 6: acc = 15, continue? 15 <= 10 = false, STOP
result: 15
⚠️ Порядок проверки: предикат проверяется перед потреблением следующего элемента. Поэтому результат 15, а не 10 — на шаге 5 аккумулятор ещё был ≤ 10, поэтому элемент 5 был потреблён.
Sink.foldUntil — свёртка до N элементов
Свёртка с ограничением по количеству потреблённых элементов:
import { Stream, Sink, Effect } from "effect"
const stream = Stream.make(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
// foldUntil<S, In>(initial: S, maxElements: number, body: (s: S, in: In) => S)
Effect.runPromise(
Stream.run(
stream,
Sink.foldUntil(0, 3, (acc, n) => acc + n)
)
).then(console.log)
// Output: 6 (1 + 2 + 3, остановились после 3 элементов)
Sink.foldWeighted — свёртка по весам
Самый гибкий вариант fold — позволяет назначать вес каждому элементу и аккумулировать, пока суммарный вес не превысит порог:
import { Stream, Sink, Chunk, Effect } from "effect"
const stream = Stream.make(3, 2, 4, 1, 5, 6, 2, 1, 3, 5, 6).pipe(
// transduce применяет Sink повторно, собирая результаты
Stream.transduce(
Sink.foldWeighted({
initial: Chunk.empty<number>(), // Начальный аккумулятор
maxCost: 3, // Максимальный суммарный вес
cost: () => 1, // Вес каждого элемента = 1
body: (acc, el) => Chunk.append(acc, el) // Добавляем в Chunk
})
)
)
Effect.runPromise(Stream.runCollect(stream)).then((chunk) =>
console.log("%o", chunk)
)
// Output:
// {
// _id: 'Chunk',
// values: [
// { _id: 'Chunk', values: [ 3, 2, 4 ] }, ← первая группа (3 элемента)
// { _id: 'Chunk', values: [ 1, 5, 6 ] }, ← вторая группа
// { _id: 'Chunk', values: [ 2, 1, 3 ] }, ← третья группа
// { _id: 'Chunk', values: [ 5, 6 ] } ← остаток (< 3 элементов)
// ]
// }
Разберём параметры foldWeighted:
initial— начальное значение аккумулятора для каждой группыmaxCost— максимальный допустимый суммарный вес группыcost: (state: S, element: In) => number— функция вычисления веса элемента (может зависеть от текущего состояния)body: (state: S, element: In) => S— функция обновления аккумулятора
Практический пример: группировка по размеру батча:
import { Stream, Sink, Chunk, Effect } from "effect"
interface LogEntry {
readonly message: string
readonly sizeBytes: number
}
const logs: Stream.Stream<LogEntry> = Stream.make(
{ message: "Start", sizeBytes: 100 },
{ message: "Process A", sizeBytes: 500 },
{ message: "Process B", sizeBytes: 300 },
{ message: "Large payload", sizeBytes: 2000 },
{ message: "End", sizeBytes: 50 }
)
// Группируем логи в батчи по 1KB
const batched = logs.pipe(
Stream.transduce(
Sink.foldWeighted({
initial: Chunk.empty<LogEntry>(),
maxCost: 1024, // 1KB на батч
cost: (_state, entry) => entry.sizeBytes, // Вес = размер в байтах
body: (acc, entry) => Chunk.append(acc, entry)
})
)
)
Effect.runPromise(Stream.runCollect(batched)).then((chunks) => {
console.log(`Total batches: ${Chunk.size(chunks)}`)
})
Stream.transduce — повторное применение Sink
Stream.transduce — ключевая функция для работы с Sink, которые потребляют частично. Она повторно применяет Sink к потоку, собирая результаты каждого применения в новый поток:
import { Stream, Sink, Effect } from "effect"
// Поток из 10 чисел
const stream = Stream.range(1, 11)
// transduce: применяет foldUntil(0, 3, +) повторно
const batched = stream.pipe(
Stream.transduce(
Sink.foldUntil(0, 3, (acc: number, n: number) => acc + n)
)
)
Effect.runPromise(Stream.runCollect(batched)).then(console.log)
// Output: Chunk(6, 15, 24, 10)
// Группа 1: 1+2+3 = 6
// Группа 2: 4+5+6 = 15
// Группа 3: 7+8+9 = 24
// Группа 4: 10 = 10 (неполная группа)
Агрегационные Sink
Sink.sum — сумма
import { Stream, Sink, Effect } from "effect"
const stream = Stream.make(1, 2, 3, 4)
Effect.runPromise(Stream.run(stream, Sink.sum)).then(console.log)
// Output: 10
⚠️ Sink.sum ожидает number на входе. Для строковых чисел используйте Sink.mapInput:
import { Stream, Sink, Effect } from "effect"
const stringStream = Stream.make("10", "20", "30")
const numericSum = Sink.sum.pipe(
Sink.mapInput((s: string) => Number.parseFloat(s))
)
Effect.runPromise(Stream.run(stringStream, numericSum)).then(console.log)
// Output: 60
Sink.count — подсчёт
import { Stream, Sink, Effect } from "effect"
const stream = Stream.make(1, 2, 3, 4)
Effect.runPromise(Stream.run(stream, Sink.count)).then(console.log)
// Output: 4
Трансформация и фильтрация
Sink.map — трансформация выхода
Sink.map преобразует результат Sink, не меняя поведение потребления:
import { Stream, Sink, Effect } from "effect"
const stream = Stream.make(1, 2, 3, 4)
// sum возвращает number → map преобразует в string
const formattedSum = Sink.sum.pipe(
Sink.map((total) => `Total: ${total}`)
)
Effect.runPromise(Stream.run(stream, formattedSum)).then(console.log)
// Output: "Total: 10"
Sink.as — замена выхода константой
Sink.as заменяет результат Sink на фиксированное значение:
import { Stream, Sink, Effect, Console } from "effect"
// forEach выполняет эффект, as(42) подменяет результат void на 42
const sink = Sink.forEach((n: number) =>
Console.log(`Processing: ${n}`)
).pipe(Sink.as(42))
const stream = Stream.make(1, 2, 3)
Effect.runPromise(Stream.run(stream, sink)).then(console.log)
// Output:
// Processing: 1
// Processing: 2
// Processing: 3
// 42
Sink.mapInput — трансформация входа
Sink.mapInput адаптирует Sink к другому типу входных данных. Функция преобразования идёт в обратном направлении (контравариантность):
import { Stream, Sink, Effect } from "effect"
// Поток строк
const stream = Stream.make("1", "2", "3", "4", "5")
// Адаптируем числовой sum для строковых входов
const stringSum = Sink.sum.pipe(
Sink.mapInput((s: string) => Number.parseFloat(s))
)
Effect.runPromise(Stream.run(stream, stringSum)).then(console.log)
// Output: 15
Sink.dimap — трансформация входа и выхода
Sink.dimap комбинирует mapInput и map в одну операцию:
import { Stream, Sink, Effect } from "effect"
const stream = Stream.make("1", "2", "3", "4", "5")
// string → (mapInput) → number → [sum] → number → (map) → string
const sumSink = Sink.dimap(Sink.sum, {
onInput: (s: string) => Number.parseFloat(s),
onDone: (n) => `Sum = ${n}`
})
Effect.runPromise(Stream.run(stream, sumSink)).then(console.log)
// Output: "Sum = 15"
Sink.filterInput — фильтрация входящих элементов
Sink.filterInput пропускает к Sink только элементы, удовлетворяющие предикату:
import { Stream, Sink, Effect } from "effect"
const stream = Stream.make(1, -2, 3, -4, 5, -6)
// Считаем сумму только положительных чисел
const positiveSum = Sink.sum.pipe(
Sink.filterInput((n: number) => n > 0)
)
Effect.runPromise(Stream.run(stream, positiveSum)).then(console.log)
// Output: 9 (1 + 3 + 5)
⚠️ Важно: filterInput не создаёт leftovers — отфильтрованные элементы просто отбрасываются. Это отличается от collectAllWhile, который останавливается при первом несовпадении.
API Reference
Элементные Sink
| Функция | Тип результата | Полнота | Описание |
|---|---|---|---|
head<A>() | Option<A> | Частичный | Первый элемент |
last<A>() | Option<A> | Полный | Последний элемент |
take<A>(n) | Chunk<A> | Частичный | Первые n элементов |
Collecting Sink
| Функция | Тип результата | Описание |
|---|---|---|
collectAll<A>() | Chunk<A> | Все элементы |
collectAllN<A>(n) | Chunk<A> | Первые n элементов |
collectAllWhile<A>(pred) | Chunk<A> | Пока предикат true |
collectAllToSet<A>() | HashSet<A> | Уникальные элементы |
collectAllToSetN<A>(n) | HashSet<A> | Уникальные с лимитом |
collectAllToMap<A>(keyFn, mergeFn) | HashMap<K, A> | Группировка |
collectAllToMapN<A>(n, keyFn, mergeFn) | HashMap<K, A> | Группировка с лимитом |
Folding Sink
| Функция | Условие остановки | Описание |
|---|---|---|
foldLeft(init, f) | Нет | Полная свёртка |
fold(init, contFn, f) | Предикат на аккумулятор | С условием |
foldUntil(init, max, f) | Количество элементов | До N элементов |
foldWeighted({...}) | Суммарный вес | По весам |
Трансформации
| Функция | Направление | Описание |
|---|---|---|
map(f) | Выход | Трансформация результата |
as(value) | Выход | Замена результата константой |
mapInput(f) | Вход | Трансформация входных элементов |
dimap({onInput, onDone}) | Оба | Трансформация входа и выхода |
filterInput(pred) | Вход | Фильтрация входных элементов |
Примеры
💻 Пример 1: Анализ лог-файла
import { Stream, Sink, Effect, pipe } from "effect"
// Модель записи лога
interface LogEntry {
readonly level: "INFO" | "WARN" | "ERROR"
readonly message: string
readonly timestamp: number
}
const logStream: Stream.Stream<LogEntry> = Stream.make(
{ level: "INFO", message: "Server started", timestamp: 1000 },
{ level: "INFO", message: "Request received", timestamp: 1100 },
{ level: "WARN", message: "Slow query", timestamp: 1200 },
{ level: "ERROR", message: "Connection timeout", timestamp: 1300 },
{ level: "INFO", message: "Request completed", timestamp: 1400 },
{ level: "ERROR", message: "Disk full", timestamp: 1500 },
{ level: "WARN", message: "Memory high", timestamp: 1600 }
)
// Подсчёт ошибок с фильтрацией
const errorCount = Sink.count.pipe(
Sink.filterInput((entry: LogEntry) => entry.level === "ERROR")
)
// Группировка по уровню
const byLevel = Sink.collectAllToMap<LogEntry>(
(entry) => entry.level,
(a, _b) => a // берём первый элемент (можно считать)
)
const program = Effect.gen(function* () {
const errors = yield* Stream.run(logStream, errorCount)
yield* Effect.log(`Error count: ${errors}`)
// Output: Error count: 2
})
Effect.runPromise(program)
💻 Пример 2: Пакетная обработка с foldWeighted
import { Stream, Sink, Chunk, Effect, Console } from "effect"
interface DataPacket {
readonly id: string
readonly payload: string
readonly sizeKB: number
}
const packets: Stream.Stream<DataPacket> = Stream.make(
{ id: "p1", payload: "data1", sizeKB: 300 },
{ id: "p2", payload: "data2", sizeKB: 500 },
{ id: "p3", payload: "data3", sizeKB: 400 },
{ id: "p4", payload: "data4", sizeKB: 200 },
{ id: "p5", payload: "data5", sizeKB: 800 },
{ id: "p6", payload: "data6", sizeKB: 100 }
)
// Группируем пакеты в батчи до 1MB (1024KB)
const batchedPackets = packets.pipe(
Stream.transduce(
Sink.foldWeighted({
initial: Chunk.empty<DataPacket>(),
maxCost: 1024,
cost: (_acc, packet) => packet.sizeKB,
body: (acc, packet) => Chunk.append(acc, packet)
})
)
)
const program = batchedPackets.pipe(
Stream.mapEffect((batch, index) =>
Effect.gen(function* () {
const ids = Chunk.toReadonlyArray(batch).map((p) => p.id)
const totalSize = Chunk.toReadonlyArray(batch).reduce(
(sum, p) => sum + p.sizeKB, 0
)
yield* Console.log(
`Batch ${index}: [${ids.join(", ")}] (${totalSize}KB)`
)
})
),
Stream.run(Sink.drain)
)
Effect.runPromise(program)
// Output:
// Batch 0: [p1, p2] (800KB)
// Batch 1: [p3, p4] (600KB)
// Batch 2: [p5] (800KB)
// Batch 3: [p6] (100KB)
💻 Пример 3: CSV парсер с collectAllWhile
import { Stream, Sink, Effect, Chunk } from "effect"
// Поток строк CSV, где пустая строка разделяет секции
const csvLines = Stream.make(
"name,age",
"Alice,30",
"Bob,25",
"", // разделитель секций
"city,country",
"NYC,US",
"London,UK"
)
// Собираем строки до пустой строки
const sectionSink = Sink.collectAllWhile<string>((line) => line !== "")
// Используем transduce для обработки каждой секции
const sections = csvLines.pipe(
Stream.transduce(sectionSink)
)
Effect.runPromise(Stream.runCollect(sections)).then((result) => {
Chunk.toReadonlyArray(result).forEach((section, i) => {
console.log(`Section ${i}: ${Chunk.toReadonlyArray(section)}`)
})
})
// Output:
// Section 0: name,age,Alice,30,Bob,25
// Section 1: city,country,NYC,US,London,UK
Упражнения
🟢 Basic
Упражнение 1: Базовые агрегации
Дан поток оценок студентов (числа от 1 до 5). Используйте Sink.sum, Sink.count и Sink.map для вычисления средней оценки.
import { Stream, Sink, Effect } from "effect"
const grades = Stream.make(5, 4, 3, 5, 4, 2, 5, 3, 4, 5)
const program = Effect.gen(function* () {
// Ваш код здесь
})
Effect.runPromise(program)
Решение:
import { Stream, Sink, Effect } from "effect"
const grades = Stream.make(5, 4, 3, 5, 4, 2, 5, 3, 4, 5)
const program = Effect.gen(function* () {
const total = yield* Stream.run(grades, Sink.sum)
const count = yield* Stream.run(grades, Sink.count)
const average = count > 0 ? total / count : 0
yield* Effect.log(`Total: ${total}, Count: ${count}, Average: ${average}`)
})
Effect.runPromise(program)
// Output: Total: 40, Count: 10, Average: 4
Упражнение 2: Уникальные слова
Дан поток слов с повторениями. Соберите уникальные слова в HashSet.
Решение:
import { Stream, Sink, Effect } from "effect"
const words = Stream.make(
"hello", "world", "hello", "effect", "world", "sink", "effect"
)
const program = Effect.gen(function* () {
const unique = yield* Stream.run(words, Sink.collectAllToSet())
yield* Effect.log(`Unique words: ${unique}`)
})
Effect.runPromise(program)
// Output: Unique words: { _id: 'HashSet', values: [ 'hello', 'world', 'effect', 'sink' ] }
🟡 Intermediate
Упражнение 3: Группировка транзакций
Дан поток финансовых транзакций. Сгруппируйте их по категориям и посчитайте суммарные расходы в каждой категории.
import { Stream, Sink, Effect } from "effect"
interface Transaction {
readonly category: string
readonly amount: number
}
const transactions = Stream.make(
{ category: "food", amount: 50 },
{ category: "transport", amount: 30 },
{ category: "food", amount: 75 },
{ category: "entertainment", amount: 100 },
{ category: "transport", amount: 25 },
{ category: "food", amount: 40 }
)
Решение:
import { Stream, Sink, Effect } from "effect"
interface Transaction {
readonly category: string
readonly amount: number
}
const transactions: Stream.Stream<Transaction> = Stream.make(
{ category: "food", amount: 50 },
{ category: "transport", amount: 30 },
{ category: "food", amount: 75 },
{ category: "entertainment", amount: 100 },
{ category: "transport", amount: 25 },
{ category: "food", amount: 40 }
)
const program = Effect.gen(function* () {
const byCategory = yield* Stream.run(
transactions,
Sink.collectAllToMap<Transaction>(
(t) => t.category,
(a, b) => ({ ...a, amount: a.amount + b.amount })
)
)
yield* Effect.log(`Expenses by category: ${byCategory}`)
})
Effect.runPromise(program)
// Output: HashMap with food=165, transport=55, entertainment=100
Упражнение 4: Fold с short-circuit
Найдите сумму натуральных чисел, первую превышающую 1000. Используйте Sink.fold с бесконечным потоком.
Решение:
import { Stream, Sink, Effect } from "effect"
const naturals = Stream.iterate(1, (n) => n + 1)
const program = Effect.gen(function* () {
const result = yield* Stream.run(
naturals,
Sink.fold(
0,
(sum) => sum <= 1000,
(acc, n) => acc + n
)
)
yield* Effect.log(`First sum exceeding 1000: ${result}`)
})
Effect.runPromise(program)
// Output: First sum exceeding 1000: 1035
🔴 Advanced
Упражнение 5: Adaptive batching с foldWeighted
Реализуйте систему пакетной отправки сообщений, где каждый пакет ограничен:
- Максимум 5 сообщений ИЛИ
- Максимум 1000 символов суммарной длины
Используйте foldWeighted с пользовательской функцией веса.
Решение:
import { Stream, Sink, Chunk, Effect, Console } from "effect"
interface Message {
readonly id: number
readonly text: string
}
const messages: Stream.Stream<Message> = Stream.make(
{ id: 1, text: "Hi" },
{ id: 2, text: "How are you?" },
{ id: 3, text: "A".repeat(400) },
{ id: 4, text: "B".repeat(500) },
{ id: 5, text: "Short" },
{ id: 6, text: "Also short" },
{ id: 7, text: "C".repeat(300) },
{ id: 8, text: "Tiny" },
{ id: 9, text: "D".repeat(200) },
{ id: 10, text: "End" }
)
interface BatchState {
readonly messages: Chunk.Chunk<Message>
readonly totalChars: number
readonly count: number
}
const batchSink = Sink.foldWeighted({
initial: {
messages: Chunk.empty<Message>(),
totalChars: 0,
count: 0
} satisfies BatchState,
maxCost: 1, // Используем бинарный вес: 0 = ok, 1 = стоп
cost: (state, msg: Message) => {
const newCount = state.count + 1
const newChars = state.totalChars + msg.text.length
// Вес 1 (= maxCost) если лимиты превышены
return newCount > 5 || newChars > 1000 ? 1 : 0
},
body: (state, msg) => ({
messages: Chunk.append(state.messages, msg),
totalChars: state.totalChars + msg.text.length,
count: state.count + 1
})
})
const program = messages.pipe(
Stream.transduce(batchSink),
Stream.mapEffect((batch, index) =>
Console.log(
`Batch ${index}: ${batch.count} messages, ${batch.totalChars} chars`
)
),
Stream.run(Sink.drain)
)
Effect.runPromise(program)
🔗 Далее: Комбинаторы Sink →