Stream концепция и структура
Stream — это описание программы, которая при выполнении может эмитировать ноль или более значений типа `A`, обрабатывать ошибки типа `E` и работать в контексте типа `R`. Это ленивая, pull-based структура, являющаяся фундаментом потоковой обработки данных в Effect-ts.
Теория
Зачем нужны потоки
В реальных production-системах данные редко представлены единичными значениями. Мы работаем с последовательностями: строки лог-файла, результаты пагинированного API, события WebSocket, записи из базы данных, метрики IoT-устройств. Каждый из этих источников порождает множество значений, растянутых во времени.
Стандартный Effect<A, E, R> описывает программу, которая всегда завершается ровно одним значением типа A. Если нужно вернуть коллекцию, можно использовать Effect<Array<A>, E, R>, но такой подход имеет критические ограничения:
┌─────────────────────────────────────────────────────┐
│ Effect<Array<A>, E, R> │
│ │
│ ❌ Загружает ВСЕ данные в память одновременно │
│ ❌ Блокирует до полного завершения │
│ ❌ Невозможно обработать бесконечный поток │
│ ❌ Нет back-pressure между производителем и │
│ потребителем │
└─────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────┐
│ Stream<A, E, R> │
│ │
│ ✅ Ленивая эмиссия — элементы порождаются по │
│ требованию │
│ ✅ Постоянное потребление памяти O(chunk_size) │
│ ✅ Поддержка бесконечных последовательностей │
│ ✅ Встроенный back-pressure через pull-модель │
│ ✅ Композиция трансформаций без промежуточных │
│ коллекций │
└─────────────────────────────────────────────────────┘
Определение Stream
Stream<A, E, R> — это описание программы (blueprint), которая при выполнении:
- Может эмитировать ноль или более значений типа
A - Может завершиться ошибкой типа
E - Требует контекст (зависимости) типа
R
Важно подчеркнуть: Stream — это именно описание, а не исполняемый код. Как и Effect, поток является значением первого класса, которое можно передавать, комбинировать и трансформировать до момента запуска.
Сигнатура типа
Stream<A, E = never, R = never>
│ │ │
│ │ └─ Requirements: зависимости из контекста
│ └────────── Error: типизированные ошибки
└───────────── Success: тип эмитируемых значений
Параметры типа имеют значения по умолчанию — never для E и R, что означает «ошибок нет» и «зависимостей нет» соответственно:
import { Stream } from "effect"
// Stream без ошибок и зависимостей
const numbers: Stream.Stream<number> = Stream.make(1, 2, 3)
// Stream с ошибкой
const withError: Stream.Stream<number, string> = Stream.make(1, 2, 3).pipe(
Stream.concat(Stream.fail("boom"))
)
// Stream с зависимостью
const withDeps: Stream.Stream<string, never, Database> =
Stream.fromEffect(Database.pipe(Effect.andThen((db) => db.getUsers)))
Концепция ФП
Stream как свободный моноид над Effect
С точки зрения теории категорий, Stream<A, E, R> можно рассматривать как свободный моноид (free monoid) над Effect<Chunk<A>, Option<E>, R>:
Stream ≈ Effect<Chunk<A>, Option<E>, R>*
Где * обозначает клиниевскую звезду (Kleene star) — конкатенацию нуля или более элементов. Каждый «шаг» потока — это эффект, возвращающий чанк элементов, а Option.none() сигнализирует о завершении потока.
Коалгебра F-coalgebra
Потоки являются частным случаем F-коалгебры. Если алгебра (fold) «сворачивает» структуру в значение, то коалгебра (unfold) «разворачивает» значение в структуру:
fold : [A] → B (катаморфизм — разрушение структуры)
unfold: S → Option<(A, S)> (анаморфизм — построение структуры)
Stream — это анаморфизм: начиная с некоторого состояния S, каждый шаг порождает пару (элемент, новое_состояние) или завершается:
import { Stream, Option } from "effect"
// Классический unfold — анаморфизм в чистом виде
const naturals = Stream.unfold(1, (state) =>
Option.some([state, state + 1] as const)
)
// Порождает: 1, 2, 3, 4, 5, ...
Потоки как ленивые списки
Концептуально Stream<A> эквивалентен ленивому списку из Haskell:
data Stream a = Nil | Cons a (IO (Stream a))
Каждый «хвост» списка обёрнут в эффект (IO), что обеспечивает ленивую оценку и возможность выполнять побочные эффекты при генерации элементов. В Effect-ts это реализовано через pull-based протокол с чанками.
Сравнение с Effect
Ключевое отличие Stream от Effect — количество эмитируемых значений:
┌───────────────────┬──────────────────────────────────┐
│ Effect<A, E, R> │ Ровно одно значение типа A │
│ │ (или ошибка E) │
├───────────────────┼──────────────────────────────────┤
│ Stream<A, E, R> │ Ноль или более значений типа A │
│ │ (или ошибка E) │
└───────────────────┴──────────────────────────────────┘
Рассмотрим спектр возможностей Stream:
import { Stream } from "effect"
// ❶ Пустой поток — ноль значений
const empty: Stream.Stream<never> = Stream.empty
// ❷ Одно значение — аналог Effect.succeed
const single: Stream.Stream<number> = Stream.succeed(42)
// ❸ Конечный поток — несколько значений
const finite: Stream.Stream<number> = Stream.range(1, 100)
// ❹ Бесконечный поток — неограниченное количество значений
const infinite: Stream.Stream<number> = Stream.iterate(1, (n) => n + 1)
Каждый Effect может быть поднят в Stream с помощью Stream.fromEffect, но обратное требует явного «сворачивания» потока в единичное значение:
import { Stream, Effect } from "effect"
// Effect → Stream (тривиальный подъём)
const fromEff: Stream.Stream<number> =
Stream.fromEffect(Effect.succeed(42))
// Stream → Effect (требует стратегии сворачивания)
const toEff: Effect.Effect<number> =
Stream.make(1, 2, 3).pipe(Stream.runFold(0, (acc, n) => acc + n))
Внутренняя архитектура
Channel — низкоуровневый примитив
Внутри Effect-ts Stream реализован поверх Channel — двунаправленного потокового примитива. Понимание этой структуры помогает разобраться в поведении потоков:
┌──────────────────────────────────────────────────────┐
│ Channel │
│ │
│ Channel<OutElem, InElem, OutErr, InErr, OutDone, │
│ InDone, Env> │
│ │
│ InElem ──────────►┌──────────┐──────────► OutElem │
│ │ Channel │ │
│ InDone ──────────►│ │──────────► OutDone │
│ │ │──────────► OutErr │
│ InErr ──────────►└──────────┘ │
│ │ │
│ Env │
└──────────────────────────────────────────────────────┘
Stream<A, E, R> ≈ Channel<Chunk<A>, unknown, E, unknown, unknown, unknown, R>
Stream — это канал, который:
- Эмитирует чанки элементов (
Chunk<A>) какOutElem - Может завершиться с ошибкой типа
E - Требует окружение типа
R
Жизненный цикл потока
┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐
│ Create │────►│ Transform│────►│ Run │────►│ Consume │
│ │ │ │ │ │ │ │
│ Stream. │ │ .map() │ │ runCollect│ │ Effect. │
│ make() │ │ .filter()│ │ runForEach│ │ runPromise│
│ fromIter │ │ .flatMap()│ │ runDrain │ │ │
│ unfold() │ │ etc. │ │ runFold │ │ │
└──────────┘ └──────────┘ └──────────┘ └──────────┘
Описание Композиция Интерпретация Выполнение
(ленивое) (ленивая) (→ Effect) (запуск)
Все фазы до Run полностью ленивы — никакие вычисления не производятся, пока поток не будет запущен одним из run*-методов, возвращающих Effect.
Pull-based модель
Push vs Pull
Существуют две фундаментальные модели потоковой обработки:
Push-based (RxJS, EventEmitter):
Производитель ──[push]──► Потребитель
⚠️ Производитель контролирует скорость
⚠️ Потребитель может быть перегружен
Pull-based (Effect Stream, Iterator):
Производитель ◄──[pull]── Потребитель
✅ Потребитель контролирует скорость
✅ Естественный back-pressure
Effect Stream использует pull-based модель: потребитель запрашивает следующий чанк данных, а производитель лениво генерирует его. Это обеспечивает естественный back-pressure — потребитель не может быть перегружен данными.
Протокол toPull
Внутренне pull-модель реализована через Stream.toPull, который конвертирует Stream в Effect, возвращающий чанк данных при каждом вызове:
import { Stream, Effect } from "effect"
const stream = Stream.fromIterable([1, 2, 3, 4, 5]).pipe(
Stream.rechunk(2)
)
const program = Effect.gen(function* () {
// toPull возвращает эффект, который при каждом вызове
// извлекает следующий чанк из потока
const getChunk = yield* Stream.toPull(stream)
// Чанк 1: [1, 2]
const chunk1 = yield* getChunk
console.log("Chunk 1:", chunk1)
// Чанк 2: [3, 4]
const chunk2 = yield* getChunk
console.log("Chunk 2:", chunk2)
// Чанк 3: [5]
const chunk3 = yield* getChunk
console.log("Chunk 3:", chunk3)
// Следующий вызов завершится с Option.none() — поток исчерпан
})
// toPull требует Scope для управления ресурсами
Effect.runPromise(Effect.scoped(program))
/*
Output:
Chunk 1: { _id: 'Chunk', values: [ 1, 2 ] }
Chunk 2: { _id: 'Chunk', values: [ 3, 4 ] }
Chunk 3: { _id: 'Chunk', values: [ 5 ] }
*/
Обратите внимание: toPull возвращает scoped-ресурс, поскольку поток может владеть внешними ресурсами (файловые дескрипторы, подключения), которые нужно освободить при завершении.
Chunk — внутренняя единица данных
Почему не поэлементно
Stream обрабатывает данные не по одному элементу, а чанками (Chunk<A>). Chunk — это иммутабельный массив из модуля Effect, оптимизированный для потоковой обработки:
Поэлементная обработка:
[1] → transform → [2] → transform → [3] → transform
Overhead: N вызовов функции для N элементов
Чанковая обработка:
[1, 2, 3, ..., 1000] → transform_batch
Overhead: 1 вызов функции для 1000 элементов
Амортизированная сложность: O(1) на элемент
Чанковость — одна из главных причин высокой производительности Effect Stream. Трансформации применяются к целым чанкам, что радикально снижает overhead вызовов функций и улучшает утилизацию кэша CPU.
Структура Chunk
import { Chunk } from "effect"
// Создание чанка
const chunk = Chunk.make(1, 2, 3, 4, 5)
// Chunk — иммутабельный, операции возвращают новый Chunk
const appended = Chunk.append(chunk, 6)
const prepended = Chunk.prepend(chunk, 0)
const mapped = Chunk.map(chunk, (n) => n * 2)
// Chunk поддерживает O(1) операции:
// - append (амортизированно)
// - prepend (амортизированно)
// - concat (через цепочку)
// - get по индексу
Размер чанка и производительность
Размер чанка влияет на баланс между задержкой (latency) и пропускной способностью (throughput):
Маленькие чанки (1-10 элементов):
✅ Низкая задержка (элемент доступен быстро)
❌ Высокий overhead на чанк
→ Подходит для real-time систем
Большие чанки (1000-10000 элементов):
✅ Высокая пропускная способность
❌ Высокая задержка (ожидание заполнения)
→ Подходит для batch-обработки
Средние чанки (100-1000 элементов):
⚖️ Оптимальный баланс для большинства задач
Вы можете контролировать размер чанка с помощью Stream.rechunk:
import { Stream, Effect } from "effect"
const stream = Stream.range(1, 10000).pipe(
Stream.rechunk(256) // Перегруппировка в чанки по 256 элементов
)
API Reference
Базовые конструкторы
| Функция | Сигнатура | Описание |
|---|---|---|
Stream.make | <A>(...values: A[]) => Stream<A> | Создаёт поток из перечисленных значений |
Stream.empty | Stream<never> | Пустой поток |
Stream.succeed | <A>(value: A) => Stream<A> | Поток из одного значения |
Stream.fail | <E>(error: E) => Stream<never, E> | Поток, завершающийся ошибкой |
Stream.void | Stream<void> | Поток с единственным undefined |
Stream.range | (min: number, max: number) => Stream<number> | Диапазон чисел [min, max] включительно |
Stream.iterate | <A>(init: A, f: (a: A) => A) => Stream<A> | Бесконечный поток: init, f(init), f(f(init)), … |
Запуск потока
| Функция | Сигнатура | Описание |
|---|---|---|
Stream.runCollect | <A, E, R>(stream: Stream<A, E, R>) => Effect<Chunk<A>, E, R> | Собирает все элементы в Chunk |
Stream.runForEach | <A, E, R>(stream, f) => Effect<void, E, R> | Выполняет эффект для каждого элемента |
Stream.runDrain | <A, E, R>(stream) => Effect<void, E, R> | Запускает поток, игнорируя значения |
Stream.runFold | <A, E, R, S>(stream, init, f) => Effect<S, E, R> | Свёртка потока в единственное значение |
Stream.runHead | <A, E, R>(stream) => Effect<Option<A>, E, R> | Извлекает первый элемент |
Базовые трансформации
| Функция | Описание |
|---|---|
Stream.map | Чистая трансформация каждого элемента |
Stream.filter | Фильтрация элементов по предикату |
Stream.take | Взять первые N элементов |
Stream.drop | Пропустить первые N элементов |
Stream.concat | Конкатенация двух потоков |
Примеры
Пример 1: Базовое создание и потребление
import { Stream, Effect } from "effect"
// Создание конечного потока
const numbers = Stream.make(1, 2, 3, 4, 5)
// Трансформация: удвоение каждого элемента
const doubled = numbers.pipe(
Stream.map((n) => n * 2)
)
// Потребление: собрать все элементы в Chunk
const program = Stream.runCollect(doubled)
Effect.runPromise(program).then(console.log)
// { _id: 'Chunk', values: [ 2, 4, 6, 8, 10 ] }
Пример 2: Бесконечный поток с ограничением
import { Stream, Effect } from "effect"
// Бесконечный поток чисел Фибоначчи
const fibonacci = Stream.unfold(
[0, 1] as const,
([a, b]) => Option.some([a, [b, a + b] as const] as const)
)
// Берём первые 10 чисел
const firstTen = fibonacci.pipe(Stream.take(10))
Effect.runPromise(Stream.runCollect(firstTen)).then(console.log)
// { _id: 'Chunk', values: [ 0, 1, 1, 2, 3, 5, 8, 13, 21, 34 ] }
Пример 3: Поток с зависимостями
import { Stream, Effect, Context, Layer } from "effect"
// Определение сервиса
class UserRepository extends Context.Tag("UserRepository")<
UserRepository,
{
readonly findAll: Effect.Effect<ReadonlyArray<string>>
readonly findById: (id: number) => Effect.Effect<string>
}
>() {}
// Поток, зависящий от сервиса
const userStream: Stream.Stream<string, never, UserRepository> =
Stream.fromEffect(
UserRepository.pipe(
Effect.andThen((repo) => repo.findAll)
)
).pipe(Stream.flatMap(Stream.fromIterable))
// Предоставление зависимости и запуск
const TestLayer = Layer.succeed(UserRepository, {
findAll: Effect.succeed(["Alice", "Bob", "Charlie"]),
findById: (id) => Effect.succeed(`User-${id}`)
})
const program = userStream.pipe(
Stream.provideLayer(TestLayer),
Stream.runCollect
)
Effect.runPromise(program).then(console.log)
// { _id: 'Chunk', values: [ 'Alice', 'Bob', 'Charlie' ] }
Пример 4: Ленивость в действии
import { Stream, Effect, Console } from "effect"
// Каждый элемент генерируется лениво
const lazyStream = Stream.make(1, 2, 3, 4, 5).pipe(
Stream.tap((n) => Console.log(`Producing: ${n}`)),
Stream.map((n) => n * 10),
Stream.tap((n) => Console.log(`Transformed: ${n}`)),
Stream.take(3) // Только первые 3 — остальные НЕ будут сгенерированы
)
Effect.runPromise(Stream.runCollect(lazyStream)).then(console.log)
/*
Output:
Producing: 1
Transformed: 10
Producing: 2
Transformed: 20
Producing: 3
Transformed: 30
{ _id: 'Chunk', values: [ 10, 20, 30 ] }
*/
// Элементы 4 и 5 никогда не будут обработаны!
Пример 5: Production — мониторинг метрик
import { Stream, Effect, Schedule, Random, Chunk } from "effect"
// Тип метрики
interface Metric {
readonly timestamp: number
readonly cpu: number
readonly memory: number
}
// Поток метрик, собираемых каждую секунду
const metricsStream: Stream.Stream<Metric> = Stream.repeatEffect(
Effect.gen(function* () {
const cpu = yield* Random.nextIntBetween(0, 100)
const memory = yield* Random.nextIntBetween(20, 90)
return {
timestamp: Date.now(),
cpu,
memory
} as Metric
})
).pipe(Stream.schedule(Schedule.spaced("1 second")))
// Обработка: фильтрация аномальных значений, группировка по 5
const pipeline = metricsStream.pipe(
Stream.filter((m) => m.cpu < 95), // отбрасываем пиковые значения
Stream.take(20), // берём 20 метрик для примера
Stream.grouped(5), // группируем по 5
Stream.map((chunk) => ({
count: Chunk.size(chunk),
avgCpu: Chunk.reduce(chunk, 0, (acc, m) => acc + m.cpu) / Chunk.size(chunk),
avgMemory: Chunk.reduce(chunk, 0, (acc, m) => acc + m.memory) / Chunk.size(chunk)
}))
)
const program = pipeline.pipe(
Stream.runForEach((summary) =>
Effect.sync(() => console.log(
`Batch: avgCPU=${summary.avgCpu.toFixed(1)}%, avgMem=${summary.avgMemory.toFixed(1)}%`
))
)
)
Упражнения
🟢 Basic
Упражнение 1: Первый поток Создайте поток из строк "hello", "world", "effect". Примените Stream.map для преобразования каждой строки в верхний регистр. Соберите результат с помощью Stream.runCollect.
Решение:
import { Stream, Effect } from "effect"
const program = Stream.make("hello", "world", "effect").pipe(
Stream.map((s) => s.toUpperCase()),
Stream.runCollect
)
Effect.runPromise(program).then(console.log)
// { _id: 'Chunk', values: [ 'HELLO', 'WORLD', 'EFFECT' ] }
Упражнение 2: Диапазон и фильтрация Создайте поток чисел от 1 до 50. Оставьте только числа, делимые на 7. Соберите результат.
Решение:
import { Stream, Effect } from "effect"
const program = Stream.range(1, 50).pipe(
Stream.filter((n) => n % 7 === 0),
Stream.runCollect
)
Effect.runPromise(program).then(console.log)
// { _id: 'Chunk', values: [ 7, 14, 21, 28, 35, 42, 49 ] }
🟡 Intermediate
Упражнение 3: Бесконечный поток степеней двойки Используя Stream.iterate, создайте бесконечный поток степеней двойки: 1, 2, 4, 8, 16, … Извлеките первые 10 значений.
Решение:
import { Stream, Effect } from "effect"
const powersOfTwo = Stream.iterate(1, (n) => n * 2).pipe(
Stream.take(10),
Stream.runCollect
)
Effect.runPromise(powersOfTwo).then(console.log)
// { _id: 'Chunk', values: [ 1, 2, 4, 8, 16, 32, 64, 128, 256, 512 ] }
Упражнение 4: Поток с эффектами Создайте поток, который генерирует 5 случайных чисел с помощью Stream.repeatEffect и Random.nextIntBetween(1, 100). Отфильтруйте только чётные числа и соберите результат.
Решение:
import { Stream, Effect, Random } from "effect"
const program = Stream.repeatEffect(
Random.nextIntBetween(1, 100)
).pipe(
Stream.take(5),
Stream.filter((n) => n % 2 === 0),
Stream.runCollect
)
Effect.runPromise(program).then(console.log)
🔴 Advanced
Упражнение 5: Потоковый пайплайн с зависимостями Создайте сервис EventSource с методом getEvents: Effect<ReadonlyArray<Event>>, где Event = { id: number, type: string, payload: string }. Реализуйте потоковый пайплайн:
- Получение событий из сервиса
- Фильтрация по типу
"order" - Преобразование payload в uppercase
- Ограничение первыми 10 событиями
- Сбор результата
Предоставьте тестовый Layer и запустите.
Решение:
import { Stream, Effect, Context, Layer } from "effect"
interface Event {
readonly id: number
readonly type: string
readonly payload: string
}
class EventSource extends Context.Tag("EventSource")<
EventSource,
{ readonly getEvents: Effect.Effect<ReadonlyArray<Event>> }
>() {}
const eventPipeline = Stream.fromEffect(
EventSource.pipe(Effect.andThen((es) => es.getEvents))
).pipe(
Stream.flatMap(Stream.fromIterable),
Stream.filter((event) => event.type === "order"),
Stream.map((event) => ({
...event,
payload: event.payload.toUpperCase()
})),
Stream.take(10),
Stream.runCollect
)
const TestEventSource = Layer.succeed(EventSource, {
getEvents: Effect.succeed(
Array.from({ length: 20 }, (_, i) => ({
id: i,
type: i % 3 === 0 ? "order" : "notification",
payload: `event-payload-${i}`
}))
)
})
const program = Effect.provide(eventPipeline, TestEventSource)
Effect.runPromise(program).then(console.log)