Effect Курс Обработка ошибок в Stream
Глава

Обработка ошибок в Stream

Обработка ошибок в потоках имеет свою специфику: ошибка может произойти после эмиссии нескольких элементов, и стратегия восстановления должна решить — переключиться на альтернативный поток, повторить попытку или завершить обработку с уже полученными данными.

Теория

Модель ошибок в Stream

Ошибки в Stream, как и в Effect, делятся на два класса:

┌─────────────────────────────────────────────────────┐
│  Expected Errors (E)                                │
│  - Типизированные, ожидаемые ошибки                 │
│  - Обрабатываются через catchAll, orElse             │
│  - Отражены в сигнатуре: Stream<A, E, R>            │
│                                                     │
│  Defects (Die, Interrupt)                           │
│  - Непредвиденные ошибки (баги, фатальные сбои)     │
│  - Обрабатываются через catchAllCause                │
│  - НЕ отражены в типе E                            │
└─────────────────────────────────────────────────────┘

Специфика ошибок в потоках

В отличие от Effect, где ошибка происходит вместо результата, в Stream ошибка может произойти после эмиссии нескольких элементов:

Stream:  [1, 2, 3, ⚡ERROR]

                ┌──────────────────────────────────────┐
                │ Что делать с элементами 1, 2, 3?     │
                │                                      │
                │ orElse: сохранить + переключиться     │
                │ retry:  отбросить + начать заново     │
                │ catchAll: сохранить + обработать      │
                └──────────────────────────────────────┘

Recovering — восстановление после ошибки

Stream.orElse — переключение на альтернативный поток

При возникновении ошибки переключается на резервный поток:

import { Stream, Effect } from "effect"

const primary = Stream.make(1, 2, 3).pipe(
  Stream.concat(Stream.fail("Connection lost")),
  Stream.concat(Stream.make(4, 5)) // никогда не выполнится
)

const fallback = Stream.make("a", "b", "c")

const stream = Stream.orElse(primary, () => fallback)

Effect.runPromise(Stream.runCollect(stream)).then(console.log)
/*
{
  _id: "Chunk",
  values: [ 1, 2, 3, "a", "b", "c" ]
}
*/

Элементы 1, 2, 3 сохраняются, затем поток переключается на fallback.

Stream.orElseEither — с маркировкой источника

Различает элементы из основного и резервного потоков через Either:

import { Stream, Effect } from "effect"

const primary = Stream.make(1, 2, 3).pipe(
  Stream.concat(Stream.fail("Error")),
)

const fallback = Stream.make("a", "b", "c")

const stream = Stream.orElseEither(primary, () => fallback)

Effect.runPromise(Stream.runCollect(stream)).then(console.log)
/*
{
  _id: "Chunk",
  values: [
    { _tag: "Left", left: 1 },
    { _tag: "Left", left: 2 },
    { _tag: "Left", left: 3 },
    { _tag: "Right", right: "a" },
    { _tag: "Right", right: "b" },
    { _tag: "Right", right: "c" }
  ]
}
*/

Stream.orElseSucceed / Stream.orElseFail

import { Stream, Effect } from "effect"

// При ошибке — эмитировать дефолтное значение
const withDefault = Stream.fail("error").pipe(
  Stream.orElseSucceed(() => 0)
)

Effect.runPromise(Stream.runCollect(withDefault)).then(console.log)
// { _id: 'Chunk', values: [ 0 ] }

Catching — выборочная обработка

Stream.catchAll — обработка по типу ошибки

Позволяет принять решение на основе типа и значения ошибки:

import { Stream, Effect } from "effect"

const s1 = Stream.make(1, 2, 3).pipe(
  Stream.concat(Stream.fail("Uh Oh!" as const)),
  Stream.concat(Stream.make(4, 5)),
  Stream.concat(Stream.fail("Ouch" as const))
)

const stream = Stream.catchAll(
  s1,
  (error): Stream.Stream<string | boolean> => {
    switch (error) {
      case "Uh Oh!":
        return Stream.make("recovered", "from", "error")
      case "Ouch":
        return Stream.make(true, false)
    }
  }
)

Effect.runPromise(Stream.runCollect(stream)).then(console.log)
/*
{
  _id: "Chunk",
  values: [ 1, 2, 3, "recovered", "from", "error" ]
}
*/

⚠️ Обратите внимание: после catchAll поток завершается — элементы 4, 5 и вторая ошибка “Ouch” недостижимы.

