Effect Курс Joining

Joining

Операции ожидания и композиции файберов.

Введение в операции с файберами

После создания файбера через fork вам нужны способы взаимодействия с ним. Effect предоставляет богатый набор операций для:

  • Ожидания завершения и получения результата
  • Проверки состояния без блокировки
  • Прерывания выполнения
  • Композиции нескольких файберов

Обзор операций

┌─────────────────────────────────────────────────────────────────┐
│                    ОПЕРАЦИИ С FIBER                             │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  Fiber.join                                                     │
│  ├── Блокирующее ожидание                                       │
│  ├── Возвращает A (re-raises E)                                 │
│  └── Самый простой способ получить результат                    │
│                                                                 │
│  Fiber.await                                                    │
│  ├── Блокирующее ожидание                                       │
│  ├── Возвращает Exit<A, E>                                      │
│  └── Полная информация о завершении                             │
│                                                                 │
│  Fiber.poll                                                     │
│  ├── Неблокирующая проверка                                     │
│  ├── Возвращает Option<Exit<A, E>>                              │
│  └── Для polling и мониторинга                                  │
│                                                                 │
│  Fiber.interrupt                                                │
│  ├── Прерывание файбера                                         │
│  ├── Возвращает Exit<A, E>                                      │
│  └── Безопасная отмена с cleanup                                │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

Fiber.join — Получение результата

Fiber.join — самая распространённая операция для получения результата файбера. Она блокирует текущий файбер до завершения целевого и возвращает результат или пробрасывает ошибку.

Сигнатура

declare const join: <A, E>(
  fiber: Fiber<A, E>
) => Effect.Effect<A, E>

Ключевые характеристики

  • Блокирующая — текущий файбер приостанавливается до завершения
  • Re-raises ошибки — ошибка из файбера становится ошибкой эффекта
  • Unwraps результат — возвращает A, а не Exit<A, E>
  • Не передаёт прерывания — если файбер прерван, join завершится с interrupted Cause

Базовый пример


const fib = (n: number): Effect.Effect<number> =>
  n < 2
    ? Effect.succeed(n)
    : Effect.zipWith(fib(n - 1), fib(n - 2), (a, b) => a + b)

const program = Effect.gen(function* () {
  // Fork вычисления
  const fiber = yield* Effect.fork(fib(10))
  
  console.log("Calculation running in background...")
  yield* Effect.sleep("100 millis")
  
  // Join — дожидаемся результата
  const result = yield* Fiber.join(fiber)
  console.log(`Fibonacci(10) = ${result}`)
})

Effect.runFork(program)
/*
Output:
Calculation running in background...
Fibonacci(10) = 55
*/

Обработка ошибок через join


const failingTask = Effect.gen(function* () {
  yield* Effect.sleep("100 millis")
  return yield* Effect.fail("Task failed!" as const)
})

const program = Effect.gen(function* () {
  const fiber = yield* Effect.fork(failingTask)
  
  // Join пробрасывает ошибку
  const result = yield* Fiber.join(fiber)
  // Этот код не выполнится при ошибке
  console.log("Result:", result)
}).pipe(
  Effect.catchAll((error) =>
    Effect.succeed(`Caught error: ${error}`)
  )
)

Effect.runPromise(program).then(console.log)
// Output: Caught error: Task failed!

Диаграмма join

Main Fiber                          Forked Fiber
     │                                   │
     │── fork ──────────────────────────►│
     │                                   │
     │   (doing other work...)           │ (computing...)
     │                                   │
     │◄───────────── join ───────────────│
     │   (blocked)                       │
     │                                   │
     │◄────────────── result ────────────│
     │                                   ▼
     │                              (completed)

(continues with result)

Fiber.await — Получение Exit

Fiber.await также ожидает завершения файбера, но возвращает Exit<A, E>, давая полную информацию о том, как файбер завершился.

Сигнатура

