Обработка ошибок в Stream
Обработка ошибок в потоках имеет свою специфику: ошибка может произойти после эмиссии нескольких элементов, и стратегия восстановления должна решить — переключиться на альтернативный поток, повторить попытку или завершить обработку с уже полученными данными.
Теория
Модель ошибок в Stream
Ошибки в Stream, как и в Effect, делятся на два класса:
┌─────────────────────────────────────────────────────┐
│ Expected Errors (E) │
│ - Типизированные, ожидаемые ошибки │
│ - Обрабатываются через catchAll, orElse │
│ - Отражены в сигнатуре: Stream<A, E, R> │
│ │
│ Defects (Die, Interrupt) │
│ - Непредвиденные ошибки (баги, фатальные сбои) │
│ - Обрабатываются через catchAllCause │
│ - НЕ отражены в типе E │
└─────────────────────────────────────────────────────┘
Специфика ошибок в потоках
В отличие от Effect, где ошибка происходит вместо результата, в Stream ошибка может произойти после эмиссии нескольких элементов:
Stream: [1, 2, 3, ⚡ERROR]
┌──────────────────────────────────────┐
│ Что делать с элементами 1, 2, 3? │
│ │
│ orElse: сохранить + переключиться │
│ retry: отбросить + начать заново │
│ catchAll: сохранить + обработать │
└──────────────────────────────────────┘
Recovering — восстановление после ошибки
Stream.orElse — переключение на альтернативный поток
При возникновении ошибки переключается на резервный поток:
import { Stream, Effect } from "effect"
const primary = Stream.make(1, 2, 3).pipe(
Stream.concat(Stream.fail("Connection lost")),
Stream.concat(Stream.make(4, 5)) // никогда не выполнится
)
const fallback = Stream.make("a", "b", "c")
const stream = Stream.orElse(primary, () => fallback)
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
/*
{
_id: "Chunk",
values: [ 1, 2, 3, "a", "b", "c" ]
}
*/
Элементы 1, 2, 3 сохраняются, затем поток переключается на fallback.
Stream.orElseEither — с маркировкой источника
Различает элементы из основного и резервного потоков через Either:
import { Stream, Effect } from "effect"
const primary = Stream.make(1, 2, 3).pipe(
Stream.concat(Stream.fail("Error")),
)
const fallback = Stream.make("a", "b", "c")
const stream = Stream.orElseEither(primary, () => fallback)
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
/*
{
_id: "Chunk",
values: [
{ _tag: "Left", left: 1 },
{ _tag: "Left", left: 2 },
{ _tag: "Left", left: 3 },
{ _tag: "Right", right: "a" },
{ _tag: "Right", right: "b" },
{ _tag: "Right", right: "c" }
]
}
*/
Stream.orElseSucceed / Stream.orElseFail
import { Stream, Effect } from "effect"
// При ошибке — эмитировать дефолтное значение
const withDefault = Stream.fail("error").pipe(
Stream.orElseSucceed(() => 0)
)
Effect.runPromise(Stream.runCollect(withDefault)).then(console.log)
// { _id: 'Chunk', values: [ 0 ] }
Catching — выборочная обработка
Stream.catchAll — обработка по типу ошибки
Позволяет принять решение на основе типа и значения ошибки:
import { Stream, Effect } from "effect"
const s1 = Stream.make(1, 2, 3).pipe(
Stream.concat(Stream.fail("Uh Oh!" as const)),
Stream.concat(Stream.make(4, 5)),
Stream.concat(Stream.fail("Ouch" as const))
)
const stream = Stream.catchAll(
s1,
(error): Stream.Stream<string | boolean> => {
switch (error) {
case "Uh Oh!":
return Stream.make("recovered", "from", "error")
case "Ouch":
return Stream.make(true, false)
}
}
)
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
/*
{
_id: "Chunk",
values: [ 1, 2, 3, "recovered", "from", "error" ]
}
*/
⚠️ Обратите внимание: после catchAll поток завершается — элементы 4, 5 и вторая ошибка “Ouch” недостижимы.
Stream.catchSome — избирательная обработка
Обрабатывает только определённые ошибки, пробрасывая остальные:
import { Stream, Effect, Option } from "effect"
const s1 = Stream.make(1, 2, 3).pipe(
Stream.concat(Stream.fail("recoverable" as const)),
Stream.concat(Stream.make(4, 5))
)
const stream = Stream.catchSome(s1, (error) => {
if (error === "recoverable") {
return Option.some(Stream.make(99))
}
return Option.none() // пробрасываем ошибку дальше
})
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 1, 2, 3, 99 ] }
Stream.catchAllCause — обработка дефектов
Обрабатывает любые типы сбоев, включая дефекты (die) и прерывания:
import { Stream, Effect } from "effect"
const s1 = Stream.make(1, 2, 3).pipe(
Stream.concat(Stream.dieMessage("Boom!")),
Stream.concat(Stream.make(4, 5))
)
const s2 = Stream.make("recovered")
const stream = Stream.catchAllCause(s1, () => s2)
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 1, 2, 3, 'recovered' ] }
Stream.catchSomeCause — избирательная обработка Cause
import { Stream, Effect, Option, Cause } from "effect"
const s1 = Stream.make(1, 2, 3).pipe(
Stream.concat(Stream.dieMessage("Unexpected!")),
)
const stream = Stream.catchSomeCause(s1, (cause) => {
if (Cause.isDie(cause)) {
return Option.some(Stream.make("die-handled"))
}
return Option.none()
})
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 1, 2, 3, 'die-handled' ] }
Stream.catchTag — обработка Tagged Error
Для работы с tagged errors (наиболее частый паттерн в Effect-ts):
import { Stream, Effect, Data } from "effect"
class NetworkError extends Data.TaggedError("NetworkError")<{
readonly message: string
}> {}
class ValidationError extends Data.TaggedError("ValidationError")<{
readonly field: string
}> {}
const stream = Stream.make(1, 2).pipe(
Stream.concat(Stream.fail(new NetworkError({ message: "timeout" })))
)
const handled = stream.pipe(
Stream.catchTag("NetworkError", (err) =>
Stream.make(-1) // дефолтное значение при сетевой ошибке
)
)
Retry — повторные попытки
Stream.retry — повтор с расписанием
retry повторяет весь поток с начала при возникновении ошибки:
import { Stream, Effect, Schedule } from "effect"
let attempts = 0
const unreliableStream = Stream.fromEffect(
Effect.sync(() => {
attempts++
if (attempts < 3) {
throw new Error(`Attempt ${attempts} failed`)
}
return attempts
}).pipe(Effect.catchAll((e) => Effect.fail(String(e))))
)
const resilientStream = unreliableStream.pipe(
Stream.retry(Schedule.recurs(5))
)
Effect.runPromise(Stream.runCollect(resilientStream)).then(console.log)
Production паттерн: retry с экспоненциальным backoff
import { Stream, Effect, Schedule } from "effect"
const retryPolicy = Schedule.exponential("1 second").pipe(
Schedule.union(Schedule.spaced("30 seconds")), // cap at 30s
Schedule.compose(Schedule.recurs(10)) // max 10 retries
)
// Применение к потоку данных из внешнего API
const apiStream = (url: string) =>
Stream.fromEffect(
Effect.tryPromise({
try: () => fetch(url).then((r) => r.json() as Promise<unknown>),
catch: (e) => new Error(String(e))
})
).pipe(
Stream.retry(retryPolicy)
)
Timeout — ограничение по времени
Stream.timeout — тихое завершение
import { Stream, Effect } from "effect"
const stream = Stream.fromEffect(Effect.never).pipe(
Stream.timeout("2 seconds")
)
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [] }
Stream.timeoutFail — завершение с ошибкой
import { Stream, Effect } from "effect"
const stream = Stream.fromEffect(Effect.never).pipe(
Stream.timeoutFail(() => "Timeout exceeded", "2 seconds")
)
Effect.runPromiseExit(Stream.runCollect(stream)).then(console.log)
// Exit.Failure: "Timeout exceeded"
Stream.timeoutTo — переключение при таймауте
import { Stream, Effect } from "effect"
const stream = Stream.fromEffect(Effect.never).pipe(
Stream.timeoutTo("2 seconds", Stream.make(1, 2, 3))
)
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 1, 2, 3 ] }
Refining — уточнение ошибок
Stream.refineOrDie — фильтрация типов ошибок
Преобразует все ошибки, не соответствующие предикату, в дефекты:
import { Stream, Option } from "effect"
const stream = Stream.fail(new Error("Generic error"))
const refined = Stream.refineOrDie(stream, (error) => {
if (error instanceof SyntaxError) {
return Option.some(error)
}
return Option.none() // все остальные ошибки станут die
})
Cleanup — очистка при ошибке
Stream.onError — финализация при ошибке
import { Stream, Console, Effect } from "effect"
const stream = Stream.make(1, 2, 3).pipe(
Stream.concat(Stream.dieMessage("Boom!")),
Stream.onError(() =>
Console.log("Cleanup: closing resources").pipe(Effect.orDie)
)
)
Effect.runPromise(Stream.runCollect(stream)).then(console.log, console.error)
/*
Cleanup: closing resources
RuntimeException: Boom!
*/
Stream.ensuring — гарантированное выполнение
import { Stream, Console, Effect } from "effect"
const stream = Stream.make(1, 2, 3).pipe(
Stream.ensuring(Console.log("Stream completed (success or failure)"))
)
API Reference
| Функция | Описание |
|---|---|
orElse(fallback) | Переключение на fallback при любой ошибке |
orElseEither(fallback) | Переключение с маркировкой Either |
orElseSucceed(value) | При ошибке → одно дефолтное значение |
catchAll(f) | Обработка ошибки с выбором нового потока |
catchSome(f) | Обработка только некоторых ошибок |
catchAllCause(f) | Обработка любых сбоев (включая die) |
catchSomeCause(f) | Обработка некоторых Cause |
catchTag(tag, f) | Обработка Tagged Error |
retry(schedule) | Повтор потока при ошибке |
timeout(duration) | Тихое завершение при таймауте |
timeoutFail(err, duration) | Завершение с ошибкой при таймауте |
timeoutTo(duration, stream) | Переключение на поток при таймауте |
refineOrDie(f) | Уточнение типа ошибки |
onError(f) | Cleanup при ошибке |
Примеры
Production: отказоустойчивый поток данных
import { Stream, Effect, Schedule, Console, Data } from "effect"
class ConnectionError extends Data.TaggedError("ConnectionError")<{
readonly endpoint: string
}> {}
class ParseError extends Data.TaggedError("ParseError")<{
readonly raw: string
}> {}
// Поток данных из API с многоуровневой обработкой ошибок
const resilientDataStream = (endpoint: string) =>
Stream.repeatEffect(
Effect.tryPromise({
try: () => fetch(endpoint).then((r) => r.json() as Promise<string>),
catch: () => new ConnectionError({ endpoint })
})
).pipe(
// Retry при сетевых ошибках
Stream.retry(
Schedule.exponential("1 second").pipe(
Schedule.compose(Schedule.recurs(5))
)
),
// Обработка ошибок парсинга
Stream.catchTag("ParseError", (err) => {
console.error(`Parse failed for: ${err.raw}`)
return Stream.empty // пропускаем невалидные данные
}),
// Timeout на весь поток
Stream.timeout("5 minutes"),
// Cleanup при любой ошибке
Stream.onError(() =>
Console.log(`Stream from ${endpoint} failed`).pipe(Effect.orDie)
)
)
Упражнения
🟢 Basic
Упражнение 1: Создайте поток, который завершается ошибкой “fail” после элементов [1, 2, 3]. Используйте orElse для переключения на поток [10, 20, 30].
Решение:
import { Stream, Effect } from "effect"
const program = Stream.make(1, 2, 3).pipe(
Stream.concat(Stream.fail("fail")),
Stream.orElse(() => Stream.make(10, 20, 30)),
Stream.runCollect
)
Effect.runPromise(program).then(console.log)
// { _id: 'Chunk', values: [ 1, 2, 3, 10, 20, 30 ] }
🟡 Intermediate
Упражнение 2: Реализуйте поток, который обрабатывает разные типы Tagged ошибок: NetworkError → retry 3 раза, ValidationError → пропуск, FatalError → прекращение.
🔴 Advanced
Упражнение 3: Реализуйте circuit breaker для потока: после 3 последовательных ошибок переключайтесь на fallback поток на 10 секунд, затем возвращайтесь к основному. Используйте Ref для отслеживания состояния.
🔗 Далее: 07-concurrency.md — mapPar, flatMapPar, merge, mergeAll