Deferred<A, E>
Одноразовые промисы.
Теория
Что такое Deferred?
Deferred<A, E> — это контейнер, который:
- Создаётся пустым — начальное состояние “не завершён”
- Завершается один раз — можно записать значение
Aили ошибкуE - После завершения неизменяем — последующие записи игнорируются
- Позволяет ожидание — файберы могут ждать завершения без busy-waiting
┌────────────────────────────────────────────────────────────┐
│ Deferred<A, E> │
├────────────────────────────────────────────────────────────┤
│ │
│ Жизненный цикл: │
│ │
│ ┌─────────────┐ succeed(a) ┌─────────────────┐ │
│ │ PENDING │ ─────────────────▶ │ DONE(Right(a)) │ │
│ │ (empty) │ │ (completed) │ │
│ └─────────────┘ └─────────────────┘ │
│ │ │
│ │ fail(e) │
│ ▼ │
│ ┌─────────────────┐ │
│ │ DONE(Left(e)) │ │
│ │ (completed) │ │
│ └─────────────────┘ │
│ │
│ ⚠️ После DONE → никакие изменения невозможны │
└────────────────────────────────────────────────────────────┘
Deferred vs Promise
| Аспект | JavaScript Promise | Effect Deferred<A, E> |
|---|---|---|
| Типизация ошибок | ❌ any | ✅ Типизированные E |
| Создание | new Promise(...) + resolver | Deferred.make<A, E>() |
| Ожидание | .then() / await | Deferred.await |
| Прерывание | ❌ Невозможно | ✅ Deferred.interrupt |
| Композиция | ❌ Ограниченная | ✅ Полная (Effect) |
| Ленивость | ❌ Eager | ✅ Lazy |
Семантика ожидания
┌──────────────────────────────────────────────────────────────┐
│ Awaiting a Deferred │
├──────────────────────────────────────────────────────────────┤
│ │
│ Fiber A (completer) Fiber B (waiter) │
│ ───────────────── ─────────────── │
│ │ │ │
│ │ ├─ await(deferred) │
│ │ │ ⏳ (suspended) │
│ │ │ │
│ ├─ succeed(deferred, 42) │ │
│ │ │ │
│ │ ├─ ✅ resumes: 42 │
│ ▼ ▼ │
│ │
│ Множество файберов могут ожидать один Deferred │
│ Все получат одно и то же значение │
│ │
│ Fiber C (waiter) Fiber D (waiter) │
│ ─────────────── ─────────────── │
│ │ │ │
│ ├─ await(deferred) ├─ await(deferred) │
│ │ ⏳ │ ⏳ │
│ │ │ │
│ [after succeed] [after succeed] │
│ ├─ ✅ resumes: 42 ├─ ✅ resumes: 42 │
│ ▼ ▼ │
└──────────────────────────────────────────────────────────────┘
Ключевые свойства
- Семантическая блокировка:
awaitне блокирует поток, только приостанавливает файбер - Однократное завершение: Первый
succeed/failвыигрывает, остальные игнорируются - Broadcast: Все ожидающие файберы получают одно значение
- Прерываемость: Ожидание можно прервать через interrupt
Концепция ФП
IVar и MVar
Deferred концептуально близок к IVar (Immutable Variable) из Haskell/Concurrent ML:
IVar (Immutable Variable):
┌──────────────────────────┐
│ Создаётся пустым │
│ Записывается один раз │
│ Читается много раз │
│ После записи — константа│
└──────────────────────────┘
MVar (Mutable Variable):
┌──────────────────────────┐
│ Может быть пустым/полным│
│ take опустошает │
│ put заполняет │
│ Синхронизация через │
│ состояние │
└──────────────────────────┘
Deferred ≈ IVar + типизированные ошибки + Effect интеграция
Связь с Async/Await
Deferred — это реификация асинхронного результата:
// JavaScript async/await
async function example() {
const result = await someAsyncOperation()
return result
}
// Effect Deferred — явное управление
const example = Effect.gen(function* () {
const deferred = yield* Deferred.make<number, Error>()
// Запускаем операцию, которая заполнит deferred
yield* Effect.fork(
someAsyncOperation.pipe(
Effect.flatMap((result) => Deferred.succeed(deferred, result)),
Effect.catchAll((error) => Deferred.fail(deferred, error))
)
)
// Ждём результат
const result = yield* Deferred.await(deferred)
return result
})
Алгебра Deferred
// Законы Deferred:
// 1. await после succeed возвращает значение
// Deferred.succeed(d, a) *> Deferred.await(d) ≡ Effect.succeed(a)
// 2. await после fail возвращает ошибку
// Deferred.fail(d, e) *> Deferred.await(d) ≡ Effect.fail(e)
// 3. Второй succeed игнорируется
// Deferred.succeed(d, a) *> Deferred.succeed(d, b) *> Deferred.await(d)
// ≡ Effect.succeed(a)
// 4. Идемпотентность await
// Deferred.await(d) *> Deferred.await(d) ≡ Deferred.await(d).map(a => [a, a])
API Reference
Создание
Deferred.make [STABLE]
Создаёт новый незавершённый Deferred.
declare const make: <A, E = never>() => Effect.Effect<Deferred<A, E>>
const program = Effect.gen(function* () {
// Deferred с типом успеха
const deferred1 = yield* Deferred.make<number>()
// ^? Deferred<number, never>
// Deferred с типом успеха и ошибки
const deferred2 = yield* Deferred.make<string, Error>()
// ^? Deferred<string, Error>
// Deferred для сигнализации (Unit)
const signal = yield* Deferred.make<void>()
// ^? Deferred<void, never>
})
Ожидание
Deferred.await [STABLE]
Ожидает завершения Deferred и возвращает результат.
declare const await: <A, E>(
self: Deferred<A, E>
) => Effect.Effect<A, E>
const program = Effect.gen(function* () {
const deferred = yield* Deferred.make<number, string>()
// Запускаем completer в отдельном файбере
yield* Effect.fork(
Effect.gen(function* () {
yield* Effect.sleep("1 second")
yield* Deferred.succeed(deferred, 42)
})
)
// Ждём результат (приостановит файбер на 1 секунду)
const value = yield* Deferred.await(deferred)
// ^? number
console.log(value) // 42
})
Завершение успехом
Deferred.succeed [STABLE]
Завершает Deferred успешным значением.
declare const succeed: <A>(
value: A
) => <E>(self: Deferred<A, E>) => Effect.Effect<boolean>
Возвращает true если Deferred был успешно завершён, false если уже был завершён ранее.
const program = Effect.gen(function* () {
const deferred = yield* Deferred.make<string>()
// Первая попытка — успех
const first = yield* Deferred.succeed(deferred, "hello")
console.log(first) // true
// Вторая попытка — Deferred уже завершён
const second = yield* Deferred.succeed(deferred, "world")
console.log(second) // false
// Результат — первое значение
const value = yield* Deferred.await(deferred)
console.log(value) // "hello"
})
Завершение ошибкой
Deferred.fail [STABLE]
Завершает Deferred ошибкой.
declare const fail: <E>(
error: E
) => <A>(self: Deferred<A, E>) => Effect.Effect<boolean>
const program = Effect.gen(function* () {
const deferred = yield* Deferred.make<number, string>()
// Завершаем ошибкой
yield* Deferred.fail(deferred, "Something went wrong")
// await вернёт ошибку
const result = yield* Deferred.await(deferred).pipe(
Effect.catchAll((error) => Effect.succeed(`Caught: ${error}`))
)
console.log(result) // "Caught: Something went wrong"
})
Deferred.die [STABLE]
Завершает Deferred дефектом (unrecoverable error).
declare const die: (
defect: unknown
) => <A, E>(self: Deferred<A, E>) => Effect.Effect<boolean>
const program = Effect.gen(function* () {
const deferred = yield* Deferred.make<number>()
// Завершаем дефектом
yield* Deferred.die(deferred, new Error("Fatal error!"))
// await выбросит дефект (не ловится через catchAll)
const result = yield* Deferred.await(deferred).pipe(
Effect.catchAllDefect((defect) =>
Effect.succeed(`Defect: ${defect}`)
)
)
})
Завершение эффектом
Deferred.complete [STABLE]
Завершает Deferred результатом выполнения эффекта.
declare const complete: <A, E>(
effect: Effect.Effect<A, E>
) => (self: Deferred<A, E>) => Effect.Effect<boolean>
const program = Effect.gen(function* () {
const deferred = yield* Deferred.make<number, Error>()
// Завершаем результатом эффекта
yield* Deferred.complete(
deferred,
Effect.gen(function* () {
yield* Effect.sleep("100 millis")
return Math.random() > 0.5
? 42
: yield* Effect.fail(new Error("Random failure"))
})
)
const result = yield* Deferred.await(deferred)
})
Deferred.completeWith [STABLE]
Завершает Deferred эффектом, который выполнится для каждого ожидающего.
declare const completeWith: <A, E>(
effect: Effect.Effect<A, E>
) => (self: Deferred<A, E>) => Effect.Effect<boolean>
⚠️ Важно: Эффект выполнится заново для каждого await!
const program = Effect.gen(function* () {
const deferred = yield* Deferred.make<number>()
let counter = 0
// Каждый await выполнит этот эффект заново!
yield* Deferred.completeWith(
deferred,
Effect.sync(() => ++counter)
)
// Первый await
const v1 = yield* Deferred.await(deferred)
console.log(v1) // 1
// Второй await — эффект выполнится снова!
const v2 = yield* Deferred.await(deferred)
console.log(v2) // 2
})
Прерывание
Deferred.interrupt [STABLE]
Прерывает Deferred, пробуждая всех ожидающих с InterruptedException.
declare const interrupt: <A, E>(
self: Deferred<A, E>
) => Effect.Effect<boolean>
const program = Effect.gen(function* () {
const deferred = yield* Deferred.make<number>()
// Запускаем waiter
const waiter = yield* Effect.fork(Deferred.await(deferred))
// Даём время начать ожидание
yield* Effect.sleep("100 millis")
// Прерываем deferred
yield* Deferred.interrupt(deferred)
// waiter получит прерывание
const exit = yield* Fiber.await(waiter)
console.log(Exit.isInterrupted(exit)) // true
})
Проверка состояния
Deferred.isDone [STABLE]
Проверяет, завершён ли Deferred.
declare const isDone: <A, E>(
self: Deferred<A, E>
) => Effect.Effect<boolean>
const program = Effect.gen(function* () {
const deferred = yield* Deferred.make<number>()
const before = yield* Deferred.isDone(deferred)
console.log(before) // false
yield* Deferred.succeed(deferred, 42)
const after = yield* Deferred.isDone(deferred)
console.log(after) // true
})
Deferred.poll [STABLE]
Неблокирующая проверка результата.
declare const poll: <A, E>(
self: Deferred<A, E>
) => Effect.Effect<Option.Option<Effect.Effect<A, E>>>
const program = Effect.gen(function* () {
const deferred = yield* Deferred.make<number, string>()
// До завершения — None
const poll1 = yield* Deferred.poll(deferred)
console.log(Option.isNone(poll1)) // true
yield* Deferred.succeed(deferred, 42)
// После завершения — Some(Effect)
const poll2 = yield* Deferred.poll(deferred)
if (Option.isSome(poll2)) {
const value = yield* poll2.value
console.log(value) // 42
}
})
Примеры
Координация двух файберов
const program = Effect.gen(function* () {
const deferred = yield* Deferred.make<string>()
// Producer: подготавливает данные
const producer = Effect.gen(function* () {
yield* Effect.log("Producer: starting work")
yield* Effect.sleep("1 second")
yield* Effect.log("Producer: completing deferred")
yield* Deferred.succeed(deferred, "Hello from producer!")
})
// Consumer: ждёт данные
const consumer = Effect.gen(function* () {
yield* Effect.log("Consumer: waiting for data")
const data = yield* Deferred.await(deferred)
yield* Effect.log(`Consumer: received "${data}"`)
return data
})
// Запускаем обоих
const producerFiber = yield* Effect.fork(producer)
const consumerFiber = yield* Effect.fork(consumer)
// Ждём consumer
const result = yield* Fiber.join(consumerFiber)
yield* Fiber.join(producerFiber)
return result
})
Effect.runPromise(program).then(console.log)
/*
Output:
timestamp=... message="Producer: starting work"
timestamp=... message="Consumer: waiting for data"
timestamp=... message="Producer: completing deferred"
timestamp=... message="Consumer: received \"Hello from producer!\""
Hello from producer!
*/
Handshake между файберами
// Паттерн: двустороннее подтверждение
const handshake = Effect.gen(function* () {
// Deferred для каждого направления
const clientReady = yield* Deferred.make<void>()
const serverReady = yield* Deferred.make<void>()
const connectionEstablished = yield* Deferred.make<string>()
// Server
const server = Effect.gen(function* () {
yield* Effect.log("Server: initializing")
yield* Effect.sleep("500 millis")
yield* Effect.log("Server: ready, signaling")
yield* Deferred.succeed(serverReady, undefined)
// Ждём готовности клиента
yield* Deferred.await(clientReady)
yield* Effect.log("Server: client is ready, establishing connection")
yield* Deferred.succeed(connectionEstablished, "conn-123")
})
// Client
const client = Effect.gen(function* () {
yield* Effect.log("Client: waiting for server")
yield* Deferred.await(serverReady)
yield* Effect.log("Client: server is ready, initializing")
yield* Effect.sleep("300 millis")
yield* Effect.log("Client: ready, signaling")
yield* Deferred.succeed(clientReady, undefined)
// Ждём установки соединения
const connId = yield* Deferred.await(connectionEstablished)
yield* Effect.log(`Client: connected with id ${connId}`)
return connId
})
// Запускаем оба
const serverFiber = yield* Effect.fork(server)
const clientFiber = yield* Effect.fork(client)
const connectionId = yield* Fiber.join(clientFiber)
yield* Fiber.join(serverFiber)
return connectionId
})
Effect.runPromise(handshake)
Timeout с Deferred
const withTimeout = <A, E>(
effect: Effect.Effect<A, E>,
timeout: string
): Effect.Effect<Option.Option<A>, E> =>
Effect.gen(function* () {
const deferred = yield* Deferred.make<A, E>()
// Запускаем основной эффект
const mainFiber = yield* Effect.fork(
effect.pipe(
Effect.flatMap((a) => Deferred.succeed(deferred, a)),
Effect.catchAll((e) => Deferred.fail(deferred, e))
)
)
// Запускаем таймер
const timeoutFiber = yield* Effect.fork(
Effect.gen(function* () {
yield* Effect.sleep(timeout)
yield* Deferred.interrupt(deferred)
})
)
// Ждём результат
const exit = yield* Effect.exit(Deferred.await(deferred))
// Очищаем
yield* Fiber.interrupt(mainFiber)
yield* Fiber.interrupt(timeoutFiber)
return Exit.match(exit, {
onFailure: (cause) =>
cause._tag === "Interrupt"
? Option.none()
: Option.none(), // или перебросить ошибку
onSuccess: Option.some
})
})
// Использование
const program = Effect.gen(function* () {
// Быстрая операция — успеет
const fast = yield* withTimeout(
Effect.succeed("fast result").pipe(Effect.delay("100 millis")),
"500 millis"
)
console.log("Fast:", fast) // Some("fast result")
// Медленная операция — не успеет
const slow = yield* withTimeout(
Effect.succeed("slow result").pipe(Effect.delay("1 second")),
"200 millis"
)
console.log("Slow:", slow) // None
})
Effect.runPromise(program)
Барьер синхронизации
// Барьер: все участники ждут друг друга
interface Barrier {
readonly await: Effect.Effect<void>
}
const makeBarrier = (count: number): Effect.Effect<Barrier> =>
Effect.gen(function* () {
const arrived = yield* Ref.make(0)
const deferred = yield* Deferred.make<void>()
return {
await: Effect.gen(function* () {
const current = yield* Ref.updateAndGet(arrived, (n) => n + 1)
if (current === count) {
// Последний прибывший открывает барьер
yield* Deferred.succeed(deferred, undefined)
}
// Все ждут открытия
yield* Deferred.await(deferred)
})
}
})
// Использование
const program = Effect.gen(function* () {
const barrier = yield* makeBarrier(3)
const worker = (id: number) =>
Effect.gen(function* () {
yield* Effect.log(`Worker ${id}: starting preparation`)
yield* Effect.sleep(`${(id + 1) * 200} millis`)
yield* Effect.log(`Worker ${id}: ready, waiting at barrier`)
yield* barrier.await
yield* Effect.log(`Worker ${id}: passed barrier, continuing`)
})
// Запускаем 3 worker'а
yield* Effect.all(
[worker(1), worker(2), worker(3)],
{ concurrency: "unbounded" }
)
})
Effect.runPromise(program)
/*
Output:
Worker 1: starting preparation
Worker 2: starting preparation
Worker 3: starting preparation
Worker 1: ready, waiting at barrier
Worker 2: ready, waiting at barrier
Worker 3: ready, waiting at barrier
Worker 3: passed barrier, continuing
Worker 1: passed barrier, continuing
Worker 2: passed barrier, continuing
*/
Promise-like API
// Обёртка для создания Promise-подобного интерфейса
interface PromiseLike<A, E> {
readonly promise: Deferred.Deferred<A, E>
readonly resolve: (value: A) => Effect.Effect<boolean>
readonly reject: (error: E) => Effect.Effect<boolean>
readonly then: Effect.Effect<A, E>
}
const makePromiseLike = <A, E = never>(): Effect.Effect<PromiseLike<A, E>> =>
Effect.gen(function* () {
const deferred = yield* Deferred.make<A, E>()
return {
promise: deferred,
resolve: (value: A) => Deferred.succeed(deferred, value),
reject: (error: E) => Deferred.fail(deferred, error),
then: Deferred.await(deferred)
}
})
// Использование с callback-based API
const wrapCallbackAPI = <A>(
fn: (callback: (error: Error | null, result?: A) => void) => void
): Effect.Effect<A, Error> =>
Effect.gen(function* () {
const promise = yield* makePromiseLike<A, Error>()
// Вызываем callback API
fn((error, result) => {
if (error) {
Effect.runFork(promise.reject(error))
} else {
Effect.runFork(promise.resolve(result as A))
}
})
return yield* promise.then
})
Паттерны использования
One-shot Event
// Паттерн: событие, которое происходит один раз
interface ShutdownSignal {
readonly signal: Effect.Effect<void> // Ждать shutdown
readonly shutdown: Effect.Effect<void> // Инициировать shutdown
readonly isShuttingDown: Effect.Effect<boolean>
}
const makeShutdownSignal = (): Effect.Effect<ShutdownSignal> =>
Effect.gen(function* () {
const deferred = yield* Deferred.make<void>()
return {
signal: Deferred.await(deferred),
shutdown: Effect.asVoid(Deferred.succeed(deferred, undefined)),
isShuttingDown: Deferred.isDone(deferred)
}
})
// Использование в сервере
const server = Effect.gen(function* () {
const shutdown = yield* makeShutdownSignal()
// Worker, который завершается по сигналу
const worker = Effect.gen(function* () {
yield* Effect.log("Worker: running")
// Бесконечный цикл с проверкой shutdown
yield* Effect.race(
Effect.forever(
Effect.gen(function* () {
yield* Effect.log("Worker: processing")
yield* Effect.sleep("500 millis")
})
),
shutdown.signal.pipe(Effect.tap(() => Effect.log("Worker: shutdown received")))
)
yield* Effect.log("Worker: stopped")
})
// Запускаем worker
const workerFiber = yield* Effect.fork(worker)
// Через 2 секунды инициируем shutdown
yield* Effect.sleep("2 seconds")
yield* Effect.log("Main: initiating shutdown")
yield* shutdown.shutdown
yield* Fiber.join(workerFiber)
yield* Effect.log("Main: shutdown complete")
})
Future Result
// Паттерн: асинхронный результат, который можно ждать
interface Future<A, E> {
readonly get: Effect.Effect<A, E>
readonly poll: Effect.Effect<A | null>
}
const spawn = <A, E, R>(
effect: Effect.Effect<A, E, R>
): Effect.Effect<Future<A, E>, never, R> =>
Effect.gen(function* () {
const deferred = yield* Deferred.make<A, E>()
// Запускаем эффект в фоне
yield* Effect.fork(
Effect.matchCauseEffect(effect, {
onFailure: (cause) => Deferred.failCause(deferred, cause),
onSuccess: (a) => Deferred.succeed(deferred, a)
})
)
return {
get: Deferred.await(deferred),
poll: Effect.gen(function* () {
const result = yield* Deferred.poll(deferred)
if (result._tag === "None") return null
return yield* result.value
})
}
})
// Использование
const program = Effect.gen(function* () {
// Запускаем долгую операцию
const future = yield* spawn(
Effect.gen(function* () {
yield* Effect.sleep("1 second")
return 42
})
)
// Делаем что-то другое
yield* Effect.log("Doing other work...")
// Проверяем готовность
const poll1 = yield* future.poll
yield* Effect.log(`Poll 1: ${poll1}`) // null
yield* Effect.sleep("1.5 seconds")
// Теперь готово
const poll2 = yield* future.poll
yield* Effect.log(`Poll 2: ${poll2}`) // 42
// Или просто ждём
const result = yield* future.get
yield* Effect.log(`Result: ${result}`) // 42
})
Callback to Effect
// Преобразование callback-based API в Effect
const fromCallback = <A, E>(
register: (
resolve: (value: A) => void,
reject: (error: E) => void
) => void
): Effect.Effect<A, E> =>
Effect.gen(function* () {
const deferred = yield* Deferred.make<A, E>()
// Регистрируем callbacks
register(
(value) => Effect.runFork(Deferred.succeed(deferred, value)),
(error) => Effect.runFork(Deferred.fail(deferred, error))
)
return yield* Deferred.await(deferred)
})
// Пример: обёртка для setTimeout
const delay = (ms: number): Effect.Effect<void> =>
fromCallback((resolve) => {
setTimeout(() => resolve(undefined), ms)
})
// Пример: обёртка для EventEmitter
const waitForEvent = <A>(
emitter: { once: (event: string, handler: (data: A) => void) => void },
event: string
): Effect.Effect<A> =>
fromCallback((resolve) => {
emitter.once(event, resolve)
})
Упражнения
Exercise 1: Simple Gate
Создайте “ворота”, которые можно открыть один раз.
import { Effect, Deferred } from "effect"
interface Gate {
readonly open: Effect.Effect<void>
readonly pass: Effect.Effect<void> // ждёт открытия
readonly isOpen: Effect.Effect<boolean>
}
const makeGate = (): Effect.Effect<Gate> =>
// Ваш код здесь
Effect.gen(function* () {
// ???
})import { Effect, Deferred } from "effect"
interface Gate {
readonly open: Effect.Effect<void>
readonly pass: Effect.Effect<void>
readonly isOpen: Effect.Effect<boolean>
}
const makeGate = (): Effect.Effect<Gate> =>
Effect.gen(function* () {
const deferred = yield* Deferred.make<void>()
return {
open: Effect.asVoid(Deferred.succeed(deferred, undefined)),
pass: Deferred.await(deferred),
isOpen: Deferred.isDone(deferred)
}
})Exercise 2: Once
Создайте обёртку, которая выполняет эффект только один раз, кэшируя результат.
import { Effect, Deferred } from "effect"
interface Once<A, E> {
readonly get: Effect.Effect<A, E>
}
const once = <A, E, R>(
effect: Effect.Effect<A, E, R>
): Effect.Effect<Once<A, E>, never, R> =>
// Ваш код здесь
Effect.gen(function* () {
// ???
})import { Effect, Deferred, Ref } from "effect"
interface Once<A, E> {
readonly get: Effect.Effect<A, E>
}
const once = <A, E, R>(
effect: Effect.Effect<A, E, R>
): Effect.Effect<Once<A, E>, never, R> =>
Effect.gen(function* () {
const deferred = yield* Deferred.make<A, E>()
const started = yield* Ref.make(false)
return {
get: Effect.gen(function* () {
// Проверяем, запущен ли уже эффект
const wasStarted = yield* Ref.getAndSet(started, true)
if (!wasStarted) {
// Мы первые — выполняем эффект
const exit = yield* Effect.exit(effect)
yield* Deferred.done(deferred, exit)
}
// Ждём результат
return yield* Deferred.await(deferred)
})
}
})Exercise 3: Racing with Deferred
Реализуйте гонку между несколькими эффектами, где первый завершившийся побеждает.
import { Effect, Deferred, Fiber } from "effect"
const raceAll = <A, E>(
effects: ReadonlyArray<Effect.Effect<A, E>>
): Effect.Effect<A, E> =>
// Ваш код здесь
Effect.gen(function* () {
// Подсказка: используйте Deferred для результата
// и прерывайте остальные файберы после победы
// ???
})import { Effect, Deferred, Fiber, Exit } from "effect"
const raceAll = <A, E>(
effects: ReadonlyArray<Effect.Effect<A, E>>
): Effect.Effect<A, E> =>
Effect.gen(function* () {
if (effects.length === 0) {
return yield* Effect.die(new Error("raceAll: empty array"))
}
const deferred = yield* Deferred.make<A, E>()
// Запускаем все эффекты
const fibers = yield* Effect.forEach(
effects,
(effect) =>
Effect.fork(
effect.pipe(
Effect.matchCauseEffect({
onFailure: (cause) => Deferred.failCause(deferred, cause),
onSuccess: (a) => Deferred.succeed(deferred, a)
})
)
)
)
// Ждём результат
const result = yield* Deferred.await(deferred)
// Прерываем всех (победитель уже завершился)
yield* Effect.forEach(
fibers,
(fiber) => Fiber.interrupt(fiber),
{ discard: true }
)
return result
})Exercise 4: Async Semaphore
Создайте семафор с использованием Deferred для блокировки.
import { Effect, Deferred, Ref, Queue } from "effect"
interface AsyncSemaphore {
readonly acquire: Effect.Effect<void>
readonly release: Effect.Effect<void>
readonly withPermit: <A, E, R>(
effect: Effect.Effect<A, E, R>
) => Effect.Effect<A, E, R>
}
const makeAsyncSemaphore = (permits: number): Effect.Effect<AsyncSemaphore> =>
// Ваш код здесь
Effect.gen(function* () {
// Подсказка: используйте Queue<Deferred> для ожидающих
// ???
})import { Effect, Deferred, Ref, Queue } from "effect"
interface AsyncSemaphore {
readonly acquire: Effect.Effect<void>
readonly release: Effect.Effect<void>
readonly withPermit: <A, E, R>(
effect: Effect.Effect<A, E, R>
) => Effect.Effect<A, E, R>
}
const makeAsyncSemaphore = (permits: number): Effect.Effect<AsyncSemaphore> =>
Effect.gen(function* () {
const available = yield* Ref.make(permits)
const waiters = yield* Queue.unbounded<Deferred.Deferred<void, never>>()
const acquire: Effect.Effect<void> = Effect.gen(function* () {
// Пробуем взять permit
const got = yield* Ref.modify(available, (n) =>
n > 0 ? [true, n - 1] as const : [false, n] as const
)
if (got) return
// Нет permits — ждём
const waiter = yield* Deferred.make<void>()
yield* Queue.offer(waiters, waiter)
yield* Deferred.await(waiter)
})
const release: Effect.Effect<void> = Effect.gen(function* () {
// Проверяем, есть ли ожидающие
const maybeWaiter = yield* Queue.poll(waiters)
if (maybeWaiter._tag === "Some") {
// Передаём permit ожидающему
yield* Deferred.succeed(maybeWaiter.value, undefined)
} else {
// Возвращаем permit в пул
yield* Ref.update(available, (n) => n + 1)
}
})
return {
acquire,
release,
withPermit: <A, E, R>(effect: Effect.Effect<A, E, R>) =>
Effect.acquireUseRelease(
acquire,
() => effect,
() => release
)
}
})Exercise 5: Async/Await Channel
Реализуйте однонаправленный канал для передачи значений между файберами.
import { Effect, Deferred, Ref, Option } from "effect"
interface AsyncChannel<A> {
readonly send: (value: A) => Effect.Effect<void>
readonly receive: Effect.Effect<A>
readonly close: Effect.Effect<void>
}
const makeChannel = <A>(): Effect.Effect<AsyncChannel<A>> =>
// Ваш код здесь
Effect.gen(function* () {
// Подсказка: используйте пару Deferred — один для отправителя, один для получателя
// ???
})import { Effect, Deferred, Ref, Option, Queue } from "effect"
interface AsyncChannel<A> {
readonly send: (value: A) => Effect.Effect<void>
readonly receive: Effect.Effect<A>
readonly close: Effect.Effect<void>
}
interface ChannelState<A> {
readonly buffer: Queue.Queue<A>
readonly receivers: Queue.Queue<Deferred.Deferred<A, ChannelClosed>>
readonly closed: Ref.Ref<boolean>
}
class ChannelClosed {
readonly _tag = "ChannelClosed"
}
const makeChannel = <A>(): Effect.Effect<AsyncChannel<A>> =>
Effect.gen(function* () {
const buffer = yield* Queue.unbounded<A>()
const receivers = yield* Queue.unbounded<Deferred.Deferred<A, ChannelClosed>>()
const closed = yield* Ref.make(false)
const send = (value: A): Effect.Effect<void> =>
Effect.gen(function* () {
// Проверяем, закрыт ли канал
const isClosed = yield* Ref.get(closed)
if (isClosed) {
return yield* Effect.fail(new ChannelClosed())
}
// Проверяем, есть ли ожидающий получатель
const maybeReceiver = yield* Queue.poll(receivers)
if (maybeReceiver._tag === "Some") {
// Передаём значение напрямую
yield* Deferred.succeed(maybeReceiver.value, value)
} else {
// Кладём в буфер
yield* Queue.offer(buffer, value)
}
}).pipe(Effect.catchAll(() => Effect.void))
const receive: Effect.Effect<A> = Effect.gen(function* () {
// Пробуем взять из буфера
const maybeValue = yield* Queue.poll(buffer)
if (maybeValue._tag === "Some") {
return maybeValue.value
}
// Проверяем, закрыт ли канал
const isClosed = yield* Ref.get(closed)
if (isClosed) {
return yield* Effect.fail(new ChannelClosed())
}
// Ждём значение
const receiver = yield* Deferred.make<A, ChannelClosed>()
yield* Queue.offer(receivers, receiver)
return yield* Deferred.await(receiver)
}).pipe(
Effect.catchAll(() => Effect.fail(new ChannelClosed()))
) as Effect.Effect<A>
const close: Effect.Effect<void> = Effect.gen(function* () {
yield* Ref.set(closed, true)
// Уведомляем всех ожидающих получателей
yield* Effect.gen(function* () {
while (true) {
const maybeReceiver = yield* Queue.poll(receivers)
if (maybeReceiver._tag === "None") break
yield* Deferred.fail(maybeReceiver.value, new ChannelClosed())
}
})
})
return { send, receive, close }
})Резюме
Deferred<A, E> — фундаментальный примитив синхронизации для однократной передачи значений:
| Аспект | Описание |
|---|---|
| Назначение | Однократная передача значения/ошибки |
| Создание | Deferred.make<A, E>() |
| Завершение | succeed, fail, die, complete |
| Ожидание | await (приостанавливает файбер) |
| Семантика | Broadcast — все ожидающие получают одно значение |
Основные use cases
- ✅ Координация между файберами
- ✅ Handshake/acknowledgment
- ✅ One-shot events (shutdown, ready signals)
- ✅ Обёртка callback API
- ✅ Барьеры синхронизации
Сравнение с Promise
| Promise | Deferred |
|---|---|
| Eager execution | Lazy execution |
| Untyped errors | Typed errors |
| No interruption | Interruptible |
| Limited composition | Full Effect composition |