Effect Курс Queue

Queue

Конкурентные очереди.

Теория

Что такое Queue?

Queue<A> — это конкурентная структура данных для передачи значений типа A между файберами:

┌────────────────────────────────────────────────────────────┐
│                        Queue<A>                            │
├────────────────────────────────────────────────────────────┤
│                                                            │
│   Producers (writers)            Consumers (readers)       │
│   ─────────────────              ────────────────          │
│                                                            │
│   Fiber 1 ──┐                        ┌── Fiber A           │
│             │     ┌───────────┐      │                     │
│   Fiber 2 ──┼────▶│ A A A A A │──────┼── Fiber B           │
│             │     └───────────┘      │                     │
│   Fiber 3 ──┘        Queue           └── Fiber C           │
│                                                            │
│   offer(a)                           take                  │
│                                                            │
│  ⚡ FIFO (First-In-First-Out) порядок                     │
│  ⚡ Thread-safe (fiber-safe)                               │
│  ⚡ Back-pressure при переполнении                        │
└────────────────────────────────────────────────────────────┘

Семантика операций

ОперацияОписаниеПоведение при граничных условиях
offerДобавляет элементЗависит от типа очереди
takeИзвлекает элементБлокирует если пусто
pollИзвлекает без блокировкиВозвращает None если пусто
sizeТекущий размерМоментальный снимок
shutdownЗакрывает очередьВсе ожидающие получат ошибку

Back-pressure модели

┌──────────────────────────────────────────────────────────────┐
│                    Back-pressure стратегии                   │
├──────────────────────────────────────────────────────────────┤
│                                                              │
│  1. BOUNDED (back-pressure)                                  │
│     ┌─────────────────────────────────────┐                 │
│     │ A A A A A │ ← FULL                   │                 │
│     └─────────────────────────────────────┘                 │
│     offer() → ⏳ BLOCKS until space available                │
│                                                              │
│  2. DROPPING                                                 │
│     ┌─────────────────────────────────────┐                 │
│     │ A A A A A │ ← FULL                   │                 │
│     └─────────────────────────────────────┘                 │
│     offer(X) → 🗑️ DROPS X, returns false                    │
│                                                              │
│  3. SLIDING                                                  │
│     ┌─────────────────────────────────────┐                 │
│     │ A A A A A │ ← FULL                   │                 │
│     └─────────────────────────────────────┘                 │
│     offer(X) → 🗑️ DROPS oldest (A), adds X                  │
│                                                              │
│  4. UNBOUNDED                                                │
│     ┌─────────────────────────────────────────────────────┐ │
│     │ A A A A A ... (grows indefinitely)                  │ │
│     └─────────────────────────────────────────────────────┘ │
│     offer(X) → ✅ ALWAYS succeeds                           │
│                                                              │
└──────────────────────────────────────────────────────────────┘

Queue vs Channel vs Stream

ПримитивНазначениеСемантика
QueuePoint-to-point messagingОдин consumer забирает элемент
PubSubBroadcast messagingВсе subscribers получают копию
StreamData transformationPull-based ленивые вычисления
ChannelBidirectionalInput → Transform → Output

Типы очередей

Bounded Queue

Очередь с фиксированной ёмкостью и back-pressure.


const program = Effect.gen(function* () {
  // Очередь вместимостью 10 элементов
  const queue = yield* Queue.bounded<number>(10)
  
  // offer заблокируется если очередь полна
  yield* Queue.offer(queue, 1)
  yield* Queue.offer(queue, 2)
  
  // Демонстрация back-pressure
  yield* Effect.fork(
    Effect.gen(function* () {
      // Заполняем очередь
      for (let i = 0; i < 15; i++) {
        yield* Queue.offer(queue, i)
        yield* Effect.log(`Offered ${i}`)
      }
    })
  )
  
  yield* Effect.sleep("100 millis")
  
  // Освобождаем место
  for (let i = 0; i < 15; i++) {
    const value = yield* Queue.take(queue)
    yield* Effect.log(`Took ${value}`)
  }
})

