Агрегация
Агрегация — процесс сведения потока множества значений к сводным результатам. В Effect-ts агрегация реализована через scan (промежуточные результаты), fold (финальный результат) и aggregate (через Sink), обеспечивая как потоковую, так и batch-обработку.
Теория
Катаморфизм: от структуры к значению
Агрегация — это катаморфизм: структурная рекурсия, разрушающая коллекцию и производящая единичное значение (или поток промежуточных значений):
fold (катаморфизм): Stream<A> → B
[1, 2, 3, 4, 5] → 15 (sum)
scan (промежуточный катаморфизм): Stream<A> → Stream<B>
[1, 2, 3, 4, 5] → [0, 1, 3, 6, 10, 15]
Три уровня агрегации
┌─────────────────────────────────────────────────┐
│ Уровень 1: scan │
│ Stream<A> → Stream<S> │
│ Эмитирует каждый промежуточный результат │
│ Поток остаётся потоком │
├─────────────────────────────────────────────────┤
│ Уровень 2: runFold │
│ Stream<A> → Effect<S> │
│ Возвращает только финальный результат │
│ Поток сворачивается в Effect │
├─────────────────────────────────────────────────┤
│ Уровень 3: aggregate │
│ Stream<A> → Stream<B> (через Sink) │
│ Sink потребляет часть элементов, эмитирует │
│ сводное значение, затем повторяет │
└─────────────────────────────────────────────────┘
Scan — кумулятивная аккумуляция
Stream.scan — базовый scan
scan применяет аккумулирующую функцию к каждому элементу и эмитирует каждый промежуточный результат:
import { Stream, Effect } from "effect"
// Нарастающая сумма
const runningSum = Stream.range(1, 5).pipe(
Stream.scan(0, (acc, n) => acc + n)
)
Effect.runPromise(Stream.runCollect(runningSum)).then(console.log)
// { _id: 'Chunk', values: [ 0, 1, 3, 6, 10, 15 ] }
// ↑ ↑ ↑ ↑ ↑ ↑
// init +1 +2 +3 +4 +5
Важные свойства:
- Первый эмитируемый элемент — начальное значение (
init) - Для N входных элементов эмитируется N+1 выходных
- Каждое выходное значение зависит от всех предыдущих входных
Stream.scanEffect — эффективный scan
Когда аккумулирующая функция — эффект:
import { Stream, Effect, Console } from "effect"
const stream = Stream.range(1, 5).pipe(
Stream.scanEffect(0, (acc, n) =>
Console.log(`Accumulating: ${acc} + ${n} = ${acc + n}`).pipe(
Effect.as(acc + n)
)
)
)
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
/*
Accumulating: 0 + 1 = 1
Accumulating: 1 + 2 = 3
Accumulating: 3 + 3 = 6
Accumulating: 6 + 4 = 10
Accumulating: 10 + 5 = 15
{ _id: 'Chunk', values: [ 0, 1, 3, 6, 10, 15 ] }
*/
Паттерны использования scan
import { Stream, Effect } from "effect"
// Running maximum
const runningMax = Stream.make(3, 1, 4, 1, 5, 9, 2, 6).pipe(
Stream.scan(-Infinity, (max, n) => Math.max(max, n))
)
// Running count
const runningCount = Stream.make("a", "b", "c", "d").pipe(
Stream.scan(0, (count, _) => count + 1)
)
// Running average
const runningAvg = Stream.make(10, 20, 30, 40, 50).pipe(
Stream.scan(
{ sum: 0, count: 0 } as { readonly sum: number; readonly count: number },
(acc, n) => ({ sum: acc.sum + n, count: acc.count + 1 })
),
Stream.map((acc) =>
acc.count === 0 ? 0 : acc.sum / acc.count
)
)
Effect.runPromise(Stream.runCollect(runningAvg)).then(console.log)
// { _id: 'Chunk', values: [ 0, 10, 15, 20, 25, 30 ] }
Fold — свёртка потока
Stream.runFold — полная свёртка
Сворачивает весь поток в единственное значение:
import { Stream, Effect } from "effect"
const sum = Stream.make(1, 2, 3, 4, 5).pipe(
Stream.runFold(0, (acc, n) => acc + n)
)
Effect.runPromise(sum).then(console.log) // 15
Результат — Effect<S, E, R>, а не Stream — поток полностью потреблён.
Stream.runFoldWhile — свёртка с условием
Сворачивает элементы, пока условие на аккумулятор выполняется:
import { Stream, Effect } from "effect"
// Суммируем, пока сумма <= 10
const result = Stream.make(1, 2, 3, 4, 5).pipe(
Stream.runFoldWhile(
0,
(sum) => sum <= 10, // продолжаем пока sum ≤ 10
(acc, n) => acc + n
)
)
Effect.runPromise(result).then(console.log) // 15
// 0+1=1 (≤10 ✓), 1+2=3 (≤10 ✓), 3+3=6 (≤10 ✓), 6+4=10 (≤10 ✓), 10+5=15 (>10, но уже применён)
Stream.runFoldEffect — свёртка с эффектами
import { Stream, Effect, Console } from "effect"
const result = Stream.make(1, 2, 3, 4, 5).pipe(
Stream.runFoldEffect(0, (acc, n) =>
Console.log(`${acc} + ${n}`).pipe(Effect.as(acc + n))
)
)
Effect.runPromise(result).then(console.log)
Aggregate — агрегация через Sink
Stream.aggregate — потоковая агрегация
aggregate применяет Sink к потоку повторно: Sink потребляет часть элементов, эмитирует результат, затем процесс повторяется для оставшихся элементов:
import { Stream, Sink, Effect } from "effect"
// Суммируем потоком по 3 элемента
const stream = Stream.make(1, 2, 3, 4, 5, 6, 7, 8, 9).pipe(
Stream.aggregate(
Sink.foldUntil(0, 3, (acc, n: number) => acc + n)
// ↑
// после 3 элементов Sink завершается
)
)
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 6, 15, 24 ] }
// (1+2+3=6), (4+5+6=15), (7+8+9=24)
Stream.aggregateWithin — агрегация с расписанием
Совмещает агрегацию через Sink с временными ограничениями через Schedule:
import { Stream, Sink, Schedule, Effect } from "effect"
// Собираем элементы:
// - Максимум 5 элементов
// - ИЛИ каждые 2 секунды
const stream = Stream.range(1, 20).pipe(
Stream.schedule(Schedule.spaced("500 millis")),
Stream.aggregateWithin(
Sink.collectAllN<number>(5),
Schedule.spaced("2 seconds")
)
)
Потоковые аккумуляторы
Использование Sink с Stream.run
import { Stream, Sink, Effect } from "effect"
// Сумма через Sink
const sum = Stream.make(1, 2, 3).pipe(
Stream.run(Sink.sum)
)
Effect.runPromise(sum).then(console.log) // 6
// Подсчёт элементов
const count = Stream.make("a", "b", "c", "d").pipe(
Stream.run(Sink.count)
)
Effect.runPromise(count).then(console.log) // 4
Комбинирование Sink для множественной агрегации
import { Stream, Sink, Effect } from "effect"
// Одновременно считаем сумму и количество
const combined = Sink.zip(Sink.sum, Sink.count)
const result = Stream.make(10, 20, 30, 40, 50).pipe(
Stream.run(combined)
)
Effect.runPromise(result).then(console.log)
// [150, 5]
API Reference
| Функция | Тип | Описание |
|---|---|---|
Stream.scan(init, f) | → Stream<S> | Эмитирует промежуточные результаты |
Stream.scanEffect(init, f) | → Stream<S> | Scan с эффективной функцией |
Stream.runFold(init, f) | → Effect<S> | Полная свёртка |
Stream.runFoldWhile(init, pred, f) | → Effect<S> | Свёртка с условием |
Stream.runFoldEffect(init, f) | → Effect<S> | Свёртка с эффектами |
Stream.aggregate(sink) | → Stream<B> | Повторная агрегация через Sink |
Stream.aggregateWithin(sink, sched) | → Stream<B> | Агрегация + расписание |
Stream.run(sink) | → Effect<B> | Запуск с Sink |
Примеры
Production: статистика в реальном времени
import { Stream, Effect, Schedule, Random } from "effect"
interface Stats {
readonly count: number
readonly sum: number
readonly min: number
readonly max: number
readonly mean: number
}
const emptyStats: Stats = {
count: 0, sum: 0, min: Infinity, max: -Infinity, mean: 0
}
const updateStats = (stats: Stats, value: number): Stats => {
const count = stats.count + 1
const sum = stats.sum + value
return {
count,
sum,
min: Math.min(stats.min, value),
max: Math.max(stats.max, value),
mean: sum / count
}
}
// Поток реальных метрик → scan → статистика
const metricsStream = Stream.repeatEffect(
Random.nextIntBetween(0, 100)
).pipe(
Stream.schedule(Schedule.spaced("100 millis")),
Stream.take(100)
)
const statsStream = metricsStream.pipe(
Stream.scan(emptyStats, updateStats),
Stream.drop(1), // пропускаем начальное пустое состояние
Stream.filter((stats) => stats.count % 10 === 0), // каждые 10 элементов
Stream.runForEach((stats) =>
Effect.sync(() =>
console.log(
`[${stats.count}] mean=${stats.mean.toFixed(1)} ` +
`min=${stats.min} max=${stats.max}`
)
)
)
)
Упражнения
🟢 Basic
Упражнение 1: Используя Stream.scan, вычислите нарастающее произведение для потока [1, 2, 3, 4, 5].
Решение:
import { Stream, Effect } from "effect"
const program = Stream.make(1, 2, 3, 4, 5).pipe(
Stream.scan(1, (acc, n) => acc * n),
Stream.runCollect
)
Effect.runPromise(program).then(console.log)
// { _id: 'Chunk', values: [ 1, 1, 2, 6, 24, 120 ] }
🟡 Intermediate
Упражнение 2: Реализуйте потоковый подсчёт частоты слов (word frequency) с помощью scan. Для потока слов эмитируйте обновлённую таблицу частот после каждого слова.
Решение:
import { Stream, Effect, HashMap } from "effect"
const words = Stream.make("hello", "world", "hello", "effect", "hello", "world")
const program = words.pipe(
Stream.scan(
HashMap.empty<string, number>(),
(freq, word) =>
HashMap.set(freq, word, (HashMap.get(freq, word).pipe(
(opt) => opt._tag === "Some" ? opt.value + 1 : 1
)))
),
Stream.runCollect
)
Effect.runPromise(program).then(console.log)
🔴 Advanced
Упражнение 3: Реализуйте exponential moving average (EMA) через scan с коэффициентом α=0.3. EMA(t) = α × value(t) + (1-α) × EMA(t-1). Примените к потоку случайных метрик.
Решение:
import { Stream, Effect, Random } from "effect"
const alpha = 0.3
const ema = Stream.repeatEffect(Random.nextIntBetween(50, 150)).pipe(
Stream.take(20),
Stream.scan(0, (prevEma, value) =>
alpha * value + (1 - alpha) * prevEma
),
Stream.drop(1), // пропускаем начальное значение 0
Stream.runCollect
)
Effect.runPromise(ema).then(console.log)
🔗 Далее: 06-error-handling.md — catchAll, orElse, retry в стримах