Effect Курс Конкурентность
Глава

Конкурентность

Конкурентная обработка потоков позволяет параллельно выполнять трансформации, объединять несколько источников данных и управлять степенью параллелизма — от строго последовательного до полностью неограниченного.

Теория

Модель конкурентности потоков

Stream в Effect-ts поддерживает несколько режимов конкурентности:

┌───────────────────────────────────────────────┐
│  Последовательный (default)                    │
│  [A] → [B] → [C] → [D]                       │
│  Один элемент за раз                          │
│  Максимальная предсказуемость                 │
├───────────────────────────────────────────────┤
│  Параллельный (concurrency: N)                │
│  [A] ──┐                                     │
│  [B] ──┼──► collect results                   │
│  [C] ──┘                                     │
│  N элементов одновременно                     │
│  Порядок результатов сохраняется              │
├───────────────────────────────────────────────┤
│  Неограниченный (concurrency: "unbounded")    │
│  [A] ──┐                                     │
│  [B] ──┤                                     │
│  [C] ──┼──► collect results                   │
│  [D] ──┤                                     │
│  [E] ──┘                                     │
│  Все элементы одновременно                    │
│  ⚠️ Может перегрузить ресурсы                 │
├───────────────────────────────────────────────┤
│  Switch (switch: true)                        │
│  [A] → cancel → [B] → cancel → [C]           │
│  Только последний элемент                     │
│  Предыдущие отменяются                        │
└───────────────────────────────────────────────┘

Конкурентный маппинг

Stream.mapEffect с concurrency

Stream.mapEffect поддерживает опцию concurrency для параллельной обработки:

import { Stream, Effect } from "effect"

const fetchUrl = (url: string) =>
  Effect.gen(function* () {
    console.log(`Fetching ${url}`)
    yield* Effect.sleep("100 millis")
    console.log(`Done ${url}`)
    return `Result: ${url}`
  })

const stream = Stream.make("url1", "url2", "url3", "url4").pipe(
  Stream.mapEffect(fetchUrl, { concurrency: 2 })
)

Effect.runPromise(Stream.runCollect(stream)).then(console.log)
/*
Fetching url1
Fetching url2       ← 2 одновременно
Done url1
Fetching url3       ← следующий начинается после завершения
Done url2
Fetching url4
Done url3
Done url4
{ _id: 'Chunk', values: [ 'Result: url1', 'Result: url2', 'Result: url3', 'Result: url4' ] }
*/

Важное свойство: несмотря на параллельное выполнение, порядок результатов сохраняется.

Неограниченная конкурентность

import { Stream, Effect } from "effect"

const stream = Stream.range(1, 10).pipe(
  Stream.mapEffect(
    (n) =>
      Effect.sleep(`${n * 50} millis`).pipe(
        Effect.as(n)
      ),
    { concurrency: "unbounded" }
  )
)

Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// Порядок сохранён: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

Unordered — без гарантии порядка

Для максимальной производительности, когда порядок не важен:

import { Stream, Effect } from "effect"

const stream = Stream.range(1, 5).pipe(
  Stream.mapEffect(
    (n) =>
      Effect.sleep(`${(6 - n) * 100} millis`).pipe(
        Effect.as(n)
      ),
    { concurrency: 5, unordered: true }
  )
)

Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// Порядок НЕ гарантирован, например: [5, 4, 3, 2, 1]

Конкурентный flatMap

Stream.flatMap с concurrency

import { Stream, Effect } from "effect"

const stream = Stream.make(1, 2, 3).pipe(
  Stream.flatMap(
    (n) =>
      Stream.fromEffect(
        Effect.sleep(`${n * 100} millis`).pipe(Effect.as(n * 10))
      ),
    { concurrency: 3 }
  )
)

Effect.runPromise(Stream.runCollect(stream)).then(console.log)

Switch-режим — отмена предыдущих

При появлении нового элемента текущий внутренний поток отменяется:

import { Stream, Effect, Schedule } from "effect"

// Автокомплит: при каждом новом символе предыдущий поиск отменяется
const searchStream = Stream.make("a", "ab", "abc").pipe(
  Stream.schedule(Schedule.spaced("300 millis")),
  Stream.flatMap(
    (query) =>
      Stream.fromEffect(
        Effect.gen(function* () {
          console.log(`Searching: ${query}`)
          yield* Effect.sleep("500 millis")
          return `Results for: ${query}`
        })
      ),
    { switch: true }
  )
)

Effect.runPromise(Stream.runCollect(searchStream)).then(console.log)
// Только последний поиск "abc" завершится успешно

Merge — объединение потоков

Stream.merge — объединение двух потоков

merge чередует элементы по мере их доступности (не ждёт один поток):