Когда использовать:

  • ✅ Нужен контроль потребления памяти
  • ✅ Producer должен замедлиться при перегрузке consumer
  • ✅ Защита от OOM при бесконечных потоках данных

Dropping Queue

Отбрасывает новые элементы при переполнении.


const program = Effect.gen(function* () {
  // Очередь на 3 элемента, dropping strategy
  const queue = yield* Queue.dropping<number>(3)
  
  // Добавляем 5 элементов
  for (let i = 1; i <= 5; i++) {
    const accepted = yield* Queue.offer(queue, i)
    console.log(`offer(${i}): ${accepted}`)
  }
  // Output:
  // offer(1): true
  // offer(2): true
  // offer(3): true
  // offer(4): false  ← отброшен
  // offer(5): false  ← отброшен
  
  // В очереди только первые 3
  console.log(yield* Queue.takeAll(queue))
  // { _id: 'Chunk', values: [ 1, 2, 3 ] }
})

Когда использовать:

  • ✅ Допустима потеря данных (метрики, логи)
  • ✅ Producer не должен блокироваться
  • ✅ Важна последняя информация (например, статусы)

Sliding Queue

Отбрасывает старые элементы при переполнении.


const program = Effect.gen(function* () {
  // Очередь на 3 элемента, sliding strategy
  const queue = yield* Queue.sliding<number>(3)
  
  // Добавляем 5 элементов
  for (let i = 1; i <= 5; i++) {
    yield* Queue.offer(queue, i)
  }
  
  // В очереди последние 3
  console.log(yield* Queue.takeAll(queue))
  // { _id: 'Chunk', values: [ 3, 4, 5 ] }
})

Когда использовать:

  • ✅ Важны только последние N значений
  • ✅ Буфер для “скользящего окна”
  • ✅ Real-time системы (последние показания датчиков)

Unbounded Queue

Неограниченная очередь (используйте осторожно!).


const program = Effect.gen(function* () {
  // ⚠️ Может вызвать OOM при быстром producer
  const queue = yield* Queue.unbounded<number>()
  
  // offer всегда успешен
  for (let i = 0; i < 1000000; i++) {
    yield* Queue.offer(queue, i)
  }
  
  console.log(`Size: ${yield* Queue.size(queue)}`)
  // Size: 1000000
})

Когда использовать:

  • ✅ Consumer гарантированно быстрее producer
  • ✅ Кратковременные буферы
  • ⚠️ С осторожностью в production!

API Reference

Создание очередей


// Bounded (back-pressure)
declare const bounded: <A>(capacity: number) => Effect.Effect<Queue<A>>

// Dropping (отбрасывает новые)
declare const dropping: <A>(capacity: number) => Effect.Effect<Queue<A>>

// Sliding (отбрасывает старые)  
declare const sliding: <A>(capacity: number) => Effect.Effect<Queue<A>>

// Unbounded (без ограничений)
declare const unbounded: <A>() => Effect.Effect<Queue<A>>

Добавление элементов

Queue.offer [STABLE]

Добавляет один элемент в очередь.

declare const offer: <A>(
  value: A
) => (self: Queue<A>) => Effect.Effect<boolean>

const program = Effect.gen(function* () {
  const queue = yield* Queue.bounded<string>(10)
  
  // Добавляем элемент
  const success = yield* Queue.offer(queue, "hello")
  console.log(success) // true
})

Queue.offerAll [STABLE]

Добавляет множество элементов.

declare const offerAll: <A>(
  values: Iterable<A>
) => (self: Queue<A>) => Effect.Effect<boolean>

const program = Effect.gen(function* () {
  const queue = yield* Queue.bounded<number>(10)
  
  yield* Queue.offerAll(queue, [1, 2, 3, 4, 5])
  
  console.log(yield* Queue.size(queue)) // 5
})

Извлечение элементов

Queue.take [STABLE]

Извлекает и удаляет элемент (блокирует если пусто).

