Effect Курс Forking

Forking

Стратегии создания файберов.

Введение в Forking

Forking — это процесс создания нового файбера из эффекта. При fork’е эффект начинает выполняться параллельно, и вы получаете ссылку на созданный файбер для дальнейшего управления.

Базовая концепция


const myEffect: Effect.Effect<number> = Effect.succeed(42)

// Fork возвращает Effect, производящий RuntimeFiber
//       ┌─── Effect<RuntimeFiber<number, never>, never, never>
//       ▼
const fiberEffect = Effect.fork(myEffect)

Четыре стратегии жизненного цикла

┌─────────────────────────────────────────────────────────────────┐
│                  СТРАТЕГИИ FORKING                              │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  1. Effect.fork (Automatic Supervision)                         │
│     └── Файбер привязан к родителю                              │
│                                                                 │
│  2. Effect.forkDaemon (Global Scope)                            │
│     └── Файбер привязан к глобальному scope                     │
│                                                                 │
│  3. Effect.forkScoped (Local Scope)                             │
│     └── Файбер привязан к текущему Scope                        │
│                                                                 │
│  4. Effect.forkIn (Specific Scope)                              │
│     └── Файбер привязан к указанному Scope                      │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

Effect.fork — Базовый fork

Effect.fork создаёт дочерний файбер, который автоматически супервизируется родительским файбером. Это структурная конкурентность в действии.

Сигнатура

declare const fork: <A, E, R>(
  self: Effect.Effect<A, E, R>
) => Effect.Effect<Fiber.RuntimeFiber<A, E>, never, R>

Ключевые характеристики

  • Дочерний файбер наследует контекст родителя (services, FiberRef и т.д.)
  • При завершении родителя все дочерние файберы прерываются
  • Ошибки в дочерних файберах НЕ propagate к родителю автоматически
  • Идеально для краткоживущих параллельных задач

Базовый пример


const fib = (n: number): Effect.Effect<number> =>
  n < 2
    ? Effect.succeed(n)
    : Effect.zipWith(fib(n - 1), fib(n - 2), (a, b) => a + b)

const program = Effect.gen(function* () {
  // Fork вычисления Фибоначчи
  const fiber = yield* Effect.fork(fib(10))
  
  console.log("Fibonacci calculation started...")
  
  // Делаем другую работу
  yield* Effect.sleep("100 millis")
  
  // Получаем результат
  const result = yield* Fiber.join(fiber)
  console.log(`Result: ${result}`)
})

Effect.runFork(program)
/*
Output:
Fibonacci calculation started...
Result: 55
*/

Автоматическая супервизия


// Дочерний файбер, который логирует каждую секунду
const child = Effect.repeat(
  Console.log("child: still running!"),
  Schedule.fixed("1 second")
)

const parent = Effect.gen(function* () {
  console.log("parent: started!")
  
  // Дочерний файбер супервизируется родителем
  yield* Effect.fork(child)
  
  yield* Effect.sleep("3 seconds")
  console.log("parent: finished!")
  // При завершении parent, child автоматически прерывается
})

Effect.runFork(parent)
/*
Output:
parent: started!
child: still running!
child: still running!
child: still running!
parent: finished!
*/

Визуализация жизненного цикла

                    Parent Fiber
    ╔═══════════════════════════════════════════╗
    ║  start        running          finished   ║
    ╚═══════════════════════════════════════════╝
         │                              │
         │ fork                         │ interrupt (automatic)
         ▼                              ▼
    ╔═══════════════════════════════════════════╗
    ║        Child Fiber (supervised)           ║
    ║  running...running...running...|STOP|     ║
    ╚═══════════════════════════════════════════╝

Многоуровневая иерархия

Структурная конкурентность работает на любом уровне вложенности:


const grandchild = Effect.repeat(
  Console.log("    grandchild: tick"),
  Schedule.fixed("300 millis")
)

const child = Effect.gen(function* () {
  yield* Effect.fork(grandchild)
  yield* Effect.repeat(
    Console.log("  child: tick"),
    Schedule.fixed("500 millis")
  )
})

const parent = Effect.gen(function* () {
  console.log("parent: start")
  yield* Effect.fork(child)
  yield* Effect.sleep("2 seconds")
  console.log("parent: finish")
})