import { Schedule, Stream, Effect } from "effect"

const s1 = Stream.make(1, 2, 3).pipe(
  Stream.schedule(Schedule.spaced("100 millis"))
)

const s2 = Stream.make(4, 5, 6).pipe(
  Stream.schedule(Schedule.spaced("200 millis"))
)

const merged = Stream.merge(s1, s2)

Effect.runPromise(Stream.runCollect(merged)).then(console.log)
// { _id: 'Chunk', values: [ 1, 4, 2, 3, 5, 6 ] }

Стратегия завершения (haltStrategy)

По умолчанию merged-поток ждёт завершения обоих потоков. Это можно изменить:

СтратегияОписание
"both" (default)Ждёт завершения обоих потоков
"left"Завершается при завершении левого
"right"Завершается при завершении правого
"either"Завершается при завершении любого
import { Stream, Schedule, Effect } from "effect"

const fast = Stream.range(1, 5).pipe(
  Stream.schedule(Schedule.spaced("100 millis"))
)

const slow = Stream.repeatValue(0).pipe(
  Stream.schedule(Schedule.spaced("200 millis"))
)

// Завершаемся когда fast закончится
const merged = Stream.merge(fast, slow, { haltStrategy: "left" })

Effect.runPromise(Stream.runCollect(merged)).then(console.log)

Stream.mergeWith — объединение с трансформацией

import { Schedule, Stream, Effect } from "effect"

const strings = Stream.make("1", "2", "3").pipe(
  Stream.schedule(Schedule.spaced("100 millis"))
)

const numbers = Stream.make(4.1, 5.3, 6.2).pipe(
  Stream.schedule(Schedule.spaced("200 millis"))
)

const merged = Stream.mergeWith(strings, numbers, {
  onSelf: (s) => parseInt(s),       // string → number
  onOther: (n) => Math.floor(n)     // float → int
})

Effect.runPromise(Stream.runCollect(merged)).then(console.log)
// { _id: 'Chunk', values: [ 1, 4, 2, 3, 5, 6 ] }

Stream.mergeAll — объединение множества потоков

import { Stream, Effect } from "effect"

const streams = [
  Stream.make(1, 2),
  Stream.make(3, 4),
  Stream.make(5, 6)
]

const merged = Stream.mergeAll(streams, { concurrency: 3 })

Effect.runPromise(Stream.runCollect(merged)).then(console.log)

Interleaving — чередование

Stream.interleave — строгое чередование

Берёт по одному элементу из каждого потока поочерёдно:

import { Stream, Effect } from "effect"

const s1 = Stream.make(1, 2, 3)
const s2 = Stream.make(4, 5, 6)

const stream = Stream.interleave(s1, s2)

Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 1, 4, 2, 5, 3, 6 ] }

Stream.interleaveWith — управляемое чередование

Использует поток boolean для управления, из какого потока брать:

import { Stream, Effect } from "effect"

const s1 = Stream.make(1, 3, 5, 7, 9)
const s2 = Stream.make(2, 4, 6, 8, 10)

// true → s1, false → s2
const pattern = Stream.make(true, false, false).pipe(Stream.forever)

const stream = Stream.interleaveWith(s1, s2, pattern)

Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 1, 2, 4, 3, 6, 8, 5, 10, 7, 9 ] }

Buffering — буферизация

Stream.buffer — буфер между producer и consumer

Позволяет producer работать с опережением consumer:

import { Stream, Console, Schedule, Effect } from "effect"

const stream = Stream.range(1, 10).pipe(
  Stream.tap((n) => Console.log(`produced: ${n}`)),
  Stream.buffer({ capacity: 4 }),
  Stream.tap((n) => Console.log(`consumed: ${n}`)),
  Stream.schedule(Schedule.spaced("5 seconds"))
)

Стратегии буферизации:

ТипКонфигурацияПоведение при заполнении
Bounded{ capacity: N }Back-pressure (producer ждёт)
Unbounded{ capacity: "unbounded" }Без лимита (⚠️ OOM risk)
Sliding{ capacity: N, strategy: "sliding" }Отбрасывает старые
Dropping{ capacity: N, strategy: "dropping" }Отбрасывает новые

Throttling и Debouncing

Stream.throttle — ограничение скорости

import { Stream, Effect } from "effect"

const stream = Stream.range(1, 100).pipe(
  Stream.throttle({
    cost: () => 1,           // стоимость каждого элемента
    duration: "1 second",    // период
    units: 10,               // максимум единиц за период
    strategy: "enforce"      // enforce | shape
  })
)

Stream.debounce — антидребезг

Эмитирует значение только после паузы в указанную длительность:

import { Stream, Effect, Schedule } from "effect"

