FiberMap, FiberSet
Управление коллекциями файберов.
Введение в коллекции файберов
При работе с конкурентными системами часто возникает необходимость управлять динамическими наборами файберов. Например, пул воркеров, активные подключения, параллельные задачи. Effect предоставляет три абстракции для этих целей.
Проблемы ручного управления файберами
┌─────────────────────────────────────────────────────────────────┐
│ ПРОБЛЕМЫ РУЧНОГО УПРАВЛЕНИЯ │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 1. Tracking │
│ ├── Где хранить ссылки на файберы? │
│ ├── Как отслеживать завершённые? │
│ └── Как избежать утечек памяти? │
│ │
│ 2. Cleanup │
│ ├── Ручной interrupt каждого файбера │
│ ├── Забытые файберы продолжают работать │
│ └── Сложно гарантировать cleanup при ошибках │
│ │
│ 3. Coordination │
│ ├── Ожидание завершения всех файберов │
│ ├── Propagation ошибок │
│ └── Graceful shutdown │
│ │
│ 4. Concurrency Safety │
│ ├── Race conditions при добавлении/удалении │
│ └── Inconsistent state │
│ │
└─────────────────────────────────────────────────────────────────┘
Решение: FiberSet, FiberMap, FiberHandle
┌─────────────────────────────────────────────────────────────────┐
│ КОЛЛЕКЦИИ ФАЙБЕРОВ │
├─────────────────────────────────────────────────────────────────┤
│ │
│ FiberSet<A, E> │
│ ├── Неупорядоченная коллекция файберов │
│ ├── Автоматическое удаление завершённых │
│ └── Идеально для: пулов воркеров, параллельных задач │
│ │
│ FiberMap<K, A, E> │
│ ├── Коллекция с ключами (как Map) │
│ ├── Lookup по ключу O(1) │
│ └── Идеально для: именованных воркеров, по-клиентных задач │
│ │
│ FiberHandle<A, E> │
│ ├── Контейнер для одного файбера │
│ ├── Замена старого при новом запуске │
│ └── Идеально для: singleton задач, текущей операции │
│ │
└─────────────────────────────────────────────────────────────────┘
Общие характеристики
Все три абстракции имеют общие свойства:
┌─────────────────────────────────────────────────────────────────┐
│ ОБЩИЕ ХАРАКТЕРИСТИКИ │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ✓ Scope-привязка │
│ └── Автоматический interrupt при закрытии Scope │
│ │
│ ✓ Автоматический cleanup │
│ └── Завершённые файберы удаляются автоматически │
│ │
│ ✓ Join/Await операции │
│ └── Ожидание всех файберов с propagation ошибок │
│ │
│ ✓ Runtime функции │
│ └── makeRuntime, makeRuntimePromise │
│ │
│ ✓ Iterable │
│ └── Можно итерировать по файберам │
│ │
└─────────────────────────────────────────────────────────────────┘
FiberSet — Неупорядоченная коллекция
FiberSet<A, E> — это коллекция файберов без определённого порядка. Файберы автоматически удаляются из коллекции при завершении.
Создание FiberSet
// FiberSet требует Scope для lifecycle management
const program = Effect.scoped(
Effect.gen(function* () {
// Создаём FiberSet
const set = yield* FiberSet.make<number, Error>()
// Используем set...
// При выходе из scope все файберы будут прерваны
})
)
Effect.runFork(program)
Сигнатура типа
interface FiberSet<out A = unknown, out E = unknown>
extends Iterable<Fiber.RuntimeFiber<A, E>> {
// Deferred для сигнализации о закрытии
readonly deferred: Deferred.Deferred<void, unknown>
// Внутреннее состояние
readonly state:
| { readonly _tag: "Open"; readonly backing: Set<Fiber.RuntimeFiber<A, E>> }
| { readonly _tag: "Closed" }
}
Добавление файберов
const program = Effect.scoped(
Effect.gen(function* () {
const set = yield* FiberSet.make<number>()
// Способ 1: Добавить существующий файбер
const fiber = yield* Effect.fork(Effect.succeed(42))
yield* FiberSet.add(set, fiber)
// Способ 2: Fork и добавить одной операцией (рекомендуется)
yield* FiberSet.run(set, Effect.succeed(100))
yield* FiberSet.run(set, Effect.succeed(200))
// Получаем размер
const size = yield* FiberSet.size(set)
console.log("Fibers in set:", size)
})
)
FiberSet.run — Fork и добавление
const program = Effect.scoped(
Effect.gen(function* () {
const set = yield* FiberSet.make<string>()
// run создаёт файбер и добавляет его в set
// Возвращает RuntimeFiber
const fiber1 = yield* FiberSet.run(set,
Effect.gen(function* () {
yield* Effect.sleep("100 millis")
return "task-1"
})
)
const fiber2 = yield* FiberSet.run(set,
Effect.gen(function* () {
yield* Effect.sleep("200 millis")
return "task-2"
})
)
console.log("Started fibers:", fiber1.id(), fiber2.id())
// Файберы автоматически удаляются при завершении
yield* Effect.sleep("300 millis")
const size = yield* FiberSet.size(set)
console.log("Remaining:", size) // 0
})
)
FiberSet.join — Ожидание всех файберов
const program = Effect.scoped(
Effect.gen(function* () {
const set = yield* FiberSet.make<number, Error>()
// Запускаем несколько задач
yield* FiberSet.run(set, Effect.sleep("50 millis").pipe(Effect.as(1)))
yield* FiberSet.run(set, Effect.sleep("100 millis").pipe(Effect.as(2)))
yield* FiberSet.run(set, Effect.sleep("150 millis").pipe(Effect.as(3)))
// join ожидает завершения ВСЕХ файберов
// Если любой файбер завершится с ошибкой, join тоже завершится с ошибкой
yield* FiberSet.join(set)
console.log("All tasks completed!")
})
)
FiberSet.join с ошибками
class TaskError {
readonly _tag = "TaskError"
constructor(readonly message: string) {}
}
const program = Effect.scoped(
Effect.gen(function* () {
const set = yield* FiberSet.make<number, TaskError>()
yield* FiberSet.run(set, Effect.sleep("50 millis").pipe(Effect.as(1)))
yield* FiberSet.run(set, Effect.fail(new TaskError("Task 2 failed")))
yield* FiberSet.run(set, Effect.sleep("150 millis").pipe(Effect.as(3)))
// join завершится с первой ошибкой
yield* FiberSet.join(set)
})
).pipe(
Effect.catchTag("TaskError", (e) =>
Effect.log(`Caught error: ${e.message}`)
)
)
Effect.runFork(program)
// Output: Caught error: Task 2 failed
FiberSet.awaitEmpty — Ожидание опустошения
const program = Effect.scoped(
Effect.gen(function* () {
const set = yield* FiberSet.make<void>()
// Запускаем задачи
yield* FiberSet.run(set, Effect.sleep("100 millis"))
yield* FiberSet.run(set, Effect.sleep("200 millis"))
console.log("Waiting for set to become empty...")
// awaitEmpty ожидает когда set станет пустым
// НЕ propagates ошибки, просто ждёт
yield* FiberSet.awaitEmpty(set)
console.log("Set is empty!")
})
)
FiberSet.clear — Очистка с прерыванием
const program = Effect.scoped(
Effect.gen(function* () {
const set = yield* FiberSet.make<void>()
// Запускаем долгие задачи
yield* FiberSet.run(set, Effect.sleep("10 seconds"))
yield* FiberSet.run(set, Effect.sleep("10 seconds"))
const before = yield* FiberSet.size(set)
console.log("Before clear:", before) // 2
// clear прерывает все файберы и очищает set
yield* FiberSet.clear(set)
const after = yield* FiberSet.size(set)
console.log("After clear:", after) // 0
})
)
Итерация по FiberSet
const program = Effect.scoped(
Effect.gen(function* () {
const set = yield* FiberSet.make<number>()
yield* FiberSet.run(set, Effect.succeed(1))
yield* FiberSet.run(set, Effect.succeed(2))
yield* FiberSet.run(set, Effect.succeed(3))
// FiberSet — это Iterable
for (const fiber of set) {
const status = yield* Fiber.status(fiber)
console.log(`Fiber ${fiber.id()}: ${status._tag}`)
}
})
)
FiberMap — Коллекция с ключами
FiberMap<K, A, E> — это коллекция файберов, индексированная по ключу. Позволяет быстро находить, заменять или удалять файберы по ключу.
Создание FiberMap
const program = Effect.scoped(
Effect.gen(function* () {
// Типизированный FiberMap
// K = string (тип ключа)
// A = number (тип результата)
// E = Error (тип ошибки)
const map = yield* FiberMap.make<string, number, Error>()
// Используем map...
})
)
Сигнатура типа
interface FiberMap<in out K, out A = unknown, out E = unknown>
extends Iterable<[K, Fiber.RuntimeFiber<A, E>]> {
readonly deferred: Deferred.Deferred<void, unknown>
readonly state:
| {
readonly _tag: "Open"
readonly backing: MutableHashMap.MutableHashMap<K, Fiber.RuntimeFiber<A, E>>
}
| { readonly _tag: "Closed" }
}
Добавление и запуск файберов
const program = Effect.scoped(
Effect.gen(function* () {
const map = yield* FiberMap.make<string, number>()
// Способ 1: Добавить существующий файбер
const fiber = yield* Effect.fork(Effect.succeed(42))
yield* FiberMap.set(map, "task-1", fiber)
// Способ 2: Fork и добавить (рекомендуется)
yield* FiberMap.run(map, "task-2", Effect.succeed(100))
yield* FiberMap.run(map, "task-3", Effect.succeed(200))
const size = yield* FiberMap.size(map)
console.log("Fibers in map:", size) // 3
})
)
FiberMap.run с заменой
При запуске файбера с уже существующим ключом, старый файбер прерывается:
const program = Effect.scoped(
Effect.gen(function* () {
const map = yield* FiberMap.make<string, string>()
// Запускаем долгую задачу
yield* FiberMap.run(map, "current-job",
Effect.gen(function* () {
yield* Effect.log("Job 1 started")
yield* Effect.sleep("10 seconds")
return "Job 1 done"
})
)
yield* Effect.sleep("100 millis")
// Запускаем новую задачу с тем же ключом
// Job 1 будет прервана!
yield* FiberMap.run(map, "current-job",
Effect.gen(function* () {
yield* Effect.log("Job 2 started")
yield* Effect.sleep("50 millis")
return "Job 2 done"
})
)
yield* FiberMap.join(map)
// Output:
// Job 1 started
// Job 2 started
// (Job 1 была прервана, Job 2 завершилась)
})
)
FiberMap.run с onlyIfMissing
const program = Effect.scoped(
Effect.gen(function* () {
const map = yield* FiberMap.make<string, string>()
// Запускаем первую задачу
yield* FiberMap.run(map, "singleton",
Effect.gen(function* () {
yield* Effect.sleep("1 second")
return "first"
})
)
// Пытаемся запустить вторую с тем же ключом
// С onlyIfMissing: true — НЕ заменит существующую
yield* FiberMap.run(map, "singleton",
Effect.gen(function* () {
yield* Effect.sleep("100 millis")
return "second"
}),
{ onlyIfMissing: true }
)
yield* FiberMap.join(map)
// Выполнится только первая задача
})
)
FiberMap.get — Получение файбера
const program = Effect.scoped(
Effect.gen(function* () {
const map = yield* FiberMap.make<string, number>()
yield* FiberMap.run(map, "task-1", Effect.succeed(42))
// get возвращает Effect с ошибкой NoSuchElementException если ключа нет
const fiber = yield* FiberMap.get(map, "task-1")
const result = yield* Fiber.join(fiber)
console.log("Result:", result) // 42
// Безопасный вариант через Effect.option
const maybeFiber = yield* FiberMap.get(map, "unknown").pipe(
Effect.option
)
console.log("Maybe:", Option.isNone(maybeFiber)) // true
})
)
FiberMap.has — Проверка наличия
const program = Effect.scoped(
Effect.gen(function* () {
const map = yield* FiberMap.make<string, void>()
yield* FiberMap.run(map, "active-task", Effect.never)
const hasActive = yield* FiberMap.has(map, "active-task")
const hasOther = yield* FiberMap.has(map, "other-task")
console.log("Has active-task:", hasActive) // true
console.log("Has other-task:", hasOther) // false
})
)
FiberMap.remove — Удаление с прерыванием
const program = Effect.scoped(
Effect.gen(function* () {
const map = yield* FiberMap.make<string, void>()
yield* FiberMap.run(map, "task-to-remove",
Effect.gen(function* () {
yield* Effect.log("Task started")
yield* Effect.sleep("10 seconds")
yield* Effect.log("Task completed") // Не выполнится
})
)
yield* Effect.sleep("100 millis")
// remove прерывает файбер и удаляет его из map
yield* FiberMap.remove(map, "task-to-remove")
const has = yield* FiberMap.has(map, "task-to-remove")
console.log("Has after remove:", has) // false
})
)
FiberMap.join и awaitEmpty
class WorkerError {
readonly _tag = "WorkerError"
constructor(readonly workerId: string) {}
}
const program = Effect.scoped(
Effect.gen(function* () {
const workers = yield* FiberMap.make<string, void, WorkerError>()
// Запускаем воркеры
yield* FiberMap.run(workers, "worker-1",
Effect.sleep("100 millis")
)
yield* FiberMap.run(workers, "worker-2",
Effect.sleep("200 millis")
)
yield* FiberMap.run(workers, "worker-3",
Effect.fail(new WorkerError("worker-3"))
)
// join завершится с ошибкой от worker-3
yield* FiberMap.join(workers).pipe(
Effect.catchTag("WorkerError", (e) =>
Effect.log(`Worker ${e.workerId} failed`)
)
)
})
)
Итерация по FiberMap
const program = Effect.scoped(
Effect.gen(function* () {
const map = yield* FiberMap.make<string, number>()
yield* FiberMap.run(map, "a", Effect.succeed(1))
yield* FiberMap.run(map, "b", Effect.succeed(2))
yield* FiberMap.run(map, "c", Effect.succeed(3))
// FiberMap — это Iterable<[K, RuntimeFiber<A, E>]>
for (const [key, fiber] of map) {
const status = yield* Fiber.status(fiber)
console.log(`${key}: ${status._tag}`)
}
})
)
FiberHandle — Единичный файбер
FiberHandle<A, E> — это контейнер для одного файбера. При запуске нового файбера старый автоматически прерывается. Идеально для singleton-задач.
Создание FiberHandle
const program = Effect.scoped(
Effect.gen(function* () {
const handle = yield* FiberHandle.make<string, Error>()
// handle содержит максимум один файбер
})
)
Сигнатура типа
interface FiberHandle<out A = unknown, out E = unknown> {
readonly deferred: Deferred.Deferred<void, unknown>
readonly state:
| {
readonly _tag: "Open"
readonly fiber: Fiber.RuntimeFiber<A, E> | undefined
}
| { readonly _tag: "Closed" }
}
FiberHandle.run — Запуск с заменой
const program = Effect.scoped(
Effect.gen(function* () {
const handle = yield* FiberHandle.make<string>()
// Запускаем первую задачу
yield* FiberHandle.run(handle,
Effect.gen(function* () {
yield* Effect.log("Task 1 started")
yield* Effect.sleep("5 seconds")
yield* Effect.log("Task 1 done") // Не выполнится
return "result-1"
})
)
yield* Effect.sleep("100 millis")
// Запускаем вторую задачу — первая прерывается!
yield* FiberHandle.run(handle,
Effect.gen(function* () {
yield* Effect.log("Task 2 started")
yield* Effect.sleep("100 millis")
yield* Effect.log("Task 2 done")
return "result-2"
})
)
yield* FiberHandle.join(handle)
})
)
Effect.runFork(program)
/*
Output:
Task 1 started
Task 2 started
Task 2 done
*/
FiberHandle.get — Получение текущего файбера
const program = Effect.scoped(
Effect.gen(function* () {
const handle = yield* FiberHandle.make<number>()
// Изначально пусто
const empty = yield* FiberHandle.get(handle).pipe(Effect.option)
console.log("Before run:", Option.isNone(empty)) // true
yield* FiberHandle.run(handle, Effect.succeed(42))
// Теперь есть файбер
const fiber = yield* FiberHandle.get(handle)
const result = yield* Fiber.join(fiber)
console.log("Result:", result) // 42
})
)
FiberHandle.clear — Очистка
const program = Effect.scoped(
Effect.gen(function* () {
const handle = yield* FiberHandle.make<void>()
yield* FiberHandle.run(handle,
Effect.gen(function* () {
yield* Effect.log("Long task started")
yield* Effect.sleep("10 seconds")
yield* Effect.log("Long task done")
})
)
yield* Effect.sleep("100 millis")
// clear прерывает текущий файбер
yield* FiberHandle.clear(handle)
yield* Effect.log("Handle cleared")
})
)
FiberHandle.join и awaitEmpty
const program = Effect.scoped(
Effect.gen(function* () {
const handle = yield* FiberHandle.make<string>()
yield* FiberHandle.run(handle,
Effect.gen(function* () {
yield* Effect.sleep("100 millis")
return "done"
})
)
// join ожидает текущий файбер
yield* FiberHandle.join(handle)
console.log("Joined!")
// awaitEmpty ожидает когда handle станет пустым
// (полезно после clear или когда файбер завершился)
yield* FiberHandle.awaitEmpty(handle)
console.log("Empty!")
})
)
Паттерн: Текущая операция
interface SearchService {
readonly search: (query: string) => Effect.Effect<ReadonlyArray<string>>
readonly cancel: Effect.Effect<void>
}
const SearchService = Effect.gen(function* () {
const handle = yield* FiberHandle.make<ReadonlyArray<string>>()
const search = (query: string) =>
Effect.gen(function* () {
// Запуск нового поиска отменяет предыдущий
const fiber = yield* FiberHandle.run(handle,
Effect.gen(function* () {
yield* Effect.log(`Searching: ${query}`)
yield* Effect.sleep("500 millis") // Симуляция API
return [`Result for "${query}" 1`, `Result for "${query}" 2`]
})
)
return yield* Fiber.join(fiber)
})
const cancel = FiberHandle.clear(handle)
return { search, cancel } as const
})
Lifecycle и Scope
Все три коллекции привязаны к Scope. При закрытии Scope все файберы в коллекции автоматически прерываются.
Визуализация Lifecycle
┌─────────────────────────────────────────────────────────────────┐
│ Scope Lifecycle │
├─────────────────────────────────────────────────────────────────┤
│ │
│ Effect.scoped( │
│ Effect.gen(function* () { │
│ const set = yield* FiberSet.make() │
│ ───────────────────────────────────────────────────────── │
│ │ │
│ │ FiberSet.run(set, task1) ──► Fiber #1 running │
│ │ FiberSet.run(set, task2) ──► Fiber #2 running │
│ │ FiberSet.run(set, task3) ──► Fiber #3 running │
│ │ │
│ │ ... работа с файберами ... │
│ │ │
│ │ Fiber #1 completes ──► removed from set │
│ │ Fiber #2 still running │
│ │ Fiber #3 still running │
│ │ │
│ ───────────────────────────────────────────────────────── │
│ }) ◄── Scope closes │
│ ) │
│ │ │
│ └── Fiber #2 interrupted │
│ Fiber #3 interrupted │
│ FiberSet closed │
│ │
└─────────────────────────────────────────────────────────────────┘
Практический пример: HTTP сервер с активными соединениями
interface Connection {
readonly id: string
readonly handle: (data: string) => Effect.Effect<void>
}
const ConnectionManager = Effect.gen(function* () {
// FiberMap для активных соединений
const connections = yield* FiberMap.make<string, void>()
const addConnection = (conn: Connection) =>
FiberMap.run(connections, conn.id,
Effect.gen(function* () {
yield* Effect.log(`Connection ${conn.id} established`)
// Симуляция обработки данных
yield* Effect.forever(
Effect.gen(function* () {
yield* Effect.sleep("1 second")
yield* conn.handle(`ping from ${conn.id}`)
})
)
}).pipe(
Effect.onInterrupt(() =>
Effect.log(`Connection ${conn.id} closed`)
)
)
)
const removeConnection = (id: string) =>
FiberMap.remove(connections, id)
const getActiveConnections = Effect.gen(function* () {
const ids: string[] = []
for (const [id] of connections) {
ids.push(id)
}
return ids
})
const shutdown = Effect.gen(function* () {
yield* Effect.log("Shutting down all connections...")
yield* FiberMap.clear(connections)
})
return {
addConnection,
removeConnection,
getActiveConnections,
shutdown
} as const
})
// Использование
const program = Effect.scoped(
Effect.gen(function* () {
const manager = yield* ConnectionManager
// Добавляем соединения
yield* manager.addConnection({
id: "conn-1",
handle: (data) => Effect.log(`Received: ${data}`)
})
yield* manager.addConnection({
id: "conn-2",
handle: (data) => Effect.log(`Received: ${data}`)
})
yield* Effect.sleep("3 seconds")
const active = yield* manager.getActiveConnections
console.log("Active connections:", active)
// При выходе из scope все соединения закроются
})
)
Propagation of Interruption
По умолчанию join не propagates прерывания. Можно включить это поведение через опцию propagateInterruption.
Без propagation (по умолчанию)
const program = Effect.scoped(
Effect.gen(function* () {
const set = yield* FiberSet.make<void>()
// Файбер, который будет прерван
yield* FiberSet.run(set, Effect.interrupt)
// join НЕ завершится с ошибкой прерывания
// Просто дождётся завершения
yield* FiberSet.join(set)
console.log("Completed (interruption was swallowed)")
})
)
С propagateInterruption
const program = Effect.scoped(
Effect.gen(function* () {
const handle = yield* FiberHandle.make<void>()
// Запускаем с propagateInterruption
yield* FiberHandle.run(handle, Effect.interrupt, {
propagateInterruption: true
})
// join теперь propagates прерывание
yield* FiberHandle.join(handle)
})
).pipe(
Effect.catchAllCause((cause) =>
Effect.log(`Caught cause: ${cause}`)
)
)
Effect.runFork(program)
// Output: Caught cause: Interrupt(...)
Паттерн: Отмена всех задач при ошибке
class CriticalError {
readonly _tag = "CriticalError"
}
const program = Effect.scoped(
Effect.gen(function* () {
const workers = yield* FiberSet.make<void, CriticalError>()
// Запускаем несколько воркеров
yield* FiberSet.run(workers,
Effect.gen(function* () {
yield* Effect.sleep("5 seconds")
yield* Effect.log("Worker 1 done")
})
)
yield* FiberSet.run(workers,
Effect.gen(function* () {
yield* Effect.sleep("100 millis")
// Критическая ошибка
return yield* Effect.fail(new CriticalError())
}),
{ propagateInterruption: true }
)
yield* FiberSet.run(workers,
Effect.gen(function* () {
yield* Effect.sleep("5 seconds")
yield* Effect.log("Worker 3 done")
})
)
// join завершится с CriticalError
// Все остальные файберы будут прерваны при выходе из scope
yield* FiberSet.join(workers)
})
).pipe(
Effect.catchTag("CriticalError", () =>
Effect.log("Critical error occurred, all workers stopped")
)
)
Runtime функции
FiberSet, FiberMap и FiberHandle предоставляют функции для создания runtime с привязкой к коллекции.
FiberSet.makeRuntime
interface Logger {
readonly log: (msg: string) => Effect.Effect<void>
}
const Logger = Context.GenericTag<Logger>("Logger")
const LoggerLive = Logger.of({
log: (msg) => Effect.log(msg)
})
const program = Effect.scoped(
Effect.gen(function* () {
// makeRuntime создаёт функцию run с контекстом
const run = yield* FiberSet.makeRuntime<Logger>().pipe(
Effect.provideService(Logger, LoggerLive)
)
// Теперь можно запускать эффекты с доступом к Logger
const fiber1 = run(
Effect.gen(function* () {
const logger = yield* Logger
yield* logger.log("Task 1 running")
yield* Effect.sleep("100 millis")
return "result-1"
})
)
const fiber2 = run(
Effect.gen(function* () {
const logger = yield* Logger
yield* logger.log("Task 2 running")
return "result-2"
})
)
// Файберы добавлены в FiberSet автоматически
})
)
FiberMap.makeRuntime
const program = Effect.scoped(
Effect.gen(function* () {
// makeRuntime для FiberMap принимает ключ
const run = yield* FiberMap.makeRuntime<never, string>()
// Первый аргумент — ключ
run("task-a", Effect.succeed(1))
run("task-b", Effect.succeed(2))
run("task-c", Effect.succeed(3))
})
)
makeRuntimePromise — Для Promise-based кода
const program = Effect.scoped(
Effect.gen(function* () {
// makeRuntimePromise возвращает функцию, возвращающую Promise
const runPromise = yield* FiberSet.makeRuntimePromise<never>()
// Можно использовать с async/await
const result1 = await runPromise(Effect.succeed(42))
console.log("Result 1:", result1)
const result2 = await runPromise(Effect.succeed("hello"))
console.log("Result 2:", result2)
})
)
Паттерны использования
Паттерн 1: Worker Pool
interface Task<A> {
readonly id: string
readonly execute: Effect.Effect<A>
}
interface WorkerPool<A> {
readonly submit: (task: Task<A>) => Effect.Effect<void>
readonly shutdown: Effect.Effect<void>
readonly stats: Effect.Effect<{ active: number; completed: number }>
}
const createWorkerPool = <A>(
workerCount: number
): Effect.Effect<WorkerPool<A>, never, Scope.Scope> =>
Effect.gen(function* () {
const workers = yield* FiberSet.make<void>()
const taskQueue = yield* Queue.unbounded<Task<A>>()
const completedCount = yield* Ref.make(0)
// Создаём воркеры
for (let i = 0; i < workerCount; i++) {
yield* FiberSet.run(workers,
Effect.gen(function* () {
while (true) {
const task = yield* Queue.take(taskQueue)
yield* Effect.log(`Worker ${i}: executing ${task.id}`)
yield* task.execute
yield* Ref.update(completedCount, (n) => n + 1)
}
}).pipe(Effect.interruptible)
)
}
return {
submit: (task) => Queue.offer(taskQueue, task),
shutdown: Effect.gen(function* () {
yield* Queue.shutdown(taskQueue)
yield* FiberSet.clear(workers)
}),
stats: Effect.gen(function* () {
const active = yield* FiberSet.size(workers)
const completed = yield* Ref.get(completedCount)
return { active, completed }
})
}
})
const program = Effect.scoped(
Effect.gen(function* () {
const pool = yield* createWorkerPool<string>(3)
// Отправляем задачи
for (let i = 0; i < 10; i++) {
yield* pool.submit({
id: `task-${i}`,
execute: Effect.sleep(`${50 + Math.random() * 100} millis`).pipe(
Effect.as(`Result ${i}`)
)
})
}
yield* Effect.sleep("1 second")
const stats = yield* pool.stats
console.log("Stats:", stats)
})
)
Паттерн 2: Per-User Tasks
interface UserTask {
readonly userId: string
readonly task: Effect.Effect<void>
}
interface UserTaskManager {
readonly startTask: (task: UserTask) => Effect.Effect<void>
readonly cancelTask: (userId: string) => Effect.Effect<void>
readonly isTaskRunning: (userId: string) => Effect.Effect<boolean>
}
const createUserTaskManager = (): Effect.Effect<
UserTaskManager,
never,
Scope.Scope
> =>
Effect.gen(function* () {
const tasks = yield* FiberMap.make<string, void>()
return {
startTask: ({ userId, task }) =>
FiberMap.run(tasks, userId,
task.pipe(
Effect.onInterrupt(() =>
Effect.log(`Task for user ${userId} was cancelled`)
)
)
).pipe(Effect.asVoid),
cancelTask: (userId) =>
FiberMap.remove(tasks, userId),
isTaskRunning: (userId) =>
FiberMap.has(tasks, userId)
}
})
const program = Effect.scoped(
Effect.gen(function* () {
const manager = yield* createUserTaskManager()
// Запускаем задачи для разных пользователей
yield* manager.startTask({
userId: "user-1",
task: Effect.gen(function* () {
yield* Effect.log("User 1: working...")
yield* Effect.sleep("500 millis")
yield* Effect.log("User 1: done!")
})
})
yield* manager.startTask({
userId: "user-2",
task: Effect.gen(function* () {
yield* Effect.log("User 2: working...")
yield* Effect.sleep("1 second")
yield* Effect.log("User 2: done!")
})
})
yield* Effect.sleep("200 millis")
// Отменяем задачу user-2
yield* manager.cancelTask("user-2")
yield* Effect.sleep("1 second")
})
)
Паттерн 3: Debounced Search
interface SearchResult {
readonly query: string
readonly results: ReadonlyArray<string>
}
const createDebouncedSearch = (
debounceMs: number,
search: (query: string) => Effect.Effect<ReadonlyArray<string>>
): Effect.Effect<
(query: string) => Effect.Effect<SearchResult>,
never,
Scope.Scope
> =>
Effect.gen(function* () {
const handle = yield* FiberHandle.make<SearchResult>()
return (query: string) =>
Effect.gen(function* () {
// Отменяем предыдущий поиск
yield* FiberHandle.clear(handle)
// Запускаем новый с debounce
const fiber = yield* FiberHandle.run(handle,
Effect.gen(function* () {
yield* Effect.sleep(`${debounceMs} millis`)
const results = yield* search(query)
return { query, results }
})
)
return yield* Fiber.join(fiber)
})
})
const program = Effect.scoped(
Effect.gen(function* () {
const searchDebounced = yield* createDebouncedSearch(
300,
(query) => Effect.succeed([`Result 1 for ${query}`, `Result 2 for ${query}`])
)
// Быстрые последовательные запросы
const results = yield* Effect.race(
searchDebounced("a"),
Effect.gen(function* () {
yield* Effect.sleep("50 millis")
return yield* searchDebounced("ab")
})
).pipe(
Effect.race(
Effect.gen(function* () {
yield* Effect.sleep("100 millis")
return yield* searchDebounced("abc")
})
)
)
// Только последний поиск "abc" выполнится
console.log("Results:", results)
})
)
API Reference
FiberSet API
| Функция | Сигнатура | Описание |
|---|---|---|
FiberSet.make | () => Effect<FiberSet<A, E>, never, Scope> | Создать FiberSet |
FiberSet.add | (set, fiber, options?) => Effect<void> | Добавить файбер |
FiberSet.run | (set, effect, options?) => Effect<RuntimeFiber> | Fork и добавить |
FiberSet.join | (set) => Effect<void, E> | Ждать все файберы |
FiberSet.awaitEmpty | (set) => Effect<void> | Ждать опустошения |
FiberSet.clear | (set) => Effect<void> | Очистить с interrupt |
FiberSet.size | (set) => Effect<number> | Количество файберов |
FiberSet.makeRuntime | <R>() => Effect<RunFn, never, Scope | R> | Runtime функция |
FiberSet.makeRuntimePromise | <R>() => Effect<RunPromiseFn, never, Scope | R> | Promise runtime |
FiberMap API
| Функция | Сигнатура | Описание |
|---|---|---|
FiberMap.make | () => Effect<FiberMap<K, A, E>, never, Scope> | Создать FiberMap |
FiberMap.set | (map, key, fiber) => Effect<void> | Добавить файбер |
FiberMap.run | (map, key, effect, options?) => Effect<RuntimeFiber> | Fork и добавить |
FiberMap.get | (map, key) => Effect<RuntimeFiber, NoSuchElementException> | Получить файбер |
FiberMap.has | (map, key) => Effect<boolean> | Проверить наличие |
FiberMap.remove | (map, key) => Effect<void> | Удалить с interrupt |
FiberMap.join | (map) => Effect<void, E> | Ждать все файберы |
FiberMap.awaitEmpty | (map) => Effect<void> | Ждать опустошения |
FiberMap.clear | (map) => Effect<void> | Очистить с interrupt |
FiberMap.size | (map) => Effect<number> | Количество файберов |
FiberHandle API
| Функция | Сигнатура | Описание |
|---|---|---|
FiberHandle.make | () => Effect<FiberHandle<A, E>, never, Scope> | Создать FiberHandle |
FiberHandle.run | (handle, effect, options?) => Effect<RuntimeFiber> | Запустить (заменить) |
FiberHandle.get | (handle) => Effect<RuntimeFiber, NoSuchElementException> | Получить файбер |
FiberHandle.join | (handle) => Effect<void, E> | Ждать файбер |
FiberHandle.awaitEmpty | (handle) => Effect<void> | Ждать опустошения |
FiberHandle.clear | (handle) => Effect<void> | Очистить с interrupt |
Общие опции
interface RunOptions {
// Если true, join propagates прерывания
readonly propagateInterruption?: boolean
}
interface FiberMapRunOptions extends RunOptions {
// Если true, не заменяет существующий файбер
readonly onlyIfMissing?: boolean
}
Примеры
Пример 1: Параллельная загрузка файлов
interface FileDownload {
readonly url: string
readonly content: string
}
const downloadFile = (url: string): Effect.Effect<FileDownload> =>
Effect.gen(function* () {
yield* Effect.log(`Downloading: ${url}`)
yield* Effect.sleep(`${100 + Math.random() * 200} millis`)
return { url, content: `Content of ${url}` }
})
const downloadAllFiles = (
urls: ReadonlyArray<string>
): Effect.Effect<ReadonlyArray<FileDownload>, never, Scope.Scope> =>
Effect.gen(function* () {
const downloads = yield* FiberSet.make<FileDownload>()
const results = yield* Ref.make<Chunk.Chunk<FileDownload>>(Chunk.empty())
// Запускаем загрузки параллельно
for (const url of urls) {
yield* FiberSet.run(downloads,
downloadFile(url).pipe(
Effect.tap((file) =>
Ref.update(results, Chunk.append(file))
)
)
)
}
// Ждём завершения всех
yield* FiberSet.join(downloads)
return yield* Ref.get(results).pipe(Effect.map(Chunk.toArray))
})
const program = Effect.scoped(
Effect.gen(function* () {
const files = yield* downloadAllFiles([
"https://example.com/file1.txt",
"https://example.com/file2.txt",
"https://example.com/file3.txt"
])
console.log("Downloaded files:", files.map((f) => f.url))
})
)
Пример 2: Rate-Limited API Calls
interface ApiClient {
readonly call: (endpoint: string) => Effect.Effect<string>
}
const createRateLimitedClient = (
maxConcurrent: number
): Effect.Effect<ApiClient, never, Scope.Scope> =>
Effect.gen(function* () {
const activeCalls = yield* FiberMap.make<string, string>()
const semaphore = yield* Semaphore.make(maxConcurrent)
const call = (endpoint: string) =>
Effect.gen(function* () {
// Проверяем, есть ли уже активный вызов к этому endpoint
const alreadyRunning = yield* FiberMap.has(activeCalls, endpoint)
if (alreadyRunning) {
// Ждём завершения существующего
yield* FiberMap.join(activeCalls).pipe(Effect.ignore)
}
// Запускаем новый вызов с rate limiting
const fiber = yield* FiberMap.run(activeCalls, endpoint,
semaphore.withPermits(1)(
Effect.gen(function* () {
yield* Effect.log(`Calling API: ${endpoint}`)
yield* Effect.sleep("200 millis")
return `Response from ${endpoint}`
})
)
)
return yield* Fiber.join(fiber)
})
return { call }
})
const program = Effect.scoped(
Effect.gen(function* () {
const client = yield* createRateLimitedClient(2)
// Параллельные вызовы (но максимум 2 одновременно)
const results = yield* Effect.all([
client.call("/users"),
client.call("/products"),
client.call("/orders"),
client.call("/stats")
], { concurrency: "unbounded" })
console.log("Results:", results)
})
)
Упражнения
Упражнение 1: Простой FiberSet
Создайте FiberSet и запустите несколько задач параллельно.
import { Effect, FiberSet } from "effect"
const program = Effect.scoped(
Effect.gen(function* () {
const set = yield* FiberSet.make<number>()
// Запустите 5 задач, каждая возвращает свой номер
// Дождитесь завершения всех
???
})
)import { Effect, FiberSet } from "effect"
const program = Effect.scoped(
Effect.gen(function* () {
const set = yield* FiberSet.make<number>()
for (let i = 1; i <= 5; i++) {
yield* FiberSet.run(set,
Effect.gen(function* () {
yield* Effect.log(`Task ${i} started`)
yield* Effect.sleep(`${i * 50} millis`)
yield* Effect.log(`Task ${i} completed`)
return i
})
)
}
yield* FiberSet.join(set)
console.log("All tasks completed!")
})
)
Effect.runFork(program)Упражнение 2: FiberMap с ключами
Создайте FiberMap для управления задачами по имени.
import { Effect, FiberMap, Fiber } from "effect"
const program = Effect.scoped(
Effect.gen(function* () {
const tasks = yield* FiberMap.make<string, string>()
// Запустите задачи "task-a", "task-b", "task-c"
// Получите результат задачи "task-b"
???
})
)import { Effect, FiberMap, Fiber } from "effect"
const program = Effect.scoped(
Effect.gen(function* () {
const tasks = yield* FiberMap.make<string, string>()
yield* FiberMap.run(tasks, "task-a",
Effect.succeed("Result A")
)
yield* FiberMap.run(tasks, "task-b",
Effect.succeed("Result B")
)
yield* FiberMap.run(tasks, "task-c",
Effect.succeed("Result C")
)
const fiberB = yield* FiberMap.get(tasks, "task-b")
const result = yield* Fiber.join(fiberB)
console.log("Task B result:", result)
})
)
Effect.runFork(program)Упражнение 3: Отмена предыдущей задачи
Используйте FiberHandle для реализации поиска с отменой предыдущего.
import { Effect, FiberHandle, Fiber } from "effect"
const program = Effect.scoped(
Effect.gen(function* () {
const searchHandle = yield* FiberHandle.make<string>()
// Реализуйте search, который отменяет предыдущий поиск
const search = (query: string): Effect.Effect<string> =>
???
// Быстрые последовательные поиски
yield* Effect.fork(search("a"))
yield* Effect.sleep("10 millis")
yield* Effect.fork(search("ab"))
yield* Effect.sleep("10 millis")
const result = yield* search("abc")
// Только "abc" должен завершиться
console.log("Result:", result)
})
)import { Effect, FiberHandle, Fiber } from "effect"
const program = Effect.scoped(
Effect.gen(function* () {
const searchHandle = yield* FiberHandle.make<string>()
const search = (query: string): Effect.Effect<string> =>
Effect.gen(function* () {
const fiber = yield* FiberHandle.run(searchHandle,
Effect.gen(function* () {
yield* Effect.log(`Searching for: ${query}`)
yield* Effect.sleep("100 millis")
return `Results for: ${query}`
}).pipe(
Effect.onInterrupt(() =>
Effect.log(`Search for "${query}" cancelled`)
)
)
)
return yield* Fiber.join(fiber)
})
yield* Effect.fork(search("a"))
yield* Effect.sleep("10 millis")
yield* Effect.fork(search("ab"))
yield* Effect.sleep("10 millis")
const result = yield* search("abc")
console.log("Result:", result)
})
)
Effect.runFork(program)
/*
Output:
Searching for: a
Search for "a" cancelled
Searching for: ab
Search for "ab" cancelled
Searching for: abc
Result: Results for: abc
*/Упражнение 4: Task Queue с Worker Pool
Реализуйте очередь задач с пулом воркеров.
import { Effect, FiberSet, Queue, Ref, Scope } from "effect"
interface TaskQueue<T, R> {
readonly submit: (task: Effect.Effect<T>) => Effect.Effect<void>
readonly getStats: Effect.Effect<{ pending: number; completed: number }>
readonly shutdown: Effect.Effect<void>
}
const createTaskQueue = <T>(
workerCount: number
): Effect.Effect<TaskQueue<T, never>, never, Scope.Scope> =>
???
// Использование
const program = Effect.scoped(
Effect.gen(function* () {
const queue = yield* createTaskQueue<number>(3)
// Отправляем 10 задач
for (let i = 0; i < 10; i++) {
yield* queue.submit(
Effect.gen(function* () {
yield* Effect.sleep(`${Math.random() * 100} millis`)
return i
})
)
}
yield* Effect.sleep("500 millis")
const stats = yield* queue.getStats
console.log("Stats:", stats)
})
)import { Effect, FiberSet, Queue, Ref, Scope, Option } from "effect"
interface TaskQueue<T> {
readonly submit: (task: Effect.Effect<T>) => Effect.Effect<void>
readonly getStats: Effect.Effect<{ pending: number; completed: number }>
readonly shutdown: Effect.Effect<void>
}
const createTaskQueue = <T>(
workerCount: number
): Effect.Effect<TaskQueue<T>, never, Scope.Scope> =>
Effect.gen(function* () {
const workers = yield* FiberSet.make<void>()
const taskQueue = yield* Queue.unbounded<Effect.Effect<T>>()
const completedCount = yield* Ref.make(0)
const isShuttingDown = yield* Ref.make(false)
// Воркер
const worker = (id: number) =>
Effect.gen(function* () {
while (true) {
const shutting = yield* Ref.get(isShuttingDown)
if (shutting) break
const maybeTask = yield* Queue.poll(taskQueue)
if (Option.isSome(maybeTask)) {
yield* Effect.log(`Worker ${id}: executing task`)
yield* maybeTask.value.pipe(Effect.ignore)
yield* Ref.update(completedCount, (n) => n + 1)
} else {
yield* Effect.sleep("10 millis")
}
}
}).pipe(Effect.interruptible)
// Запускаем воркеры
for (let i = 0; i < workerCount; i++) {
yield* FiberSet.run(workers, worker(i))
}
return {
submit: (task) => Queue.offer(taskQueue, task),
getStats: Effect.gen(function* () {
const pending = yield* Queue.size(taskQueue)
const completed = yield* Ref.get(completedCount)
return { pending, completed }
}),
shutdown: Effect.gen(function* () {
yield* Ref.set(isShuttingDown, true)
yield* Queue.shutdown(taskQueue)
yield* FiberSet.clear(workers)
})
}
})
const program = Effect.scoped(
Effect.gen(function* () {
const queue = yield* createTaskQueue<number>(3)
for (let i = 0; i < 10; i++) {
yield* queue.submit(
Effect.gen(function* () {
yield* Effect.sleep(`${Math.random() * 100} millis`)
return i
})
)
}
yield* Effect.sleep("500 millis")
const stats = yield* queue.getStats
console.log("Stats:", stats)
})
)
Effect.runFork(program)Заключение
FiberMap, FiberSet и FiberHandle предоставляют высокоуровневые абстракции для управления коллекциями файберов:
- FiberSet — для неупорядоченных коллекций (пулы, параллельные задачи)
- FiberMap — для коллекций с ключами (именованные воркеры, per-user задачи)
- FiberHandle — для единичных файберов (текущая операция, singleton)
Ключевые преимущества:
- Автоматический lifecycle management через Scope
- Автоматическое удаление завершённых файберов
- Удобные операции join/await
- Runtime функции для создания эффектов с контекстом
- Настраиваемое поведение прерываний
В следующей статье мы рассмотрим подробно, когда использовать Fiber, а когда Worker для настоящего параллелизма.