Queue
Конкурентные очереди.
Теория
Что такое Queue?
Queue<A> — это конкурентная структура данных для передачи значений типа A между файберами:
┌────────────────────────────────────────────────────────────┐
│ Queue<A> │
├────────────────────────────────────────────────────────────┤
│ │
│ Producers (writers) Consumers (readers) │
│ ───────────────── ──────────────── │
│ │
│ Fiber 1 ──┐ ┌── Fiber A │
│ │ ┌───────────┐ │ │
│ Fiber 2 ──┼────▶│ A A A A A │──────┼── Fiber B │
│ │ └───────────┘ │ │
│ Fiber 3 ──┘ Queue └── Fiber C │
│ │
│ offer(a) take │
│ │
│ ⚡ FIFO (First-In-First-Out) порядок │
│ ⚡ Thread-safe (fiber-safe) │
│ ⚡ Back-pressure при переполнении │
└────────────────────────────────────────────────────────────┘
Семантика операций
| Операция | Описание | Поведение при граничных условиях |
|---|---|---|
offer | Добавляет элемент | Зависит от типа очереди |
take | Извлекает элемент | Блокирует если пусто |
poll | Извлекает без блокировки | Возвращает None если пусто |
size | Текущий размер | Моментальный снимок |
shutdown | Закрывает очередь | Все ожидающие получат ошибку |
Back-pressure модели
┌──────────────────────────────────────────────────────────────┐
│ Back-pressure стратегии │
├──────────────────────────────────────────────────────────────┤
│ │
│ 1. BOUNDED (back-pressure) │
│ ┌─────────────────────────────────────┐ │
│ │ A A A A A │ ← FULL │ │
│ └─────────────────────────────────────┘ │
│ offer() → ⏳ BLOCKS until space available │
│ │
│ 2. DROPPING │
│ ┌─────────────────────────────────────┐ │
│ │ A A A A A │ ← FULL │ │
│ └─────────────────────────────────────┘ │
│ offer(X) → 🗑️ DROPS X, returns false │
│ │
│ 3. SLIDING │
│ ┌─────────────────────────────────────┐ │
│ │ A A A A A │ ← FULL │ │
│ └─────────────────────────────────────┘ │
│ offer(X) → 🗑️ DROPS oldest (A), adds X │
│ │
│ 4. UNBOUNDED │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ A A A A A ... (grows indefinitely) │ │
│ └─────────────────────────────────────────────────────┘ │
│ offer(X) → ✅ ALWAYS succeeds │
│ │
└──────────────────────────────────────────────────────────────┘
Queue vs Channel vs Stream
| Примитив | Назначение | Семантика |
|---|---|---|
| Queue | Point-to-point messaging | Один consumer забирает элемент |
| PubSub | Broadcast messaging | Все subscribers получают копию |
| Stream | Data transformation | Pull-based ленивые вычисления |
| Channel | Bidirectional | Input → Transform → Output |
Типы очередей
Bounded Queue
Очередь с фиксированной ёмкостью и back-pressure.
const program = Effect.gen(function* () {
// Очередь вместимостью 10 элементов
const queue = yield* Queue.bounded<number>(10)
// offer заблокируется если очередь полна
yield* Queue.offer(queue, 1)
yield* Queue.offer(queue, 2)
// Демонстрация back-pressure
yield* Effect.fork(
Effect.gen(function* () {
// Заполняем очередь
for (let i = 0; i < 15; i++) {
yield* Queue.offer(queue, i)
yield* Effect.log(`Offered ${i}`)
}
})
)
yield* Effect.sleep("100 millis")
// Освобождаем место
for (let i = 0; i < 15; i++) {
const value = yield* Queue.take(queue)
yield* Effect.log(`Took ${value}`)
}
})
Когда использовать:
- ✅ Нужен контроль потребления памяти
- ✅ Producer должен замедлиться при перегрузке consumer
- ✅ Защита от OOM при бесконечных потоках данных
Dropping Queue
Отбрасывает новые элементы при переполнении.
const program = Effect.gen(function* () {
// Очередь на 3 элемента, dropping strategy
const queue = yield* Queue.dropping<number>(3)
// Добавляем 5 элементов
for (let i = 1; i <= 5; i++) {
const accepted = yield* Queue.offer(queue, i)
console.log(`offer(${i}): ${accepted}`)
}
// Output:
// offer(1): true
// offer(2): true
// offer(3): true
// offer(4): false ← отброшен
// offer(5): false ← отброшен
// В очереди только первые 3
console.log(yield* Queue.takeAll(queue))
// { _id: 'Chunk', values: [ 1, 2, 3 ] }
})
Когда использовать:
- ✅ Допустима потеря данных (метрики, логи)
- ✅ Producer не должен блокироваться
- ✅ Важна последняя информация (например, статусы)
Sliding Queue
Отбрасывает старые элементы при переполнении.
const program = Effect.gen(function* () {
// Очередь на 3 элемента, sliding strategy
const queue = yield* Queue.sliding<number>(3)
// Добавляем 5 элементов
for (let i = 1; i <= 5; i++) {
yield* Queue.offer(queue, i)
}
// В очереди последние 3
console.log(yield* Queue.takeAll(queue))
// { _id: 'Chunk', values: [ 3, 4, 5 ] }
})
Когда использовать:
- ✅ Важны только последние N значений
- ✅ Буфер для “скользящего окна”
- ✅ Real-time системы (последние показания датчиков)
Unbounded Queue
Неограниченная очередь (используйте осторожно!).
const program = Effect.gen(function* () {
// ⚠️ Может вызвать OOM при быстром producer
const queue = yield* Queue.unbounded<number>()
// offer всегда успешен
for (let i = 0; i < 1000000; i++) {
yield* Queue.offer(queue, i)
}
console.log(`Size: ${yield* Queue.size(queue)}`)
// Size: 1000000
})
Когда использовать:
- ✅ Consumer гарантированно быстрее producer
- ✅ Кратковременные буферы
- ⚠️ С осторожностью в production!
API Reference
Создание очередей
// Bounded (back-pressure)
declare const bounded: <A>(capacity: number) => Effect.Effect<Queue<A>>
// Dropping (отбрасывает новые)
declare const dropping: <A>(capacity: number) => Effect.Effect<Queue<A>>
// Sliding (отбрасывает старые)
declare const sliding: <A>(capacity: number) => Effect.Effect<Queue<A>>
// Unbounded (без ограничений)
declare const unbounded: <A>() => Effect.Effect<Queue<A>>
Добавление элементов
Queue.offer [STABLE]
Добавляет один элемент в очередь.
declare const offer: <A>(
value: A
) => (self: Queue<A>) => Effect.Effect<boolean>
const program = Effect.gen(function* () {
const queue = yield* Queue.bounded<string>(10)
// Добавляем элемент
const success = yield* Queue.offer(queue, "hello")
console.log(success) // true
})
Queue.offerAll [STABLE]
Добавляет множество элементов.
declare const offerAll: <A>(
values: Iterable<A>
) => (self: Queue<A>) => Effect.Effect<boolean>
const program = Effect.gen(function* () {
const queue = yield* Queue.bounded<number>(10)
yield* Queue.offerAll(queue, [1, 2, 3, 4, 5])
console.log(yield* Queue.size(queue)) // 5
})
Извлечение элементов
Queue.take [STABLE]
Извлекает и удаляет элемент (блокирует если пусто).
declare const take: <A>(self: Queue<A>) => Effect.Effect<A>
const program = Effect.gen(function* () {
const queue = yield* Queue.bounded<number>(10)
// Запускаем consumer, который будет ждать
const consumer = yield* Effect.fork(
Effect.gen(function* () {
const value = yield* Queue.take(queue)
console.log(`Received: ${value}`)
})
)
yield* Effect.sleep("100 millis")
// Добавляем элемент — consumer пробудится
yield* Queue.offer(queue, 42)
yield* Fiber.join(consumer)
// Output: Received: 42
})
Queue.takeUpTo [STABLE]
Извлекает до N элементов (не блокирует).
declare const takeUpTo: (
max: number
) => <A>(self: Queue<A>) => Effect.Effect<Chunk<A>>
const program = Effect.gen(function* () {
const queue = yield* Queue.bounded<number>(10)
yield* Queue.offerAll(queue, [1, 2, 3, 4, 5])
// Берём максимум 3
const chunk1 = yield* Queue.takeUpTo(queue, 3)
console.log(chunk1) // Chunk(1, 2, 3)
// Берём остальные (даже если меньше запрошенного)
const chunk2 = yield* Queue.takeUpTo(queue, 10)
console.log(chunk2) // Chunk(4, 5)
})
Queue.takeAll [STABLE]
Извлекает все доступные элементы.
declare const takeAll: <A>(self: Queue<A>) => Effect.Effect<Chunk<A>>
const program = Effect.gen(function* () {
const queue = yield* Queue.bounded<number>(10)
yield* Queue.offerAll(queue, [1, 2, 3])
const all = yield* Queue.takeAll(queue)
console.log(all) // Chunk(1, 2, 3)
// Очередь теперь пуста
console.log(yield* Queue.size(queue)) // 0
})
Queue.poll [STABLE]
Извлекает элемент без блокировки.
declare const poll: <A>(
self: Queue<A>
) => Effect.Effect<Option<A>>
const program = Effect.gen(function* () {
const queue = yield* Queue.bounded<number>(10)
// Очередь пуста
const empty = yield* Queue.poll(queue)
console.log(Option.isNone(empty)) // true
yield* Queue.offer(queue, 42)
// Теперь есть элемент
const value = yield* Queue.poll(queue)
console.log(value) // Some(42)
})
Информация об очереди
Queue.size [STABLE]
Возвращает текущий размер очереди.
declare const size: <A>(self: Queue<A>) => Effect.Effect<number>
Queue.isEmpty / Queue.isFull [STABLE]
Проверяют состояние очереди.
declare const isEmpty: <A>(self: Queue<A>) => Effect.Effect<boolean>
declare const isFull: <A>(self: Queue<A>) => Effect.Effect<boolean>
Queue.capacity [STABLE]
Возвращает ёмкость очереди (число, не Effect).
declare const capacity: <A>(self: Queue<A>) => number
Управление жизненным циклом
Queue.shutdown [STABLE]
Закрывает очередь. Все ожидающие операции прерываются.
declare const shutdown: <A>(self: Queue<A>) => Effect.Effect<void>
const program = Effect.gen(function* () {
const queue = yield* Queue.bounded<number>(10)
// Consumer, который ждёт элемент
const consumer = yield* Effect.fork(Queue.take(queue))
yield* Effect.sleep("100 millis")
// Закрываем очередь
yield* Queue.shutdown(queue)
// Consumer получит прерывание
const exit = yield* Fiber.await(consumer)
console.log(Exit.isInterrupted(exit)) // true
})
Queue.awaitShutdown [STABLE]
Ожидает закрытия очереди.
declare const awaitShutdown: <A>(self: Queue<A>) => Effect.Effect<void>
Queue.isShutdown [STABLE]
Проверяет, закрыта ли очередь.
declare const isShutdown: <A>(self: Queue<A>) => Effect.Effect<boolean>
Ограниченный доступ
Enqueue и Dequeue
Queue можно “сузить” до только-запись или только-чтение:
// Только для записи (producer API)
const sendOnly = (queue: Queue.Enqueue<number>, value: number) =>
Queue.offer(queue, value)
// Только для чтения (consumer API)
const receiveOnly = (queue: Queue.Dequeue<number>) =>
Queue.take(queue)
// Queue реализует оба интерфейса
const program = Effect.gen(function* () {
const queue = yield* Queue.bounded<number>(10)
// Можно передать как Enqueue или Dequeue
yield* sendOnly(queue, 42)
const value = yield* receiveOnly(queue)
})
Примеры
Producer-Consumer
const producerConsumer = Effect.gen(function* () {
const queue = yield* Queue.bounded<number>(5)
// Producer: генерирует числа
const producer = Effect.gen(function* () {
for (let i = 1; i <= 20; i++) {
yield* Queue.offer(queue, i)
yield* Effect.log(`Produced: ${i}`)
yield* Effect.sleep("50 millis")
}
yield* Queue.shutdown(queue)
yield* Effect.log("Producer: done")
})
// Consumer: обрабатывает числа
const consumer = Effect.gen(function* () {
let total = 0
while (!(yield* Queue.isShutdown(queue))) {
const items = yield* Queue.takeUpTo(queue, 3).pipe(
Effect.timeout("200 millis"),
Effect.option
)
if (items._tag === "Some" && items.value.length > 0) {
const batch = items.value
total += Chunk.reduce(batch, 0, (a, b) => a + b)
yield* Effect.log(`Consumed batch: ${JSON.stringify([...batch])}, total: ${total}`)
}
}
// Забираем остатки
const remaining = yield* Queue.takeAll(queue)
if (remaining.length > 0) {
total += Chunk.reduce(remaining, 0, (a, b) => a + b)
yield* Effect.log(`Final batch: ${JSON.stringify([...remaining])}, total: ${total}`)
}
return total
})
// Запускаем параллельно
const producerFiber = yield* Effect.fork(producer)
const result = yield* consumer
yield* Fiber.join(producerFiber)
yield* Effect.log(`Final total: ${result}`)
})
Effect.runPromise(producerConsumer)
Work Queue (Job Processing)
interface Job {
readonly id: string
readonly payload: string
}
const workQueue = Effect.gen(function* () {
const jobs = yield* Queue.bounded<Job>(100)
const results = yield* Queue.unbounded<{ id: string; result: string }>()
// Worker: обрабатывает задачи
const worker = (workerId: number) =>
Effect.gen(function* () {
while (true) {
const job = yield* Queue.take(jobs)
yield* Effect.log(`Worker ${workerId}: processing ${job.id}`)
// Симуляция работы
yield* Effect.sleep(`${Math.random() * 100} millis`)
yield* Queue.offer(results, {
id: job.id,
result: `Processed by worker ${workerId}: ${job.payload.toUpperCase()}`
})
}
}).pipe(
Effect.catchAll(() => Effect.log(`Worker ${workerId}: queue closed`))
)
// Запускаем 3 worker'а
const workers = yield* Effect.all(
Array.range(1, 3).map((id) => Effect.fork(worker(id)))
)
// Добавляем задачи
for (let i = 1; i <= 10; i++) {
yield* Queue.offer(jobs, { id: `job-${i}`, payload: `task ${i}` })
}
// Ждём обработки
yield* Effect.sleep("500 millis")
// Собираем результаты
const allResults = yield* Queue.takeAll(results)
// Завершаем workers
yield* Queue.shutdown(jobs)
yield* Effect.forEach(workers, Fiber.interrupt, { discard: true })
return [...allResults]
})
Effect.runPromise(workQueue).then(console.log)
Rate-Limited Queue
interface RateLimitedQueue<A> {
readonly offer: (value: A) => Effect.Effect<boolean>
readonly take: Effect.Effect<A>
}
const makeRateLimitedQueue = <A>(
maxPerSecond: number
): Effect.Effect<RateLimitedQueue<A>> =>
Effect.gen(function* () {
const queue = yield* Queue.unbounded<A>()
const tokens = yield* Queue.bounded<void>(maxPerSecond)
// Token replenisher
yield* Effect.fork(
Effect.forever(
Effect.gen(function* () {
yield* Effect.sleep("1 second")
// Пополняем токены
const current = yield* Queue.size(tokens)
const toAdd = maxPerSecond - current
for (let i = 0; i < toAdd; i++) {
yield* Queue.offer(tokens, undefined)
}
})
)
)
// Инициализируем токены
for (let i = 0; i < maxPerSecond; i++) {
yield* Queue.offer(tokens, undefined)
}
return {
offer: (value: A) => Queue.offer(queue, value),
take: Effect.gen(function* () {
// Ждём токен
yield* Queue.take(tokens)
// Берём элемент
return yield* Queue.take(queue)
})
}
})
// Использование
const program = Effect.gen(function* () {
const queue = yield* makeRateLimitedQueue<number>(3) // 3 в секунду
// Producer: добавляет много элементов сразу
yield* Effect.fork(
Effect.forEach(
Array.range(1, 10),
(n) => Effect.gen(function* () {
yield* queue.offer(n)
yield* Effect.log(`Offered: ${n}`)
}),
{ discard: true }
)
)
// Consumer: получает с rate limit
yield* Effect.forEach(
Array.range(1, 10),
() => Effect.gen(function* () {
const value = yield* queue.take
yield* Effect.log(`Took: ${value} at ${Date.now()}`)
}),
{ discard: true }
)
})
Priority Queue
interface PriorityItem<A> {
readonly priority: number
readonly value: A
}
interface PriorityQueue<A> {
readonly offer: (priority: number, value: A) => Effect.Effect<void>
readonly take: Effect.Effect<A>
readonly size: Effect.Effect<number>
}
const makePriorityQueue = <A>(): Effect.Effect<PriorityQueue<A>> =>
Effect.gen(function* () {
// Используем несколько очередей для разных приоритетов
const high = yield* Queue.unbounded<A>() // priority >= 7
const medium = yield* Queue.unbounded<A>() // priority >= 4
const low = yield* Queue.unbounded<A>() // priority < 4
return {
offer: (priority: number, value: A) => {
if (priority >= 7) return Effect.asVoid(Queue.offer(high, value))
if (priority >= 4) return Effect.asVoid(Queue.offer(medium, value))
return Effect.asVoid(Queue.offer(low, value))
},
take: Effect.gen(function* () {
// Сначала пробуем высокий приоритет
const fromHigh = yield* Queue.poll(high)
if (Option.isSome(fromHigh)) return fromHigh.value
// Затем средний
const fromMedium = yield* Queue.poll(medium)
if (Option.isSome(fromMedium)) return fromMedium.value
// Наконец низкий (или ждём)
const fromLow = yield* Queue.poll(low)
if (Option.isSome(fromLow)) return fromLow.value
// Все пусты — ждём любую
return yield* Effect.race(
Queue.take(high),
Effect.race(Queue.take(medium), Queue.take(low))
)
}),
size: Effect.gen(function* () {
const h = yield* Queue.size(high)
const m = yield* Queue.size(medium)
const l = yield* Queue.size(low)
return h + m + l
})
}
})
Паттерны использования
Fan-Out / Fan-In
// Fan-out: один источник, много обработчиков
const fanOut = <A, B>(
source: Queue.Dequeue<A>,
workers: number,
process: (a: A) => Effect.Effect<B>
): Effect.Effect<Queue.Dequeue<B>> =>
Effect.gen(function* () {
const results = yield* Queue.unbounded<B>()
// Запускаем workers
const fibers = yield* Effect.all(
Array.makeBy(workers, () =>
Effect.fork(
Effect.forever(
Effect.gen(function* () {
const item = yield* Queue.take(source)
const result = yield* process(item)
yield* Queue.offer(results, result)
})
)
)
)
)
return results as Queue.Dequeue<B>
})
// Fan-in: много источников, один обработчик
const fanIn = <A>(
sources: ReadonlyArray<Queue.Dequeue<A>>
): Effect.Effect<Queue.Dequeue<A>> =>
Effect.gen(function* () {
const merged = yield* Queue.unbounded<A>()
// Для каждого источника запускаем forwarder
yield* Effect.forEach(
sources,
(source) =>
Effect.fork(
Effect.forever(
Effect.gen(function* () {
const item = yield* Queue.take(source)
yield* Queue.offer(merged, item)
})
)
),
{ discard: true }
)
return merged as Queue.Dequeue<A>
})
Batching
interface BatchConfig {
readonly maxSize: number
readonly maxWait: string
}
const batchQueue = <A>(
source: Queue.Dequeue<A>,
config: BatchConfig
): Effect.Effect<Queue.Dequeue<Chunk.Chunk<A>>> =>
Effect.gen(function* () {
const batches = yield* Queue.unbounded<Chunk.Chunk<A>>()
yield* Effect.fork(
Effect.forever(
Effect.gen(function* () {
const batch: A[] = []
const deadline = Date.now() + parseInt(config.maxWait)
while (batch.length < config.maxSize && Date.now() < deadline) {
const remaining = deadline - Date.now()
if (remaining <= 0) break
const item = yield* Queue.take(source).pipe(
Effect.timeout(`${remaining} millis`),
Effect.option
)
if (Option.isSome(item)) {
batch.push(item.value)
}
}
if (batch.length > 0) {
yield* Queue.offer(batches, Chunk.fromIterable(batch))
}
})
)
)
return batches as Queue.Dequeue<Chunk.Chunk<A>>
})
Circuit Breaker Queue
type CircuitState = "closed" | "open" | "half-open"
interface CircuitBreakerQueue<A> {
readonly offer: (value: A) => Effect.Effect<boolean>
readonly take: Effect.Effect<A>
readonly getState: Effect.Effect<CircuitState>
}
const makeCircuitBreakerQueue = <A>(
config: {
readonly capacity: number
readonly failureThreshold: number
readonly resetTimeout: string
}
): Effect.Effect<CircuitBreakerQueue<A>> =>
Effect.gen(function* () {
const queue = yield* Queue.bounded<A>(config.capacity)
const state = yield* Ref.make<CircuitState>("closed")
const failures = yield* Ref.make(0)
return {
offer: (value: A) =>
Effect.gen(function* () {
const currentState = yield* Ref.get(state)
if (currentState === "open") {
return false // Circuit is open, reject
}
const result = yield* Queue.offer(queue, value).pipe(
Effect.timeout("100 millis"),
Effect.option
)
if (result._tag === "None") {
// Timeout — считаем как failure
const count = yield* Ref.updateAndGet(failures, (n) => n + 1)
if (count >= config.failureThreshold) {
yield* Ref.set(state, "open")
// Планируем переход в half-open
yield* Effect.fork(
Effect.gen(function* () {
yield* Effect.sleep(config.resetTimeout)
yield* Ref.set(state, "half-open")
})
)
}
return false
}
// Успех — сбрасываем счётчик
yield* Ref.set(failures, 0)
if ((yield* Ref.get(state)) === "half-open") {
yield* Ref.set(state, "closed")
}
return true
}),
take: Queue.take(queue),
getState: Ref.get(state)
}
})
Упражнения
Simple Buffer
Создайте буфер, который накапливает элементы и отдаёт их пачками.
import { Effect, Queue, Chunk } from "effect"
interface Buffer<A> {
readonly add: (value: A) => Effect.Effect<void>
readonly flush: Effect.Effect<Chunk.Chunk<A>>
}
const makeBuffer = <A>(): Effect.Effect<Buffer<A>> =>
// Ваш код здесь
Effect.gen(function* () {
// ???
})import { Effect, Queue, Chunk } from "effect"
interface Buffer<A> {
readonly add: (value: A) => Effect.Effect<void>
readonly flush: Effect.Effect<Chunk.Chunk<A>>
}
const makeBuffer = <A>(): Effect.Effect<Buffer<A>> =>
Effect.gen(function* () {
const queue = yield* Queue.unbounded<A>()
return {
add: (value: A) => Effect.asVoid(Queue.offer(queue, value)),
flush: Queue.takeAll(queue)
}
})Timeout Queue
Создайте очередь, где take автоматически возвращает default значение после timeout.
import { Effect, Queue, Option } from "effect"
interface TimeoutQueue<A> {
readonly offer: (value: A) => Effect.Effect<boolean>
readonly take: (timeout: string, defaultValue: A) => Effect.Effect<A>
}
const makeTimeoutQueue = <A>(capacity: number): Effect.Effect<TimeoutQueue<A>> =>
// Ваш код здесь
Effect.gen(function* () {
// ???
})import { Effect, Queue, Option } from "effect"
interface TimeoutQueue<A> {
readonly offer: (value: A) => Effect.Effect<boolean>
readonly take: (timeout: string, defaultValue: A) => Effect.Effect<A>
}
const makeTimeoutQueue = <A>(capacity: number): Effect.Effect<TimeoutQueue<A>> =>
Effect.gen(function* () {
const queue = yield* Queue.bounded<A>(capacity)
return {
offer: (value: A) => Queue.offer(queue, value),
take: (timeout: string, defaultValue: A) =>
Queue.take(queue).pipe(
Effect.timeout(timeout),
Effect.map(Option.getOrElse(() => defaultValue))
)
}
})Deduplicating Queue
Создайте очередь, которая отбрасывает дубликаты.
import { Effect, Queue, HashSet } from "effect"
interface DedupeQueue<A> {
readonly offer: (value: A) => Effect.Effect<boolean> // false если дубликат
readonly take: Effect.Effect<A>
}
const makeDedupeQueue = <A>(
capacity: number,
hash: (a: A) => string
): Effect.Effect<DedupeQueue<A>> =>
// Ваш код здесь
Effect.gen(function* () {
// Подсказка: используйте Ref<HashSet> для отслеживания seen values
// ???
})import { Effect, Queue, Ref, HashSet } from "effect"
interface DedupeQueue<A> {
readonly offer: (value: A) => Effect.Effect<boolean>
readonly take: Effect.Effect<A>
}
const makeDedupeQueue = <A>(
capacity: number,
hash: (a: A) => string
): Effect.Effect<DedupeQueue<A>> =>
Effect.gen(function* () {
const queue = yield* Queue.bounded<A>(capacity)
const seen = yield* Ref.make<HashSet.HashSet<string>>(HashSet.empty())
return {
offer: (value: A) =>
Effect.gen(function* () {
const key = hash(value)
const isDuplicate = yield* Ref.modify(seen, (set) => {
if (HashSet.has(set, key)) {
return [true, set] as const
}
return [false, HashSet.add(set, key)] as const
})
if (isDuplicate) return false
return yield* Queue.offer(queue, value)
}),
take: Effect.gen(function* () {
const value = yield* Queue.take(queue)
// Удаляем из seen чтобы можно было добавить снова
yield* Ref.update(seen, HashSet.remove(hash(value)))
return value
})
}
})Multi-Channel Queue
Создайте очередь с несколькими каналами, где consumer может подписаться на определённые каналы.
import { Effect, Queue, HashMap, Chunk } from "effect"
interface MultiChannelQueue<A> {
readonly publish: (channel: string, value: A) => Effect.Effect<void>
readonly subscribe: (
channels: ReadonlyArray<string>
) => Effect.Effect<Queue.Dequeue<{ channel: string; value: A }>>
}
const makeMultiChannelQueue = <A>(): Effect.Effect<MultiChannelQueue<A>> =>
// Ваш код здесь
Effect.gen(function* () {
// ???
})import { Effect, Queue, Ref, HashMap, Array } from "effect"
interface Message<A> {
readonly channel: string
readonly value: A
}
interface MultiChannelQueue<A> {
readonly publish: (channel: string, value: A) => Effect.Effect<void>
readonly subscribe: (
channels: ReadonlyArray<string>
) => Effect.Effect<Queue.Dequeue<Message<A>>>
}
const makeMultiChannelQueue = <A>(): Effect.Effect<MultiChannelQueue<A>> =>
Effect.gen(function* () {
// Map от channel к списку subscriber queues
const subscribers = yield* Ref.make<
HashMap.HashMap<string, ReadonlyArray<Queue.Queue<Message<A>>>>
>(HashMap.empty())
return {
publish: (channel: string, value: A) =>
Effect.gen(function* () {
const subs = yield* Ref.get(subscribers)
const channelSubs = HashMap.get(subs, channel)
if (channelSubs._tag === "Some") {
yield* Effect.forEach(
channelSubs.value,
(q) => Queue.offer(q, { channel, value }),
{ discard: true }
)
}
}),
subscribe: (channels: ReadonlyArray<string>) =>
Effect.gen(function* () {
const queue = yield* Queue.unbounded<Message<A>>()
// Регистрируем подписчика на каждый канал
yield* Ref.update(subscribers, (subs) => {
let result = subs
for (const channel of channels) {
const existing = HashMap.get(result, channel)
const list = existing._tag === "Some" ? existing.value : []
result = HashMap.set(result, channel, [...list, queue])
}
return result
})
return queue as Queue.Dequeue<Message<A>>
})
}
})Резюме
Queue<A> — основной примитив для передачи данных между файберами:
| Тип очереди | Поведение при переполнении | Use case |
|---|---|---|
| bounded | Блокирует producer | Flow control, back-pressure |
| dropping | Отбрасывает новые | Metrics, lossy channels |
| sliding | Отбрасывает старые | Latest values buffer |
| unbounded | Без ограничений | Кратковременные буферы |
Основные операции
| Операция | Блокирует? | Описание |
|---|---|---|
offer | Зависит от типа | Добавляет элемент |
take | Да, если пусто | Извлекает элемент |
poll | Нет | Извлекает если есть |
takeAll | Нет | Извлекает все |
shutdown | Нет | Закрывает очередь |
Best Practices
- ✅ Используйте bounded для production workloads
- ✅ Обрабатывайте shutdown сигналы
- ✅ Используйте Enqueue/Dequeue для разделения ответственности
- ⚠️ Осторожно с unbounded — риск OOM
- ⚠️ Помните о back-pressure при выборе стратегии