Effect.runFork(parent)
/*
Output:
parent: start
    grandchild: tick
  child: tick
    grandchild: tick
    grandchild: tick
  child: tick
    grandchild: tick
    grandchild: tick
  child: tick
    grandchild: tick
parent: finish
← grandchild и child прерываются вместе с parent
*/

Effect.forkDaemon — Daemon файберы

Effect.forkDaemon создаёт daemon файбер, который не привязан к родителю. Его время жизни связано с глобальным scope приложения.

Сигнатура

declare const forkDaemon: <A, E, R>(
  self: Effect.Effect<A, E, R>
) => Effect.Effect<Fiber.RuntimeFiber<A, E>, never, R>

Ключевые характеристики

  • Не имеет родительского файбера (не супервизируется)
  • Продолжает работать после завершения “создателя”
  • Завершается только при shutdown приложения или естественном завершении
  • Идеально для фоновых задач: мониторинг, метрики, healthcheck

Базовый пример daemon файбера


// Daemon файбер для фоновой задачи
const daemon = Effect.repeat(
  Console.log("daemon: still running!"),
  Schedule.fixed("1 second")
)

const parent = Effect.gen(function* () {
  console.log("parent: started!")
  
  // Daemon файбер работает независимо
  yield* Effect.forkDaemon(daemon)
  
  yield* Effect.sleep("3 seconds")
  console.log("parent: finished!")
  // Daemon продолжает работать!
})

Effect.runFork(parent)
/*
Output:
parent: started!
daemon: still running!
daemon: still running!
daemon: still running!
parent: finished!
daemon: still running!
daemon: still running!
...продолжает бесконечно...
*/

Визуализация daemon

                    Parent Fiber
    ╔═══════════════════════════════════╗
    ║  start      running     finished  ║
    ╚═══════════════════════════════════╝

         │ forkDaemon

    ╔═════════════════════════════════════════════════════════════╗
    ║              Daemon Fiber (global scope)                    ║
    ║  running...running...running...running...running...         ║
    ╠═════════════════════════════════════════════════════════════╣
    ║  Живёт до shutdown приложения или естественного завершения  ║
    ╚═════════════════════════════════════════════════════════════╝

Daemon не прерывается при прерывании родителя


const daemon = Effect.repeat(
  Console.log("daemon: still running!"),
  Schedule.fixed("1 second")
)

const parent = Effect.gen(function* () {
  console.log("parent: started!")
  yield* Effect.forkDaemon(daemon)
  yield* Effect.sleep("3 seconds")
  console.log("parent: finished!")
}).pipe(
  Effect.onInterrupt(() => Console.log("parent: interrupted!"))
)

// Программа, которая прерывает parent
const program = Effect.gen(function* () {
  const fiber = yield* Effect.fork(parent)
  yield* Effect.sleep("2 seconds")
  yield* Fiber.interrupt(fiber) // Прерываем parent
})

Effect.runFork(program)
/*
Output:
parent: started!
daemon: still running!
daemon: still running!
parent: interrupted!
daemon: still running!  ← Daemon продолжает работать!
daemon: still running!
...
*/

Практический пример: Health Check Service


// Глобальное состояние здоровья системы
const createHealthMonitor = Effect.gen(function* () {
  const healthStatus = yield* Ref.make({ 
    healthy: true, 
    lastCheck: Date.now() 
  })
  
  // Daemon для периодических health checks
  const healthChecker = Effect.repeat(
    Effect.gen(function* () {
      // Симуляция проверки
      const isHealthy = Math.random() > 0.1
      yield* Ref.set(healthStatus, {
        healthy: isHealthy,
        lastCheck: Date.now()
      })
      yield* Effect.log(`Health check: ${isHealthy ? "OK" : "DEGRADED"}`)
    }),
    Schedule.fixed("5 seconds")
  )
  
  yield* Effect.forkDaemon(healthChecker)
  
  return {
    getStatus: Ref.get(healthStatus)
  }
})

Предупреждения

┌─────────────────────────────────────────────────────────────────┐
│               ⚠️ ОСТОРОЖНО С DAEMON ФАЙБЕРАМИ                   │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  1. Утечки ресурсов                                             │
│     └── Daemon файберы могут держать ресурсы бесконечно         │
│                                                                 │
│  2. Нет автоматического cleanup                                 │
│     └── Вы ответственны за их завершение                        │
│                                                                 │
│  3. Сложность отладки                                           │
│     └── "Призрачные" файберы сложно отследить                   │
│                                                                 │
│  Рекомендация: Используйте только для истинно глобальных        │
│  фоновых задач (logging, metrics, health checks)                │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