declare const take: <A>(self: Queue<A>) => Effect.Effect<A>

const program = Effect.gen(function* () {
  const queue = yield* Queue.bounded<number>(10)
  
  // Запускаем consumer, который будет ждать
  const consumer = yield* Effect.fork(
    Effect.gen(function* () {
      const value = yield* Queue.take(queue)
      console.log(`Received: ${value}`)
    })
  )
  
  yield* Effect.sleep("100 millis")
  
  // Добавляем элемент — consumer пробудится
  yield* Queue.offer(queue, 42)
  
  yield* Fiber.join(consumer)
  // Output: Received: 42
})

Queue.takeUpTo [STABLE]

Извлекает до N элементов (не блокирует).

declare const takeUpTo: (
  max: number
) => <A>(self: Queue<A>) => Effect.Effect<Chunk<A>>

const program = Effect.gen(function* () {
  const queue = yield* Queue.bounded<number>(10)
  
  yield* Queue.offerAll(queue, [1, 2, 3, 4, 5])
  
  // Берём максимум 3
  const chunk1 = yield* Queue.takeUpTo(queue, 3)
  console.log(chunk1) // Chunk(1, 2, 3)
  
  // Берём остальные (даже если меньше запрошенного)
  const chunk2 = yield* Queue.takeUpTo(queue, 10)
  console.log(chunk2) // Chunk(4, 5)
})

Queue.takeAll [STABLE]

Извлекает все доступные элементы.

declare const takeAll: <A>(self: Queue<A>) => Effect.Effect<Chunk<A>>

const program = Effect.gen(function* () {
  const queue = yield* Queue.bounded<number>(10)
  
  yield* Queue.offerAll(queue, [1, 2, 3])
  
  const all = yield* Queue.takeAll(queue)
  console.log(all) // Chunk(1, 2, 3)
  
  // Очередь теперь пуста
  console.log(yield* Queue.size(queue)) // 0
})

Queue.poll [STABLE]

Извлекает элемент без блокировки.

declare const poll: <A>(
  self: Queue<A>
) => Effect.Effect<Option<A>>

const program = Effect.gen(function* () {
  const queue = yield* Queue.bounded<number>(10)
  
  // Очередь пуста
  const empty = yield* Queue.poll(queue)
  console.log(Option.isNone(empty)) // true
  
  yield* Queue.offer(queue, 42)
  
  // Теперь есть элемент
  const value = yield* Queue.poll(queue)
  console.log(value) // Some(42)
})

Информация об очереди

Queue.size [STABLE]

Возвращает текущий размер очереди.

declare const size: <A>(self: Queue<A>) => Effect.Effect<number>

Queue.isEmpty / Queue.isFull [STABLE]

Проверяют состояние очереди.

declare const isEmpty: <A>(self: Queue<A>) => Effect.Effect<boolean>
declare const isFull: <A>(self: Queue<A>) => Effect.Effect<boolean>

Queue.capacity [STABLE]

Возвращает ёмкость очереди (число, не Effect).

declare const capacity: <A>(self: Queue<A>) => number

Управление жизненным циклом

Queue.shutdown [STABLE]

Закрывает очередь. Все ожидающие операции прерываются.

declare const shutdown: <A>(self: Queue<A>) => Effect.Effect<void>

const program = Effect.gen(function* () {
  const queue = yield* Queue.bounded<number>(10)
  
  // Consumer, который ждёт элемент
  const consumer = yield* Effect.fork(Queue.take(queue))
  
  yield* Effect.sleep("100 millis")
  
  // Закрываем очередь
  yield* Queue.shutdown(queue)
  
  // Consumer получит прерывание
  const exit = yield* Fiber.await(consumer)
  console.log(Exit.isInterrupted(exit)) // true
})

Queue.awaitShutdown [STABLE]

Ожидает закрытия очереди.

declare const awaitShutdown: <A>(self: Queue<A>) => Effect.Effect<void>

Queue.isShutdown [STABLE]

