Effect Курс Chunking
Глава

Chunking

Чанкинг — механизм управления размером и структурой пакетов данных внутри потока. Правильная настройка чанков определяет баланс между latency и throughput, а также эффективность batch-операций

Теория

Почему чанкинг критически важен

Stream в Effect-ts обрабатывает данные не поэлементно, а чанками (Chunk<A>). Это архитектурное решение даёт колоссальный прирост производительности:

Поэлементная обработка:
  for each element:
    call transform()     ← overhead вызова функции
    allocate result      ← давление на GC
    yield to consumer    ← context switch

Чанковая обработка:
  for each chunk (1000 elements):
    call transform()     ← 1 вызов на 1000 элементов
    allocate chunk       ← 1 аллокация на 1000 элементов
    yield to consumer    ← 1 context switch на 1000 элементов

Прирост: ~100-1000x меньше overhead

Размер чанка и trade-offs

Размер чанка

 1    │  ● Минимальная задержка, максимальный overhead

 10   │  ● Real-time системы (events, WebSocket)

 100  │  ● Общее назначение (API responses)

 1K   │  ⬤ Оптимум для большинства задач

 10K  │  ● Batch-обработка (ETL, analytics)

 100K │  ● Максимальный throughput, высокая задержка

      └────────────────────────────────────────────►
           Overhead ◄──────────── Throughput ──────►

Chunk — фундамент потоковой обработки

Структура Chunk

Chunk — иммутабельная последовательность с O(1) append/prepend (амортизированно):

import { Chunk } from "effect"

// Создание
const c1 = Chunk.make(1, 2, 3)
const c2 = Chunk.fromIterable([4, 5, 6])
const c3 = Chunk.empty<number>()

// Операции (все возвращают новый Chunk)
const appended = Chunk.append(c1, 4)          // [1, 2, 3, 4]
const prepended = Chunk.prepend(c1, 0)        // [0, 1, 2, 3]
const concatenated = Chunk.appendAll(c1, c2)  // [1, 2, 3, 4, 5, 6]

// Трансформации
const mapped = Chunk.map(c1, (n) => n * 2)    // [2, 4, 6]
const filtered = Chunk.filter(c1, (n) => n > 1) // [2, 3]

// Свёртка
const sum = Chunk.reduce(c1, 0, (acc, n) => acc + n) // 6

// Доступ
const head = Chunk.head(c1)    // Option.some(1)
const size = Chunk.size(c1)    // 3
const arr = Chunk.toArray(c1)  // [1, 2, 3]

Как Stream использует Chunk внутри

Каждый шаг потока эмитирует Chunk<A>:

Stream.make(1, 2, 3, 4, 5, 6, 7, 8)

Внутренне:
  Step 1: emit Chunk[1, 2, 3, 4, 5, 6, 7, 8]  (один чанк)

Stream.fromIterable([1,2,3]).pipe(Stream.concat(Stream.fromIterable([4,5,6])))

Внутренне:
  Step 1: emit Chunk[1, 2, 3]  (чанк из первого потока)
  Step 2: emit Chunk[4, 5, 6]  (чанк из второго потока)

Трансформации типа map, filter применяются к каждому чанку целиком, не разбивая его на отдельные элементы.


Перегруппировка чанков

Stream.rechunk — изменение размера чанков

rechunk перегруппирует элементы потока в чанки указанного размера:

import { Stream, Effect } from "effect"

const stream = Stream.fromIterable([1, 2, 3, 4, 5]).pipe(
  Stream.rechunk(2)
)

// Теперь элементы сгруппированы в чанки по 2:
// Chunk[1, 2], Chunk[3, 4], Chunk[5]

const program = Effect.gen(function* () {
  const getChunk = yield* Stream.toPull(stream)

  console.log(yield* getChunk)  // Chunk[1, 2]
  console.log(yield* getChunk)  // Chunk[3, 4]
  console.log(yield* getChunk)  // Chunk[5]
})

Effect.runPromise(Effect.scoped(program))

⚠️ Важно: rechunk не меняет элементы потока — только их группировку на уровне чанков. С точки зрения runCollect или runForEach разницы нет.

Stream.chunksWith — доступ к чанковой структуре

Для операций, учитывающих границы чанков:

import { Stream, Chunk, Effect } from "effect"

const stream = Stream.make(1, 2, 3, 4, 5).pipe(
  Stream.rechunk(2),
  Stream.mapChunks((chunk) => {
    console.log(`Processing chunk of size ${Chunk.size(chunk)}`)
    return Chunk.map(chunk, (n) => n * 10)
  })
)