declare const await: <A, E>(
  fiber: Fiber<A, E>
) => Effect.Effect<Exit<A, E>, never, never>

Ключевые характеристики

  • Блокирующая — как join
  • Never fails — возвращает Effect<Exit<A, E>, never, never>
  • Полная информация — Success, Failure, или Interrupted
  • Inspection-friendly — можно проанализировать причину завершения

Базовый пример


const program = Effect.gen(function* () {
  const fiber = yield* Effect.fork(
    Effect.gen(function* () {
      yield* Effect.sleep("100 millis")
      return 42
    })
  )
  
  // Await возвращает Exit
  const exit = yield* Fiber.await(fiber)
  console.log(exit)
})

Effect.runFork(program)
/*
Output:
{ _id: 'Exit', _tag: 'Success', value: 42 }
*/

Обработка различных Exit состояний


const analyzeExit = <A, E>(exit: Exit.Exit<A, E>): string => {
  if (Exit.isSuccess(exit)) {
    return `Success: ${exit.value}`
  }
  
  const cause = exit.cause
  
  if (Cause.isFailType(cause)) {
    return `Failed with: ${cause.error}`
  }
  
  if (Cause.isInterruptedOnly(cause)) {
    return `Interrupted`
  }
  
  if (Cause.isDieType(cause)) {
    return `Died with defect: ${cause.defect}`
  }
  
  return `Complex failure: ${Cause.pretty(cause)}`
}

const program = Effect.gen(function* () {
  // Успешный файбер
  const fiber1 = yield* Effect.fork(Effect.succeed(42))
  const exit1 = yield* Fiber.await(fiber1)
  console.log("Fiber 1:", analyzeExit(exit1))
  
  // Файбер с ошибкой
  const fiber2 = yield* Effect.fork(Effect.fail("Oops"))
  const exit2 = yield* Fiber.await(fiber2)
  console.log("Fiber 2:", analyzeExit(exit2))
  
  // Файбер с прерыванием
  const fiber3 = yield* Effect.fork(
    Effect.sleep("1 hour")
  )
  yield* Fiber.interrupt(fiber3)
  const exit3 = yield* Fiber.await(fiber3)
  console.log("Fiber 3:", analyzeExit(exit3))
})

Effect.runFork(program)
/*
Output:
Fiber 1: Success: 42
Fiber 2: Failed with: Oops
Fiber 3: Interrupted
*/

Паттерн: Изоляция ошибок


// Изолируем ошибку одного файбера от остальных
const isolatedExecution = <A, E>(
  effect: Effect.Effect<A, E>
) =>
  Effect.gen(function* () {
    const fiber = yield* Effect.fork(effect)
    const exit = yield* Fiber.await(fiber)
    return exit
  })

const program = Effect.gen(function* () {
  const results = yield* Effect.all([
    isolatedExecution(Effect.succeed(1)),
    isolatedExecution(Effect.fail("error")),
    isolatedExecution(Effect.succeed(3))
  ])
  
  // Все три результата получены, несмотря на ошибку во втором
  for (const [index, exit] of results.entries()) {
    console.log(`Task ${index}: ${Exit.isSuccess(exit) ? "Success" : "Failure"}`)
  }
})

Effect.runFork(program)
/*
Output:
Task 0: Success
Task 1: Failure
Task 2: Success
*/

Fiber.poll — Неблокирующая проверка

Fiber.poll позволяет проверить состояние файбера без блокировки. Возвращает Option<Exit<A, E>>None если файбер ещё выполняется, Some(exit) если завершился.

Сигнатура

declare const poll: <A, E>(
  fiber: Fiber<A, E>
) => Effect.Effect<Option<Exit<A, E>>, never, never>

Ключевые характеристики

  • Неблокирующая — возвращается немедленно
  • Idempotent — можно вызывать многократно
  • Never fails — всегда возвращает успешный Effect
  • Monitoring-friendly — идеально для polling loops