Проверяет, закрыта ли очередь.

declare const isShutdown: <A>(self: Queue<A>) => Effect.Effect<boolean>

Ограниченный доступ

Enqueue и Dequeue

Queue можно “сузить” до только-запись или только-чтение:


// Только для записи (producer API)
const sendOnly = (queue: Queue.Enqueue<number>, value: number) =>
  Queue.offer(queue, value)

// Только для чтения (consumer API)  
const receiveOnly = (queue: Queue.Dequeue<number>) =>
  Queue.take(queue)

// Queue реализует оба интерфейса
const program = Effect.gen(function* () {
  const queue = yield* Queue.bounded<number>(10)
  
  // Можно передать как Enqueue или Dequeue
  yield* sendOnly(queue, 42)
  const value = yield* receiveOnly(queue)
})

Примеры

Producer-Consumer


const producerConsumer = Effect.gen(function* () {
  const queue = yield* Queue.bounded<number>(5)
  
  // Producer: генерирует числа
  const producer = Effect.gen(function* () {
    for (let i = 1; i <= 20; i++) {
      yield* Queue.offer(queue, i)
      yield* Effect.log(`Produced: ${i}`)
      yield* Effect.sleep("50 millis")
    }
    yield* Queue.shutdown(queue)
    yield* Effect.log("Producer: done")
  })
  
  // Consumer: обрабатывает числа
  const consumer = Effect.gen(function* () {
    let total = 0
    
    while (!(yield* Queue.isShutdown(queue))) {
      const items = yield* Queue.takeUpTo(queue, 3).pipe(
        Effect.timeout("200 millis"),
        Effect.option
      )
      
      if (items._tag === "Some" && items.value.length > 0) {
        const batch = items.value
        total += Chunk.reduce(batch, 0, (a, b) => a + b)
        yield* Effect.log(`Consumed batch: ${JSON.stringify([...batch])}, total: ${total}`)
      }
    }
    
    // Забираем остатки
    const remaining = yield* Queue.takeAll(queue)
    if (remaining.length > 0) {
      total += Chunk.reduce(remaining, 0, (a, b) => a + b)
      yield* Effect.log(`Final batch: ${JSON.stringify([...remaining])}, total: ${total}`)
    }
    
    return total
  })
  
  // Запускаем параллельно
  const producerFiber = yield* Effect.fork(producer)
  const result = yield* consumer
  yield* Fiber.join(producerFiber)
  
  yield* Effect.log(`Final total: ${result}`)
})

Effect.runPromise(producerConsumer)

Work Queue (Job Processing)


interface Job {
  readonly id: string
  readonly payload: string
}

const workQueue = Effect.gen(function* () {
  const jobs = yield* Queue.bounded<Job>(100)
  const results = yield* Queue.unbounded<{ id: string; result: string }>()
  
  // Worker: обрабатывает задачи
  const worker = (workerId: number) =>
    Effect.gen(function* () {
      while (true) {
        const job = yield* Queue.take(jobs)
        yield* Effect.log(`Worker ${workerId}: processing ${job.id}`)
        
        // Симуляция работы
        yield* Effect.sleep(`${Math.random() * 100} millis`)
        
        yield* Queue.offer(results, {
          id: job.id,
          result: `Processed by worker ${workerId}: ${job.payload.toUpperCase()}`
        })
      }
    }).pipe(
      Effect.catchAll(() => Effect.log(`Worker ${workerId}: queue closed`))
    )
  
  // Запускаем 3 worker'а
  const workers = yield* Effect.all(
    Array.range(1, 3).map((id) => Effect.fork(worker(id)))
  )
  
  // Добавляем задачи
  for (let i = 1; i <= 10; i++) {
    yield* Queue.offer(jobs, { id: `job-${i}`, payload: `task ${i}` })
  }
  
  // Ждём обработки
  yield* Effect.sleep("500 millis")
  
  // Собираем результаты
  const allResults = yield* Queue.takeAll(results)
  
  // Завершаем workers
  yield* Queue.shutdown(jobs)
  yield* Effect.forEach(workers, Fiber.interrupt, { discard: true })
  
  return [...allResults]
})

