Effect Курс Конструкторы Stream
Глава

Конструкторы Stream

Полный обзор всех способов создания потоков в Effect-ts — от простых литеральных значений до сложных пагинированных API-запросов и потоков из ресурсов с управляемым жизненным циклом.

Теория

Таксономия конструкторов

Конструкторы Stream можно классифицировать по источнику данных:

                        ┌─────────────────────┐
                        │  Stream Constructors │
                        └─────────┬───────────┘
          ┌───────────────────────┼───────────────────────┐
          │                       │                       │
   ┌──────▼──────┐       ┌───────▼──────┐       ┌───────▼──────┐
   │  Синхронные  │       │ Эффективные  │       │  Ресурсные   │
   │             │       │              │       │              │
   │ make        │       │ fromEffect   │       │ acquireRelease│
   │ empty       │       │ repeatEffect │       │ scoped       │
   │ succeed     │       │ unfoldEffect │       │ finalizer    │
   │ fromIterable│       │ paginateEffect│      │ fromPubSub   │
   │ range       │       │ fromAsync    │       │ fromQueue    │
   │ iterate     │       │ Iterable     │       │              │
   │ unfold      │       │              │       │              │
   │ paginate    │       │              │       │              │
   └─────────────┘       └──────────────┘       └──────────────┘

Ключевой принцип: для каждого чистого (pure) конструктора существует его эффективный аналог с суффиксом Effect (или специальным именем), позволяющий выполнять побочные эффекты при генерации элементов.


Литеральные конструкторы

Stream.make — из перечисленных значений

Самый простой способ создать поток — перечислить значения:

import { Stream, Effect } from "effect"

const stream = Stream.make(1, 2, 3, 4, 5)

Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 1, 2, 3, 4, 5 ] }

Stream.make принимает переменное количество аргументов одного типа и создаёт конечный поток. Все значения сохраняются в один чанк.

Stream.empty — пустой поток

import { Stream, Effect } from "effect"

const stream = Stream.empty

Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [] }

Пустой поток полезен как начальный элемент в композиции или как результат «пустого» ответа.

Stream.succeed — одно значение

import { Stream, Effect } from "effect"

const stream = Stream.succeed(42)

Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 42 ] }

Аналогичен Stream.make(42), но семантически подчёркивает одно значение — как Effect.succeed для потоков.

Stream.fail — ошибка

import { Stream, Effect } from "effect"

const stream: Stream.Stream<never, string> = Stream.fail("Uh oh!")

Effect.runPromiseExit(Stream.runCollect(stream)).then(console.log)
// { _id: 'Exit', _tag: 'Failure', cause: { _id: 'Cause', _tag: 'Fail', failure: 'Uh oh!' } }

Stream.void — единственный undefined

import { Stream, Effect } from "effect"

const stream = Stream.void

Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ undefined ] }

Stream.range — диапазон чисел

import { Stream, Effect } from "effect"

// Включительно с обоих сторон: [1, 5]
const stream = Stream.range(1, 5)

Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 1, 2, 3, 4, 5 ] }

⚠️ Важно: оба конца диапазона включены. Stream.range(1, 5) эмитирует 5 элементов: 1, 2, 3, 4, 5.


Конструкторы из коллекций

Stream.fromIterable — из любого Iterable

Создаёт поток из любого объекта, реализующего протокол Iterable:

import { Stream, Effect } from "effect"

// Из массива
const fromArray = Stream.fromIterable([1, 2, 3])

// Из Set
const fromSet = Stream.fromIterable(new Set([1, 2, 3]))

// Из Map (поток пар [key, value])
const fromMap = Stream.fromIterable(
  new Map([["a", 1], ["b", 2]])
)

// Из генератора
function* naturals() {
  let n = 0
  while (true) yield n++
}
const fromGenerator = Stream.fromIterable(naturals()).pipe(
  Stream.take(5)
)

Effect.runPromise(Stream.runCollect(fromGenerator)).then(console.log)
// { _id: 'Chunk', values: [ 0, 1, 2, 3, 4 ] }

Stream.fromIterableEffect — из Effect

Когда получение коллекции само по себе является эффектом:

import { Stream, Effect, Context } from "effect"