Effect.forkScoped — Привязка к Scope

Effect.forkScoped создаёт файбер, привязанный к текущему Scope. Файбер может пережить своего “создателя”, но будет прерван при закрытии Scope.

Сигнатура

declare const forkScoped: <A, E, R>(
  self: Effect.Effect<A, E, R>
) => Effect.Effect<Fiber.RuntimeFiber<A, E>, never, R | Scope>

Ключевые характеристики

  • Требует Scope в зависимостях (R включает Scope)
  • Не привязан к родительскому файберу
  • Прерывается при закрытии Scope
  • Идеально для задач, привязанных к ресурсам

Базовый пример


// Дочерний файбер
const child = Effect.repeat(
  Console.log("child: still running!"),
  Schedule.fixed("1 second")
)

//      ┌─── Effect<void, never, Scope>
//      ▼
const parent = Effect.gen(function* () {
  console.log("parent: started!")
  
  // Файбер привязан к Scope, НЕ к parent
  yield* Effect.forkScoped(child)
  
  yield* Effect.sleep("3 seconds")
  console.log("parent: finished!")
})

// Программа с локальным scope
const program = Effect.scoped(
  Effect.gen(function* () {
    console.log("Local scope started!")
    yield* Effect.fork(parent) // fork parent в своём файбере
    yield* Effect.sleep("5 seconds")
    console.log("Leaving the local scope!")
  })
)

Effect.runFork(program)
/*
Output:
Local scope started!
parent: started!
child: still running!
child: still running!
child: still running!
parent: finished!     ← parent завершён, но child продолжает!
child: still running!
child: still running!
Leaving the local scope!  ← Scope закрывается, child прерывается
*/

Визуализация

Scope Boundary
═══════════════════════════════════════════════════════════════════
║                                                                 ║
║     Parent Fiber                                                ║
║     ╔═══════════════════════╗                                   ║
║     ║ start    finish       ║                                   ║
║     ╚═══════════════════════╝                                   ║
║          │                                                      ║
║          │ forkScoped                                           ║
║          ▼                                                      ║
║     ╔═══════════════════════════════════════════════════════╗   ║
║     ║           Child Fiber (scoped)                        ║   ║
║     ║  running...running...running...running...|STOP|       ║   ║
║     ╚═══════════════════════════════════════════════════════╝   ║
║                                                        │        ║
═══════════════════════════════════════════════════════════════════

                                              Scope closes

Практический пример: Ресурс с фоновой задачей


// Ресурс подключения к БД с фоновым heartbeat
const createDbConnection = Effect.gen(function* () {
  console.log("Opening DB connection...")
  
  // Фоновый heartbeat привязан к scope соединения
  yield* Effect.forkScoped(
    Effect.repeat(
      Effect.gen(function* () {
        yield* Effect.log("DB heartbeat: ping")
        // Симуляция ping
      }),
      Schedule.fixed("2 seconds")
    )
  )
  
  yield* Effect.addFinalizer(() =>
    Effect.sync(() => console.log("Closing DB connection..."))
  )
  
  return {
    query: (sql: string) => Effect.succeed(`Result for: ${sql}`)
  }
})

const program = Effect.scoped(
  Effect.gen(function* () {
    const db = yield* createDbConnection
    
    const result = yield* db.query("SELECT * FROM users")
    console.log(result)
    
    yield* Effect.sleep("5 seconds")
    console.log("Done with database work")
  })
)

Effect.runFork(program)
/*
Output:
Opening DB connection...
Result for: SELECT * FROM users
DB heartbeat: ping
DB heartbeat: ping
Done with database work
Closing DB connection...
← heartbeat прекращается с закрытием scope
*/

Effect.forkIn — Fork в конкретный Scope

Effect.forkIn даёт максимальный контроль — вы явно указываете Scope, к которому будет привязан файбер.

Сигнатура

declare const forkIn: (
  scope: Scope.Scope
) => <A, E, R>(
  self: Effect.Effect<A, E, R>
) => Effect.Effect<Fiber.RuntimeFiber<A, E>, never, R>

Ключевые характеристики

  • Явное указание Scope
  • Файбер может пережить и родителя, и текущий scope
  • Максимальная гибкость в управлении временем жизни
  • Используется для сложных сценариев

Пример с вложенными scope


const child = Effect.repeat(
  Console.log("child: still running!"),
  Schedule.fixed("1 second")
)