Effect.runPromise(workQueue).then(console.log)

Rate-Limited Queue


interface RateLimitedQueue<A> {
  readonly offer: (value: A) => Effect.Effect<boolean>
  readonly take: Effect.Effect<A>
}

const makeRateLimitedQueue = <A>(
  maxPerSecond: number
): Effect.Effect<RateLimitedQueue<A>> =>
  Effect.gen(function* () {
    const queue = yield* Queue.unbounded<A>()
    const tokens = yield* Queue.bounded<void>(maxPerSecond)
    
    // Token replenisher
    yield* Effect.fork(
      Effect.forever(
        Effect.gen(function* () {
          yield* Effect.sleep("1 second")
          // Пополняем токены
          const current = yield* Queue.size(tokens)
          const toAdd = maxPerSecond - current
          for (let i = 0; i < toAdd; i++) {
            yield* Queue.offer(tokens, undefined)
          }
        })
      )
    )
    
    // Инициализируем токены
    for (let i = 0; i < maxPerSecond; i++) {
      yield* Queue.offer(tokens, undefined)
    }
    
    return {
      offer: (value: A) => Queue.offer(queue, value),
      
      take: Effect.gen(function* () {
        // Ждём токен
        yield* Queue.take(tokens)
        // Берём элемент
        return yield* Queue.take(queue)
      })
    }
  })

// Использование
const program = Effect.gen(function* () {
  const queue = yield* makeRateLimitedQueue<number>(3) // 3 в секунду
  
  // Producer: добавляет много элементов сразу
  yield* Effect.fork(
    Effect.forEach(
      Array.range(1, 10),
      (n) => Effect.gen(function* () {
        yield* queue.offer(n)
        yield* Effect.log(`Offered: ${n}`)
      }),
      { discard: true }
    )
  )
  
  // Consumer: получает с rate limit
  yield* Effect.forEach(
    Array.range(1, 10),
    () => Effect.gen(function* () {
      const value = yield* queue.take
      yield* Effect.log(`Took: ${value} at ${Date.now()}`)
    }),
    { discard: true }
  )
})

Priority Queue


interface PriorityItem<A> {
  readonly priority: number
  readonly value: A
}

interface PriorityQueue<A> {
  readonly offer: (priority: number, value: A) => Effect.Effect<void>
  readonly take: Effect.Effect<A>
  readonly size: Effect.Effect<number>
}

const makePriorityQueue = <A>(): Effect.Effect<PriorityQueue<A>> =>
  Effect.gen(function* () {
    // Используем несколько очередей для разных приоритетов
    const high = yield* Queue.unbounded<A>()   // priority >= 7
    const medium = yield* Queue.unbounded<A>() // priority >= 4
    const low = yield* Queue.unbounded<A>()    // priority < 4
    
    return {
      offer: (priority: number, value: A) => {
        if (priority >= 7) return Effect.asVoid(Queue.offer(high, value))
        if (priority >= 4) return Effect.asVoid(Queue.offer(medium, value))
        return Effect.asVoid(Queue.offer(low, value))
      },
      
      take: Effect.gen(function* () {
        // Сначала пробуем высокий приоритет
        const fromHigh = yield* Queue.poll(high)
        if (Option.isSome(fromHigh)) return fromHigh.value
        
        // Затем средний
        const fromMedium = yield* Queue.poll(medium)
        if (Option.isSome(fromMedium)) return fromMedium.value
        
        // Наконец низкий (или ждём)
        const fromLow = yield* Queue.poll(low)
        if (Option.isSome(fromLow)) return fromLow.value
        
        // Все пусты — ждём любую
        return yield* Effect.race(
          Queue.take(high),
          Effect.race(Queue.take(medium), Queue.take(low))
        )
      }),
      
      size: Effect.gen(function* () {
        const h = yield* Queue.size(high)
        const m = yield* Queue.size(medium)
        const l = yield* Queue.size(low)
        return h + m + l
      })
    }
  })

