Effect Курс FiberMap, FiberSet

FiberMap, FiberSet

Управление коллекциями файберов.

Введение в коллекции файберов

При работе с конкурентными системами часто возникает необходимость управлять динамическими наборами файберов. Например, пул воркеров, активные подключения, параллельные задачи. Effect предоставляет три абстракции для этих целей.

Проблемы ручного управления файберами

┌─────────────────────────────────────────────────────────────────┐
│                    ПРОБЛЕМЫ РУЧНОГО УПРАВЛЕНИЯ                  │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  1. Tracking                                                    │
│     ├── Где хранить ссылки на файберы?                          │
│     ├── Как отслеживать завершённые?                            │
│     └── Как избежать утечек памяти?                             │
│                                                                 │
│  2. Cleanup                                                     │
│     ├── Ручной interrupt каждого файбера                        │
│     ├── Забытые файберы продолжают работать                     │
│     └── Сложно гарантировать cleanup при ошибках                │
│                                                                 │
│  3. Coordination                                                │
│     ├── Ожидание завершения всех файберов                       │
│     ├── Propagation ошибок                                      │
│     └── Graceful shutdown                                       │
│                                                                 │
│  4. Concurrency Safety                                          │
│     ├── Race conditions при добавлении/удалении                 │
│     └── Inconsistent state                                      │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

Решение: FiberSet, FiberMap, FiberHandle

┌─────────────────────────────────────────────────────────────────┐
│                    КОЛЛЕКЦИИ ФАЙБЕРОВ                           │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  FiberSet<A, E>                                                 │
│  ├── Неупорядоченная коллекция файберов                        │
│  ├── Автоматическое удаление завершённых                       │
│  └── Идеально для: пулов воркеров, параллельных задач          │
│                                                                 │
│  FiberMap<K, A, E>                                              │
│  ├── Коллекция с ключами (как Map)                             │
│  ├── Lookup по ключу O(1)                                      │
│  └── Идеально для: именованных воркеров, по-клиентных задач    │
│                                                                 │
│  FiberHandle<A, E>                                              │
│  ├── Контейнер для одного файбера                              │
│  ├── Замена старого при новом запуске                          │
│  └── Идеально для: singleton задач, текущей операции           │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

Общие характеристики

Все три абстракции имеют общие свойства:

┌─────────────────────────────────────────────────────────────────┐
│                    ОБЩИЕ ХАРАКТЕРИСТИКИ                         │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  ✓ Scope-привязка                                               │
│    └── Автоматический interrupt при закрытии Scope             │
│                                                                 │
│  ✓ Автоматический cleanup                                       │
│    └── Завершённые файберы удаляются автоматически             │
│                                                                 │
│  ✓ Join/Await операции                                          │
│    └── Ожидание всех файберов с propagation ошибок             │
│                                                                 │
│  ✓ Runtime функции                                              │
│    └── makeRuntime, makeRuntimePromise                         │
│                                                                 │
│  ✓ Iterable                                                     │
│    └── Можно итерировать по файберам                           │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

FiberSet — Неупорядоченная коллекция

FiberSet<A, E> — это коллекция файберов без определённого порядка. Файберы автоматически удаляются из коллекции при завершении.

Создание FiberSet


// FiberSet требует Scope для lifecycle management
const program = Effect.scoped(
  Effect.gen(function* () {
    // Создаём FiberSet
    const set = yield* FiberSet.make<number, Error>()
    
    // Используем set...
    
    // При выходе из scope все файберы будут прерваны
  })
)

Effect.runFork(program)

Сигнатура типа

interface FiberSet<out A = unknown, out E = unknown> 
  extends Iterable<Fiber.RuntimeFiber<A, E>> {
  
  // Deferred для сигнализации о закрытии
  readonly deferred: Deferred.Deferred<void, unknown>
  
  // Внутреннее состояние
  readonly state:
    | { readonly _tag: "Open"; readonly backing: Set<Fiber.RuntimeFiber<A, E>> }
    | { readonly _tag: "Closed" }
}

Добавление файберов


const program = Effect.scoped(
  Effect.gen(function* () {
    const set = yield* FiberSet.make<number>()
    
    // Способ 1: Добавить существующий файбер
    const fiber = yield* Effect.fork(Effect.succeed(42))
    yield* FiberSet.add(set, fiber)
    
    // Способ 2: Fork и добавить одной операцией (рекомендуется)
    yield* FiberSet.run(set, Effect.succeed(100))
    yield* FiberSet.run(set, Effect.succeed(200))
    
    // Получаем размер
    const size = yield* FiberSet.size(set)
    console.log("Fibers in set:", size)
  })
)

FiberSet.run — Fork и добавление


const program = Effect.scoped(
  Effect.gen(function* () {
    const set = yield* FiberSet.make<string>()
    
    // run создаёт файбер и добавляет его в set
    // Возвращает RuntimeFiber
    const fiber1 = yield* FiberSet.run(set, 
      Effect.gen(function* () {
        yield* Effect.sleep("100 millis")
        return "task-1"
      })
    )
    
    const fiber2 = yield* FiberSet.run(set,
      Effect.gen(function* () {
        yield* Effect.sleep("200 millis")
        return "task-2"
      })
    )
    
    console.log("Started fibers:", fiber1.id(), fiber2.id())
    
    // Файберы автоматически удаляются при завершении
    yield* Effect.sleep("300 millis")
    
    const size = yield* FiberSet.size(set)
    console.log("Remaining:", size) // 0
  })
)

