Effect Курс Stream концепция и структура
Глава

Stream концепция и структура

Stream — это описание программы, которая при выполнении может эмитировать ноль или более значений типа `A`, обрабатывать ошибки типа `E` и работать в контексте типа `R`. Это ленивая, pull-based структура, являющаяся фундаментом потоковой обработки данных в Effect-ts.

Теория

Зачем нужны потоки

В реальных production-системах данные редко представлены единичными значениями. Мы работаем с последовательностями: строки лог-файла, результаты пагинированного API, события WebSocket, записи из базы данных, метрики IoT-устройств. Каждый из этих источников порождает множество значений, растянутых во времени.

Стандартный Effect<A, E, R> описывает программу, которая всегда завершается ровно одним значением типа A. Если нужно вернуть коллекцию, можно использовать Effect<Array<A>, E, R>, но такой подход имеет критические ограничения:

┌─────────────────────────────────────────────────────┐
│  Effect<Array<A>, E, R>                             │
│                                                     │
│  ❌ Загружает ВСЕ данные в память одновременно       │
│  ❌ Блокирует до полного завершения                  │
│  ❌ Невозможно обработать бесконечный поток          │
│  ❌ Нет back-pressure между производителем и         │
│     потребителем                                    │
└─────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────┐
│  Stream<A, E, R>                                    │
│                                                     │
│  ✅ Ленивая эмиссия — элементы порождаются по       │
│     требованию                                      │
│  ✅ Постоянное потребление памяти O(chunk_size)      │
│  ✅ Поддержка бесконечных последовательностей        │
│  ✅ Встроенный back-pressure через pull-модель       │
│  ✅ Композиция трансформаций без промежуточных       │
│     коллекций                                       │
└─────────────────────────────────────────────────────┘

Определение Stream

Stream<A, E, R> — это описание программы (blueprint), которая при выполнении:

  • Может эмитировать ноль или более значений типа A
  • Может завершиться ошибкой типа E
  • Требует контекст (зависимости) типа R

Важно подчеркнуть: Stream — это именно описание, а не исполняемый код. Как и Effect, поток является значением первого класса, которое можно передавать, комбинировать и трансформировать до момента запуска.

Сигнатура типа

Stream<A, E = never, R = never>
         │  │        │
         │  │        └─ Requirements: зависимости из контекста
         │  └────────── Error: типизированные ошибки
         └───────────── Success: тип эмитируемых значений

Параметры типа имеют значения по умолчанию — never для E и R, что означает «ошибок нет» и «зависимостей нет» соответственно:

import { Stream } from "effect"

// Stream без ошибок и зависимостей
const numbers: Stream.Stream<number> = Stream.make(1, 2, 3)

// Stream с ошибкой
const withError: Stream.Stream<number, string> = Stream.make(1, 2, 3).pipe(
  Stream.concat(Stream.fail("boom"))
)

// Stream с зависимостью
const withDeps: Stream.Stream<string, never, Database> =
  Stream.fromEffect(Database.pipe(Effect.andThen((db) => db.getUsers)))

Концепция ФП

Stream как свободный моноид над Effect

С точки зрения теории категорий, Stream<A, E, R> можно рассматривать как свободный моноид (free monoid) над Effect<Chunk<A>, Option<E>, R>:

Stream ≈ Effect<Chunk<A>, Option<E>, R>*

Где * обозначает клиниевскую звезду (Kleene star) — конкатенацию нуля или более элементов. Каждый «шаг» потока — это эффект, возвращающий чанк элементов, а Option.none() сигнализирует о завершении потока.

Коалгебра F-coalgebra

Потоки являются частным случаем F-коалгебры. Если алгебра (fold) «сворачивает» структуру в значение, то коалгебра (unfold) «разворачивает» значение в структуру:

fold  :  [A] → B              (катаморфизм — разрушение структуры)
unfold:  S → Option<(A, S)>   (анаморфизм — построение структуры)

Stream — это анаморфизм: начиная с некоторого состояния S, каждый шаг порождает пару (элемент, новое_состояние) или завершается:

import { Stream, Option } from "effect"

// Классический unfold — анаморфизм в чистом виде
const naturals = Stream.unfold(1, (state) =>
  Option.some([state, state + 1] as const)
)
// Порождает: 1, 2, 3, 4, 5, ...

Потоки как ленивые списки

Концептуально Stream<A> эквивалентен ленивому списку из Haskell:

data Stream a = Nil | Cons a (IO (Stream a))