const rapidEvents = Stream.make(1, 2, 3, 4, 5).pipe(
  Stream.schedule(Schedule.spaced("50 millis"))
)

const debounced = rapidEvents.pipe(
  Stream.debounce("200 millis")
)
// Только последнее значение после 200мс паузы

Scheduling — расписание эмиссии

Stream.schedule — управление скоростью

import { Stream, Schedule, Console, Effect } from "effect"

const stream = Stream.make(1, 2, 3, 4, 5).pipe(
  Stream.schedule(Schedule.spaced("1 second")),
  Stream.tap((n) => Console.log(`Emitted: ${n}`))
)

Effect.runPromise(Stream.runCollect(stream))
// Элементы эмитируются с интервалом 1 секунда

API Reference

ФункцияОписание
mapEffect(f, { concurrency })Параллельный маппинг
flatMap(f, { concurrency })Параллельный flatMap
flatMap(f, { switch: true })Switch-режим flatMap
merge(other)Объединение двух потоков
merge(other, { haltStrategy })С стратегией завершения
mergeWith(other, { onSelf, onOther })Объединение с трансформацией
mergeAll(streams, { concurrency })Объединение множества потоков
interleave(other)Строгое чередование 1:1
interleaveWith(other, boolStream)Управляемое чередование
buffer({ capacity })Буфер для back-pressure
throttle(config)Ограничение скорости
debounce(duration)Антидребезг
schedule(schedule)Управление скоростью эмиссии

Примеры

Production: параллельный HTTP crawler

import { Stream, Effect, Schedule, Console, Chunk } from "effect"

interface CrawlResult {
  readonly url: string
  readonly status: number
  readonly contentLength: number
}

const crawlUrl = (url: string): Effect.Effect<CrawlResult, Error> =>
  Effect.gen(function* () {
    yield* Effect.sleep("100 millis") // симуляция HTTP
    return {
      url,
      status: 200,
      contentLength: Math.floor(Math.random() * 10000)
    }
  })

const urls = Stream.fromIterable([
  "https://api.example.com/users",
  "https://api.example.com/products",
  "https://api.example.com/orders",
  "https://api.example.com/reviews",
  "https://api.example.com/categories"
])

const crawler = urls.pipe(
  // Параллельный crawl с лимитом 3 одновременных запросов
  Stream.mapEffect(crawlUrl, { concurrency: 3 }),

  // Логирование
  Stream.tap((result) =>
    Console.log(`Crawled ${result.url}: ${result.status} (${result.contentLength}b)`)
  ),

  // Сбор результатов
  Stream.runCollect
)

Effect.runPromise(crawler).then((results) =>
  console.log(`Crawled ${Chunk.size(results)} pages`)
)

Production: merge нескольких источников событий

import { Stream, Effect, Schedule, Random } from "effect"

// Три источника событий с разной скоростью
const userEvents = Stream.repeatEffect(
  Effect.succeed({ type: "user" as const, ts: Date.now() })
).pipe(Stream.schedule(Schedule.spaced("500 millis")))

const systemEvents = Stream.repeatEffect(
  Effect.succeed({ type: "system" as const, ts: Date.now() })
).pipe(Stream.schedule(Schedule.spaced("2 seconds")))

const auditEvents = Stream.repeatEffect(
  Effect.succeed({ type: "audit" as const, ts: Date.now() })
).pipe(Stream.schedule(Schedule.spaced("5 seconds")))

// Объединяем все источники в единый поток
const allEvents = Stream.mergeAll(
  [userEvents, systemEvents, auditEvents],
  { concurrency: 3 }
).pipe(
  Stream.take(20)
)

Упражнения

🟢 Basic

Упражнение 1: Создайте два потока чисел (чётные и нечётные). Объедините их с помощью merge. Соберите первые 10 элементов.

Решение:

import { Stream, Effect } from "effect"

const evens = Stream.iterate(0, (n) => n + 2)
const odds = Stream.iterate(1, (n) => n + 2)

const program = Stream.merge(evens, odds).pipe(
  Stream.take(10),
  Stream.runCollect
)

Effect.runPromise(program).then(console.log)

🟡 Intermediate

Упражнение 2: Реализуйте параллельный процессор заказов: поток заказов обрабатывается с concurrency=5, каждый заказ «обрабатывается» 200мс. Измерьте общее время обработки 20 заказов (должно быть ~800мс, а не 4000мс).

🔴 Advanced

Упражнение 3: Реализуйте rate-limited API client: merge 3 источника запросов, общий throttle не более 10 запросов в секунду, buffer для сглаживания burst-нагрузок. Каждый запрос — mapEffect с concurrency=5.


🔗 Далее: 08-broadcasting.md — broadcast, distributedWith, toPubSub