FiberSet.join — Ожидание всех файберов


const program = Effect.scoped(
  Effect.gen(function* () {
    const set = yield* FiberSet.make<number, Error>()
    
    // Запускаем несколько задач
    yield* FiberSet.run(set, Effect.sleep("50 millis").pipe(Effect.as(1)))
    yield* FiberSet.run(set, Effect.sleep("100 millis").pipe(Effect.as(2)))
    yield* FiberSet.run(set, Effect.sleep("150 millis").pipe(Effect.as(3)))
    
    // join ожидает завершения ВСЕХ файберов
    // Если любой файбер завершится с ошибкой, join тоже завершится с ошибкой
    yield* FiberSet.join(set)
    
    console.log("All tasks completed!")
  })
)

FiberSet.join с ошибками


class TaskError {
  readonly _tag = "TaskError"
  constructor(readonly message: string) {}
}

const program = Effect.scoped(
  Effect.gen(function* () {
    const set = yield* FiberSet.make<number, TaskError>()
    
    yield* FiberSet.run(set, Effect.sleep("50 millis").pipe(Effect.as(1)))
    yield* FiberSet.run(set, Effect.fail(new TaskError("Task 2 failed")))
    yield* FiberSet.run(set, Effect.sleep("150 millis").pipe(Effect.as(3)))
    
    // join завершится с первой ошибкой
    yield* FiberSet.join(set)
  })
).pipe(
  Effect.catchTag("TaskError", (e) => 
    Effect.log(`Caught error: ${e.message}`)
  )
)

Effect.runFork(program)
// Output: Caught error: Task 2 failed

FiberSet.awaitEmpty — Ожидание опустошения


const program = Effect.scoped(
  Effect.gen(function* () {
    const set = yield* FiberSet.make<void>()
    
    // Запускаем задачи
    yield* FiberSet.run(set, Effect.sleep("100 millis"))
    yield* FiberSet.run(set, Effect.sleep("200 millis"))
    
    console.log("Waiting for set to become empty...")
    
    // awaitEmpty ожидает когда set станет пустым
    // НЕ propagates ошибки, просто ждёт
    yield* FiberSet.awaitEmpty(set)
    
    console.log("Set is empty!")
  })
)

FiberSet.clear — Очистка с прерыванием


const program = Effect.scoped(
  Effect.gen(function* () {
    const set = yield* FiberSet.make<void>()
    
    // Запускаем долгие задачи
    yield* FiberSet.run(set, Effect.sleep("10 seconds"))
    yield* FiberSet.run(set, Effect.sleep("10 seconds"))
    
    const before = yield* FiberSet.size(set)
    console.log("Before clear:", before) // 2
    
    // clear прерывает все файберы и очищает set
    yield* FiberSet.clear(set)
    
    const after = yield* FiberSet.size(set)
    console.log("After clear:", after) // 0
  })
)

Итерация по FiberSet


const program = Effect.scoped(
  Effect.gen(function* () {
    const set = yield* FiberSet.make<number>()
    
    yield* FiberSet.run(set, Effect.succeed(1))
    yield* FiberSet.run(set, Effect.succeed(2))
    yield* FiberSet.run(set, Effect.succeed(3))
    
    // FiberSet — это Iterable
    for (const fiber of set) {
      const status = yield* Fiber.status(fiber)
      console.log(`Fiber ${fiber.id()}: ${status._tag}`)
    }
  })
)

FiberMap — Коллекция с ключами

FiberMap<K, A, E> — это коллекция файберов, индексированная по ключу. Позволяет быстро находить, заменять или удалять файберы по ключу.

Создание FiberMap


const program = Effect.scoped(
  Effect.gen(function* () {
    // Типизированный FiberMap
    // K = string (тип ключа)
    // A = number (тип результата)
    // E = Error (тип ошибки)
    const map = yield* FiberMap.make<string, number, Error>()
    
    // Используем map...
  })
)

Сигнатура типа

interface FiberMap<in out K, out A = unknown, out E = unknown>
  extends Iterable<[K, Fiber.RuntimeFiber<A, E>]> {
  
  readonly deferred: Deferred.Deferred<void, unknown>
  
  readonly state:
    | { 
        readonly _tag: "Open"
        readonly backing: MutableHashMap.MutableHashMap<K, Fiber.RuntimeFiber<A, E>>
      }
    | { readonly _tag: "Closed" }
}

Добавление и запуск файберов


const program = Effect.scoped(
  Effect.gen(function* () {
    const map = yield* FiberMap.make<string, number>()
    
    // Способ 1: Добавить существующий файбер
    const fiber = yield* Effect.fork(Effect.succeed(42))
    yield* FiberMap.set(map, "task-1", fiber)
    
    // Способ 2: Fork и добавить (рекомендуется)
    yield* FiberMap.run(map, "task-2", Effect.succeed(100))
    yield* FiberMap.run(map, "task-3", Effect.succeed(200))
    
    const size = yield* FiberMap.size(map)
    console.log("Fibers in map:", size) // 3
  })
)

FiberMap.run с заменой

При запуске файбера с уже существующим ключом, старый файбер прерывается:


const program = Effect.scoped(
  Effect.gen(function* () {
    const map = yield* FiberMap.make<string, string>()
    
    // Запускаем долгую задачу
    yield* FiberMap.run(map, "current-job",
      Effect.gen(function* () {
        yield* Effect.log("Job 1 started")
        yield* Effect.sleep("10 seconds")
        return "Job 1 done"
      })
    )
    
    yield* Effect.sleep("100 millis")
    
    // Запускаем новую задачу с тем же ключом
    // Job 1 будет прервана!
    yield* FiberMap.run(map, "current-job",
      Effect.gen(function* () {
        yield* Effect.log("Job 2 started")
        yield* Effect.sleep("50 millis")
        return "Job 2 done"
      })
    )
    
    yield* FiberMap.join(map)
    // Output:
    // Job 1 started
    // Job 2 started
    // (Job 1 была прервана, Job 2 завершилась)
  })
)

FiberMap.run с onlyIfMissing


const program = Effect.scoped(
  Effect.gen(function* () {
    const map = yield* FiberMap.make<string, string>()
    
    // Запускаем первую задачу
    yield* FiberMap.run(map, "singleton",
      Effect.gen(function* () {
        yield* Effect.sleep("1 second")
        return "first"
      })
    )
    
    // Пытаемся запустить вторую с тем же ключом
    // С onlyIfMissing: true — НЕ заменит существующую
    yield* FiberMap.run(map, "singleton",
      Effect.gen(function* () {
        yield* Effect.sleep("100 millis")
        return "second"
      }),
      { onlyIfMissing: true }
    )
    
    yield* FiberMap.join(map)
    // Выполнится только первая задача
  })
)

FiberMap.get — Получение файбера


const program = Effect.scoped(
  Effect.gen(function* () {
    const map = yield* FiberMap.make<string, number>()
    
    yield* FiberMap.run(map, "task-1", Effect.succeed(42))
    
    // get возвращает Effect с ошибкой NoSuchElementException если ключа нет
    const fiber = yield* FiberMap.get(map, "task-1")
    const result = yield* Fiber.join(fiber)
    console.log("Result:", result) // 42
    
    // Безопасный вариант через Effect.option
    const maybeFiber = yield* FiberMap.get(map, "unknown").pipe(
      Effect.option
    )
    console.log("Maybe:", Option.isNone(maybeFiber)) // true
  })
)

FiberMap.has — Проверка наличия


const program = Effect.scoped(
  Effect.gen(function* () {
    const map = yield* FiberMap.make<string, void>()
    
    yield* FiberMap.run(map, "active-task", Effect.never)
    
    const hasActive = yield* FiberMap.has(map, "active-task")
    const hasOther = yield* FiberMap.has(map, "other-task")
    
    console.log("Has active-task:", hasActive) // true
    console.log("Has other-task:", hasOther)   // false
  })
)

FiberMap.remove — Удаление с прерыванием


const program = Effect.scoped(
  Effect.gen(function* () {
    const map = yield* FiberMap.make<string, void>()
    
    yield* FiberMap.run(map, "task-to-remove",
      Effect.gen(function* () {
        yield* Effect.log("Task started")
        yield* Effect.sleep("10 seconds")
        yield* Effect.log("Task completed") // Не выполнится
      })
    )
    
    yield* Effect.sleep("100 millis")
    
    // remove прерывает файбер и удаляет его из map
    yield* FiberMap.remove(map, "task-to-remove")
    
    const has = yield* FiberMap.has(map, "task-to-remove")
    console.log("Has after remove:", has) // false
  })
)

FiberMap.join и awaitEmpty


class WorkerError {
  readonly _tag = "WorkerError"
  constructor(readonly workerId: string) {}
}

const program = Effect.scoped(
  Effect.gen(function* () {
    const workers = yield* FiberMap.make<string, void, WorkerError>()
    
    // Запускаем воркеры
    yield* FiberMap.run(workers, "worker-1",
      Effect.sleep("100 millis")
    )
    yield* FiberMap.run(workers, "worker-2",
      Effect.sleep("200 millis")
    )
    yield* FiberMap.run(workers, "worker-3",
      Effect.fail(new WorkerError("worker-3"))
    )
    
    // join завершится с ошибкой от worker-3
    yield* FiberMap.join(workers).pipe(
      Effect.catchTag("WorkerError", (e) =>
        Effect.log(`Worker ${e.workerId} failed`)
      )
    )
  })
)

Итерация по FiberMap


const program = Effect.scoped(
  Effect.gen(function* () {
    const map = yield* FiberMap.make<string, number>()
    
    yield* FiberMap.run(map, "a", Effect.succeed(1))
    yield* FiberMap.run(map, "b", Effect.succeed(2))
    yield* FiberMap.run(map, "c", Effect.succeed(3))
    
    // FiberMap — это Iterable<[K, RuntimeFiber<A, E>]>
    for (const [key, fiber] of map) {
      const status = yield* Fiber.status(fiber)
      console.log(`${key}: ${status._tag}`)
    }
  })
)

FiberHandle — Единичный файбер

FiberHandle<A, E> — это контейнер для одного файбера. При запуске нового файбера старый автоматически прерывается. Идеально для singleton-задач.

Создание FiberHandle


const program = Effect.scoped(
  Effect.gen(function* () {
    const handle = yield* FiberHandle.make<string, Error>()
    
    // handle содержит максимум один файбер
  })
)