Каждый «хвост» списка обёрнут в эффект (IO), что обеспечивает ленивую оценку и возможность выполнять побочные эффекты при генерации элементов. В Effect-ts это реализовано через pull-based протокол с чанками.


Сравнение с Effect

Ключевое отличие Stream от Effect — количество эмитируемых значений:

┌───────────────────┬──────────────────────────────────┐
│  Effect<A, E, R>  │  Ровно одно значение типа A      │
│                   │  (или ошибка E)                  │
├───────────────────┼──────────────────────────────────┤
│  Stream<A, E, R>  │  Ноль или более значений типа A  │
│                   │  (или ошибка E)                  │
└───────────────────┴──────────────────────────────────┘

Рассмотрим спектр возможностей Stream:

import { Stream } from "effect"

// ❶ Пустой поток — ноль значений
const empty: Stream.Stream<never> = Stream.empty

// ❷ Одно значение — аналог Effect.succeed
const single: Stream.Stream<number> = Stream.succeed(42)

// ❸ Конечный поток — несколько значений
const finite: Stream.Stream<number> = Stream.range(1, 100)

// ❹ Бесконечный поток — неограниченное количество значений
const infinite: Stream.Stream<number> = Stream.iterate(1, (n) => n + 1)

Каждый Effect может быть поднят в Stream с помощью Stream.fromEffect, но обратное требует явного «сворачивания» потока в единичное значение:

import { Stream, Effect } from "effect"

// Effect → Stream (тривиальный подъём)
const fromEff: Stream.Stream<number> =
  Stream.fromEffect(Effect.succeed(42))

// Stream → Effect (требует стратегии сворачивания)
const toEff: Effect.Effect<number> =
  Stream.make(1, 2, 3).pipe(Stream.runFold(0, (acc, n) => acc + n))

Внутренняя архитектура

Channel — низкоуровневый примитив

Внутри Effect-ts Stream реализован поверх Channel — двунаправленного потокового примитива. Понимание этой структуры помогает разобраться в поведении потоков:

┌──────────────────────────────────────────────────────┐
│                     Channel                          │
│                                                      │
│  Channel<OutElem, InElem, OutErr, InErr, OutDone,   │
│           InDone, Env>                               │
│                                                      │
│  InElem ──────────►┌──────────┐──────────► OutElem  │
│                    │  Channel │                      │
│  InDone ──────────►│          │──────────► OutDone  │
│                    │          │──────────► OutErr   │
│  InErr  ──────────►└──────────┘                      │
│                         │                            │
│                       Env                            │
└──────────────────────────────────────────────────────┘

Stream<A, E, R>  ≈  Channel<Chunk<A>, unknown, E, unknown, unknown, unknown, R>

Stream — это канал, который:

  • Эмитирует чанки элементов (Chunk<A>) как OutElem
  • Может завершиться с ошибкой типа E
  • Требует окружение типа R

Жизненный цикл потока

┌──────────┐     ┌──────────┐     ┌──────────┐     ┌──────────┐
│  Create  │────►│  Transform│────►│   Run    │────►│ Consume  │
│          │     │          │     │          │     │          │
│ Stream.  │     │ .map()   │     │ runCollect│    │ Effect.  │
│ make()   │     │ .filter()│     │ runForEach│    │ runPromise│
│ fromIter │     │ .flatMap()│    │ runDrain  │     │          │
│ unfold() │     │ etc.     │     │ runFold   │     │          │
└──────────┘     └──────────┘     └──────────┘     └──────────┘
  Описание        Композиция      Интерпретация     Выполнение
  (ленивое)       (ленивая)       (→ Effect)        (запуск)

Все фазы до Run полностью ленивы — никакие вычисления не производятся, пока поток не будет запущен одним из run*-методов, возвращающих Effect.


Pull-based модель

Push vs Pull

Существуют две фундаментальные модели потоковой обработки:

Push-based (RxJS, EventEmitter):
  Производитель ──[push]──► Потребитель
  ⚠️ Производитель контролирует скорость
  ⚠️ Потребитель может быть перегружен

Pull-based (Effect Stream, Iterator):
  Производитель ◄──[pull]── Потребитель
  ✅ Потребитель контролирует скорость
  ✅ Естественный back-pressure

Effect Stream использует pull-based модель: потребитель запрашивает следующий чанк данных, а производитель лениво генерирует его. Это обеспечивает естественный back-pressure — потребитель не может быть перегружен данными.

Протокол toPull

Внутренне pull-модель реализована через Stream.toPull, который конвертирует Stream в Effect, возвращающий чанк данных при каждом вызове:

import { Stream, Effect } from "effect"

const stream = Stream.fromIterable([1, 2, 3, 4, 5]).pipe(
  Stream.rechunk(2)
)

const program = Effect.gen(function* () {
  // toPull возвращает эффект, который при каждом вызове
  // извлекает следующий чанк из потока
  const getChunk = yield* Stream.toPull(stream)

  // Чанк 1: [1, 2]
  const chunk1 = yield* getChunk
  console.log("Chunk 1:", chunk1)

  // Чанк 2: [3, 4]
  const chunk2 = yield* getChunk
  console.log("Chunk 2:", chunk2)

  // Чанк 3: [5]
  const chunk3 = yield* getChunk
  console.log("Chunk 3:", chunk3)

  // Следующий вызов завершится с Option.none() — поток исчерпан
})

// toPull требует Scope для управления ресурсами
Effect.runPromise(Effect.scoped(program))
/*
Output:
Chunk 1: { _id: 'Chunk', values: [ 1, 2 ] }
Chunk 2: { _id: 'Chunk', values: [ 3, 4 ] }
Chunk 3: { _id: 'Chunk', values: [ 5 ] }
*/

Обратите внимание: toPull возвращает scoped-ресурс, поскольку поток может владеть внешними ресурсами (файловые дескрипторы, подключения), которые нужно освободить при завершении.


Chunk — внутренняя единица данных

Почему не поэлементно

Stream обрабатывает данные не по одному элементу, а чанками (Chunk<A>). Chunk — это иммутабельный массив из модуля Effect, оптимизированный для потоковой обработки:

Поэлементная обработка:
  [1] → transform → [2] → transform → [3] → transform
  Overhead: N вызовов функции для N элементов

Чанковая обработка:
  [1, 2, 3, ..., 1000] → transform_batch
  Overhead: 1 вызов функции для 1000 элементов
  Амортизированная сложность: O(1) на элемент

Чанковость — одна из главных причин высокой производительности Effect Stream. Трансформации применяются к целым чанкам, что радикально снижает overhead вызовов функций и улучшает утилизацию кэша CPU.

Структура Chunk

import { Chunk } from "effect"

// Создание чанка
const chunk = Chunk.make(1, 2, 3, 4, 5)

// Chunk — иммутабельный, операции возвращают новый Chunk
const appended = Chunk.append(chunk, 6)
const prepended = Chunk.prepend(chunk, 0)
const mapped = Chunk.map(chunk, (n) => n * 2)

// Chunk поддерживает O(1) операции:
// - append (амортизированно)
// - prepend (амортизированно)
// - concat (через цепочку)
// - get по индексу

Размер чанка и производительность

Размер чанка влияет на баланс между задержкой (latency) и пропускной способностью (throughput):

Маленькие чанки (1-10 элементов):
  ✅ Низкая задержка (элемент доступен быстро)
  ❌ Высокий overhead на чанк
  → Подходит для real-time систем

Большие чанки (1000-10000 элементов):
  ✅ Высокая пропускная способность
  ❌ Высокая задержка (ожидание заполнения)
  → Подходит для batch-обработки

Средние чанки (100-1000 элементов):
  ⚖️ Оптимальный баланс для большинства задач

Вы можете контролировать размер чанка с помощью Stream.rechunk:

import { Stream, Effect } from "effect"

const stream = Stream.range(1, 10000).pipe(
  Stream.rechunk(256) // Перегруппировка в чанки по 256 элементов
)

API Reference

Базовые конструкторы

ФункцияСигнатураОписание
Stream.make<A>(...values: A[]) => Stream<A>Создаёт поток из перечисленных значений
Stream.emptyStream<never>Пустой поток
Stream.succeed<A>(value: A) => Stream<A>Поток из одного значения
Stream.fail<E>(error: E) => Stream<never, E>Поток, завершающийся ошибкой
Stream.voidStream<void>Поток с единственным undefined
Stream.range(min: number, max: number) => Stream<number>Диапазон чисел [min, max] включительно
Stream.iterate<A>(init: A, f: (a: A) => A) => Stream<A>Бесконечный поток: init, f(init), f(f(init)), …

Запуск потока

ФункцияСигнатураОписание
Stream.runCollect<A, E, R>(stream: Stream<A, E, R>) => Effect<Chunk<A>, E, R>Собирает все элементы в Chunk
Stream.runForEach<A, E, R>(stream, f) => Effect<void, E, R>Выполняет эффект для каждого элемента
Stream.runDrain<A, E, R>(stream) => Effect<void, E, R>Запускает поток, игнорируя значения
Stream.runFold<A, E, R, S>(stream, init, f) => Effect<S, E, R>Свёртка потока в единственное значение
Stream.runHead<A, E, R>(stream) => Effect<Option<A>, E, R>Извлекает первый элемент

