Chunking
Чанкинг — механизм управления размером и структурой пакетов данных внутри потока. Правильная настройка чанков определяет баланс между latency и throughput, а также эффективность batch-операций
Теория
Почему чанкинг критически важен
Stream в Effect-ts обрабатывает данные не поэлементно, а чанками (Chunk<A>). Это архитектурное решение даёт колоссальный прирост производительности:
Поэлементная обработка:
for each element:
call transform() ← overhead вызова функции
allocate result ← давление на GC
yield to consumer ← context switch
Чанковая обработка:
for each chunk (1000 elements):
call transform() ← 1 вызов на 1000 элементов
allocate chunk ← 1 аллокация на 1000 элементов
yield to consumer ← 1 context switch на 1000 элементов
Прирост: ~100-1000x меньше overhead
Размер чанка и trade-offs
Размер чанка
│
1 │ ● Минимальная задержка, максимальный overhead
│
10 │ ● Real-time системы (events, WebSocket)
│
100 │ ● Общее назначение (API responses)
│
1K │ ⬤ Оптимум для большинства задач
│
10K │ ● Batch-обработка (ETL, analytics)
│
100K │ ● Максимальный throughput, высокая задержка
│
└────────────────────────────────────────────►
Overhead ◄──────────── Throughput ──────►
Chunk — фундамент потоковой обработки
Структура Chunk
Chunk — иммутабельная последовательность с O(1) append/prepend (амортизированно):
import { Chunk } from "effect"
// Создание
const c1 = Chunk.make(1, 2, 3)
const c2 = Chunk.fromIterable([4, 5, 6])
const c3 = Chunk.empty<number>()
// Операции (все возвращают новый Chunk)
const appended = Chunk.append(c1, 4) // [1, 2, 3, 4]
const prepended = Chunk.prepend(c1, 0) // [0, 1, 2, 3]
const concatenated = Chunk.appendAll(c1, c2) // [1, 2, 3, 4, 5, 6]
// Трансформации
const mapped = Chunk.map(c1, (n) => n * 2) // [2, 4, 6]
const filtered = Chunk.filter(c1, (n) => n > 1) // [2, 3]
// Свёртка
const sum = Chunk.reduce(c1, 0, (acc, n) => acc + n) // 6
// Доступ
const head = Chunk.head(c1) // Option.some(1)
const size = Chunk.size(c1) // 3
const arr = Chunk.toArray(c1) // [1, 2, 3]
Как Stream использует Chunk внутри
Каждый шаг потока эмитирует Chunk<A>:
Stream.make(1, 2, 3, 4, 5, 6, 7, 8)
Внутренне:
Step 1: emit Chunk[1, 2, 3, 4, 5, 6, 7, 8] (один чанк)
Stream.fromIterable([1,2,3]).pipe(Stream.concat(Stream.fromIterable([4,5,6])))
Внутренне:
Step 1: emit Chunk[1, 2, 3] (чанк из первого потока)
Step 2: emit Chunk[4, 5, 6] (чанк из второго потока)
Трансформации типа map, filter применяются к каждому чанку целиком, не разбивая его на отдельные элементы.
Перегруппировка чанков
Stream.rechunk — изменение размера чанков
rechunk перегруппирует элементы потока в чанки указанного размера:
import { Stream, Effect } from "effect"
const stream = Stream.fromIterable([1, 2, 3, 4, 5]).pipe(
Stream.rechunk(2)
)
// Теперь элементы сгруппированы в чанки по 2:
// Chunk[1, 2], Chunk[3, 4], Chunk[5]
const program = Effect.gen(function* () {
const getChunk = yield* Stream.toPull(stream)
console.log(yield* getChunk) // Chunk[1, 2]
console.log(yield* getChunk) // Chunk[3, 4]
console.log(yield* getChunk) // Chunk[5]
})
Effect.runPromise(Effect.scoped(program))
⚠️ Важно: rechunk не меняет элементы потока — только их группировку на уровне чанков. С точки зрения runCollect или runForEach разницы нет.
Stream.chunksWith — доступ к чанковой структуре
Для операций, учитывающих границы чанков:
import { Stream, Chunk, Effect } from "effect"
const stream = Stream.make(1, 2, 3, 4, 5).pipe(
Stream.rechunk(2),
Stream.mapChunks((chunk) => {
console.log(`Processing chunk of size ${Chunk.size(chunk)}`)
return Chunk.map(chunk, (n) => n * 10)
})
)
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
/*
Processing chunk of size 2
Processing chunk of size 2
Processing chunk of size 1
{ _id: 'Chunk', values: [ 10, 20, 30, 40, 50 ] }
*/
Stream.unchunks — развёртывание чанков в элементы
Противоположность grouped — преобразует поток чанков в поток элементов:
import { Stream, Chunk, Effect } from "effect"
// Поток чанков
const chunkedStream: Stream.Stream<Chunk.Chunk<number>> = Stream.make(
Chunk.make(1, 2, 3),
Chunk.make(4, 5),
Chunk.make(6)
)
// Развёртывание в плоский поток
const flat = chunkedStream.pipe(Stream.unchunks)
Effect.runPromise(Stream.runCollect(flat)).then(console.log)
// { _id: 'Chunk', values: [ 1, 2, 3, 4, 5, 6 ] }
Группировка элементов
Stream.grouped — группировка по размеру
Разделяет поток на подчанки фиксированного размера:
import { Stream, Effect } from "effect"
const stream = Stream.range(0, 8).pipe(Stream.grouped(3))
Effect.runPromise(Stream.runCollect(stream)).then((chunks) =>
console.log("%o", chunks)
)
/*
{
_id: 'Chunk',
values: [
{ _id: 'Chunk', values: [ 0, 1, 2 ] },
{ _id: 'Chunk', values: [ 3, 4, 5 ] },
{ _id: 'Chunk', values: [ 6, 7, 8 ] }
]
}
*/
Тип результата: Stream<Chunk<A>> — поток чанков.
Группировка по времени и размеру
Stream.groupedWithin — гибридная группировка
Группирует элементы по размеру ИЛИ по времени — что наступит раньше:
import { Stream, Schedule, Effect, Chunk } from "effect"
const stream = Stream.range(0, 9).pipe(
Stream.repeat(Schedule.spaced("1 second")),
Stream.groupedWithin(18, "1.5 seconds"),
Stream.take(3)
)
Effect.runPromise(Stream.runCollect(stream)).then((chunks) =>
console.log(Chunk.toArray(chunks))
)
Это критически важный оператор для production-систем, где нужно балансировать между:
- Batch-эффективностью: накапливаем больше элементов для batch-вставки
- Максимальной задержкой: не задерживаем данные дольше указанного таймаута
Сценарий: batch-запись в БД
Высокая нагрузка: 1000 events/sec
→ groupedWithin(100, "5 seconds")
→ чанки по 100 каждые 100мс (размер срабатывает раньше)
Низкая нагрузка: 1 event/sec
→ groupedWithin(100, "5 seconds")
→ чанки по 5 каждые 5 сек (таймаут срабатывает раньше)
Результат: адаптивный батчинг без ручной настройки
Группировка по ключу
Stream.groupByKey — партиционирование по ключу
import { Stream, GroupBy, Effect, Chunk } from "effect"
class Exam {
constructor(
readonly person: string,
readonly score: number
) {}
}
const examResults = [
new Exam("Alex", 64),
new Exam("Michael", 97),
new Exam("Bill", 77),
new Exam("John", 78),
new Exam("Bobby", 71)
]
// Группировка по десяткам (60s, 70s, 90s)
const grouped = Stream.fromIterable(examResults).pipe(
Stream.groupByKey((exam) => Math.floor(exam.score / 10) * 10)
)
// Обработка каждой группы
const stream = GroupBy.evaluate(grouped, (key, groupStream) =>
Stream.fromEffect(
Stream.runCollect(groupStream).pipe(
Effect.andThen((chunk) => [key, Chunk.size(chunk)] as const)
)
)
)
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ [ 60, 1 ], [ 90, 1 ], [ 70, 3 ] ] }
Stream.groupBy — эффективная группировка
Для случаев, когда вычисление ключа требует эффекта:
import { Stream, GroupBy, Effect, Chunk } from "effect"
const grouped = Stream.fromIterable(["Mary", "James", "Robert", "Patricia"]).pipe(
Stream.groupBy((name) => Effect.succeed([name.substring(0, 1), name]))
)
const result = GroupBy.evaluate(grouped, (key, stream) =>
Stream.fromEffect(
Stream.runCollect(stream).pipe(
Effect.andThen((chunk) => [key, Chunk.size(chunk)] as const)
)
)
)
Effect.runPromise(Stream.runCollect(result)).then(console.log)
Партиционирование
Stream.partition — разделение по предикату
import { Stream, Effect } from "effect"
const program = Stream.range(1, 9).pipe(
Stream.partition((n) => n % 2 === 0, { bufferSize: 5 })
)
Effect.runPromise(
Effect.scoped(
Effect.gen(function* () {
const [odds, evens] = yield* program
console.log("Odds:", yield* Stream.runCollect(odds))
console.log("Evens:", yield* Stream.runCollect(evens))
})
)
)
/*
Odds: { _id: 'Chunk', values: [ 1, 3, 5, 7, 9 ] }
Evens: { _id: 'Chunk', values: [ 2, 4, 6, 8 ] }
*/
⚠️ Результат обёрнут в Scope, поскольку partition создаёт внутренние буферы.
Stream.partitionEither — разделение с эффектом
import { Stream, Effect, Either } from "effect"
const program = Stream.range(1, 9).pipe(
Stream.partitionEither(
(n) => Effect.succeed(
n % 2 === 0 ? Either.right(n) : Either.left(n)
),
{ bufferSize: 5 }
)
)
Effect.runPromise(
Effect.scoped(
Effect.gen(function* () {
const [lefts, rights] = yield* program
console.log("Left:", yield* Stream.runCollect(lefts))
console.log("Right:", yield* Stream.runCollect(rights))
})
)
)
API Reference
| Функция | Описание |
|---|---|
Stream.rechunk(n) | Перегруппировка в чанки размера n |
Stream.grouped(n) | Группировка в Stream<Chunk<A>> по n элементов |
Stream.groupedWithin(n, duration) | Группировка по размеру или таймауту |
Stream.unchunks | Stream<Chunk<A>> → Stream<A> |
Stream.mapChunks(f) | Трансформация на уровне чанков |
Stream.groupByKey(f) | Группировка по ключу (чистая) |
Stream.groupBy(f) | Группировка по ключу (эффективная) |
Stream.partition(pred) | Разделение на два потока по предикату |
Stream.partitionEither(f) | Разделение на два потока (Either) |
GroupBy.evaluate(gb, f) | Обработка каждой группы |
Примеры
Production: адаптивный batch-writer
import { Stream, Effect, Console, Chunk, Schedule } from "effect"
interface LogEntry {
readonly level: string
readonly message: string
readonly timestamp: number
}
// Поток лог-записей
const logStream: Stream.Stream<LogEntry> = Stream.repeatEffect(
Effect.succeed({
level: "INFO",
message: `Event at ${Date.now()}`,
timestamp: Date.now()
} as LogEntry)
).pipe(
Stream.schedule(Schedule.spaced("100 millis")),
Stream.take(50)
)
// Адаптивный batch writer:
// - Не более 10 записей в батче
// - Не более 1 секунды ожидания
const batchWriter = logStream.pipe(
Stream.groupedWithin(10, "1 second"),
Stream.mapEffect((batch) =>
Effect.gen(function* () {
const size = Chunk.size(batch)
yield* Console.log(`Writing batch of ${size} entries to storage`)
// Здесь был бы реальный batch-insert
return size
})
),
Stream.scan(0, (total, batchSize) => total + batchSize),
Stream.runForEach((total) =>
Console.log(`Total written: ${total}`)
)
)
Упражнения
🟢 Basic
Упражнение 1: Создайте поток 1-20, сгруппируйте по 5 (grouped(5)). Посчитайте сумму каждой группы.
Решение:
import { Stream, Effect, Chunk } from "effect"
const program = Stream.range(1, 20).pipe(
Stream.grouped(5),
Stream.map((chunk) => Chunk.reduce(chunk, 0, (a, b) => a + b)),
Stream.runCollect
)
Effect.runPromise(program).then(console.log)
// { _id: 'Chunk', values: [ 15, 40, 65, 90 ] }
🟡 Intermediate
Упражнение 2: Реализуйте потоковую группировку строк по первой букве, используя groupByKey. Посчитайте количество строк в каждой группе.
Решение:
import { Stream, GroupBy, Effect, Chunk } from "effect"
const words = ["apple", "avocado", "banana", "blueberry", "cherry", "apricot"]
const program = Stream.fromIterable(words).pipe(
Stream.groupByKey((word) => word[0]!),
(gb) => GroupBy.evaluate(gb, (key, stream) =>
Stream.fromEffect(
Stream.runCollect(stream).pipe(
Effect.andThen((chunk) => ({ letter: key, count: Chunk.size(chunk) }))
)
)
),
Stream.runCollect
)
Effect.runPromise(program).then(console.log)
🔴 Advanced
Упражнение 3: Реализуйте адаптивный rechunk: если средняя «стоимость» обработки элемента высокая (например, > 10мс), уменьшите размер чанка, иначе — увеличьте. Используйте mapAccum для отслеживания средней стоимости.
🔗 Далее: 05-aggregation.md — aggregate, scan, runFold