Effect.runPromise(Stream.runCollect(stream)).then(console.log)
/*
Processing chunk of size 2
Processing chunk of size 2
Processing chunk of size 1
{ _id: 'Chunk', values: [ 10, 20, 30, 40, 50 ] }
*/

Stream.unchunks — развёртывание чанков в элементы

Противоположность grouped — преобразует поток чанков в поток элементов:

import { Stream, Chunk, Effect } from "effect"

// Поток чанков
const chunkedStream: Stream.Stream<Chunk.Chunk<number>> = Stream.make(
  Chunk.make(1, 2, 3),
  Chunk.make(4, 5),
  Chunk.make(6)
)

// Развёртывание в плоский поток
const flat = chunkedStream.pipe(Stream.unchunks)

Effect.runPromise(Stream.runCollect(flat)).then(console.log)
// { _id: 'Chunk', values: [ 1, 2, 3, 4, 5, 6 ] }

Группировка элементов

Stream.grouped — группировка по размеру

Разделяет поток на подчанки фиксированного размера:

import { Stream, Effect } from "effect"

const stream = Stream.range(0, 8).pipe(Stream.grouped(3))

Effect.runPromise(Stream.runCollect(stream)).then((chunks) =>
  console.log("%o", chunks)
)
/*
{
  _id: 'Chunk',
  values: [
    { _id: 'Chunk', values: [ 0, 1, 2 ] },
    { _id: 'Chunk', values: [ 3, 4, 5 ] },
    { _id: 'Chunk', values: [ 6, 7, 8 ] }
  ]
}
*/

Тип результата: Stream<Chunk<A>> — поток чанков.


Группировка по времени и размеру

Stream.groupedWithin — гибридная группировка

Группирует элементы по размеру ИЛИ по времени — что наступит раньше:

import { Stream, Schedule, Effect, Chunk } from "effect"

const stream = Stream.range(0, 9).pipe(
  Stream.repeat(Schedule.spaced("1 second")),
  Stream.groupedWithin(18, "1.5 seconds"),
  Stream.take(3)
)

Effect.runPromise(Stream.runCollect(stream)).then((chunks) =>
  console.log(Chunk.toArray(chunks))
)

Это критически важный оператор для production-систем, где нужно балансировать между:

  • Batch-эффективностью: накапливаем больше элементов для batch-вставки
  • Максимальной задержкой: не задерживаем данные дольше указанного таймаута
Сценарий: batch-запись в БД

  Высокая нагрузка: 1000 events/sec
    → groupedWithin(100, "5 seconds")
    → чанки по 100 каждые 100мс (размер срабатывает раньше)

  Низкая нагрузка: 1 event/sec
    → groupedWithin(100, "5 seconds")
    → чанки по 5 каждые 5 сек (таймаут срабатывает раньше)

  Результат: адаптивный батчинг без ручной настройки

Группировка по ключу

Stream.groupByKey — партиционирование по ключу

import { Stream, GroupBy, Effect, Chunk } from "effect"

class Exam {
  constructor(
    readonly person: string,
    readonly score: number
  ) {}
}

const examResults = [
  new Exam("Alex", 64),
  new Exam("Michael", 97),
  new Exam("Bill", 77),
  new Exam("John", 78),
  new Exam("Bobby", 71)
]

// Группировка по десяткам (60s, 70s, 90s)
const grouped = Stream.fromIterable(examResults).pipe(
  Stream.groupByKey((exam) => Math.floor(exam.score / 10) * 10)
)

// Обработка каждой группы
const stream = GroupBy.evaluate(grouped, (key, groupStream) =>
  Stream.fromEffect(
    Stream.runCollect(groupStream).pipe(
      Effect.andThen((chunk) => [key, Chunk.size(chunk)] as const)
    )
  )
)

Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ [ 60, 1 ], [ 90, 1 ], [ 70, 3 ] ] }

Stream.groupBy — эффективная группировка

Для случаев, когда вычисление ключа требует эффекта:

import { Stream, GroupBy, Effect, Chunk } from "effect"

const grouped = Stream.fromIterable(["Mary", "James", "Robert", "Patricia"]).pipe(
  Stream.groupBy((name) => Effect.succeed([name.substring(0, 1), name]))
)

const result = GroupBy.evaluate(grouped, (key, stream) =>
  Stream.fromEffect(
    Stream.runCollect(stream).pipe(
      Effect.andThen((chunk) => [key, Chunk.size(chunk)] as const)
    )
  )
)

Effect.runPromise(Stream.runCollect(result)).then(console.log)

Партиционирование

Stream.partition — разделение по предикату

import { Stream, Effect } from "effect"

const program = Stream.range(1, 9).pipe(
  Stream.partition((n) => n % 2 === 0, { bufferSize: 5 })
)

