Effect Курс Трансформации
Глава

Трансформации

Трансформации — это сердце потоковой обработки. Они позволяют декларативно описывать пайплайны преобразования данных, где каждый шаг является чистой функцией, а композиция трансформаций ленива и не создаёт промежуточных коллекций.

Теория

Функтор, монада и фильтрация

Трансформации Stream отражают фундаментальные абстракции функционального программирования:

map    : Функтор     — Structure-preserving transformation
         Stream<A> → (A → B) → Stream<B>
         Сохраняет количество и порядок элементов

filter : MonadPlus   — Structure-reducing operation
         Stream<A> → (A → boolean) → Stream<A>
         Может уменьшить количество элементов

flatMap: Монада      — Structure-expanding + flattening
         Stream<A> → (A → Stream<B>) → Stream<B>
         Может изменить количество элементов (0..∞ за каждый)

Ключевой принцип: все трансформации ленивы. Они не выполняют никаких вычислений до запуска потока через run*-функцию. Трансформации формируют описание пайплайна, а не выполняют его:

Stream.make(1, 2, 3)           // описание: [1, 2, 3]
  .pipe(Stream.map(n => n*2))  // описание: map(×2)
  .pipe(Stream.filter(…))      // описание: filter(…)
  // Ничего не вычислено!

Stream.runCollect(…)           // ← тут начинается выполнение

Mapping — преобразование элементов

Stream.map — чистое преобразование

Применяет чистую функцию к каждому элементу:

import { Stream, Effect } from "effect"

const stream = Stream.make(1, 2, 3).pipe(
  Stream.map((n) => n * 2)
)

Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 2, 4, 6 ] }

Гарантии map:

  • Количество элементов не меняется
  • Порядок сохраняется
  • Каждый элемент обрабатывается ровно один раз

Stream.mapEffect — эффективное преобразование

Когда трансформация требует выполнения побочных эффектов:

import { Stream, Random, Effect } from "effect"

const stream = Stream.make(10, 20, 30).pipe(
  Stream.mapEffect((n) => Random.nextIntBetween(0, n))
)

Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// Пример: { _id: 'Chunk', values: [ 5, 9, 22 ] }

С опцией конкурентности эффекты выполняются параллельно, но порядок результатов сохраняется:

import { Stream, Effect } from "effect"

const fetchUrl = (url: string) =>
  Effect.gen(function* () {
    yield* Effect.sleep("100 millis")
    return `Result for ${url}`
  })

const stream = Stream.make("url1", "url2", "url3").pipe(
  Stream.mapEffect(fetchUrl, { concurrency: 2 })
)

Effect.runPromise(Stream.runCollect(stream)).then(console.log)

Stream.as — замена значения константой

import { Stream, Effect } from "effect"

const stream = Stream.range(1, 5).pipe(Stream.as(null))

Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ null, null, null, null, null ] }

Stream.mapAccum — маппинг с аккумулятором

Позволяет трансформировать элементы, сохраняя состояние между вызовами:

import { Stream, Effect } from "effect"

// Нарастающая сумма (running total)
const stream = Stream.range(1, 5).pipe(
  Stream.mapAccum(0, (state, n) => [state + n, state + n])
  //                     ↑                ↑          ↑
  //              начальное        новое      эмитируемое
  //              состояние     состояние     значение
)

Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 1, 3, 6, 10, 15 ] }

Stream.mapConcat — маппинг с развёртыванием

Каждый элемент преобразуется в Iterable, затем все результаты развёртываются в плоский поток:

import { Stream, Effect } from "effect"

const stream = Stream.make("1-2-3", "4-5", "6").pipe(
  Stream.mapConcat((s) => s.split("-"))
)

Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ '1', '2', '3', '4', '5', '6' ] }

Filtering — фильтрация

Stream.filter — чистая фильтрация

import { Stream, Effect } from "effect"

const evens = Stream.range(1, 10).pipe(
  Stream.filter((n) => n % 2 === 0)
)

Effect.runPromise(Stream.runCollect(evens)).then(console.log)
// { _id: 'Chunk', values: [ 2, 4, 6, 8, 10 ] }

Stream.filterEffect — эффективная фильтрация

Когда предикат фильтрации — это эффект:

import { Stream, Effect, Random } from "effect"

// Случайно пропускаем элементы (50% шанс)
const stream = Stream.range(1, 10).pipe(
  Stream.filterEffect((n) =>
    Random.nextBoolean.pipe(
      Effect.map((keep) => keep)
    )
  )
)

Effect.runPromise(Stream.runCollect(stream)).then(console.log)

Stream.changes — дедупликация последовательных значений

Эмитирует элемент только если он отличается от предыдущего:

import { Stream, Effect } from "effect"