Базовый пример


const program = Effect.gen(function* () {
  const fiber = yield* Effect.fork(
    Effect.gen(function* () {
      yield* Effect.sleep("500 millis")
      return "done"
    })
  )
  
  // Немедленная проверка — файбер ещё работает
  const poll1 = yield* Fiber.poll(fiber)
  console.log("Poll 1:", Option.isNone(poll1) ? "Still running" : "Completed")
  
  yield* Effect.sleep("300 millis")
  
  // Ещё одна проверка
  const poll2 = yield* Fiber.poll(fiber)
  console.log("Poll 2:", Option.isNone(poll2) ? "Still running" : "Completed")
  
  yield* Effect.sleep("300 millis")
  
  // Теперь должен быть завершён
  const poll3 = yield* Fiber.poll(fiber)
  if (Option.isSome(poll3)) {
    const result = Exit.isSuccess(poll3.value) 
      ? poll3.value.value 
      : "error"
    console.log("Poll 3: Completed with", result)
  }
})

Effect.runFork(program)
/*
Output:
Poll 1: Still running
Poll 2: Still running
Poll 3: Completed with done
*/

Polling loop с таймаутом


const pollWithTimeout = <A, E>(
  fiber: Fiber.Fiber<A, E>,
  timeout: Duration.DurationInput,
  pollInterval: Duration.DurationInput
): Effect.Effect<A, E | "Timeout"> =>
  Effect.gen(function* () {
    const deadline = Date.now() + Duration.toMillis(timeout)
    
    while (Date.now() < deadline) {
      const maybeExit = yield* Fiber.poll(fiber)
      
      if (Option.isSome(maybeExit)) {
        const exit = maybeExit.value
        if (Exit.isSuccess(exit)) {
          return exit.value
        } else {
          return yield* Effect.failCause(exit.cause)
        }
      }
      
      yield* Effect.sleep(pollInterval)
    }
    
    // Таймаут — прерываем файбер
    yield* Fiber.interrupt(fiber)
    return yield* Effect.fail("Timeout" as const)
  })

const program = Effect.gen(function* () {
  // Быстрая задача — успеет завершиться
  const fiber1 = yield* Effect.fork(
    Effect.sleep("200 millis").pipe(Effect.as("fast result"))
  )
  const result1 = yield* pollWithTimeout(fiber1, "1 second", "50 millis")
  console.log("Result 1:", result1)
  
  // Медленная задача — таймаут
  const fiber2 = yield* Effect.fork(
    Effect.sleep("5 seconds").pipe(Effect.as("slow result"))
  )
  const result2 = yield* pollWithTimeout(fiber2, "500 millis", "100 millis").pipe(
    Effect.catchAll((e) => Effect.succeed(`Caught: ${e}`))
  )
  console.log("Result 2:", result2)
})

Effect.runFork(program)
/*
Output:
Result 1: fast result
Result 2: Caught: Timeout
*/

Мониторинг нескольких файберов


const monitorFibers = <A, E>(
  fibers: ReadonlyArray<Fiber.Fiber<A, E>>
) =>
  Effect.gen(function* () {
    while (true) {
      const polls = yield* Effect.forEach(fibers, Fiber.poll)
      
      const completed = polls.filter(Option.isSome).length
      const running = polls.filter(Option.isNone).length
      
      console.log(`Status: ${completed} completed, ${running} running`)
      
      if (running === 0) {
        const results = polls.map((p) => {
          if (Option.isSome(p) && Exit.isSuccess(p.value)) {
            return p.value.value
          }
          return null
        })
        return results
      }
      
      yield* Effect.sleep("100 millis")
    }
  })

const program = Effect.gen(function* () {
  const fibers = yield* Effect.forEach(
    [100, 300, 200, 500, 150],
    (delay) => Effect.fork(
      Effect.sleep(`${delay} millis`).pipe(Effect.as(delay))
    )
  )
  
  const results = yield* monitorFibers(fibers)
  console.log("Final results:", results)
})

