Effect Курс Параллелизм vs Многопоточность

Параллелизм vs Многопоточность

Fiber, Worker и когда что использовать.

Введение в модели конкурентности

Прежде чем разбирать Fiber и Worker, важно понять фундаментальные различия между конкурентностью и параллелизмом.

Конкурентность vs Параллелизм

┌─────────────────────────────────────────────────────────────────┐
│              КОНКУРЕНТНОСТЬ vs ПАРАЛЛЕЛИЗМ                      │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  КОНКУРЕНТНОСТЬ (Concurrency)                                   │
│  "Dealing with multiple things at once"                         │
│                                                                 │
│  ┌────────┐     ┌────────┐     ┌────────┐                       │
│  │ Task A │────►│ Task B │────►│ Task A │────► ...              │
│  └────────┘     └────────┘     └────────┘                       │
│                     │                                           │
│                     ▼                                           │
│              Один процессор                                     │
│              переключается между задачами                       │
│                                                                 │
│  ─────────────────────────────────────────────────────────────  │
│                                                                 │
│  ПАРАЛЛЕЛИЗМ (Parallelism)                                      │
│  "Doing multiple things at once"                                │
│                                                                 │
│  Core 1: ┌────────────────────────────────────┐                 │
│          │           Task A                   │                 │
│          └────────────────────────────────────┘                 │
│                                                                 │
│  Core 2: ┌────────────────────────────────────┐                 │
│          │           Task B                   │                 │
│          └────────────────────────────────────┘                 │
│                     │                                           │
│                     ▼                                           │
│              Несколько процессоров                              │
│              работают одновременно                              │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

Терминология

┌─────────────────────────────────────────────────────────────────┐
│                       ТЕРМИНОЛОГИЯ                              │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  Процесс (Process)                                              │
│  ├── Изолированное адресное пространство                        │
│  ├── Собственная память                                         │
│  └── Тяжёлый для создания                                       │
│                                                                 │
│  Поток (Thread)                                                 │
│  ├── Разделяет память с другими потоками процесса               │
│  ├── Собственный стек вызовов                                   │
│  └── Управляется операционной системой                          │
│                                                                 │
│  Fiber (Green Thread / Lightweight Thread)                      │
│  ├── Виртуальный поток, управляемый runtime                     │
│  ├── Очень лёгкий (~200 байт)                                   │
│  └── Кооперативная многозадачность                              │
│                                                                 │
│  Worker                                                         │
│  ├── Отдельный поток OS (в Node.js/Bun)                         │
│  ├── Изолированный контекст V8/JavaScriptCore                   │
│  └── Коммуникация через сообщения                               │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

JavaScript Event Loop и его ограничения

JavaScript исторически однопоточен. Понимание Event Loop критично для выбора между Fiber и Worker.

Архитектура Event Loop

┌─────────────────────────────────────────────────────────────────┐
│                    JavaScript Runtime                           │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  ┌─────────────────────────────────────────────────────────┐    │
│  │                    Call Stack                           │    │
│  │  ┌─────────┐                                            │    │
│  │  │ func3() │ ◄── Текущая функция                        │    │
│  │  ├─────────┤                                            │    │
│  │  │ func2() │                                            │    │
│  │  ├─────────┤                                            │    │
│  │  │ func1() │                                            │    │
│  │  └─────────┘                                            │    │
│  └─────────────────────────────────────────────────────────┘    │
│                          │                                      │
│                          ▼                                      │
│  ┌─────────────────────────────────────────────────────────┐    │
│  │                    Event Loop                           │    │
│  │                                                         │    │
│  │   while (true) {                                        │    │
│  │     if (callStack.isEmpty()) {                          │    │
│  │       if (microtaskQueue.hasItems()) {                  │    │
│  │         callStack.push(microtaskQueue.dequeue())        │    │
│  │       } else if (macrotaskQueue.hasItems()) {           │    │
│  │         callStack.push(macrotaskQueue.dequeue())        │    │
│  │       }                                                 │    │
│  │     }                                                   │    │
│  │   }                                                     │    │
│  │                                                         │    │
│  └─────────────────────────────────────────────────────────┘    │
│                          │                                      │
│          ┌───────────────┴───────────────┐                      │
│          │                               │                      │
│          ▼                               ▼                      │
│  ┌───────────────┐               ┌───────────────┐              │
│  │  Microtasks   │               │  Macrotasks   │              │
│  │  (Promises)   │               │ (setTimeout)  │              │
│  │  (queueMicro) │               │ (setInterval) │              │
│  └───────────────┘               │ (I/O events)  │              │
│                                  └───────────────┘              │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

Проблема блокировки Event Loop

// ❌ ПЛОХО: Блокирует Event Loop
function computeFibonacci(n: number): number {
  if (n < 2) return n
  return computeFibonacci(n - 1) + computeFibonacci(n - 2)
}

// Это заблокирует ВСЕ асинхронные операции на ~секунды
const result = computeFibonacci(45)
console.log(result)

// HTTP запросы, таймеры, I/O — всё ждёт!
┌─────────────────────────────────────────────────────────────────┐
│                БЛОКИРОВКА EVENT LOOP                            │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  Время ────────────────────────────────────────────────────►    │
│                                                                 │
│  Call Stack:                                                    │
│  ┌────────────────────────────────────────────────────────┐     │
│  │              fib(45) — БЛОКИРУЕТ                       │     │
│  └────────────────────────────────────────────────────────┘     │
│                                                                 │
│  Очередь задач (заблокирована):                                 │
│  ┌─────┐ ┌─────┐ ┌─────┐ ┌─────┐                                │
│  │HTTP │ │Timer│ │ I/O │ │Event│  ◄── Ждут, пока fib завершится │
│  └─────┘ └─────┘ └─────┘ └─────┘                                │
│                                                                 │
│  Результат:                                                     │
│  • UI заморожен (в браузере)                                    │
│  • Сервер не отвечает на запросы                                │
│  • Таймеры задерживаются                                        │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