Сигнатура типа

interface FiberHandle<out A = unknown, out E = unknown> {
  readonly deferred: Deferred.Deferred<void, unknown>
  
  readonly state:
    | { 
        readonly _tag: "Open"
        readonly fiber: Fiber.RuntimeFiber<A, E> | undefined 
      }
    | { readonly _tag: "Closed" }
}

FiberHandle.run — Запуск с заменой


const program = Effect.scoped(
  Effect.gen(function* () {
    const handle = yield* FiberHandle.make<string>()
    
    // Запускаем первую задачу
    yield* FiberHandle.run(handle,
      Effect.gen(function* () {
        yield* Effect.log("Task 1 started")
        yield* Effect.sleep("5 seconds")
        yield* Effect.log("Task 1 done") // Не выполнится
        return "result-1"
      })
    )
    
    yield* Effect.sleep("100 millis")
    
    // Запускаем вторую задачу — первая прерывается!
    yield* FiberHandle.run(handle,
      Effect.gen(function* () {
        yield* Effect.log("Task 2 started")
        yield* Effect.sleep("100 millis")
        yield* Effect.log("Task 2 done")
        return "result-2"
      })
    )
    
    yield* FiberHandle.join(handle)
  })
)

Effect.runFork(program)
/*
Output:
Task 1 started
Task 2 started
Task 2 done
*/

FiberHandle.get — Получение текущего файбера


const program = Effect.scoped(
  Effect.gen(function* () {
    const handle = yield* FiberHandle.make<number>()
    
    // Изначально пусто
    const empty = yield* FiberHandle.get(handle).pipe(Effect.option)
    console.log("Before run:", Option.isNone(empty)) // true
    
    yield* FiberHandle.run(handle, Effect.succeed(42))
    
    // Теперь есть файбер
    const fiber = yield* FiberHandle.get(handle)
    const result = yield* Fiber.join(fiber)
    console.log("Result:", result) // 42
  })
)

FiberHandle.clear — Очистка


const program = Effect.scoped(
  Effect.gen(function* () {
    const handle = yield* FiberHandle.make<void>()
    
    yield* FiberHandle.run(handle,
      Effect.gen(function* () {
        yield* Effect.log("Long task started")
        yield* Effect.sleep("10 seconds")
        yield* Effect.log("Long task done")
      })
    )
    
    yield* Effect.sleep("100 millis")
    
    // clear прерывает текущий файбер
    yield* FiberHandle.clear(handle)
    
    yield* Effect.log("Handle cleared")
  })
)

FiberHandle.join и awaitEmpty


const program = Effect.scoped(
  Effect.gen(function* () {
    const handle = yield* FiberHandle.make<string>()
    
    yield* FiberHandle.run(handle,
      Effect.gen(function* () {
        yield* Effect.sleep("100 millis")
        return "done"
      })
    )
    
    // join ожидает текущий файбер
    yield* FiberHandle.join(handle)
    console.log("Joined!")
    
    // awaitEmpty ожидает когда handle станет пустым
    // (полезно после clear или когда файбер завершился)
    yield* FiberHandle.awaitEmpty(handle)
    console.log("Empty!")
  })
)

Паттерн: Текущая операция


interface SearchService {
  readonly search: (query: string) => Effect.Effect<ReadonlyArray<string>>
  readonly cancel: Effect.Effect<void>
}

const SearchService = Effect.gen(function* () {
  const handle = yield* FiberHandle.make<ReadonlyArray<string>>()
  
  const search = (query: string) =>
    Effect.gen(function* () {
      // Запуск нового поиска отменяет предыдущий
      const fiber = yield* FiberHandle.run(handle,
        Effect.gen(function* () {
          yield* Effect.log(`Searching: ${query}`)
          yield* Effect.sleep("500 millis") // Симуляция API
          return [`Result for "${query}" 1`, `Result for "${query}" 2`]
        })
      )
      
      return yield* Fiber.join(fiber)
    })
  
  const cancel = FiberHandle.clear(handle)
  
  return { search, cancel } as const
})

Lifecycle и Scope

Все три коллекции привязаны к Scope. При закрытии Scope все файберы в коллекции автоматически прерываются.

Визуализация Lifecycle

┌─────────────────────────────────────────────────────────────────┐
│                        Scope Lifecycle                          │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  Effect.scoped(                                                 │
│    Effect.gen(function* () {                                    │
│      const set = yield* FiberSet.make()                         │
│      ─────────────────────────────────────────────────────────  │
│      │                                                          │
│      │  FiberSet.run(set, task1)  ──► Fiber #1 running          │
│      │  FiberSet.run(set, task2)  ──► Fiber #2 running          │
│      │  FiberSet.run(set, task3)  ──► Fiber #3 running          │
│      │                                                          │
│      │         ... работа с файберами ...                       │
│      │                                                          │
│      │  Fiber #1 completes ──► removed from set                 │
│      │  Fiber #2 still running                                  │
│      │  Fiber #3 still running                                  │
│      │                                                          │
│      ─────────────────────────────────────────────────────────  │
│    })  ◄── Scope closes                                         │
│  )                                                              │
│    │                                                            │
│    └── Fiber #2 interrupted                                     │
│        Fiber #3 interrupted                                     │
│        FiberSet closed                                          │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