Effect.runFork(program)

Fiber.interrupt — Прерывание файбера

Fiber.interrupt прерывает файбер и возвращает его Exit. Прерывание безопасно — все finalizers будут выполнены.

Сигнатура

declare const interrupt: <A, E>(
  fiber: Fiber<A, E>
) => Effect.Effect<Exit<A, E>, never, never>

Ключевые характеристики

  • Асинхронное — сигнализирует о прерывании, но завершение может занять время
  • Back-pressuring — по умолчанию ждёт полного завершения файбера
  • Safe — выполняет все finalizers
  • Идемпотентное — повторный вызов безопасен

Базовый пример прерывания


const program = Effect.gen(function* () {
  // Fork бесконечно работающего файбера
  const fiber = yield* Effect.fork(
    Effect.forever(
      Effect.log("Hi!").pipe(Effect.delay("10 millis"))
    )
  )
  
  yield* Effect.sleep("30 millis")
  
  // Прерываем и получаем Exit
  const exit = yield* Fiber.interrupt(fiber)
  console.log(exit)
})

Effect.runFork(program)
/*
Output:
timestamp=... level=INFO fiber=#1 message=Hi!
timestamp=... level=INFO fiber=#1 message=Hi!
{
  _id: 'Exit',
  _tag: 'Failure',
  cause: {
    _id: 'Cause',
    _tag: 'Interrupt',
    fiberId: { _id: 'FiberId', _tag: 'Runtime', id: 0, startTimeMillis: ... }
  }
}
*/

Back-pressuring vs Fire-and-forget


const slowCleanup = Effect.gen(function* () {
  yield* Effect.addFinalizer(() =>
    Effect.gen(function* () {
      yield* Effect.log("Cleanup started...")
      yield* Effect.sleep("500 millis")
      yield* Effect.log("Cleanup completed!")
    })
  )
  
  yield* Effect.forever(Effect.sleep("100 millis"))
})

// Back-pressuring (default) — ждём завершения cleanup
const withBackpressure = Effect.gen(function* () {
  const fiber = yield* Effect.fork(slowCleanup)
  yield* Effect.sleep("100 millis")
  
  console.log("Interrupting with back-pressure...")
  yield* Fiber.interrupt(fiber) // Ждёт cleanup
  console.log("Interrupt completed!") // После cleanup
})

// Fire-and-forget — не ждём
const withoutBackpressure = Effect.gen(function* () {
  const fiber = yield* Effect.fork(slowCleanup)
  yield* Effect.sleep("100 millis")
  
  console.log("Interrupting without back-pressure...")
  yield* Effect.fork(Fiber.interrupt(fiber)) // Не ждём
  console.log("Interrupt signal sent!") // Немедленно
  
  yield* Effect.sleep("1 second") // Даём время на cleanup
})

Effect.runFork(withBackpressure)
/*
Output:
Interrupting with back-pressure...
Cleanup started...
Cleanup completed!
Interrupt completed!
*/

Fiber.interruptFork — Удобный shorthand


const program = Effect.gen(function* () {
  const fiber = yield* Effect.fork(
    Effect.forever(Effect.log("Working...").pipe(Effect.delay("50 millis")))
  )
  
  yield* Effect.sleep("200 millis")
  
  // Эквивалентно Effect.fork(Fiber.interrupt(fiber))
  yield* Fiber.interruptFork(fiber)
  
  console.log("Interrupt signal sent, continuing...")
  yield* Effect.sleep("100 millis")
})

Effect.runFork(program)

Прерывание с причиной