Два решения проблемы

┌─────────────────────────────────────────────────────────────────┐
│                    ДВА РЕШЕНИЯ                                  │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  РЕШЕНИЕ 1: Fiber (Кооперативная многозадачность)               │
│  ──────────────────────────────────────────────────────────     │
│  • Разбиваем долгие вычисления на мелкие части                  │
│  • Добровольно уступаем управление (yield)                      │
│  • Event Loop может обрабатывать другие задачи                  │
│  • НО: всё ещё один поток, CPU-bound задачи медленные           │
│                                                                 │
│  РЕШЕНИЕ 2: Worker (Истинная многопоточность)                   │
│  ──────────────────────────────────────────────────────────     │
│  • Запускаем код в отдельном потоке OS                          │
│  • Main thread не блокируется                                   │
│  • Используем все ядра CPU                                      │
│  • НО: overhead на создание и коммуникацию                      │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

Fiber — Кооперативная многозадачность

Fiber — это виртуальные потоки, управляемые Effect runtime. Они работают внутри одного потока OS, но позволяют эффективно чередовать выполнение множества задач.

Как работает Effect Fiber

┌─────────────────────────────────────────────────────────────────┐
│                    Effect Runtime                               │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  ┌─────────────────────────────────────────────────────────┐    │
│  │                  Fiber Scheduler                        │    │
│  │                                                         │    │
│  │   Ready Queue:                                          │    │
│  │   ┌────────┐ ┌────────┐ ┌────────┐                      │    │
│  │   │Fiber #1│ │Fiber #2│ │Fiber #3│                      │    │
│  │   └────────┘ └────────┘ └────────┘                      │    │
│  │                                                         │    │
│  │   Suspended (waiting for I/O, sleep, etc.):             │    │
│  │   ┌────────┐ ┌────────┐                                 │    │
│  │   │Fiber #4│ │Fiber #5│                                 │    │
│  │   └────────┘ └────────┘                                 │    │
│  │                                                         │    │
│  └─────────────────────────────────────────────────────────┘    │
│                          │                                      │
│                          ▼                                      │
│              ┌────────────────────┐                             │
│              │   Execute Step     │                             │
│              │   (один Effect)    │                             │
│              └────────────────────┘                             │
│                          │                                      │
│          ┌───────────────┼───────────────┐                      │
│          ▼               ▼               ▼                      │
│    ┌──────────┐    ┌──────────┐    ┌──────────┐                 │
│    │ Complete │    │  Yield   │    │ Suspend  │                 │
│    │ (done)   │    │(continue)│    │ (async)  │                 │
│    └──────────┘    └──────────┘    └──────────┘                 │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

Точки кооперации

Fiber уступает управление (yield) в определённых точках:


const program = Effect.gen(function* () {
  // Точка yield: flatMap
  const a = yield* Effect.succeed(1)
  
  // Точка yield: async операция
  yield* Effect.sleep("100 millis")
  
  // Точка yield: fork
  const fiber = yield* Effect.fork(someEffect)
  
  // Точка yield: any I/O
  const data = yield* readFile("data.txt")
  
  return a + 1
})
┌─────────────────────────────────────────────────────────────────┐
│                    ТОЧКИ КООПЕРАЦИИ                             │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  Автоматические yield points:                                   │
│  ├── Effect.flatMap / yield*                                   │
│  ├── Effect.fork                                               │
│  ├── Effect.sleep                                              │
│  ├── Effect.async (I/O операции)                               │
│  ├── Effect.yieldNow (явный yield)                             │
│  └── Interruption check points                                 │
│                                                                 │
│  НЕ являются yield points:                                      │
│  ├── Чистые вычисления внутри Effect.sync                      │
│  ├── Обычные функции                                           │
│  └── Циклы без yield*                                          │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

Пример: Много файберов в одном потоке


// Запускаем 10,000 файберов!
const manyFibers = Effect.gen(function* () {
  const fibers = yield* Effect.forEach(
    Array.from({ length: 10_000 }, (_, i) => i),
    (i) => Effect.fork(
      Effect.gen(function* () {
        yield* Effect.sleep(`${Math.random() * 100} millis`)
        return i * 2
      })
    ),
    { concurrency: "unbounded" }
  )
  
  // Все 10,000 работают "параллельно" в одном потоке
  const results = yield* Effect.forEach(
    fibers,
    Fiber.join
  )
  
  return results.length
})

Effect.runPromise(manyFibers).then(console.log)
// Output: 10000

Преимущества Fiber

┌─────────────────────────────────────────────────────────────────┐
│                    ПРЕИМУЩЕСТВА FIBER                           │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  1. Лёгкость создания                                           │
│     • ~200 байт на файбер vs ~1MB на поток OS                   │
│     • Можно создать миллионы файберов                           │
│                                                                 │
│  2. Быстрое переключение                                        │
│     • Нет context switch на уровне OS                           │
│     • Переключение — просто смена указателя                     │
│                                                                 │
│  3. Shared Memory                                               │
│     • Все файберы видят общую память                            │
│     • Нет overhead на сериализацию                              │
│                                                                 │
│  4. Структурная конкурентность                                  │
│     • Автоматический cleanup дочерних файберов                  │
│     • Гарантированное освобождение ресурсов                     │
│                                                                 │
│  5. Детерминированное поведение                                 │
│     • Предсказуемый порядок выполнения                          │
│     • Проще отлаживать                                          │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

