Effect Курс Прерывание

Прерывание

Модель асинхронной отмены в Effect.

Модель прерывания в Effect

При разработке конкурентных приложений часто возникает необходимость прервать выполнение файберов. Типичные сценарии:

┌─────────────────────────────────────────────────────────────────┐
│              СЦЕНАРИИ ПРЕРЫВАНИЯ                                │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  1. Родительский файбер решил, что результат дочернего          │
│     больше не нужен                                             │
│                                                                 │
│  2. Race — первый завершившийся выигрывает, остальные           │
│     прерываются                                                 │
│                                                                 │
│  3. Пользователь нажал "Cancel" в UI                            │
│                                                                 │
│  4. Таймаут — операция превысила допустимое время               │
│                                                                 │
│  5. Изменение входных данных — старое вычисление                │
│     становится неактуальным                                     │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

Почему наивный подход не работает

Наивный подход — просто “убить” файбер — опасен:

┌─────────────────────────────────────────────────────────────────┐
│           ПРОБЛЕМЫ ПРИНУДИТЕЛЬНОГО ПРЕРЫВАНИЯ                   │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  Fiber A                     Shared State                       │
│     │                            │                              │
│     │── lock ───────────────────►│                              │
│     │                            │ locked = true                │
│     │── modify ─────────────────►│                              │
│     │                            │ partial modification         │
│     X  KILLED!                   │                              │
│                                  │ ← INCONSISTENT STATE!        │
│                                  │ ← RESOURCE LEAK!             │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

Effect решает эту проблему через асинхронное прерывание с возможностью защиты критических секций.


Polling vs Асинхронное прерывание

Существуют два основных подхода к прерыванию:

Polling (Semi-asynchronous)

Используется в Java и других императивных языках:

// Псевдокод polling подхода
while (!interrupted && hasWork) {
  doSomeWork()
  
  // Программист должен явно проверять флаг
  if (checkInterruptFlag()) {
    cleanup()
    return
  }
}

Проблемы polling:

  • Программист может забыть проверить флаг
  • Файбер становится неотзывчивым между проверками
  • Не соответствует функциональной парадигме

Асинхронное прерывание (Effect)

Effect использует асинхронное прерывание:

// Effect подход
const program = Effect.gen(function* () {
  // Этот код можно прервать в любой точке yield*
  const data = yield* fetchData()
  
  // Критическая секция — защищена от прерывания
  yield* Effect.uninterruptible(
    Effect.gen(function* () {
      yield* saveToDatabase(data)
      yield* sendNotification()
    })
  )
})

Преимущества:

  • Не требует явных проверок
  • Критические секции защищены декларативно
  • Полностью совместимо с ФП

Effect.interrupt — Самопрерывание

Effect.interrupt позволяет файберу прервать самого себя.

Сигнатура

declare const interrupt: Effect<never, never, never>

Базовый пример


const program = Effect.gen(function* () {
  console.log("start")
  yield* Effect.sleep("2 seconds")
  yield* Effect.interrupt  // Самопрерывание
  console.log("done")  // Никогда не выполнится
})

Effect.runPromiseExit(program).then(console.log)
/*
Output:
start
{
  _id: 'Exit',
  _tag: 'Failure',
  cause: {
    _id: 'Cause',
    _tag: 'Interrupt',
    fiberId: { _id: 'FiberId', _tag: 'Runtime', id: 0, startTimeMillis: ... }
  }
}
*/

Без прерывания (для сравнения)


const programWithoutInterrupt = Effect.gen(function* () {
  console.log("start")
  yield* Effect.sleep("100 millis")
  console.log("done")
})

Effect.runPromiseExit(programWithoutInterrupt).then(console.log)
/*
Output:
start
done
{ _id: 'Exit', _tag: 'Success', value: undefined }
*/

Условное прерывание


const conditionalInterrupt = (shouldStop: boolean) =>
  Effect.gen(function* () {
    yield* Effect.log("Processing...")
    
    if (shouldStop) {
      yield* Effect.log("Stopping early")
      yield* Effect.interrupt
    }
    
    yield* Effect.log("Completed")
    return "result"
  })

const program = Effect.gen(function* () {
  // Первый вызов — прервётся
  const exit1 = yield* Effect.exit(conditionalInterrupt(true))
  console.log("Exit 1:", exit1._tag)
  
  // Второй вызов — завершится успешно
  const exit2 = yield* Effect.exit(conditionalInterrupt(false))
  console.log("Exit 2:", exit2._tag)
})