const program = Effect.gen(function* () {
  const fiber = yield* Effect.fork(
    Effect.sleep("1 hour")
  )
  
  // Обычное прерывание
  const exit1 = yield* Fiber.interrupt(fiber)
  
  if (Exit.isFailure(exit1) && Cause.isInterruptedOnly(exit1.cause)) {
    const interruptors = Cause.interruptors(exit1.cause)
    console.log("Interrupted by:", interruptors)
  }
})

Effect.runFork(program)
/*
Output:
Interrupted by: Set { FiberId.Runtime(0, ...) }
*/

Композиция файберов

Effect предоставляет операции для композиции нескольких файберов.

Fiber.zip — Объединение в tuple


const program = Effect.gen(function* () {
  const fiber1 = yield* Effect.fork(Effect.succeed("Hello"))
  const fiber2 = yield* Effect.fork(Effect.succeed("World"))
  
  // Объединяем файберы
  const combined = Fiber.zip(fiber1, fiber2)
  
  // Join возвращает tuple
  const [a, b] = yield* Fiber.join(combined)
  console.log(`${a} ${b}`)
})

Effect.runFork(program)
// Output: Hello World

Fiber.zipWith — Объединение с функцией


const program = Effect.gen(function* () {
  const fiber1 = yield* Effect.fork(Effect.succeed(10))
  const fiber2 = yield* Effect.fork(Effect.succeed(32))
  
  // Комбинируем результаты
  const combined = Fiber.zipWith(
    fiber1, 
    fiber2, 
    (a, b) => a + b
  )
  
  const result = yield* Fiber.join(combined)
  console.log(`Sum: ${result}`)
})

Effect.runFork(program)
// Output: Sum: 42

Fiber.orElse — Fallback


const program = Effect.gen(function* () {
  // Первый файбер завершится ошибкой
  const fiber1 = yield* Effect.fork(Effect.fail("Primary failed"))
  
  // Второй — успешно
  const fiber2 = yield* Effect.fork(Effect.succeed("Fallback success"))
  
  // orElse: если первый провалится, используем второй
  const withFallback = Fiber.orElse(fiber1, fiber2)
  
  const result = yield* Fiber.join(withFallback)
  console.log(result)
})

Effect.runFork(program)
// Output: Fallback success

Fiber.all — Сбор всех результатов


const program = Effect.gen(function* () {
  const fibers = yield* Effect.forEach(
    [1, 2, 3, 4, 5],
    (n) => Effect.fork(
      Effect.sleep(`${n * 100} millis`).pipe(Effect.as(n * n))
    )
  )
  
  // Собираем все файберы в один
  const allFibers = Fiber.all(fibers)
  
  // Join всех
  const results = yield* Fiber.join(allFibers)
  console.log("Results:", results)
})

Effect.runFork(program)
// Output: Results: [1, 4, 9, 16, 25]

Семантика композиции

┌─────────────────────────────────────────────────────────────────┐
│                   КОМПОЗИЦИЯ ФАЙБЕРОВ                           │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  Fiber.zip(f1, f2)                                              │
│  ├── Оба должны успешно завершиться                             │
│  ├── Если один провалится — composite fiber провалится          │
│  └── Результат: [A, B]                                          │
│                                                                 │
│  Fiber.orElse(f1, f2)                                           │
│  ├── Если f1 успешен — результат f1                             │
│  ├── Если f1 провалился — результат f2                          │
│  └── f2 может провалиться или успеть                            │
│                                                                 │
│  Fiber.all([f1, f2, ...])                                       │
│  ├── Все должны успешно завершиться                             │
│  ├── Если любой провалится — composite провалится               │
│  └── Результат: [A1, A2, ...]                                   │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

Fiber.status — Получение статуса

Fiber.status возвращает текущий статус файбера без блокировки.

Типы статусов


// Файбер выполняется
type Running = FiberStatus.Running

// Файбер приостановлен (ждёт чего-то)
type Suspended = FiberStatus.Suspended

// Файбер завершён
type Done = FiberStatus.Done

Пример мониторинга статуса