Ограничения Fiber

┌─────────────────────────────────────────────────────────────────┐
│                    ОГРАНИЧЕНИЯ FIBER                            │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  1. Один поток CPU                                              │
│     • Не использует многоядерность                              │
│     • CPU-bound задачи последовательны                          │
│                                                                 │
│  2. Блокирующие операции блокируют ВСЁ                          │
│     • Синхронный код блокирует все файберы                      │
│     • Long-running sync computations = проблема                 │
│                                                                 │
│  3. Кооперативность требует дисциплины                          │
│     • Если не yield — другие файберы ждут                       │
│     • Нужно избегать долгих синхронных вычислений               │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

Worker — Истинная многопоточность

Worker threads позволяют выполнять JavaScript код в отдельных потоках OS, обеспечивая истинный параллелизм.

Архитектура Worker в Node.js / Bun

┌─────────────────────────────────────────────────────────────────┐
│                    Worker Architecture                          │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  ┌─────────────────────────────────────────┐                    │
│  │           Main Thread                    │                    │
│  │  ┌─────────────────────────────────┐    │                    │
│  │  │       V8 / JSC Engine            │    │                    │
│  │  │  ┌────────────────────────────┐ │    │                    │
│  │  │  │    Your Application        │ │    │                    │
│  │  │  │    Effect Runtime          │ │    │                    │
│  │  │  └────────────────────────────┘ │    │                    │
│  │  └─────────────────────────────────┘    │                    │
│  └─────────────────────────────────────────┘                    │
│           │              │              │                        │
│           │ postMessage  │ postMessage  │                        │
│           ▼              ▼              ▼                        │
│  ┌─────────────┐ ┌─────────────┐ ┌─────────────┐                │
│  │  Worker #1  │ │  Worker #2  │ │  Worker #n  │                │
│  │ ┌─────────┐ │ │ ┌─────────┐ │ │ ┌─────────┐ │                │
│  │ │   V8    │ │ │ │   V8    │ │ │ │   V8    │ │                │
│  │ │ Engine  │ │ │ │ Engine  │ │ │ │ Engine  │ │                │
│  │ └─────────┘ │ │ └─────────┘ │ │ └─────────┘ │                │
│  │ (CPU Core) │ │ (CPU Core) │ │ (CPU Core) │                │
│  └─────────────┘ └─────────────┘ └─────────────┘                │
│                                                                 │
│  Каждый Worker:                                                 │
│  • Отдельный V8/JSC instance                                    │
│  • Собственный Event Loop                                       │
│  • Изолированная память                                         │
│  • Собственный Call Stack                                       │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

Базовый пример Worker в Bun

// main.ts
const worker = new Worker("./worker.ts")

worker.postMessage({ type: "compute", n: 45 })

worker.onmessage = (event) => {
  console.log("Result:", event.data)
}

// worker.ts
declare const self: Worker

self.onmessage = (event) => {
  if (event.data.type === "compute") {
    // Тяжёлое вычисление в отдельном потоке
    const result = fibonacci(event.data.n)
    self.postMessage(result)
  }
}

function fibonacci(n: number): number {
  if (n < 2) return n
  return fibonacci(n - 1) + fibonacci(n - 2)
}

Коммуникация через сообщения

┌─────────────────────────────────────────────────────────────────┐
│                MESSAGE PASSING                                  │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  Main Thread                          Worker Thread             │
│  ┌─────────────────┐                  ┌─────────────────┐       │
│  │                 │  postMessage()   │                 │       │
│  │  const worker = │ ────────────────►│  self.onmessage │       │
│  │    new Worker() │                  │  = (event) => { │       │
│  │                 │                  │    // process   │       │
│  │  worker.onmsg = │ ◄────────────────│    postMessage()│       │
│  │  (event) => {   │  postMessage()   │  }              │       │
│  │    // result    │                  │                 │       │
│  │  }              │                  │                 │       │
│  └─────────────────┘                  └─────────────────┘       │
│                                                                 │
│  Данные копируются через Structured Clone Algorithm:            │
│  • Примитивы, объекты, массивы ✓                                │
│  • Functions ✗                                                  │
│  • Symbols ✗                                                    │
│  • DOM nodes ✗                                                  │
│                                                                 │
│  Или передаются через Transferable Objects:                     │
│  • ArrayBuffer                                                  │
│  • MessagePort                                                  │
│  • ImageBitmap (browser)                                        │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

Преимущества Worker

┌─────────────────────────────────────────────────────────────────┐
│                    ПРЕИМУЩЕСТВА WORKER                          │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  1. Истинный параллелизм                                        │
│     • Использует все ядра CPU                                   │
│     • CPU-bound задачи выполняются параллельно                  │
│                                                                 │
│  2. Изоляция                                                    │
│     • Ошибка в Worker не crash'ит main thread                   │
│     • Безопасность памяти                                       │
│                                                                 │
│  3. Не блокирует Main Thread                                    │
│     • UI остаётся responsive                                    │
│     • Сервер продолжает отвечать                                │
│                                                                 │
│  4. Линейное масштабирование                                    │
│     • N workers ≈ N-кратное ускорение (для parallelizable tasks)│
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

Ограничения Worker

