Параллелизм vs Многопоточность
Fiber, Worker и когда что использовать.
Введение в модели конкурентности
Прежде чем разбирать Fiber и Worker, важно понять фундаментальные различия между конкурентностью и параллелизмом.
Конкурентность vs Параллелизм
┌─────────────────────────────────────────────────────────────────┐
│ КОНКУРЕНТНОСТЬ vs ПАРАЛЛЕЛИЗМ │
├─────────────────────────────────────────────────────────────────┤
│ │
│ КОНКУРЕНТНОСТЬ (Concurrency) │
│ "Dealing with multiple things at once" │
│ │
│ ┌────────┐ ┌────────┐ ┌────────┐ │
│ │ Task A │────►│ Task B │────►│ Task A │────► ... │
│ └────────┘ └────────┘ └────────┘ │
│ │ │
│ ▼ │
│ Один процессор │
│ переключается между задачами │
│ │
│ ───────────────────────────────────────────────────────────── │
│ │
│ ПАРАЛЛЕЛИЗМ (Parallelism) │
│ "Doing multiple things at once" │
│ │
│ Core 1: ┌────────────────────────────────────┐ │
│ │ Task A │ │
│ └────────────────────────────────────┘ │
│ │
│ Core 2: ┌────────────────────────────────────┐ │
│ │ Task B │ │
│ └────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ Несколько процессоров │
│ работают одновременно │
│ │
└─────────────────────────────────────────────────────────────────┘
Терминология
┌─────────────────────────────────────────────────────────────────┐
│ ТЕРМИНОЛОГИЯ │
├─────────────────────────────────────────────────────────────────┤
│ │
│ Процесс (Process) │
│ ├── Изолированное адресное пространство │
│ ├── Собственная память │
│ └── Тяжёлый для создания │
│ │
│ Поток (Thread) │
│ ├── Разделяет память с другими потоками процесса │
│ ├── Собственный стек вызовов │
│ └── Управляется операционной системой │
│ │
│ Fiber (Green Thread / Lightweight Thread) │
│ ├── Виртуальный поток, управляемый runtime │
│ ├── Очень лёгкий (~200 байт) │
│ └── Кооперативная многозадачность │
│ │
│ Worker │
│ ├── Отдельный поток OS (в Node.js/Bun) │
│ ├── Изолированный контекст V8/JavaScriptCore │
│ └── Коммуникация через сообщения │
│ │
└─────────────────────────────────────────────────────────────────┘
JavaScript Event Loop и его ограничения
JavaScript исторически однопоточен. Понимание Event Loop критично для выбора между Fiber и Worker.
Архитектура Event Loop
┌─────────────────────────────────────────────────────────────────┐
│ JavaScript Runtime │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Call Stack │ │
│ │ ┌─────────┐ │ │
│ │ │ func3() │ ◄── Текущая функция │ │
│ │ ├─────────┤ │ │
│ │ │ func2() │ │ │
│ │ ├─────────┤ │ │
│ │ │ func1() │ │ │
│ │ └─────────┘ │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Event Loop │ │
│ │ │ │
│ │ while (true) { │ │
│ │ if (callStack.isEmpty()) { │ │
│ │ if (microtaskQueue.hasItems()) { │ │
│ │ callStack.push(microtaskQueue.dequeue()) │ │
│ │ } else if (macrotaskQueue.hasItems()) { │ │
│ │ callStack.push(macrotaskQueue.dequeue()) │ │
│ │ } │ │
│ │ } │ │
│ │ } │ │
│ │ │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │ │
│ ┌───────────────┴───────────────┐ │
│ │ │ │
│ ▼ ▼ │
│ ┌───────────────┐ ┌───────────────┐ │
│ │ Microtasks │ │ Macrotasks │ │
│ │ (Promises) │ │ (setTimeout) │ │
│ │ (queueMicro) │ │ (setInterval) │ │
│ └───────────────┘ │ (I/O events) │ │
│ └───────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
Проблема блокировки Event Loop
// ❌ ПЛОХО: Блокирует Event Loop
function computeFibonacci(n: number): number {
if (n < 2) return n
return computeFibonacci(n - 1) + computeFibonacci(n - 2)
}
// Это заблокирует ВСЕ асинхронные операции на ~секунды
const result = computeFibonacci(45)
console.log(result)
// HTTP запросы, таймеры, I/O — всё ждёт!
┌─────────────────────────────────────────────────────────────────┐
│ БЛОКИРОВКА EVENT LOOP │
├─────────────────────────────────────────────────────────────────┤
│ │
│ Время ────────────────────────────────────────────────────► │
│ │
│ Call Stack: │
│ ┌────────────────────────────────────────────────────────┐ │
│ │ fib(45) — БЛОКИРУЕТ │ │
│ └────────────────────────────────────────────────────────┘ │
│ │
│ Очередь задач (заблокирована): │
│ ┌─────┐ ┌─────┐ ┌─────┐ ┌─────┐ │
│ │HTTP │ │Timer│ │ I/O │ │Event│ ◄── Ждут, пока fib завершится │
│ └─────┘ └─────┘ └─────┘ └─────┘ │
│ │
│ Результат: │
│ • UI заморожен (в браузере) │
│ • Сервер не отвечает на запросы │
│ • Таймеры задерживаются │
│ │
└─────────────────────────────────────────────────────────────────┘
Два решения проблемы
┌─────────────────────────────────────────────────────────────────┐
│ ДВА РЕШЕНИЯ │
├─────────────────────────────────────────────────────────────────┤
│ │
│ РЕШЕНИЕ 1: Fiber (Кооперативная многозадачность) │
│ ────────────────────────────────────────────────────────── │
│ • Разбиваем долгие вычисления на мелкие части │
│ • Добровольно уступаем управление (yield) │
│ • Event Loop может обрабатывать другие задачи │
│ • НО: всё ещё один поток, CPU-bound задачи медленные │
│ │
│ РЕШЕНИЕ 2: Worker (Истинная многопоточность) │
│ ────────────────────────────────────────────────────────── │
│ • Запускаем код в отдельном потоке OS │
│ • Main thread не блокируется │
│ • Используем все ядра CPU │
│ • НО: overhead на создание и коммуникацию │
│ │
└─────────────────────────────────────────────────────────────────┘
Fiber — Кооперативная многозадачность
Fiber — это виртуальные потоки, управляемые Effect runtime. Они работают внутри одного потока OS, но позволяют эффективно чередовать выполнение множества задач.
Как работает Effect Fiber
┌─────────────────────────────────────────────────────────────────┐
│ Effect Runtime │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Fiber Scheduler │ │
│ │ │ │
│ │ Ready Queue: │ │
│ │ ┌────────┐ ┌────────┐ ┌────────┐ │ │
│ │ │Fiber #1│ │Fiber #2│ │Fiber #3│ │ │
│ │ └────────┘ └────────┘ └────────┘ │ │
│ │ │ │
│ │ Suspended (waiting for I/O, sleep, etc.): │ │
│ │ ┌────────┐ ┌────────┐ │ │
│ │ │Fiber #4│ │Fiber #5│ │ │
│ │ └────────┘ └────────┘ │ │
│ │ │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌────────────────────┐ │
│ │ Execute Step │ │
│ │ (один Effect) │ │
│ └────────────────────┘ │
│ │ │
│ ┌───────────────┼───────────────┐ │
│ ▼ ▼ ▼ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ Complete │ │ Yield │ │ Suspend │ │
│ │ (done) │ │(continue)│ │ (async) │ │
│ └──────────┘ └──────────┘ └──────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
Точки кооперации
Fiber уступает управление (yield) в определённых точках:
const program = Effect.gen(function* () {
// Точка yield: flatMap
const a = yield* Effect.succeed(1)
// Точка yield: async операция
yield* Effect.sleep("100 millis")
// Точка yield: fork
const fiber = yield* Effect.fork(someEffect)
// Точка yield: any I/O
const data = yield* readFile("data.txt")
return a + 1
})
┌─────────────────────────────────────────────────────────────────┐
│ ТОЧКИ КООПЕРАЦИИ │
├─────────────────────────────────────────────────────────────────┤
│ │
│ Автоматические yield points: │
│ ├── Effect.flatMap / yield* │
│ ├── Effect.fork │
│ ├── Effect.sleep │
│ ├── Effect.async (I/O операции) │
│ ├── Effect.yieldNow (явный yield) │
│ └── Interruption check points │
│ │
│ НЕ являются yield points: │
│ ├── Чистые вычисления внутри Effect.sync │
│ ├── Обычные функции │
│ └── Циклы без yield* │
│ │
└─────────────────────────────────────────────────────────────────┘
Пример: Много файберов в одном потоке
// Запускаем 10,000 файберов!
const manyFibers = Effect.gen(function* () {
const fibers = yield* Effect.forEach(
Array.from({ length: 10_000 }, (_, i) => i),
(i) => Effect.fork(
Effect.gen(function* () {
yield* Effect.sleep(`${Math.random() * 100} millis`)
return i * 2
})
),
{ concurrency: "unbounded" }
)
// Все 10,000 работают "параллельно" в одном потоке
const results = yield* Effect.forEach(
fibers,
Fiber.join
)
return results.length
})
Effect.runPromise(manyFibers).then(console.log)
// Output: 10000
Преимущества Fiber
┌─────────────────────────────────────────────────────────────────┐
│ ПРЕИМУЩЕСТВА FIBER │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 1. Лёгкость создания │
│ • ~200 байт на файбер vs ~1MB на поток OS │
│ • Можно создать миллионы файберов │
│ │
│ 2. Быстрое переключение │
│ • Нет context switch на уровне OS │
│ • Переключение — просто смена указателя │
│ │
│ 3. Shared Memory │
│ • Все файберы видят общую память │
│ • Нет overhead на сериализацию │
│ │
│ 4. Структурная конкурентность │
│ • Автоматический cleanup дочерних файберов │
│ • Гарантированное освобождение ресурсов │
│ │
│ 5. Детерминированное поведение │
│ • Предсказуемый порядок выполнения │
│ • Проще отлаживать │
│ │
└─────────────────────────────────────────────────────────────────┘
Ограничения Fiber
┌─────────────────────────────────────────────────────────────────┐
│ ОГРАНИЧЕНИЯ FIBER │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 1. Один поток CPU │
│ • Не использует многоядерность │
│ • CPU-bound задачи последовательны │
│ │
│ 2. Блокирующие операции блокируют ВСЁ │
│ • Синхронный код блокирует все файберы │
│ • Long-running sync computations = проблема │
│ │
│ 3. Кооперативность требует дисциплины │
│ • Если не yield — другие файберы ждут │
│ • Нужно избегать долгих синхронных вычислений │
│ │
└─────────────────────────────────────────────────────────────────┘
Worker — Истинная многопоточность
Worker threads позволяют выполнять JavaScript код в отдельных потоках OS, обеспечивая истинный параллелизм.
Архитектура Worker в Node.js / Bun
┌─────────────────────────────────────────────────────────────────┐
│ Worker Architecture │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────┐ │
│ │ Main Thread │ │
│ │ ┌─────────────────────────────────┐ │ │
│ │ │ V8 / JSC Engine │ │ │
│ │ │ ┌────────────────────────────┐ │ │ │
│ │ │ │ Your Application │ │ │ │
│ │ │ │ Effect Runtime │ │ │ │
│ │ │ └────────────────────────────┘ │ │ │
│ │ └─────────────────────────────────┘ │ │
│ └─────────────────────────────────────────┘ │
│ │ │ │ │
│ │ postMessage │ postMessage │ │
│ ▼ ▼ ▼ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Worker #1 │ │ Worker #2 │ │ Worker #n │ │
│ │ ┌─────────┐ │ │ ┌─────────┐ │ │ ┌─────────┐ │ │
│ │ │ V8 │ │ │ │ V8 │ │ │ │ V8 │ │ │
│ │ │ Engine │ │ │ │ Engine │ │ │ │ Engine │ │ │
│ │ └─────────┘ │ │ └─────────┘ │ │ └─────────┘ │ │
│ │ (CPU Core) │ │ (CPU Core) │ │ (CPU Core) │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
│ │
│ Каждый Worker: │
│ • Отдельный V8/JSC instance │
│ • Собственный Event Loop │
│ • Изолированная память │
│ • Собственный Call Stack │
│ │
└─────────────────────────────────────────────────────────────────┘
Базовый пример Worker в Bun
// main.ts
const worker = new Worker("./worker.ts")
worker.postMessage({ type: "compute", n: 45 })
worker.onmessage = (event) => {
console.log("Result:", event.data)
}
// worker.ts
declare const self: Worker
self.onmessage = (event) => {
if (event.data.type === "compute") {
// Тяжёлое вычисление в отдельном потоке
const result = fibonacci(event.data.n)
self.postMessage(result)
}
}
function fibonacci(n: number): number {
if (n < 2) return n
return fibonacci(n - 1) + fibonacci(n - 2)
}
Коммуникация через сообщения
┌─────────────────────────────────────────────────────────────────┐
│ MESSAGE PASSING │
├─────────────────────────────────────────────────────────────────┤
│ │
│ Main Thread Worker Thread │
│ ┌─────────────────┐ ┌─────────────────┐ │
│ │ │ postMessage() │ │ │
│ │ const worker = │ ────────────────►│ self.onmessage │ │
│ │ new Worker() │ │ = (event) => { │ │
│ │ │ │ // process │ │
│ │ worker.onmsg = │ ◄────────────────│ postMessage()│ │
│ │ (event) => { │ postMessage() │ } │ │
│ │ // result │ │ │ │
│ │ } │ │ │ │
│ └─────────────────┘ └─────────────────┘ │
│ │
│ Данные копируются через Structured Clone Algorithm: │
│ • Примитивы, объекты, массивы ✓ │
│ • Functions ✗ │
│ • Symbols ✗ │
│ • DOM nodes ✗ │
│ │
│ Или передаются через Transferable Objects: │
│ • ArrayBuffer │
│ • MessagePort │
│ • ImageBitmap (browser) │
│ │
└─────────────────────────────────────────────────────────────────┘
Преимущества Worker
┌─────────────────────────────────────────────────────────────────┐
│ ПРЕИМУЩЕСТВА WORKER │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 1. Истинный параллелизм │
│ • Использует все ядра CPU │
│ • CPU-bound задачи выполняются параллельно │
│ │
│ 2. Изоляция │
│ • Ошибка в Worker не crash'ит main thread │
│ • Безопасность памяти │
│ │
│ 3. Не блокирует Main Thread │
│ • UI остаётся responsive │
│ • Сервер продолжает отвечать │
│ │
│ 4. Линейное масштабирование │
│ • N workers ≈ N-кратное ускорение (для parallelizable tasks)│
│ │
└─────────────────────────────────────────────────────────────────┘
Ограничения Worker
┌─────────────────────────────────────────────────────────────────┐
│ ОГРАНИЧЕНИЯ WORKER │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 1. Overhead создания │
│ • ~10-50ms на создание Worker │
│ • ~1-10MB память на каждый Worker │
│ • Ограниченное количество (обычно = CPU cores) │
│ │
│ 2. Сериализация данных │
│ • Structured Clone Algorithm имеет overhead │
│ • Нельзя передавать функции │
│ • Большие данные = большой latency │
│ │
│ 3. Нет shared memory (по умолчанию) │
│ • Каждый Worker — изолированный контекст │
│ • SharedArrayBuffer для shared memory (сложно) │
│ │
│ 4. Сложность кода │
│ • Асинхронная коммуникация │
│ • Отдельные файлы для worker кода │
│ • Сложнее отлаживать │
│ │
└─────────────────────────────────────────────────────────────────┘
Сравнение Fiber и Worker
Таблица сравнения
| Характеристика | Fiber | Worker |
|---|---|---|
| Модель | Кооперативная многозадачность | Истинная многопоточность |
| Потоки OS | 1 | N (по числу Workers) |
| Overhead создания | ~200 байт, микросекунды | ~10-50ms, мегабайты |
| Максимум экземпляров | Миллионы | Десятки |
| CPU-bound задачи | Последовательно | Параллельно |
| I/O-bound задачи | Эффективно | Избыточно |
| Shared memory | Да | Нет (без SharedArrayBuffer) |
| Коммуникация | Мгновенная | Через сообщения |
| Interruption | Мгновенная, кооперативная | Через сообщения |
| Debugging | Простой | Сложный |
| Структурная конкурентность | Встроенная | Нет |
Визуальное сравнение
┌─────────────────────────────────────────────────────────────────┐
│ FIBER vs WORKER: Визуальное сравнение │
├─────────────────────────────────────────────────────────────────┤
│ │
│ FIBER (1000 конкурентных задач) │
│ ───────────────────────────────────────────────────────────── │
│ │
│ Thread 1: ░░▓▓░░▓▓░░▓▓░░▓▓░░▓▓░░▓▓░░▓▓░░▓▓░░▓▓░░▓▓ │
│ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │
│ └──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┘ │
│ Быстрое переключение между задачами │
│ │
│ ───────────────────────────────────────────────────────────── │
│ │
│ WORKER (4 параллельных задачи на 4-core CPU) │
│ ───────────────────────────────────────────────────────────── │
│ │
│ Thread 1: ████████████████████████████████████████ │
│ Thread 2: ████████████████████████████████████████ │
│ Thread 3: ████████████████████████████████████████ │
│ Thread 4: ████████████████████████████████████████ │
│ │ │ │
│ └──────────────────────────────────────┘ │
│ Истинное параллельное выполнение │
│ │
│ Время ─────────────────────────────────────────► │
│ │
└─────────────────────────────────────────────────────────────────┘
Когда использовать Fiber
Идеальные случаи для Fiber
┌─────────────────────────────────────────────────────────────────┐
│ КОГДА FIBER — ЛУЧШИЙ ВЫБОР │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 1. I/O-bound операции │
│ ├── HTTP запросы │
│ ├── База данных │
│ ├── Файловая система │
│ └── WebSocket соединения │
│ │
│ 2. Координация и оркестрация │
│ ├── Параллельные запросы к API │
│ ├── Retry логика │
│ ├── Timeout и cancellation │
│ └── Rate limiting │
│ │
│ 3. Много лёгких задач │
│ ├── WebSocket connections (тысячи) │
│ ├── Event handlers │
│ └── Background polling │
│ │
│ 4. Когда важна shared memory │
│ ├── Общие caches │
│ ├── Connection pools │
│ └── Shared state │
│ │
│ 5. Структурная конкурентность критична │
│ ├── Graceful shutdown │
│ ├── Resource cleanup │
│ └── Parent-child relationships │
│ │
└─────────────────────────────────────────────────────────────────┘
Примеры кода для Fiber
// ✅ ИДЕАЛЬНО для Fiber: Параллельные HTTP запросы
const fetchMultipleAPIs = Effect.gen(function* () {
const [users, products, orders] = yield* Effect.all([
fetchUsers,
fetchProducts,
fetchOrders
], { concurrency: "unbounded" })
return { users, products, orders }
})
// ✅ ИДЕАЛЬНО для Fiber: WebSocket handler
const handleConnections = Effect.gen(function* () {
const connections = yield* FiberSet.make<void>()
// Каждое соединение — отдельный файбер
for await (const conn of server.connections) {
yield* FiberSet.run(connections, handleConnection(conn))
}
})
// ✅ ИДЕАЛЬНО для Fiber: Retry с backoff
const resilientFetch = fetchData.pipe(
Effect.retry(
Schedule.exponential("100 millis").pipe(
Schedule.intersect(Schedule.recurs(5))
)
)
)
// ✅ ИДЕАЛЬНО для Fiber: Parallel with timeout
const fastestResult = Effect.race(
fetchFromServer1,
fetchFromServer2
).pipe(Effect.timeout("5 seconds"))
Когда использовать Worker
Идеальные случаи для Worker
┌─────────────────────────────────────────────────────────────────┐
│ КОГДА WORKER — ЛУЧШИЙ ВЫБОР │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 1. CPU-bound вычисления │
│ ├── Криптография │
│ ├── Сжатие данных │
│ ├── Парсинг больших файлов │
│ ├── Image/Video processing │
│ └── Machine Learning inference │
│ │
│ 2. Долгие синхронные операции │
│ ├── JSON.parse больших файлов │
│ ├── Сортировка больших массивов │
│ └── Regex на больших строках │
│ │
│ 3. Параллелизируемые алгоритмы │
│ ├── Map-Reduce │
│ ├── Parallel search │
│ └── Monte Carlo simulations │
│ │
│ 4. Изоляция критичного кода │
│ ├── Untrusted code execution │
│ ├── Sandbox environments │
│ └── Error isolation │
│ │
│ 5. Когда нужен весь CPU │
│ ├── Data processing pipelines │
│ ├── Scientific computing │
│ └── Rendering │
│ │
└─────────────────────────────────────────────────────────────────┘
Примеры кода для Worker (Bun)
// main.ts
// ✅ ИДЕАЛЬНО для Worker: CPU-intensive hashing
const hashPassword = (password: string) =>
Effect.async<string, Error>((resume) => {
const worker = new Worker("./hash-worker.ts")
worker.postMessage({ password })
worker.onmessage = (e) => {
resume(Effect.succeed(e.data.hash))
worker.terminate()
}
worker.onerror = (e) => {
resume(Effect.fail(new Error(e.message)))
worker.terminate()
}
})
// hash-worker.ts
declare const self: Worker
self.onmessage = async (e) => {
const hash = await Bun.password.hash(e.data.password)
self.postMessage({ hash })
}
// ✅ ИДЕАЛЬНО для Worker: Image processing
const resizeImage = (imageBuffer: ArrayBuffer, width: number, height: number) =>
Effect.async<ArrayBuffer, Error>((resume) => {
const worker = new Worker("./image-worker.ts")
// Передаём ArrayBuffer через transfer (zero-copy)
worker.postMessage(
{ imageBuffer, width, height },
[imageBuffer] // Transferable
)
worker.onmessage = (e) => {
resume(Effect.succeed(e.data.resizedBuffer))
worker.terminate()
}
})
// ✅ ИДЕАЛЬНО для Worker: Parallel computation
const parallelSort = <T>(array: ReadonlyArray<T>, workers: number) =>
Effect.gen(function* () {
const chunkSize = Math.ceil(array.length / workers)
const chunks = Array.from(
{ length: workers },
(_, i) => array.slice(i * chunkSize, (i + 1) * chunkSize)
)
// Параллельная сортировка в Worker'ах
const sortedChunks = yield* Effect.all(
chunks.map((chunk) => sortInWorker(chunk)),
{ concurrency: workers }
)
// Merge отсортированных частей
return mergeSort(sortedChunks)
})
Effect Worker API
Effect предоставляет высокоуровневый API для работы с Worker через @effect/platform.
Архитектура Effect Worker
┌─────────────────────────────────────────────────────────────────┐
│ Effect Worker Architecture │
├─────────────────────────────────────────────────────────────────┤
│ │
│ Main Thread │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ │ │
│ │ ┌─────────────────────────────────────────────────┐ │ │
│ │ │ Worker.makePool │ │ │
│ │ │ │ │ │
│ │ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │ │
│ │ │ │Worker #1│ │Worker #2│ │Worker #3│ │ │ │
│ │ │ └─────────┘ └─────────┘ └─────────┘ │ │ │
│ │ │ │ │ │
│ │ │ pool.execute(request) ────────────► │ │ │
│ │ │ ◄──────────────────── Stream<Response> │ │ │
│ │ │ │ │ │
│ │ └─────────────────────────────────────────────────┘ │ │
│ │ │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
│ Worker Schema (Type-safe messages): │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Schema.TaggedRequest ───► Request/Response types │ │
│ │ Automatic serialization/deserialization │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
Определение Worker с Schema
// Определяем типизированные запросы
class ComputeFibonacci extends Schema.TaggedRequest<ComputeFibonacci>()(
"ComputeFibonacci",
{
failure: Schema.Never,
success: Schema.Number,
payload: { n: Schema.Number }
}
) {}
class HashPassword extends Schema.TaggedRequest<HashPassword>()(
"HashPassword",
{
failure: WorkerError,
success: Schema.String,
payload: { password: Schema.String, rounds: Schema.Number }
}
) {}
// Union всех запросов
type WorkerRequest = ComputeFibonacci | HashPassword
Создание Worker Pool
// Worker Pool layer
const WorkerPoolLive = Worker.makePoolSerialized<WorkerRequest>({
// Путь к worker файлу
spawn: () => new globalThis.Worker("./worker.ts"),
// Размер пула
size: 4,
// Максимум задач на worker
permits: 10
}).pipe(
Layer.provide(BunWorker.layerManager)
)
// Использование
const program = Effect.gen(function* () {
const pool = yield* Worker.SerializedWorkerPool<WorkerRequest>()
// Выполнение запроса
const result = yield* pool.executeEffect(
new ComputeFibonacci({ n: 40 })
)
console.log("Fibonacci(40) =", result)
})
Effect.runPromise(
program.pipe(Effect.provide(WorkerPoolLive))
)
Реализация Worker
// worker.ts
// Те же схемы, что и в main
class ComputeFibonacci extends Schema.TaggedRequest<ComputeFibonacci>()(
"ComputeFibonacci",
{
failure: Schema.Never,
success: Schema.Number,
payload: { n: Schema.Number }
}
) {}
// Fibonacci implementation
const fibonacci = (n: number): number => {
if (n < 2) return n
return fibonacci(n - 1) + fibonacci(n - 2)
}
// Worker runner
const WorkerLive = WorkerRunner.layerSerialized(
ComputeFibonacci,
{
ComputeFibonacci: (req) => Effect.succeed(fibonacci(req.n))
}
)
// Запуск worker
WorkerRunner.launch.pipe(
Layer.provide(WorkerLive),
Layer.provide(BunWorkerRunner.layer),
Layer.launch,
Effect.runFork
)
Streaming результаты
// Запрос, возвращающий Stream
class ProcessLargeFile extends Schema.TaggedRequest<ProcessLargeFile>()(
"ProcessLargeFile",
{
failure: Schema.String,
success: Schema.Chunk(Schema.String), // Chunks of processed lines
payload: { path: Schema.String }
}
) {}
// В main thread
const processFile = (path: string) =>
Effect.gen(function* () {
const pool = yield* Worker.SerializedWorkerPool<ProcessLargeFile>()
// execute возвращает Stream
const stream = pool.execute(new ProcessLargeFile({ path }))
// Обрабатываем chunks по мере поступления
yield* stream.pipe(
Stream.tap((chunk) => Effect.log(`Processed ${chunk.length} lines`)),
Stream.runDrain
)
})
Паттерны гибридного использования
В реальных приложениях часто комбинируют Fiber и Worker для максимальной эффективности.
Паттерн 1: Offload CPU-intensive в Worker
interface ImageProcessor {
readonly resize: (id: string, buffer: ArrayBuffer) => Effect.Effect<ArrayBuffer>
}
const ImageProcessorLive = Effect.gen(function* () {
// Worker pool для CPU-intensive операций
const workerPool = yield* createWorkerPool(4)
return {
resize: (id, buffer) =>
// Отправляем в Worker, результат — Effect
workerPool.execute(new ResizeImage({ id, buffer }))
} satisfies ImageProcessor
})
// HTTP handler использует Fiber для I/O + Worker для CPU
const handleUpload = Effect.gen(function* () {
const processor = yield* ImageProcessor
const storage = yield* StorageService
// I/O в файберах
const uploads = yield* FiberSet.make<void>()
for (const file of request.files) {
yield* FiberSet.run(uploads,
Effect.gen(function* () {
// Read file (I/O) - Fiber
const buffer = yield* readFile(file)
// Resize (CPU) - Worker
const resized = yield* processor.resize(file.id, buffer)
// Upload (I/O) - Fiber
yield* storage.upload(file.id, resized)
})
)
}
yield* FiberSet.join(uploads)
})
Паттерн 2: Worker Pool с Fiber координацией
interface Task<A> {
readonly id: string
readonly execute: Effect.Effect<A>
readonly cpuIntensive: boolean
}
const createHybridExecutor = <A>() =>
Effect.gen(function* () {
// Worker pool для CPU tasks
const workerPool = yield* createWorkerPool(4)
// Fiber set для I/O tasks
const fiberTasks = yield* FiberSet.make<A>()
// Rate limiting для Workers
const workerSemaphore = yield* Semaphore.make(4)
const execute = (task: Task<A>) =>
task.cpuIntensive
? // CPU task → Worker
workerSemaphore.withPermits(1)(
workerPool.execute(task)
)
: // I/O task → Fiber
FiberSet.run(fiberTasks, task.execute).pipe(
Effect.flatMap(Fiber.join)
)
return { execute }
})
Паттерн 3: Graceful degradation
// Попробовать Worker, fallback на Fiber
const computeWithFallback = <A>(
task: Effect.Effect<A>,
workerTask: Effect.Effect<A>,
timeout: Duration.DurationInput
) =>
workerTask.pipe(
// Таймаут на Worker
Effect.timeout(timeout),
// Fallback на Fiber (медленнее, но работает)
Effect.orElse(() => task),
// Логирование
Effect.tap(() => Effect.log("Computation completed"))
)
// Использование
const result = computeWithFallback(
// Fiber version (медленная, но не блокирует)
computeInFiber(data),
// Worker version (быстрая, но с overhead)
computeInWorker(data),
"5 seconds"
)
Производительность и бенчмарки
Когда Fiber быстрее Worker
┌─────────────────────────────────────────────────────────────────┐
│ FIBER БЫСТРЕЕ WORKER КОГДА: │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 1. Много мелких задач │
│ ───────────────────────────────────────────────────────── │
│ Fiber: 1000 tasks × 0.001ms = 1ms │
│ Worker: 1000 tasks × 1ms (message overhead) = 1000ms │
│ │
│ 2. Небольшие данные для передачи │
│ ───────────────────────────────────────────────────────── │
│ Fiber: Direct memory access = 0ms │
│ Worker: Serialize + Transfer + Deserialize = 1-10ms │
│ │
│ 3. Много shared state │
│ ───────────────────────────────────────────────────────── │
│ Fiber: Direct reads/writes │
│ Worker: Каждый доступ = message roundtrip │
│ │
│ 4. Короткие вычисления (< 10ms) │
│ ───────────────────────────────────────────────────────── │
│ Fiber: 10ms computation │
│ Worker: 10ms + 5ms overhead = 15ms (50% slower!) │
│ │
└─────────────────────────────────────────────────────────────────┘
Когда Worker быстрее Fiber
┌─────────────────────────────────────────────────────────────────┐
│ WORKER БЫСТРЕЕ FIBER КОГДА: │
├─────────────────────────────────────────────────────────────────┤
│ │
│ 1. CPU-bound задачи > 50ms │
│ ───────────────────────────────────────────────────────── │
│ На 4-core CPU: │
│ Fiber: 4 × 100ms sequential = 400ms │
│ Worker: 4 × 100ms parallel = 100ms + overhead ≈ 110ms │
│ │
│ 2. Большие независимые вычисления │
│ ───────────────────────────────────────────────────────── │
│ Processing 1GB data: │
│ Fiber: 1 thread = X seconds │
│ Worker: 4 threads = X/4 seconds (linear scaling) │
│ │
│ 3. Blocking operations неизбежны │
│ ───────────────────────────────────────────────────────── │
│ Fiber: Blocks ALL fibers │
│ Worker: Only blocks that worker │
│ │
└─────────────────────────────────────────────────────────────────┘
Бенчмарк: Fibonacci
// Бенчмарк: вычисление Fibonacci(40)
// На типичном CPU занимает ~1-2 секунды
// FIBER: Последовательно
const fiberBenchmark = Effect.gen(function* () {
const start = Date.now()
yield* Effect.all([
computeFibonacciFiber(40),
computeFibonacciFiber(40),
computeFibonacciFiber(40),
computeFibonacciFiber(40)
])
console.log(`Fiber: ${Date.now() - start}ms`)
// Результат: ~4-8 секунд (последовательно)
})
// WORKER: Параллельно
const workerBenchmark = Effect.gen(function* () {
const start = Date.now()
const pool = yield* WorkerPool
yield* Effect.all([
pool.execute(new Fibonacci({ n: 40 })),
pool.execute(new Fibonacci({ n: 40 })),
pool.execute(new Fibonacci({ n: 40 })),
pool.execute(new Fibonacci({ n: 40 }))
])
console.log(`Worker: ${Date.now() - start}ms`)
// Результат: ~1-2 секунды (параллельно на 4 cores)
})
Best Practices
Decision Tree
┌─────────────────────────────────────────────────────────────────┐
│ DECISION TREE │
├─────────────────────────────────────────────────────────────────┤
│ │
│ Задача требует вычислений? │
│ │ │
│ ├── НЕТ (I/O bound) ────────────────────► FIBER │
│ │ • HTTP requests │
│ │ • Database queries │
│ │ • File reads (small) │
│ │ │
│ └── ДА (CPU bound) │
│ │ │
│ └── Вычисление занимает > 50ms? │
│ │ │
│ ├── НЕТ ────────────────────────► FIBER │
│ │ • Overhead Worker > выигрыш │
│ │ │
│ └── ДА │
│ │ │
│ └── Можно параллелить? │
│ │ │
│ ├── ДА ─────────────────► WORKER │
│ │ • Image processing │
│ │ • Data transformation │
│ │ • Cryptography │
│ │ │
│ └── НЕТ │
│ │ │
│ └── Блокирует Event Loop? │
│ │ │
│ ├── ДА ─────────► WORKER │
│ │ • Нельзя блокировать main │
│ │ │
│ └── НЕТ ────────► FIBER │
│ • Можно разбить на chunks │
│ │
└─────────────────────────────────────────────────────────────────┘
Guidelines
┌─────────────────────────────────────────────────────────────────┐
│ BEST PRACTICES │
├─────────────────────────────────────────────────────────────────┤
│ │
│ FIBER: │
│ ──────────────────────────────────────────────────────────── │
│ ✓ Используй для I/O bound операций │
│ ✓ Используй для координации и оркестрации │
│ ✓ Используй для структурной конкурентности │
│ ✓ Используй для graceful shutdown │
│ ✗ Не выполняй CPU-intensive код > 50ms │
│ ✗ Не используй blocking APIs │
│ │
│ WORKER: │
│ ──────────────────────────────────────────────────────────── │
│ ✓ Используй для CPU-bound вычислений │
│ ✓ Используй Worker Pool для амортизации overhead │
│ ✓ Используй Transferable Objects для больших данных │
│ ✓ Используй Schema для type-safe коммуникации │
│ ✗ Не создавай Worker для каждой мелкой задачи │
│ ✗ Не передавай огромные объекты через postMessage │
│ │
│ ГИБРИД: │
│ ──────────────────────────────────────────────────────────── │
│ ✓ I/O координация в Fiber, CPU в Worker │
│ ✓ Worker Pool + Fiber для task distribution │
│ ✓ Fallback стратегии (Worker → Fiber) │
│ │
└─────────────────────────────────────────────────────────────────┘
Примеры
Пример 1: Image Processing Pipeline
// Schema для Worker
class ResizeImage extends Schema.TaggedRequest<ResizeImage>()(
"ResizeImage",
{
failure: Schema.String,
success: Schema.instanceOf(ArrayBuffer),
payload: {
buffer: Schema.instanceOf(ArrayBuffer),
width: Schema.Number,
height: Schema.Number
}
}
) {}
// Image Processing Service
interface ImageService {
readonly processImages: (
images: ReadonlyArray<{ id: string; buffer: ArrayBuffer }>
) => Effect.Effect<void>
}
const ImageServiceLive = Layer.effect(
ImageService,
Effect.gen(function* () {
// Worker pool для resize (CPU-intensive)
const workerPool = yield* Worker.SerializedWorkerPool<ResizeImage>()
// Storage service для upload (I/O)
const storage = yield* StorageService
return {
processImages: (images) =>
Effect.gen(function* () {
const uploads = yield* FiberSet.make<void>()
for (const image of images) {
yield* FiberSet.run(uploads,
Effect.gen(function* () {
// 1. Resize в Worker (CPU)
const resized = yield* workerPool.executeEffect(
new ResizeImage({
buffer: image.buffer,
width: 800,
height: 600
})
)
// 2. Upload в Fiber (I/O)
yield* storage.upload(image.id, resized)
yield* Effect.log(`Processed: ${image.id}`)
})
)
}
yield* FiberSet.join(uploads)
})
}
})
)
Пример 2: Data Processing с Partitioning
interface DataProcessor {
readonly process: (data: ReadonlyArray<number>) => Effect.Effect<number>
}
// Гибридный процессор: разбивает данные для параллельной обработки
const createDataProcessor = (workerCount: number) =>
Effect.gen(function* () {
const workerPool = yield* createWorkerPool(workerCount)
return {
process: (data) =>
Effect.gen(function* () {
// Разбиваем на chunks для Workers
const chunkSize = Math.ceil(data.length / workerCount)
const chunks = Chunk.chunksOf(Chunk.fromIterable(data), chunkSize)
// Параллельная обработка в Workers
const partialResults = yield* Effect.forEach(
chunks,
(chunk) => workerPool.execute(
new ProcessChunk({ data: Chunk.toArray(chunk) })
),
{ concurrency: workerCount }
)
// Aggregation в Fiber (быстро)
return partialResults.reduce((a, b) => a + b, 0)
})
} satisfies DataProcessor
})
Пример 3: Real-time Analytics
interface AnalyticsService {
readonly trackEvent: (event: Event) => Effect.Effect<void>
readonly getStats: Effect.Effect<Stats>
}
const AnalyticsServiceLive = Effect.gen(function* () {
// Queue для событий (Fiber)
const eventQueue = yield* Queue.unbounded<Event>()
// Worker для агрегации (CPU)
const aggregationWorker = yield* createAggregationWorker()
// Периодическая агрегация
const aggregatorFiber = yield* Effect.fork(
Effect.forever(
Effect.gen(function* () {
yield* Effect.sleep("1 second")
// Забираем все события
const events = yield* Queue.takeAll(eventQueue)
if (Chunk.size(events) > 0) {
// Агрегируем в Worker (CPU-intensive)
yield* aggregationWorker.execute(
new AggregateEvents({ events: Chunk.toArray(events) })
)
}
})
)
)
return {
trackEvent: (event) => Queue.offer(eventQueue, event),
getStats: aggregationWorker.getStats()
}
})
Упражнения
Упражнение 1: Определить подход
Для каждой задачи определите, что лучше использовать — Fiber или Worker:
- HTTP запросы к 100 разным API
- Сжатие 1GB файла
- WebSocket сервер на 10,000 соединений
- Парсинг 100MB JSON
- Rate-limited API polling
- HTTP запросы к 100 API → FIBER
- I/O bound, не CPU intensive
- Нужна координация и timeout
- Сжатие 1GB файла → WORKER
- CPU intensive
- Заблокирует Event Loop
- WebSocket на 10,000 соединений → FIBER
- I/O bound
- Много лёгких задач
- Shared state (connections)
- Парсинг 100MB JSON → WORKER
- CPU intensive
- JSON.parse блокирующий
- Rate-limited API polling → FIBER
- I/O bound
- Нужна координация (rate limiting)
Упражнение 2: Hybrid Processor
Реализуйте процессор, который выбирает между Fiber и Worker в зависимости от размера данных.
import { Effect } from "effect"
interface Processor {
readonly process: (data: ReadonlyArray<number>) => Effect.Effect<number>
}
const createHybridProcessor = (): Effect.Effect<Processor, never, Scope.Scope> =>
???
// Если data.length < 1000 → Fiber
// Если data.length >= 1000 → Workerimport { Effect, Scope } from "effect"
interface Processor {
readonly process: (data: ReadonlyArray<number>) => Effect.Effect<number>
}
const THRESHOLD = 1000
const createHybridProcessor = (): Effect.Effect<Processor, never, Scope.Scope> =>
Effect.gen(function* () {
// Worker pool для больших данных
const workerPool = yield* createWorkerPool(4)
// Fiber implementation
const processInFiber = (data: ReadonlyArray<number>) =>
Effect.gen(function* () {
// Разбиваем на chunks для кооперации
let sum = 0
for (let i = 0; i < data.length; i++) {
sum += data[i]! * 2 // Какая-то обработка
// Yield каждые 100 элементов
if (i % 100 === 0) {
yield* Effect.yieldNow()
}
}
return sum
})
// Worker implementation
const processInWorker = (data: ReadonlyArray<number>) =>
workerPool.executeEffect(new ProcessData({ data }))
return {
process: (data) =>
data.length < THRESHOLD
? processInFiber(data)
: processInWorker(data)
}
})Упражнение 3: Worker Pool с Backpressure
Реализуйте Worker Pool с ограничением очереди задач.
import { Effect, Queue, Semaphore, Scope } from "effect"
interface BoundedWorkerPool<Req, Res> {
readonly execute: (request: Req) => Effect.Effect<Res, QueueFullError>
readonly stats: Effect.Effect<{ pending: number; active: number }>
}
class QueueFullError {
readonly _tag = "QueueFullError"
}
const createBoundedWorkerPool = <Req, Res>(config: {
readonly workers: number
readonly maxQueueSize: number
readonly handler: (req: Req) => Effect.Effect<Res>
}): Effect.Effect<BoundedWorkerPool<Req, Res>, never, Scope.Scope> =>
???import { Effect, Queue, Semaphore, Ref, FiberSet, Scope, Deferred } from "effect"
interface BoundedWorkerPool<Req, Res> {
readonly execute: (request: Req) => Effect.Effect<Res, QueueFullError>
readonly stats: Effect.Effect<{ pending: number; active: number }>
}
class QueueFullError {
readonly _tag = "QueueFullError"
}
interface Task<Req, Res> {
readonly request: Req
readonly deferred: Deferred.Deferred<Res, never>
}
const createBoundedWorkerPool = <Req, Res>(config: {
readonly workers: number
readonly maxQueueSize: number
readonly handler: (req: Req) => Effect.Effect<Res>
}): Effect.Effect<BoundedWorkerPool<Req, Res>, never, Scope.Scope> =>
Effect.gen(function* () {
// Bounded queue
const taskQueue = yield* Queue.bounded<Task<Req, Res>>(config.maxQueueSize)
// Active tasks counter
const activeCount = yield* Ref.make(0)
// Workers
const workers = yield* FiberSet.make<void>()
// Создаём workers
for (let i = 0; i < config.workers; i++) {
yield* FiberSet.run(workers,
Effect.forever(
Effect.gen(function* () {
const task = yield* Queue.take(taskQueue)
yield* Ref.update(activeCount, (n) => n + 1)
const result = yield* config.handler(task.request)
yield* Deferred.succeed(task.deferred, result)
yield* Ref.update(activeCount, (n) => n - 1)
})
).pipe(Effect.interruptible)
)
}
return {
execute: (request) =>
Effect.gen(function* () {
const deferred = yield* Deferred.make<Res, never>()
// Try to offer (fails if queue full)
const offered = yield* Queue.offer(taskQueue, { request, deferred }).pipe(
Effect.timeout("0 millis"),
Effect.option
)
if (Option.isNone(offered)) {
return yield* Effect.fail(new QueueFullError())
}
return yield* Deferred.await(deferred)
}),
stats: Effect.gen(function* () {
const pending = yield* Queue.size(taskQueue)
const active = yield* Ref.get(activeCount)
return { pending, active }
})
}
})
import { Option } from "effect"Заключение
Выбор между Fiber и Worker — это не “или/или”, а “когда что”:
Fiber — ваш default choice:
- Большинство веб-приложений I/O bound
- Легковесность позволяет масштабировать конкурентность
- Структурная конкурентность упрощает управление ресурсами
Worker — для CPU-intensive задач:
- Когда нужен истинный параллелизм
- Когда вычисления занимают > 50ms
- Когда нельзя блокировать Event Loop
Гибридный подход — для production систем:
- Fiber для координации и I/O
- Worker для CPU-intensive offloading
- Effect предоставляет инструменты для обоих
Ключевые метрики для принятия решения:
- Время выполнения задачи
- Размер передаваемых данных
- Количество конкурентных задач
- Требования к latency
Effect-ts делает работу с обоими подходами удобной и type-safe, позволяя фокусироваться на бизнес-логике, а не на низкоуровневых деталях конкурентности.