const monitorStatus = (fiber: Fiber.RuntimeFiber<unknown, unknown>) =>
  Effect.gen(function* () {
    while (true) {
      const status = yield* Fiber.status(fiber)
      
      if (FiberStatus.isDone(status)) {
        console.log("Fiber completed!")
        return
      }
      
      if (FiberStatus.isRunning(status)) {
        console.log("Fiber is running")
      }
      
      if (FiberStatus.isSuspended(status)) {
        console.log("Fiber is suspended")
      }
      
      yield* Effect.sleep("100 millis")
    }
  })

const program = Effect.gen(function* () {
  const fiber = yield* Effect.fork(
    Effect.gen(function* () {
      yield* Effect.log("Starting work...")
      yield* Effect.sleep("300 millis")
      yield* Effect.log("Finishing work...")
      return "done"
    })
  )
  
  yield* monitorStatus(fiber)
})

Effect.runFork(program)

API Reference

Операции ожидания

ФункцияСигнатураОписание
Fiber.joinFiber<A, E> → Effect<A, E>Блокирующее ожидание, re-raises ошибку
Fiber.awaitFiber<A, E> → Effect<Exit<A, E>>Блокирующее ожидание, возвращает Exit
Fiber.pollFiber<A, E> → Effect<Option<Exit<A, E>>>Неблокирующая проверка

Прерывание

ФункцияСигнатураОписание
Fiber.interruptFiber<A, E> → Effect<Exit<A, E>>Прервать и дождаться
Fiber.interruptForkFiber<A, E> → Effect<void>Прервать без ожидания
Fiber.interruptAllIterable<Fiber<A, E>> → Effect<void>Прервать все файберы
Fiber.interruptAllAs(FiberId, Iterable<Fiber>) → Effect<void>Прервать от имени

Композиция

ФункцияСигнатураОписание
Fiber.zip(Fiber<A>, Fiber<B>) → Fiber<[A, B]>Объединить в tuple
Fiber.zipWith(Fiber<A>, Fiber<B>, f) → Fiber<C>Объединить с функцией
Fiber.orElse(Fiber<A>, Fiber<A>) → Fiber<A>Fallback
Fiber.allIterable<Fiber<A>> → Fiber<A[]>Собрать все

Инспекция

ФункцияСигнатураОписание
Fiber.statusFiber<A, E> → Effect<FiberStatus>Текущий статус
Fiber.idFiber<A, E> → FiberIdID файбера

Примеры

Пример 1: Race с таймаутом


const raceWithTimeout = <A, E>(
  effect: Effect.Effect<A, E>,
  timeoutMs: number
): Effect.Effect<A, E | "Timeout"> =>
  Effect.gen(function* () {
    // Создаём Deferred для результата
    const result = yield* Deferred.make<A, E | "Timeout">()
    
    // Fork основную задачу
    const mainFiber = yield* Effect.fork(
      Effect.gen(function* () {
        const value = yield* effect
        yield* Deferred.succeed(result, value)
      }).pipe(
        Effect.catchAll((error) => Deferred.fail(result, error))
      )
    )
    
    // Fork таймер
    const timerFiber = yield* Effect.fork(
      Effect.gen(function* () {
        yield* Effect.sleep(`${timeoutMs} millis`)
        yield* Deferred.fail(result, "Timeout" as const)
      })
    )
    
    // Ждём результат
    const value = yield* Deferred.await(result)
    
    // Cleanup — прерываем оставшийся файбер
    yield* Fiber.interruptFork(mainFiber)
    yield* Fiber.interruptFork(timerFiber)
    
    return value
  })

const program = Effect.gen(function* () {
  // Успешный случай
  const result1 = yield* raceWithTimeout(
    Effect.sleep("100 millis").pipe(Effect.as("fast")),
    500
  )
  console.log("Result 1:", result1)
  
  // Таймаут
  const result2 = yield* raceWithTimeout(
    Effect.sleep("1 second").pipe(Effect.as("slow")),
    200
  ).pipe(
    Effect.catchAll((e) => Effect.succeed(`Error: ${e}`))
  )
  console.log("Result 2:", result2)
})