const stream = Stream.make(1, 1, 1, 2, 2, 3, 4).pipe(Stream.changes)

Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 1, 2, 3, 4 ] }

FlatMap — развёртывание и конкатенация

Концепция

flatMap — самая мощная трансформация. Каждый элемент исходного потока преобразуется в новый поток, а все результирующие потоки конкатенируются:

Исходный:  [A, B, C]

flatMap(f):
  A → Stream[a1, a2]
  B → Stream[b1, b2, b3]
  C → Stream[c1]

Результат: [a1, a2, b1, b2, b3, c1]

Stream.flatMap — последовательная конкатенация

import { Stream, Effect } from "effect"

const stream = Stream.make(1, 2, 3).pipe(
  Stream.flatMap((a) =>
    Stream.repeatValue(a).pipe(Stream.take(4))
  )
)

Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 1, 1, 1, 1, 2, 2, 2, 2, 3, 3, 3, 3 ] }

Конкурентный flatMap

С опцией concurrency внутренние потоки выполняются параллельно:

import { Stream, Effect } from "effect"

const stream = Stream.make(1, 2, 3).pipe(
  Stream.flatMap(
    (n) =>
      Stream.fromEffect(
        Effect.sleep(`${n * 100} millis`).pipe(
          Effect.as(n)
        )
      ),
    { concurrency: 3 }
  )
)

Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// Порядок зависит от времени выполнения

Switch-режим flatMap

С опцией switch: true при появлении нового элемента предыдущий внутренний поток отменяется:

import { Stream, Effect, Schedule } from "effect"

// При каждом новом элементе переключаемся на новый внутренний поток
const stream = Stream.make(1, 2, 3).pipe(
  Stream.schedule(Schedule.spaced("200 millis")),
  Stream.flatMap(
    (n) =>
      Stream.repeatValue(n).pipe(
        Stream.schedule(Schedule.spaced("50 millis")),
        Stream.take(10)
      ),
    { switch: true }
  )
)

Полезно для UI-сценариев (autocomplete), где нужен только результат последнего запроса.


Taking и Dropping — извлечение подмножеств

Taking — взятие элементов

ФункцияОписание
Stream.take(n)Первые N элементов
Stream.takeWhile(pred)Пока предикат true
Stream.takeUntil(pred)До первого true (включительно)
Stream.takeRight(n)Последние N элементов
import { Stream, Effect } from "effect"

// take — первые N
const first3 = Stream.range(1, 10).pipe(Stream.take(3))
// [1, 2, 3]

// takeWhile — пока условие верно
const lessThan5 = Stream.range(1, 10).pipe(
  Stream.takeWhile((n) => n < 5)
)
// [1, 2, 3, 4]

// takeUntil — до выполнения условия (включая)
const untilFive = Stream.range(1, 10).pipe(
  Stream.takeUntil((n) => n === 5)
)
// [1, 2, 3, 4, 5]

Dropping — пропуск элементов

ФункцияОписание
Stream.drop(n)Пропустить первые N
Stream.dropWhile(pred)Пропускать пока предикат true
Stream.dropUntil(pred)Пропускать до первого true
import { Stream, Effect } from "effect"

// drop — пропустить первые 3
const skip3 = Stream.range(1, 10).pipe(Stream.drop(3))
// [4, 5, 6, 7, 8, 9, 10]

// dropWhile — пропускать пока < 5
const from5 = Stream.range(1, 10).pipe(
  Stream.dropWhile((n) => n < 5)
)
// [5, 6, 7, 8, 9, 10]

Tapping — наблюдение без изменения

Stream.tap позволяет выполнить побочный эффект для каждого элемента без изменения потока:

import { Stream, Console, Effect } from "effect"

const stream = Stream.make(1, 2, 3).pipe(
  Stream.tap((n) => Console.log(`before: ${n}`)),
  Stream.map((n) => n * 2),
  Stream.tap((n) => Console.log(`after: ${n}`))
)

Effect.runPromise(Stream.runCollect(stream)).then(console.log)
/*
before: 1
after: 2
before: 2
after: 4
before: 3
after: 6
{ _id: 'Chunk', values: [ 2, 4, 6 ] }
*/

Тип потока не меняется: Stream<A> → Stream<A>.


Zipping — объединение потоков попарно

Stream.zip — попарное объединение

import { Stream, Effect } from "effect"

const stream = Stream.zip(
  Stream.make(1, 2, 3, 4, 5, 6),
  Stream.make("a", "b", "c")
)

Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ [ 1, 'a' ], [ 2, 'b' ], [ 3, 'c' ] ] }

Поток завершается, когда один из входных потоков исчерпан.

Stream.zipWith — с пользовательской функцией

import { Stream, Effect } from "effect"

