Forking
Стратегии создания файберов.
Введение в Forking
Forking — это процесс создания нового файбера из эффекта. При fork’е эффект начинает выполняться параллельно, и вы получаете ссылку на созданный файбер для дальнейшего управления.
Базовая концепция
const myEffect: Effect.Effect<number> = Effect.succeed(42)
// Fork возвращает Effect, производящий RuntimeFiber
// ┌─── Effect<RuntimeFiber<number, never>, never, never>
// ▼
const fiberEffect = Effect.fork(myEffect)
Четыре стратегии жизненного цикла
┌─────────────────────────────────────────────────────────────────┐
│ СТРАТЕГИИ FORKING │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 1. Effect.fork (Automatic Supervision) │
│ └── Файбер привязан к родителю │
│ │
│ 2. Effect.forkDaemon (Global Scope) │
│ └── Файбер привязан к глобальному scope │
│ │
│ 3. Effect.forkScoped (Local Scope) │
│ └── Файбер привязан к текущему Scope │
│ │
│ 4. Effect.forkIn (Specific Scope) │
│ └── Файбер привязан к указанному Scope │
│ │
└─────────────────────────────────────────────────────────────────┘
Effect.fork — Базовый fork
Effect.fork создаёт дочерний файбер, который автоматически супервизируется родительским файбером. Это структурная конкурентность в действии.
Сигнатура
declare const fork: <A, E, R>(
self: Effect.Effect<A, E, R>
) => Effect.Effect<Fiber.RuntimeFiber<A, E>, never, R>
Ключевые характеристики
- Дочерний файбер наследует контекст родителя (services, FiberRef и т.д.)
- При завершении родителя все дочерние файберы прерываются
- Ошибки в дочерних файберах НЕ propagate к родителю автоматически
- Идеально для краткоживущих параллельных задач
Базовый пример
const fib = (n: number): Effect.Effect<number> =>
n < 2
? Effect.succeed(n)
: Effect.zipWith(fib(n - 1), fib(n - 2), (a, b) => a + b)
const program = Effect.gen(function* () {
// Fork вычисления Фибоначчи
const fiber = yield* Effect.fork(fib(10))
console.log("Fibonacci calculation started...")
// Делаем другую работу
yield* Effect.sleep("100 millis")
// Получаем результат
const result = yield* Fiber.join(fiber)
console.log(`Result: ${result}`)
})
Effect.runFork(program)
/*
Output:
Fibonacci calculation started...
Result: 55
*/
Автоматическая супервизия
// Дочерний файбер, который логирует каждую секунду
const child = Effect.repeat(
Console.log("child: still running!"),
Schedule.fixed("1 second")
)
const parent = Effect.gen(function* () {
console.log("parent: started!")
// Дочерний файбер супервизируется родителем
yield* Effect.fork(child)
yield* Effect.sleep("3 seconds")
console.log("parent: finished!")
// При завершении parent, child автоматически прерывается
})
Effect.runFork(parent)
/*
Output:
parent: started!
child: still running!
child: still running!
child: still running!
parent: finished!
*/
Визуализация жизненного цикла
Parent Fiber
╔═══════════════════════════════════════════╗
║ start running finished ║
╚═══════════════════════════════════════════╝
│ │
│ fork │ interrupt (automatic)
▼ ▼
╔═══════════════════════════════════════════╗
║ Child Fiber (supervised) ║
║ running...running...running...|STOP| ║
╚═══════════════════════════════════════════╝
Многоуровневая иерархия
Структурная конкурентность работает на любом уровне вложенности:
const grandchild = Effect.repeat(
Console.log(" grandchild: tick"),
Schedule.fixed("300 millis")
)
const child = Effect.gen(function* () {
yield* Effect.fork(grandchild)
yield* Effect.repeat(
Console.log(" child: tick"),
Schedule.fixed("500 millis")
)
})
const parent = Effect.gen(function* () {
console.log("parent: start")
yield* Effect.fork(child)
yield* Effect.sleep("2 seconds")
console.log("parent: finish")
})
Effect.runFork(parent)
/*
Output:
parent: start
grandchild: tick
child: tick
grandchild: tick
grandchild: tick
child: tick
grandchild: tick
grandchild: tick
child: tick
grandchild: tick
parent: finish
← grandchild и child прерываются вместе с parent
*/
Effect.forkDaemon — Daemon файберы
Effect.forkDaemon создаёт daemon файбер, который не привязан к родителю. Его время жизни связано с глобальным scope приложения.
Сигнатура
declare const forkDaemon: <A, E, R>(
self: Effect.Effect<A, E, R>
) => Effect.Effect<Fiber.RuntimeFiber<A, E>, never, R>
Ключевые характеристики
- Не имеет родительского файбера (не супервизируется)
- Продолжает работать после завершения “создателя”
- Завершается только при shutdown приложения или естественном завершении
- Идеально для фоновых задач: мониторинг, метрики, healthcheck
Базовый пример daemon файбера
// Daemon файбер для фоновой задачи
const daemon = Effect.repeat(
Console.log("daemon: still running!"),
Schedule.fixed("1 second")
)
const parent = Effect.gen(function* () {
console.log("parent: started!")
// Daemon файбер работает независимо
yield* Effect.forkDaemon(daemon)
yield* Effect.sleep("3 seconds")
console.log("parent: finished!")
// Daemon продолжает работать!
})
Effect.runFork(parent)
/*
Output:
parent: started!
daemon: still running!
daemon: still running!
daemon: still running!
parent: finished!
daemon: still running!
daemon: still running!
...продолжает бесконечно...
*/
Визуализация daemon
Parent Fiber
╔═══════════════════════════════════╗
║ start running finished ║
╚═══════════════════════════════════╝
│
│ forkDaemon
▼
╔═════════════════════════════════════════════════════════════╗
║ Daemon Fiber (global scope) ║
║ running...running...running...running...running... ║
╠═════════════════════════════════════════════════════════════╣
║ Живёт до shutdown приложения или естественного завершения ║
╚═════════════════════════════════════════════════════════════╝
Daemon не прерывается при прерывании родителя
const daemon = Effect.repeat(
Console.log("daemon: still running!"),
Schedule.fixed("1 second")
)
const parent = Effect.gen(function* () {
console.log("parent: started!")
yield* Effect.forkDaemon(daemon)
yield* Effect.sleep("3 seconds")
console.log("parent: finished!")
}).pipe(
Effect.onInterrupt(() => Console.log("parent: interrupted!"))
)
// Программа, которая прерывает parent
const program = Effect.gen(function* () {
const fiber = yield* Effect.fork(parent)
yield* Effect.sleep("2 seconds")
yield* Fiber.interrupt(fiber) // Прерываем parent
})
Effect.runFork(program)
/*
Output:
parent: started!
daemon: still running!
daemon: still running!
parent: interrupted!
daemon: still running! ← Daemon продолжает работать!
daemon: still running!
...
*/
Практический пример: Health Check Service
// Глобальное состояние здоровья системы
const createHealthMonitor = Effect.gen(function* () {
const healthStatus = yield* Ref.make({
healthy: true,
lastCheck: Date.now()
})
// Daemon для периодических health checks
const healthChecker = Effect.repeat(
Effect.gen(function* () {
// Симуляция проверки
const isHealthy = Math.random() > 0.1
yield* Ref.set(healthStatus, {
healthy: isHealthy,
lastCheck: Date.now()
})
yield* Effect.log(`Health check: ${isHealthy ? "OK" : "DEGRADED"}`)
}),
Schedule.fixed("5 seconds")
)
yield* Effect.forkDaemon(healthChecker)
return {
getStatus: Ref.get(healthStatus)
}
})
Предупреждения
┌─────────────────────────────────────────────────────────────────┐
│ ⚠️ ОСТОРОЖНО С DAEMON ФАЙБЕРАМИ │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 1. Утечки ресурсов │
│ └── Daemon файберы могут держать ресурсы бесконечно │
│ │
│ 2. Нет автоматического cleanup │
│ └── Вы ответственны за их завершение │
│ │
│ 3. Сложность отладки │
│ └── "Призрачные" файберы сложно отследить │
│ │
│ Рекомендация: Используйте только для истинно глобальных │
│ фоновых задач (logging, metrics, health checks) │
│ │
└─────────────────────────────────────────────────────────────────┘
Effect.forkScoped — Привязка к Scope
Effect.forkScoped создаёт файбер, привязанный к текущему Scope. Файбер может пережить своего “создателя”, но будет прерван при закрытии Scope.
Сигнатура
declare const forkScoped: <A, E, R>(
self: Effect.Effect<A, E, R>
) => Effect.Effect<Fiber.RuntimeFiber<A, E>, never, R | Scope>
Ключевые характеристики
- Требует Scope в зависимостях (R включает Scope)
- Не привязан к родительскому файберу
- Прерывается при закрытии Scope
- Идеально для задач, привязанных к ресурсам
Базовый пример
// Дочерний файбер
const child = Effect.repeat(
Console.log("child: still running!"),
Schedule.fixed("1 second")
)
// ┌─── Effect<void, never, Scope>
// ▼
const parent = Effect.gen(function* () {
console.log("parent: started!")
// Файбер привязан к Scope, НЕ к parent
yield* Effect.forkScoped(child)
yield* Effect.sleep("3 seconds")
console.log("parent: finished!")
})
// Программа с локальным scope
const program = Effect.scoped(
Effect.gen(function* () {
console.log("Local scope started!")
yield* Effect.fork(parent) // fork parent в своём файбере
yield* Effect.sleep("5 seconds")
console.log("Leaving the local scope!")
})
)
Effect.runFork(program)
/*
Output:
Local scope started!
parent: started!
child: still running!
child: still running!
child: still running!
parent: finished! ← parent завершён, но child продолжает!
child: still running!
child: still running!
Leaving the local scope! ← Scope закрывается, child прерывается
*/
Визуализация
Scope Boundary
═══════════════════════════════════════════════════════════════════
║ ║
║ Parent Fiber ║
║ ╔═══════════════════════╗ ║
║ ║ start finish ║ ║
║ ╚═══════════════════════╝ ║
║ │ ║
║ │ forkScoped ║
║ ▼ ║
║ ╔═══════════════════════════════════════════════════════╗ ║
║ ║ Child Fiber (scoped) ║ ║
║ ║ running...running...running...running...|STOP| ║ ║
║ ╚═══════════════════════════════════════════════════════╝ ║
║ │ ║
═══════════════════════════════════════════════════════════════════
│
Scope closes
Практический пример: Ресурс с фоновой задачей
// Ресурс подключения к БД с фоновым heartbeat
const createDbConnection = Effect.gen(function* () {
console.log("Opening DB connection...")
// Фоновый heartbeat привязан к scope соединения
yield* Effect.forkScoped(
Effect.repeat(
Effect.gen(function* () {
yield* Effect.log("DB heartbeat: ping")
// Симуляция ping
}),
Schedule.fixed("2 seconds")
)
)
yield* Effect.addFinalizer(() =>
Effect.sync(() => console.log("Closing DB connection..."))
)
return {
query: (sql: string) => Effect.succeed(`Result for: ${sql}`)
}
})
const program = Effect.scoped(
Effect.gen(function* () {
const db = yield* createDbConnection
const result = yield* db.query("SELECT * FROM users")
console.log(result)
yield* Effect.sleep("5 seconds")
console.log("Done with database work")
})
)
Effect.runFork(program)
/*
Output:
Opening DB connection...
Result for: SELECT * FROM users
DB heartbeat: ping
DB heartbeat: ping
Done with database work
Closing DB connection...
← heartbeat прекращается с закрытием scope
*/
Effect.forkIn — Fork в конкретный Scope
Effect.forkIn даёт максимальный контроль — вы явно указываете Scope, к которому будет привязан файбер.
Сигнатура
declare const forkIn: (
scope: Scope.Scope
) => <A, E, R>(
self: Effect.Effect<A, E, R>
) => Effect.Effect<Fiber.RuntimeFiber<A, E>, never, R>
Ключевые характеристики
- Явное указание Scope
- Файбер может пережить и родителя, и текущий scope
- Максимальная гибкость в управлении временем жизни
- Используется для сложных сценариев
Пример с вложенными scope
const child = Effect.repeat(
Console.log("child: still running!"),
Schedule.fixed("1 second")
)
const program = Effect.scoped(
Effect.gen(function* () {
yield* Effect.addFinalizer(() =>
Console.log("The outer scope is about to be closed!")
)
// Захватываем внешний scope
const outerScope = yield* Effect.scope
// Создаём внутренний scope
yield* Effect.scoped(
Effect.gen(function* () {
yield* Effect.addFinalizer(() =>
Console.log("The inner scope is about to be closed!")
)
// Fork child во ВНЕШНИЙ scope
yield* Effect.forkIn(child, outerScope)
yield* Effect.sleep("3 seconds")
})
)
// Inner scope закрыт, но child продолжает!
yield* Effect.sleep("5 seconds")
})
)
Effect.runFork(program)
/*
Output:
child: still running!
child: still running!
child: still running!
The inner scope is about to be closed! ← Inner scope закрыт
child: still running! ← Child продолжает!
child: still running!
child: still running!
child: still running!
child: still running!
child: still running!
The outer scope is about to be closed! ← Outer scope закрыт
← Child прерван
*/
Визуализация forkIn
Outer Scope
╔═════════════════════════════════════════════════════════════════════╗
║ ║
║ Inner Scope ║
║ ╔═════════════════════════╗ ║
║ ║ ║ ║
║ ║ forkIn(child, outer) ║ ║
║ ║ │ ║ ║
║ ╚═════════│═══════════════╝ ║
║ │ ◄── Inner scope закрыт ║
║ │ ║
║ ╔═════════▼═════════════════════════════════════════════════════╗ ║
║ ║ Child Fiber (owned by outer scope) ║ ║
║ ║ running...running...running...running...running...|STOP| ║ ║
║ ╚═══════════════════════════════════════════════════════════════╝ ║
║ ║
╚═════════════════════════════════════════════════════════════════════╝
│
Outer scope closes ▼
Практический пример: Pool с переживающими воркерами
interface Task {
readonly id: number
readonly work: Effect.Effect<void>
}
const createWorkerPool = (size: number) =>
Effect.gen(function* () {
const poolScope = yield* Effect.scope
const taskQueue = yield* Queue.unbounded<Task>()
// Воркеры привязаны к pool scope, а не к caller scope
const workers = yield* Effect.forEach(
Array.from({ length: size }, (_, i) => i),
(workerId) =>
Effect.forkIn(
Effect.forever(
Effect.gen(function* () {
const task = yield* Queue.take(taskQueue)
yield* Effect.log(`Worker ${workerId}: executing task ${task.id}`)
yield* task.work
})
),
poolScope
)
)
return {
submit: (task: Task) => Queue.offer(taskQueue, task),
shutdown: Effect.forEach(workers, Fiber.interrupt, { discard: true })
}
})
Effect.forkAll — Массовый fork
Effect.forkAll форкает коллекцию эффектов одновременно.
Сигнатура
declare const forkAll: <A, E, R>(
effects: Iterable<Effect.Effect<A, E, R>>,
options?: { readonly discard?: boolean }
) => Effect.Effect<Fiber.Fiber<A[], E>, never, R>
Пример использования
const tasks = [
Effect.sleep("100 millis").pipe(Effect.as(1)),
Effect.sleep("200 millis").pipe(Effect.as(2)),
Effect.sleep("150 millis").pipe(Effect.as(3))
]
const program = Effect.gen(function* () {
// Fork все задачи одновременно
const fiber = yield* Effect.forkAll(tasks)
console.log("All tasks forked!")
// Join composite fiber
const results = yield* Fiber.join(fiber)
console.log("Results:", results)
})
Effect.runFork(program)
/*
Output:
All tasks forked!
Results: [1, 2, 3]
*/
Опция discard
const effects = [
Effect.log("Task 1"),
Effect.log("Task 2"),
Effect.log("Task 3")
]
// Результаты не нужны, только side effects
const program = Effect.gen(function* () {
const fiber = yield* Effect.forkAll(effects, { discard: true })
yield* Fiber.join(fiber)
})
Когда файберы начинают выполнение
Важно понимать, что форкнутые файберы начинают выполнение после того, как текущий файбер завершит текущую операцию или выполнит yield.
Проблема позднего старта
const program = Effect.gen(function* () {
const ref = yield* SubscriptionRef.make(0)
yield* ref.changes.pipe(
Stream.tap((n) => Console.log(`Changed to ${n}`)),
Stream.runDrain,
Effect.fork // Fork здесь
)
// Эти обновления происходят ДО старта файбера!
yield* SubscriptionRef.set(ref, 1)
yield* SubscriptionRef.set(ref, 2)
})
Effect.runFork(program)
/*
Output:
Changed to 2
← Видим только последнее значение!
*/
Решение: Effect.yieldNow или Effect.sleep
const program = Effect.gen(function* () {
const ref = yield* SubscriptionRef.make(0)
yield* ref.changes.pipe(
Stream.tap((n) => Console.log(`Changed to ${n}`)),
Stream.runDrain,
Effect.fork
)
// Даём файберу время запуститься
yield* Effect.sleep("100 millis")
// или: yield* Effect.yieldNow()
yield* SubscriptionRef.set(ref, 1)
yield* SubscriptionRef.set(ref, 2)
})
Effect.runFork(program)
/*
Output:
Changed to 0
Changed to 1
Changed to 2
*/
Недетерминированность
┌─────────────────────────────────────────────────────────────────┐
│ ⚠️ ВЫПОЛНЕНИЕ ФАЙБЕРОВ НЕДЕТЕРМИНИРОВАНО │
├─────────────────────────────────────────────────────────────────┤
│ │
│ Время старта файбера зависит от многих факторов: │
│ │
│ • Планировщик runtime │
│ • Текущая нагрузка │
│ • Количество других файберов │
│ • Внешние факторы │
│ │
│ НЕ полагайтесь на то, что один yield гарантирует │
│ старт файбера в определённый момент. │
│ │
└─────────────────────────────────────────────────────────────────┘
Сравнение стратегий
Таблица сравнения
| Операция | Привязка | Супервизия | Scope в R | Use Case |
|---|---|---|---|---|
fork | Parent fiber | Да | Нет | Краткоживущие параллельные задачи |
forkDaemon | Global | Нет | Нет | Фоновые системные задачи |
forkScoped | Current Scope | Нет | Да | Задачи с временем жизни ресурса |
forkIn | Specified Scope | Нет | Нет | Точный контроль времени жизни |
Дерево решений
Нужен ли файбер, привязанный к родителю?
│
├── Да ──────────► Effect.fork
│
└── Нет
│
Нужен ли файбер в глобальном scope?
│
├── Да ──────────► Effect.forkDaemon
│
└── Нет
│
Нужна привязка к конкретному scope?
│
├── К текущему ────► Effect.forkScoped
│
└── К другому ─────► Effect.forkIn(scope)
Пример выбора стратегии
// Сценарий: HTTP сервер с различными типами задач
// 1. fork — обработка запроса (умирает с handler'ом)
const handleRequest = Effect.gen(function* () {
yield* Effect.fork(
Effect.gen(function* () {
yield* Effect.log("Processing request...")
yield* Effect.sleep("100 millis")
yield* Effect.log("Request processed")
})
)
})
// 2. forkDaemon — глобальный health monitor
const startHealthMonitor = Effect.forkDaemon(
Effect.repeat(
Effect.log("Health: OK"),
Schedule.fixed("30 seconds")
)
)
// 3. forkScoped — connection pool worker
const createPoolWorker = Effect.forkScoped(
Effect.forever(
Effect.gen(function* () {
yield* Effect.log("Pool worker: waiting for connection...")
yield* Effect.sleep("1 second")
})
)
)
// 4. forkIn — кастомный scope для группы воркеров
const createWorkerGroup = (groupScope: Scope.Scope) =>
Effect.forkIn(
Effect.repeat(
Effect.log("Worker group task"),
Schedule.fixed("500 millis")
),
groupScope
)
API Reference
Основные функции forking
| Функция | Сигнатура | Описание |
|---|---|---|
Effect.fork | Effect<A, E, R> → Effect<RuntimeFiber<A, E>, never, R> | Fork с автосупервизией |
Effect.forkDaemon | Effect<A, E, R> → Effect<RuntimeFiber<A, E>, never, R> | Fork в глобальный scope |
Effect.forkScoped | Effect<A, E, R> → Effect<RuntimeFiber<A, E>, never, R | Scope> | Fork в текущий scope |
Effect.forkIn | (Scope) → Effect<A, E, R> → Effect<RuntimeFiber<A, E>, never, R> | Fork в указанный scope |
Effect.forkAll | Iterable<Effect<A, E, R>> → Effect<Fiber<A[], E>, never, R> | Fork коллекции |
Вспомогательные функции
| Функция | Описание |
|---|---|
Effect.forkWithErrorHandler | Fork с обработчиком ошибок |
Effect.ensuring | Гарантировать выполнение finalizer |
Примеры
Пример 1: Параллельная загрузка с прогрессом
interface DownloadTask {
readonly url: string
readonly size: number
}
const createDownloader = Effect.gen(function* () {
const progress = yield* Ref.make(new Map<string, number>())
const download = (task: DownloadTask) =>
Effect.gen(function* () {
let downloaded = 0
while (downloaded < task.size) {
yield* Effect.sleep("50 millis")
downloaded += Math.min(10, task.size - downloaded)
yield* Ref.update(progress, (map) =>
new Map(map).set(task.url, (downloaded / task.size) * 100)
)
}
return task.url
})
const getProgress = Ref.get(progress)
return { download, getProgress }
})
const program = Effect.gen(function* () {
const { download, getProgress } = yield* createDownloader
const tasks: ReadonlyArray<DownloadTask> = [
{ url: "file1.txt", size: 100 },
{ url: "file2.txt", size: 50 },
{ url: "file3.txt", size: 150 }
]
// Fork все загрузки
const fibers = yield* Effect.forEach(tasks, (task) =>
Effect.fork(download(task))
)
// Мониторинг прогресса
yield* Effect.fork(
Effect.repeat(
Effect.gen(function* () {
const p = yield* getProgress
console.log("Progress:", Object.fromEntries(p))
}),
Schedule.fixed("100 millis").pipe(Schedule.intersect(Schedule.recurs(20)))
)
)
// Ждём завершения всех загрузок
const results = yield* Effect.forEach(fibers, Fiber.join)
console.log("Completed:", results)
})
Effect.runFork(program)
Пример 2: Background job processor с daemon
interface Job {
readonly id: string
readonly payload: unknown
}
const createJobProcessor = Effect.gen(function* () {
const jobQueue = yield* Queue.unbounded<Job>()
const processedCount = yield* Ref.make(0)
// Daemon processor - живёт пока живёт приложение
yield* Effect.forkDaemon(
Effect.forever(
Effect.gen(function* () {
const job = yield* Queue.take(jobQueue)
yield* Effect.log(`Processing job: ${job.id}`)
yield* Effect.sleep("100 millis") // Симуляция работы
yield* Ref.update(processedCount, (n) => n + 1)
yield* Effect.log(`Completed job: ${job.id}`)
})
)
)
return {
submit: (job: Job) => Queue.offer(jobQueue, job),
getProcessedCount: Ref.get(processedCount)
}
})
const program = Effect.gen(function* () {
const processor = yield* createJobProcessor
// Отправляем задачи
for (let i = 0; i < 5; i++) {
yield* processor.submit({ id: `job-${i}`, payload: { data: i } })
}
yield* Effect.sleep("1 second")
const count = yield* processor.getProcessedCount
console.log(`Processed ${count} jobs`)
})
Effect.runFork(program)
Пример 3: Resource pool с forkScoped
interface Connection {
readonly id: number
readonly execute: (sql: string) => Effect.Effect<string>
}
const createConnectionPool = (size: number) =>
Effect.gen(function* () {
const connectionId = yield* Ref.make(0)
const available = yield* Queue.bounded<Connection>(size)
// Создаём соединения
for (let i = 0; i < size; i++) {
const id = yield* Ref.updateAndGet(connectionId, (n) => n + 1)
const conn: Connection = {
id,
execute: (sql) => Effect.succeed(`[Conn ${id}] Result: ${sql}`)
}
yield* Queue.offer(available, conn)
}
// Heartbeat worker привязан к scope пула
yield* Effect.forkScoped(
Effect.repeat(
Effect.log("Pool heartbeat: connections alive"),
Schedule.fixed("2 seconds")
)
)
const withConnection = <A, E, R>(
use: (conn: Connection) => Effect.Effect<A, E, R>
) =>
Effect.gen(function* () {
const conn = yield* Queue.take(available)
const result = yield* use(conn).pipe(
Effect.ensuring(Queue.offer(available, conn))
)
return result
})
return { withConnection }
})
const program = Effect.scoped(
Effect.gen(function* () {
const pool = yield* createConnectionPool(3)
const result = yield* pool.withConnection((conn) =>
conn.execute("SELECT * FROM users")
)
console.log(result)
yield* Effect.sleep("5 seconds")
console.log("Closing pool...")
})
)
Effect.runFork(program)
Упражнения
Упражнение 1: Простой fork и join
Создайте программу, которая форкает три эффекта с разными задержками и собирает результаты.
import { Effect, Fiber } from "effect"
const task = (id: number, delay: number) =>
Effect.gen(function* () {
yield* Effect.sleep(`${delay} millis`)
return `Task ${id} completed`
})
const program = Effect.gen(function* () {
// Ваша реализация:
// 1. Fork три задачи с разными задержками
// 2. Соберите все результаты
// 3. Выведите результаты
???
})import { Effect, Fiber } from "effect"
const task = (id: number, delay: number) =>
Effect.gen(function* () {
yield* Effect.sleep(`${delay} millis`)
return `Task ${id} completed`
})
const program = Effect.gen(function* () {
// Fork три задачи
const fiber1 = yield* Effect.fork(task(1, 100))
const fiber2 = yield* Effect.fork(task(2, 50))
const fiber3 = yield* Effect.fork(task(3, 150))
// Собираем результаты
const result1 = yield* Fiber.join(fiber1)
const result2 = yield* Fiber.join(fiber2)
const result3 = yield* Fiber.join(fiber3)
console.log([result1, result2, result3])
})
Effect.runFork(program)
// Output: ['Task 1 completed', 'Task 2 completed', 'Task 3 completed']Упражнение 2: Daemon logger
Создайте daemon logger, который периодически выводит timestamp.
import { Effect, Schedule } from "effect"
const createDaemonLogger = () =>
// Ваша реализация
???
const program = Effect.gen(function* () {
yield* createDaemonLogger()
// Основная работа
yield* Effect.log("Main: doing work...")
yield* Effect.sleep("3 seconds")
yield* Effect.log("Main: finished!")
// Logger должен продолжить работать
})import { Effect, Schedule, Console } from "effect"
const createDaemonLogger = () =>
Effect.forkDaemon(
Effect.repeat(
Effect.sync(() => console.log(`[${new Date().toISOString()}] heartbeat`)),
Schedule.fixed("500 millis")
)
)
const program = Effect.gen(function* () {
yield* createDaemonLogger()
yield* Effect.log("Main: doing work...")
yield* Effect.sleep("3 seconds")
yield* Effect.log("Main: finished!")
})
Effect.runFork(program)Упражнение 3: Connection с scoped heartbeat
Создайте ресурс соединения с heartbeat, привязанным к scope.
import { Effect, Schedule, Scope } from "effect"
interface Connection {
readonly send: (msg: string) => Effect.Effect<void>
readonly isAlive: Effect.Effect<boolean>
}
const createConnection = (): Effect.Effect<Connection, never, Scope> =>
// Ваша реализация:
// 1. Создайте heartbeat, который проверяет соединение каждые 500ms
// 2. Heartbeat должен быть forkScoped
// 3. При закрытии scope heartbeat должен прекратиться
???import { Effect, Schedule, Scope, Ref } from "effect"
interface Connection {
readonly send: (msg: string) => Effect.Effect<void>
readonly isAlive: Effect.Effect<boolean>
}
const createConnection = (): Effect.Effect<Connection, never, Scope> =>
Effect.gen(function* () {
const alive = yield* Ref.make(true)
// Heartbeat привязан к scope
yield* Effect.forkScoped(
Effect.repeat(
Effect.gen(function* () {
const isAlive = yield* Ref.get(alive)
yield* Effect.log(`Heartbeat: connection ${isAlive ? "alive" : "dead"}`)
}),
Schedule.fixed("500 millis")
)
)
// Finalizer для закрытия
yield* Effect.addFinalizer(() =>
Effect.gen(function* () {
yield* Ref.set(alive, false)
yield* Effect.log("Connection closed")
})
)
return {
send: (msg) => Effect.log(`Sending: ${msg}`),
isAlive: Ref.get(alive)
}
})
const program = Effect.scoped(
Effect.gen(function* () {
const conn = yield* createConnection()
yield* conn.send("Hello")
yield* Effect.sleep("2 seconds")
yield* conn.send("World")
yield* Effect.log("Closing scope...")
})
)
Effect.runFork(program)Упражнение 4: Nested scopes с forkIn
Реализуйте систему с тремя уровнями scopes, где файбер форкается в middle scope.
import { Effect, Scope } from "effect"
const program = Effect.scoped(
Effect.gen(function* () {
yield* Effect.log("Outer scope started")
// Ваша реализация:
// 1. Создайте middle scope внутри outer
// 2. Внутри middle создайте inner scope
// 3. В inner scope форкните файбер в middle scope
// 4. Покажите, что файбер переживает inner scope
???
})
)import { Effect, Scope, Console, Schedule } from "effect"
const worker = Effect.repeat(
Console.log(" Worker: running"),
Schedule.fixed("300 millis")
)
const program = Effect.scoped(
Effect.gen(function* () {
yield* Effect.addFinalizer(() => Console.log("Outer scope closing"))
yield* Effect.log("Outer scope started")
// Middle scope
yield* Effect.scoped(
Effect.gen(function* () {
yield* Effect.addFinalizer(() => Console.log("Middle scope closing"))
yield* Effect.log("Middle scope started")
// Захватываем middle scope
const middleScope = yield* Effect.scope
// Inner scope
yield* Effect.scoped(
Effect.gen(function* () {
yield* Effect.addFinalizer(() => Console.log("Inner scope closing"))
yield* Effect.log("Inner scope started")
// Fork в middle scope, не в inner!
yield* Effect.forkIn(worker, middleScope)
yield* Effect.sleep("500 millis")
yield* Effect.log("Inner scope finishing")
})
)
// Inner закрыт, но worker продолжает!
yield* Effect.log("Middle scope: inner closed, waiting...")
yield* Effect.sleep("1 second")
yield* Effect.log("Middle scope finishing")
})
)
// Middle закрыт, worker завершён
yield* Effect.log("Outer scope: middle closed")
yield* Effect.sleep("500 millis")
yield* Effect.log("Outer scope finishing")
})
)
Effect.runFork(program)
/*
Output:
Outer scope started
Middle scope started
Inner scope started
Worker: running
Inner scope finishing
Inner scope closing
Middle scope: inner closed, waiting...
Worker: running
Worker: running
Worker: running
Middle scope finishing
Middle scope closing
Outer scope: middle closed
Outer scope finishing
Outer scope closing
*/Упражнение 5: Worker Pool с динамическим масштабированием
Создайте pool воркеров, который автоматически масштабируется в зависимости от нагрузки.
import { Effect, Queue, Fiber, Ref, Scope } from "effect"
interface ScalablePool {
readonly submit: <A>(task: Effect.Effect<A>) => Effect.Effect<A>
readonly getWorkerCount: Effect.Effect<number>
readonly shutdown: Effect.Effect<void>
}
const createScalablePool = (config: {
readonly minWorkers: number
readonly maxWorkers: number
readonly scaleUpThreshold: number // Добавить воркера если очередь > threshold
readonly scaleDownThreshold: number // Убрать воркера если очередь < threshold
}): Effect.Effect<ScalablePool, never, Scope> =>
// Ваша реализация
???import { Effect, Queue, Fiber, Ref, Scope, Deferred, Schedule } from "effect"
interface ScalablePool {
readonly submit: <A>(task: Effect.Effect<A>) => Effect.Effect<A>
readonly getWorkerCount: Effect.Effect<number>
readonly shutdown: Effect.Effect<void>
}
interface WorkItem {
readonly task: Effect.Effect<unknown>
readonly deferred: Deferred.Deferred<unknown, unknown>
}
const createScalablePool = (config: {
readonly minWorkers: number
readonly maxWorkers: number
readonly scaleUpThreshold: number
readonly scaleDownThreshold: number
}): Effect.Effect<ScalablePool, never, Scope> =>
Effect.gen(function* () {
const poolScope = yield* Effect.scope
const queue = yield* Queue.unbounded<WorkItem>()
const workers = yield* Ref.make<Array<Fiber.RuntimeFiber<void, never>>>([])
const shuttingDown = yield* Ref.make(false)
const createWorker = () =>
Effect.gen(function* () {
while (true) {
const item = yield* Queue.take(queue)
const exit = yield* Effect.exit(item.task)
if (exit._tag === "Success") {
yield* Deferred.succeed(item.deferred, exit.value)
} else {
yield* Deferred.failCause(item.deferred, exit.cause)
}
}
}).pipe(Effect.interruptible)
const addWorker = Effect.gen(function* () {
const currentWorkers = yield* Ref.get(workers)
if (currentWorkers.length < config.maxWorkers) {
const fiber = yield* Effect.forkIn(createWorker(), poolScope)
yield* Ref.update(workers, (ws) => [...ws, fiber])
yield* Effect.log(`Worker added. Total: ${currentWorkers.length + 1}`)
}
})
const removeWorker = Effect.gen(function* () {
const currentWorkers = yield* Ref.get(workers)
if (currentWorkers.length > config.minWorkers) {
const [toRemove, remaining] = [
currentWorkers[currentWorkers.length - 1],
currentWorkers.slice(0, -1)
]
if (toRemove) {
yield* Fiber.interrupt(toRemove)
yield* Ref.set(workers, remaining)
yield* Effect.log(`Worker removed. Total: ${remaining.length}`)
}
}
})
// Инициализация минимального количества воркеров
yield* Effect.forEach(
Array.from({ length: config.minWorkers }),
() => addWorker,
{ discard: true }
)
// Autoscaler daemon
yield* Effect.forkScoped(
Effect.repeat(
Effect.gen(function* () {
const isShuttingDown = yield* Ref.get(shuttingDown)
if (isShuttingDown) return
const queueSize = yield* Queue.size(queue)
const workerCount = yield* Ref.get(workers).pipe(Effect.map((w) => w.length))
if (queueSize > config.scaleUpThreshold && workerCount < config.maxWorkers) {
yield* addWorker
} else if (queueSize < config.scaleDownThreshold && workerCount > config.minWorkers) {
yield* removeWorker
}
}),
Schedule.fixed("500 millis")
)
)
const pool: ScalablePool = {
submit: <A>(task: Effect.Effect<A>) =>
Effect.gen(function* () {
const deferred = yield* Deferred.make<A, unknown>()
yield* Queue.offer(queue, { task, deferred } as WorkItem)
return yield* Deferred.await(deferred) as A
}),
getWorkerCount: Ref.get(workers).pipe(Effect.map((w) => w.length)),
shutdown: Effect.gen(function* () {
yield* Ref.set(shuttingDown, true)
yield* Queue.shutdown(queue)
const ws = yield* Ref.get(workers)
yield* Effect.forEach(ws, Fiber.interrupt, { discard: true })
})
}
return pool
})
// Тест
const program = Effect.scoped(
Effect.gen(function* () {
const pool = yield* createScalablePool({
minWorkers: 2,
maxWorkers: 5,
scaleUpThreshold: 5,
scaleDownThreshold: 1
})
// Создаём нагрузку
const tasks = Array.from({ length: 20 }, (_, i) =>
pool.submit(
Effect.gen(function* () {
yield* Effect.log(`Task ${i} started`)
yield* Effect.sleep(`${100 + Math.random() * 200} millis`)
yield* Effect.log(`Task ${i} completed`)
return i
})
)
)
// Мониторинг
yield* Effect.fork(
Effect.repeat(
Effect.gen(function* () {
const count = yield* pool.getWorkerCount
yield* Effect.log(`Current workers: ${count}`)
}),
Schedule.fixed("300 millis").pipe(Schedule.intersect(Schedule.recurs(20)))
)
)
const results = yield* Effect.all(tasks, { concurrency: "unbounded" })
yield* Effect.log(`Completed ${results.length} tasks`)
yield* pool.shutdown
})
)
Effect.runFork(program)Заключение
Forking в Effect предоставляет четыре стратегии управления жизненным циклом файберов:
Effect.fork— структурная конкурентность с автосупервизиейEffect.forkDaemon— глобальные фоновые задачиEffect.forkScoped— привязка к времени жизни ресурсаEffect.forkIn— точный контроль над scope
Выбор правильной стратегии критичен для:
- Предотвращения утечек ресурсов
- Корректной обработки ошибок
- Предсказуемого поведения приложения
В следующей статье мы подробно рассмотрим операции для работы с уже созданными файберами: join, await, poll и interrupt.