Effect.runFork(program)
/*
Output:
timestamp=... level=INFO fiber=#0 message=Processing...
timestamp=... level=INFO fiber=#0 message="Stopping early"
Exit 1: Failure
timestamp=... level=INFO fiber=#0 message=Processing...
timestamp=... level=INFO fiber=#0 message=Completed
Exit 2: Success
*/

Interruptible и Uninterruptible регионы

Effect позволяет декларативно контролировать, где файбер можно прервать.

Effect.uninterruptible — Защита от прерывания


const criticalSection = Effect.gen(function* () {
  yield* Effect.log("Starting critical work...")
  yield* Effect.sleep("1 second")
  yield* Effect.log("Critical work completed!")
  return "result"
})

const program = Effect.gen(function* () {
  // Fork защищённого эффекта
  const fiber = yield* Effect.fork(
    Effect.uninterruptible(criticalSection)
  )
  
  // Пытаемся прервать через 200ms
  yield* Effect.sleep("200 millis")
  yield* Effect.log("Attempting to interrupt...")
  
  // Прерывание будет ждать завершения uninterruptible секции
  const exit = yield* Fiber.interrupt(fiber)
  yield* Effect.log(`Fiber completed with: ${exit._tag}`)
})

Effect.runFork(program)
/*
Output:
timestamp=... message="Starting critical work..."
timestamp=... message="Attempting to interrupt..."
timestamp=... message="Critical work completed!"
timestamp=... message="Fiber completed with: Failure"
*/

Обратите внимание: прерывание ждёт завершения uninterruptible секции.

Effect.interruptible — Восстановление прерываемости

Внутри uninterruptible региона можно создать interruptible “окно”:


const mixedRegions = Effect.uninterruptible(
  Effect.gen(function* () {
    yield* Effect.log("Phase 1: uninterruptible")
    yield* Effect.sleep("100 millis")
    
    // Interruptible окно внутри uninterruptible
    yield* Effect.interruptible(
      Effect.gen(function* () {
        yield* Effect.log("Phase 2: interruptible window")
        yield* Effect.sleep("500 millis")  // Здесь можно прервать
        yield* Effect.log("Phase 2 completed")
      })
    )
    
    yield* Effect.log("Phase 3: uninterruptible again")
    yield* Effect.sleep("100 millis")
    yield* Effect.log("All phases completed")
  })
)

const program = Effect.gen(function* () {
  const fiber = yield* Effect.fork(mixedRegions)
  
  yield* Effect.sleep("250 millis")
  yield* Effect.log("Sending interrupt...")
  
  const exit = yield* Fiber.interrupt(fiber)
  yield* Effect.log(`Result: ${exit._tag}`)
})

Effect.runFork(program)
/*
Output:
timestamp=... message="Phase 1: uninterruptible"
timestamp=... message="Phase 2: interruptible window"
timestamp=... message="Sending interrupt..."
timestamp=... message="Result: Failure"

Phase 2 прервана, Phase 3 не выполнилась
*/

Диаграмма регионов прерываемости

┌─────────────────────────────────────────────────────────────────┐
│                    РЕГИОНЫ ПРЕРЫВАЕМОСТИ                        │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│   По умолчанию: INTERRUPTIBLE                                   │
│   ═══════════════════════════════════════════════════════════   │
│   │                                                             │
│   │  Effect.uninterruptible(...)                                │
│   │  ╔═══════════════════════════════════════════════════════╗  │
│   │  ║  UNINTERRUPTIBLE                                      ║  │
│   │  ║                                                       ║  │
│   │  ║  Effect.interruptible(...)                            ║  │
│   │  ║  ┌───────────────────────────────────────────────┐    ║  │
│   │  ║  │  INTERRUPTIBLE (вложенное окно)               │    ║  │
│   │  ║  └───────────────────────────────────────────────┘    ║  │
│   │  ║                                                       ║  │
│   │  ╚═══════════════════════════════════════════════════════╝  │
│   │                                                             │
│   ═══════════════════════════════════════════════════════════   │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

Effect.uninterruptibleMask — Гибкий контроль

uninterruptibleMask позволяет создавать interruptible окна через restore функцию:


const withMask = Effect.uninterruptibleMask((restore) =>
  Effect.gen(function* () {
    yield* Effect.log("Setup: uninterruptible")
    yield* Effect.sleep("100 millis")
    
    // restore делает секцию interruptible
    yield* restore(
      Effect.gen(function* () {
        yield* Effect.log("Main work: interruptible")
        yield* Effect.sleep("500 millis")
        yield* Effect.log("Main work done")
      })
    )
    
    yield* Effect.log("Cleanup: uninterruptible")
    yield* Effect.sleep("100 millis")
    yield* Effect.log("All done")
  })
)

const program = Effect.gen(function* () {
  const fiber = yield* Effect.fork(withMask)
  
  yield* Effect.sleep("300 millis")
  const exit = yield* Fiber.interrupt(fiber)
  
  yield* Effect.log(`Exit: ${exit._tag}`)
})

Effect.runFork(program)
/*
Output:
timestamp=... message="Setup: uninterruptible"
timestamp=... message="Main work: interruptible"
timestamp=... message="Exit: Failure"

Setup выполнен, main work прерван, cleanup НЕ выполнен
*/

Паттерн: Acquire-Use-Release с прерыванием


const acquireUseRelease = <R, A>(
  acquire: Effect.Effect<R>,
  use: (resource: R) => Effect.Effect<A>,
  release: (resource: R) => Effect.Effect<void>
) =>
  Effect.uninterruptibleMask((restore) =>
    Effect.gen(function* () {
      // Acquire: uninterruptible
      const resource = yield* acquire
      
      // Use: interruptible (через restore)
      const result = yield* restore(use(resource)).pipe(
        Effect.onExit(() => release(resource))
      )
      
      return result
    })
  )

// Пример использования
const program = Effect.gen(function* () {
  const fiber = yield* Effect.fork(
    acquireUseRelease(
      Effect.sync(() => {
        console.log("Acquiring resource...")
        return { id: 1 }
      }),
      (resource) =>
        Effect.gen(function* () {
          console.log(`Using resource ${resource.id}...`)
          yield* Effect.sleep("1 second")
          console.log("Use completed")
          return "result"
        }),
      (resource) =>
        Effect.sync(() => console.log(`Releasing resource ${resource.id}`))
    )
  )
  
  yield* Effect.sleep("200 millis")
  yield* Fiber.interrupt(fiber)
})

Effect.runFork(program)
/*
Output:
Acquiring resource...
Using resource 1...
Releasing resource 1
*/

Effect.disconnect — Отсоединение от прерывания

Effect.disconnect позволяет uninterruptible эффекту продолжить выполнение в фоне, пока основной поток управления продолжает работу.

Проблема без disconnect


const uninterruptibleTask = Effect.gen(function* () {
  console.log("Start processing...")
  yield* Effect.sleep("2 seconds")
  console.log("Processing complete.")
  return "Result"
})

const withoutDisconnect = Effect.gen(function* () {
  const result = yield* uninterruptibleTask.pipe(
    Effect.uninterruptible,
    Effect.timeout("1 second")
  )
  return result
})

Effect.runPromiseExit(withoutDisconnect).then(console.log)
/*
Output:
Start processing...
Processing complete.  ← Ждём 2 секунды несмотря на timeout!
{
  _id: 'Exit',
  _tag: 'Failure',
  cause: { _tag: 'Fail', failure: { _tag: 'TimeoutException' } }
}
*/

Проблема: timeout срабатывает только ПОСЛЕ завершения uninterruptible эффекта.

Решение с disconnect


const withDisconnect = Effect.gen(function* () {
  const result = yield* uninterruptibleTask.pipe(
    Effect.uninterruptible,
    Effect.disconnect,  // Отсоединяем от родительского fiber
    Effect.timeout("1 second")
  )
  return result
})

Effect.runPromiseExit(withDisconnect).then(console.log)
/*
Output:
Start processing...
{
  _id: 'Exit',
  _tag: 'Failure',
  cause: { _tag: 'Fail', failure: { _tag: 'TimeoutException' } }
}
Processing complete.  ← Выполняется в фоне!
*/

Диаграмма disconnect

БЕЗ disconnect:
─────────────────────────────────────────────────────────────►
Main fiber     [===== uninterruptible (2s) =====]

Timeout (1s)  ─────────────────────────────────────► (ждёт!)

С disconnect:
─────────────────────────────────────────────────────────────►
Main fiber     [─────────]  ← timeout срабатывает

                    │ disconnect