const stream = Stream.zipWith(
  Stream.make(1, 2, 3),
  Stream.make("a", "b", "c"),
  (n, s) => `${s}:${n}`
)

Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 'a:1', 'b:2', 'c:3' ] }

Stream.zipAll / zipAllWith — с дефолтами

Если один поток короче, используются дефолтные значения:

import { Stream, Effect } from "effect"

const stream = Stream.zipAll(Stream.make(1, 2, 3, 4), {
  other: Stream.make("a", "b"),
  defaultSelf: 0,
  defaultOther: "z"
})

Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ [ 1, 'a' ], [ 2, 'b' ], [ 3, 'z' ], [ 4, 'z' ] ] }

Stream.zipWithIndex — нумерация элементов

import { Stream, Effect } from "effect"

const stream = Stream.make("a", "b", "c").pipe(Stream.zipWithIndex)

Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ [ 'a', 0 ], [ 'b', 1 ], [ 'c', 2 ] ] }

Scanning — кумулятивная аккумуляция

Stream.scan аккумулирует значения и эмитирует каждый промежуточный результат:

import { Stream, Effect } from "effect"

const stream = Stream.range(1, 5).pipe(
  Stream.scan(0, (acc, n) => acc + n)
)

Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 0, 1, 3, 6, 10, 15 ] }
//                           ↑  ↑  ↑   ↑   ↑   ↑
//                        init  +1 +2  +3  +4  +5

Обратите внимание: scan эмитирует начальное значение первым. Поток из N элементов даёт N+1 значений.


Concatenation — конкатенация потоков

Stream.concat — последовательная конкатенация

import { Stream, Effect } from "effect"

const stream = Stream.concat(
  Stream.make(1, 2, 3),
  Stream.make("a", "b")
)

Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 1, 2, 3, 'a', 'b' ] }

Stream.concatAll — конкатенация массива потоков

import { Stream, Chunk, Effect } from "effect"

const stream = Stream.concatAll<number | string | boolean, never, never>(
  Chunk.make(
    Stream.make(1, 2),
    Stream.make("a", "b"),
    Stream.make(true, false)
  )
)

Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 1, 2, 'a', 'b', true, false ] }

Interspersing — вставка разделителей

Stream.intersperse — разделитель между элементами

import { Stream, Effect } from "effect"

const stream = Stream.make(1, 2, 3, 4, 5).pipe(
  Stream.intersperse(0)
)

Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 1, 0, 2, 0, 3, 0, 4, 0, 5 ] }

Stream.intersperseAffixes — с префиксом и суффиксом

import { Stream, Effect } from "effect"

const stream = Stream.make(1, 2, 3).pipe(
  Stream.intersperseAffixes({
    start: "[",
    middle: ",",
    end: "]"
  })
)

Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ '[', 1, ',', 2, ',', 3, ']' ] }

Прочие трансформации

Stream.drain — игнорирование значений

Выполняет все эффекты потока, но отбрасывает значения:

import { Stream, Effect, Console } from "effect"

const stream = Stream.make(1, 2, 3).pipe(
  Stream.tap((n) => Console.log(`Processing: ${n}`)),
  Stream.drain
)

Effect.runPromise(Stream.runCollect(stream)).then(console.log)
/*
Processing: 1
Processing: 2
Processing: 3
{ _id: 'Chunk', values: [] }
*/

Stream.mapChunks — трансформация на уровне чанков

Для низкоуровневой оптимизации можно работать напрямую с чанками:

import { Stream, Chunk, Effect } from "effect"

const stream = Stream.range(1, 10).pipe(
  Stream.mapChunks((chunk) =>
    Chunk.map(chunk, (n) => n * 100)
  )
)

Effect.runPromise(Stream.runCollect(stream)).then(console.log)

Stream.cross — декартово произведение

import { Stream, Effect } from "effect"

const stream = Stream.cross(
  Stream.make("a", "b"),
  Stream.make(1, 2, 3)
)

Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// [ ['a',1], ['a',2], ['a',3], ['b',1], ['b',2], ['b',3] ]

API Reference