Effect.runFork(program)
/*
Output:
Result 1: fast
Result 2: Error: Timeout
*/

Пример 2: Batch processor с прогрессом


interface BatchResult<A> {
  readonly completed: ReadonlyArray<A>
  readonly failed: number
  readonly total: number
}

const processBatch = <A, E, B>(
  items: ReadonlyArray<A>,
  process: (item: A) => Effect.Effect<B, E>,
  onProgress: (completed: number, total: number) => Effect.Effect<void>
): Effect.Effect<BatchResult<B>> =>
  Effect.gen(function* () {
    const results = yield* Ref.make<Chunk.Chunk<B>>(Chunk.empty())
    const failedCount = yield* Ref.make(0)
    const completedCount = yield* Ref.make(0)
    
    // Процессим каждый item
    const fibers = yield* Effect.forEach(items, (item) =>
      Effect.fork(
        Effect.gen(function* () {
          const exit = yield* Effect.exit(process(item))
          
          if (Exit.isSuccess(exit)) {
            yield* Ref.update(results, Chunk.append(exit.value))
          } else {
            yield* Ref.update(failedCount, (n) => n + 1)
          }
          
          const completed = yield* Ref.updateAndGet(completedCount, (n) => n + 1)
          yield* onProgress(completed, items.length)
        })
      )
    )
    
    // Ждём все файберы
    yield* Effect.forEach(fibers, Fiber.join, { discard: true })
    
    const completed = yield* Ref.get(results)
    const failed = yield* Ref.get(failedCount)
    
    return {
      completed: Chunk.toArray(completed),
      failed,
      total: items.length
    }
  })

const program = Effect.gen(function* () {
  const items = Array.from({ length: 10 }, (_, i) => i + 1)
  
  const result = yield* processBatch(
    items,
    (n) =>
      n % 3 === 0
        ? Effect.fail(`Error for ${n}`)
        : Effect.sleep(`${n * 50} millis`).pipe(Effect.as(n * n)),
    (completed, total) =>
      Effect.log(`Progress: ${completed}/${total}`)
  )
  
  console.log("Final result:", result)
})

Effect.runFork(program)

Пример 3: First success pattern


const firstSuccess = <A, E>(
  effects: ReadonlyArray<Effect.Effect<A, E>>
): Effect.Effect<A, E> =>
  Effect.gen(function* () {
    if (effects.length === 0) {
      return yield* Effect.die("No effects provided")
    }
    
    const result = yield* Deferred.make<A, E>()
    const failureCount = yield* Ref.make(0)
    const lastError = yield* Ref.make<E | null>(null)
    
    const fibers = yield* Effect.forEach(effects, (effect) =>
      Effect.fork(
        Effect.gen(function* () {
          const exit = yield* Effect.exit(effect)
          
          if (Exit.isSuccess(exit)) {
            // Первый успех выигрывает
            yield* Deferred.succeed(result, exit.value)
          } else {
            // Запоминаем ошибку
            const failures = Cause.failures(exit.cause)
            if (failures.length > 0) {
              yield* Ref.set(lastError, failures[0]!)
            }
            
            const count = yield* Ref.updateAndGet(failureCount, (n) => n + 1)
            
            // Все провалились
            if (count === effects.length) {
              const error = yield* Ref.get(lastError)
              if (error !== null) {
                yield* Deferred.fail(result, error)
              }
            }
          }
        })
      )
    )
    
    const value = yield* Deferred.await(result)
    
    // Cleanup
    yield* Effect.forEach(fibers, Fiber.interruptFork, { discard: true })
    
    return value
  })