Stream.catchSome — избирательная обработка

Обрабатывает только определённые ошибки, пробрасывая остальные:

import { Stream, Effect, Option } from "effect"

const s1 = Stream.make(1, 2, 3).pipe(
  Stream.concat(Stream.fail("recoverable" as const)),
  Stream.concat(Stream.make(4, 5))
)

const stream = Stream.catchSome(s1, (error) => {
  if (error === "recoverable") {
    return Option.some(Stream.make(99))
  }
  return Option.none() // пробрасываем ошибку дальше
})

Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 1, 2, 3, 99 ] }

Stream.catchAllCause — обработка дефектов

Обрабатывает любые типы сбоев, включая дефекты (die) и прерывания:

import { Stream, Effect } from "effect"

const s1 = Stream.make(1, 2, 3).pipe(
  Stream.concat(Stream.dieMessage("Boom!")),
  Stream.concat(Stream.make(4, 5))
)

const s2 = Stream.make("recovered")

const stream = Stream.catchAllCause(s1, () => s2)

Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 1, 2, 3, 'recovered' ] }

Stream.catchSomeCause — избирательная обработка Cause

import { Stream, Effect, Option, Cause } from "effect"

const s1 = Stream.make(1, 2, 3).pipe(
  Stream.concat(Stream.dieMessage("Unexpected!")),
)

const stream = Stream.catchSomeCause(s1, (cause) => {
  if (Cause.isDie(cause)) {
    return Option.some(Stream.make("die-handled"))
  }
  return Option.none()
})

Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 1, 2, 3, 'die-handled' ] }

Stream.catchTag — обработка Tagged Error

Для работы с tagged errors (наиболее частый паттерн в Effect-ts):

import { Stream, Effect, Data } from "effect"

class NetworkError extends Data.TaggedError("NetworkError")<{
  readonly message: string
}> {}

class ValidationError extends Data.TaggedError("ValidationError")<{
  readonly field: string
}> {}

const stream = Stream.make(1, 2).pipe(
  Stream.concat(Stream.fail(new NetworkError({ message: "timeout" })))
)

const handled = stream.pipe(
  Stream.catchTag("NetworkError", (err) =>
    Stream.make(-1) // дефолтное значение при сетевой ошибке
  )
)

Retry — повторные попытки

Stream.retry — повтор с расписанием

retry повторяет весь поток с начала при возникновении ошибки:

import { Stream, Effect, Schedule } from "effect"

let attempts = 0

const unreliableStream = Stream.fromEffect(
  Effect.sync(() => {
    attempts++
    if (attempts < 3) {
      throw new Error(`Attempt ${attempts} failed`)
    }
    return attempts
  }).pipe(Effect.catchAll((e) => Effect.fail(String(e))))
)

const resilientStream = unreliableStream.pipe(
  Stream.retry(Schedule.recurs(5))
)

Effect.runPromise(Stream.runCollect(resilientStream)).then(console.log)

Production паттерн: retry с экспоненциальным backoff

import { Stream, Effect, Schedule } from "effect"

const retryPolicy = Schedule.exponential("1 second").pipe(
  Schedule.union(Schedule.spaced("30 seconds")), // cap at 30s
  Schedule.compose(Schedule.recurs(10))           // max 10 retries
)

// Применение к потоку данных из внешнего API
const apiStream = (url: string) =>
  Stream.fromEffect(
    Effect.tryPromise({
      try: () => fetch(url).then((r) => r.json() as Promise<unknown>),
      catch: (e) => new Error(String(e))
    })
  ).pipe(
    Stream.retry(retryPolicy)
  )

Timeout — ограничение по времени

Stream.timeout — тихое завершение

import { Stream, Effect } from "effect"

const stream = Stream.fromEffect(Effect.never).pipe(
  Stream.timeout("2 seconds")
)

Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [] }

Stream.timeoutFail — завершение с ошибкой

import { Stream, Effect } from "effect"

const stream = Stream.fromEffect(Effect.never).pipe(
  Stream.timeoutFail(() => "Timeout exceeded", "2 seconds")
)

Effect.runPromiseExit(Stream.runCollect(stream)).then(console.log)
// Exit.Failure: "Timeout exceeded"

Stream.timeoutTo — переключение при таймауте

import { Stream, Effect } from "effect"

const stream = Stream.fromEffect(Effect.never).pipe(
  Stream.timeoutTo("2 seconds", Stream.make(1, 2, 3))
)

Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 1, 2, 3 ] }