┌─────────────────────────────────────────────────────────────────┐
│                    ОГРАНИЧЕНИЯ WORKER                           │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  1. Overhead создания                                           │
│     • ~10-50ms на создание Worker                               │
│     • ~1-10MB память на каждый Worker                           │
│     • Ограниченное количество (обычно = CPU cores)              │
│                                                                 │
│  2. Сериализация данных                                         │
│     • Structured Clone Algorithm имеет overhead                 │
│     • Нельзя передавать функции                                 │
│     • Большие данные = большой latency                          │
│                                                                 │
│  3. Нет shared memory (по умолчанию)                            │
│     • Каждый Worker — изолированный контекст                    │
│     • SharedArrayBuffer для shared memory (сложно)              │
│                                                                 │
│  4. Сложность кода                                              │
│     • Асинхронная коммуникация                                  │
│     • Отдельные файлы для worker кода                           │
│     • Сложнее отлаживать                                        │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

Сравнение Fiber и Worker

Таблица сравнения

ХарактеристикаFiberWorker
МодельКооперативная многозадачностьИстинная многопоточность
Потоки OS1N (по числу Workers)
Overhead создания~200 байт, микросекунды~10-50ms, мегабайты
Максимум экземпляровМиллионыДесятки
CPU-bound задачиПоследовательноПараллельно
I/O-bound задачиЭффективноИзбыточно
Shared memoryДаНет (без SharedArrayBuffer)
КоммуникацияМгновеннаяЧерез сообщения
InterruptionМгновенная, кооперативнаяЧерез сообщения
DebuggingПростойСложный
Структурная конкурентностьВстроеннаяНет

Визуальное сравнение

┌─────────────────────────────────────────────────────────────────┐
│            FIBER vs WORKER: Визуальное сравнение                │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  FIBER (1000 конкурентных задач)                                │
│  ─────────────────────────────────────────────────────────────  │
│                                                                 │
│  Thread 1: ░░▓▓░░▓▓░░▓▓░░▓▓░░▓▓░░▓▓░░▓▓░░▓▓░░▓▓░░▓▓             │
│            │  │  │  │  │  │  │  │  │  │  │  │  │  │  │          │
│            └──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┘          │
│                 Быстрое переключение между задачами             │
│                                                                 │
│  ─────────────────────────────────────────────────────────────  │
│                                                                 │
│  WORKER (4 параллельных задачи на 4-core CPU)                   │
│  ─────────────────────────────────────────────────────────────  │
│                                                                 │
│  Thread 1: ████████████████████████████████████████             │
│  Thread 2: ████████████████████████████████████████             │
│  Thread 3: ████████████████████████████████████████             │
│  Thread 4: ████████████████████████████████████████             │
│            │                                      │             │
│            └──────────────────────────────────────┘             │
│                 Истинное параллельное выполнение                │
│                                                                 │
│  Время ─────────────────────────────────────────►               │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

Когда использовать Fiber

Идеальные случаи для Fiber

┌─────────────────────────────────────────────────────────────────┐
│                    КОГДА FIBER — ЛУЧШИЙ ВЫБОР                   │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  1. I/O-bound операции                                          │
│     ├── HTTP запросы                                            │
│     ├── База данных                                             │
│     ├── Файловая система                                        │
│     └── WebSocket соединения                                    │
│                                                                 │
│  2. Координация и оркестрация                                   │
│     ├── Параллельные запросы к API                              │
│     ├── Retry логика                                            │
│     ├── Timeout и cancellation                                  │
│     └── Rate limiting                                           │
│                                                                 │
│  3. Много лёгких задач                                          │
│     ├── WebSocket connections (тысячи)                          │
│     ├── Event handlers                                          │
│     └── Background polling                                      │
│                                                                 │
│  4. Когда важна shared memory                                   │
│     ├── Общие caches                                            │
│     ├── Connection pools                                        │
│     └── Shared state                                            │
│                                                                 │
│  5. Структурная конкурентность критична                         │
│     ├── Graceful shutdown                                       │
│     ├── Resource cleanup                                        │
│     └── Parent-child relationships                              │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

Примеры кода для Fiber


// ✅ ИДЕАЛЬНО для Fiber: Параллельные HTTP запросы
const fetchMultipleAPIs = Effect.gen(function* () {
  const [users, products, orders] = yield* Effect.all([
    fetchUsers,
    fetchProducts,
    fetchOrders
  ], { concurrency: "unbounded" })
  
  return { users, products, orders }
})

// ✅ ИДЕАЛЬНО для Fiber: WebSocket handler
const handleConnections = Effect.gen(function* () {
  const connections = yield* FiberSet.make<void>()
  
  // Каждое соединение — отдельный файбер
  for await (const conn of server.connections) {
    yield* FiberSet.run(connections, handleConnection(conn))
  }
})

// ✅ ИДЕАЛЬНО для Fiber: Retry с backoff
const resilientFetch = fetchData.pipe(
  Effect.retry(
    Schedule.exponential("100 millis").pipe(
      Schedule.intersect(Schedule.recurs(5))
    )
  )
)

// ✅ ИДЕАЛЬНО для Fiber: Parallel with timeout
const fastestResult = Effect.race(
  fetchFromServer1,
  fetchFromServer2
).pipe(Effect.timeout("5 seconds"))

Когда использовать Worker

Идеальные случаи для Worker

┌─────────────────────────────────────────────────────────────────┐
│                    КОГДА WORKER — ЛУЧШИЙ ВЫБОР                  │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  1. CPU-bound вычисления                                        │
│     ├── Криптография                                            │
│     ├── Сжатие данных                                           │
│     ├── Парсинг больших файлов                                  │
│     ├── Image/Video processing                                  │
│     └── Machine Learning inference                              │
│                                                                 │
│  2. Долгие синхронные операции                                  │
│     ├── JSON.parse больших файлов                               │
│     ├── Сортировка больших массивов                             │
│     └── Regex на больших строках                                │
│                                                                 │
│  3. Параллелизируемые алгоритмы                                 │
│     ├── Map-Reduce                                              │
│     ├── Parallel search                                         │
│     └── Monte Carlo simulations                                 │
│                                                                 │
│  4. Изоляция критичного кода                                    │
│     ├── Untrusted code execution                                │
│     ├── Sandbox environments                                    │
│     └── Error isolation                                         │
│                                                                 │
│  5. Когда нужен весь CPU                                        │
│     ├── Data processing pipelines                               │
│     ├── Scientific computing                                    │
│     └── Rendering                                               │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