Базовые трансформации

ФункцияОписание
Stream.mapЧистая трансформация каждого элемента
Stream.filterФильтрация элементов по предикату
Stream.takeВзять первые N элементов
Stream.dropПропустить первые N элементов
Stream.concatКонкатенация двух потоков

Примеры

Пример 1: Базовое создание и потребление

import { Stream, Effect } from "effect"

// Создание конечного потока
const numbers = Stream.make(1, 2, 3, 4, 5)

// Трансформация: удвоение каждого элемента
const doubled = numbers.pipe(
  Stream.map((n) => n * 2)
)

// Потребление: собрать все элементы в Chunk
const program = Stream.runCollect(doubled)

Effect.runPromise(program).then(console.log)
// { _id: 'Chunk', values: [ 2, 4, 6, 8, 10 ] }

Пример 2: Бесконечный поток с ограничением

import { Stream, Effect } from "effect"

// Бесконечный поток чисел Фибоначчи
const fibonacci = Stream.unfold(
  [0, 1] as const,
  ([a, b]) => Option.some([a, [b, a + b] as const] as const)
)

// Берём первые 10 чисел
const firstTen = fibonacci.pipe(Stream.take(10))

Effect.runPromise(Stream.runCollect(firstTen)).then(console.log)
// { _id: 'Chunk', values: [ 0, 1, 1, 2, 3, 5, 8, 13, 21, 34 ] }

Пример 3: Поток с зависимостями

import { Stream, Effect, Context, Layer } from "effect"

// Определение сервиса
class UserRepository extends Context.Tag("UserRepository")<
  UserRepository,
  {
    readonly findAll: Effect.Effect<ReadonlyArray<string>>
    readonly findById: (id: number) => Effect.Effect<string>
  }
>() {}

// Поток, зависящий от сервиса
const userStream: Stream.Stream<string, never, UserRepository> =
  Stream.fromEffect(
    UserRepository.pipe(
      Effect.andThen((repo) => repo.findAll)
    )
  ).pipe(Stream.flatMap(Stream.fromIterable))

// Предоставление зависимости и запуск
const TestLayer = Layer.succeed(UserRepository, {
  findAll: Effect.succeed(["Alice", "Bob", "Charlie"]),
  findById: (id) => Effect.succeed(`User-${id}`)
})

const program = userStream.pipe(
  Stream.provideLayer(TestLayer),
  Stream.runCollect
)

Effect.runPromise(program).then(console.log)
// { _id: 'Chunk', values: [ 'Alice', 'Bob', 'Charlie' ] }

Пример 4: Ленивость в действии

import { Stream, Effect, Console } from "effect"

// Каждый элемент генерируется лениво
const lazyStream = Stream.make(1, 2, 3, 4, 5).pipe(
  Stream.tap((n) => Console.log(`Producing: ${n}`)),
  Stream.map((n) => n * 10),
  Stream.tap((n) => Console.log(`Transformed: ${n}`)),
  Stream.take(3) // Только первые 3 — остальные НЕ будут сгенерированы
)

Effect.runPromise(Stream.runCollect(lazyStream)).then(console.log)
/*
Output:
Producing: 1
Transformed: 10
Producing: 2
Transformed: 20
Producing: 3
Transformed: 30
{ _id: 'Chunk', values: [ 10, 20, 30 ] }
*/
// Элементы 4 и 5 никогда не будут обработаны!

Пример 5: Production — мониторинг метрик

import { Stream, Effect, Schedule, Random, Chunk } from "effect"

// Тип метрики
interface Metric {
  readonly timestamp: number
  readonly cpu: number
  readonly memory: number
}

// Поток метрик, собираемых каждую секунду
const metricsStream: Stream.Stream<Metric> = Stream.repeatEffect(
  Effect.gen(function* () {
    const cpu = yield* Random.nextIntBetween(0, 100)
    const memory = yield* Random.nextIntBetween(20, 90)
    return {
      timestamp: Date.now(),
      cpu,
      memory
    } as Metric
  })
).pipe(Stream.schedule(Schedule.spaced("1 second")))