Паттерны использования

Fan-Out / Fan-In


// Fan-out: один источник, много обработчиков
const fanOut = <A, B>(
  source: Queue.Dequeue<A>,
  workers: number,
  process: (a: A) => Effect.Effect<B>
): Effect.Effect<Queue.Dequeue<B>> =>
  Effect.gen(function* () {
    const results = yield* Queue.unbounded<B>()
    
    // Запускаем workers
    const fibers = yield* Effect.all(
      Array.makeBy(workers, () =>
        Effect.fork(
          Effect.forever(
            Effect.gen(function* () {
              const item = yield* Queue.take(source)
              const result = yield* process(item)
              yield* Queue.offer(results, result)
            })
          )
        )
      )
    )
    
    return results as Queue.Dequeue<B>
  })

// Fan-in: много источников, один обработчик
const fanIn = <A>(
  sources: ReadonlyArray<Queue.Dequeue<A>>
): Effect.Effect<Queue.Dequeue<A>> =>
  Effect.gen(function* () {
    const merged = yield* Queue.unbounded<A>()
    
    // Для каждого источника запускаем forwarder
    yield* Effect.forEach(
      sources,
      (source) =>
        Effect.fork(
          Effect.forever(
            Effect.gen(function* () {
              const item = yield* Queue.take(source)
              yield* Queue.offer(merged, item)
            })
          )
        ),
      { discard: true }
    )
    
    return merged as Queue.Dequeue<A>
  })

Batching


interface BatchConfig {
  readonly maxSize: number
  readonly maxWait: string
}

const batchQueue = <A>(
  source: Queue.Dequeue<A>,
  config: BatchConfig
): Effect.Effect<Queue.Dequeue<Chunk.Chunk<A>>> =>
  Effect.gen(function* () {
    const batches = yield* Queue.unbounded<Chunk.Chunk<A>>()
    
    yield* Effect.fork(
      Effect.forever(
        Effect.gen(function* () {
          const batch: A[] = []
          const deadline = Date.now() + parseInt(config.maxWait)
          
          while (batch.length < config.maxSize && Date.now() < deadline) {
            const remaining = deadline - Date.now()
            
            if (remaining <= 0) break
            
            const item = yield* Queue.take(source).pipe(
              Effect.timeout(`${remaining} millis`),
              Effect.option
            )
            
            if (Option.isSome(item)) {
              batch.push(item.value)
            }
          }
          
          if (batch.length > 0) {
            yield* Queue.offer(batches, Chunk.fromIterable(batch))
          }
        })
      )
    )
    
    return batches as Queue.Dequeue<Chunk.Chunk<A>>
  })

Circuit Breaker Queue


type CircuitState = "closed" | "open" | "half-open"

interface CircuitBreakerQueue<A> {
  readonly offer: (value: A) => Effect.Effect<boolean>
  readonly take: Effect.Effect<A>
  readonly getState: Effect.Effect<CircuitState>
}

const makeCircuitBreakerQueue = <A>(
  config: {
    readonly capacity: number
    readonly failureThreshold: number
    readonly resetTimeout: string
  }
): Effect.Effect<CircuitBreakerQueue<A>> =>
  Effect.gen(function* () {
    const queue = yield* Queue.bounded<A>(config.capacity)
    const state = yield* Ref.make<CircuitState>("closed")
    const failures = yield* Ref.make(0)
    
    return {
      offer: (value: A) =>
        Effect.gen(function* () {
          const currentState = yield* Ref.get(state)
          
          if (currentState === "open") {
            return false // Circuit is open, reject
          }
          
          const result = yield* Queue.offer(queue, value).pipe(
            Effect.timeout("100 millis"),
            Effect.option
          )
          
          if (result._tag === "None") {
            // Timeout — считаем как failure
            const count = yield* Ref.updateAndGet(failures, (n) => n + 1)
            
            if (count >= config.failureThreshold) {
              yield* Ref.set(state, "open")
              // Планируем переход в half-open
              yield* Effect.fork(
                Effect.gen(function* () {
                  yield* Effect.sleep(config.resetTimeout)
                  yield* Ref.set(state, "half-open")
                })
              )
            }
            
            return false
          }
          
          // Успех — сбрасываем счётчик
          yield* Ref.set(failures, 0)
          if ((yield* Ref.get(state)) === "half-open") {
            yield* Ref.set(state, "closed")
          }
          
          return true
        }),
      
      take: Queue.take(queue),
      getState: Ref.get(state)
    }
  })