const program = Effect.scoped(
  Effect.gen(function* () {
    yield* Effect.addFinalizer(() =>
      Console.log("The outer scope is about to be closed!")
    )

    // Захватываем внешний scope
    const outerScope = yield* Effect.scope

    // Создаём внутренний scope
    yield* Effect.scoped(
      Effect.gen(function* () {
        yield* Effect.addFinalizer(() =>
          Console.log("The inner scope is about to be closed!")
        )
        
        // Fork child во ВНЕШНИЙ scope
        yield* Effect.forkIn(child, outerScope)
        
        yield* Effect.sleep("3 seconds")
      })
    )
    // Inner scope закрыт, но child продолжает!

    yield* Effect.sleep("5 seconds")
  })
)

Effect.runFork(program)
/*
Output:
child: still running!
child: still running!
child: still running!
The inner scope is about to be closed!  ← Inner scope закрыт
child: still running!                   ← Child продолжает!
child: still running!
child: still running!
child: still running!
child: still running!
child: still running!
The outer scope is about to be closed!  ← Outer scope закрыт
← Child прерван
*/

Визуализация forkIn

Outer Scope
╔═════════════════════════════════════════════════════════════════════╗
║                                                                     ║
║   Inner Scope                                                       ║
║   ╔═════════════════════════╗                                       ║
║   ║                         ║                                       ║
║   ║  forkIn(child, outer)   ║                                       ║
║   ║         │               ║                                       ║
║   ╚═════════│═══════════════╝                                       ║
║             │                   ◄── Inner scope закрыт              ║
║             │                                                       ║
║   ╔═════════▼═════════════════════════════════════════════════════╗ ║
║   ║              Child Fiber (owned by outer scope)               ║ ║
║   ║  running...running...running...running...running...|STOP|     ║ ║
║   ╚═══════════════════════════════════════════════════════════════╝ ║
║                                                                     ║
╚═════════════════════════════════════════════════════════════════════╝

                                              Outer scope closes ▼

Практический пример: Pool с переживающими воркерами


interface Task {
  readonly id: number
  readonly work: Effect.Effect<void>
}

const createWorkerPool = (size: number) =>
  Effect.gen(function* () {
    const poolScope = yield* Effect.scope
    const taskQueue = yield* Queue.unbounded<Task>()
    
    // Воркеры привязаны к pool scope, а не к caller scope
    const workers = yield* Effect.forEach(
      Array.from({ length: size }, (_, i) => i),
      (workerId) =>
        Effect.forkIn(
          Effect.forever(
            Effect.gen(function* () {
              const task = yield* Queue.take(taskQueue)
              yield* Effect.log(`Worker ${workerId}: executing task ${task.id}`)
              yield* task.work
            })
          ),
          poolScope
        )
    )
    
    return {
      submit: (task: Task) => Queue.offer(taskQueue, task),
      shutdown: Effect.forEach(workers, Fiber.interrupt, { discard: true })
    }
  })

Effect.forkAll — Массовый fork

Effect.forkAll форкает коллекцию эффектов одновременно.

Сигнатура

declare const forkAll: <A, E, R>(
  effects: Iterable<Effect.Effect<A, E, R>>,
  options?: { readonly discard?: boolean }
) => Effect.Effect<Fiber.Fiber<A[], E>, never, R>

Пример использования


const tasks = [
  Effect.sleep("100 millis").pipe(Effect.as(1)),
  Effect.sleep("200 millis").pipe(Effect.as(2)),
  Effect.sleep("150 millis").pipe(Effect.as(3))
]

const program = Effect.gen(function* () {
  // Fork все задачи одновременно
  const fiber = yield* Effect.forkAll(tasks)
  
  console.log("All tasks forked!")
  
  // Join composite fiber
  const results = yield* Fiber.join(fiber)
  console.log("Results:", results)
})

Effect.runFork(program)
/*
Output:
All tasks forked!
Results: [1, 2, 3]
*/

Опция discard


const effects = [
  Effect.log("Task 1"),
  Effect.log("Task 2"),
  Effect.log("Task 3")
]

// Результаты не нужны, только side effects
const program = Effect.gen(function* () {
  const fiber = yield* Effect.forkAll(effects, { discard: true })
  yield* Fiber.join(fiber)
})

Когда файберы начинают выполнение

Важно понимать, что форкнутые файберы начинают выполнение после того, как текущий файбер завершит текущую операцию или выполнит yield.