Практический пример: HTTP сервер с активными соединениями


interface Connection {
  readonly id: string
  readonly handle: (data: string) => Effect.Effect<void>
}

const ConnectionManager = Effect.gen(function* () {
  // FiberMap для активных соединений
  const connections = yield* FiberMap.make<string, void>()
  
  const addConnection = (conn: Connection) =>
    FiberMap.run(connections, conn.id,
      Effect.gen(function* () {
        yield* Effect.log(`Connection ${conn.id} established`)
        
        // Симуляция обработки данных
        yield* Effect.forever(
          Effect.gen(function* () {
            yield* Effect.sleep("1 second")
            yield* conn.handle(`ping from ${conn.id}`)
          })
        )
      }).pipe(
        Effect.onInterrupt(() =>
          Effect.log(`Connection ${conn.id} closed`)
        )
      )
    )
  
  const removeConnection = (id: string) =>
    FiberMap.remove(connections, id)
  
  const getActiveConnections = Effect.gen(function* () {
    const ids: string[] = []
    for (const [id] of connections) {
      ids.push(id)
    }
    return ids
  })
  
  const shutdown = Effect.gen(function* () {
    yield* Effect.log("Shutting down all connections...")
    yield* FiberMap.clear(connections)
  })
  
  return {
    addConnection,
    removeConnection,
    getActiveConnections,
    shutdown
  } as const
})

// Использование
const program = Effect.scoped(
  Effect.gen(function* () {
    const manager = yield* ConnectionManager
    
    // Добавляем соединения
    yield* manager.addConnection({
      id: "conn-1",
      handle: (data) => Effect.log(`Received: ${data}`)
    })
    yield* manager.addConnection({
      id: "conn-2", 
      handle: (data) => Effect.log(`Received: ${data}`)
    })
    
    yield* Effect.sleep("3 seconds")
    
    const active = yield* manager.getActiveConnections
    console.log("Active connections:", active)
    
    // При выходе из scope все соединения закроются
  })
)

Propagation of Interruption

По умолчанию join не propagates прерывания. Можно включить это поведение через опцию propagateInterruption.

Без propagation (по умолчанию)


const program = Effect.scoped(
  Effect.gen(function* () {
    const set = yield* FiberSet.make<void>()
    
    // Файбер, который будет прерван
    yield* FiberSet.run(set, Effect.interrupt)
    
    // join НЕ завершится с ошибкой прерывания
    // Просто дождётся завершения
    yield* FiberSet.join(set)
    
    console.log("Completed (interruption was swallowed)")
  })
)

С propagateInterruption


const program = Effect.scoped(
  Effect.gen(function* () {
    const handle = yield* FiberHandle.make<void>()
    
    // Запускаем с propagateInterruption
    yield* FiberHandle.run(handle, Effect.interrupt, {
      propagateInterruption: true
    })
    
    // join теперь propagates прерывание
    yield* FiberHandle.join(handle)
  })
).pipe(
  Effect.catchAllCause((cause) =>
    Effect.log(`Caught cause: ${cause}`)
  )
)

Effect.runFork(program)
// Output: Caught cause: Interrupt(...)

Паттерн: Отмена всех задач при ошибке


class CriticalError {
  readonly _tag = "CriticalError"
}

const program = Effect.scoped(
  Effect.gen(function* () {
    const workers = yield* FiberSet.make<void, CriticalError>()
    
    // Запускаем несколько воркеров
    yield* FiberSet.run(workers,
      Effect.gen(function* () {
        yield* Effect.sleep("5 seconds")
        yield* Effect.log("Worker 1 done")
      })
    )
    
    yield* FiberSet.run(workers,
      Effect.gen(function* () {
        yield* Effect.sleep("100 millis")
        // Критическая ошибка
        return yield* Effect.fail(new CriticalError())
      }),
      { propagateInterruption: true }
    )
    
    yield* FiberSet.run(workers,
      Effect.gen(function* () {
        yield* Effect.sleep("5 seconds")
        yield* Effect.log("Worker 3 done")
      })
    )
    
    // join завершится с CriticalError
    // Все остальные файберы будут прерваны при выходе из scope
    yield* FiberSet.join(workers)
  })
).pipe(
  Effect.catchTag("CriticalError", () =>
    Effect.log("Critical error occurred, all workers stopped")
  )
)

Runtime функции

FiberSet, FiberMap и FiberHandle предоставляют функции для создания runtime с привязкой к коллекции.

FiberSet.makeRuntime


interface Logger {
  readonly log: (msg: string) => Effect.Effect<void>
}

const Logger = Context.GenericTag<Logger>("Logger")

const LoggerLive = Logger.of({
  log: (msg) => Effect.log(msg)
})

const program = Effect.scoped(
  Effect.gen(function* () {
    // makeRuntime создаёт функцию run с контекстом
    const run = yield* FiberSet.makeRuntime<Logger>().pipe(
      Effect.provideService(Logger, LoggerLive)
    )
    
    // Теперь можно запускать эффекты с доступом к Logger
    const fiber1 = run(
      Effect.gen(function* () {
        const logger = yield* Logger
        yield* logger.log("Task 1 running")
        yield* Effect.sleep("100 millis")
        return "result-1"
      })
    )
    
    const fiber2 = run(
      Effect.gen(function* () {
        const logger = yield* Logger
        yield* logger.log("Task 2 running")
        return "result-2"
      })
    )
    
    // Файберы добавлены в FiberSet автоматически
  })
)