Упражнения

Упражнение

Simple Buffer

Легко

Создайте буфер, который накапливает элементы и отдаёт их пачками.

import { Effect, Queue, Chunk } from "effect"

interface Buffer<A> {
  readonly add: (value: A) => Effect.Effect<void>
  readonly flush: Effect.Effect<Chunk.Chunk<A>>
}

const makeBuffer = <A>(): Effect.Effect<Buffer<A>> =>
  // Ваш код здесь
  Effect.gen(function* () {
    // ???
  })
Упражнение

Timeout Queue

Средне

Создайте очередь, где take автоматически возвращает default значение после timeout.

import { Effect, Queue, Option } from "effect"

interface TimeoutQueue<A> {
  readonly offer: (value: A) => Effect.Effect<boolean>
  readonly take: (timeout: string, defaultValue: A) => Effect.Effect<A>
}

const makeTimeoutQueue = <A>(capacity: number): Effect.Effect<TimeoutQueue<A>> =>
  // Ваш код здесь
  Effect.gen(function* () {
    // ???
  })
Упражнение

Deduplicating Queue

Средне

Создайте очередь, которая отбрасывает дубликаты.

import { Effect, Queue, HashSet } from "effect"

interface DedupeQueue<A> {
  readonly offer: (value: A) => Effect.Effect<boolean> // false если дубликат
  readonly take: Effect.Effect<A>
}

const makeDedupeQueue = <A>(
  capacity: number,
  hash: (a: A) => string
): Effect.Effect<DedupeQueue<A>> =>
  // Ваш код здесь
  Effect.gen(function* () {
    // Подсказка: используйте Ref<HashSet> для отслеживания seen values
    // ???
  })
Упражнение

Multi-Channel Queue

Сложно

Создайте очередь с несколькими каналами, где consumer может подписаться на определённые каналы.

import { Effect, Queue, HashMap, Chunk } from "effect"

interface MultiChannelQueue<A> {
  readonly publish: (channel: string, value: A) => Effect.Effect<void>
  readonly subscribe: (
    channels: ReadonlyArray<string>
  ) => Effect.Effect<Queue.Dequeue<{ channel: string; value: A }>>
}

const makeMultiChannelQueue = <A>(): Effect.Effect<MultiChannelQueue<A>> =>
  // Ваш код здесь
  Effect.gen(function* () {
    // ???
  })

Резюме

Queue<A> — основной примитив для передачи данных между файберами:

Тип очередиПоведение при переполненииUse case
boundedБлокирует producerFlow control, back-pressure
droppingОтбрасывает новыеMetrics, lossy channels
slidingОтбрасывает старыеLatest values buffer
unboundedБез ограниченийКратковременные буферы

Основные операции

ОперацияБлокирует?Описание
offerЗависит от типаДобавляет элемент
takeДа, если пустоИзвлекает элемент
pollНетИзвлекает если есть
takeAllНетИзвлекает все
shutdownНетЗакрывает очередь

Best Practices

  • ✅ Используйте bounded для production workloads
  • ✅ Обрабатывайте shutdown сигналы
  • ✅ Используйте Enqueue/Dequeue для разделения ответственности
  • ⚠️ Осторожно с unbounded — риск OOM
  • ⚠️ Помните о back-pressure при выборе стратегии