class Database extends Context.Tag("Database")<
  Database,
  { readonly getUsers: Effect.Effect<ReadonlyArray<string>> }
>() {}

// Поток пользователей из базы данных
const usersStream = Stream.fromIterableEffect(
  Database.pipe(Effect.andThen((db) => db.getUsers))
)

Effect.runPromise(
  Stream.runCollect(
    usersStream.pipe(
      Stream.provideService(Database, {
        getUsers: Effect.succeed(["Alice", "Bob", "Charlie"])
      })
    )
  )
).then(console.log)
// { _id: 'Chunk', values: [ 'Alice', 'Bob', 'Charlie' ] }

Stream.fromChunk / Stream.fromChunks — из Chunk

import { Stream, Chunk, Effect } from "effect"

// Из одного чанка
const fromOne = Stream.fromChunk(Chunk.make(1, 2, 3))

// Из нескольких чанков
const fromMany = Stream.fromChunks(
  Chunk.make(1, 2, 3),
  Chunk.make(4, 5, 6)
)

Effect.runPromise(Stream.runCollect(fromMany)).then(console.log)
// { _id: 'Chunk', values: [ 1, 2, 3, 4, 5, 6 ] }

Stream.fromAsyncIterable — из AsyncIterable

Для работы с асинхронными итераторами:

import { Stream, Effect } from "effect"

const asyncIterable = async function* () {
  yield 1
  yield 2
  yield 3
}

const stream = Stream.fromAsyncIterable(
  asyncIterable(),
  (e) => new Error(String(e))
)

Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 1, 2, 3 ] }

Второй аргумент — функция-маппер ошибок, поскольку async-итераторы могут выбрасывать исключения.


Конструкторы из Effect

Stream.fromEffect — из одного Effect

Создаёт поток из одного элемента, порождённого эффектом:

import { Stream, Random, Effect } from "effect"

const stream = Stream.fromEffect(Random.nextInt)

Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 1042302242 ] }

Stream.repeatEffect — бесконечное повторение Effect

Многократно выполняет эффект, создавая бесконечный поток:

import { Stream, Random, Effect } from "effect"

const randomStream = Stream.repeatEffect(Random.nextInt)

Effect.runPromise(
  Stream.runCollect(randomStream.pipe(Stream.take(5)))
).then(console.log)
/*
Пример:
{ _id: 'Chunk', values: [ 1666935266, 604851965, 2194299958, 3393707011, 4090317618 ] }
*/

Stream.repeatEffectOption — повторение с условием остановки

Повторяет эффект до тех пор, пока он не завершится с Option.none():

import { Stream, Effect, Option } from "effect"

// Дренирование Iterator в Stream
const drainIterator = <A>(it: Iterator<A>): Stream.Stream<A> =>
  Stream.repeatEffectOption(
    Effect.sync(() => it.next()).pipe(
      Effect.andThen((res) =>
        res.done
          ? Effect.fail(Option.none())
          : Effect.succeed(res.value)
      )
    )
  )

const arr = [10, 20, 30]
const stream = drainIterator(arr[Symbol.iterator]())

Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 10, 20, 30 ] }

Этот паттерн полезен для адаптации любого pull-based источника данных к Stream.


Развёртывание (Unfold)

Концепция ФП: анаморфизм

Если fold (катаморфизм) «сворачивает» коллекцию в значение, то unfold (анаморфизм) «разворачивает» начальное значение в коллекцию:

fold:    [1, 2, 3, 4, 5]  →  15       (разрушение структуры)
unfold:  1  →  [1, 2, 3, 4, 5, ...]   (построение структуры)

