Effect Курс Агрегация
Глава

Агрегация

Агрегация — процесс сведения потока множества значений к сводным результатам. В Effect-ts агрегация реализована через scan (промежуточные результаты), fold (финальный результат) и aggregate (через Sink), обеспечивая как потоковую, так и batch-обработку.

Теория

Катаморфизм: от структуры к значению

Агрегация — это катаморфизм: структурная рекурсия, разрушающая коллекцию и производящая единичное значение (или поток промежуточных значений):

fold  (катаморфизм): Stream<A> → B
                     [1, 2, 3, 4, 5] → 15 (sum)

scan  (промежуточный катаморфизм): Stream<A> → Stream<B>
                     [1, 2, 3, 4, 5] → [0, 1, 3, 6, 10, 15]

Три уровня агрегации

┌─────────────────────────────────────────────────┐
│  Уровень 1: scan                                │
│  Stream<A> → Stream<S>                          │
│  Эмитирует каждый промежуточный результат       │
│  Поток остаётся потоком                         │
├─────────────────────────────────────────────────┤
│  Уровень 2: runFold                             │
│  Stream<A> → Effect<S>                          │
│  Возвращает только финальный результат          │
│  Поток сворачивается в Effect                   │
├─────────────────────────────────────────────────┤
│  Уровень 3: aggregate                           │
│  Stream<A> → Stream<B> (через Sink)             │
│  Sink потребляет часть элементов, эмитирует     │
│  сводное значение, затем повторяет              │
└─────────────────────────────────────────────────┘

Scan — кумулятивная аккумуляция

Stream.scan — базовый scan

scan применяет аккумулирующую функцию к каждому элементу и эмитирует каждый промежуточный результат:

import { Stream, Effect } from "effect"

// Нарастающая сумма
const runningSum = Stream.range(1, 5).pipe(
  Stream.scan(0, (acc, n) => acc + n)
)

Effect.runPromise(Stream.runCollect(runningSum)).then(console.log)
// { _id: 'Chunk', values: [ 0, 1, 3, 6, 10, 15 ] }
//                           ↑  ↑  ↑   ↑   ↑   ↑
//                         init +1 +2  +3  +4  +5

Важные свойства:

  • Первый эмитируемый элемент — начальное значение (init)
  • Для N входных элементов эмитируется N+1 выходных
  • Каждое выходное значение зависит от всех предыдущих входных

Stream.scanEffect — эффективный scan

Когда аккумулирующая функция — эффект:

import { Stream, Effect, Console } from "effect"

const stream = Stream.range(1, 5).pipe(
  Stream.scanEffect(0, (acc, n) =>
    Console.log(`Accumulating: ${acc} + ${n} = ${acc + n}`).pipe(
      Effect.as(acc + n)
    )
  )
)

Effect.runPromise(Stream.runCollect(stream)).then(console.log)
/*
Accumulating: 0 + 1 = 1
Accumulating: 1 + 2 = 3
Accumulating: 3 + 3 = 6
Accumulating: 6 + 4 = 10
Accumulating: 10 + 5 = 15
{ _id: 'Chunk', values: [ 0, 1, 3, 6, 10, 15 ] }
*/

Паттерны использования scan

import { Stream, Effect } from "effect"

// Running maximum
const runningMax = Stream.make(3, 1, 4, 1, 5, 9, 2, 6).pipe(
  Stream.scan(-Infinity, (max, n) => Math.max(max, n))
)

// Running count
const runningCount = Stream.make("a", "b", "c", "d").pipe(
  Stream.scan(0, (count, _) => count + 1)
)

// Running average
const runningAvg = Stream.make(10, 20, 30, 40, 50).pipe(
  Stream.scan(
    { sum: 0, count: 0 } as { readonly sum: number; readonly count: number },
    (acc, n) => ({ sum: acc.sum + n, count: acc.count + 1 })
  ),
  Stream.map((acc) =>
    acc.count === 0 ? 0 : acc.sum / acc.count
  )
)