FiberMap.makeRuntime


const program = Effect.scoped(
  Effect.gen(function* () {
    // makeRuntime для FiberMap принимает ключ
    const run = yield* FiberMap.makeRuntime<never, string>()
    
    // Первый аргумент — ключ
    run("task-a", Effect.succeed(1))
    run("task-b", Effect.succeed(2))
    run("task-c", Effect.succeed(3))
  })
)

makeRuntimePromise — Для Promise-based кода


const program = Effect.scoped(
  Effect.gen(function* () {
    // makeRuntimePromise возвращает функцию, возвращающую Promise
    const runPromise = yield* FiberSet.makeRuntimePromise<never>()
    
    // Можно использовать с async/await
    const result1 = await runPromise(Effect.succeed(42))
    console.log("Result 1:", result1)
    
    const result2 = await runPromise(Effect.succeed("hello"))
    console.log("Result 2:", result2)
  })
)

Паттерны использования

Паттерн 1: Worker Pool


interface Task<A> {
  readonly id: string
  readonly execute: Effect.Effect<A>
}

interface WorkerPool<A> {
  readonly submit: (task: Task<A>) => Effect.Effect<void>
  readonly shutdown: Effect.Effect<void>
  readonly stats: Effect.Effect<{ active: number; completed: number }>
}

const createWorkerPool = <A>(
  workerCount: number
): Effect.Effect<WorkerPool<A>, never, Scope.Scope> =>
  Effect.gen(function* () {
    const workers = yield* FiberSet.make<void>()
    const taskQueue = yield* Queue.unbounded<Task<A>>()
    const completedCount = yield* Ref.make(0)
    
    // Создаём воркеры
    for (let i = 0; i < workerCount; i++) {
      yield* FiberSet.run(workers,
        Effect.gen(function* () {
          while (true) {
            const task = yield* Queue.take(taskQueue)
            yield* Effect.log(`Worker ${i}: executing ${task.id}`)
            yield* task.execute
            yield* Ref.update(completedCount, (n) => n + 1)
          }
        }).pipe(Effect.interruptible)
      )
    }
    
    return {
      submit: (task) => Queue.offer(taskQueue, task),
      
      shutdown: Effect.gen(function* () {
        yield* Queue.shutdown(taskQueue)
        yield* FiberSet.clear(workers)
      }),
      
      stats: Effect.gen(function* () {
        const active = yield* FiberSet.size(workers)
        const completed = yield* Ref.get(completedCount)
        return { active, completed }
      })
    }
  })


const program = Effect.scoped(
  Effect.gen(function* () {
    const pool = yield* createWorkerPool<string>(3)
    
    // Отправляем задачи
    for (let i = 0; i < 10; i++) {
      yield* pool.submit({
        id: `task-${i}`,
        execute: Effect.sleep(`${50 + Math.random() * 100} millis`).pipe(
          Effect.as(`Result ${i}`)
        )
      })
    }
    
    yield* Effect.sleep("1 second")
    
    const stats = yield* pool.stats
    console.log("Stats:", stats)
  })
)

Паттерн 2: Per-User Tasks


interface UserTask {
  readonly userId: string
  readonly task: Effect.Effect<void>
}

interface UserTaskManager {
  readonly startTask: (task: UserTask) => Effect.Effect<void>
  readonly cancelTask: (userId: string) => Effect.Effect<void>
  readonly isTaskRunning: (userId: string) => Effect.Effect<boolean>
}

const createUserTaskManager = (): Effect.Effect<
  UserTaskManager,
  never,
  Scope.Scope
> =>
  Effect.gen(function* () {
    const tasks = yield* FiberMap.make<string, void>()
    
    return {
      startTask: ({ userId, task }) =>
        FiberMap.run(tasks, userId,
          task.pipe(
            Effect.onInterrupt(() =>
              Effect.log(`Task for user ${userId} was cancelled`)
            )
          )
        ).pipe(Effect.asVoid),
      
      cancelTask: (userId) =>
        FiberMap.remove(tasks, userId),
      
      isTaskRunning: (userId) =>
        FiberMap.has(tasks, userId)
    }
  })


const program = Effect.scoped(
  Effect.gen(function* () {
    const manager = yield* createUserTaskManager()
    
    // Запускаем задачи для разных пользователей
    yield* manager.startTask({
      userId: "user-1",
      task: Effect.gen(function* () {
        yield* Effect.log("User 1: working...")
        yield* Effect.sleep("500 millis")
        yield* Effect.log("User 1: done!")
      })
    })
    
    yield* manager.startTask({
      userId: "user-2",
      task: Effect.gen(function* () {
        yield* Effect.log("User 2: working...")
        yield* Effect.sleep("1 second")
        yield* Effect.log("User 2: done!")
      })
    })
    
    yield* Effect.sleep("200 millis")
    
    // Отменяем задачу user-2
    yield* manager.cancelTask("user-2")
    
    yield* Effect.sleep("1 second")
  })
)

interface SearchResult {
  readonly query: string
  readonly results: ReadonlyArray<string>
}

const createDebouncedSearch = (
  debounceMs: number,
  search: (query: string) => Effect.Effect<ReadonlyArray<string>>
): Effect.Effect<
  (query: string) => Effect.Effect<SearchResult>,
  never,
  Scope.Scope
