Трансформации
Трансформации — это сердце потоковой обработки. Они позволяют декларативно описывать пайплайны преобразования данных, где каждый шаг является чистой функцией, а композиция трансформаций ленива и не создаёт промежуточных коллекций.
Теория
Функтор, монада и фильтрация
Трансформации Stream отражают фундаментальные абстракции функционального программирования:
map : Функтор — Structure-preserving transformation
Stream<A> → (A → B) → Stream<B>
Сохраняет количество и порядок элементов
filter : MonadPlus — Structure-reducing operation
Stream<A> → (A → boolean) → Stream<A>
Может уменьшить количество элементов
flatMap: Монада — Structure-expanding + flattening
Stream<A> → (A → Stream<B>) → Stream<B>
Может изменить количество элементов (0..∞ за каждый)
Ключевой принцип: все трансформации ленивы. Они не выполняют никаких вычислений до запуска потока через run*-функцию. Трансформации формируют описание пайплайна, а не выполняют его:
Stream.make(1, 2, 3) // описание: [1, 2, 3]
.pipe(Stream.map(n => n*2)) // описание: map(×2)
.pipe(Stream.filter(…)) // описание: filter(…)
// Ничего не вычислено!
Stream.runCollect(…) // ← тут начинается выполнение
Mapping — преобразование элементов
Stream.map — чистое преобразование
Применяет чистую функцию к каждому элементу:
import { Stream, Effect } from "effect"
const stream = Stream.make(1, 2, 3).pipe(
Stream.map((n) => n * 2)
)
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 2, 4, 6 ] }
Гарантии map:
- Количество элементов не меняется
- Порядок сохраняется
- Каждый элемент обрабатывается ровно один раз
Stream.mapEffect — эффективное преобразование
Когда трансформация требует выполнения побочных эффектов:
import { Stream, Random, Effect } from "effect"
const stream = Stream.make(10, 20, 30).pipe(
Stream.mapEffect((n) => Random.nextIntBetween(0, n))
)
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// Пример: { _id: 'Chunk', values: [ 5, 9, 22 ] }
С опцией конкурентности эффекты выполняются параллельно, но порядок результатов сохраняется:
import { Stream, Effect } from "effect"
const fetchUrl = (url: string) =>
Effect.gen(function* () {
yield* Effect.sleep("100 millis")
return `Result for ${url}`
})
const stream = Stream.make("url1", "url2", "url3").pipe(
Stream.mapEffect(fetchUrl, { concurrency: 2 })
)
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
Stream.as — замена значения константой
import { Stream, Effect } from "effect"
const stream = Stream.range(1, 5).pipe(Stream.as(null))
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ null, null, null, null, null ] }
Stream.mapAccum — маппинг с аккумулятором
Позволяет трансформировать элементы, сохраняя состояние между вызовами:
import { Stream, Effect } from "effect"
// Нарастающая сумма (running total)
const stream = Stream.range(1, 5).pipe(
Stream.mapAccum(0, (state, n) => [state + n, state + n])
// ↑ ↑ ↑
// начальное новое эмитируемое
// состояние состояние значение
)
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 1, 3, 6, 10, 15 ] }
Stream.mapConcat — маппинг с развёртыванием
Каждый элемент преобразуется в Iterable, затем все результаты развёртываются в плоский поток:
import { Stream, Effect } from "effect"
const stream = Stream.make("1-2-3", "4-5", "6").pipe(
Stream.mapConcat((s) => s.split("-"))
)
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ '1', '2', '3', '4', '5', '6' ] }
Filtering — фильтрация
Stream.filter — чистая фильтрация
import { Stream, Effect } from "effect"
const evens = Stream.range(1, 10).pipe(
Stream.filter((n) => n % 2 === 0)
)
Effect.runPromise(Stream.runCollect(evens)).then(console.log)
// { _id: 'Chunk', values: [ 2, 4, 6, 8, 10 ] }
Stream.filterEffect — эффективная фильтрация
Когда предикат фильтрации — это эффект:
import { Stream, Effect, Random } from "effect"
// Случайно пропускаем элементы (50% шанс)
const stream = Stream.range(1, 10).pipe(
Stream.filterEffect((n) =>
Random.nextBoolean.pipe(
Effect.map((keep) => keep)
)
)
)
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
Stream.changes — дедупликация последовательных значений
Эмитирует элемент только если он отличается от предыдущего:
import { Stream, Effect } from "effect"
const stream = Stream.make(1, 1, 1, 2, 2, 3, 4).pipe(Stream.changes)
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 1, 2, 3, 4 ] }
FlatMap — развёртывание и конкатенация
Концепция
flatMap — самая мощная трансформация. Каждый элемент исходного потока преобразуется в новый поток, а все результирующие потоки конкатенируются:
Исходный: [A, B, C]
flatMap(f):
A → Stream[a1, a2]
B → Stream[b1, b2, b3]
C → Stream[c1]
Результат: [a1, a2, b1, b2, b3, c1]
Stream.flatMap — последовательная конкатенация
import { Stream, Effect } from "effect"
const stream = Stream.make(1, 2, 3).pipe(
Stream.flatMap((a) =>
Stream.repeatValue(a).pipe(Stream.take(4))
)
)
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 1, 1, 1, 1, 2, 2, 2, 2, 3, 3, 3, 3 ] }
Конкурентный flatMap
С опцией concurrency внутренние потоки выполняются параллельно:
import { Stream, Effect } from "effect"
const stream = Stream.make(1, 2, 3).pipe(
Stream.flatMap(
(n) =>
Stream.fromEffect(
Effect.sleep(`${n * 100} millis`).pipe(
Effect.as(n)
)
),
{ concurrency: 3 }
)
)
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// Порядок зависит от времени выполнения
Switch-режим flatMap
С опцией switch: true при появлении нового элемента предыдущий внутренний поток отменяется:
import { Stream, Effect, Schedule } from "effect"
// При каждом новом элементе переключаемся на новый внутренний поток
const stream = Stream.make(1, 2, 3).pipe(
Stream.schedule(Schedule.spaced("200 millis")),
Stream.flatMap(
(n) =>
Stream.repeatValue(n).pipe(
Stream.schedule(Schedule.spaced("50 millis")),
Stream.take(10)
),
{ switch: true }
)
)
Полезно для UI-сценариев (autocomplete), где нужен только результат последнего запроса.
Taking и Dropping — извлечение подмножеств
Taking — взятие элементов
| Функция | Описание |
|---|---|
Stream.take(n) | Первые N элементов |
Stream.takeWhile(pred) | Пока предикат true |
Stream.takeUntil(pred) | До первого true (включительно) |
Stream.takeRight(n) | Последние N элементов |
import { Stream, Effect } from "effect"
// take — первые N
const first3 = Stream.range(1, 10).pipe(Stream.take(3))
// [1, 2, 3]
// takeWhile — пока условие верно
const lessThan5 = Stream.range(1, 10).pipe(
Stream.takeWhile((n) => n < 5)
)
// [1, 2, 3, 4]
// takeUntil — до выполнения условия (включая)
const untilFive = Stream.range(1, 10).pipe(
Stream.takeUntil((n) => n === 5)
)
// [1, 2, 3, 4, 5]
Dropping — пропуск элементов
| Функция | Описание |
|---|---|
Stream.drop(n) | Пропустить первые N |
Stream.dropWhile(pred) | Пропускать пока предикат true |
Stream.dropUntil(pred) | Пропускать до первого true |
import { Stream, Effect } from "effect"
// drop — пропустить первые 3
const skip3 = Stream.range(1, 10).pipe(Stream.drop(3))
// [4, 5, 6, 7, 8, 9, 10]
// dropWhile — пропускать пока < 5
const from5 = Stream.range(1, 10).pipe(
Stream.dropWhile((n) => n < 5)
)
// [5, 6, 7, 8, 9, 10]
Tapping — наблюдение без изменения
Stream.tap позволяет выполнить побочный эффект для каждого элемента без изменения потока:
import { Stream, Console, Effect } from "effect"
const stream = Stream.make(1, 2, 3).pipe(
Stream.tap((n) => Console.log(`before: ${n}`)),
Stream.map((n) => n * 2),
Stream.tap((n) => Console.log(`after: ${n}`))
)
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
/*
before: 1
after: 2
before: 2
after: 4
before: 3
after: 6
{ _id: 'Chunk', values: [ 2, 4, 6 ] }
*/
Тип потока не меняется: Stream<A> → Stream<A>.
Zipping — объединение потоков попарно
Stream.zip — попарное объединение
import { Stream, Effect } from "effect"
const stream = Stream.zip(
Stream.make(1, 2, 3, 4, 5, 6),
Stream.make("a", "b", "c")
)
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ [ 1, 'a' ], [ 2, 'b' ], [ 3, 'c' ] ] }
Поток завершается, когда один из входных потоков исчерпан.
Stream.zipWith — с пользовательской функцией
import { Stream, Effect } from "effect"
const stream = Stream.zipWith(
Stream.make(1, 2, 3),
Stream.make("a", "b", "c"),
(n, s) => `${s}:${n}`
)
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 'a:1', 'b:2', 'c:3' ] }
Stream.zipAll / zipAllWith — с дефолтами
Если один поток короче, используются дефолтные значения:
import { Stream, Effect } from "effect"
const stream = Stream.zipAll(Stream.make(1, 2, 3, 4), {
other: Stream.make("a", "b"),
defaultSelf: 0,
defaultOther: "z"
})
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ [ 1, 'a' ], [ 2, 'b' ], [ 3, 'z' ], [ 4, 'z' ] ] }
Stream.zipWithIndex — нумерация элементов
import { Stream, Effect } from "effect"
const stream = Stream.make("a", "b", "c").pipe(Stream.zipWithIndex)
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ [ 'a', 0 ], [ 'b', 1 ], [ 'c', 2 ] ] }
Scanning — кумулятивная аккумуляция
Stream.scan аккумулирует значения и эмитирует каждый промежуточный результат:
import { Stream, Effect } from "effect"
const stream = Stream.range(1, 5).pipe(
Stream.scan(0, (acc, n) => acc + n)
)
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 0, 1, 3, 6, 10, 15 ] }
// ↑ ↑ ↑ ↑ ↑ ↑
// init +1 +2 +3 +4 +5
Обратите внимание: scan эмитирует начальное значение первым. Поток из N элементов даёт N+1 значений.
Concatenation — конкатенация потоков
Stream.concat — последовательная конкатенация
import { Stream, Effect } from "effect"
const stream = Stream.concat(
Stream.make(1, 2, 3),
Stream.make("a", "b")
)
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 1, 2, 3, 'a', 'b' ] }
Stream.concatAll — конкатенация массива потоков
import { Stream, Chunk, Effect } from "effect"
const stream = Stream.concatAll<number | string | boolean, never, never>(
Chunk.make(
Stream.make(1, 2),
Stream.make("a", "b"),
Stream.make(true, false)
)
)
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 1, 2, 'a', 'b', true, false ] }
Interspersing — вставка разделителей
Stream.intersperse — разделитель между элементами
import { Stream, Effect } from "effect"
const stream = Stream.make(1, 2, 3, 4, 5).pipe(
Stream.intersperse(0)
)
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 1, 0, 2, 0, 3, 0, 4, 0, 5 ] }
Stream.intersperseAffixes — с префиксом и суффиксом
import { Stream, Effect } from "effect"
const stream = Stream.make(1, 2, 3).pipe(
Stream.intersperseAffixes({
start: "[",
middle: ",",
end: "]"
})
)
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ '[', 1, ',', 2, ',', 3, ']' ] }
Прочие трансформации
Stream.drain — игнорирование значений
Выполняет все эффекты потока, но отбрасывает значения:
import { Stream, Effect, Console } from "effect"
const stream = Stream.make(1, 2, 3).pipe(
Stream.tap((n) => Console.log(`Processing: ${n}`)),
Stream.drain
)
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
/*
Processing: 1
Processing: 2
Processing: 3
{ _id: 'Chunk', values: [] }
*/
Stream.mapChunks — трансформация на уровне чанков
Для низкоуровневой оптимизации можно работать напрямую с чанками:
import { Stream, Chunk, Effect } from "effect"
const stream = Stream.range(1, 10).pipe(
Stream.mapChunks((chunk) =>
Chunk.map(chunk, (n) => n * 100)
)
)
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
Stream.cross — декартово произведение
import { Stream, Effect } from "effect"
const stream = Stream.cross(
Stream.make("a", "b"),
Stream.make(1, 2, 3)
)
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// [ ['a',1], ['a',2], ['a',3], ['b',1], ['b',2], ['b',3] ]
API Reference
| Трансформация | Тип | Описание |
|---|---|---|
map | (A → B) → Stream<B> | Чистое преобразование |
mapEffect | (A → Effect<B>) → Stream<B> | Эффективное преобразование |
mapAccum | (S, (S, A) → [S, B]) → Stream<B> | С аккумулятором |
mapConcat | (A → Iterable<B>) → Stream<B> | Map + flatten |
mapChunks | (Chunk<A> → Chunk<B>) → Stream<B> | Чанковый маппинг |
as | (value: B) → Stream<B> | Замена константой |
filter | (A → boolean) → Stream<A> | Чистая фильтрация |
filterEffect | (A → Effect<boolean>) → Stream<A> | Эффективная фильтрация |
flatMap | (A → Stream<B>) → Stream<B> | Монадический bind |
take | (n: number) → Stream<A> | Первые N |
takeWhile | (pred) → Stream<A> | Пока предикат true |
takeUntil | (pred) → Stream<A> | До первого true |
drop | (n: number) → Stream<A> | Пропустить N |
dropWhile | (pred) → Stream<A> | Пропускать пока true |
tap | (A → Effect<_>) → Stream<A> | Наблюдение |
scan | (S, (S, A) → S) → Stream<S> | Кумулятивная аккумуляция |
zip | (Stream<B>) → Stream<[A, B]> | Попарно |
zipWith | (Stream<B>, (A, B) → C) → Stream<C> | Попарно с функцией |
zipWithIndex | → Stream<[A, number]> | С индексами |
concat | (Stream<B>) → Stream<A | B> | Конкатенация |
intersperse | (sep: A) → Stream<A> | Разделитель |
drain | → Stream<never> | Игнорирование |
changes | → Stream<A> | Дедупликация |
cross | (Stream<B>) → Stream<[A, B]> | Декартово произведение |
Примеры
Production: ETL пайплайн
import { Stream, Effect, Console, Chunk } from "effect"
interface RawRecord {
readonly id: string
readonly data: string
readonly timestamp: number
}
interface ProcessedRecord {
readonly id: string
readonly normalizedData: string
readonly processedAt: number
}
// Extract → Transform → Load пайплайн
const etlPipeline = (source: Stream.Stream<RawRecord>) =>
source.pipe(
// Transform: нормализация данных
Stream.map((record) => ({
id: record.id,
normalizedData: record.data.trim().toLowerCase(),
processedAt: Date.now()
} satisfies ProcessedRecord)),
// Filter: отбрасываем пустые записи
Stream.filter((record) => record.normalizedData.length > 0),
// Tap: логирование каждой обработанной записи
Stream.tap((record) =>
Console.log(`Processed: ${record.id}`)
),
// Группировка для batch-вставки
Stream.grouped(100),
// Load: batch-вставка в БД
Stream.mapEffect((batch) =>
Effect.gen(function* () {
yield* Console.log(`Inserting batch of ${Chunk.size(batch)} records`)
return Chunk.size(batch)
})
),
// Аккумуляция общего количества
Stream.scan(0, (total, batchSize) => total + batchSize)
)
Упражнения
🟢 Basic
Упражнение 1: Цепочка трансформаций Создайте поток чисел 1-20. Примените: map (×3), filter (только делимые на 2), take (первые 5). Соберите результат.
Решение:
import { Stream, Effect } from "effect"
const program = Stream.range(1, 20).pipe(
Stream.map((n) => n * 3),
Stream.filter((n) => n % 2 === 0),
Stream.take(5),
Stream.runCollect
)
Effect.runPromise(program).then(console.log)
// { _id: 'Chunk', values: [ 6, 12, 18, 24, 30 ] }
🟡 Intermediate
Упражнение 2: flatMap для разворачивания вложенных данных Имеется поток «заказов», каждый содержит массив «товаров». Разверните в плоский поток товаров, добавьте к каждому orderId.
Решение:
import { Stream, Effect } from "effect"
interface Order {
readonly orderId: string
readonly items: ReadonlyArray<string>
}
const orders: ReadonlyArray<Order> = [
{ orderId: "O1", items: ["Widget", "Gadget"] },
{ orderId: "O2", items: ["Sprocket"] },
{ orderId: "O3", items: ["Gear", "Bolt", "Nut"] }
]
const program = Stream.fromIterable(orders).pipe(
Stream.flatMap((order) =>
Stream.fromIterable(order.items).pipe(
Stream.map((item) => ({ orderId: order.orderId, item }))
)
),
Stream.runCollect
)
Effect.runPromise(program).then(console.log)
🔴 Advanced
Упражнение 3: mapAccum для вычисления скользящего среднего Реализуйте скользящее среднее с окном 3 для потока чисел, используя Stream.mapAccum. Состояние — массив последних 3 значений.
Решение:
import { Stream, Effect } from "effect"
const movingAverage = Stream.make(10, 20, 30, 40, 50, 60, 70).pipe(
Stream.mapAccum(
[] as ReadonlyArray<number>,
(window, value) => {
const newWindow = [...window, value].slice(-3) as ReadonlyArray<number>
const avg = newWindow.reduce((a, b) => a + b, 0) / newWindow.length
return [newWindow, avg] as const
}
),
Stream.runCollect
)
Effect.runPromise(movingAverage).then(console.log)
// { _id: 'Chunk', values: [ 10, 15, 20, 30, 40, 50, 60 ] }
🔗 Далее: 04-chunking.md — grouped, rechunk, unchunks