Прерывание
Модель асинхронной отмены в Effect.
Модель прерывания в Effect
При разработке конкурентных приложений часто возникает необходимость прервать выполнение файберов. Типичные сценарии:
┌─────────────────────────────────────────────────────────────────┐
│ СЦЕНАРИИ ПРЕРЫВАНИЯ │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 1. Родительский файбер решил, что результат дочернего │
│ больше не нужен │
│ │
│ 2. Race — первый завершившийся выигрывает, остальные │
│ прерываются │
│ │
│ 3. Пользователь нажал "Cancel" в UI │
│ │
│ 4. Таймаут — операция превысила допустимое время │
│ │
│ 5. Изменение входных данных — старое вычисление │
│ становится неактуальным │
│ │
└─────────────────────────────────────────────────────────────────┘
Почему наивный подход не работает
Наивный подход — просто “убить” файбер — опасен:
┌─────────────────────────────────────────────────────────────────┐
│ ПРОБЛЕМЫ ПРИНУДИТЕЛЬНОГО ПРЕРЫВАНИЯ │
├─────────────────────────────────────────────────────────────────┤
│ │
│ Fiber A Shared State │
│ │ │ │
│ │── lock ───────────────────►│ │
│ │ │ locked = true │
│ │── modify ─────────────────►│ │
│ │ │ partial modification │
│ X KILLED! │ │
│ │ ← INCONSISTENT STATE! │
│ │ ← RESOURCE LEAK! │
│ │
└─────────────────────────────────────────────────────────────────┘
Effect решает эту проблему через асинхронное прерывание с возможностью защиты критических секций.
Polling vs Асинхронное прерывание
Существуют два основных подхода к прерыванию:
Polling (Semi-asynchronous)
Используется в Java и других императивных языках:
// Псевдокод polling подхода
while (!interrupted && hasWork) {
doSomeWork()
// Программист должен явно проверять флаг
if (checkInterruptFlag()) {
cleanup()
return
}
}
Проблемы polling:
- Программист может забыть проверить флаг
- Файбер становится неотзывчивым между проверками
- Не соответствует функциональной парадигме
Асинхронное прерывание (Effect)
Effect использует асинхронное прерывание:
// Effect подход
const program = Effect.gen(function* () {
// Этот код можно прервать в любой точке yield*
const data = yield* fetchData()
// Критическая секция — защищена от прерывания
yield* Effect.uninterruptible(
Effect.gen(function* () {
yield* saveToDatabase(data)
yield* sendNotification()
})
)
})
Преимущества:
- Не требует явных проверок
- Критические секции защищены декларативно
- Полностью совместимо с ФП
Effect.interrupt — Самопрерывание
Effect.interrupt позволяет файберу прервать самого себя.
Сигнатура
declare const interrupt: Effect<never, never, never>
Базовый пример
const program = Effect.gen(function* () {
console.log("start")
yield* Effect.sleep("2 seconds")
yield* Effect.interrupt // Самопрерывание
console.log("done") // Никогда не выполнится
})
Effect.runPromiseExit(program).then(console.log)
/*
Output:
start
{
_id: 'Exit',
_tag: 'Failure',
cause: {
_id: 'Cause',
_tag: 'Interrupt',
fiberId: { _id: 'FiberId', _tag: 'Runtime', id: 0, startTimeMillis: ... }
}
}
*/
Без прерывания (для сравнения)
const programWithoutInterrupt = Effect.gen(function* () {
console.log("start")
yield* Effect.sleep("100 millis")
console.log("done")
})
Effect.runPromiseExit(programWithoutInterrupt).then(console.log)
/*
Output:
start
done
{ _id: 'Exit', _tag: 'Success', value: undefined }
*/
Условное прерывание
const conditionalInterrupt = (shouldStop: boolean) =>
Effect.gen(function* () {
yield* Effect.log("Processing...")
if (shouldStop) {
yield* Effect.log("Stopping early")
yield* Effect.interrupt
}
yield* Effect.log("Completed")
return "result"
})
const program = Effect.gen(function* () {
// Первый вызов — прервётся
const exit1 = yield* Effect.exit(conditionalInterrupt(true))
console.log("Exit 1:", exit1._tag)
// Второй вызов — завершится успешно
const exit2 = yield* Effect.exit(conditionalInterrupt(false))
console.log("Exit 2:", exit2._tag)
})
Effect.runFork(program)
/*
Output:
timestamp=... level=INFO fiber=#0 message=Processing...
timestamp=... level=INFO fiber=#0 message="Stopping early"
Exit 1: Failure
timestamp=... level=INFO fiber=#0 message=Processing...
timestamp=... level=INFO fiber=#0 message=Completed
Exit 2: Success
*/
Interruptible и Uninterruptible регионы
Effect позволяет декларативно контролировать, где файбер можно прервать.
Effect.uninterruptible — Защита от прерывания
const criticalSection = Effect.gen(function* () {
yield* Effect.log("Starting critical work...")
yield* Effect.sleep("1 second")
yield* Effect.log("Critical work completed!")
return "result"
})
const program = Effect.gen(function* () {
// Fork защищённого эффекта
const fiber = yield* Effect.fork(
Effect.uninterruptible(criticalSection)
)
// Пытаемся прервать через 200ms
yield* Effect.sleep("200 millis")
yield* Effect.log("Attempting to interrupt...")
// Прерывание будет ждать завершения uninterruptible секции
const exit = yield* Fiber.interrupt(fiber)
yield* Effect.log(`Fiber completed with: ${exit._tag}`)
})
Effect.runFork(program)
/*
Output:
timestamp=... message="Starting critical work..."
timestamp=... message="Attempting to interrupt..."
timestamp=... message="Critical work completed!"
timestamp=... message="Fiber completed with: Failure"
*/
Обратите внимание: прерывание ждёт завершения uninterruptible секции.
Effect.interruptible — Восстановление прерываемости
Внутри uninterruptible региона можно создать interruptible “окно”:
const mixedRegions = Effect.uninterruptible(
Effect.gen(function* () {
yield* Effect.log("Phase 1: uninterruptible")
yield* Effect.sleep("100 millis")
// Interruptible окно внутри uninterruptible
yield* Effect.interruptible(
Effect.gen(function* () {
yield* Effect.log("Phase 2: interruptible window")
yield* Effect.sleep("500 millis") // Здесь можно прервать
yield* Effect.log("Phase 2 completed")
})
)
yield* Effect.log("Phase 3: uninterruptible again")
yield* Effect.sleep("100 millis")
yield* Effect.log("All phases completed")
})
)
const program = Effect.gen(function* () {
const fiber = yield* Effect.fork(mixedRegions)
yield* Effect.sleep("250 millis")
yield* Effect.log("Sending interrupt...")
const exit = yield* Fiber.interrupt(fiber)
yield* Effect.log(`Result: ${exit._tag}`)
})
Effect.runFork(program)
/*
Output:
timestamp=... message="Phase 1: uninterruptible"
timestamp=... message="Phase 2: interruptible window"
timestamp=... message="Sending interrupt..."
timestamp=... message="Result: Failure"
Phase 2 прервана, Phase 3 не выполнилась
*/
Диаграмма регионов прерываемости
┌─────────────────────────────────────────────────────────────────┐
│ РЕГИОНЫ ПРЕРЫВАЕМОСТИ │
├─────────────────────────────────────────────────────────────────┤
│ │
│ По умолчанию: INTERRUPTIBLE │
│ ═══════════════════════════════════════════════════════════ │
│ │ │
│ │ Effect.uninterruptible(...) │
│ │ ╔═══════════════════════════════════════════════════════╗ │
│ │ ║ UNINTERRUPTIBLE ║ │
│ │ ║ ║ │
│ │ ║ Effect.interruptible(...) ║ │
│ │ ║ ┌───────────────────────────────────────────────┐ ║ │
│ │ ║ │ INTERRUPTIBLE (вложенное окно) │ ║ │
│ │ ║ └───────────────────────────────────────────────┘ ║ │
│ │ ║ ║ │
│ │ ╚═══════════════════════════════════════════════════════╝ │
│ │ │
│ ═══════════════════════════════════════════════════════════ │
│ │
└─────────────────────────────────────────────────────────────────┘
Effect.uninterruptibleMask — Гибкий контроль
uninterruptibleMask позволяет создавать interruptible окна через restore функцию:
const withMask = Effect.uninterruptibleMask((restore) =>
Effect.gen(function* () {
yield* Effect.log("Setup: uninterruptible")
yield* Effect.sleep("100 millis")
// restore делает секцию interruptible
yield* restore(
Effect.gen(function* () {
yield* Effect.log("Main work: interruptible")
yield* Effect.sleep("500 millis")
yield* Effect.log("Main work done")
})
)
yield* Effect.log("Cleanup: uninterruptible")
yield* Effect.sleep("100 millis")
yield* Effect.log("All done")
})
)
const program = Effect.gen(function* () {
const fiber = yield* Effect.fork(withMask)
yield* Effect.sleep("300 millis")
const exit = yield* Fiber.interrupt(fiber)
yield* Effect.log(`Exit: ${exit._tag}`)
})
Effect.runFork(program)
/*
Output:
timestamp=... message="Setup: uninterruptible"
timestamp=... message="Main work: interruptible"
timestamp=... message="Exit: Failure"
Setup выполнен, main work прерван, cleanup НЕ выполнен
*/
Паттерн: Acquire-Use-Release с прерыванием
const acquireUseRelease = <R, A>(
acquire: Effect.Effect<R>,
use: (resource: R) => Effect.Effect<A>,
release: (resource: R) => Effect.Effect<void>
) =>
Effect.uninterruptibleMask((restore) =>
Effect.gen(function* () {
// Acquire: uninterruptible
const resource = yield* acquire
// Use: interruptible (через restore)
const result = yield* restore(use(resource)).pipe(
Effect.onExit(() => release(resource))
)
return result
})
)
// Пример использования
const program = Effect.gen(function* () {
const fiber = yield* Effect.fork(
acquireUseRelease(
Effect.sync(() => {
console.log("Acquiring resource...")
return { id: 1 }
}),
(resource) =>
Effect.gen(function* () {
console.log(`Using resource ${resource.id}...`)
yield* Effect.sleep("1 second")
console.log("Use completed")
return "result"
}),
(resource) =>
Effect.sync(() => console.log(`Releasing resource ${resource.id}`))
)
)
yield* Effect.sleep("200 millis")
yield* Fiber.interrupt(fiber)
})
Effect.runFork(program)
/*
Output:
Acquiring resource...
Using resource 1...
Releasing resource 1
*/
Effect.disconnect — Отсоединение от прерывания
Effect.disconnect позволяет uninterruptible эффекту продолжить выполнение в фоне, пока основной поток управления продолжает работу.
Проблема без disconnect
const uninterruptibleTask = Effect.gen(function* () {
console.log("Start processing...")
yield* Effect.sleep("2 seconds")
console.log("Processing complete.")
return "Result"
})
const withoutDisconnect = Effect.gen(function* () {
const result = yield* uninterruptibleTask.pipe(
Effect.uninterruptible,
Effect.timeout("1 second")
)
return result
})
Effect.runPromiseExit(withoutDisconnect).then(console.log)
/*
Output:
Start processing...
Processing complete. ← Ждём 2 секунды несмотря на timeout!
{
_id: 'Exit',
_tag: 'Failure',
cause: { _tag: 'Fail', failure: { _tag: 'TimeoutException' } }
}
*/
Проблема: timeout срабатывает только ПОСЛЕ завершения uninterruptible эффекта.
Решение с disconnect
const withDisconnect = Effect.gen(function* () {
const result = yield* uninterruptibleTask.pipe(
Effect.uninterruptible,
Effect.disconnect, // Отсоединяем от родительского fiber
Effect.timeout("1 second")
)
return result
})
Effect.runPromiseExit(withDisconnect).then(console.log)
/*
Output:
Start processing...
{
_id: 'Exit',
_tag: 'Failure',
cause: { _tag: 'Fail', failure: { _tag: 'TimeoutException' } }
}
Processing complete. ← Выполняется в фоне!
*/
Диаграмма disconnect
БЕЗ disconnect:
─────────────────────────────────────────────────────────────►
Main fiber [===== uninterruptible (2s) =====]
│
Timeout (1s) ─────────────────────────────────────► (ждёт!)
С disconnect:
─────────────────────────────────────────────────────────────►
Main fiber [─────────] ← timeout срабатывает
│
│ disconnect
▼
Background [===== uninterruptible (2s) =====]
(продолжает независимо)
Практический пример: Graceful shutdown
const gracefulShutdown = (
saveState: Effect.Effect<void>,
timeout: number
) =>
Effect.gen(function* () {
yield* Effect.log("Starting graceful shutdown...")
// Пытаемся сохранить состояние с таймаутом
// Если не успеваем — продолжаем в фоне
const result = yield* saveState.pipe(
Effect.uninterruptible,
Effect.disconnect,
Effect.timeout(`${timeout} millis`),
Effect.option
)
if (result._tag === "None") {
yield* Effect.log("Save timed out, continuing in background")
} else {
yield* Effect.log("State saved successfully")
}
yield* Effect.log("Shutdown complete")
})
const program = gracefulShutdown(
Effect.gen(function* () {
yield* Effect.log("Saving state...")
yield* Effect.sleep("2 seconds")
yield* Effect.log("State saved!")
}),
500
)
Effect.runFork(program)
/*
Output:
timestamp=... message="Starting graceful shutdown..."
timestamp=... message="Saving state..."
timestamp=... message="Save timed out, continuing in background"
timestamp=... message="Shutdown complete"
timestamp=... message="State saved!" ← В фоне
*/
Finalizers и прерывание
Finalizers гарантированно выполняются даже при прерывании.
Effect.ensuring — Гарантированный finalizer
const withFinalizer = Effect.gen(function* () {
yield* Effect.log("Starting work...")
yield* Effect.sleep("1 second")
yield* Effect.log("Work completed")
return "result"
}).pipe(
Effect.ensuring(
Effect.log("Finalizer: cleanup executed!")
)
)
const program = Effect.gen(function* () {
const fiber = yield* Effect.fork(withFinalizer)
yield* Effect.sleep("200 millis")
yield* Effect.log("Interrupting...")
const exit = yield* Fiber.interrupt(fiber)
yield* Effect.log(`Exit: ${exit._tag}`)
})
Effect.runFork(program)
/*
Output:
timestamp=... message="Starting work..."
timestamp=... message="Interrupting..."
timestamp=... message="Finalizer: cleanup executed!"
timestamp=... message="Exit: Failure"
*/
Effect.onInterrupt — Обработка прерывания
const withInterruptHandler = Effect.gen(function* () {
yield* Effect.log("Working...")
yield* Effect.sleep("1 second")
return "done"
}).pipe(
Effect.onInterrupt((interruptors) =>
Effect.gen(function* () {
const ids = [...interruptors].map(FiberId.threadName).join(", ")
yield* Effect.log(`Interrupted by: ${ids}`)
yield* Effect.log("Performing interrupt-specific cleanup...")
})
)
)
const program = Effect.gen(function* () {
const fiber = yield* Effect.fork(withInterruptHandler)
yield* Effect.sleep("200 millis")
yield* Fiber.interrupt(fiber)
})
Effect.runFork(program)
/*
Output:
timestamp=... message="Working..."
timestamp=... message="Interrupted by: #0"
timestamp=... message="Performing interrupt-specific cleanup..."
*/
Effect.onExit — Обработка любого завершения
const withExitHandler = Effect.gen(function* () {
yield* Effect.log("Working...")
yield* Effect.sleep("500 millis")
return "result"
}).pipe(
Effect.onExit((exit) =>
Effect.gen(function* () {
if (Exit.isSuccess(exit)) {
yield* Effect.log(`Success: ${exit.value}`)
} else if (Exit.isInterrupted(exit)) {
yield* Effect.log("Was interrupted")
} else {
yield* Effect.log("Failed with error")
}
})
)
)
// Тест успеха
Effect.runFork(withExitHandler)
// Тест прерывания
const program = Effect.gen(function* () {
const fiber = yield* Effect.fork(withExitHandler)
yield* Effect.sleep("100 millis")
yield* Fiber.interrupt(fiber)
})
Effect.runFork(program)
Порядок выполнения finalizers
Finalizers выполняются в обратном порядке добавления (LIFO):
const program = Effect.gen(function* () {
yield* Effect.addFinalizer(() => Effect.log("Finalizer 1"))
yield* Effect.addFinalizer(() => Effect.log("Finalizer 2"))
yield* Effect.addFinalizer(() => Effect.log("Finalizer 3"))
yield* Effect.log("Working...")
yield* Effect.sleep("1 second")
})
const main = Effect.scoped(program)
Effect.runFork(
Effect.gen(function* () {
const fiber = yield* Effect.fork(main)
yield* Effect.sleep("100 millis")
yield* Fiber.interrupt(fiber)
})
)
/*
Output:
timestamp=... message="Working..."
timestamp=... message="Finalizer 3"
timestamp=... message="Finalizer 2"
timestamp=... message="Finalizer 1"
*/
Таймауты и прерывание
Таймауты тесно связаны с прерыванием.
Effect.timeout — Базовый таймаут
const slowTask = Effect.gen(function* () {
yield* Effect.log("Starting slow task...")
yield* Effect.sleep("5 seconds")
yield* Effect.log("Slow task completed")
return "result"
})
const withTimeout = slowTask.pipe(
Effect.timeout("1 second")
)
Effect.runPromiseExit(withTimeout).then(console.log)
/*
Output:
timestamp=... message="Starting slow task..."
{
_id: 'Exit',
_tag: 'Failure',
cause: { _tag: 'Fail', failure: { _tag: 'TimeoutException' } }
}
*/
Interruptible vs Uninterruptible при таймауте
// Interruptible — прерывается немедленно
const interruptibleTimeout = Effect.gen(function* () {
console.log("Start")
yield* Effect.sleep("2 seconds")
console.log("End")
return "Result"
}).pipe(
Effect.timeout("1 second")
)
// Uninterruptible — таймаут ждёт завершения
const uninterruptibleTimeout = Effect.gen(function* () {
console.log("Start")
yield* Effect.sleep("2 seconds")
console.log("End")
return "Result"
}).pipe(
Effect.uninterruptible,
Effect.timeout("1 second")
)
// Interruptible
console.log("=== Interruptible ===")
await Effect.runPromiseExit(interruptibleTimeout)
// Output: Start, затем сразу timeout
console.log("\n=== Uninterruptible ===")
await Effect.runPromiseExit(uninterruptibleTimeout)
// Output: Start, End (ждём 2 сек), затем timeout
Effect.timeoutTo — Кастомное поведение при таймауте
const withCustomTimeout = Effect.gen(function* () {
yield* Effect.sleep("2 seconds")
return "completed"
}).pipe(
Effect.timeoutTo({
duration: "500 millis",
onTimeout: () => "timed out" as const
})
)
Effect.runPromise(withCustomTimeout).then(console.log)
// Output: timed out
Effect.timeoutFail — Кастомная ошибка
class CustomTimeoutError {
readonly _tag = "CustomTimeoutError"
constructor(readonly message: string) {}
}
const withCustomError = Effect.gen(function* () {
yield* Effect.sleep("2 seconds")
return "result"
}).pipe(
Effect.timeoutFail({
duration: "500 millis",
onTimeout: () => new CustomTimeoutError("Operation took too long")
})
)
Effect.runPromiseExit(withCustomError).then(console.log)
/*
{
_id: 'Exit',
_tag: 'Failure',
cause: {
_tag: 'Fail',
failure: CustomTimeoutError { _tag: 'CustomTimeoutError', message: 'Operation took too long' }
}
}
*/
API Reference
Прерывание
| Функция | Сигнатура | Описание |
|---|---|---|
Effect.interrupt | Effect<never> | Прервать текущий файбер |
Effect.interruptWith | (FiberId) => Effect<never> | Прервать с указанным ID |
Fiber.interrupt | Fiber<A, E> => Effect<Exit<A, E>> | Прервать файбер |
Fiber.interruptFork | Fiber<A, E> => Effect<void> | Прервать без ожидания |
Регионы прерываемости
| Функция | Описание |
|---|---|
Effect.interruptible | Сделать эффект прерываемым |
Effect.uninterruptible | Защитить от прерывания |
Effect.uninterruptibleMask | Гибкий контроль с restore |
Effect.disconnect | Отсоединить от родительского fiber |
Finalizers
| Функция | Описание |
|---|---|
Effect.ensuring | Гарантированный finalizer |
Effect.onInterrupt | Только при прерывании |
Effect.onExit | При любом завершении |
Effect.addFinalizer | Добавить в Scope |
Таймауты
| Функция | Описание |
|---|---|
Effect.timeout | Таймаут с TimeoutException |
Effect.timeoutTo | Таймаут с fallback значением |
Effect.timeoutFail | Таймаут с кастомной ошибкой |
Effect.timeoutFailCause | Таймаут с кастомным Cause |
Примеры
Пример 1: HTTP клиент с отменой
const httpRequest = (url: string, signal?: AbortSignal) =>
Effect.tryPromise({
try: () => fetch(url, { signal }).then((r) => r.json()),
catch: (error) => new Error(`Fetch failed: ${error}`)
})
const cancellableRequest = (url: string) =>
Effect.asyncInterrupt<unknown, Error>((resume) => {
const controller = new AbortController()
fetch(url, { signal: controller.signal })
.then((r) => r.json())
.then((data) => resume(Effect.succeed(data)))
.catch((error) => {
if (error.name !== "AbortError") {
resume(Effect.fail(new Error(`Fetch failed: ${error}`)))
}
})
// Возвращаем Effect для отмены
return Effect.sync(() => {
console.log("Aborting request...")
controller.abort()
})
})
const program = Effect.gen(function* () {
const fiber = yield* Effect.fork(
cancellableRequest("https://api.example.com/slow")
)
yield* Effect.sleep("100 millis")
yield* Effect.log("Cancelling request...")
yield* Fiber.interrupt(fiber)
yield* Effect.log("Request cancelled")
})
Пример 2: Транзакция с гарантированным rollback
interface Transaction {
readonly id: string
readonly operations: ReadonlyArray<string>
}
const executeTransaction = (tx: Transaction) =>
Effect.uninterruptibleMask((restore) =>
Effect.gen(function* () {
const committed = yield* Ref.make<ReadonlyArray<string>>([])
yield* Effect.log(`Starting transaction ${tx.id}`)
// Выполняем операции (interruptible)
const result = yield* restore(
Effect.forEach(
tx.operations,
(op) =>
Effect.gen(function* () {
yield* Effect.log(`Executing: ${op}`)
yield* Effect.sleep("200 millis")
yield* Ref.update(committed, (ops) => [...ops, op])
return op
})
)
).pipe(
// При ошибке или прерывании — rollback
Effect.onError(() =>
Effect.gen(function* () {
const ops = yield* Ref.get(committed)
yield* Effect.log(`Rolling back ${ops.length} operations...`)
// Rollback в обратном порядке
yield* Effect.forEach(
ops.reverse(),
(op) => Effect.log(`Rollback: ${op}`),
{ discard: true }
)
})
)
)
yield* Effect.log(`Transaction ${tx.id} committed`)
return result
})
)
const program = Effect.gen(function* () {
const fiber = yield* Effect.fork(
executeTransaction({
id: "tx-001",
operations: ["insert user", "update balance", "send notification"]
})
)
yield* Effect.sleep("350 millis")
yield* Effect.log("Interrupting transaction...")
yield* Fiber.interrupt(fiber)
})
Effect.runFork(program)
/*
Output:
timestamp=... message="Starting transaction tx-001"
timestamp=... message="Executing: insert user"
timestamp=... message="Executing: update balance"
timestamp=... message="Interrupting transaction..."
timestamp=... message="Rolling back 2 operations..."
timestamp=... message="Rollback: update balance"
timestamp=... message="Rollback: insert user"
*/
Пример 3: Graceful shutdown сервера
interface Server {
readonly activeRequests: Ref.Ref<number>
readonly shutdown: Deferred.Deferred<void, never>
}
const createServer = Effect.gen(function* () {
const activeRequests = yield* Ref.make(0)
const shutdown = yield* Deferred.make<void, never>()
return { activeRequests, shutdown } satisfies Server
})
const handleRequest = (server: Server, requestId: number) =>
Effect.uninterruptibleMask((restore) =>
Effect.gen(function* () {
// Увеличиваем счётчик (uninterruptible)
yield* Ref.update(server.activeRequests, (n) => n + 1)
yield* Effect.log(`Request ${requestId}: started`)
// Обработка запроса (interruptible)
yield* restore(
Effect.gen(function* () {
yield* Effect.sleep(`${100 + Math.random() * 400} millis`)
})
).pipe(
Effect.onExit(() =>
Effect.gen(function* () {
yield* Ref.update(server.activeRequests, (n) => n - 1)
yield* Effect.log(`Request ${requestId}: completed`)
})
)
)
})
)
const gracefulShutdown = (server: Server, timeout: number) =>
Effect.gen(function* () {
yield* Effect.log("Initiating graceful shutdown...")
yield* Deferred.succeed(server.shutdown, undefined)
// Ждём завершения активных запросов
yield* Effect.gen(function* () {
while (true) {
const active = yield* Ref.get(server.activeRequests)
if (active === 0) {
yield* Effect.log("All requests completed")
return
}
yield* Effect.log(`Waiting for ${active} requests...`)
yield* Effect.sleep("100 millis")
}
}).pipe(
Effect.timeout(`${timeout} millis`),
Effect.catchAll(() => Effect.log("Shutdown timeout, forcing..."))
)
yield* Effect.log("Server shutdown complete")
})
const program = Effect.gen(function* () {
const server = yield* createServer
// Запускаем несколько запросов
const requestFibers = yield* Effect.forEach(
[1, 2, 3, 4, 5],
(id) => Effect.fork(handleRequest(server, id))
)
// Через 200ms инициируем shutdown
yield* Effect.sleep("200 millis")
yield* gracefulShutdown(server, 2000)
})
Effect.runFork(program)
Упражнения
Упражнение 1: Защита критической секции
Создайте функцию, которая защищает эффект от прерывания.
import { Effect, Fiber } from "effect"
const protect = <A, E, R>(
effect: Effect.Effect<A, E, R>
): Effect.Effect<A, E, R> =>
// Ваша реализация
???
// Тест
const program = Effect.gen(function* () {
const fiber = yield* Effect.fork(
protect(
Effect.gen(function* () {
yield* Effect.log("Start")
yield* Effect.sleep("500 millis")
yield* Effect.log("End")
})
)
)
yield* Effect.sleep("100 millis")
yield* Fiber.interrupt(fiber)
yield* Effect.log("After interrupt")
})
// Ожидается: Start, End, After interruptimport { Effect, Fiber } from "effect"
const protect = <A, E, R>(
effect: Effect.Effect<A, E, R>
): Effect.Effect<A, E, R> =>
Effect.uninterruptible(effect)
const program = Effect.gen(function* () {
const fiber = yield* Effect.fork(
protect(
Effect.gen(function* () {
yield* Effect.log("Start")
yield* Effect.sleep("500 millis")
yield* Effect.log("End")
})
)
)
yield* Effect.sleep("100 millis")
yield* Fiber.interrupt(fiber)
yield* Effect.log("After interrupt")
})
Effect.runFork(program)Упражнение 2: Таймаут с fallback
Создайте функцию с таймаутом и значением по умолчанию.
import { Effect } from "effect"
const withTimeoutDefault = <A, E, R>(
effect: Effect.Effect<A, E, R>,
timeout: number,
defaultValue: A
): Effect.Effect<A, E, R> =>
// Ваша реализация
???import { Effect } from "effect"
const withTimeoutDefault = <A, E, R>(
effect: Effect.Effect<A, E, R>,
timeout: number,
defaultValue: A
): Effect.Effect<A, E, R> =>
effect.pipe(
Effect.timeoutTo({
duration: `${timeout} millis`,
onTimeout: () => defaultValue
})
)
// Тест
const program = Effect.gen(function* () {
const fast = yield* withTimeoutDefault(
Effect.sleep("100 millis").pipe(Effect.as("fast")),
500,
"default"
)
console.log("Fast:", fast)
const slow = yield* withTimeoutDefault(
Effect.sleep("1 second").pipe(Effect.as("slow")),
200,
"default"
)
console.log("Slow:", slow)
})
Effect.runFork(program)
// Output: Fast: fast, Slow: defaultУпражнение 3: Acquire-Use-Release с прерыванием
Реализуйте паттерн безопасного управления ресурсами.
import { Effect } from "effect"
const bracket = <R, A, E, B>(
acquire: Effect.Effect<R>,
use: (resource: R) => Effect.Effect<A, E>,
release: (resource: R) => Effect.Effect<B>
): Effect.Effect<A, E> =>
// Ваша реализация:
// 1. acquire — uninterruptible
// 2. use — interruptible
// 3. release — uninterruptible, всегда выполняется
???import { Effect, Fiber } from "effect"
const bracket = <R, A, E, B>(
acquire: Effect.Effect<R>,
use: (resource: R) => Effect.Effect<A, E>,
release: (resource: R) => Effect.Effect<B>
): Effect.Effect<A, E> =>
Effect.uninterruptibleMask((restore) =>
Effect.gen(function* () {
// Acquire: uninterruptible
const resource = yield* acquire
// Use: interruptible, с гарантированным release
return yield* restore(use(resource)).pipe(
Effect.ensuring(release(resource))
)
})
)
// Тест
const program = Effect.gen(function* () {
const fiber = yield* Effect.fork(
bracket(
Effect.sync(() => {
console.log("Acquiring...")
return { handle: 42 }
}),
(resource) =>
Effect.gen(function* () {
console.log(`Using resource ${resource.handle}...`)
yield* Effect.sleep("1 second")
console.log("Use completed")
return "result"
}),
(resource) =>
Effect.sync(() => console.log(`Releasing resource ${resource.handle}`))
)
)
yield* Effect.sleep("200 millis")
console.log("Interrupting...")
yield* Fiber.interrupt(fiber)
console.log("Done")
})
Effect.runFork(program)
/*
Output:
Acquiring...
Using resource 42...
Interrupting...
Releasing resource 42
Done
*/Упражнение 4: Cancellation token pattern
Реализуйте паттерн отмены через токен.
import { Effect, Fiber, Deferred, Ref } from "effect"
interface CancellationToken {
readonly isCancelled: Effect.Effect<boolean>
readonly cancel: Effect.Effect<void>
readonly throwIfCancelled: Effect.Effect<void, "Cancelled">
}
const createCancellationToken = (): Effect.Effect<CancellationToken> =>
// Ваша реализация
???
const withCancellation = <A, E, R>(
token: CancellationToken,
effect: Effect.Effect<A, E, R>
): Effect.Effect<A, E | "Cancelled", R> =>
// Ваша реализация
???import { Effect, Fiber, Deferred, Ref } from "effect"
interface CancellationToken {
readonly isCancelled: Effect.Effect<boolean>
readonly cancel: Effect.Effect<void>
readonly throwIfCancelled: Effect.Effect<void, "Cancelled">
}
const createCancellationToken = (): Effect.Effect<CancellationToken> =>
Effect.gen(function* () {
const cancelled = yield* Ref.make(false)
return {
isCancelled: Ref.get(cancelled),
cancel: Ref.set(cancelled, true),
throwIfCancelled: Effect.gen(function* () {
const isCancelled = yield* Ref.get(cancelled)
if (isCancelled) {
return yield* Effect.fail("Cancelled" as const)
}
})
}
})
const withCancellation = <A, E, R>(
token: CancellationToken,
effect: Effect.Effect<A, E, R>
): Effect.Effect<A, E | "Cancelled", R> =>
Effect.gen(function* () {
// Проверяем перед началом
yield* token.throwIfCancelled
// Выполняем с периодической проверкой
const fiber = yield* Effect.fork(effect)
// Мониторим отмену
const monitorFiber = yield* Effect.fork(
Effect.gen(function* () {
while (true) {
yield* Effect.sleep("50 millis")
const cancelled = yield* token.isCancelled
if (cancelled) {
yield* Fiber.interrupt(fiber)
return
}
}
})
)
// Ждём результат
const exit = yield* Fiber.await(fiber)
yield* Fiber.interrupt(monitorFiber)
// Проверяем после завершения
const cancelled = yield* token.isCancelled
if (cancelled) {
return yield* Effect.fail("Cancelled" as const)
}
if (exit._tag === "Success") {
return exit.value
} else {
return yield* Effect.failCause(exit.cause)
}
})
// Тест
const program = Effect.gen(function* () {
const token = yield* createCancellationToken()
const task = withCancellation(
token,
Effect.gen(function* () {
yield* Effect.log("Working...")
yield* Effect.sleep("1 second")
yield* Effect.log("Done!")
return "result"
})
)
const fiber = yield* Effect.fork(task)
yield* Effect.sleep("200 millis")
yield* Effect.log("Cancelling...")
yield* token.cancel
const exit = yield* Fiber.await(fiber)
yield* Effect.log(`Exit: ${exit._tag}`)
})
Effect.runFork(program)Заключение
Модель прерывания в Effect обеспечивает:
- Безопасную отмену — finalizers всегда выполняются
- Гибкий контроль — interruptible/uninterruptible регионы
- Композицию — uninterruptibleMask для сложных сценариев
- Disconnect — независимое выполнение в фоне
В следующей статье мы изучим FiberRef — механизм локального хранения данных для файберов.