Примеры кода для Worker (Bun)

// main.ts

// ✅ ИДЕАЛЬНО для Worker: CPU-intensive hashing
const hashPassword = (password: string) =>
  Effect.async<string, Error>((resume) => {
    const worker = new Worker("./hash-worker.ts")
    
    worker.postMessage({ password })
    
    worker.onmessage = (e) => {
      resume(Effect.succeed(e.data.hash))
      worker.terminate()
    }
    
    worker.onerror = (e) => {
      resume(Effect.fail(new Error(e.message)))
      worker.terminate()
    }
  })

// hash-worker.ts
declare const self: Worker

self.onmessage = async (e) => {
  const hash = await Bun.password.hash(e.data.password)
  self.postMessage({ hash })
}

// ✅ ИДЕАЛЬНО для Worker: Image processing
const resizeImage = (imageBuffer: ArrayBuffer, width: number, height: number) =>
  Effect.async<ArrayBuffer, Error>((resume) => {
    const worker = new Worker("./image-worker.ts")
    
    // Передаём ArrayBuffer через transfer (zero-copy)
    worker.postMessage(
      { imageBuffer, width, height },
      [imageBuffer] // Transferable
    )
    
    worker.onmessage = (e) => {
      resume(Effect.succeed(e.data.resizedBuffer))
      worker.terminate()
    }
  })

// ✅ ИДЕАЛЬНО для Worker: Parallel computation
const parallelSort = <T>(array: ReadonlyArray<T>, workers: number) =>
  Effect.gen(function* () {
    const chunkSize = Math.ceil(array.length / workers)
    const chunks = Array.from(
      { length: workers },
      (_, i) => array.slice(i * chunkSize, (i + 1) * chunkSize)
    )
    
    // Параллельная сортировка в Worker'ах
    const sortedChunks = yield* Effect.all(
      chunks.map((chunk) => sortInWorker(chunk)),
      { concurrency: workers }
    )
    
    // Merge отсортированных частей
    return mergeSort(sortedChunks)
  })

Effect Worker API

Effect предоставляет высокоуровневый API для работы с Worker через @effect/platform.

Архитектура Effect Worker

┌─────────────────────────────────────────────────────────────────┐
│                    Effect Worker Architecture                   │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  Main Thread                                                    │
│  ┌─────────────────────────────────────────────────────────┐    │
│  │                                                         │    │
│  │   ┌─────────────────────────────────────────────────┐   │    │
│  │   │              Worker.makePool                    │   │    │
│  │   │                                                 │   │    │
│  │   │   ┌─────────┐ ┌─────────┐ ┌─────────┐          │   │    │
│  │   │   │Worker #1│ │Worker #2│ │Worker #3│          │   │    │
│  │   │   └─────────┘ └─────────┘ └─────────┘          │   │    │
│  │   │                                                 │   │    │
│  │   │   pool.execute(request) ────────────►          │   │    │
│  │   │   ◄──────────────────── Stream<Response>       │   │    │
│  │   │                                                 │   │    │
│  │   └─────────────────────────────────────────────────┘   │    │
│  │                                                         │    │
│  └─────────────────────────────────────────────────────────┘    │
│                                                                 │
│  Worker Schema (Type-safe messages):                            │
│  ┌─────────────────────────────────────────────────────────┐    │
│  │   Schema.TaggedRequest ───► Request/Response types      │    │
│  │   Automatic serialization/deserialization               │    │
│  └─────────────────────────────────────────────────────────┘    │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

Определение Worker с Schema


// Определяем типизированные запросы
class ComputeFibonacci extends Schema.TaggedRequest<ComputeFibonacci>()(
  "ComputeFibonacci",
  {
    failure: Schema.Never,
    success: Schema.Number,
    payload: { n: Schema.Number }
  }
) {}

class HashPassword extends Schema.TaggedRequest<HashPassword>()(
  "HashPassword",
  {
    failure: WorkerError,
    success: Schema.String,
    payload: { password: Schema.String, rounds: Schema.Number }
  }
) {}

// Union всех запросов
type WorkerRequest = ComputeFibonacci | HashPassword

Создание Worker Pool


// Worker Pool layer
const WorkerPoolLive = Worker.makePoolSerialized<WorkerRequest>({
  // Путь к worker файлу
  spawn: () => new globalThis.Worker("./worker.ts"),
  
  // Размер пула
  size: 4,
  
  // Максимум задач на worker
  permits: 10
}).pipe(
  Layer.provide(BunWorker.layerManager)
)

// Использование
const program = Effect.gen(function* () {
  const pool = yield* Worker.SerializedWorkerPool<WorkerRequest>()
  
  // Выполнение запроса
  const result = yield* pool.executeEffect(
    new ComputeFibonacci({ n: 40 })
  )
  
  console.log("Fibonacci(40) =", result)
})

Effect.runPromise(
  program.pipe(Effect.provide(WorkerPoolLive))
)

Реализация Worker

// worker.ts

// Те же схемы, что и в main
class ComputeFibonacci extends Schema.TaggedRequest<ComputeFibonacci>()(
  "ComputeFibonacci",
  {
    failure: Schema.Never,
    success: Schema.Number,
    payload: { n: Schema.Number }
  }
) {}