Refining — уточнение ошибок

Stream.refineOrDie — фильтрация типов ошибок

Преобразует все ошибки, не соответствующие предикату, в дефекты:

import { Stream, Option } from "effect"

const stream = Stream.fail(new Error("Generic error"))

const refined = Stream.refineOrDie(stream, (error) => {
  if (error instanceof SyntaxError) {
    return Option.some(error)
  }
  return Option.none() // все остальные ошибки станут die
})

Cleanup — очистка при ошибке

Stream.onError — финализация при ошибке

import { Stream, Console, Effect } from "effect"

const stream = Stream.make(1, 2, 3).pipe(
  Stream.concat(Stream.dieMessage("Boom!")),
  Stream.onError(() =>
    Console.log("Cleanup: closing resources").pipe(Effect.orDie)
  )
)

Effect.runPromise(Stream.runCollect(stream)).then(console.log, console.error)
/*
Cleanup: closing resources
RuntimeException: Boom!
*/

Stream.ensuring — гарантированное выполнение

import { Stream, Console, Effect } from "effect"

const stream = Stream.make(1, 2, 3).pipe(
  Stream.ensuring(Console.log("Stream completed (success or failure)"))
)

API Reference

ФункцияОписание
orElse(fallback)Переключение на fallback при любой ошибке
orElseEither(fallback)Переключение с маркировкой Either
orElseSucceed(value)При ошибке → одно дефолтное значение
catchAll(f)Обработка ошибки с выбором нового потока
catchSome(f)Обработка только некоторых ошибок
catchAllCause(f)Обработка любых сбоев (включая die)
catchSomeCause(f)Обработка некоторых Cause
catchTag(tag, f)Обработка Tagged Error
retry(schedule)Повтор потока при ошибке
timeout(duration)Тихое завершение при таймауте
timeoutFail(err, duration)Завершение с ошибкой при таймауте
timeoutTo(duration, stream)Переключение на поток при таймауте
refineOrDie(f)Уточнение типа ошибки
onError(f)Cleanup при ошибке

Примеры

Production: отказоустойчивый поток данных

import { Stream, Effect, Schedule, Console, Data } from "effect"

class ConnectionError extends Data.TaggedError("ConnectionError")<{
  readonly endpoint: string
}> {}

class ParseError extends Data.TaggedError("ParseError")<{
  readonly raw: string
}> {}

// Поток данных из API с многоуровневой обработкой ошибок
const resilientDataStream = (endpoint: string) =>
  Stream.repeatEffect(
    Effect.tryPromise({
      try: () => fetch(endpoint).then((r) => r.json() as Promise<string>),
      catch: () => new ConnectionError({ endpoint })
    })
  ).pipe(
    // Retry при сетевых ошибках
    Stream.retry(
      Schedule.exponential("1 second").pipe(
        Schedule.compose(Schedule.recurs(5))
      )
    ),

    // Обработка ошибок парсинга
    Stream.catchTag("ParseError", (err) => {
      console.error(`Parse failed for: ${err.raw}`)
      return Stream.empty // пропускаем невалидные данные
    }),

    // Timeout на весь поток
    Stream.timeout("5 minutes"),

    // Cleanup при любой ошибке
    Stream.onError(() =>
      Console.log(`Stream from ${endpoint} failed`).pipe(Effect.orDie)
    )
  )

Упражнения

🟢 Basic

Упражнение 1: Создайте поток, который завершается ошибкой “fail” после элементов [1, 2, 3]. Используйте orElse для переключения на поток [10, 20, 30].

Решение:

import { Stream, Effect } from "effect"

const program = Stream.make(1, 2, 3).pipe(
  Stream.concat(Stream.fail("fail")),
  Stream.orElse(() => Stream.make(10, 20, 30)),
  Stream.runCollect
)

Effect.runPromise(program).then(console.log)
// { _id: 'Chunk', values: [ 1, 2, 3, 10, 20, 30 ] }

🟡 Intermediate

Упражнение 2: Реализуйте поток, который обрабатывает разные типы Tagged ошибок: NetworkError → retry 3 раза, ValidationError → пропуск, FatalError → прекращение.

🔴 Advanced

Упражнение 3: Реализуйте circuit breaker для потока: после 3 последовательных ошибок переключайтесь на fallback поток на 10 секунд, затем возвращайтесь к основному. Используйте Ref для отслеживания состояния.


🔗 Далее: 07-concurrency.md — mapPar, flatMapPar, merge, mergeAll