Joining
Операции ожидания и композиции файберов.
Введение в операции с файберами
После создания файбера через fork вам нужны способы взаимодействия с ним. Effect предоставляет богатый набор операций для:
- Ожидания завершения и получения результата
- Проверки состояния без блокировки
- Прерывания выполнения
- Композиции нескольких файберов
Обзор операций
┌─────────────────────────────────────────────────────────────────┐
│ ОПЕРАЦИИ С FIBER │
├─────────────────────────────────────────────────────────────────┤
│ │
│ Fiber.join │
│ ├── Блокирующее ожидание │
│ ├── Возвращает A (re-raises E) │
│ └── Самый простой способ получить результат │
│ │
│ Fiber.await │
│ ├── Блокирующее ожидание │
│ ├── Возвращает Exit<A, E> │
│ └── Полная информация о завершении │
│ │
│ Fiber.poll │
│ ├── Неблокирующая проверка │
│ ├── Возвращает Option<Exit<A, E>> │
│ └── Для polling и мониторинга │
│ │
│ Fiber.interrupt │
│ ├── Прерывание файбера │
│ ├── Возвращает Exit<A, E> │
│ └── Безопасная отмена с cleanup │
│ │
└─────────────────────────────────────────────────────────────────┘
Fiber.join — Получение результата
Fiber.join — самая распространённая операция для получения результата файбера. Она блокирует текущий файбер до завершения целевого и возвращает результат или пробрасывает ошибку.
Сигнатура
declare const join: <A, E>(
fiber: Fiber<A, E>
) => Effect.Effect<A, E>
Ключевые характеристики
- Блокирующая — текущий файбер приостанавливается до завершения
- Re-raises ошибки — ошибка из файбера становится ошибкой эффекта
- Unwraps результат — возвращает
A, а неExit<A, E> - Не передаёт прерывания — если файбер прерван, join завершится с interrupted Cause
Базовый пример
const fib = (n: number): Effect.Effect<number> =>
n < 2
? Effect.succeed(n)
: Effect.zipWith(fib(n - 1), fib(n - 2), (a, b) => a + b)
const program = Effect.gen(function* () {
// Fork вычисления
const fiber = yield* Effect.fork(fib(10))
console.log("Calculation running in background...")
yield* Effect.sleep("100 millis")
// Join — дожидаемся результата
const result = yield* Fiber.join(fiber)
console.log(`Fibonacci(10) = ${result}`)
})
Effect.runFork(program)
/*
Output:
Calculation running in background...
Fibonacci(10) = 55
*/
Обработка ошибок через join
const failingTask = Effect.gen(function* () {
yield* Effect.sleep("100 millis")
return yield* Effect.fail("Task failed!" as const)
})
const program = Effect.gen(function* () {
const fiber = yield* Effect.fork(failingTask)
// Join пробрасывает ошибку
const result = yield* Fiber.join(fiber)
// Этот код не выполнится при ошибке
console.log("Result:", result)
}).pipe(
Effect.catchAll((error) =>
Effect.succeed(`Caught error: ${error}`)
)
)
Effect.runPromise(program).then(console.log)
// Output: Caught error: Task failed!
Диаграмма join
Main Fiber Forked Fiber
│ │
│── fork ──────────────────────────►│
│ │
│ (doing other work...) │ (computing...)
│ │
│◄───────────── join ───────────────│
│ (blocked) │
│ │
│◄────────────── result ────────────│
│ ▼
│ (completed)
▼
(continues with result)
Fiber.await — Получение Exit
Fiber.await также ожидает завершения файбера, но возвращает Exit<A, E>, давая полную информацию о том, как файбер завершился.
Сигнатура
declare const await: <A, E>(
fiber: Fiber<A, E>
) => Effect.Effect<Exit<A, E>, never, never>
Ключевые характеристики
- Блокирующая — как join
- Never fails — возвращает
Effect<Exit<A, E>, never, never> - Полная информация — Success, Failure, или Interrupted
- Inspection-friendly — можно проанализировать причину завершения
Базовый пример
const program = Effect.gen(function* () {
const fiber = yield* Effect.fork(
Effect.gen(function* () {
yield* Effect.sleep("100 millis")
return 42
})
)
// Await возвращает Exit
const exit = yield* Fiber.await(fiber)
console.log(exit)
})
Effect.runFork(program)
/*
Output:
{ _id: 'Exit', _tag: 'Success', value: 42 }
*/
Обработка различных Exit состояний
const analyzeExit = <A, E>(exit: Exit.Exit<A, E>): string => {
if (Exit.isSuccess(exit)) {
return `Success: ${exit.value}`
}
const cause = exit.cause
if (Cause.isFailType(cause)) {
return `Failed with: ${cause.error}`
}
if (Cause.isInterruptedOnly(cause)) {
return `Interrupted`
}
if (Cause.isDieType(cause)) {
return `Died with defect: ${cause.defect}`
}
return `Complex failure: ${Cause.pretty(cause)}`
}
const program = Effect.gen(function* () {
// Успешный файбер
const fiber1 = yield* Effect.fork(Effect.succeed(42))
const exit1 = yield* Fiber.await(fiber1)
console.log("Fiber 1:", analyzeExit(exit1))
// Файбер с ошибкой
const fiber2 = yield* Effect.fork(Effect.fail("Oops"))
const exit2 = yield* Fiber.await(fiber2)
console.log("Fiber 2:", analyzeExit(exit2))
// Файбер с прерыванием
const fiber3 = yield* Effect.fork(
Effect.sleep("1 hour")
)
yield* Fiber.interrupt(fiber3)
const exit3 = yield* Fiber.await(fiber3)
console.log("Fiber 3:", analyzeExit(exit3))
})
Effect.runFork(program)
/*
Output:
Fiber 1: Success: 42
Fiber 2: Failed with: Oops
Fiber 3: Interrupted
*/
Паттерн: Изоляция ошибок
// Изолируем ошибку одного файбера от остальных
const isolatedExecution = <A, E>(
effect: Effect.Effect<A, E>
) =>
Effect.gen(function* () {
const fiber = yield* Effect.fork(effect)
const exit = yield* Fiber.await(fiber)
return exit
})
const program = Effect.gen(function* () {
const results = yield* Effect.all([
isolatedExecution(Effect.succeed(1)),
isolatedExecution(Effect.fail("error")),
isolatedExecution(Effect.succeed(3))
])
// Все три результата получены, несмотря на ошибку во втором
for (const [index, exit] of results.entries()) {
console.log(`Task ${index}: ${Exit.isSuccess(exit) ? "Success" : "Failure"}`)
}
})
Effect.runFork(program)
/*
Output:
Task 0: Success
Task 1: Failure
Task 2: Success
*/
Fiber.poll — Неблокирующая проверка
Fiber.poll позволяет проверить состояние файбера без блокировки. Возвращает Option<Exit<A, E>> — None если файбер ещё выполняется, Some(exit) если завершился.
Сигнатура
declare const poll: <A, E>(
fiber: Fiber<A, E>
) => Effect.Effect<Option<Exit<A, E>>, never, never>
Ключевые характеристики
- Неблокирующая — возвращается немедленно
- Idempotent — можно вызывать многократно
- Never fails — всегда возвращает успешный Effect
- Monitoring-friendly — идеально для polling loops
Базовый пример
const program = Effect.gen(function* () {
const fiber = yield* Effect.fork(
Effect.gen(function* () {
yield* Effect.sleep("500 millis")
return "done"
})
)
// Немедленная проверка — файбер ещё работает
const poll1 = yield* Fiber.poll(fiber)
console.log("Poll 1:", Option.isNone(poll1) ? "Still running" : "Completed")
yield* Effect.sleep("300 millis")
// Ещё одна проверка
const poll2 = yield* Fiber.poll(fiber)
console.log("Poll 2:", Option.isNone(poll2) ? "Still running" : "Completed")
yield* Effect.sleep("300 millis")
// Теперь должен быть завершён
const poll3 = yield* Fiber.poll(fiber)
if (Option.isSome(poll3)) {
const result = Exit.isSuccess(poll3.value)
? poll3.value.value
: "error"
console.log("Poll 3: Completed with", result)
}
})
Effect.runFork(program)
/*
Output:
Poll 1: Still running
Poll 2: Still running
Poll 3: Completed with done
*/
Polling loop с таймаутом
const pollWithTimeout = <A, E>(
fiber: Fiber.Fiber<A, E>,
timeout: Duration.DurationInput,
pollInterval: Duration.DurationInput
): Effect.Effect<A, E | "Timeout"> =>
Effect.gen(function* () {
const deadline = Date.now() + Duration.toMillis(timeout)
while (Date.now() < deadline) {
const maybeExit = yield* Fiber.poll(fiber)
if (Option.isSome(maybeExit)) {
const exit = maybeExit.value
if (Exit.isSuccess(exit)) {
return exit.value
} else {
return yield* Effect.failCause(exit.cause)
}
}
yield* Effect.sleep(pollInterval)
}
// Таймаут — прерываем файбер
yield* Fiber.interrupt(fiber)
return yield* Effect.fail("Timeout" as const)
})
const program = Effect.gen(function* () {
// Быстрая задача — успеет завершиться
const fiber1 = yield* Effect.fork(
Effect.sleep("200 millis").pipe(Effect.as("fast result"))
)
const result1 = yield* pollWithTimeout(fiber1, "1 second", "50 millis")
console.log("Result 1:", result1)
// Медленная задача — таймаут
const fiber2 = yield* Effect.fork(
Effect.sleep("5 seconds").pipe(Effect.as("slow result"))
)
const result2 = yield* pollWithTimeout(fiber2, "500 millis", "100 millis").pipe(
Effect.catchAll((e) => Effect.succeed(`Caught: ${e}`))
)
console.log("Result 2:", result2)
})
Effect.runFork(program)
/*
Output:
Result 1: fast result
Result 2: Caught: Timeout
*/
Мониторинг нескольких файберов
const monitorFibers = <A, E>(
fibers: ReadonlyArray<Fiber.Fiber<A, E>>
) =>
Effect.gen(function* () {
while (true) {
const polls = yield* Effect.forEach(fibers, Fiber.poll)
const completed = polls.filter(Option.isSome).length
const running = polls.filter(Option.isNone).length
console.log(`Status: ${completed} completed, ${running} running`)
if (running === 0) {
const results = polls.map((p) => {
if (Option.isSome(p) && Exit.isSuccess(p.value)) {
return p.value.value
}
return null
})
return results
}
yield* Effect.sleep("100 millis")
}
})
const program = Effect.gen(function* () {
const fibers = yield* Effect.forEach(
[100, 300, 200, 500, 150],
(delay) => Effect.fork(
Effect.sleep(`${delay} millis`).pipe(Effect.as(delay))
)
)
const results = yield* monitorFibers(fibers)
console.log("Final results:", results)
})
Effect.runFork(program)
Fiber.interrupt — Прерывание файбера
Fiber.interrupt прерывает файбер и возвращает его Exit. Прерывание безопасно — все finalizers будут выполнены.
Сигнатура
declare const interrupt: <A, E>(
fiber: Fiber<A, E>
) => Effect.Effect<Exit<A, E>, never, never>
Ключевые характеристики
- Асинхронное — сигнализирует о прерывании, но завершение может занять время
- Back-pressuring — по умолчанию ждёт полного завершения файбера
- Safe — выполняет все finalizers
- Идемпотентное — повторный вызов безопасен
Базовый пример прерывания
const program = Effect.gen(function* () {
// Fork бесконечно работающего файбера
const fiber = yield* Effect.fork(
Effect.forever(
Effect.log("Hi!").pipe(Effect.delay("10 millis"))
)
)
yield* Effect.sleep("30 millis")
// Прерываем и получаем Exit
const exit = yield* Fiber.interrupt(fiber)
console.log(exit)
})
Effect.runFork(program)
/*
Output:
timestamp=... level=INFO fiber=#1 message=Hi!
timestamp=... level=INFO fiber=#1 message=Hi!
{
_id: 'Exit',
_tag: 'Failure',
cause: {
_id: 'Cause',
_tag: 'Interrupt',
fiberId: { _id: 'FiberId', _tag: 'Runtime', id: 0, startTimeMillis: ... }
}
}
*/
Back-pressuring vs Fire-and-forget
const slowCleanup = Effect.gen(function* () {
yield* Effect.addFinalizer(() =>
Effect.gen(function* () {
yield* Effect.log("Cleanup started...")
yield* Effect.sleep("500 millis")
yield* Effect.log("Cleanup completed!")
})
)
yield* Effect.forever(Effect.sleep("100 millis"))
})
// Back-pressuring (default) — ждём завершения cleanup
const withBackpressure = Effect.gen(function* () {
const fiber = yield* Effect.fork(slowCleanup)
yield* Effect.sleep("100 millis")
console.log("Interrupting with back-pressure...")
yield* Fiber.interrupt(fiber) // Ждёт cleanup
console.log("Interrupt completed!") // После cleanup
})
// Fire-and-forget — не ждём
const withoutBackpressure = Effect.gen(function* () {
const fiber = yield* Effect.fork(slowCleanup)
yield* Effect.sleep("100 millis")
console.log("Interrupting without back-pressure...")
yield* Effect.fork(Fiber.interrupt(fiber)) // Не ждём
console.log("Interrupt signal sent!") // Немедленно
yield* Effect.sleep("1 second") // Даём время на cleanup
})
Effect.runFork(withBackpressure)
/*
Output:
Interrupting with back-pressure...
Cleanup started...
Cleanup completed!
Interrupt completed!
*/
Fiber.interruptFork — Удобный shorthand
const program = Effect.gen(function* () {
const fiber = yield* Effect.fork(
Effect.forever(Effect.log("Working...").pipe(Effect.delay("50 millis")))
)
yield* Effect.sleep("200 millis")
// Эквивалентно Effect.fork(Fiber.interrupt(fiber))
yield* Fiber.interruptFork(fiber)
console.log("Interrupt signal sent, continuing...")
yield* Effect.sleep("100 millis")
})
Effect.runFork(program)
Прерывание с причиной
const program = Effect.gen(function* () {
const fiber = yield* Effect.fork(
Effect.sleep("1 hour")
)
// Обычное прерывание
const exit1 = yield* Fiber.interrupt(fiber)
if (Exit.isFailure(exit1) && Cause.isInterruptedOnly(exit1.cause)) {
const interruptors = Cause.interruptors(exit1.cause)
console.log("Interrupted by:", interruptors)
}
})
Effect.runFork(program)
/*
Output:
Interrupted by: Set { FiberId.Runtime(0, ...) }
*/
Композиция файберов
Effect предоставляет операции для композиции нескольких файберов.
Fiber.zip — Объединение в tuple
const program = Effect.gen(function* () {
const fiber1 = yield* Effect.fork(Effect.succeed("Hello"))
const fiber2 = yield* Effect.fork(Effect.succeed("World"))
// Объединяем файберы
const combined = Fiber.zip(fiber1, fiber2)
// Join возвращает tuple
const [a, b] = yield* Fiber.join(combined)
console.log(`${a} ${b}`)
})
Effect.runFork(program)
// Output: Hello World
Fiber.zipWith — Объединение с функцией
const program = Effect.gen(function* () {
const fiber1 = yield* Effect.fork(Effect.succeed(10))
const fiber2 = yield* Effect.fork(Effect.succeed(32))
// Комбинируем результаты
const combined = Fiber.zipWith(
fiber1,
fiber2,
(a, b) => a + b
)
const result = yield* Fiber.join(combined)
console.log(`Sum: ${result}`)
})
Effect.runFork(program)
// Output: Sum: 42
Fiber.orElse — Fallback
const program = Effect.gen(function* () {
// Первый файбер завершится ошибкой
const fiber1 = yield* Effect.fork(Effect.fail("Primary failed"))
// Второй — успешно
const fiber2 = yield* Effect.fork(Effect.succeed("Fallback success"))
// orElse: если первый провалится, используем второй
const withFallback = Fiber.orElse(fiber1, fiber2)
const result = yield* Fiber.join(withFallback)
console.log(result)
})
Effect.runFork(program)
// Output: Fallback success
Fiber.all — Сбор всех результатов
const program = Effect.gen(function* () {
const fibers = yield* Effect.forEach(
[1, 2, 3, 4, 5],
(n) => Effect.fork(
Effect.sleep(`${n * 100} millis`).pipe(Effect.as(n * n))
)
)
// Собираем все файберы в один
const allFibers = Fiber.all(fibers)
// Join всех
const results = yield* Fiber.join(allFibers)
console.log("Results:", results)
})
Effect.runFork(program)
// Output: Results: [1, 4, 9, 16, 25]
Семантика композиции
┌─────────────────────────────────────────────────────────────────┐
│ КОМПОЗИЦИЯ ФАЙБЕРОВ │
├─────────────────────────────────────────────────────────────────┤
│ │
│ Fiber.zip(f1, f2) │
│ ├── Оба должны успешно завершиться │
│ ├── Если один провалится — composite fiber провалится │
│ └── Результат: [A, B] │
│ │
│ Fiber.orElse(f1, f2) │
│ ├── Если f1 успешен — результат f1 │
│ ├── Если f1 провалился — результат f2 │
│ └── f2 может провалиться или успеть │
│ │
│ Fiber.all([f1, f2, ...]) │
│ ├── Все должны успешно завершиться │
│ ├── Если любой провалится — composite провалится │
│ └── Результат: [A1, A2, ...] │
│ │
└─────────────────────────────────────────────────────────────────┘
Fiber.status — Получение статуса
Fiber.status возвращает текущий статус файбера без блокировки.
Типы статусов
// Файбер выполняется
type Running = FiberStatus.Running
// Файбер приостановлен (ждёт чего-то)
type Suspended = FiberStatus.Suspended
// Файбер завершён
type Done = FiberStatus.Done
Пример мониторинга статуса
const monitorStatus = (fiber: Fiber.RuntimeFiber<unknown, unknown>) =>
Effect.gen(function* () {
while (true) {
const status = yield* Fiber.status(fiber)
if (FiberStatus.isDone(status)) {
console.log("Fiber completed!")
return
}
if (FiberStatus.isRunning(status)) {
console.log("Fiber is running")
}
if (FiberStatus.isSuspended(status)) {
console.log("Fiber is suspended")
}
yield* Effect.sleep("100 millis")
}
})
const program = Effect.gen(function* () {
const fiber = yield* Effect.fork(
Effect.gen(function* () {
yield* Effect.log("Starting work...")
yield* Effect.sleep("300 millis")
yield* Effect.log("Finishing work...")
return "done"
})
)
yield* monitorStatus(fiber)
})
Effect.runFork(program)
API Reference
Операции ожидания
| Функция | Сигнатура | Описание |
|---|---|---|
Fiber.join | Fiber<A, E> → Effect<A, E> | Блокирующее ожидание, re-raises ошибку |
Fiber.await | Fiber<A, E> → Effect<Exit<A, E>> | Блокирующее ожидание, возвращает Exit |
Fiber.poll | Fiber<A, E> → Effect<Option<Exit<A, E>>> | Неблокирующая проверка |
Прерывание
| Функция | Сигнатура | Описание |
|---|---|---|
Fiber.interrupt | Fiber<A, E> → Effect<Exit<A, E>> | Прервать и дождаться |
Fiber.interruptFork | Fiber<A, E> → Effect<void> | Прервать без ожидания |
Fiber.interruptAll | Iterable<Fiber<A, E>> → Effect<void> | Прервать все файберы |
Fiber.interruptAllAs | (FiberId, Iterable<Fiber>) → Effect<void> | Прервать от имени |
Композиция
| Функция | Сигнатура | Описание |
|---|---|---|
Fiber.zip | (Fiber<A>, Fiber<B>) → Fiber<[A, B]> | Объединить в tuple |
Fiber.zipWith | (Fiber<A>, Fiber<B>, f) → Fiber<C> | Объединить с функцией |
Fiber.orElse | (Fiber<A>, Fiber<A>) → Fiber<A> | Fallback |
Fiber.all | Iterable<Fiber<A>> → Fiber<A[]> | Собрать все |
Инспекция
| Функция | Сигнатура | Описание |
|---|---|---|
Fiber.status | Fiber<A, E> → Effect<FiberStatus> | Текущий статус |
Fiber.id | Fiber<A, E> → FiberId | ID файбера |
Примеры
Пример 1: Race с таймаутом
const raceWithTimeout = <A, E>(
effect: Effect.Effect<A, E>,
timeoutMs: number
): Effect.Effect<A, E | "Timeout"> =>
Effect.gen(function* () {
// Создаём Deferred для результата
const result = yield* Deferred.make<A, E | "Timeout">()
// Fork основную задачу
const mainFiber = yield* Effect.fork(
Effect.gen(function* () {
const value = yield* effect
yield* Deferred.succeed(result, value)
}).pipe(
Effect.catchAll((error) => Deferred.fail(result, error))
)
)
// Fork таймер
const timerFiber = yield* Effect.fork(
Effect.gen(function* () {
yield* Effect.sleep(`${timeoutMs} millis`)
yield* Deferred.fail(result, "Timeout" as const)
})
)
// Ждём результат
const value = yield* Deferred.await(result)
// Cleanup — прерываем оставшийся файбер
yield* Fiber.interruptFork(mainFiber)
yield* Fiber.interruptFork(timerFiber)
return value
})
const program = Effect.gen(function* () {
// Успешный случай
const result1 = yield* raceWithTimeout(
Effect.sleep("100 millis").pipe(Effect.as("fast")),
500
)
console.log("Result 1:", result1)
// Таймаут
const result2 = yield* raceWithTimeout(
Effect.sleep("1 second").pipe(Effect.as("slow")),
200
).pipe(
Effect.catchAll((e) => Effect.succeed(`Error: ${e}`))
)
console.log("Result 2:", result2)
})
Effect.runFork(program)
/*
Output:
Result 1: fast
Result 2: Error: Timeout
*/
Пример 2: Batch processor с прогрессом
interface BatchResult<A> {
readonly completed: ReadonlyArray<A>
readonly failed: number
readonly total: number
}
const processBatch = <A, E, B>(
items: ReadonlyArray<A>,
process: (item: A) => Effect.Effect<B, E>,
onProgress: (completed: number, total: number) => Effect.Effect<void>
): Effect.Effect<BatchResult<B>> =>
Effect.gen(function* () {
const results = yield* Ref.make<Chunk.Chunk<B>>(Chunk.empty())
const failedCount = yield* Ref.make(0)
const completedCount = yield* Ref.make(0)
// Процессим каждый item
const fibers = yield* Effect.forEach(items, (item) =>
Effect.fork(
Effect.gen(function* () {
const exit = yield* Effect.exit(process(item))
if (Exit.isSuccess(exit)) {
yield* Ref.update(results, Chunk.append(exit.value))
} else {
yield* Ref.update(failedCount, (n) => n + 1)
}
const completed = yield* Ref.updateAndGet(completedCount, (n) => n + 1)
yield* onProgress(completed, items.length)
})
)
)
// Ждём все файберы
yield* Effect.forEach(fibers, Fiber.join, { discard: true })
const completed = yield* Ref.get(results)
const failed = yield* Ref.get(failedCount)
return {
completed: Chunk.toArray(completed),
failed,
total: items.length
}
})
const program = Effect.gen(function* () {
const items = Array.from({ length: 10 }, (_, i) => i + 1)
const result = yield* processBatch(
items,
(n) =>
n % 3 === 0
? Effect.fail(`Error for ${n}`)
: Effect.sleep(`${n * 50} millis`).pipe(Effect.as(n * n)),
(completed, total) =>
Effect.log(`Progress: ${completed}/${total}`)
)
console.log("Final result:", result)
})
Effect.runFork(program)
Пример 3: First success pattern
const firstSuccess = <A, E>(
effects: ReadonlyArray<Effect.Effect<A, E>>
): Effect.Effect<A, E> =>
Effect.gen(function* () {
if (effects.length === 0) {
return yield* Effect.die("No effects provided")
}
const result = yield* Deferred.make<A, E>()
const failureCount = yield* Ref.make(0)
const lastError = yield* Ref.make<E | null>(null)
const fibers = yield* Effect.forEach(effects, (effect) =>
Effect.fork(
Effect.gen(function* () {
const exit = yield* Effect.exit(effect)
if (Exit.isSuccess(exit)) {
// Первый успех выигрывает
yield* Deferred.succeed(result, exit.value)
} else {
// Запоминаем ошибку
const failures = Cause.failures(exit.cause)
if (failures.length > 0) {
yield* Ref.set(lastError, failures[0]!)
}
const count = yield* Ref.updateAndGet(failureCount, (n) => n + 1)
// Все провалились
if (count === effects.length) {
const error = yield* Ref.get(lastError)
if (error !== null) {
yield* Deferred.fail(result, error)
}
}
}
})
)
)
const value = yield* Deferred.await(result)
// Cleanup
yield* Effect.forEach(fibers, Fiber.interruptFork, { discard: true })
return value
})
const program = Effect.gen(function* () {
const result = yield* firstSuccess([
Effect.sleep("300 millis").pipe(Effect.flatMap(() => Effect.fail("slow fail"))),
Effect.sleep("100 millis").pipe(Effect.flatMap(() => Effect.fail("fast fail"))),
Effect.sleep("200 millis").pipe(Effect.as("success!")),
Effect.sleep("500 millis").pipe(Effect.as("too slow"))
])
console.log("First success:", result)
})
Effect.runFork(program)
// Output: First success: success!
Упражнения
Упражнение 1: Join vs Await
Сравните поведение join и await при ошибке.
import { Effect, Fiber, Exit } from "effect"
const failingEffect = Effect.fail("Something went wrong!")
const program = Effect.gen(function* () {
const fiber = yield* Effect.fork(failingEffect)
// Часть 1: Используйте join и обработайте ошибку через catchAll
// Ваш код...
// Часть 2: Используйте await и проанализируйте Exit
// Ваш код...
})import { Effect, Fiber, Exit, Cause } from "effect"
const failingEffect = Effect.fail("Something went wrong!")
const program = Effect.gen(function* () {
// Часть 1: join + catchAll
const fiber1 = yield* Effect.fork(failingEffect)
const result1 = yield* Fiber.join(fiber1).pipe(
Effect.catchAll((error) => Effect.succeed(`Caught via join: ${error}`))
)
console.log(result1)
// Часть 2: await + Exit анализ
const fiber2 = yield* Effect.fork(failingEffect)
const exit = yield* Fiber.await(fiber2)
if (Exit.isSuccess(exit)) {
console.log("Success:", exit.value)
} else {
const failures = Cause.failures(exit.cause)
console.log("Failure via await:", failures)
}
})
Effect.runFork(program)
/*
Output:
Caught via join: Something went wrong!
Failure via await: ['Something went wrong!']
*/Упражнение 2: Простой polling
Реализуйте функцию, которая poll’ит файбер каждые 100ms до завершения.
import { Effect, Fiber, Option, Exit } from "effect"
const pollUntilDone = <A, E>(
fiber: Fiber.Fiber<A, E>
): Effect.Effect<A, E> =>
// Ваша реализация
???import { Effect, Fiber, Option, Exit } from "effect"
const pollUntilDone = <A, E>(
fiber: Fiber.Fiber<A, E>
): Effect.Effect<A, E> =>
Effect.gen(function* () {
while (true) {
const maybeExit = yield* Fiber.poll(fiber)
if (Option.isSome(maybeExit)) {
const exit = maybeExit.value
if (Exit.isSuccess(exit)) {
return exit.value
} else {
return yield* Effect.failCause(exit.cause)
}
}
yield* Effect.sleep("100 millis")
}
})
// Тест
const program = Effect.gen(function* () {
const fiber = yield* Effect.fork(
Effect.sleep("350 millis").pipe(Effect.as("done!"))
)
const start = Date.now()
const result = yield* pollUntilDone(fiber)
const elapsed = Date.now() - start
console.log(`Result: ${result}, elapsed: ~${Math.round(elapsed / 100) * 100}ms`)
})
Effect.runFork(program)
// Output: Result: done!, elapsed: ~400msУпражнение 3: Race N effects
Реализуйте функцию, которая запускает N эффектов и возвращает первый успешный результат.
import { Effect, Fiber } from "effect"
const raceN = <A, E>(
effects: ReadonlyArray<Effect.Effect<A, E>>
): Effect.Effect<A, E> =>
// Ваша реализация:
// 1. Fork все эффекты
// 2. Используйте Fiber.orElse для цепочки
// 3. Очистите оставшиеся файберы
???import { Effect, Fiber, Deferred, Exit, Ref, Cause } from "effect"
const raceN = <A, E>(
effects: ReadonlyArray<Effect.Effect<A, E>>
): Effect.Effect<A, E> =>
Effect.gen(function* () {
if (effects.length === 0) {
return yield* Effect.die("No effects to race")
}
const result = yield* Deferred.make<A, E>()
const failedCount = yield* Ref.make(0)
const errors = yield* Ref.make<E[]>([])
const fibers = yield* Effect.forEach(effects, (effect) =>
Effect.fork(
Effect.gen(function* () {
const exit = yield* Effect.exit(effect)
if (Exit.isSuccess(exit)) {
yield* Deferred.succeed(result, exit.value)
} else {
const failures = Cause.failures(exit.cause)
if (failures.length > 0) {
yield* Ref.update(errors, (es) => [...es, ...failures])
}
const count = yield* Ref.updateAndGet(failedCount, (n) => n + 1)
// Все провалились — возвращаем последнюю ошибку
if (count === effects.length) {
const allErrors = yield* Ref.get(errors)
if (allErrors.length > 0) {
yield* Deferred.fail(result, allErrors[allErrors.length - 1]!)
}
}
}
})
)
)
const value = yield* Deferred.await(result)
// Cleanup
yield* Effect.forEach(fibers, Fiber.interruptFork, { discard: true })
return value
})
// Тест
const program = Effect.gen(function* () {
const effects = [
Effect.sleep("300 millis").pipe(Effect.flatMap(() => Effect.fail("error 1" as const))),
Effect.sleep("200 millis").pipe(Effect.as("winner!")),
Effect.sleep("100 millis").pipe(Effect.flatMap(() => Effect.fail("error 2" as const)))
]
const result = yield* raceN(effects).pipe(
Effect.catchAll((e) => Effect.succeed(`All failed: ${e}`))
)
console.log("Result:", result)
})
Effect.runFork(program)
// Output: Result: winner!Упражнение 4: Композиция с таймаутами
Создайте функцию, объединяющую результаты нескольких файберов с индивидуальными таймаутами.
import { Effect, Fiber } from "effect"
interface TimedTask<A> {
readonly task: Effect.Effect<A>
readonly timeoutMs: number
readonly name: string
}
interface TaskResult<A> {
readonly name: string
readonly result: A | "timeout" | "error"
}
const runTimedTasks = <A>(
tasks: ReadonlyArray<TimedTask<A>>
): Effect.Effect<ReadonlyArray<TaskResult<A>>> =>
// Ваша реализация
???import { Effect, Fiber, Option, Exit } from "effect"
interface TimedTask<A> {
readonly task: Effect.Effect<A>
readonly timeoutMs: number
readonly name: string
}
interface TaskResult<A> {
readonly name: string
readonly result: A | "timeout" | "error"
}
const runTimedTasks = <A>(
tasks: ReadonlyArray<TimedTask<A>>
): Effect.Effect<ReadonlyArray<TaskResult<A>>> =>
Effect.gen(function* () {
const results = yield* Effect.forEach(
tasks,
(task) =>
Effect.gen(function* () {
const fiber = yield* Effect.fork(task.task)
// Wait for timeout
yield* Effect.sleep(`${task.timeoutMs} millis`)
// Check result
const maybeExit = yield* Fiber.poll(fiber)
let result: A | "timeout" | "error"
if (Option.isNone(maybeExit)) {
// Timeout — interrupt and return timeout
yield* Fiber.interruptFork(fiber)
result = "timeout"
} else {
const exit = maybeExit.value
if (Exit.isSuccess(exit)) {
result = exit.value
} else {
result = "error"
}
}
return { name: task.name, result }
}),
{ concurrency: "unbounded" }
)
return results
})
// Тест
const program = Effect.gen(function* () {
const tasks: ReadonlyArray<TimedTask<string>> = [
{ name: "fast", task: Effect.sleep("50 millis").pipe(Effect.as("fast result")), timeoutMs: 200 },
{ name: "slow", task: Effect.sleep("500 millis").pipe(Effect.as("slow result")), timeoutMs: 200 },
{ name: "failing", task: Effect.fail("oops"), timeoutMs: 200 }
]
const results = yield* runTimedTasks(tasks)
console.log("Results:", results)
})
Effect.runFork(program)
/*
Output:
Results: [
{ name: 'fast', result: 'fast result' },
{ name: 'slow', result: 'timeout' },
{ name: 'failing', result: 'error' }
]
*/Упражнение 5: Circuit Breaker с мониторингом файберов
Реализуйте Circuit Breaker, который отслеживает состояние через файберы.
import { Effect, Fiber, Ref, Deferred, Schedule } from "effect"
interface CircuitBreaker {
readonly execute: <A, E>(effect: Effect.Effect<A, E>) => Effect.Effect<A, E | "CircuitOpen">
readonly getState: Effect.Effect<"closed" | "open" | "half-open">
readonly getStats: Effect.Effect<{ failures: number; successes: number }>
}
const createCircuitBreaker = (config: {
readonly failureThreshold: number
readonly resetTimeout: number
readonly halfOpenRequests: number
}): Effect.Effect<CircuitBreaker, never, Scope> =>
// Ваша реализация
???import { Effect, Fiber, Ref, Deferred, Schedule, Scope, Exit } from "effect"
type CircuitState = "closed" | "open" | "half-open"
interface CircuitBreaker {
readonly execute: <A, E>(effect: Effect.Effect<A, E>) => Effect.Effect<A, E | "CircuitOpen">
readonly getState: Effect.Effect<CircuitState>
readonly getStats: Effect.Effect<{ failures: number; successes: number }>
}
const createCircuitBreaker = (config: {
readonly failureThreshold: number
readonly resetTimeout: number
readonly halfOpenRequests: number
}): Effect.Effect<CircuitBreaker, never, Scope> =>
Effect.gen(function* () {
const state = yield* Ref.make<CircuitState>("closed")
const failures = yield* Ref.make(0)
const successes = yield* Ref.make(0)
const halfOpenAttempts = yield* Ref.make(0)
const resetFiber = yield* Ref.make<Fiber.RuntimeFiber<void, never> | null>(null)
const scheduleReset = Effect.gen(function* () {
// Отменяем предыдущий таймер если есть
const existing = yield* Ref.get(resetFiber)
if (existing) {
yield* Fiber.interruptFork(existing)
}
// Запускаем новый таймер
const fiber = yield* Effect.forkScoped(
Effect.gen(function* () {
yield* Effect.sleep(`${config.resetTimeout} millis`)
yield* Ref.set(state, "half-open")
yield* Ref.set(halfOpenAttempts, 0)
yield* Effect.log("Circuit moved to half-open")
})
)
yield* Ref.set(resetFiber, fiber)
})
const tripBreaker = Effect.gen(function* () {
yield* Ref.set(state, "open")
yield* Effect.log("Circuit OPENED")
yield* scheduleReset
})
const closeBreaker = Effect.gen(function* () {
yield* Ref.set(state, "closed")
yield* Ref.set(failures, 0)
yield* Effect.log("Circuit CLOSED")
})
const execute = <A, E>(effect: Effect.Effect<A, E>) =>
Effect.gen(function* () {
const currentState = yield* Ref.get(state)
switch (currentState) {
case "open":
return yield* Effect.fail("CircuitOpen" as const)
case "half-open": {
const attempts = yield* Ref.updateAndGet(halfOpenAttempts, (n) => n + 1)
if (attempts > config.halfOpenRequests) {
return yield* Effect.fail("CircuitOpen" as const)
}
const exit = yield* Effect.exit(effect)
if (Exit.isSuccess(exit)) {
yield* Ref.update(successes, (n) => n + 1)
// Достаточно успехов — закрываем
if (attempts >= config.halfOpenRequests) {
yield* closeBreaker
}
return exit.value
} else {
yield* Ref.update(failures, (n) => n + 1)
yield* tripBreaker
return yield* Effect.failCause(exit.cause)
}
}
case "closed": {
const exit = yield* Effect.exit(effect)
if (Exit.isSuccess(exit)) {
yield* Ref.update(successes, (n) => n + 1)
yield* Ref.set(failures, 0) // Reset failures on success
return exit.value
} else {
const failureCount = yield* Ref.updateAndGet(failures, (n) => n + 1)
if (failureCount >= config.failureThreshold) {
yield* tripBreaker
}
return yield* Effect.failCause(exit.cause)
}
}
}
})
return {
execute,
getState: Ref.get(state),
getStats: Effect.gen(function* () {
const f = yield* Ref.get(failures)
const s = yield* Ref.get(successes)
return { failures: f, successes: s }
})
}
})
// Тест
const program = Effect.scoped(
Effect.gen(function* () {
const breaker = yield* createCircuitBreaker({
failureThreshold: 3,
resetTimeout: 1000,
halfOpenRequests: 2
})
const unreliableService = Effect.gen(function* () {
if (Math.random() < 0.7) {
return yield* Effect.fail("Service error")
}
return "success"
})
// Тестируем circuit breaker
for (let i = 0; i < 10; i++) {
const state = yield* breaker.getState
const result = yield* breaker.execute(unreliableService).pipe(
Effect.catchAll((e) => Effect.succeed(`Error: ${e}`))
)
console.log(`Attempt ${i + 1}: ${result} (state: ${state})`)
yield* Effect.sleep("200 millis")
}
// Ждём reset
yield* Effect.sleep("1500 millis")
const finalState = yield* breaker.getState
const stats = yield* breaker.getStats
console.log(`Final state: ${finalState}, stats:`, stats)
})
)
Effect.runFork(program)Заключение
Операции с файберами в Effect предоставляют полный контроль над конкурентными вычислениями:
Fiber.join— простой способ получить результатFiber.await— полная информация через ExitFiber.poll— неблокирующий мониторингFiber.interrupt— безопасная отмена с cleanup- Композиция —
zip,orElse,allдля сложных сценариев
В следующей статье мы глубоко погрузимся в механизм прерывания файберов и узнаем, как создавать interruptible и uninterruptible регионы.