Effect.runPromise(Stream.runCollect(runningAvg)).then(console.log)
// { _id: 'Chunk', values: [ 0, 10, 15, 20, 25, 30 ] }

Fold — свёртка потока

Stream.runFold — полная свёртка

Сворачивает весь поток в единственное значение:

import { Stream, Effect } from "effect"

const sum = Stream.make(1, 2, 3, 4, 5).pipe(
  Stream.runFold(0, (acc, n) => acc + n)
)

Effect.runPromise(sum).then(console.log) // 15

Результат — Effect<S, E, R>, а не Stream — поток полностью потреблён.

Stream.runFoldWhile — свёртка с условием

Сворачивает элементы, пока условие на аккумулятор выполняется:

import { Stream, Effect } from "effect"

// Суммируем, пока сумма <= 10
const result = Stream.make(1, 2, 3, 4, 5).pipe(
  Stream.runFoldWhile(
    0,
    (sum) => sum <= 10,  // продолжаем пока sum ≤ 10
    (acc, n) => acc + n
  )
)

Effect.runPromise(result).then(console.log) // 15
// 0+1=1 (≤10 ✓), 1+2=3 (≤10 ✓), 3+3=6 (≤10 ✓), 6+4=10 (≤10 ✓), 10+5=15 (>10, но уже применён)

Stream.runFoldEffect — свёртка с эффектами

import { Stream, Effect, Console } from "effect"

const result = Stream.make(1, 2, 3, 4, 5).pipe(
  Stream.runFoldEffect(0, (acc, n) =>
    Console.log(`${acc} + ${n}`).pipe(Effect.as(acc + n))
  )
)

Effect.runPromise(result).then(console.log)

Aggregate — агрегация через Sink

Stream.aggregate — потоковая агрегация

aggregate применяет Sink к потоку повторно: Sink потребляет часть элементов, эмитирует результат, затем процесс повторяется для оставшихся элементов:

import { Stream, Sink, Effect } from "effect"

// Суммируем потоком по 3 элемента
const stream = Stream.make(1, 2, 3, 4, 5, 6, 7, 8, 9).pipe(
  Stream.aggregate(
    Sink.foldUntil(0, 3, (acc, n: number) => acc + n)
    //                  ↑
    //          после 3 элементов Sink завершается
  )
)

Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 6, 15, 24 ] }
// (1+2+3=6), (4+5+6=15), (7+8+9=24)

Stream.aggregateWithin — агрегация с расписанием

Совмещает агрегацию через Sink с временными ограничениями через Schedule:

import { Stream, Sink, Schedule, Effect } from "effect"

// Собираем элементы:
// - Максимум 5 элементов
// - ИЛИ каждые 2 секунды
const stream = Stream.range(1, 20).pipe(
  Stream.schedule(Schedule.spaced("500 millis")),
  Stream.aggregateWithin(
    Sink.collectAllN<number>(5),
    Schedule.spaced("2 seconds")
  )
)

Потоковые аккумуляторы

Использование Sink с Stream.run

import { Stream, Sink, Effect } from "effect"

// Сумма через Sink
const sum = Stream.make(1, 2, 3).pipe(
  Stream.run(Sink.sum)
)

Effect.runPromise(sum).then(console.log) // 6

// Подсчёт элементов
const count = Stream.make("a", "b", "c", "d").pipe(
  Stream.run(Sink.count)
)

Effect.runPromise(count).then(console.log) // 4

Комбинирование Sink для множественной агрегации

import { Stream, Sink, Effect } from "effect"

// Одновременно считаем сумму и количество
const combined = Sink.zip(Sink.sum, Sink.count)

const result = Stream.make(10, 20, 30, 40, 50).pipe(
  Stream.run(combined)
)

Effect.runPromise(result).then(console.log)
// [150, 5]

API Reference

