Fiber: Green Threads
Глубокое погружение в модель конкурентности Effect.
Введение в Fiber
Effect — это высококонкурентный фреймворк, построенный на основе файберов (fibers). Файберы представляют собой легковесные виртуальные потоки с возможностью безопасной отмены, которые обеспечивают множество ключевых возможностей Effect.
Почему файберы?
В традиционной конкурентности мы сталкиваемся с несколькими проблемами:
┌─────────────────────────────────────────────────────────────────┐
│ ПРОБЛЕМЫ ТРАДИЦИОННЫХ ПОТОКОВ │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 1. Тяжеловесность │
│ ├── Каждый поток ОС потребляет ~1-8 MB стека │
│ └── Переключение контекста дорогостоящее │
│ │
│ 2. Неструктурированность │
│ ├── Потоки живут независимо │
│ └── Нет автоматической связи parent-child │
│ │
│ 3. Сложность отмены │
│ ├── Thread.interrupt() небезопасен │
│ └── Требуется ручная кооперативная отмена │
│ │
│ 4. Утечки ресурсов │
│ ├── Orphaned threads │
│ └── Незакрытые соединения при исключениях │
│ │
└─────────────────────────────────────────────────────────────────┘
Файберы Effect решают все эти проблемы:
┌─────────────────────────────────────────────────────────────────┐
│ ПРЕИМУЩЕСТВА FIBER │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ✓ Легковесность ~200 байт на файбер │
│ ✓ Структурированность Автоматическая иерархия │
│ ✓ Безопасная отмена Асинхронное прерывание │
│ ✓ Resource-safe Гарантированный cleanup │
│ ✓ Composable Комбинаторы для композиции │
│ │
└─────────────────────────────────────────────────────────────────┘
Что такое виртуальные потоки
JavaScript изначально однопоточен — код выполняется последовательно в единственной последовательности инструкций. Однако современные JavaScript-среды используют event loop для управления асинхронными операциями, создавая иллюзию многозадачности.
Архитектура виртуальных потоков
┌─────────────────────────────────────────────────────────────────────┐
│ JavaScript Event Loop │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────┐ │
│ │ Call Stack │ ←── Выполнение синхронного кода │
│ └──────┬──────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Effect Runtime │ │
│ │ ┌─────────────────────────────────────────────────┐ │ │
│ │ │ Fiber Scheduler │ │ │
│ │ │ │ │ │
│ │ │ ┌────────┐ ┌────────┐ ┌────────┐ ┌────────┐ │ │ │
│ │ │ │Fiber #1│ │Fiber #2│ │Fiber #3│ │Fiber #n│ │ │ │
│ │ │ └────────┘ └────────┘ └────────┘ └────────┘ │ │ │
│ │ │ │ │ │
│ │ └─────────────────────────────────────────────────┘ │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
│ ┌──────────────────────────────────────┐ │
│ │ Microtask Queue │ │
│ └──────────────────────────────────────┘ │
│ │
│ ┌──────────────────────────────────────┐ │
│ │ Macrotask Queue │ │
│ └──────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────┘
В контексте Effect виртуальные потоки, или файберы, — это логические потоки, симулируемые Effect runtime. Они позволяют выполнять конкурентные операции без использования реальной многопоточности, которая не поддерживается в JavaScript нативно.
Ключевая идея
// Effect — это описание вычисления (ленивое, иммутабельное)
const myEffect = Effect.succeed(42)
// Fiber — это выполняющееся вычисление (активное, управляемое)
const fiber = Effect.runFork(myEffect)
Effect — это высокоуровневая концепция, описывающая эффективное вычисление. Он ленив и иммутабелен, то есть представляет вычисление, которое может произвести значение или завершиться ошибкой, но не выполняется немедленно.
Fiber — это работающее выполнение Effect. Его можно прервать или дождаться для получения результата. Думайте о нём как о способе контролировать и взаимодействовать с продолжающимся вычислением.
Как работают Fiber
Все эффекты в Effect выполняются файберами. Если вы не создали файбер самостоятельно, он был создан используемой операцией (если она конкурентная) или системой runtime Effect.
Создание файберов
Файбер создаётся каждый раз, когда выполняется эффект:
// При запуске создаётся "main" fiber
const program = Effect.gen(function* () {
const value = yield* Effect.succeed(42)
return value
})
// Запуск создаёт root fiber
Effect.runFork(program)
При выполнении конкурентных операций файбер создаётся для каждого конкурентного эффекта:
const program = Effect.gen(function* () {
// Каждый эффект в all выполняется в своём файбере
const results = yield* Effect.all(
[
Effect.succeed(1),
Effect.succeed(2),
Effect.succeed(3)
],
{ concurrency: "unbounded" }
)
return results
})
Иерархия файберов
┌─────────────────────────────────────────────────────────────────┐
│ Root Fiber (main) │
│ FiberId: #0 │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Child #1 │ │ Child #2 │ │ Child #3 │ │
│ │ FiberId: #1 │ │ FiberId: #2 │ │ FiberId: #3 │ │
│ └──────┬───────┘ └──────────────┘ └──────────────┘ │
│ │ │
│ ┌──────┴───────┐ │
│ │ │ │
│ ▼ ▼ │
│ ┌────────┐ ┌────────┐ │
│ │Grand#1 │ │Grand#2 │ │
│ │Id: #4 │ │Id: #5 │ │
│ └────────┘ └────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
Важно помнить
Даже если вы пишете “однопоточный” код без конкурентных операций, всегда будет как минимум один файбер — “main” файбер, выполняющий ваш эффект.
Тип данных Fiber
Тип данных Fiber в Effect представляет “handle” на выполнение эффекта.
Сигнатура типа
// ┌─── Тип успешного результата
// │ ┌─── Тип ошибки
// │ │
// ▼ ▼
Fiber<Success, Error>
Этот тип указывает, что файбер:
- Успешно завершается и возвращает значение типа
Success - Завершается с ошибкой типа
Error
Отсутствие Requirements
// Effect имеет Requirements (R)
type MyEffect = Effect.Effect<number, Error, MyService>
// Fiber НЕ имеет Requirements
type MyFiber = Fiber.Fiber<number, Error>
Файберы не имеют параметра типа Requirements, потому что они выполняют только те эффекты, зависимости которых уже были предоставлены.
RuntimeFiber vs Fiber
Effect различает два типа файберов:
// Fiber — базовый тип (может быть synthetic)
type BaseFiber<A, E> = Fiber.Fiber<A, E>
// RuntimeFiber — реальный выполняющийся файбер
type RuntimeFiber<A, E> = Fiber.RuntimeFiber<A, E>
┌─────────────────────────────────────────────────────────────────┐
│ Fiber Types │
├─────────────────────────────────────────────────────────────────┤
│ │
│ Fiber.Fiber<A, E> │
│ ├── Базовый интерфейс │
│ ├── Может быть синтетическим (Fiber.zip, Fiber.orElse) │
│ └── Минимальный набор операций │
│ │
│ Fiber.RuntimeFiber<A, E> │
│ ├── Реальный выполняющийся файбер │
│ ├── Создаётся Effect.fork, Effect.runFork │
│ ├── Имеет FiberId, status, scope │
│ └── Полный набор операций │
│ │
└─────────────────────────────────────────────────────────────────┘
Структурная конкурентность
Effect следует модели структурной конкурентности (structured concurrency), где время жизни дочерних файберов привязано к родительским. Проще говоря, срок жизни файбера зависит от срока жизни его родительского файбера.
Принципы структурной конкурентности
┌─────────────────────────────────────────────────────────────────┐
│ СТРУКТУРНАЯ КОНКУРЕНТНОСТЬ │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 1. Иерархия владения │
│ └── Родительский файбер "владеет" дочерними │
│ │
│ 2. Автоматическая отмена │
│ └── При завершении родителя дети отменяются │
│ │
│ 3. Распространение ошибок │
│ └── Ошибка ребёнка может завершить родителя │
│ │
│ 4. Гарантированный cleanup │
│ └── Finalizers выполняются в обратном порядке │
│ │
│ 5. Предсказуемость │
│ └── Поведение определяется структурой кода │
│ │
└─────────────────────────────────────────────────────────────────┘
Пример структурной конкурентности
// Дочерний файбер, который логирует сообщение каждую секунду
const child = Effect.repeat(
Console.log("child: still running!"),
Schedule.fixed("1 second")
)
const parent = Effect.gen(function* () {
console.log("parent: started!")
// Дочерний файбер супервизируется родителем
yield* Effect.fork(child)
yield* Effect.sleep("3 seconds")
console.log("parent: finished!")
})
Effect.runFork(parent)
/*
Output:
parent: started!
child: still running!
child: still running!
child: still running!
parent: finished!
← child автоматически завершается с parent!
*/
Визуализация жизненного цикла
Время ────────────────────────────────────────────────►
Parent ╔══════════════════════════════════════╗
║ started! finished! ║
╚══════════════════════════════════════╝
│
Child ╔════════════════════════════════════╗ │
║ running! running! running! ║──┘ interrupted
╚════════════════════════════════════╝
0s 1s 2s 3s
Это поведение распространяется на любой уровень вложенности файберов, обеспечивая предсказуемый и контролируемый жизненный цикл.
Жизненный цикл Fiber
Каждый файбер имеет чётко определённый жизненный цикл, основанный на выполняемом эффекте.
Состояния жизненного цикла
┌─────────────────────────────────────────────────────────────────┐
│ FIBER LIFECYCLE │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────┐ │
│ │ Created │ ─── Файбер создан, но не запущен │
│ └────┬────┘ │
│ │ fork/runFork │
│ ▼ │
│ ┌─────────┐ │
│ │ Running │ ─── Активное выполнение │
│ └────┬────┘ │
│ │ │
│ ┌────────┼────────┐ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌───────┐ ┌────────┐ ┌───────────┐ │
│ │Succeed│ │ Failed │ │Interrupted│ │
│ └───┬───┘ └───┬────┘ └─────┬─────┘ │
│ │ │ │ │
│ └─────────┴────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────┐ │
│ │ Done │ ─── Финальное состояние │
│ └─────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
Exit — результат завершения
Каждый файбер завершается с одним из двух результатов:
// Успешное завершение
type SuccessExit<A> = Exit.Success<A>
// Завершение с ошибкой (включая прерывание)
type FailureExit<E> = Exit.Failure<E>
// Объединённый тип
type Exit<A, E> = Exit.Success<A> | Exit.Failure<E>
Cause внутри Failure может содержать:
Fail— ожидаемая ошибкаDie— неожиданный defectInterrupt— прерываниеSequential— последовательные ошибкиParallel— параллельные ошибки
FiberId — идентификация файберов
Каждый файбер имеет уникальный идентификатор для отслеживания и отладки.
Структура FiberId
// Runtime FiberId — для реальных выполняющихся файберов
interface RuntimeFiberId {
readonly _tag: "Runtime"
readonly id: number // Уникальный числовой ID
readonly startTimeMillis: number // Время создания
}
// Composite FiberId — для скомбинированных файберов
interface CompositeFiberId {
readonly _tag: "Composite"
readonly left: FiberId
readonly right: FiberId
}
// None — отсутствие ID
interface NoneFiberId {
readonly _tag: "None"
}
Получение FiberId
const program = Effect.gen(function* () {
// Получить ID текущего файбера
const currentFiberId = yield* Effect.fiberId
console.log("Current fiber:", FiberId.threadName(currentFiberId))
// Fork и получить ID дочернего файбера
const fiber = yield* Effect.fork(Effect.succeed(42))
const childFiberId = fiber.id()
console.log("Child fiber:", FiberId.threadName(childFiberId))
})
Effect.runFork(program)
/*
Output:
Current fiber: #0
Child fiber: #1
*/
Операции с FiberId
| Операция | Описание | Сложность |
|---|---|---|
FiberId.none | Пустой идентификатор | O(1) |
FiberId.runtime(id, startTime) | Создать runtime ID | O(1) |
FiberId.combine(left, right) | Объединить два ID | O(1) |
FiberId.ids(fiberId) | Получить все числовые ID | O(n) |
FiberId.threadName(fiberId) | Человекочитаемое имя | O(1) |
FiberStatus — состояния файбера
FiberStatus описывает текущее состояние файбера в любой момент времени.
Типы состояний
// Файбер выполняется
const running: FiberStatus.Running = FiberStatus.running(
RuntimeFlags.none // флаги среды выполнения
)
// Файбер приостановлен
const suspended: FiberStatus.Suspended = FiberStatus.suspended(
RuntimeFlags.none,
asyncBlockingOn // причина блокировки
)
// Файбер завершён
const done: FiberStatus.Done = FiberStatus.done
Проверка статуса
const checkFiberStatus = Effect.gen(function* () {
// Создаём долгоживущий файбер
const fiber = yield* Effect.fork(
Effect.repeat(
Effect.sleep("100 millis"),
Schedule.recurs(10)
)
)
// Проверяем статус
const status1 = yield* Fiber.status(fiber)
console.log("Status after fork:", status1._tag)
yield* Effect.sleep("500 millis")
const status2 = yield* Fiber.status(fiber)
console.log("Status after delay:", status2._tag)
yield* Fiber.join(fiber)
const status3 = yield* Fiber.status(fiber)
console.log("Status after join:", status3._tag)
})
Effect.runFork(checkFiberStatus)
/*
Output:
Status after fork: Running (или Suspended)
Status after delay: Running (или Suspended)
Status after join: Done
*/
Диаграмма переходов состояний
┌──────────────────────────────────┐
│ │
│ Created │
│ (before execution) │
│ │
└───────────────┬──────────────────┘
│ start
▼
┌───────────────────────────────────────────────────────────┐
│ │
│ ┌─────────────┐ ┌──────────────┐ │
│ │ Running │◄──────────►│ Suspended │ │
│ │ │ yield │ │ │
│ │ (executing) │ resume │ (waiting) │ │
│ └──────┬──────┘ └──────────────┘ │
│ │ │
└──────────┼────────────────────────────────────────────────┘
│ complete / fail / interrupt
▼
┌───────────────────────────────────────────────────────────┐
│ │
│ Done │
│ (Exit<A, E>) │
│ │
└───────────────────────────────────────────────────────────┘
Сравнение с другими моделями
Fiber vs OS Threads
| Аспект | OS Threads | Effect Fibers |
|---|---|---|
| Размер стека | 1-8 MB | ~200 bytes |
| Количество | Тысячи | Миллионы |
| Переключение | Дорогое (kernel) | Дешёвое (userspace) |
| Создание | Медленное (~1ms) | Мгновенное (~1μs) |
| Блокировка | Блокирует поток | Только файбер |
| Отмена | Небезопасная | Безопасная, кооперативная |
Fiber vs Promise
| Аспект | Promise | Effect Fiber |
|---|---|---|
| Отмена | Невозможна | Встроенная |
| Типизация ошибок | Нет | Да |
| Иерархия | Нет | Структурная |
| Ресурсы | Manual cleanup | Автоматический |
| Приоритеты | Нет | Поддерживаются |
| Супервизия | Нет | Встроенная |
Fiber vs Async/Await
// Async/Await — нет отмены, нет иерархии
async function traditionalApproach() {
const result = await fetch("/api/data")
return result.json()
}
// Effect — полный контроль
const effectApproach = Effect.gen(function* () {
const result = yield* Effect.tryPromise(() => fetch("/api/data"))
return yield* Effect.tryPromise(() => result.json())
}).pipe(
Effect.timeout("5 seconds"), // Автоматическая отмена
Effect.retry({ times: 3 }) // Автоматические повторы
)
API Reference
Основные операции создания
| Функция | Сигнатура | Описание |
|---|---|---|
Effect.fork | Effect<A, E, R> → Effect<RuntimeFiber<A, E>, never, R> | Fork в дочерний файбер |
Effect.forkDaemon | Effect<A, E, R> → Effect<RuntimeFiber<A, E>, never, R> | Fork в глобальный scope |
Effect.forkScoped | Effect<A, E, R> → Effect<RuntimeFiber<A, E>, never, R | Scope> | Fork в текущий scope |
Операции с Fiber
| Функция | Сигнатура | Описание |
|---|---|---|
Fiber.join | Fiber<A, E> → Effect<A, E> | Дождаться и получить результат |
Fiber.await | Fiber<A, E> → Effect<Exit<A, E>> | Дождаться и получить Exit |
Fiber.poll | Fiber<A, E> → Effect<Option<Exit<A, E>>> | Неблокирующая проверка |
Fiber.interrupt | Fiber<A, E> → Effect<Exit<A, E>> | Прервать файбер |
Fiber.status | Fiber<A, E> → Effect<FiberStatus> | Получить текущий статус |
Комбинаторы Fiber
| Функция | Сигнатура | Описание |
|---|---|---|
Fiber.zip | (Fiber<A>, Fiber<B>) → Fiber<[A, B]> | Объединить в tuple |
Fiber.zipWith | (Fiber<A>, Fiber<B>, f) → Fiber<C> | Объединить с функцией |
Fiber.orElse | (Fiber<A>, Fiber<A>) → Fiber<A> | Fallback при ошибке |
Fiber.all | Iterable<Fiber<A>> → Fiber<A[]> | Собрать все результаты |
Примеры
Пример 1: Базовое создание и join файбера
// Функция вычисления числа Фибоначчи
const fib = (n: number): Effect.Effect<number> =>
n < 2
? Effect.succeed(n)
: Effect.zipWith(
fib(n - 1),
fib(n - 2),
(a, b) => a + b
)
const program = Effect.gen(function* () {
// Fork вычисления в отдельный файбер
const fiber = yield* Effect.fork(fib(10))
console.log("Fiber started, doing other work...")
yield* Effect.sleep("100 millis")
// Дождаться результата
const result = yield* Fiber.join(fiber)
console.log(`Fibonacci(10) = ${result}`)
})
Effect.runFork(program)
/*
Output:
Fiber started, doing other work...
Fibonacci(10) = 55
*/
Пример 2: Параллельные вычисления с композицией
const fetchUser = (id: number) =>
Effect.gen(function* () {
yield* Effect.sleep(`${100 * id} millis`)
return { id, name: `User ${id}` }
})
const fetchPosts = (userId: number) =>
Effect.gen(function* () {
yield* Effect.sleep(`${50 * userId} millis`)
return [
{ id: 1, title: `Post 1 by user ${userId}` },
{ id: 2, title: `Post 2 by user ${userId}` }
]
})
const program = Effect.gen(function* () {
// Fork оба запроса параллельно
const userFiber = yield* Effect.fork(fetchUser(1))
const postsFiber = yield* Effect.fork(fetchPosts(1))
// Комбинируем файберы
const combinedFiber = Fiber.zip(userFiber, postsFiber)
// Получаем оба результата
const [user, posts] = yield* Fiber.join(combinedFiber)
console.log("User:", user)
console.log("Posts:", posts)
})
Effect.runFork(program)
/*
Output:
User: { id: 1, name: 'User 1' }
Posts: [ { id: 1, title: 'Post 1 by user 1' }, { id: 2, title: 'Post 2 by user 1' } ]
*/
Пример 3: Проверка статуса файбера
const longRunningTask = Effect.repeat(
Effect.gen(function* () {
yield* Effect.log("Working...")
yield* Effect.sleep("200 millis")
}),
Schedule.recurs(5)
)
const monitorFiber = (fiber: Fiber.RuntimeFiber<void, never>) =>
Effect.gen(function* () {
let isDone = false
while (!isDone) {
const status = yield* Fiber.status(fiber)
switch (status._tag) {
case "Done":
console.log("Fiber completed!")
isDone = true
break
case "Running":
console.log("Fiber is running")
break
case "Suspended":
console.log("Fiber is suspended")
break
}
if (!isDone) {
yield* Effect.sleep("100 millis")
}
}
})
const program = Effect.gen(function* () {
const fiber = yield* Effect.fork(longRunningTask)
yield* monitorFiber(fiber)
})
Effect.runFork(program)
Пример 4: Fallback с Fiber.orElse
const unreliableService = Effect.gen(function* () {
const random = Math.random()
if (random < 0.7) {
yield* Effect.fail("Service unavailable" as const)
}
return "Primary result"
})
const backupService = Effect.gen(function* () {
yield* Effect.sleep("100 millis")
return "Backup result"
})
const program = Effect.gen(function* () {
// Fork оба сервиса
const primaryFiber = yield* Effect.fork(unreliableService)
const backupFiber = yield* Effect.fork(backupService)
// Используем backup как fallback
const combinedFiber = Fiber.orElse(primaryFiber, backupFiber)
const result = yield* Fiber.join(combinedFiber)
console.log("Result:", result)
})
// Запустим несколько раз для демонстрации
Effect.runFork(program)
Упражнения
Упражнение 1: Простой fork и join
Создайте эффект, который форкает вычисление суммы чисел от 1 до N и затем присоединяет результат.
import { Effect, Fiber } from "effect"
const sumTo = (n: number): Effect.Effect<number> =>
// Ваша реализация
???
const program = Effect.gen(function* () {
const fiber = yield* Effect.fork(sumTo(100))
// Сделайте что-то ещё пока сумма вычисляется
yield* Effect.log("Computing sum...")
const result = yield* Fiber.join(fiber)
yield* Effect.log(`Sum = ${result}`)
})
// Ожидаемый результат: Sum = 5050import { Effect, Fiber } from "effect"
const sumTo = (n: number): Effect.Effect<number> =>
Effect.succeed(
Array.from({ length: n }, (_, i) => i + 1)
.reduce((acc, x) => acc + x, 0)
)
const program = Effect.gen(function* () {
const fiber = yield* Effect.fork(sumTo(100))
yield* Effect.log("Computing sum...")
const result = yield* Fiber.join(fiber)
yield* Effect.log(`Sum = ${result}`)
})
Effect.runFork(program)Упражнение 2: Await и обработка Exit
Используйте Fiber.await для получения Exit и обработайте оба случая (успех и ошибка).
import { Effect, Fiber, Exit } from "effect"
const mayFail = (shouldFail: boolean) =>
shouldFail
? Effect.fail("Intentional failure" as const)
: Effect.succeed("Success!")
const program = Effect.gen(function* () {
const fiber = yield* Effect.fork(mayFail(Math.random() > 0.5))
const exit = yield* Fiber.await(fiber)
// Обработайте exit
// Ваша реализация
???
})import { Effect, Fiber, Exit, Match } from "effect"
const mayFail = (shouldFail: boolean) =>
shouldFail
? Effect.fail("Intentional failure" as const)
: Effect.succeed("Success!")
const program = Effect.gen(function* () {
const fiber = yield* Effect.fork(mayFail(Math.random() > 0.5))
const exit = yield* Fiber.await(fiber)
// Вариант 1: через Exit.match
Exit.match(exit, {
onSuccess: (value) => console.log("Success:", value),
onFailure: (cause) => console.log("Failure:", cause)
})
// Вариант 2: через проверку тега
if (Exit.isSuccess(exit)) {
console.log("Got success:", exit.value)
} else {
console.log("Got failure:", exit.cause)
}
})
Effect.runFork(program)Упражнение 3: Параллельная загрузка с таймаутом
Создайте систему, которая параллельно загружает данные из нескольких источников с таймаутом для каждого.
import { Effect, Fiber, Option } from "effect"
interface DataSource {
readonly name: string
readonly delay: number
readonly data: string
}
const dataSources: ReadonlyArray<DataSource> = [
{ name: "Database", delay: 100, data: "DB Data" },
{ name: "Cache", delay: 50, data: "Cache Data" },
{ name: "External API", delay: 300, data: "API Data" }
] as const
const fetchFromSource = (source: DataSource) =>
// Ваша реализация: имитация загрузки с задержкой
???
const fetchAllWithTimeout = (
sources: ReadonlyArray<DataSource>,
timeout: number
) =>
// Ваша реализация: fork всех источников и poll с таймаутом
???import { Effect, Fiber, Option, Chunk } from "effect"
interface DataSource {
readonly name: string
readonly delay: number
readonly data: string
}
const dataSources: ReadonlyArray<DataSource> = [
{ name: "Database", delay: 100, data: "DB Data" },
{ name: "Cache", delay: 50, data: "Cache Data" },
{ name: "External API", delay: 300, data: "API Data" }
] as const
const fetchFromSource = (source: DataSource) =>
Effect.gen(function* () {
yield* Effect.sleep(`${source.delay} millis`)
return { name: source.name, data: source.data }
})
const fetchAllWithTimeout = (
sources: ReadonlyArray<DataSource>,
timeout: number
) =>
Effect.gen(function* () {
// Fork все источники
const fibers = yield* Effect.forEach(
sources,
(source) => Effect.fork(fetchFromSource(source))
)
// Ждём timeout
yield* Effect.sleep(`${timeout} millis`)
// Poll каждый файбер
const results = yield* Effect.forEach(
fibers,
(fiber) => Effect.gen(function* () {
const maybeExit = yield* Fiber.poll(fiber)
return Option.flatMap(maybeExit, (exit) =>
exit._tag === "Success"
? Option.some(exit.value)
: Option.none()
)
})
)
// Прерываем оставшиеся файберы
yield* Effect.forEach(fibers, Fiber.interrupt, { discard: true })
return results.filter(Option.isSome).map(o => o.value)
})
const program = Effect.gen(function* () {
const results = yield* fetchAllWithTimeout(dataSources, 150)
console.log("Got results within timeout:", results)
})
Effect.runFork(program)
// Output: Got results within timeout: [{ name: 'Cache', data: 'Cache Data' }, { name: 'Database', data: 'DB Data' }]Упражнение 4: Структурная конкурентность с вложенными файберами
Создайте систему обработки задач с иерархией parent → child → grandchild, где завершение parent автоматически завершает всех потомков.
import { Effect, Console, Schedule, Fiber } from "effect"
// Создайте три уровня файберов:
// 1. Parent - живёт 3 секунды
// 2. Child - логирует каждую секунду
// 3. Grandchild - логирует каждые 500ms
const createHierarchy = Effect.gen(function* () {
// Ваша реализация
???
})import { Effect, Console, Schedule, Fiber } from "effect"
// Grandchild: логирует каждые 500ms
const grandchild = Effect.repeat(
Console.log(" grandchild: tick"),
Schedule.fixed("500 millis")
)
// Child: логирует каждую секунду и форкает grandchild
const child = Effect.gen(function* () {
yield* Effect.fork(grandchild)
yield* Effect.repeat(
Console.log(" child: tick"),
Schedule.fixed("1 second")
)
})
// Parent: форкает child и живёт 3 секунды
const parent = Effect.gen(function* () {
console.log("parent: started")
yield* Effect.fork(child)
yield* Effect.sleep("3 seconds")
console.log("parent: finished")
})
const program = Effect.gen(function* () {
yield* parent
yield* Effect.sleep("1 second") // Показать, что child и grandchild завершились
console.log("--- All hierarchy terminated ---")
})
Effect.runFork(program)
/*
Output:
parent: started
grandchild: tick
grandchild: tick
child: tick
grandchild: tick
grandchild: tick
child: tick
grandchild: tick
grandchild: tick
child: tick
parent: finished
--- All hierarchy terminated ---
*/Упражнение 5: Реализация простого Work Stealing Pool
Создайте пул воркеров, который распределяет задачи между файберами и позволяет отслеживать прогресс.
import { Effect, Fiber, Queue, Ref, Chunk } from "effect"
interface Task {
readonly id: number
readonly duration: number
}
interface WorkerPool {
readonly submit: (task: Task) => Effect.Effect<void>
readonly getProgress: Effect.Effect<{
completed: number
pending: number
inProgress: number
}>
readonly shutdown: Effect.Effect<void>
}
const createWorkerPool = (
workerCount: number
): Effect.Effect<WorkerPool, never, Scope> =>
// Ваша реализация
???import { Effect, Fiber, Queue, Ref, Scope, Chunk } from "effect"
interface Task {
readonly id: number
readonly duration: number
}
interface WorkerStats {
readonly completed: number
readonly pending: number
readonly inProgress: number
}
interface WorkerPool {
readonly submit: (task: Task) => Effect.Effect<void>
readonly getProgress: Effect.Effect<WorkerStats>
readonly shutdown: Effect.Effect<void>
}
const createWorkerPool = (
workerCount: number
): Effect.Effect<WorkerPool, never, Scope> =>
Effect.gen(function* () {
// Очередь задач
const taskQueue = yield* Queue.unbounded<Task>()
// Статистика
const completed = yield* Ref.make(0)
const inProgress = yield* Ref.make(0)
// Создаём воркер
const worker = (workerId: number) =>
Effect.gen(function* () {
while (true) {
const task = yield* Queue.take(taskQueue)
yield* Ref.update(inProgress, (n) => n + 1)
yield* Effect.log(`Worker ${workerId}: processing task ${task.id}`)
yield* Effect.sleep(`${task.duration} millis`)
yield* Effect.log(`Worker ${workerId}: completed task ${task.id}`)
yield* Ref.update(inProgress, (n) => n - 1)
yield* Ref.update(completed, (n) => n + 1)
}
}).pipe(Effect.interruptible)
// Массив файберов воркеров
const workerFibers = yield* Effect.forEach(
Array.from({ length: workerCount }, (_, i) => i),
(id) => Effect.forkScoped(worker(id))
)
const pool: WorkerPool = {
submit: (task) => Queue.offer(taskQueue, task),
getProgress: Effect.gen(function* () {
const completedCount = yield* Ref.get(completed)
const inProgressCount = yield* Ref.get(inProgress)
const pendingCount = yield* Queue.size(taskQueue)
return {
completed: completedCount,
pending: pendingCount,
inProgress: inProgressCount
}
}),
shutdown: Effect.gen(function* () {
yield* Effect.forEach(workerFibers, Fiber.interrupt, { discard: true })
yield* Queue.shutdown(taskQueue)
})
}
return pool
})
// Использование
const program = Effect.scoped(
Effect.gen(function* () {
const pool = yield* createWorkerPool(3)
// Отправляем задачи
for (let i = 0; i < 10; i++) {
yield* pool.submit({ id: i, duration: 100 + Math.random() * 200 })
}
// Мониторим прогресс
yield* Effect.repeat(
Effect.gen(function* () {
const stats = yield* pool.getProgress
yield* Effect.log(`Progress: ${JSON.stringify(stats)}`)
}),
Schedule.fixed("200 millis").pipe(Schedule.intersect(Schedule.recurs(10)))
)
// Ждём завершения
yield* Effect.sleep("3 seconds")
const finalStats = yield* pool.getProgress
yield* Effect.log(`Final: ${JSON.stringify(finalStats)}`)
})
)
Effect.runFork(program)Упражнение 6: Race с cleanup и таймаутом
Реализуйте функцию race, которая запускает несколько эффектов параллельно, возвращает первый успешный результат и корректно очищает остальные файберы.
import { Effect, Fiber, Deferred, Exit } from "effect"
const raceWithCleanup = <A, E>(
effects: ReadonlyArray<Effect.Effect<A, E>>,
timeout: Duration.DurationInput
): Effect.Effect<A, E | TimeoutError> =>
// Ваша реализация должна:
// 1. Fork все эффекты
// 2. Дождаться первого успешного
// 3. Прервать остальные
// 4. Обработать таймаут
???import { Effect, Fiber, Deferred, Exit, Duration, Cause } from "effect"
class TimeoutError {
readonly _tag = "TimeoutError"
}
const raceWithCleanup = <A, E>(
effects: ReadonlyArray<Effect.Effect<A, E>>,
timeout: Duration.DurationInput
): Effect.Effect<A, E | TimeoutError> =>
Effect.gen(function* () {
if (effects.length === 0) {
return yield* Effect.die("No effects to race")
}
// Deferred для результата
const result = yield* Deferred.make<A, E | TimeoutError>()
// Fork все эффекты
const fibers = yield* Effect.forEach(
effects,
(effect, index) =>
Effect.fork(
Effect.gen(function* () {
const exit = yield* Effect.exit(effect)
// Пытаемся записать результат (первый выигрывает)
if (Exit.isSuccess(exit)) {
yield* Deferred.succeed(result, exit.value)
} else if (Exit.isFailure(exit) && !Cause.isInterruptedOnly(exit.cause)) {
// Передаём только реальные ошибки, не прерывания
const failure = Cause.failures(exit.cause)
if (failure.length > 0) {
yield* Deferred.fail(result, failure[0]!)
}
}
})
)
)
// Fork таймаут
const timeoutFiber = yield* Effect.fork(
Effect.gen(function* () {
yield* Effect.sleep(timeout)
yield* Deferred.fail(result, new TimeoutError())
})
)
// Ждём результат
const finalResult = yield* Effect.gen(function* () {
const value = yield* Deferred.await(result)
return value
}).pipe(
Effect.onExit(() =>
// Cleanup: прерываем все файберы
Effect.gen(function* () {
yield* Effect.forEach([...fibers, timeoutFiber], Fiber.interrupt, {
discard: true
})
})
)
)
return finalResult
})
// Тест
const program = Effect.gen(function* () {
const effects = [
Effect.sleep("100 millis").pipe(Effect.as("fast")),
Effect.sleep("500 millis").pipe(Effect.as("slow")),
Effect.sleep("200 millis").pipe(
Effect.flatMap(() => Effect.fail("error" as const))
)
]
const winner = yield* raceWithCleanup(effects, "300 millis")
yield* Effect.log(`Winner: ${winner}`)
})
Effect.runFork(program)
// Output: Winner: fastЗаключение
Fiber в Effect — это фундаментальная концепция, обеспечивающая:
- Легковесную конкурентность — миллионы файберов вместо тысяч потоков
- Структурную конкурентность — предсказуемый жизненный цикл
- Безопасную отмену — автоматический cleanup ресурсов
- Композицию — комбинаторы для сложных сценариев
В следующей статье мы подробно рассмотрим операции forking — различные способы создания файберов с разными стратегиями жизненного цикла.