step: S → Option<(A, S)>
  None        → конец потока
  Some(a, s') → эмитировать a, продолжить с состоянием s'

Stream.unfold — чистый unfold

import { Stream, Effect, Option } from "effect"

// Натуральные числа
const naturals = Stream.unfold(1, (n) =>
  Option.some([n, n + 1] as const)
)

Effect.runPromise(
  Stream.runCollect(naturals.pipe(Stream.take(5)))
).then(console.log)
// { _id: 'Chunk', values: [ 1, 2, 3, 4, 5 ] }

// Конечный unfold: числа от 1 до 10
const oneToTen = Stream.unfold(1, (n) =>
  n <= 10 ? Option.some([n, n + 1] as const) : Option.none()
)

Effect.runPromise(Stream.runCollect(oneToTen)).then(console.log)
// { _id: 'Chunk', values: [ 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 ] }

Stream.unfoldEffect — эффективный unfold

Когда генерация следующего значения требует выполнения побочных эффектов:

import { Stream, Effect, Option, Random } from "effect"

// Случайное блуждание
const randomWalk = Stream.unfoldEffect(0, (position) =>
  Random.nextBoolean.pipe(
    Effect.map((goRight) => {
      const newPosition = goRight ? position + 1 : position - 1
      return Option.some([position, newPosition] as const)
    })
  )
)

Effect.runPromise(
  Stream.runCollect(randomWalk.pipe(Stream.take(10)))
).then(console.log)
// Пример: { _id: 'Chunk', values: [ 0, 1, 2, 1, 0, -1, -2, -1, 0, 1 ] }

Stream.unfoldChunk / Stream.unfoldChunkEffect

Версии unfold, возвращающие Chunk вместо одного элемента за шаг:

import { Stream, Effect, Option, Chunk } from "effect"

// Генерация чанков по 3 элемента
const chunkedStream = Stream.unfoldChunk(0, (state) =>
  state < 9
    ? Option.some([
        Chunk.make(state, state + 1, state + 2),
        state + 3
      ] as const)
    : Option.none()
)

Effect.runPromise(Stream.runCollect(chunkedStream)).then(console.log)
// { _id: 'Chunk', values: [ 0, 1, 2, 3, 4, 5, 6, 7, 8 ] }

Stream.iterate — упрощённый бесконечный unfold

Когда нужен бесконечный поток без условия остановки:

import { Stream, Effect } from "effect"

// iterate — это unfold без Option (всегда продолжает)
const naturals = Stream.iterate(1, (n) => n + 1)

Effect.runPromise(
  Stream.runCollect(naturals.pipe(Stream.take(5)))
).then(console.log)
// { _id: 'Chunk', values: [ 1, 2, 3, 4, 5 ] }

Stream.iterate(init, f) эквивалентен Stream.unfold(init, (s) => Option.some([s, f(s)])).


Пагинация

Проблема пагинированных API

При работе с REST API данные часто возвращаются постранично:

GET /api/users?page=1 → { data: [...], nextPage: 2 }
GET /api/users?page=2 → { data: [...], nextPage: 3 }
GET /api/users?page=3 → { data: [...], nextPage: null }

Stream.paginate и его варианты идеально подходят для этого сценария.

Stream.paginate — базовая пагинация

import { Stream, Effect, Option } from "effect"

// paginate эмитирует значение И ЗАТЕМ проверяет условие продолжения
const stream = Stream.paginate(0, (n) => [
  n,                                              // эмитируемое значение
  n < 3 ? Option.some(n + 1) : Option.none()     // следующее состояние
])

Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 0, 1, 2, 3 ] }

Отличие от unfold: paginate сначала эмитирует текущее значение, а потом решает, продолжать ли. Это значит, что последнее значение перед None всегда включается в поток.

Stream.paginateEffect — пагинация с эффектами

Production-паттерн для пагинированных API:

import { Stream, Effect, Option, Chunk } from "effect"

// Симуляция пагинированного API
interface Page {
  readonly data: ReadonlyArray<string>
  readonly nextCursor: string | null
}

const fetchPage = (cursor: string | null): Effect.Effect<Page> =>
  Effect.succeed({
    data:
      cursor === null ? ["a", "b"] :
      cursor === "page2" ? ["c", "d"] :
      ["e"],
    nextCursor:
      cursor === null ? "page2" :
      cursor === "page2" ? "page3" :
      null
  })

// paginateChunkEffect — идеален для API, возвращающих массивы
const allItems = Stream.paginateChunkEffect(null as string | null, (cursor) =>
  fetchPage(cursor).pipe(
    Effect.map((page) => [
      Chunk.fromIterable(page.data),
      page.nextCursor !== null
        ? Option.some(page.nextCursor)
        : Option.none()
    ] as const)
  )
)

