Конструкторы Stream
Полный обзор всех способов создания потоков в Effect-ts — от простых литеральных значений до сложных пагинированных API-запросов и потоков из ресурсов с управляемым жизненным циклом.
Теория
Таксономия конструкторов
Конструкторы Stream можно классифицировать по источнику данных:
┌─────────────────────┐
│ Stream Constructors │
└─────────┬───────────┘
┌───────────────────────┼───────────────────────┐
│ │ │
┌──────▼──────┐ ┌───────▼──────┐ ┌───────▼──────┐
│ Синхронные │ │ Эффективные │ │ Ресурсные │
│ │ │ │ │ │
│ make │ │ fromEffect │ │ acquireRelease│
│ empty │ │ repeatEffect │ │ scoped │
│ succeed │ │ unfoldEffect │ │ finalizer │
│ fromIterable│ │ paginateEffect│ │ fromPubSub │
│ range │ │ fromAsync │ │ fromQueue │
│ iterate │ │ Iterable │ │ │
│ unfold │ │ │ │ │
│ paginate │ │ │ │ │
└─────────────┘ └──────────────┘ └──────────────┘
Ключевой принцип: для каждого чистого (pure) конструктора существует его эффективный аналог с суффиксом Effect (или специальным именем), позволяющий выполнять побочные эффекты при генерации элементов.
Литеральные конструкторы
Stream.make — из перечисленных значений
Самый простой способ создать поток — перечислить значения:
import { Stream, Effect } from "effect"
const stream = Stream.make(1, 2, 3, 4, 5)
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 1, 2, 3, 4, 5 ] }
Stream.make принимает переменное количество аргументов одного типа и создаёт конечный поток. Все значения сохраняются в один чанк.
Stream.empty — пустой поток
import { Stream, Effect } from "effect"
const stream = Stream.empty
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [] }
Пустой поток полезен как начальный элемент в композиции или как результат «пустого» ответа.
Stream.succeed — одно значение
import { Stream, Effect } from "effect"
const stream = Stream.succeed(42)
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 42 ] }
Аналогичен Stream.make(42), но семантически подчёркивает одно значение — как Effect.succeed для потоков.
Stream.fail — ошибка
import { Stream, Effect } from "effect"
const stream: Stream.Stream<never, string> = Stream.fail("Uh oh!")
Effect.runPromiseExit(Stream.runCollect(stream)).then(console.log)
// { _id: 'Exit', _tag: 'Failure', cause: { _id: 'Cause', _tag: 'Fail', failure: 'Uh oh!' } }
Stream.void — единственный undefined
import { Stream, Effect } from "effect"
const stream = Stream.void
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ undefined ] }
Stream.range — диапазон чисел
import { Stream, Effect } from "effect"
// Включительно с обоих сторон: [1, 5]
const stream = Stream.range(1, 5)
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 1, 2, 3, 4, 5 ] }
⚠️ Важно: оба конца диапазона включены. Stream.range(1, 5) эмитирует 5 элементов: 1, 2, 3, 4, 5.
Конструкторы из коллекций
Stream.fromIterable — из любого Iterable
Создаёт поток из любого объекта, реализующего протокол Iterable:
import { Stream, Effect } from "effect"
// Из массива
const fromArray = Stream.fromIterable([1, 2, 3])
// Из Set
const fromSet = Stream.fromIterable(new Set([1, 2, 3]))
// Из Map (поток пар [key, value])
const fromMap = Stream.fromIterable(
new Map([["a", 1], ["b", 2]])
)
// Из генератора
function* naturals() {
let n = 0
while (true) yield n++
}
const fromGenerator = Stream.fromIterable(naturals()).pipe(
Stream.take(5)
)
Effect.runPromise(Stream.runCollect(fromGenerator)).then(console.log)
// { _id: 'Chunk', values: [ 0, 1, 2, 3, 4 ] }
Stream.fromIterableEffect — из Effect
Когда получение коллекции само по себе является эффектом:
import { Stream, Effect, Context } from "effect"
class Database extends Context.Tag("Database")<
Database,
{ readonly getUsers: Effect.Effect<ReadonlyArray<string>> }
>() {}
// Поток пользователей из базы данных
const usersStream = Stream.fromIterableEffect(
Database.pipe(Effect.andThen((db) => db.getUsers))
)
Effect.runPromise(
Stream.runCollect(
usersStream.pipe(
Stream.provideService(Database, {
getUsers: Effect.succeed(["Alice", "Bob", "Charlie"])
})
)
)
).then(console.log)
// { _id: 'Chunk', values: [ 'Alice', 'Bob', 'Charlie' ] }
Stream.fromChunk / Stream.fromChunks — из Chunk
import { Stream, Chunk, Effect } from "effect"
// Из одного чанка
const fromOne = Stream.fromChunk(Chunk.make(1, 2, 3))
// Из нескольких чанков
const fromMany = Stream.fromChunks(
Chunk.make(1, 2, 3),
Chunk.make(4, 5, 6)
)
Effect.runPromise(Stream.runCollect(fromMany)).then(console.log)
// { _id: 'Chunk', values: [ 1, 2, 3, 4, 5, 6 ] }
Stream.fromAsyncIterable — из AsyncIterable
Для работы с асинхронными итераторами:
import { Stream, Effect } from "effect"
const asyncIterable = async function* () {
yield 1
yield 2
yield 3
}
const stream = Stream.fromAsyncIterable(
asyncIterable(),
(e) => new Error(String(e))
)
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 1, 2, 3 ] }
Второй аргумент — функция-маппер ошибок, поскольку async-итераторы могут выбрасывать исключения.
Конструкторы из Effect
Stream.fromEffect — из одного Effect
Создаёт поток из одного элемента, порождённого эффектом:
import { Stream, Random, Effect } from "effect"
const stream = Stream.fromEffect(Random.nextInt)
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 1042302242 ] }
Stream.repeatEffect — бесконечное повторение Effect
Многократно выполняет эффект, создавая бесконечный поток:
import { Stream, Random, Effect } from "effect"
const randomStream = Stream.repeatEffect(Random.nextInt)
Effect.runPromise(
Stream.runCollect(randomStream.pipe(Stream.take(5)))
).then(console.log)
/*
Пример:
{ _id: 'Chunk', values: [ 1666935266, 604851965, 2194299958, 3393707011, 4090317618 ] }
*/
Stream.repeatEffectOption — повторение с условием остановки
Повторяет эффект до тех пор, пока он не завершится с Option.none():
import { Stream, Effect, Option } from "effect"
// Дренирование Iterator в Stream
const drainIterator = <A>(it: Iterator<A>): Stream.Stream<A> =>
Stream.repeatEffectOption(
Effect.sync(() => it.next()).pipe(
Effect.andThen((res) =>
res.done
? Effect.fail(Option.none())
: Effect.succeed(res.value)
)
)
)
const arr = [10, 20, 30]
const stream = drainIterator(arr[Symbol.iterator]())
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 10, 20, 30 ] }
Этот паттерн полезен для адаптации любого pull-based источника данных к Stream.
Развёртывание (Unfold)
Концепция ФП: анаморфизм
Если fold (катаморфизм) «сворачивает» коллекцию в значение, то unfold (анаморфизм) «разворачивает» начальное значение в коллекцию:
fold: [1, 2, 3, 4, 5] → 15 (разрушение структуры)
unfold: 1 → [1, 2, 3, 4, 5, ...] (построение структуры)
step: S → Option<(A, S)>
None → конец потока
Some(a, s') → эмитировать a, продолжить с состоянием s'
Stream.unfold — чистый unfold
import { Stream, Effect, Option } from "effect"
// Натуральные числа
const naturals = Stream.unfold(1, (n) =>
Option.some([n, n + 1] as const)
)
Effect.runPromise(
Stream.runCollect(naturals.pipe(Stream.take(5)))
).then(console.log)
// { _id: 'Chunk', values: [ 1, 2, 3, 4, 5 ] }
// Конечный unfold: числа от 1 до 10
const oneToTen = Stream.unfold(1, (n) =>
n <= 10 ? Option.some([n, n + 1] as const) : Option.none()
)
Effect.runPromise(Stream.runCollect(oneToTen)).then(console.log)
// { _id: 'Chunk', values: [ 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 ] }
Stream.unfoldEffect — эффективный unfold
Когда генерация следующего значения требует выполнения побочных эффектов:
import { Stream, Effect, Option, Random } from "effect"
// Случайное блуждание
const randomWalk = Stream.unfoldEffect(0, (position) =>
Random.nextBoolean.pipe(
Effect.map((goRight) => {
const newPosition = goRight ? position + 1 : position - 1
return Option.some([position, newPosition] as const)
})
)
)
Effect.runPromise(
Stream.runCollect(randomWalk.pipe(Stream.take(10)))
).then(console.log)
// Пример: { _id: 'Chunk', values: [ 0, 1, 2, 1, 0, -1, -2, -1, 0, 1 ] }
Stream.unfoldChunk / Stream.unfoldChunkEffect
Версии unfold, возвращающие Chunk вместо одного элемента за шаг:
import { Stream, Effect, Option, Chunk } from "effect"
// Генерация чанков по 3 элемента
const chunkedStream = Stream.unfoldChunk(0, (state) =>
state < 9
? Option.some([
Chunk.make(state, state + 1, state + 2),
state + 3
] as const)
: Option.none()
)
Effect.runPromise(Stream.runCollect(chunkedStream)).then(console.log)
// { _id: 'Chunk', values: [ 0, 1, 2, 3, 4, 5, 6, 7, 8 ] }
Stream.iterate — упрощённый бесконечный unfold
Когда нужен бесконечный поток без условия остановки:
import { Stream, Effect } from "effect"
// iterate — это unfold без Option (всегда продолжает)
const naturals = Stream.iterate(1, (n) => n + 1)
Effect.runPromise(
Stream.runCollect(naturals.pipe(Stream.take(5)))
).then(console.log)
// { _id: 'Chunk', values: [ 1, 2, 3, 4, 5 ] }
Stream.iterate(init, f) эквивалентен Stream.unfold(init, (s) => Option.some([s, f(s)])).
Пагинация
Проблема пагинированных API
При работе с REST API данные часто возвращаются постранично:
GET /api/users?page=1 → { data: [...], nextPage: 2 }
GET /api/users?page=2 → { data: [...], nextPage: 3 }
GET /api/users?page=3 → { data: [...], nextPage: null }
Stream.paginate и его варианты идеально подходят для этого сценария.
Stream.paginate — базовая пагинация
import { Stream, Effect, Option } from "effect"
// paginate эмитирует значение И ЗАТЕМ проверяет условие продолжения
const stream = Stream.paginate(0, (n) => [
n, // эмитируемое значение
n < 3 ? Option.some(n + 1) : Option.none() // следующее состояние
])
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 0, 1, 2, 3 ] }
Отличие от unfold: paginate сначала эмитирует текущее значение, а потом решает, продолжать ли. Это значит, что последнее значение перед None всегда включается в поток.
Stream.paginateEffect — пагинация с эффектами
Production-паттерн для пагинированных API:
import { Stream, Effect, Option, Chunk } from "effect"
// Симуляция пагинированного API
interface Page {
readonly data: ReadonlyArray<string>
readonly nextCursor: string | null
}
const fetchPage = (cursor: string | null): Effect.Effect<Page> =>
Effect.succeed({
data:
cursor === null ? ["a", "b"] :
cursor === "page2" ? ["c", "d"] :
["e"],
nextCursor:
cursor === null ? "page2" :
cursor === "page2" ? "page3" :
null
})
// paginateChunkEffect — идеален для API, возвращающих массивы
const allItems = Stream.paginateChunkEffect(null as string | null, (cursor) =>
fetchPage(cursor).pipe(
Effect.map((page) => [
Chunk.fromIterable(page.data),
page.nextCursor !== null
? Option.some(page.nextCursor)
: Option.none()
] as const)
)
)
Effect.runPromise(Stream.runCollect(allItems)).then(console.log)
// { _id: 'Chunk', values: [ 'a', 'b', 'c', 'd', 'e' ] }
Варианты пагинации
| Функция | Элемент | Состояние | Эффективный |
|---|---|---|---|
paginate | Одно значение | Pure | Нет |
paginateEffect | Одно значение | Effectful | Да |
paginateChunk | Chunk | Pure | Нет |
paginateChunkEffect | Chunk | Effectful | Да |
Повторение и тики
Stream.repeatValue — бесконечное повторение значения
import { Stream, Effect } from "effect"
const zeros = Stream.repeatValue(0)
Effect.runPromise(
Stream.runCollect(zeros.pipe(Stream.take(5)))
).then(console.log)
// { _id: 'Chunk', values: [ 0, 0, 0, 0, 0 ] }
Stream.repeat — повторение потока по расписанию
import { Stream, Effect, Schedule } from "effect"
// Повторяем поток [1] бесконечно
const repeated = Stream.repeat(Stream.succeed(1), Schedule.forever)
Effect.runPromise(
Stream.runCollect(repeated.pipe(Stream.take(5)))
).then(console.log)
// { _id: 'Chunk', values: [ 1, 1, 1, 1, 1 ] }
Stream.tick — генерация тиков
Поток, эмитирующий void через указанные интервалы:
import { Stream, Effect } from "effect"
// Тик каждые 100мс
const ticks = Stream.tick("100 millis")
Effect.runPromise(
Stream.runCollect(ticks.pipe(Stream.take(5)))
).then(console.log)
// { _id: 'Chunk', values: [ undefined, undefined, undefined, undefined, undefined ] }
Полезен для создания heartbeat-потоков и периодических опросов.
Потоки из ресурсов
Stream.scoped — из scoped-ресурса
Создаёт одноэлементный поток из ресурса с управляемым жизненным циклом:
import { Stream, Effect, Console } from "effect"
const stream = Stream.scoped(
Effect.acquireUseRelease(
Console.log("acquire"),
() => Console.log("use"),
() => Console.log("release")
)
)
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
/*
Output:
acquire
use
release
{ _id: 'Chunk', values: [ undefined ] }
*/
Stream.acquireRelease — ресурс для многоэлементного потока
import { Stream, Console, Effect } from "effect"
const open = (filename: string) =>
Effect.gen(function* () {
yield* Console.log(`Opening ${filename}`)
return {
getLines: Effect.succeed(["Line 1", "Line 2", "Line 3"]),
close: Console.log(`Closing ${filename}`)
}
})
const stream = Stream.acquireRelease(
open("data.csv"),
(file) => file.close
).pipe(
Stream.flatMap((file) =>
Stream.fromEffect(file.getLines).pipe(
Stream.flatMap(Stream.fromIterable)
)
)
)
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
/*
Output:
Opening data.csv
Closing data.csv
{ _id: 'Chunk', values: [ 'Line 1', 'Line 2', 'Line 3' ] }
*/
Stream.finalizer — финализация потока
import { Stream, Effect, Console } from "effect"
const stream = Stream.make(1, 2, 3).pipe(
Stream.concat(Stream.finalizer(Console.log("Stream finalized!")))
)
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
/*
Output:
Stream finalized!
{ _id: 'Chunk', values: [ 1, 2, 3 ] }
*/
Потоки из асинхронных источников
Stream.async — из callback-based API
Для адаптации callback-based API (EventEmitter, WebSocket):
import { Stream, Effect, Chunk, Option } from "effect"
// Адаптация callback-based API к Stream
const callbackStream = Stream.async<string>((emit) => {
// Эмитируем значения через callback
emit(Effect.succeed(Chunk.make("event1", "event2")))
emit(Effect.succeed(Chunk.make("event3")))
// Завершаем поток
emit(Effect.fail(Option.none()))
})
Effect.runPromise(Stream.runCollect(callbackStream)).then(console.log)
// { _id: 'Chunk', values: [ 'event1', 'event2', 'event3' ] }
Потоки из Queue и PubSub
Stream.fromQueue — из Queue
Создаёт поток элементов, извлекаемых из очереди:
import { Stream, Queue, Effect } from "effect"
const program = Effect.gen(function* () {
const queue = yield* Queue.unbounded<number>()
// Наполняем очередь в фоне
yield* Effect.fork(
Effect.gen(function* () {
yield* Queue.offer(queue, 1)
yield* Queue.offer(queue, 2)
yield* Queue.offer(queue, 3)
yield* Queue.shutdown(queue) // Сигнал завершения
})
)
// Читаем из очереди как из потока
const result = yield* Stream.fromQueue(queue).pipe(Stream.runCollect)
console.log(result)
})
Effect.runPromise(program)
// { _id: 'Chunk', values: [ 1, 2, 3 ] }
Stream.fromPubSub — из PubSub
import { Stream, PubSub, Effect } from "effect"
const program = Effect.scoped(
Effect.gen(function* () {
const pubsub = yield* PubSub.bounded<string>(16)
// Подписка создаёт Stream
const stream = Stream.fromPubSub(pubsub)
// Публикуем в фоне
yield* Effect.fork(
Effect.gen(function* () {
yield* Effect.sleep("100 millis")
yield* PubSub.publish(pubsub, "hello")
yield* PubSub.publish(pubsub, "world")
yield* PubSub.shutdown(pubsub)
})
)
const result = yield* stream.pipe(Stream.runCollect)
console.log(result)
})
)
Effect.runPromise(program)
Потоки из Schedule
Stream.fromSchedule — из расписания
import { Stream, Effect, Schedule } from "effect"
// Поток значений: 0, 1, 2, ..., 9 — по одному каждую секунду
const schedule = Schedule.spaced("1 second").pipe(
Schedule.compose(Schedule.recurs(10))
)
const stream = Stream.fromSchedule(schedule)
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 ] }
API Reference
Полная таблица конструкторов
| Конструктор | Тип источника | Конечный? | Эффективный? |
|---|---|---|---|
make | Перечисление | ✅ Да | ❌ |
empty | Ничего | ✅ Да | ❌ |
succeed | Одно значение | ✅ Да | ❌ |
fail | Ошибка | ✅ Да | ❌ |
void | undefined | ✅ Да | ❌ |
range | Числовой диапазон | ✅ Да | ❌ |
fromIterable | Iterable | ✅/❌ | ❌ |
fromIterableEffect | Effect | ✅/❌ | ✅ |
fromAsyncIterable | AsyncIterable | ✅/❌ | ✅ |
fromChunk | Chunk | ✅ Да | ❌ |
fromChunks | Chunk… | ✅ Да | ❌ |
fromEffect | Effect | ✅ Да | ✅ |
iterate | Seed + function | ❌ Бесконечный | ❌ |
unfold | Seed + step | ✅/❌ | ❌ |
unfoldEffect | Seed + effectful step | ✅/❌ | ✅ |
unfoldChunk | Seed + chunk step | ✅/❌ | ❌ |
unfoldChunkEffect | Seed + effectful chunk step | ✅/❌ | ✅ |
paginate | Seed + paginator | ✅ Да | ❌ |
paginateEffect | Seed + effectful paginator | ✅ Да | ✅ |
paginateChunk | Seed + chunk paginator | ✅ Да | ❌ |
paginateChunkEffect | Seed + effectful chunk paginator | ✅ Да | ✅ |
repeatValue | Значение | ❌ Бесконечный | ❌ |
repeat | Stream + Schedule | ❌ Зависит | ❌ |
repeatEffect | Effect | ❌ Бесконечный | ✅ |
repeatEffectOption | Effect<A, Option | ✅/❌ | ✅ |
tick | Duration | ❌ Бесконечный | ✅ |
fromQueue | Queue | ✅/❌ | ✅ |
fromPubSub | PubSub | ✅/❌ | ✅ |
fromSchedule | Schedule | ✅ Да | ✅ |
scoped | Scoped Effect | ✅ Да | ✅ |
acquireRelease | Acquire + Release | ✅ Да | ✅ |
async | Callback-based API | ✅/❌ | ✅ |
Примеры
Production: пагинированный REST API
import { Stream, Effect, Option, Chunk, Schedule } from "effect"
interface ApiResponse<T> {
readonly items: ReadonlyArray<T>
readonly cursor: string | null
readonly totalCount: number
}
interface User {
readonly id: string
readonly name: string
readonly email: string
}
// Симуляция API-клиента
const fetchUsers = (cursor: string | null): Effect.Effect<ApiResponse<User>, Error> =>
Effect.succeed({
items: cursor === null
? [{ id: "1", name: "Alice", email: "alice@example.com" },
{ id: "2", name: "Bob", email: "bob@example.com" }]
: cursor === "cursor_2"
? [{ id: "3", name: "Charlie", email: "charlie@example.com" }]
: [],
cursor: cursor === null ? "cursor_2" : null,
totalCount: 3
})
// Поток всех пользователей через пагинацию
const allUsers: Stream.Stream<User, Error> = Stream.paginateChunkEffect(
null as string | null,
(cursor) =>
fetchUsers(cursor).pipe(
Effect.map((response) => [
Chunk.fromIterable(response.items),
response.cursor !== null
? Option.some(response.cursor)
: Option.none()
] as const)
)
)
Effect.runPromise(Stream.runCollect(allUsers)).then(console.log)
Production: поток событий с retry
import { Stream, Effect, Schedule, Option, Chunk } from "effect"
// Поток событий из внешней системы с автоматическим переподключением
const eventStream = (connectionId: string): Stream.Stream<string, Error> =>
Stream.unwrap(
Effect.gen(function* () {
// Подключение к источнику событий
console.log(`Connecting: ${connectionId}`)
return Stream.repeatEffectOption(
Effect.gen(function* () {
// Симуляция получения события
const event = `event-${Date.now()}`
return event
}).pipe(
Effect.mapError(() => Option.some(new Error("Connection lost")))
)
).pipe(
Stream.take(5) // Для примера ограничиваем
)
})
)
Упражнения
🟢 Basic
Упражнение 1: Поток из Map Создайте Map<string, number> с 5 парами. Конвертируйте в поток и оставьте только пары, где значение > 10.
Решение:
import { Stream, Effect } from "effect"
const data = new Map([
["a", 5], ["b", 15], ["c", 8], ["d", 20], ["e", 12]
])
const program = Stream.fromIterable(data).pipe(
Stream.filter(([, value]) => value > 10),
Stream.runCollect
)
Effect.runPromise(program).then(console.log)
// { _id: 'Chunk', values: [ [ 'b', 15 ], [ 'd', 20 ], [ 'e', 12 ] ] }
🟡 Intermediate
Упражнение 2: Факториалы через unfold Создайте поток первых 10 факториалов (1!, 2!, …, 10!) используя Stream.unfold.
Решение:
import { Stream, Effect, Option } from "effect"
const factorials = Stream.unfold(
[1, 1] as readonly [number, number], // [текущий_факториал, номер]
([factorial, n]) =>
n <= 10
? Option.some([factorial, [factorial * (n + 1), n + 1] as const] as const)
: Option.none()
)
Effect.runPromise(Stream.runCollect(factorials)).then(console.log)
🔴 Advanced
Упражнение 3: Пагинированный API с retry и rate-limiting Реализуйте поток, который:
- Загружает данные через пагинированный API (
paginateChunkEffect) - При ошибке ретраит с экспоненциальной задержкой (до 3 попыток)
- Добавляет задержку 200мс между запросами страниц
- Собирает все элементы
Решение:
import { Stream, Effect, Option, Chunk, Schedule } from "effect"
interface Page {
readonly items: ReadonlyArray<string>
readonly nextToken: string | null
}
const fetchPage = (token: string | null): Effect.Effect<Page, Error> =>
Effect.succeed({
items: token === null ? ["a", "b", "c"] : token === "t2" ? ["d", "e"] : [],
nextToken: token === null ? "t2" : null
})
const robustPaginatedStream = Stream.paginateChunkEffect(
null as string | null,
(token) =>
fetchPage(token).pipe(
Effect.retry(Schedule.exponential("100 millis").pipe(
Schedule.compose(Schedule.recurs(3))
)),
Effect.delay("200 millis"), // Rate limiting
Effect.map((page) => [
Chunk.fromIterable(page.items),
page.nextToken !== null
? Option.some(page.nextToken)
: Option.none()
] as const)
)
)
Effect.runPromise(Stream.runCollect(robustPaginatedStream)).then(console.log)
// { _id: 'Chunk', values: [ 'a', 'b', 'c', 'd', 'e' ] }