// Fibonacci implementation
const fibonacci = (n: number): number => {
  if (n < 2) return n
  return fibonacci(n - 1) + fibonacci(n - 2)
}

// Worker runner
const WorkerLive = WorkerRunner.layerSerialized(
  ComputeFibonacci,
  {
    ComputeFibonacci: (req) => Effect.succeed(fibonacci(req.n))
  }
)

// Запуск worker
WorkerRunner.launch.pipe(
  Layer.provide(WorkerLive),
  Layer.provide(BunWorkerRunner.layer),
  Layer.launch,
  Effect.runFork
)

Streaming результаты


// Запрос, возвращающий Stream
class ProcessLargeFile extends Schema.TaggedRequest<ProcessLargeFile>()(
  "ProcessLargeFile",
  {
    failure: Schema.String,
    success: Schema.Chunk(Schema.String), // Chunks of processed lines
    payload: { path: Schema.String }
  }
) {}

// В main thread
const processFile = (path: string) =>
  Effect.gen(function* () {
    const pool = yield* Worker.SerializedWorkerPool<ProcessLargeFile>()
    
    // execute возвращает Stream
    const stream = pool.execute(new ProcessLargeFile({ path }))
    
    // Обрабатываем chunks по мере поступления
    yield* stream.pipe(
      Stream.tap((chunk) => Effect.log(`Processed ${chunk.length} lines`)),
      Stream.runDrain
    )
  })

Паттерны гибридного использования

В реальных приложениях часто комбинируют Fiber и Worker для максимальной эффективности.

Паттерн 1: Offload CPU-intensive в Worker


interface ImageProcessor {
  readonly resize: (id: string, buffer: ArrayBuffer) => Effect.Effect<ArrayBuffer>
}

const ImageProcessorLive = Effect.gen(function* () {
  // Worker pool для CPU-intensive операций
  const workerPool = yield* createWorkerPool(4)
  
  return {
    resize: (id, buffer) =>
      // Отправляем в Worker, результат — Effect
      workerPool.execute(new ResizeImage({ id, buffer }))
  } satisfies ImageProcessor
})

// HTTP handler использует Fiber для I/O + Worker для CPU
const handleUpload = Effect.gen(function* () {
  const processor = yield* ImageProcessor
  const storage = yield* StorageService
  
  // I/O в файберах
  const uploads = yield* FiberSet.make<void>()
  
  for (const file of request.files) {
    yield* FiberSet.run(uploads,
      Effect.gen(function* () {
        // Read file (I/O) - Fiber
        const buffer = yield* readFile(file)
        
        // Resize (CPU) - Worker
        const resized = yield* processor.resize(file.id, buffer)
        
        // Upload (I/O) - Fiber
        yield* storage.upload(file.id, resized)
      })
    )
  }
  
  yield* FiberSet.join(uploads)
})

Паттерн 2: Worker Pool с Fiber координацией


interface Task<A> {
  readonly id: string
  readonly execute: Effect.Effect<A>
  readonly cpuIntensive: boolean
}

const createHybridExecutor = <A>() =>
  Effect.gen(function* () {
    // Worker pool для CPU tasks
    const workerPool = yield* createWorkerPool(4)
    
    // Fiber set для I/O tasks
    const fiberTasks = yield* FiberSet.make<A>()
    
    // Rate limiting для Workers
    const workerSemaphore = yield* Semaphore.make(4)
    
    const execute = (task: Task<A>) =>
      task.cpuIntensive
        ? // CPU task → Worker
          workerSemaphore.withPermits(1)(
            workerPool.execute(task)
          )
        : // I/O task → Fiber
          FiberSet.run(fiberTasks, task.execute).pipe(
            Effect.flatMap(Fiber.join)
          )
    
    return { execute }
  })

Паттерн 3: Graceful degradation


// Попробовать Worker, fallback на Fiber
const computeWithFallback = <A>(
  task: Effect.Effect<A>,
  workerTask: Effect.Effect<A>,
  timeout: Duration.DurationInput
) =>
  workerTask.pipe(
    // Таймаут на Worker
    Effect.timeout(timeout),
    // Fallback на Fiber (медленнее, но работает)
    Effect.orElse(() => task),
    // Логирование
    Effect.tap(() => Effect.log("Computation completed"))
  )

// Использование
const result = computeWithFallback(
  // Fiber version (медленная, но не блокирует)
  computeInFiber(data),
  // Worker version (быстрая, но с overhead)
  computeInWorker(data),
  "5 seconds"
)

Производительность и бенчмарки

Когда Fiber быстрее Worker

┌─────────────────────────────────────────────────────────────────┐
│                FIBER БЫСТРЕЕ WORKER КОГДА:                      │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  1. Много мелких задач                                          │
│     ─────────────────────────────────────────────────────────   │
│     Fiber:  1000 tasks × 0.001ms = 1ms                          │
│     Worker: 1000 tasks × 1ms (message overhead) = 1000ms        │
│                                                                 │
│  2. Небольшие данные для передачи                               │
│     ─────────────────────────────────────────────────────────   │
│     Fiber:  Direct memory access = 0ms                          │
│     Worker: Serialize + Transfer + Deserialize = 1-10ms         │
│                                                                 │
│  3. Много shared state                                          │
│     ─────────────────────────────────────────────────────────   │
│     Fiber:  Direct reads/writes                                 │
│     Worker: Каждый доступ = message roundtrip                   │
│                                                                 │
│  4. Короткие вычисления (< 10ms)                                │
│     ─────────────────────────────────────────────────────────   │
│     Fiber:  10ms computation                                    │
│     Worker: 10ms + 5ms overhead = 15ms (50% slower!)            │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