Effect.runPromise(Stream.runCollect(allItems)).then(console.log)
// { _id: 'Chunk', values: [ 'a', 'b', 'c', 'd', 'e' ] }

Варианты пагинации

ФункцияЭлементСостояниеЭффективный
paginateОдно значениеPureНет
paginateEffectОдно значениеEffectfulДа
paginateChunkChunkPureНет
paginateChunkEffectChunkEffectfulДа

Повторение и тики

Stream.repeatValue — бесконечное повторение значения

import { Stream, Effect } from "effect"

const zeros = Stream.repeatValue(0)

Effect.runPromise(
  Stream.runCollect(zeros.pipe(Stream.take(5)))
).then(console.log)
// { _id: 'Chunk', values: [ 0, 0, 0, 0, 0 ] }

Stream.repeat — повторение потока по расписанию

import { Stream, Effect, Schedule } from "effect"

// Повторяем поток [1] бесконечно
const repeated = Stream.repeat(Stream.succeed(1), Schedule.forever)

Effect.runPromise(
  Stream.runCollect(repeated.pipe(Stream.take(5)))
).then(console.log)
// { _id: 'Chunk', values: [ 1, 1, 1, 1, 1 ] }

Stream.tick — генерация тиков

Поток, эмитирующий void через указанные интервалы:

import { Stream, Effect } from "effect"

// Тик каждые 100мс
const ticks = Stream.tick("100 millis")

Effect.runPromise(
  Stream.runCollect(ticks.pipe(Stream.take(5)))
).then(console.log)
// { _id: 'Chunk', values: [ undefined, undefined, undefined, undefined, undefined ] }

Полезен для создания heartbeat-потоков и периодических опросов.


Потоки из ресурсов

Stream.scoped — из scoped-ресурса

Создаёт одноэлементный поток из ресурса с управляемым жизненным циклом:

import { Stream, Effect, Console } from "effect"

const stream = Stream.scoped(
  Effect.acquireUseRelease(
    Console.log("acquire"),
    () => Console.log("use"),
    () => Console.log("release")
  )
)

Effect.runPromise(Stream.runCollect(stream)).then(console.log)
/*
Output:
acquire
use
release
{ _id: 'Chunk', values: [ undefined ] }
*/

Stream.acquireRelease — ресурс для многоэлементного потока

import { Stream, Console, Effect } from "effect"

const open = (filename: string) =>
  Effect.gen(function* () {
    yield* Console.log(`Opening ${filename}`)
    return {
      getLines: Effect.succeed(["Line 1", "Line 2", "Line 3"]),
      close: Console.log(`Closing ${filename}`)
    }
  })

const stream = Stream.acquireRelease(
  open("data.csv"),
  (file) => file.close
).pipe(
  Stream.flatMap((file) =>
    Stream.fromEffect(file.getLines).pipe(
      Stream.flatMap(Stream.fromIterable)
    )
  )
)

Effect.runPromise(Stream.runCollect(stream)).then(console.log)
/*
Output:
Opening data.csv
Closing data.csv
{ _id: 'Chunk', values: [ 'Line 1', 'Line 2', 'Line 3' ] }
*/

Stream.finalizer — финализация потока

import { Stream, Effect, Console } from "effect"

const stream = Stream.make(1, 2, 3).pipe(
  Stream.concat(Stream.finalizer(Console.log("Stream finalized!")))
)

Effect.runPromise(Stream.runCollect(stream)).then(console.log)
/*
Output:
Stream finalized!
{ _id: 'Chunk', values: [ 1, 2, 3 ] }
*/

Потоки из асинхронных источников

Stream.async — из callback-based API

Для адаптации callback-based API (EventEmitter, WebSocket):

import { Stream, Effect, Chunk, Option } from "effect"

// Адаптация callback-based API к Stream
const callbackStream = Stream.async<string>((emit) => {
  // Эмитируем значения через callback
  emit(Effect.succeed(Chunk.make("event1", "event2")))
  emit(Effect.succeed(Chunk.make("event3")))
  // Завершаем поток
  emit(Effect.fail(Option.none()))
})

Effect.runPromise(Stream.runCollect(callbackStream)).then(console.log)
// { _id: 'Chunk', values: [ 'event1', 'event2', 'event3' ] }