Проблема позднего старта


const program = Effect.gen(function* () {
  const ref = yield* SubscriptionRef.make(0)
  
  yield* ref.changes.pipe(
    Stream.tap((n) => Console.log(`Changed to ${n}`)),
    Stream.runDrain,
    Effect.fork  // Fork здесь
  )
  
  // Эти обновления происходят ДО старта файбера!
  yield* SubscriptionRef.set(ref, 1)
  yield* SubscriptionRef.set(ref, 2)
})

Effect.runFork(program)
/*
Output:
Changed to 2
← Видим только последнее значение!
*/

Решение: Effect.yieldNow или Effect.sleep


const program = Effect.gen(function* () {
  const ref = yield* SubscriptionRef.make(0)
  
  yield* ref.changes.pipe(
    Stream.tap((n) => Console.log(`Changed to ${n}`)),
    Stream.runDrain,
    Effect.fork
  )
  
  // Даём файберу время запуститься
  yield* Effect.sleep("100 millis")
  // или: yield* Effect.yieldNow()
  
  yield* SubscriptionRef.set(ref, 1)
  yield* SubscriptionRef.set(ref, 2)
})

Effect.runFork(program)
/*
Output:
Changed to 0
Changed to 1
Changed to 2
*/

Недетерминированность

┌─────────────────────────────────────────────────────────────────┐
│           ⚠️ ВЫПОЛНЕНИЕ ФАЙБЕРОВ НЕДЕТЕРМИНИРОВАНО              │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  Время старта файбера зависит от многих факторов:               │
│                                                                 │
│  • Планировщик runtime                                          │
│  • Текущая нагрузка                                             │
│  • Количество других файберов                                   │
│  • Внешние факторы                                              │
│                                                                 │
│  НЕ полагайтесь на то, что один yield гарантирует               │
│  старт файбера в определённый момент.                           │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

Сравнение стратегий

Таблица сравнения

ОперацияПривязкаСупервизияScope в RUse Case
forkParent fiberДаНетКраткоживущие параллельные задачи
forkDaemonGlobalНетНетФоновые системные задачи
forkScopedCurrent ScopeНетДаЗадачи с временем жизни ресурса
forkInSpecified ScopeНетНетТочный контроль времени жизни

Дерево решений

Нужен ли файбер, привязанный к родителю?

    ├── Да ──────────► Effect.fork

    └── Нет

          Нужен ли файбер в глобальном scope?

          ├── Да ──────────► Effect.forkDaemon

          └── Нет

                Нужна привязка к конкретному scope?

                ├── К текущему ────► Effect.forkScoped

                └── К другому ─────► Effect.forkIn(scope)

Пример выбора стратегии


// Сценарий: HTTP сервер с различными типами задач

// 1. fork — обработка запроса (умирает с handler'ом)
const handleRequest = Effect.gen(function* () {
  yield* Effect.fork(
    Effect.gen(function* () {
      yield* Effect.log("Processing request...")
      yield* Effect.sleep("100 millis")
      yield* Effect.log("Request processed")
    })
  )
})

// 2. forkDaemon — глобальный health monitor
const startHealthMonitor = Effect.forkDaemon(
  Effect.repeat(
    Effect.log("Health: OK"),
    Schedule.fixed("30 seconds")
  )
)

// 3. forkScoped — connection pool worker
const createPoolWorker = Effect.forkScoped(
  Effect.forever(
    Effect.gen(function* () {
      yield* Effect.log("Pool worker: waiting for connection...")
      yield* Effect.sleep("1 second")
    })
  )
)

// 4. forkIn — кастомный scope для группы воркеров
const createWorkerGroup = (groupScope: Scope.Scope) =>
  Effect.forkIn(
    Effect.repeat(
      Effect.log("Worker group task"),
      Schedule.fixed("500 millis")
    ),
    groupScope
  )

API Reference

Основные функции forking

ФункцияСигнатураОписание
Effect.forkEffect<A, E, R> → Effect<RuntimeFiber<A, E>, never, R>Fork с автосупервизией
Effect.forkDaemonEffect<A, E, R> → Effect<RuntimeFiber<A, E>, never, R>Fork в глобальный scope
Effect.forkScopedEffect<A, E, R> → Effect<RuntimeFiber<A, E>, never, R | Scope>Fork в текущий scope
Effect.forkIn(Scope) → Effect<A, E, R> → Effect<RuntimeFiber<A, E>, never, R>Fork в указанный scope
Effect.forkAllIterable<Effect<A, E, R>> → Effect<Fiber<A[], E>, never, R>Fork коллекции