Когда Worker быстрее Fiber

┌─────────────────────────────────────────────────────────────────┐
│                WORKER БЫСТРЕЕ FIBER КОГДА:                      │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  1. CPU-bound задачи > 50ms                                     │
│     ─────────────────────────────────────────────────────────   │
│     На 4-core CPU:                                              │
│     Fiber:  4 × 100ms sequential = 400ms                        │
│     Worker: 4 × 100ms parallel = 100ms + overhead ≈ 110ms       │
│                                                                 │
│  2. Большие независимые вычисления                              │
│     ─────────────────────────────────────────────────────────   │
│     Processing 1GB data:                                        │
│     Fiber:  1 thread = X seconds                                │
│     Worker: 4 threads = X/4 seconds (linear scaling)            │
│                                                                 │
│  3. Blocking operations неизбежны                               │
│     ─────────────────────────────────────────────────────────   │
│     Fiber:  Blocks ALL fibers                                   │
│     Worker: Only blocks that worker                             │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

Бенчмарк: Fibonacci


// Бенчмарк: вычисление Fibonacci(40)
// На типичном CPU занимает ~1-2 секунды

// FIBER: Последовательно
const fiberBenchmark = Effect.gen(function* () {
  const start = Date.now()
  
  yield* Effect.all([
    computeFibonacciFiber(40),
    computeFibonacciFiber(40),
    computeFibonacciFiber(40),
    computeFibonacciFiber(40)
  ])
  
  console.log(`Fiber: ${Date.now() - start}ms`)
  // Результат: ~4-8 секунд (последовательно)
})

// WORKER: Параллельно
const workerBenchmark = Effect.gen(function* () {
  const start = Date.now()
  const pool = yield* WorkerPool
  
  yield* Effect.all([
    pool.execute(new Fibonacci({ n: 40 })),
    pool.execute(new Fibonacci({ n: 40 })),
    pool.execute(new Fibonacci({ n: 40 })),
    pool.execute(new Fibonacci({ n: 40 }))
  ])
  
  console.log(`Worker: ${Date.now() - start}ms`)
  // Результат: ~1-2 секунды (параллельно на 4 cores)
})

Best Practices

Decision Tree

┌─────────────────────────────────────────────────────────────────┐
│                    DECISION TREE                                │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  Задача требует вычислений?                                     │
│  │                                                              │
│  ├── НЕТ (I/O bound) ────────────────────► FIBER                │
│  │   • HTTP requests                                            │
│  │   • Database queries                                         │
│  │   • File reads (small)                                       │
│  │                                                              │
│  └── ДА (CPU bound)                                             │
│      │                                                          │
│      └── Вычисление занимает > 50ms?                            │
│          │                                                      │
│          ├── НЕТ ────────────────────────► FIBER                │
│          │   • Overhead Worker > выигрыш                        │
│          │                                                      │
│          └── ДА                                                 │
│              │                                                  │
│              └── Можно параллелить?                             │
│                  │                                              │
│                  ├── ДА ─────────────────► WORKER               │
│                  │   • Image processing                         │
│                  │   • Data transformation                      │
│                  │   • Cryptography                             │
│                  │                                              │
│                  └── НЕТ                                        │
│                      │                                          │
│                      └── Блокирует Event Loop?                  │
│                          │                                      │
│                          ├── ДА ─────────► WORKER               │
│                          │   • Нельзя блокировать main          │
│                          │                                      │
│                          └── НЕТ ────────► FIBER                │
│                              • Можно разбить на chunks          │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

Guidelines

┌─────────────────────────────────────────────────────────────────┐
│                    BEST PRACTICES                               │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  FIBER:                                                         │
│  ────────────────────────────────────────────────────────────   │
│  ✓ Используй для I/O bound операций                             │
│  ✓ Используй для координации и оркестрации                      │
│  ✓ Используй для структурной конкурентности                     │
│  ✓ Используй для graceful shutdown                              │
│  ✗ Не выполняй CPU-intensive код > 50ms                         │
│  ✗ Не используй blocking APIs                                   │
│                                                                 │
│  WORKER:                                                        │
│  ────────────────────────────────────────────────────────────   │
│  ✓ Используй для CPU-bound вычислений                           │
│  ✓ Используй Worker Pool для амортизации overhead               │
│  ✓ Используй Transferable Objects для больших данных            │
│  ✓ Используй Schema для type-safe коммуникации                  │
│  ✗ Не создавай Worker для каждой мелкой задачи                  │
│  ✗ Не передавай огромные объекты через postMessage              │
│                                                                 │
│  ГИБРИД:                                                        │
│  ────────────────────────────────────────────────────────────   │
│  ✓ I/O координация в Fiber, CPU в Worker                        │
│  ✓ Worker Pool + Fiber для task distribution                    │
│  ✓ Fallback стратегии (Worker → Fiber)                          │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

Примеры

Пример 1: Image Processing Pipeline


// Schema для Worker
class ResizeImage extends Schema.TaggedRequest<ResizeImage>()(
  "ResizeImage",
  {
    failure: Schema.String,
    success: Schema.instanceOf(ArrayBuffer),
    payload: {
      buffer: Schema.instanceOf(ArrayBuffer),
      width: Schema.Number,
      height: Schema.Number
    }
  }
) {}

// Image Processing Service
interface ImageService {
  readonly processImages: (
    images: ReadonlyArray<{ id: string; buffer: ArrayBuffer }>
  ) => Effect.Effect<void>
}