Потоки из Queue и PubSub

Stream.fromQueue — из Queue

Создаёт поток элементов, извлекаемых из очереди:

import { Stream, Queue, Effect } from "effect"

const program = Effect.gen(function* () {
  const queue = yield* Queue.unbounded<number>()

  // Наполняем очередь в фоне
  yield* Effect.fork(
    Effect.gen(function* () {
      yield* Queue.offer(queue, 1)
      yield* Queue.offer(queue, 2)
      yield* Queue.offer(queue, 3)
      yield* Queue.shutdown(queue) // Сигнал завершения
    })
  )

  // Читаем из очереди как из потока
  const result = yield* Stream.fromQueue(queue).pipe(Stream.runCollect)
  console.log(result)
})

Effect.runPromise(program)
// { _id: 'Chunk', values: [ 1, 2, 3 ] }

Stream.fromPubSub — из PubSub

import { Stream, PubSub, Effect } from "effect"

const program = Effect.scoped(
  Effect.gen(function* () {
    const pubsub = yield* PubSub.bounded<string>(16)

    // Подписка создаёт Stream
    const stream = Stream.fromPubSub(pubsub)

    // Публикуем в фоне
    yield* Effect.fork(
      Effect.gen(function* () {
        yield* Effect.sleep("100 millis")
        yield* PubSub.publish(pubsub, "hello")
        yield* PubSub.publish(pubsub, "world")
        yield* PubSub.shutdown(pubsub)
      })
    )

    const result = yield* stream.pipe(Stream.runCollect)
    console.log(result)
  })
)

Effect.runPromise(program)

Потоки из Schedule

Stream.fromSchedule — из расписания

import { Stream, Effect, Schedule } from "effect"

// Поток значений: 0, 1, 2, ..., 9 — по одному каждую секунду
const schedule = Schedule.spaced("1 second").pipe(
  Schedule.compose(Schedule.recurs(10))
)

const stream = Stream.fromSchedule(schedule)

Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 ] }

API Reference

Полная таблица конструкторов

КонструкторТип источникаКонечный?Эффективный?
makeПеречисление✅ Да
emptyНичего✅ Да
succeedОдно значение✅ Да
failОшибка✅ Да
voidundefined✅ Да
rangeЧисловой диапазон✅ Да
fromIterableIterable✅/❌
fromIterableEffectEffect✅/❌
fromAsyncIterableAsyncIterable✅/❌
fromChunkChunk✅ Да
fromChunksChunk…✅ Да
fromEffectEffect✅ Да
iterateSeed + function❌ Бесконечный
unfoldSeed + step✅/❌
unfoldEffectSeed + effectful step✅/❌
unfoldChunkSeed + chunk step✅/❌
unfoldChunkEffectSeed + effectful chunk step✅/❌
paginateSeed + paginator✅ Да
paginateEffectSeed + effectful paginator✅ Да
paginateChunkSeed + chunk paginator✅ Да
paginateChunkEffectSeed + effectful chunk paginator✅ Да
repeatValueЗначение❌ Бесконечный
repeatStream + Schedule❌ Зависит
repeatEffectEffect❌ Бесконечный
repeatEffectOptionEffect<A, Option>✅/❌
tickDuration❌ Бесконечный
fromQueueQueue✅/❌
fromPubSubPubSub✅/❌
fromScheduleSchedule✅ Да
scopedScoped Effect✅ Да
acquireReleaseAcquire + Release✅ Да
asyncCallback-based API✅/❌

Примеры

Production: пагинированный REST API

import { Stream, Effect, Option, Chunk, Schedule } from "effect"

interface ApiResponse<T> {
  readonly items: ReadonlyArray<T>
  readonly cursor: string | null
  readonly totalCount: number
}

interface User {
  readonly id: string
  readonly name: string
  readonly email: string
}

// Симуляция API-клиента
const fetchUsers = (cursor: string | null): Effect.Effect<ApiResponse<User>, Error> =>
  Effect.succeed({
    items: cursor === null
      ? [{ id: "1", name: "Alice", email: "alice@example.com" },
         { id: "2", name: "Bob", email: "bob@example.com" }]
      : cursor === "cursor_2"
      ? [{ id: "3", name: "Charlie", email: "charlie@example.com" }]
      : [],
    cursor: cursor === null ? "cursor_2" : null,
    totalCount: 3
  })