Вспомогательные функции

ФункцияОписание
Effect.forkWithErrorHandlerFork с обработчиком ошибок
Effect.ensuringГарантировать выполнение finalizer

Примеры

Пример 1: Параллельная загрузка с прогрессом


interface DownloadTask {
  readonly url: string
  readonly size: number
}

const createDownloader = Effect.gen(function* () {
  const progress = yield* Ref.make(new Map<string, number>())
  
  const download = (task: DownloadTask) =>
    Effect.gen(function* () {
      let downloaded = 0
      while (downloaded < task.size) {
        yield* Effect.sleep("50 millis")
        downloaded += Math.min(10, task.size - downloaded)
        yield* Ref.update(progress, (map) =>
          new Map(map).set(task.url, (downloaded / task.size) * 100)
        )
      }
      return task.url
    })
  
  const getProgress = Ref.get(progress)
  
  return { download, getProgress }
})

const program = Effect.gen(function* () {
  const { download, getProgress } = yield* createDownloader
  
  const tasks: ReadonlyArray<DownloadTask> = [
    { url: "file1.txt", size: 100 },
    { url: "file2.txt", size: 50 },
    { url: "file3.txt", size: 150 }
  ]
  
  // Fork все загрузки
  const fibers = yield* Effect.forEach(tasks, (task) =>
    Effect.fork(download(task))
  )
  
  // Мониторинг прогресса
  yield* Effect.fork(
    Effect.repeat(
      Effect.gen(function* () {
        const p = yield* getProgress
        console.log("Progress:", Object.fromEntries(p))
      }),
      Schedule.fixed("100 millis").pipe(Schedule.intersect(Schedule.recurs(20)))
    )
  )
  
  // Ждём завершения всех загрузок
  const results = yield* Effect.forEach(fibers, Fiber.join)
  console.log("Completed:", results)
})

Effect.runFork(program)

Пример 2: Background job processor с daemon


interface Job {
  readonly id: string
  readonly payload: unknown
}

const createJobProcessor = Effect.gen(function* () {
  const jobQueue = yield* Queue.unbounded<Job>()
  const processedCount = yield* Ref.make(0)
  
  // Daemon processor - живёт пока живёт приложение
  yield* Effect.forkDaemon(
    Effect.forever(
      Effect.gen(function* () {
        const job = yield* Queue.take(jobQueue)
        yield* Effect.log(`Processing job: ${job.id}`)
        yield* Effect.sleep("100 millis") // Симуляция работы
        yield* Ref.update(processedCount, (n) => n + 1)
        yield* Effect.log(`Completed job: ${job.id}`)
      })
    )
  )
  
  return {
    submit: (job: Job) => Queue.offer(jobQueue, job),
    getProcessedCount: Ref.get(processedCount)
  }
})

const program = Effect.gen(function* () {
  const processor = yield* createJobProcessor
  
  // Отправляем задачи
  for (let i = 0; i < 5; i++) {
    yield* processor.submit({ id: `job-${i}`, payload: { data: i } })
  }
  
  yield* Effect.sleep("1 second")
  
  const count = yield* processor.getProcessedCount
  console.log(`Processed ${count} jobs`)
})

Effect.runFork(program)

Пример 3: Resource pool с forkScoped


interface Connection {
  readonly id: number
  readonly execute: (sql: string) => Effect.Effect<string>
}

const createConnectionPool = (size: number) =>
  Effect.gen(function* () {
    const connectionId = yield* Ref.make(0)
    const available = yield* Queue.bounded<Connection>(size)
    
    // Создаём соединения
    for (let i = 0; i < size; i++) {
      const id = yield* Ref.updateAndGet(connectionId, (n) => n + 1)
      const conn: Connection = {
        id,
        execute: (sql) => Effect.succeed(`[Conn ${id}] Result: ${sql}`)
      }
      yield* Queue.offer(available, conn)
    }
    
    // Heartbeat worker привязан к scope пула
    yield* Effect.forkScoped(
      Effect.repeat(
        Effect.log("Pool heartbeat: connections alive"),
        Schedule.fixed("2 seconds")
      )
    )
    
    const withConnection = <A, E, R>(
      use: (conn: Connection) => Effect.Effect<A, E, R>
    ) =>
      Effect.gen(function* () {
        const conn = yield* Queue.take(available)
        const result = yield* use(conn).pipe(
          Effect.ensuring(Queue.offer(available, conn))
        )
        return result
      })
    
    return { withConnection }
  })

