Supervisor
Стратегии супервизии файберов.
Введение в Supervisor
Supervisor<A> — это утилита для управления файберами в Effect, позволяющая отслеживать их жизненный цикл (создание и завершение) и производить значение типа A, отражающее результат супервизии.
Зачем нужен Supervisor?
┌─────────────────────────────────────────────────────────────────┐
│ СЛУЧАИ ИСПОЛЬЗОВАНИЯ │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 1. Мониторинг │
│ └── Отслеживание количества активных файберов │
│ │
│ 2. Отладка │
│ └── Логирование создания/завершения файберов │
│ │
│ 3. Graceful Shutdown │
│ └── Ожидание завершения всех дочерних файберов │
│ │
│ 4. Метрики │
│ └── Сбор статистики по файберам │
│ │
│ 5. Resource Management │
│ └── Отслеживание файберов для cleanup │
│ │
└─────────────────────────────────────────────────────────────────┘
Концептуальная модель
┌─────────────────────────────────────────────────────────────────┐
│ │
│ Supervisor │
│ ┌───────────────┐ │
│ │ Track Fibers │ │
│ │ ┌─────────┐ │ │
│ │ │ Fiber 1 │ │ │
│ │ │ Fiber 2 │ │ │
│ │ │ Fiber 3 │ │ │
│ │ └─────────┘ │ │
│ └───────┬───────┘ │
│ │ │
│ ▼ │
│ ┌───────────────┐ │
│ │ Value A │ │
│ │ (fiber stats) │ │
│ └───────────────┘ │
│ │
│ Supervised Effect │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ │ │
│ │ fork ──────────► Fiber 1 ◄──── tracked │ │
│ │ fork ──────────► Fiber 2 ◄──── tracked │ │
│ │ fork ──────────► Fiber 3 ◄──── tracked │ │
│ │ │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
Создание Supervisor
Supervisor.track — Отслеживание файберов
Основной способ создания Supervisor — Supervisor.track, который отслеживает все дочерние файберы.
const program = Effect.gen(function* () {
// Создаём supervisor
const supervisor = yield* Supervisor.track
// supervisor.value содержит текущий список файберов
const fibers = yield* supervisor.value
console.log("Initial fibers:", fibers.length) // 0
})
Effect.runFork(program)
Supervisor.fromEffect — Кастомный supervisor
const createCountingSupervisor = Effect.gen(function* () {
const count = yield* Ref.make(0)
const supervisor = Supervisor.fromEffect(
Ref.get(count)
)
// Этот supervisor считает файберы
// (упрощённый пример, реальная логика сложнее)
return supervisor
})
Supervisor.none — Пустой supervisor
// Supervisor, который ничего не делает
const noopSupervisor = Supervisor.none
Supervisor.fibersIn — Supervisor для коллекции
const program = Effect.gen(function* () {
// Создаём supervisor с SortedSet для хранения файберов
const supervisor = yield* Supervisor.fibersIn(
SortedSet.empty<Fiber.RuntimeFiber<unknown, unknown>>()
)
const fibers = yield* supervisor.value
console.log("Tracked fibers:", SortedSet.size(fibers))
})
Effect.supervised — Супервизия эффектов
Effect.supervised позволяет применить supervisor к эффекту, отслеживая все файберы, созданные внутри него.
Базовый пример
const program = Effect.gen(function* () {
// Создаём tracking supervisor
const supervisor = yield* Supervisor.track
// Запускаем supervised эффект
const fiber = yield* Effect.gen(function* () {
// Fork несколько файберов
yield* Effect.fork(Effect.sleep("100 millis"))
yield* Effect.fork(Effect.sleep("200 millis"))
yield* Effect.fork(Effect.sleep("300 millis"))
yield* Effect.sleep("50 millis")
// Проверяем количество отслеживаемых файберов
const fibers = yield* supervisor.value
console.log("Active fibers:", fibers.length)
return "done"
}).pipe(
Effect.supervised(supervisor),
Effect.fork
)
yield* Fiber.join(fiber)
})
Effect.runFork(program)
/*
Output:
Active fibers: 3
*/
Мониторинг Fibonacci вычисления
// Рекурсивный Fibonacci с параллельными вызовами
const fib = (n: number): Effect.Effect<number> =>
Effect.gen(function* () {
if (n <= 1) {
return 1
}
yield* Effect.sleep("500 millis") // Симуляция работы
// Fork два рекурсивных вызова
const fiber1 = yield* Effect.fork(fib(n - 2))
const fiber2 = yield* Effect.fork(fib(n - 1))
// Ждём результаты
const a = yield* Fiber.join(fiber1)
const b = yield* Fiber.join(fiber2)
return a + b
})
// Мониторинг файберов
const monitorFibers = (
supervisor: Supervisor.Supervisor<Array<Fiber.RuntimeFiber<unknown, unknown>>>
): Effect.Effect<void> =>
Effect.gen(function* () {
const fibers = yield* supervisor.value
console.log(`Active fibers: ${fibers.length}`)
})
const program = Effect.gen(function* () {
const supervisor = yield* Supervisor.track
// Запускаем Fibonacci с супервизией
const fibFiber = yield* fib(5).pipe(
Effect.supervised(supervisor),
Effect.fork
)
// Мониторим пока Fibonacci работает
const monitorFiber = yield* monitorFibers(supervisor).pipe(
Effect.repeat(
Schedule.spaced("500 millis").pipe(
Schedule.whileInputEffect(() =>
Fiber.status(fibFiber).pipe(
Effect.map((status) => status !== FiberStatus.done)
)
)
)
),
Effect.fork
)
yield* Fiber.join(monitorFiber)
const result = yield* Fiber.join(fibFiber)
console.log(`Fibonacci result: ${result}`)
})
Effect.runFork(program)
/*
Output (примерный):
Active fibers: 2
Active fibers: 4
Active fibers: 6
Active fibers: 8
...
Fibonacci result: 8
*/
Стратегии супервизии
Стратегия 1: Track and Wait
Отслеживаем все файберы и ждём их завершения.
const trackAndWait = <A, E, R>(
effect: Effect.Effect<A, E, R>
): Effect.Effect<A, E, R> =>
Effect.gen(function* () {
const supervisor = yield* Supervisor.track
const result = yield* effect.pipe(Effect.supervised(supervisor))
// Ждём завершения всех дочерних файберов
const fibers = yield* supervisor.value
yield* Effect.forEach(fibers, Fiber.await, { discard: true })
return result
})
const program = trackAndWait(
Effect.gen(function* () {
yield* Effect.fork(
Effect.gen(function* () {
yield* Effect.log("Background task started")
yield* Effect.sleep("500 millis")
yield* Effect.log("Background task completed")
})
)
yield* Effect.log("Main work done")
return "result"
})
)
Effect.runFork(program)
/*
Output:
timestamp=... message="Background task started"
timestamp=... message="Main work done"
timestamp=... message="Background task completed"
*/
Стратегия 2: Track and Interrupt
Отслеживаем файберы и прерываем их при завершении основного эффекта.
const trackAndInterrupt = <A, E, R>(
effect: Effect.Effect<A, E, R>
): Effect.Effect<A, E, R> =>
Effect.gen(function* () {
const supervisor = yield* Supervisor.track
const result = yield* effect.pipe(Effect.supervised(supervisor))
// Прерываем все дочерние файберы
const fibers = yield* supervisor.value
yield* Effect.forEach(fibers, Fiber.interrupt, { discard: true })
return result
})
Стратегия 3: Monitor with Callback
Мониторим файберы с callback при создании/завершении.
interface FiberEvent {
readonly type: "started" | "completed"
readonly fiberId: string
readonly timestamp: number
}
const createMonitoringSupervisor = (
onEvent: (event: FiberEvent) => void
) =>
Effect.gen(function* () {
const supervisor = yield* Supervisor.track
// Периодически проверяем состояние файберов
// (упрощённая версия — реальная реализация сложнее)
return supervisor
})
Стратегия 4: Graceful Shutdown с таймаутом
const gracefulShutdown = <A, E, R>(
effect: Effect.Effect<A, E, R>,
timeout: Duration.DurationInput
): Effect.Effect<A, E, R> =>
Effect.gen(function* () {
const supervisor = yield* Supervisor.track
const result = yield* effect.pipe(Effect.supervised(supervisor))
// Ждём завершения с таймаутом
const fibers = yield* supervisor.value
if (fibers.length > 0) {
yield* Effect.forEach(fibers, Fiber.await, { discard: true }).pipe(
Effect.timeout(timeout),
Effect.catchAll(() =>
Effect.gen(function* () {
yield* Effect.log(`Timeout reached, interrupting ${fibers.length} fibers`)
yield* Effect.forEach(fibers, Fiber.interrupt, { discard: true })
})
)
)
}
return result
})
Мониторинг файберов
Получение списка активных файберов
const getActiveFibers = (
supervisor: Supervisor.Supervisor<Array<Fiber.RuntimeFiber<unknown, unknown>>>
) =>
supervisor.value
const program = Effect.gen(function* () {
const supervisor = yield* Supervisor.track
yield* Effect.supervised(supervisor)(
Effect.gen(function* () {
yield* Effect.fork(Effect.sleep("1 second"))
yield* Effect.fork(Effect.sleep("2 seconds"))
const fibers = yield* getActiveFibers(supervisor)
for (const fiber of fibers) {
const status = yield* Fiber.status(fiber)
console.log(`Fiber ${fiber.id()}: ${status._tag}`)
}
})
)
})
Effect.runFork(program)
Подсчёт файберов по статусу
interface FiberStats {
readonly running: number
readonly suspended: number
readonly done: number
}
const getFiberStats = (
supervisor: Supervisor.Supervisor<Array<Fiber.RuntimeFiber<unknown, unknown>>>
): Effect.Effect<FiberStats> =>
Effect.gen(function* () {
const fibers = yield* supervisor.value
let running = 0
let suspended = 0
let done = 0
for (const fiber of fibers) {
const status = yield* Fiber.status(fiber)
if (FiberStatus.isRunning(status)) running++
else if (FiberStatus.isSuspended(status)) suspended++
else if (FiberStatus.isDone(status)) done++
}
return { running, suspended, done }
})
Периодический мониторинг
const periodicMonitor = (
supervisor: Supervisor.Supervisor<Array<Fiber.RuntimeFiber<unknown, unknown>>>,
interval: string
) =>
Effect.repeat(
Effect.gen(function* () {
const fibers = yield* supervisor.value
const timestamp = new Date().toISOString()
yield* Effect.log(`[${timestamp}] Active fibers: ${fibers.length}`)
}),
Schedule.spaced(interval)
)
const program = Effect.gen(function* () {
const supervisor = yield* Supervisor.track
// Запускаем мониторинг
const monitorFiber = yield* Effect.fork(
periodicMonitor(supervisor, "500 millis")
)
// Основная работа с супервизией
yield* Effect.supervised(supervisor)(
Effect.gen(function* () {
yield* Effect.fork(Effect.sleep("1 second").pipe(Effect.as("task1")))
yield* Effect.fork(Effect.sleep("2 seconds").pipe(Effect.as("task2")))
yield* Effect.fork(Effect.sleep("1.5 seconds").pipe(Effect.as("task3")))
yield* Effect.sleep("3 seconds")
})
)
yield* Fiber.interrupt(monitorFiber)
})
Effect.runFork(program)
Практические паттерны
Паттерн 1: Worker Pool с мониторингом
interface WorkerPoolStats {
readonly activeWorkers: number
readonly totalTasksProcessed: number
readonly queueSize: number
}
const createMonitoredWorkerPool = (workerCount: number) =>
Effect.gen(function* () {
const supervisor = yield* Supervisor.track
const taskQueue = yield* Queue.unbounded<Effect.Effect<void>>()
const processedCount = yield* Ref.make(0)
// Создаём воркеров
const workers = yield* Effect.forEach(
Array.from({ length: workerCount }, (_, i) => i),
(workerId) =>
Effect.fork(
Effect.forever(
Effect.gen(function* () {
const task = yield* Queue.take(taskQueue)
yield* task
yield* Ref.update(processedCount, (n) => n + 1)
})
)
).pipe(Effect.supervised(supervisor))
)
const getStats = (): Effect.Effect<WorkerPoolStats> =>
Effect.gen(function* () {
const fibers = yield* supervisor.value
const processed = yield* Ref.get(processedCount)
const queueSize = yield* Queue.size(taskQueue)
return {
activeWorkers: fibers.length,
totalTasksProcessed: processed,
queueSize
}
})
const submitTask = (task: Effect.Effect<void>) =>
Queue.offer(taskQueue, task)
const shutdown = () =>
Effect.gen(function* () {
yield* Queue.shutdown(taskQueue)
const fibers = yield* supervisor.value
yield* Effect.forEach(fibers, Fiber.interrupt, { discard: true })
})
return {
submitTask,
getStats,
shutdown
}
})
Паттерн 2: Request Lifecycle Tracking
interface RequestTracker {
readonly startRequest: (requestId: string) => Effect.Effect<void>
readonly endRequest: (requestId: string) => Effect.Effect<void>
readonly getActiveRequests: Effect.Effect<ReadonlyArray<string>>
}
const createRequestTracker = (): Effect.Effect<RequestTracker> =>
Effect.gen(function* () {
const activeRequests = yield* Ref.make<HashMap.HashMap<string, number>>(
HashMap.empty()
)
return {
startRequest: (requestId) =>
Ref.update(activeRequests, HashMap.set(requestId, Date.now())),
endRequest: (requestId) =>
Ref.update(activeRequests, HashMap.remove(requestId)),
getActiveRequests: Ref.get(activeRequests).pipe(
Effect.map((map) => [...HashMap.keys(map)])
)
}
})
Паттерн 3: Fiber Tree Visualization
interface FiberNode {
readonly id: string
readonly status: string
readonly children: ReadonlyArray<FiberNode>
}
const buildFiberTree = (
supervisor: Supervisor.Supervisor<Array<Fiber.RuntimeFiber<unknown, unknown>>>
): Effect.Effect<ReadonlyArray<FiberNode>> =>
Effect.gen(function* () {
const fibers = yield* supervisor.value
const nodes: FiberNode[] = []
for (const fiber of fibers) {
const status = yield* Fiber.status(fiber)
nodes.push({
id: `#${fiber.id()}`,
status: status._tag,
children: []
})
}
return nodes
})
const printFiberTree = (nodes: ReadonlyArray<FiberNode>, indent = 0) => {
for (const node of nodes) {
console.log(`${" ".repeat(indent)}├─ ${node.id} [${node.status}]`)
printFiberTree(node.children, indent + 1)
}
}
Паттерн 4: Supervised Service
// Service для мониторинга
class FiberMonitor extends Context.Tag("FiberMonitor")<
FiberMonitor,
{
readonly getActiveCount: Effect.Effect<number>
readonly interruptAll: Effect.Effect<void>
}
>() {}
const FiberMonitorLive = Layer.effect(
FiberMonitor,
Effect.gen(function* () {
const supervisor = yield* Supervisor.track
return {
getActiveCount: supervisor.value.pipe(
Effect.map((fibers) => fibers.length)
),
interruptAll: Effect.gen(function* () {
const fibers = yield* supervisor.value
yield* Effect.forEach(fibers, Fiber.interrupt, { discard: true })
})
}
})
)
API Reference
Создание Supervisor
| Функция | Сигнатура | Описание |
|---|---|---|
Supervisor.track | Effect<Supervisor<Array<RuntimeFiber>>> | Отслеживающий supervisor |
Supervisor.none | Supervisor<void> | Пустой supervisor |
Supervisor.fibersIn | (Set) => Effect<Supervisor<Set>> | Supervisor с кастомным Set |
Supervisor.fromEffect | (Effect<A>) => Supervisor<A> | Supervisor из Effect |
Операции
| Функция | Описание |
|---|---|
supervisor.value | Получить текущее значение (список файберов) |
Effect.supervised(supervisor) | Применить supervisor к эффекту |
Комбинаторы
| Функция | Описание |
|---|---|
Supervisor.zip | Объединить два supervisor |
Supervisor.map | Трансформировать значение supervisor |
Примеры
Пример 1: Dashboard с метриками файберов
interface Dashboard {
readonly totalCreated: number
readonly totalCompleted: number
readonly currentlyActive: number
readonly peakActive: number
}
const createDashboard = Effect.gen(function* () {
const supervisor = yield* Supervisor.track
const totalCreated = yield* Ref.make(0)
const totalCompleted = yield* Ref.make(0)
const peakActive = yield* Ref.make(0)
const getMetrics = (): Effect.Effect<Dashboard> =>
Effect.gen(function* () {
const fibers = yield* supervisor.value
const created = yield* Ref.get(totalCreated)
const completed = yield* Ref.get(totalCompleted)
const peak = yield* Ref.get(peakActive)
// Обновляем peak
if (fibers.length > peak) {
yield* Ref.set(peakActive, fibers.length)
}
return {
totalCreated: created,
totalCompleted: completed,
currentlyActive: fibers.length,
peakActive: yield* Ref.get(peakActive)
}
})
const trackFiber = <A, E>(effect: Effect.Effect<A, E>) =>
Effect.gen(function* () {
yield* Ref.update(totalCreated, (n) => n + 1)
const result = yield* effect
yield* Ref.update(totalCompleted, (n) => n + 1)
return result
})
return {
supervisor,
getMetrics,
trackFiber
}
})
const program = Effect.gen(function* () {
const dashboard = yield* createDashboard
// Периодический вывод метрик
const metricsFiber = yield* Effect.fork(
Effect.repeat(
Effect.gen(function* () {
const metrics = yield* dashboard.getMetrics()
console.log("Dashboard:", JSON.stringify(metrics))
}),
Schedule.spaced("300 millis")
)
)
// Симулируем работу
yield* Effect.supervised(dashboard.supervisor)(
Effect.gen(function* () {
for (let i = 0; i < 5; i++) {
yield* Effect.fork(
dashboard.trackFiber(
Effect.sleep(`${(i + 1) * 200} millis`)
)
)
yield* Effect.sleep("100 millis")
}
yield* Effect.sleep("1.5 seconds")
})
)
yield* Fiber.interrupt(metricsFiber)
const finalMetrics = yield* dashboard.getMetrics()
console.log("Final:", JSON.stringify(finalMetrics))
})
Effect.runFork(program)
Пример 2: Supervised HTTP Server
interface Request {
readonly id: string
readonly path: string
}
interface Response {
readonly status: number
readonly body: string
}
const createSupervisedServer = Effect.gen(function* () {
const supervisor = yield* Supervisor.track
const requestCount = yield* Ref.make(0)
const handleRequest = (request: Request): Effect.Effect<Response> =>
Effect.gen(function* () {
yield* Ref.update(requestCount, (n) => n + 1)
yield* Effect.log(`Handling ${request.path}`)
// Симуляция обработки
yield* Effect.sleep(`${100 + Math.random() * 200} millis`)
return {
status: 200,
body: `Response for ${request.path}`
}
})
const processRequests = (requests: ReadonlyArray<Request>) =>
Effect.supervised(supervisor)(
Effect.forEach(
requests,
(request) => Effect.fork(handleRequest(request)),
{ concurrency: "unbounded" }
).pipe(
Effect.flatMap((fibers) =>
Effect.forEach(fibers, Fiber.join)
)
)
)
const getStats = () =>
Effect.gen(function* () {
const fibers = yield* supervisor.value
const count = yield* Ref.get(requestCount)
return {
activeRequests: fibers.length,
totalRequests: count
}
})
const shutdown = () =>
Effect.gen(function* () {
const fibers = yield* supervisor.value
yield* Effect.log(`Shutting down ${fibers.length} active requests`)
yield* Effect.forEach(fibers, Fiber.interrupt, { discard: true })
})
return {
processRequests,
getStats,
shutdown
}
})
const program = Effect.gen(function* () {
const server = yield* createSupervisedServer
const requests: ReadonlyArray<Request> = [
{ id: "1", path: "/users" },
{ id: "2", path: "/products" },
{ id: "3", path: "/orders" }
]
const responses = yield* server.processRequests(requests)
console.log("Responses:", responses)
const stats = yield* server.getStats()
console.log("Stats:", stats)
})
Effect.runFork(program)
Пример 3: Circuit Breaker с мониторингом
type CircuitState = "closed" | "open" | "half-open"
const createCircuitBreaker = (
failureThreshold: number,
resetTimeout: number
) =>
Effect.gen(function* () {
const supervisor = yield* Supervisor.track
const state = yield* Ref.make<CircuitState>("closed")
const failures = yield* Ref.make(0)
const successesInHalfOpen = yield* Ref.make(0)
const execute = <A, E>(effect: Effect.Effect<A, E>) =>
Effect.gen(function* () {
const currentState = yield* Ref.get(state)
if (currentState === "open") {
return yield* Effect.fail("Circuit is open" as const)
}
const fiber = yield* Effect.fork(effect).pipe(
Effect.supervised(supervisor)
)
const exit = yield* Fiber.await(fiber)
if (exit._tag === "Success") {
yield* Ref.set(failures, 0)
if (currentState === "half-open") {
const successes = yield* Ref.updateAndGet(successesInHalfOpen, (n) => n + 1)
if (successes >= 3) {
yield* Ref.set(state, "closed")
yield* Effect.log("Circuit closed")
}
}
return exit.value
} else {
const failCount = yield* Ref.updateAndGet(failures, (n) => n + 1)
if (failCount >= failureThreshold) {
yield* Ref.set(state, "open")
yield* Effect.log("Circuit opened!")
// Schedule reset
yield* Effect.fork(
Effect.gen(function* () {
yield* Effect.sleep(`${resetTimeout} millis`)
yield* Ref.set(state, "half-open")
yield* Ref.set(successesInHalfOpen, 0)
yield* Effect.log("Circuit half-open")
})
)
}
return yield* Effect.failCause(exit.cause)
}
})
const getState = Ref.get(state)
const getActiveOperations = supervisor.value.pipe(
Effect.map((fibers) => fibers.length)
)
return { execute, getState, getActiveOperations }
})
Упражнения
Упражнение 1: Простой мониторинг
Создайте supervisor и отследите количество файберов.
import { Effect, Supervisor, Fiber } from "effect"
const program = Effect.gen(function* () {
// 1. Создайте supervisor
// 2. Запустите supervised эффект с несколькими fork
// 3. Выведите количество активных файберов
???
})import { Effect, Supervisor, Fiber } from "effect"
const program = Effect.gen(function* () {
const supervisor = yield* Supervisor.track
yield* Effect.supervised(supervisor)(
Effect.gen(function* () {
yield* Effect.fork(Effect.sleep("1 second"))
yield* Effect.fork(Effect.sleep("2 seconds"))
yield* Effect.fork(Effect.sleep("3 seconds"))
const fibers = yield* supervisor.value
console.log(`Active fibers: ${fibers.length}`)
})
)
})
Effect.runFork(program)
// Output: Active fibers: 3Упражнение 2: Graceful shutdown
Реализуйте graceful shutdown, который ждёт завершения всех файберов.
import { Effect, Supervisor, Fiber } from "effect"
const gracefulShutdown = <A, E, R>(
effect: Effect.Effect<A, E, R>
): Effect.Effect<A, E, R> =>
// Ваша реализация
???import { Effect, Supervisor, Fiber } from "effect"
const gracefulShutdown = <A, E, R>(
effect: Effect.Effect<A, E, R>
): Effect.Effect<A, E, R> =>
Effect.gen(function* () {
const supervisor = yield* Supervisor.track
const result = yield* effect.pipe(Effect.supervised(supervisor))
const fibers = yield* supervisor.value
console.log(`Waiting for ${fibers.length} fibers to complete...`)
yield* Effect.forEach(fibers, Fiber.await, { discard: true })
console.log("All fibers completed!")
return result
})
// Тест
const program = gracefulShutdown(
Effect.gen(function* () {
yield* Effect.fork(
Effect.gen(function* () {
yield* Effect.log("Task 1 started")
yield* Effect.sleep("500 millis")
yield* Effect.log("Task 1 completed")
})
)
yield* Effect.log("Main work done")
return "result"
})
)
Effect.runFork(program)Упражнение 3: Fiber Stats Service
Создайте сервис для сбора статистики о файберах.
import { Effect, Supervisor, Fiber, FiberStatus, Context, Layer } from "effect"
interface FiberStats {
readonly active: number
readonly running: number
readonly suspended: number
}
class FiberStatsService extends Context.Tag("FiberStatsService")<
FiberStatsService,
{
readonly getStats: Effect.Effect<FiberStats>
readonly supervised: <A, E, R>(effect: Effect.Effect<A, E, R>) => Effect.Effect<A, E, R>
}
>() {}
const FiberStatsServiceLive: Layer.Layer<FiberStatsService> =
// Ваша реализация
???import { Effect, Supervisor, Fiber, FiberStatus, Context, Layer } from "effect"
interface FiberStats {
readonly active: number
readonly running: number
readonly suspended: number
}
class FiberStatsService extends Context.Tag("FiberStatsService")<
FiberStatsService,
{
readonly getStats: Effect.Effect<FiberStats>
readonly supervised: <A, E, R>(effect: Effect.Effect<A, E, R>) => Effect.Effect<A, E, R>
}
>() {}
const FiberStatsServiceLive = Layer.effect(
FiberStatsService,
Effect.gen(function* () {
const supervisor = yield* Supervisor.track
return {
getStats: Effect.gen(function* () {
const fibers = yield* supervisor.value
let running = 0
let suspended = 0
for (const fiber of fibers) {
const status = yield* Fiber.status(fiber)
if (FiberStatus.isRunning(status)) running++
else if (FiberStatus.isSuspended(status)) suspended++
}
return {
active: fibers.length,
running,
suspended
}
}),
supervised: <A, E, R>(effect: Effect.Effect<A, E, R>) =>
effect.pipe(Effect.supervised(supervisor))
}
})
)
// Тест
const program = Effect.gen(function* () {
const service = yield* FiberStatsService
yield* service.supervised(
Effect.gen(function* () {
yield* Effect.fork(Effect.sleep("1 second"))
yield* Effect.fork(Effect.sleep("2 seconds"))
yield* Effect.sleep("100 millis")
const stats = yield* service.getStats
console.log("Stats:", stats)
})
)
}).pipe(Effect.provide(FiberStatsServiceLive))
Effect.runFork(program)Упражнение 4: Adaptive Worker Pool
Создайте пул воркеров, который автоматически масштабируется на основе нагрузки.
import { Effect, Supervisor, Fiber, Queue, Ref, Schedule } from "effect"
interface AdaptivePool {
readonly submit: <A>(task: Effect.Effect<A>) => Effect.Effect<A>
readonly getWorkerCount: Effect.Effect<number>
readonly getQueueSize: Effect.Effect<number>
}
const createAdaptivePool = (config: {
readonly minWorkers: number
readonly maxWorkers: number
readonly scaleUpThreshold: number
readonly scaleDownThreshold: number
}): Effect.Effect<AdaptivePool, never, Scope> =>
// Ваша реализация
???import { Effect, Supervisor, Fiber, Queue, Ref, Schedule, Scope, Deferred } from "effect"
interface AdaptivePool {
readonly submit: <A>(task: Effect.Effect<A>) => Effect.Effect<A>
readonly getWorkerCount: Effect.Effect<number>
readonly getQueueSize: Effect.Effect<number>
}
interface WorkItem {
readonly task: Effect.Effect<unknown>
readonly deferred: Deferred.Deferred<unknown, unknown>
}
const createAdaptivePool = (config: {
readonly minWorkers: number
readonly maxWorkers: number
readonly scaleUpThreshold: number
readonly scaleDownThreshold: number
}): Effect.Effect<AdaptivePool, never, Scope> =>
Effect.gen(function* () {
const supervisor = yield* Supervisor.track
const queue = yield* Queue.unbounded<WorkItem>()
const workers = yield* Ref.make<Array<Fiber.RuntimeFiber<void, never>>>([])
const createWorker = () =>
Effect.forever(
Effect.gen(function* () {
const item = yield* Queue.take(queue)
const exit = yield* Effect.exit(item.task)
if (exit._tag === "Success") {
yield* Deferred.succeed(item.deferred, exit.value)
} else {
yield* Deferred.failCause(item.deferred, exit.cause)
}
})
).pipe(Effect.interruptible)
const addWorker = Effect.gen(function* () {
const currentWorkers = yield* Ref.get(workers)
if (currentWorkers.length < config.maxWorkers) {
const fiber = yield* Effect.forkScoped(createWorker())
yield* Ref.update(workers, (ws) => [...ws, fiber])
yield* Effect.log(`Added worker. Total: ${currentWorkers.length + 1}`)
}
})
const removeWorker = Effect.gen(function* () {
const currentWorkers = yield* Ref.get(workers)
if (currentWorkers.length > config.minWorkers) {
const toRemove = currentWorkers[currentWorkers.length - 1]
if (toRemove) {
yield* Fiber.interrupt(toRemove)
yield* Ref.set(workers, currentWorkers.slice(0, -1))
yield* Effect.log(`Removed worker. Total: ${currentWorkers.length - 1}`)
}
}
})
// Initialize min workers
yield* Effect.forEach(
Array.from({ length: config.minWorkers }),
() => addWorker,
{ discard: true }
)
// Auto-scaler
yield* Effect.forkScoped(
Effect.repeat(
Effect.gen(function* () {
const queueSize = yield* Queue.size(queue)
const workerCount = yield* Ref.get(workers).pipe(Effect.map((w) => w.length))
if (queueSize > config.scaleUpThreshold) {
yield* addWorker
} else if (queueSize < config.scaleDownThreshold && workerCount > config.minWorkers) {
yield* removeWorker
}
}),
Schedule.spaced("500 millis")
)
)
return {
submit: <A>(task: Effect.Effect<A>) =>
Effect.gen(function* () {
const deferred = yield* Deferred.make<A, unknown>()
yield* Queue.offer(queue, { task, deferred } as WorkItem)
return yield* Deferred.await(deferred) as A
}),
getWorkerCount: Ref.get(workers).pipe(Effect.map((w) => w.length)),
getQueueSize: Queue.size(queue)
}
})
// Тест
const program = Effect.scoped(
Effect.gen(function* () {
const pool = yield* createAdaptivePool({
minWorkers: 2,
maxWorkers: 5,
scaleUpThreshold: 5,
scaleDownThreshold: 1
})
// Создаём нагрузку
const tasks = Array.from({ length: 15 }, (_, i) =>
pool.submit(
Effect.gen(function* () {
yield* Effect.sleep(`${100 + Math.random() * 200} millis`)
return i
})
)
)
// Мониторинг
yield* Effect.fork(
Effect.repeat(
Effect.gen(function* () {
const workers = yield* pool.getWorkerCount
const queueSize = yield* pool.getQueueSize
yield* Effect.log(`Workers: ${workers}, Queue: ${queueSize}`)
}),
Schedule.spaced("200 millis").pipe(Schedule.intersect(Schedule.recurs(20)))
)
)
const results = yield* Effect.all(tasks, { concurrency: "unbounded" })
yield* Effect.log(`Completed ${results.length} tasks`)
})
)
Effect.runFork(program)Заключение
Supervisor в Effect предоставляет мощные возможности для мониторинга и управления файберами:
- Отслеживание — наблюдение за созданием и завершением файберов
- Мониторинг — получение статистики в реальном времени
- Graceful Shutdown — корректное завершение всех дочерних файберов
- Кастомные стратегии — гибкая настройка поведения супервизии
Supervisor особенно полезен для:
- Отладки конкурентных приложений
- Сбора метрик
- Реализации graceful shutdown
- Управления пулами ресурсов
Это завершает модуль о Fiber — фундаментальном строительном блоке конкурентности в Effect. В следующих модулях мы изучим более высокоуровневые абстракции, построенные на основе файберов: Schedule, Stream и другие.