// Поток всех пользователей через пагинацию
const allUsers: Stream.Stream<User, Error> = Stream.paginateChunkEffect(
  null as string | null,
  (cursor) =>
    fetchUsers(cursor).pipe(
      Effect.map((response) => [
        Chunk.fromIterable(response.items),
        response.cursor !== null
          ? Option.some(response.cursor)
          : Option.none()
      ] as const)
    )
)

Effect.runPromise(Stream.runCollect(allUsers)).then(console.log)

Production: поток событий с retry

import { Stream, Effect, Schedule, Option, Chunk } from "effect"

// Поток событий из внешней системы с автоматическим переподключением
const eventStream = (connectionId: string): Stream.Stream<string, Error> =>
  Stream.unwrap(
    Effect.gen(function* () {
      // Подключение к источнику событий
      console.log(`Connecting: ${connectionId}`)

      return Stream.repeatEffectOption(
        Effect.gen(function* () {
          // Симуляция получения события
          const event = `event-${Date.now()}`
          return event
        }).pipe(
          Effect.mapError(() => Option.some(new Error("Connection lost")))
        )
      ).pipe(
        Stream.take(5) // Для примера ограничиваем
      )
    })
  )

Упражнения

🟢 Basic

Упражнение 1: Поток из Map Создайте Map<string, number> с 5 парами. Конвертируйте в поток и оставьте только пары, где значение > 10.

Решение:

import { Stream, Effect } from "effect"

const data = new Map([
  ["a", 5], ["b", 15], ["c", 8], ["d", 20], ["e", 12]
])

const program = Stream.fromIterable(data).pipe(
  Stream.filter(([, value]) => value > 10),
  Stream.runCollect
)

Effect.runPromise(program).then(console.log)
// { _id: 'Chunk', values: [ [ 'b', 15 ], [ 'd', 20 ], [ 'e', 12 ] ] }

🟡 Intermediate

Упражнение 2: Факториалы через unfold Создайте поток первых 10 факториалов (1!, 2!, …, 10!) используя Stream.unfold.

Решение:

import { Stream, Effect, Option } from "effect"

const factorials = Stream.unfold(
  [1, 1] as readonly [number, number], // [текущий_факториал, номер]
  ([factorial, n]) =>
    n <= 10
      ? Option.some([factorial, [factorial * (n + 1), n + 1] as const] as const)
      : Option.none()
)

Effect.runPromise(Stream.runCollect(factorials)).then(console.log)

🔴 Advanced

Упражнение 3: Пагинированный API с retry и rate-limiting Реализуйте поток, который:

  1. Загружает данные через пагинированный API (paginateChunkEffect)
  2. При ошибке ретраит с экспоненциальной задержкой (до 3 попыток)
  3. Добавляет задержку 200мс между запросами страниц
  4. Собирает все элементы

Решение:

import { Stream, Effect, Option, Chunk, Schedule } from "effect"

interface Page {
  readonly items: ReadonlyArray<string>
  readonly nextToken: string | null
}

const fetchPage = (token: string | null): Effect.Effect<Page, Error> =>
  Effect.succeed({
    items: token === null ? ["a", "b", "c"] : token === "t2" ? ["d", "e"] : [],
    nextToken: token === null ? "t2" : null
  })

const robustPaginatedStream = Stream.paginateChunkEffect(
  null as string | null,
  (token) =>
    fetchPage(token).pipe(
      Effect.retry(Schedule.exponential("100 millis").pipe(
        Schedule.compose(Schedule.recurs(3))
      )),
      Effect.delay("200 millis"), // Rate limiting
      Effect.map((page) => [
        Chunk.fromIterable(page.items),
        page.nextToken !== null
          ? Option.some(page.nextToken)
          : Option.none()
      ] as const)
    )
)

Effect.runPromise(Stream.runCollect(robustPaginatedStream)).then(console.log)
// { _id: 'Chunk', values: [ 'a', 'b', 'c', 'd', 'e' ] }