const program = Effect.scoped(
  Effect.gen(function* () {
    const pool = yield* createConnectionPool(3)
    
    const result = yield* pool.withConnection((conn) =>
      conn.execute("SELECT * FROM users")
    )
    console.log(result)
    
    yield* Effect.sleep("5 seconds")
    console.log("Closing pool...")
  })
)

Effect.runFork(program)

Упражнения

Упражнение

Упражнение 1: Простой fork и join

Легко

Создайте программу, которая форкает три эффекта с разными задержками и собирает результаты.

import { Effect, Fiber } from "effect"

const task = (id: number, delay: number) =>
  Effect.gen(function* () {
    yield* Effect.sleep(`${delay} millis`)
    return `Task ${id} completed`
  })

const program = Effect.gen(function* () {
  // Ваша реализация:
  // 1. Fork три задачи с разными задержками
  // 2. Соберите все результаты
  // 3. Выведите результаты
  ???
})
Упражнение

Упражнение 2: Daemon logger

Легко

Создайте daemon logger, который периодически выводит timestamp.

import { Effect, Schedule } from "effect"

const createDaemonLogger = () =>
  // Ваша реализация
  ???

const program = Effect.gen(function* () {
  yield* createDaemonLogger()
  
  // Основная работа
  yield* Effect.log("Main: doing work...")
  yield* Effect.sleep("3 seconds")
  yield* Effect.log("Main: finished!")
  // Logger должен продолжить работать
})
Упражнение

Упражнение 3: Connection с scoped heartbeat

Средне

Создайте ресурс соединения с heartbeat, привязанным к scope.

import { Effect, Schedule, Scope } from "effect"

interface Connection {
  readonly send: (msg: string) => Effect.Effect<void>
  readonly isAlive: Effect.Effect<boolean>
}

const createConnection = (): Effect.Effect<Connection, never, Scope> =>
  // Ваша реализация:
  // 1. Создайте heartbeat, который проверяет соединение каждые 500ms
  // 2. Heartbeat должен быть forkScoped
  // 3. При закрытии scope heartbeat должен прекратиться
  ???
Упражнение

Упражнение 4: Nested scopes с forkIn

Средне

Реализуйте систему с тремя уровнями scopes, где файбер форкается в middle scope.

import { Effect, Scope } from "effect"

const program = Effect.scoped(
  Effect.gen(function* () {
    yield* Effect.log("Outer scope started")
    
    // Ваша реализация:
    // 1. Создайте middle scope внутри outer
    // 2. Внутри middle создайте inner scope
    // 3. В inner scope форкните файбер в middle scope
    // 4. Покажите, что файбер переживает inner scope
    ???
  })
)
Упражнение

Упражнение 5: Worker Pool с динамическим масштабированием

Сложно

Создайте pool воркеров, который автоматически масштабируется в зависимости от нагрузки.

import { Effect, Queue, Fiber, Ref, Scope } from "effect"

interface ScalablePool {
  readonly submit: <A>(task: Effect.Effect<A>) => Effect.Effect<A>
  readonly getWorkerCount: Effect.Effect<number>
  readonly shutdown: Effect.Effect<void>
}

const createScalablePool = (config: {
  readonly minWorkers: number
  readonly maxWorkers: number
  readonly scaleUpThreshold: number  // Добавить воркера если очередь > threshold
  readonly scaleDownThreshold: number // Убрать воркера если очередь < threshold
}): Effect.Effect<ScalablePool, never, Scope> =>
  // Ваша реализация
  ???

Заключение

Forking в Effect предоставляет четыре стратегии управления жизненным циклом файберов:

  1. Effect.fork — структурная конкурентность с автосупервизией
  2. Effect.forkDaemon — глобальные фоновые задачи
  3. Effect.forkScoped — привязка к времени жизни ресурса
  4. Effect.forkIn — точный контроль над scope

Выбор правильной стратегии критичен для:

  • Предотвращения утечек ресурсов
  • Корректной обработки ошибок
  • Предсказуемого поведения приложения

В следующей статье мы подробно рассмотрим операции для работы с уже созданными файберами: join, await, poll и interrupt.