ТрансформацияТипОписание
map(A → B) → Stream<B>Чистое преобразование
mapEffect(A → Effect<B>) → Stream<B>Эффективное преобразование
mapAccum(S, (S, A) → [S, B]) → Stream<B>С аккумулятором
mapConcat(A → Iterable<B>) → Stream<B>Map + flatten
mapChunks(Chunk<A> → Chunk<B>) → Stream<B>Чанковый маппинг
as(value: B) → Stream<B>Замена константой
filter(A → boolean) → Stream<A>Чистая фильтрация
filterEffect(A → Effect<boolean>) → Stream<A>Эффективная фильтрация
flatMap(A → Stream<B>) → Stream<B>Монадический bind
take(n: number) → Stream<A>Первые N
takeWhile(pred) → Stream<A>Пока предикат true
takeUntil(pred) → Stream<A>До первого true
drop(n: number) → Stream<A>Пропустить N
dropWhile(pred) → Stream<A>Пропускать пока true
tap(A → Effect<_>) → Stream<A>Наблюдение
scan(S, (S, A) → S) → Stream<S>Кумулятивная аккумуляция
zip(Stream<B>) → Stream<[A, B]>Попарно
zipWith(Stream<B>, (A, B) → C) → Stream<C>Попарно с функцией
zipWithIndex→ Stream<[A, number]>С индексами
concat(Stream<B>) → Stream<A | B>Конкатенация
intersperse(sep: A) → Stream<A>Разделитель
drain→ Stream<never>Игнорирование
changes→ Stream<A>Дедупликация
cross(Stream<B>) → Stream<[A, B]>Декартово произведение

Примеры

Production: ETL пайплайн

import { Stream, Effect, Console, Chunk } from "effect"

interface RawRecord {
  readonly id: string
  readonly data: string
  readonly timestamp: number
}

interface ProcessedRecord {
  readonly id: string
  readonly normalizedData: string
  readonly processedAt: number
}

// Extract → Transform → Load пайплайн
const etlPipeline = (source: Stream.Stream<RawRecord>) =>
  source.pipe(
    // Transform: нормализация данных
    Stream.map((record) => ({
      id: record.id,
      normalizedData: record.data.trim().toLowerCase(),
      processedAt: Date.now()
    } satisfies ProcessedRecord)),

    // Filter: отбрасываем пустые записи
    Stream.filter((record) => record.normalizedData.length > 0),

    // Tap: логирование каждой обработанной записи
    Stream.tap((record) =>
      Console.log(`Processed: ${record.id}`)
    ),

    // Группировка для batch-вставки
    Stream.grouped(100),

    // Load: batch-вставка в БД
    Stream.mapEffect((batch) =>
      Effect.gen(function* () {
        yield* Console.log(`Inserting batch of ${Chunk.size(batch)} records`)
        return Chunk.size(batch)
      })
    ),

    // Аккумуляция общего количества
    Stream.scan(0, (total, batchSize) => total + batchSize)
  )

Упражнения

🟢 Basic

Упражнение 1: Цепочка трансформаций Создайте поток чисел 1-20. Примените: map (×3), filter (только делимые на 2), take (первые 5). Соберите результат.

Решение:

import { Stream, Effect } from "effect"

const program = Stream.range(1, 20).pipe(
  Stream.map((n) => n * 3),
  Stream.filter((n) => n % 2 === 0),
  Stream.take(5),
  Stream.runCollect
)

Effect.runPromise(program).then(console.log)
// { _id: 'Chunk', values: [ 6, 12, 18, 24, 30 ] }

🟡 Intermediate

Упражнение 2: flatMap для разворачивания вложенных данных Имеется поток «заказов», каждый содержит массив «товаров». Разверните в плоский поток товаров, добавьте к каждому orderId.

Решение:

import { Stream, Effect } from "effect"

interface Order {
  readonly orderId: string
  readonly items: ReadonlyArray<string>
}

const orders: ReadonlyArray<Order> = [
  { orderId: "O1", items: ["Widget", "Gadget"] },
  { orderId: "O2", items: ["Sprocket"] },
  { orderId: "O3", items: ["Gear", "Bolt", "Nut"] }
]

const program = Stream.fromIterable(orders).pipe(
  Stream.flatMap((order) =>
    Stream.fromIterable(order.items).pipe(
      Stream.map((item) => ({ orderId: order.orderId, item }))
    )
  ),
  Stream.runCollect
)

Effect.runPromise(program).then(console.log)

🔴 Advanced

Упражнение 3: mapAccum для вычисления скользящего среднего Реализуйте скользящее среднее с окном 3 для потока чисел, используя Stream.mapAccum. Состояние — массив последних 3 значений.

Решение:

import { Stream, Effect } from "effect"

const movingAverage = Stream.make(10, 20, 30, 40, 50, 60, 70).pipe(
  Stream.mapAccum(
    [] as ReadonlyArray<number>,
    (window, value) => {
      const newWindow = [...window, value].slice(-3) as ReadonlyArray<number>
      const avg = newWindow.reduce((a, b) => a + b, 0) / newWindow.length
      return [newWindow, avg] as const
    }
  ),
  Stream.runCollect
)

Effect.runPromise(movingAverage).then(console.log)
// { _id: 'Chunk', values: [ 10, 15, 20, 30, 40, 50, 60 ] }

🔗 Далее: 04-chunking.md — grouped, rechunk, unchunks