> =>
  Effect.gen(function* () {
    const handle = yield* FiberHandle.make<SearchResult>()
    
    return (query: string) =>
      Effect.gen(function* () {
        // Отменяем предыдущий поиск
        yield* FiberHandle.clear(handle)
        
        // Запускаем новый с debounce
        const fiber = yield* FiberHandle.run(handle,
          Effect.gen(function* () {
            yield* Effect.sleep(`${debounceMs} millis`)
            const results = yield* search(query)
            return { query, results }
          })
        )
        
        return yield* Fiber.join(fiber)
      })
  })


const program = Effect.scoped(
  Effect.gen(function* () {
    const searchDebounced = yield* createDebouncedSearch(
      300,
      (query) => Effect.succeed([`Result 1 for ${query}`, `Result 2 for ${query}`])
    )
    
    // Быстрые последовательные запросы
    const results = yield* Effect.race(
      searchDebounced("a"),
      Effect.gen(function* () {
        yield* Effect.sleep("50 millis")
        return yield* searchDebounced("ab")
      })
    ).pipe(
      Effect.race(
        Effect.gen(function* () {
          yield* Effect.sleep("100 millis")
          return yield* searchDebounced("abc")
        })
      )
    )
    
    // Только последний поиск "abc" выполнится
    console.log("Results:", results)
  })
)

API Reference

FiberSet API

ФункцияСигнатураОписание
FiberSet.make() => Effect<FiberSet<A, E>, never, Scope>Создать FiberSet
FiberSet.add(set, fiber, options?) => Effect<void>Добавить файбер
FiberSet.run(set, effect, options?) => Effect<RuntimeFiber>Fork и добавить
FiberSet.join(set) => Effect<void, E>Ждать все файберы
FiberSet.awaitEmpty(set) => Effect<void>Ждать опустошения
FiberSet.clear(set) => Effect<void>Очистить с interrupt
FiberSet.size(set) => Effect<number>Количество файберов
FiberSet.makeRuntime<R>() => Effect<RunFn, never, Scope | R>Runtime функция
FiberSet.makeRuntimePromise<R>() => Effect<RunPromiseFn, never, Scope | R>Promise runtime

FiberMap API

ФункцияСигнатураОписание
FiberMap.make() => Effect<FiberMap<K, A, E>, never, Scope>Создать FiberMap
FiberMap.set(map, key, fiber) => Effect<void>Добавить файбер
FiberMap.run(map, key, effect, options?) => Effect<RuntimeFiber>Fork и добавить
FiberMap.get(map, key) => Effect<RuntimeFiber, NoSuchElementException>Получить файбер
FiberMap.has(map, key) => Effect<boolean>Проверить наличие
FiberMap.remove(map, key) => Effect<void>Удалить с interrupt
FiberMap.join(map) => Effect<void, E>Ждать все файберы
FiberMap.awaitEmpty(map) => Effect<void>Ждать опустошения
FiberMap.clear(map) => Effect<void>Очистить с interrupt
FiberMap.size(map) => Effect<number>Количество файберов

FiberHandle API

ФункцияСигнатураОписание
FiberHandle.make() => Effect<FiberHandle<A, E>, never, Scope>Создать FiberHandle
FiberHandle.run(handle, effect, options?) => Effect<RuntimeFiber>Запустить (заменить)
FiberHandle.get(handle) => Effect<RuntimeFiber, NoSuchElementException>Получить файбер
FiberHandle.join(handle) => Effect<void, E>Ждать файбер
FiberHandle.awaitEmpty(handle) => Effect<void>Ждать опустошения
FiberHandle.clear(handle) => Effect<void>Очистить с interrupt

Общие опции

interface RunOptions {
  // Если true, join propagates прерывания
  readonly propagateInterruption?: boolean
}

interface FiberMapRunOptions extends RunOptions {
  // Если true, не заменяет существующий файбер
  readonly onlyIfMissing?: boolean
}

Примеры

Пример 1: Параллельная загрузка файлов


interface FileDownload {
  readonly url: string
  readonly content: string
}

const downloadFile = (url: string): Effect.Effect<FileDownload> =>
  Effect.gen(function* () {
    yield* Effect.log(`Downloading: ${url}`)
    yield* Effect.sleep(`${100 + Math.random() * 200} millis`)
    return { url, content: `Content of ${url}` }
  })

const downloadAllFiles = (
  urls: ReadonlyArray<string>
): Effect.Effect<ReadonlyArray<FileDownload>, never, Scope.Scope> =>
  Effect.gen(function* () {
    const downloads = yield* FiberSet.make<FileDownload>()
    const results = yield* Ref.make<Chunk.Chunk<FileDownload>>(Chunk.empty())
    
    // Запускаем загрузки параллельно
    for (const url of urls) {
      yield* FiberSet.run(downloads,
        downloadFile(url).pipe(
          Effect.tap((file) =>
            Ref.update(results, Chunk.append(file))
          )
        )
      )
    }
    
    // Ждём завершения всех
    yield* FiberSet.join(downloads)
    
    return yield* Ref.get(results).pipe(Effect.map(Chunk.toArray))
  })