Background    [===== uninterruptible (2s) =====]
              (продолжает независимо)

Практический пример: Graceful shutdown


const gracefulShutdown = (
  saveState: Effect.Effect<void>,
  timeout: number
) =>
  Effect.gen(function* () {
    yield* Effect.log("Starting graceful shutdown...")
    
    // Пытаемся сохранить состояние с таймаутом
    // Если не успеваем — продолжаем в фоне
    const result = yield* saveState.pipe(
      Effect.uninterruptible,
      Effect.disconnect,
      Effect.timeout(`${timeout} millis`),
      Effect.option
    )
    
    if (result._tag === "None") {
      yield* Effect.log("Save timed out, continuing in background")
    } else {
      yield* Effect.log("State saved successfully")
    }
    
    yield* Effect.log("Shutdown complete")
  })

const program = gracefulShutdown(
  Effect.gen(function* () {
    yield* Effect.log("Saving state...")
    yield* Effect.sleep("2 seconds")
    yield* Effect.log("State saved!")
  }),
  500
)

Effect.runFork(program)
/*
Output:
timestamp=... message="Starting graceful shutdown..."
timestamp=... message="Saving state..."
timestamp=... message="Save timed out, continuing in background"
timestamp=... message="Shutdown complete"
timestamp=... message="State saved!"  ← В фоне
*/

Finalizers и прерывание

Finalizers гарантированно выполняются даже при прерывании.

Effect.ensuring — Гарантированный finalizer


const withFinalizer = Effect.gen(function* () {
  yield* Effect.log("Starting work...")
  yield* Effect.sleep("1 second")
  yield* Effect.log("Work completed")
  return "result"
}).pipe(
  Effect.ensuring(
    Effect.log("Finalizer: cleanup executed!")
  )
)

const program = Effect.gen(function* () {
  const fiber = yield* Effect.fork(withFinalizer)
  
  yield* Effect.sleep("200 millis")
  yield* Effect.log("Interrupting...")
  
  const exit = yield* Fiber.interrupt(fiber)
  yield* Effect.log(`Exit: ${exit._tag}`)
})

Effect.runFork(program)
/*
Output:
timestamp=... message="Starting work..."
timestamp=... message="Interrupting..."
timestamp=... message="Finalizer: cleanup executed!"
timestamp=... message="Exit: Failure"
*/

Effect.onInterrupt — Обработка прерывания


const withInterruptHandler = Effect.gen(function* () {
  yield* Effect.log("Working...")
  yield* Effect.sleep("1 second")
  return "done"
}).pipe(
  Effect.onInterrupt((interruptors) =>
    Effect.gen(function* () {
      const ids = [...interruptors].map(FiberId.threadName).join(", ")
      yield* Effect.log(`Interrupted by: ${ids}`)
      yield* Effect.log("Performing interrupt-specific cleanup...")
    })
  )
)

const program = Effect.gen(function* () {
  const fiber = yield* Effect.fork(withInterruptHandler)
  
  yield* Effect.sleep("200 millis")
  yield* Fiber.interrupt(fiber)
})

Effect.runFork(program)
/*
Output:
timestamp=... message="Working..."
timestamp=... message="Interrupted by: #0"
timestamp=... message="Performing interrupt-specific cleanup..."
*/

Effect.onExit — Обработка любого завершения


const withExitHandler = Effect.gen(function* () {
  yield* Effect.log("Working...")
  yield* Effect.sleep("500 millis")
  return "result"
}).pipe(
  Effect.onExit((exit) =>
    Effect.gen(function* () {
      if (Exit.isSuccess(exit)) {
        yield* Effect.log(`Success: ${exit.value}`)
      } else if (Exit.isInterrupted(exit)) {
        yield* Effect.log("Was interrupted")
      } else {
        yield* Effect.log("Failed with error")
      }
    })
  )
)

// Тест успеха
Effect.runFork(withExitHandler)

// Тест прерывания
const program = Effect.gen(function* () {
  const fiber = yield* Effect.fork(withExitHandler)
  yield* Effect.sleep("100 millis")
  yield* Fiber.interrupt(fiber)
})

Effect.runFork(program)

Порядок выполнения finalizers

Finalizers выполняются в обратном порядке добавления (LIFO):