const program = Effect.gen(function* () {
  const result = yield* firstSuccess([
    Effect.sleep("300 millis").pipe(Effect.flatMap(() => Effect.fail("slow fail"))),
    Effect.sleep("100 millis").pipe(Effect.flatMap(() => Effect.fail("fast fail"))),
    Effect.sleep("200 millis").pipe(Effect.as("success!")),
    Effect.sleep("500 millis").pipe(Effect.as("too slow"))
  ])
  
  console.log("First success:", result)
})

Effect.runFork(program)
// Output: First success: success!

Упражнения

Упражнение

Упражнение 1: Join vs Await

Легко

Сравните поведение join и await при ошибке.

import { Effect, Fiber, Exit } from "effect"

const failingEffect = Effect.fail("Something went wrong!")

const program = Effect.gen(function* () {
  const fiber = yield* Effect.fork(failingEffect)
  
  // Часть 1: Используйте join и обработайте ошибку через catchAll
  // Ваш код...
  
  // Часть 2: Используйте await и проанализируйте Exit
  // Ваш код...
})
Упражнение

Упражнение 2: Простой polling

Легко

Реализуйте функцию, которая poll’ит файбер каждые 100ms до завершения.

import { Effect, Fiber, Option, Exit } from "effect"

const pollUntilDone = <A, E>(
  fiber: Fiber.Fiber<A, E>
): Effect.Effect<A, E> =>
  // Ваша реализация
  ???
Упражнение

Упражнение 3: Race N effects

Средне

Реализуйте функцию, которая запускает N эффектов и возвращает первый успешный результат.

import { Effect, Fiber } from "effect"

const raceN = <A, E>(
  effects: ReadonlyArray<Effect.Effect<A, E>>
): Effect.Effect<A, E> =>
  // Ваша реализация:
  // 1. Fork все эффекты
  // 2. Используйте Fiber.orElse для цепочки
  // 3. Очистите оставшиеся файберы
  ???
Упражнение

Упражнение 4: Композиция с таймаутами

Средне

Создайте функцию, объединяющую результаты нескольких файберов с индивидуальными таймаутами.

import { Effect, Fiber } from "effect"

interface TimedTask<A> {
  readonly task: Effect.Effect<A>
  readonly timeoutMs: number
  readonly name: string
}

interface TaskResult<A> {
  readonly name: string
  readonly result: A | "timeout" | "error"
}

const runTimedTasks = <A>(
  tasks: ReadonlyArray<TimedTask<A>>
): Effect.Effect<ReadonlyArray<TaskResult<A>>> =>
  // Ваша реализация
  ???
Упражнение

Упражнение 5: Circuit Breaker с мониторингом файберов

Сложно

Реализуйте Circuit Breaker, который отслеживает состояние через файберы.

import { Effect, Fiber, Ref, Deferred, Schedule } from "effect"

interface CircuitBreaker {
  readonly execute: <A, E>(effect: Effect.Effect<A, E>) => Effect.Effect<A, E | "CircuitOpen">
  readonly getState: Effect.Effect<"closed" | "open" | "half-open">
  readonly getStats: Effect.Effect<{ failures: number; successes: number }>
}

const createCircuitBreaker = (config: {
  readonly failureThreshold: number
  readonly resetTimeout: number
  readonly halfOpenRequests: number
}): Effect.Effect<CircuitBreaker, never, Scope> =>
  // Ваша реализация
  ???

Заключение

Операции с файберами в Effect предоставляют полный контроль над конкурентными вычислениями:

  • Fiber.join — простой способ получить результат
  • Fiber.await — полная информация через Exit
  • Fiber.poll — неблокирующий мониторинг
  • Fiber.interrupt — безопасная отмена с cleanup
  • Композицияzip, orElse, all для сложных сценариев

В следующей статье мы глубоко погрузимся в механизм прерывания файберов и узнаем, как создавать interruptible и uninterruptible регионы.