const program = Effect.scoped(
  Effect.gen(function* () {
    const files = yield* downloadAllFiles([
      "https://example.com/file1.txt",
      "https://example.com/file2.txt",
      "https://example.com/file3.txt"
    ])
    
    console.log("Downloaded files:", files.map((f) => f.url))
  })
)

Пример 2: Rate-Limited API Calls


interface ApiClient {
  readonly call: (endpoint: string) => Effect.Effect<string>
}

const createRateLimitedClient = (
  maxConcurrent: number
): Effect.Effect<ApiClient, never, Scope.Scope> =>
  Effect.gen(function* () {
    const activeCalls = yield* FiberMap.make<string, string>()
    const semaphore = yield* Semaphore.make(maxConcurrent)
    
    const call = (endpoint: string) =>
      Effect.gen(function* () {
        // Проверяем, есть ли уже активный вызов к этому endpoint
        const alreadyRunning = yield* FiberMap.has(activeCalls, endpoint)
        if (alreadyRunning) {
          // Ждём завершения существующего
          yield* FiberMap.join(activeCalls).pipe(Effect.ignore)
        }
        
        // Запускаем новый вызов с rate limiting
        const fiber = yield* FiberMap.run(activeCalls, endpoint,
          semaphore.withPermits(1)(
            Effect.gen(function* () {
              yield* Effect.log(`Calling API: ${endpoint}`)
              yield* Effect.sleep("200 millis")
              return `Response from ${endpoint}`
            })
          )
        )
        
        return yield* Fiber.join(fiber)
      })
    
    return { call }
  })


const program = Effect.scoped(
  Effect.gen(function* () {
    const client = yield* createRateLimitedClient(2)
    
    // Параллельные вызовы (но максимум 2 одновременно)
    const results = yield* Effect.all([
      client.call("/users"),
      client.call("/products"),
      client.call("/orders"),
      client.call("/stats")
    ], { concurrency: "unbounded" })
    
    console.log("Results:", results)
  })
)

Упражнения

Упражнение

Упражнение 1: Простой FiberSet

Легко

Создайте FiberSet и запустите несколько задач параллельно.

import { Effect, FiberSet } from "effect"

const program = Effect.scoped(
  Effect.gen(function* () {
    const set = yield* FiberSet.make<number>()
    
    // Запустите 5 задач, каждая возвращает свой номер
    // Дождитесь завершения всех
    ???
  })
)
Упражнение

Упражнение 2: FiberMap с ключами

Легко

Создайте FiberMap для управления задачами по имени.

import { Effect, FiberMap, Fiber } from "effect"

const program = Effect.scoped(
  Effect.gen(function* () {
    const tasks = yield* FiberMap.make<string, string>()
    
    // Запустите задачи "task-a", "task-b", "task-c"
    // Получите результат задачи "task-b"
    ???
  })
)
Упражнение

Упражнение 3: Отмена предыдущей задачи

Средне

Используйте FiberHandle для реализации поиска с отменой предыдущего.

import { Effect, FiberHandle, Fiber } from "effect"

const program = Effect.scoped(
  Effect.gen(function* () {
    const searchHandle = yield* FiberHandle.make<string>()
    
    // Реализуйте search, который отменяет предыдущий поиск
    const search = (query: string): Effect.Effect<string> =>
      ???
    
    // Быстрые последовательные поиски
    yield* Effect.fork(search("a"))
    yield* Effect.sleep("10 millis")
    yield* Effect.fork(search("ab"))
    yield* Effect.sleep("10 millis")
    const result = yield* search("abc")
    
    // Только "abc" должен завершиться
    console.log("Result:", result)
  })
)
Упражнение

Упражнение 4: Task Queue с Worker Pool

Сложно

Реализуйте очередь задач с пулом воркеров.

import { Effect, FiberSet, Queue, Ref, Scope } from "effect"

interface TaskQueue<T, R> {
  readonly submit: (task: Effect.Effect<T>) => Effect.Effect<void>
  readonly getStats: Effect.Effect<{ pending: number; completed: number }>
  readonly shutdown: Effect.Effect<void>
}

const createTaskQueue = <T>(
  workerCount: number
): Effect.Effect<TaskQueue<T, never>, never, Scope.Scope> =>
  ???

// Использование
const program = Effect.scoped(
  Effect.gen(function* () {
    const queue = yield* createTaskQueue<number>(3)
    
    // Отправляем 10 задач
    for (let i = 0; i < 10; i++) {
      yield* queue.submit(
        Effect.gen(function* () {
          yield* Effect.sleep(`${Math.random() * 100} millis`)
          return i
        })
      )
    }
    
    yield* Effect.sleep("500 millis")
    
    const stats = yield* queue.getStats
    console.log("Stats:", stats)
  })
)

Заключение

FiberMap, FiberSet и FiberHandle предоставляют высокоуровневые абстракции для управления коллекциями файберов:

  • FiberSet — для неупорядоченных коллекций (пулы, параллельные задачи)
  • FiberMap — для коллекций с ключами (именованные воркеры, per-user задачи)
  • FiberHandle — для единичных файберов (текущая операция, singleton)

Ключевые преимущества:

  • Автоматический lifecycle management через Scope
  • Автоматическое удаление завершённых файберов
  • Удобные операции join/await
  • Runtime функции для создания эффектов с контекстом
  • Настраиваемое поведение прерываний

В следующей статье мы рассмотрим подробно, когда использовать Fiber, а когда Worker для настоящего параллелизма.