const program = Effect.gen(function* () {
  yield* Effect.addFinalizer(() => Effect.log("Finalizer 1"))
  yield* Effect.addFinalizer(() => Effect.log("Finalizer 2"))
  yield* Effect.addFinalizer(() => Effect.log("Finalizer 3"))
  
  yield* Effect.log("Working...")
  yield* Effect.sleep("1 second")
})

const main = Effect.scoped(program)

Effect.runFork(
  Effect.gen(function* () {
    const fiber = yield* Effect.fork(main)
    yield* Effect.sleep("100 millis")
    yield* Fiber.interrupt(fiber)
  })
)
/*
Output:
timestamp=... message="Working..."
timestamp=... message="Finalizer 3"
timestamp=... message="Finalizer 2"
timestamp=... message="Finalizer 1"
*/

Таймауты и прерывание

Таймауты тесно связаны с прерыванием.

Effect.timeout — Базовый таймаут


const slowTask = Effect.gen(function* () {
  yield* Effect.log("Starting slow task...")
  yield* Effect.sleep("5 seconds")
  yield* Effect.log("Slow task completed")
  return "result"
})

const withTimeout = slowTask.pipe(
  Effect.timeout("1 second")
)

Effect.runPromiseExit(withTimeout).then(console.log)
/*
Output:
timestamp=... message="Starting slow task..."
{
  _id: 'Exit',
  _tag: 'Failure',
  cause: { _tag: 'Fail', failure: { _tag: 'TimeoutException' } }
}
*/

Interruptible vs Uninterruptible при таймауте


// Interruptible — прерывается немедленно
const interruptibleTimeout = Effect.gen(function* () {
  console.log("Start")
  yield* Effect.sleep("2 seconds")
  console.log("End")
  return "Result"
}).pipe(
  Effect.timeout("1 second")
)

// Uninterruptible — таймаут ждёт завершения
const uninterruptibleTimeout = Effect.gen(function* () {
  console.log("Start")
  yield* Effect.sleep("2 seconds")
  console.log("End")
  return "Result"
}).pipe(
  Effect.uninterruptible,
  Effect.timeout("1 second")
)

// Interruptible
console.log("=== Interruptible ===")
await Effect.runPromiseExit(interruptibleTimeout)
// Output: Start, затем сразу timeout

console.log("\n=== Uninterruptible ===")
await Effect.runPromiseExit(uninterruptibleTimeout)
// Output: Start, End (ждём 2 сек), затем timeout

Effect.timeoutTo — Кастомное поведение при таймауте


const withCustomTimeout = Effect.gen(function* () {
  yield* Effect.sleep("2 seconds")
  return "completed"
}).pipe(
  Effect.timeoutTo({
    duration: "500 millis",
    onTimeout: () => "timed out" as const
  })
)

Effect.runPromise(withCustomTimeout).then(console.log)
// Output: timed out

Effect.timeoutFail — Кастомная ошибка


class CustomTimeoutError {
  readonly _tag = "CustomTimeoutError"
  constructor(readonly message: string) {}
}

const withCustomError = Effect.gen(function* () {
  yield* Effect.sleep("2 seconds")
  return "result"
}).pipe(
  Effect.timeoutFail({
    duration: "500 millis",
    onTimeout: () => new CustomTimeoutError("Operation took too long")
  })
)

Effect.runPromiseExit(withCustomError).then(console.log)
/*
{
  _id: 'Exit',
  _tag: 'Failure',
  cause: {
    _tag: 'Fail',
    failure: CustomTimeoutError { _tag: 'CustomTimeoutError', message: 'Operation took too long' }
  }
}
*/

API Reference

Прерывание

ФункцияСигнатураОписание
Effect.interruptEffect<never>Прервать текущий файбер
Effect.interruptWith(FiberId) => Effect<never>Прервать с указанным ID
Fiber.interruptFiber<A, E> => Effect<Exit<A, E>>Прервать файбер
Fiber.interruptForkFiber<A, E> => Effect<void>Прервать без ожидания

Регионы прерываемости

ФункцияОписание
Effect.interruptibleСделать эффект прерываемым
Effect.uninterruptibleЗащитить от прерывания
Effect.uninterruptibleMaskГибкий контроль с restore
Effect.disconnectОтсоединить от родительского fiber

Finalizers

ФункцияОписание
Effect.ensuringГарантированный finalizer
Effect.onInterruptТолько при прерывании
Effect.onExitПри любом завершении
Effect.addFinalizerДобавить в Scope

Таймауты