ФункцияТипОписание
Stream.scan(init, f)→ Stream<S>Эмитирует промежуточные результаты
Stream.scanEffect(init, f)→ Stream<S>Scan с эффективной функцией
Stream.runFold(init, f)→ Effect<S>Полная свёртка
Stream.runFoldWhile(init, pred, f)→ Effect<S>Свёртка с условием
Stream.runFoldEffect(init, f)→ Effect<S>Свёртка с эффектами
Stream.aggregate(sink)→ Stream<B>Повторная агрегация через Sink
Stream.aggregateWithin(sink, sched)→ Stream<B>Агрегация + расписание
Stream.run(sink)→ Effect<B>Запуск с Sink

Примеры

Production: статистика в реальном времени

import { Stream, Effect, Schedule, Random } from "effect"

interface Stats {
  readonly count: number
  readonly sum: number
  readonly min: number
  readonly max: number
  readonly mean: number
}

const emptyStats: Stats = {
  count: 0, sum: 0, min: Infinity, max: -Infinity, mean: 0
}

const updateStats = (stats: Stats, value: number): Stats => {
  const count = stats.count + 1
  const sum = stats.sum + value
  return {
    count,
    sum,
    min: Math.min(stats.min, value),
    max: Math.max(stats.max, value),
    mean: sum / count
  }
}

// Поток реальных метрик → scan → статистика
const metricsStream = Stream.repeatEffect(
  Random.nextIntBetween(0, 100)
).pipe(
  Stream.schedule(Schedule.spaced("100 millis")),
  Stream.take(100)
)

const statsStream = metricsStream.pipe(
  Stream.scan(emptyStats, updateStats),
  Stream.drop(1), // пропускаем начальное пустое состояние
  Stream.filter((stats) => stats.count % 10 === 0), // каждые 10 элементов
  Stream.runForEach((stats) =>
    Effect.sync(() =>
      console.log(
        `[${stats.count}] mean=${stats.mean.toFixed(1)} ` +
        `min=${stats.min} max=${stats.max}`
      )
    )
  )
)

Упражнения

🟢 Basic

Упражнение 1: Используя Stream.scan, вычислите нарастающее произведение для потока [1, 2, 3, 4, 5].

Решение:

import { Stream, Effect } from "effect"

const program = Stream.make(1, 2, 3, 4, 5).pipe(
  Stream.scan(1, (acc, n) => acc * n),
  Stream.runCollect
)

Effect.runPromise(program).then(console.log)
// { _id: 'Chunk', values: [ 1, 1, 2, 6, 24, 120 ] }

🟡 Intermediate

Упражнение 2: Реализуйте потоковый подсчёт частоты слов (word frequency) с помощью scan. Для потока слов эмитируйте обновлённую таблицу частот после каждого слова.

Решение:

import { Stream, Effect, HashMap } from "effect"

const words = Stream.make("hello", "world", "hello", "effect", "hello", "world")

const program = words.pipe(
  Stream.scan(
    HashMap.empty<string, number>(),
    (freq, word) =>
      HashMap.set(freq, word, (HashMap.get(freq, word).pipe(
        (opt) => opt._tag === "Some" ? opt.value + 1 : 1
      )))
  ),
  Stream.runCollect
)

Effect.runPromise(program).then(console.log)

🔴 Advanced

Упражнение 3: Реализуйте exponential moving average (EMA) через scan с коэффициентом α=0.3. EMA(t) = α × value(t) + (1-α) × EMA(t-1). Примените к потоку случайных метрик.

Решение:

import { Stream, Effect, Random } from "effect"

const alpha = 0.3

const ema = Stream.repeatEffect(Random.nextIntBetween(50, 150)).pipe(
  Stream.take(20),
  Stream.scan(0, (prevEma, value) =>
    alpha * value + (1 - alpha) * prevEma
  ),
  Stream.drop(1), // пропускаем начальное значение 0
  Stream.runCollect
)

Effect.runPromise(ema).then(console.log)

🔗 Далее: 06-error-handling.md — catchAll, orElse, retry в стримах