Effect.runPromise(
  Effect.scoped(
    Effect.gen(function* () {
      const [odds, evens] = yield* program
      console.log("Odds:", yield* Stream.runCollect(odds))
      console.log("Evens:", yield* Stream.runCollect(evens))
    })
  )
)
/*
Odds: { _id: 'Chunk', values: [ 1, 3, 5, 7, 9 ] }
Evens: { _id: 'Chunk', values: [ 2, 4, 6, 8 ] }
*/

⚠️ Результат обёрнут в Scope, поскольку partition создаёт внутренние буферы.

Stream.partitionEither — разделение с эффектом

import { Stream, Effect, Either } from "effect"

const program = Stream.range(1, 9).pipe(
  Stream.partitionEither(
    (n) => Effect.succeed(
      n % 2 === 0 ? Either.right(n) : Either.left(n)
    ),
    { bufferSize: 5 }
  )
)

Effect.runPromise(
  Effect.scoped(
    Effect.gen(function* () {
      const [lefts, rights] = yield* program
      console.log("Left:", yield* Stream.runCollect(lefts))
      console.log("Right:", yield* Stream.runCollect(rights))
    })
  )
)

API Reference

ФункцияОписание
Stream.rechunk(n)Перегруппировка в чанки размера n
Stream.grouped(n)Группировка в Stream<Chunk<A>> по n элементов
Stream.groupedWithin(n, duration)Группировка по размеру или таймауту
Stream.unchunksStream<Chunk<A>> → Stream<A>
Stream.mapChunks(f)Трансформация на уровне чанков
Stream.groupByKey(f)Группировка по ключу (чистая)
Stream.groupBy(f)Группировка по ключу (эффективная)
Stream.partition(pred)Разделение на два потока по предикату
Stream.partitionEither(f)Разделение на два потока (Either)
GroupBy.evaluate(gb, f)Обработка каждой группы

Примеры

Production: адаптивный batch-writer

import { Stream, Effect, Console, Chunk, Schedule } from "effect"

interface LogEntry {
  readonly level: string
  readonly message: string
  readonly timestamp: number
}

// Поток лог-записей
const logStream: Stream.Stream<LogEntry> = Stream.repeatEffect(
  Effect.succeed({
    level: "INFO",
    message: `Event at ${Date.now()}`,
    timestamp: Date.now()
  } as LogEntry)
).pipe(
  Stream.schedule(Schedule.spaced("100 millis")),
  Stream.take(50)
)

// Адаптивный batch writer:
// - Не более 10 записей в батче
// - Не более 1 секунды ожидания
const batchWriter = logStream.pipe(
  Stream.groupedWithin(10, "1 second"),
  Stream.mapEffect((batch) =>
    Effect.gen(function* () {
      const size = Chunk.size(batch)
      yield* Console.log(`Writing batch of ${size} entries to storage`)
      // Здесь был бы реальный batch-insert
      return size
    })
  ),
  Stream.scan(0, (total, batchSize) => total + batchSize),
  Stream.runForEach((total) =>
    Console.log(`Total written: ${total}`)
  )
)

Упражнения

🟢 Basic

Упражнение 1: Создайте поток 1-20, сгруппируйте по 5 (grouped(5)). Посчитайте сумму каждой группы.

Решение:

import { Stream, Effect, Chunk } from "effect"

const program = Stream.range(1, 20).pipe(
  Stream.grouped(5),
  Stream.map((chunk) => Chunk.reduce(chunk, 0, (a, b) => a + b)),
  Stream.runCollect
)

Effect.runPromise(program).then(console.log)
// { _id: 'Chunk', values: [ 15, 40, 65, 90 ] }

🟡 Intermediate

Упражнение 2: Реализуйте потоковую группировку строк по первой букве, используя groupByKey. Посчитайте количество строк в каждой группе.

Решение:

import { Stream, GroupBy, Effect, Chunk } from "effect"

const words = ["apple", "avocado", "banana", "blueberry", "cherry", "apricot"]

const program = Stream.fromIterable(words).pipe(
  Stream.groupByKey((word) => word[0]!),
  (gb) => GroupBy.evaluate(gb, (key, stream) =>
    Stream.fromEffect(
      Stream.runCollect(stream).pipe(
        Effect.andThen((chunk) => ({ letter: key, count: Chunk.size(chunk) }))
      )
    )
  ),
  Stream.runCollect
)

Effect.runPromise(program).then(console.log)

🔴 Advanced

Упражнение 3: Реализуйте адаптивный rechunk: если средняя «стоимость» обработки элемента высокая (например, > 10мс), уменьшите размер чанка, иначе — увеличьте. Используйте mapAccum для отслеживания средней стоимости.


🔗 Далее: 05-aggregation.md — aggregate, scan, runFold