ФункцияОписание
Effect.timeoutТаймаут с TimeoutException
Effect.timeoutToТаймаут с fallback значением
Effect.timeoutFailТаймаут с кастомной ошибкой
Effect.timeoutFailCauseТаймаут с кастомным Cause

Примеры

Пример 1: HTTP клиент с отменой


const httpRequest = (url: string, signal?: AbortSignal) =>
  Effect.tryPromise({
    try: () => fetch(url, { signal }).then((r) => r.json()),
    catch: (error) => new Error(`Fetch failed: ${error}`)
  })

const cancellableRequest = (url: string) =>
  Effect.asyncInterrupt<unknown, Error>((resume) => {
    const controller = new AbortController()
    
    fetch(url, { signal: controller.signal })
      .then((r) => r.json())
      .then((data) => resume(Effect.succeed(data)))
      .catch((error) => {
        if (error.name !== "AbortError") {
          resume(Effect.fail(new Error(`Fetch failed: ${error}`)))
        }
      })
    
    // Возвращаем Effect для отмены
    return Effect.sync(() => {
      console.log("Aborting request...")
      controller.abort()
    })
  })

const program = Effect.gen(function* () {
  const fiber = yield* Effect.fork(
    cancellableRequest("https://api.example.com/slow")
  )
  
  yield* Effect.sleep("100 millis")
  yield* Effect.log("Cancelling request...")
  yield* Fiber.interrupt(fiber)
  yield* Effect.log("Request cancelled")
})

Пример 2: Транзакция с гарантированным rollback


interface Transaction {
  readonly id: string
  readonly operations: ReadonlyArray<string>
}

const executeTransaction = (tx: Transaction) =>
  Effect.uninterruptibleMask((restore) =>
    Effect.gen(function* () {
      const committed = yield* Ref.make<ReadonlyArray<string>>([])
      
      yield* Effect.log(`Starting transaction ${tx.id}`)
      
      // Выполняем операции (interruptible)
      const result = yield* restore(
        Effect.forEach(
          tx.operations,
          (op) =>
            Effect.gen(function* () {
              yield* Effect.log(`Executing: ${op}`)
              yield* Effect.sleep("200 millis")
              yield* Ref.update(committed, (ops) => [...ops, op])
              return op
            })
        )
      ).pipe(
        // При ошибке или прерывании — rollback
        Effect.onError(() =>
          Effect.gen(function* () {
            const ops = yield* Ref.get(committed)
            yield* Effect.log(`Rolling back ${ops.length} operations...`)
            
            // Rollback в обратном порядке
            yield* Effect.forEach(
              ops.reverse(),
              (op) => Effect.log(`Rollback: ${op}`),
              { discard: true }
            )
          })
        )
      )
      
      yield* Effect.log(`Transaction ${tx.id} committed`)
      return result
    })
  )

const program = Effect.gen(function* () {
  const fiber = yield* Effect.fork(
    executeTransaction({
      id: "tx-001",
      operations: ["insert user", "update balance", "send notification"]
    })
  )
  
  yield* Effect.sleep("350 millis")
  yield* Effect.log("Interrupting transaction...")
  yield* Fiber.interrupt(fiber)
})

Effect.runFork(program)
/*
Output:
timestamp=... message="Starting transaction tx-001"
timestamp=... message="Executing: insert user"
timestamp=... message="Executing: update balance"
timestamp=... message="Interrupting transaction..."
timestamp=... message="Rolling back 2 operations..."
timestamp=... message="Rollback: update balance"
timestamp=... message="Rollback: insert user"
*/

Пример 3: Graceful shutdown сервера


interface Server {
  readonly activeRequests: Ref.Ref<number>
  readonly shutdown: Deferred.Deferred<void, never>
}

const createServer = Effect.gen(function* () {
  const activeRequests = yield* Ref.make(0)
  const shutdown = yield* Deferred.make<void, never>()
  
  return { activeRequests, shutdown } satisfies Server
})

const handleRequest = (server: Server, requestId: number) =>
  Effect.uninterruptibleMask((restore) =>
    Effect.gen(function* () {
      // Увеличиваем счётчик (uninterruptible)
      yield* Ref.update(server.activeRequests, (n) => n + 1)
      yield* Effect.log(`Request ${requestId}: started`)
      
      // Обработка запроса (interruptible)
      yield* restore(
        Effect.gen(function* () {
          yield* Effect.sleep(`${100 + Math.random() * 400} millis`)
        })
      ).pipe(
        Effect.onExit(() =>
          Effect.gen(function* () {
            yield* Ref.update(server.activeRequests, (n) => n - 1)
            yield* Effect.log(`Request ${requestId}: completed`)
          })
        )
      )
    })
  )