// Обработка: фильтрация аномальных значений, группировка по 5
const pipeline = metricsStream.pipe(
  Stream.filter((m) => m.cpu < 95), // отбрасываем пиковые значения
  Stream.take(20),                   // берём 20 метрик для примера
  Stream.grouped(5),                 // группируем по 5
  Stream.map((chunk) => ({
    count: Chunk.size(chunk),
    avgCpu: Chunk.reduce(chunk, 0, (acc, m) => acc + m.cpu) / Chunk.size(chunk),
    avgMemory: Chunk.reduce(chunk, 0, (acc, m) => acc + m.memory) / Chunk.size(chunk)
  }))
)

const program = pipeline.pipe(
  Stream.runForEach((summary) =>
    Effect.sync(() => console.log(
      `Batch: avgCPU=${summary.avgCpu.toFixed(1)}%, avgMem=${summary.avgMemory.toFixed(1)}%`
    ))
  )
)

Упражнения

🟢 Basic

Упражнение 1: Первый поток Создайте поток из строк "hello", "world", "effect". Примените Stream.map для преобразования каждой строки в верхний регистр. Соберите результат с помощью Stream.runCollect.

Решение:

import { Stream, Effect } from "effect"

const program = Stream.make("hello", "world", "effect").pipe(
  Stream.map((s) => s.toUpperCase()),
  Stream.runCollect
)

Effect.runPromise(program).then(console.log)
// { _id: 'Chunk', values: [ 'HELLO', 'WORLD', 'EFFECT' ] }

Упражнение 2: Диапазон и фильтрация Создайте поток чисел от 1 до 50. Оставьте только числа, делимые на 7. Соберите результат.

Решение:

import { Stream, Effect } from "effect"

const program = Stream.range(1, 50).pipe(
  Stream.filter((n) => n % 7 === 0),
  Stream.runCollect
)

Effect.runPromise(program).then(console.log)
// { _id: 'Chunk', values: [ 7, 14, 21, 28, 35, 42, 49 ] }

🟡 Intermediate

Упражнение 3: Бесконечный поток степеней двойки Используя Stream.iterate, создайте бесконечный поток степеней двойки: 1, 2, 4, 8, 16, … Извлеките первые 10 значений.

Решение:

import { Stream, Effect } from "effect"

const powersOfTwo = Stream.iterate(1, (n) => n * 2).pipe(
  Stream.take(10),
  Stream.runCollect
)

Effect.runPromise(powersOfTwo).then(console.log)
// { _id: 'Chunk', values: [ 1, 2, 4, 8, 16, 32, 64, 128, 256, 512 ] }

Упражнение 4: Поток с эффектами Создайте поток, который генерирует 5 случайных чисел с помощью Stream.repeatEffect и Random.nextIntBetween(1, 100). Отфильтруйте только чётные числа и соберите результат.

Решение:

import { Stream, Effect, Random } from "effect"

const program = Stream.repeatEffect(
  Random.nextIntBetween(1, 100)
).pipe(
  Stream.take(5),
  Stream.filter((n) => n % 2 === 0),
  Stream.runCollect
)

Effect.runPromise(program).then(console.log)

🔴 Advanced

Упражнение 5: Потоковый пайплайн с зависимостями Создайте сервис EventSource с методом getEvents: Effect<ReadonlyArray<Event>>, где Event = { id: number, type: string, payload: string }. Реализуйте потоковый пайплайн:

  1. Получение событий из сервиса
  2. Фильтрация по типу "order"
  3. Преобразование payload в uppercase
  4. Ограничение первыми 10 событиями
  5. Сбор результата

Предоставьте тестовый Layer и запустите.

Решение:

import { Stream, Effect, Context, Layer } from "effect"

interface Event {
  readonly id: number
  readonly type: string
  readonly payload: string
}

class EventSource extends Context.Tag("EventSource")<
  EventSource,
  { readonly getEvents: Effect.Effect<ReadonlyArray<Event>> }
>() {}

const eventPipeline = Stream.fromEffect(
  EventSource.pipe(Effect.andThen((es) => es.getEvents))
).pipe(
  Stream.flatMap(Stream.fromIterable),
  Stream.filter((event) => event.type === "order"),
  Stream.map((event) => ({
    ...event,
    payload: event.payload.toUpperCase()
  })),
  Stream.take(10),
  Stream.runCollect
)

const TestEventSource = Layer.succeed(EventSource, {
  getEvents: Effect.succeed(
    Array.from({ length: 20 }, (_, i) => ({
      id: i,
      type: i % 3 === 0 ? "order" : "notification",
      payload: `event-payload-${i}`
    }))
  )
})

const program = Effect.provide(eventPipeline, TestEventSource)

Effect.runPromise(program).then(console.log)