const ImageServiceLive = Layer.effect(
  ImageService,
  Effect.gen(function* () {
    // Worker pool для resize (CPU-intensive)
    const workerPool = yield* Worker.SerializedWorkerPool<ResizeImage>()
    
    // Storage service для upload (I/O)
    const storage = yield* StorageService
    
    return {
      processImages: (images) =>
        Effect.gen(function* () {
          const uploads = yield* FiberSet.make<void>()
          
          for (const image of images) {
            yield* FiberSet.run(uploads,
              Effect.gen(function* () {
                // 1. Resize в Worker (CPU)
                const resized = yield* workerPool.executeEffect(
                  new ResizeImage({
                    buffer: image.buffer,
                    width: 800,
                    height: 600
                  })
                )
                
                // 2. Upload в Fiber (I/O)
                yield* storage.upload(image.id, resized)
                
                yield* Effect.log(`Processed: ${image.id}`)
              })
            )
          }
          
          yield* FiberSet.join(uploads)
        })
    }
  })
)

Пример 2: Data Processing с Partitioning


interface DataProcessor {
  readonly process: (data: ReadonlyArray<number>) => Effect.Effect<number>
}

// Гибридный процессор: разбивает данные для параллельной обработки
const createDataProcessor = (workerCount: number) =>
  Effect.gen(function* () {
    const workerPool = yield* createWorkerPool(workerCount)
    
    return {
      process: (data) =>
        Effect.gen(function* () {
          // Разбиваем на chunks для Workers
          const chunkSize = Math.ceil(data.length / workerCount)
          const chunks = Chunk.chunksOf(Chunk.fromIterable(data), chunkSize)
          
          // Параллельная обработка в Workers
          const partialResults = yield* Effect.forEach(
            chunks,
            (chunk) => workerPool.execute(
              new ProcessChunk({ data: Chunk.toArray(chunk) })
            ),
            { concurrency: workerCount }
          )
          
          // Aggregation в Fiber (быстро)
          return partialResults.reduce((a, b) => a + b, 0)
        })
    } satisfies DataProcessor
  })

Пример 3: Real-time Analytics


interface AnalyticsService {
  readonly trackEvent: (event: Event) => Effect.Effect<void>
  readonly getStats: Effect.Effect<Stats>
}

const AnalyticsServiceLive = Effect.gen(function* () {
  // Queue для событий (Fiber)
  const eventQueue = yield* Queue.unbounded<Event>()
  
  // Worker для агрегации (CPU)
  const aggregationWorker = yield* createAggregationWorker()
  
  // Периодическая агрегация
  const aggregatorFiber = yield* Effect.fork(
    Effect.forever(
      Effect.gen(function* () {
        yield* Effect.sleep("1 second")
        
        // Забираем все события
        const events = yield* Queue.takeAll(eventQueue)
        
        if (Chunk.size(events) > 0) {
          // Агрегируем в Worker (CPU-intensive)
          yield* aggregationWorker.execute(
            new AggregateEvents({ events: Chunk.toArray(events) })
          )
        }
      })
    )
  )
  
  return {
    trackEvent: (event) => Queue.offer(eventQueue, event),
    
    getStats: aggregationWorker.getStats()
  }
})

Упражнения

Упражнение

Упражнение 1: Определить подход

Легко

Для каждой задачи определите, что лучше использовать — Fiber или Worker:

  1. HTTP запросы к 100 разным API
  2. Сжатие 1GB файла
  3. WebSocket сервер на 10,000 соединений
  4. Парсинг 100MB JSON
  5. Rate-limited API polling
Упражнение

Упражнение 2: Hybrid Processor

Средне

Реализуйте процессор, который выбирает между Fiber и Worker в зависимости от размера данных.

import { Effect } from "effect"

interface Processor {
  readonly process: (data: ReadonlyArray<number>) => Effect.Effect<number>
}

const createHybridProcessor = (): Effect.Effect<Processor, never, Scope.Scope> =>
  ???

// Если data.length < 1000 → Fiber
// Если data.length >= 1000 → Worker
Упражнение

Упражнение 3: Worker Pool с Backpressure

Сложно

Реализуйте Worker Pool с ограничением очереди задач.

import { Effect, Queue, Semaphore, Scope } from "effect"

interface BoundedWorkerPool<Req, Res> {
  readonly execute: (request: Req) => Effect.Effect<Res, QueueFullError>
  readonly stats: Effect.Effect<{ pending: number; active: number }>
}

class QueueFullError {
  readonly _tag = "QueueFullError"
}

const createBoundedWorkerPool = <Req, Res>(config: {
  readonly workers: number
  readonly maxQueueSize: number
  readonly handler: (req: Req) => Effect.Effect<Res>
}): Effect.Effect<BoundedWorkerPool<Req, Res>, never, Scope.Scope> =>
  ???

Заключение

Выбор между Fiber и Worker — это не “или/или”, а “когда что”:

Fiber — ваш default choice:

  • Большинство веб-приложений I/O bound
  • Легковесность позволяет масштабировать конкурентность
  • Структурная конкурентность упрощает управление ресурсами

Worker — для CPU-intensive задач:

  • Когда нужен истинный параллелизм
  • Когда вычисления занимают > 50ms
  • Когда нельзя блокировать Event Loop

Гибридный подход — для production систем:

  • Fiber для координации и I/O
  • Worker для CPU-intensive offloading
  • Effect предоставляет инструменты для обоих

Ключевые метрики для принятия решения:

  • Время выполнения задачи
  • Размер передаваемых данных
  • Количество конкурентных задач
  • Требования к latency

Effect-ts делает работу с обоими подходами удобной и type-safe, позволяя фокусироваться на бизнес-логике, а не на низкоуровневых деталях конкурентности.