const gracefulShutdown = (server: Server, timeout: number) =>
  Effect.gen(function* () {
    yield* Effect.log("Initiating graceful shutdown...")
    yield* Deferred.succeed(server.shutdown, undefined)
    
    // Ждём завершения активных запросов
    yield* Effect.gen(function* () {
      while (true) {
        const active = yield* Ref.get(server.activeRequests)
        if (active === 0) {
          yield* Effect.log("All requests completed")
          return
        }
        yield* Effect.log(`Waiting for ${active} requests...`)
        yield* Effect.sleep("100 millis")
      }
    }).pipe(
      Effect.timeout(`${timeout} millis`),
      Effect.catchAll(() => Effect.log("Shutdown timeout, forcing..."))
    )
    
    yield* Effect.log("Server shutdown complete")
  })

const program = Effect.gen(function* () {
  const server = yield* createServer
  
  // Запускаем несколько запросов
  const requestFibers = yield* Effect.forEach(
    [1, 2, 3, 4, 5],
    (id) => Effect.fork(handleRequest(server, id))
  )
  
  // Через 200ms инициируем shutdown
  yield* Effect.sleep("200 millis")
  yield* gracefulShutdown(server, 2000)
})

Effect.runFork(program)

Упражнения

Упражнение

Упражнение 1: Защита критической секции

Легко

Создайте функцию, которая защищает эффект от прерывания.

import { Effect, Fiber } from "effect"

const protect = <A, E, R>(
  effect: Effect.Effect<A, E, R>
): Effect.Effect<A, E, R> =>
  // Ваша реализация
  ???

// Тест
const program = Effect.gen(function* () {
  const fiber = yield* Effect.fork(
    protect(
      Effect.gen(function* () {
        yield* Effect.log("Start")
        yield* Effect.sleep("500 millis")
        yield* Effect.log("End")
      })
    )
  )
  
  yield* Effect.sleep("100 millis")
  yield* Fiber.interrupt(fiber)
  yield* Effect.log("After interrupt")
})
// Ожидается: Start, End, After interrupt
Упражнение

Упражнение 2: Таймаут с fallback

Легко

Создайте функцию с таймаутом и значением по умолчанию.

import { Effect } from "effect"

const withTimeoutDefault = <A, E, R>(
  effect: Effect.Effect<A, E, R>,
  timeout: number,
  defaultValue: A
): Effect.Effect<A, E, R> =>
  // Ваша реализация
  ???
Упражнение

Упражнение 3: Acquire-Use-Release с прерыванием

Средне

Реализуйте паттерн безопасного управления ресурсами.

import { Effect } from "effect"

const bracket = <R, A, E, B>(
  acquire: Effect.Effect<R>,
  use: (resource: R) => Effect.Effect<A, E>,
  release: (resource: R) => Effect.Effect<B>
): Effect.Effect<A, E> =>
  // Ваша реализация:
  // 1. acquire — uninterruptible
  // 2. use — interruptible
  // 3. release — uninterruptible, всегда выполняется
  ???
Упражнение

Упражнение 4: Cancellation token pattern

Сложно

Реализуйте паттерн отмены через токен.

import { Effect, Fiber, Deferred, Ref } from "effect"

interface CancellationToken {
  readonly isCancelled: Effect.Effect<boolean>
  readonly cancel: Effect.Effect<void>
  readonly throwIfCancelled: Effect.Effect<void, "Cancelled">
}

const createCancellationToken = (): Effect.Effect<CancellationToken> =>
  // Ваша реализация
  ???

const withCancellation = <A, E, R>(
  token: CancellationToken,
  effect: Effect.Effect<A, E, R>
): Effect.Effect<A, E | "Cancelled", R> =>
  // Ваша реализация
  ???

Заключение

Модель прерывания в Effect обеспечивает:

  • Безопасную отмену — finalizers всегда выполняются
  • Гибкий контроль — interruptible/uninterruptible регионы
  • Композицию — uninterruptibleMask для сложных сценариев
  • Disconnect — независимое выполнение в фоне

В следующей статье мы изучим FiberRef — механизм локального хранения данных для файберов.