Конкурентность
Конкурентная обработка потоков позволяет параллельно выполнять трансформации, объединять несколько источников данных и управлять степенью параллелизма — от строго последовательного до полностью неограниченного.
Теория
Модель конкурентности потоков
Stream в Effect-ts поддерживает несколько режимов конкурентности:
┌───────────────────────────────────────────────┐
│ Последовательный (default) │
│ [A] → [B] → [C] → [D] │
│ Один элемент за раз │
│ Максимальная предсказуемость │
├───────────────────────────────────────────────┤
│ Параллельный (concurrency: N) │
│ [A] ──┐ │
│ [B] ──┼──► collect results │
│ [C] ──┘ │
│ N элементов одновременно │
│ Порядок результатов сохраняется │
├───────────────────────────────────────────────┤
│ Неограниченный (concurrency: "unbounded") │
│ [A] ──┐ │
│ [B] ──┤ │
│ [C] ──┼──► collect results │
│ [D] ──┤ │
│ [E] ──┘ │
│ Все элементы одновременно │
│ ⚠️ Может перегрузить ресурсы │
├───────────────────────────────────────────────┤
│ Switch (switch: true) │
│ [A] → cancel → [B] → cancel → [C] │
│ Только последний элемент │
│ Предыдущие отменяются │
└───────────────────────────────────────────────┘
Конкурентный маппинг
Stream.mapEffect с concurrency
Stream.mapEffect поддерживает опцию concurrency для параллельной обработки:
import { Stream, Effect } from "effect"
const fetchUrl = (url: string) =>
Effect.gen(function* () {
console.log(`Fetching ${url}`)
yield* Effect.sleep("100 millis")
console.log(`Done ${url}`)
return `Result: ${url}`
})
const stream = Stream.make("url1", "url2", "url3", "url4").pipe(
Stream.mapEffect(fetchUrl, { concurrency: 2 })
)
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
/*
Fetching url1
Fetching url2 ← 2 одновременно
Done url1
Fetching url3 ← следующий начинается после завершения
Done url2
Fetching url4
Done url3
Done url4
{ _id: 'Chunk', values: [ 'Result: url1', 'Result: url2', 'Result: url3', 'Result: url4' ] }
*/
Важное свойство: несмотря на параллельное выполнение, порядок результатов сохраняется.
Неограниченная конкурентность
import { Stream, Effect } from "effect"
const stream = Stream.range(1, 10).pipe(
Stream.mapEffect(
(n) =>
Effect.sleep(`${n * 50} millis`).pipe(
Effect.as(n)
),
{ concurrency: "unbounded" }
)
)
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// Порядок сохранён: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
Unordered — без гарантии порядка
Для максимальной производительности, когда порядок не важен:
import { Stream, Effect } from "effect"
const stream = Stream.range(1, 5).pipe(
Stream.mapEffect(
(n) =>
Effect.sleep(`${(6 - n) * 100} millis`).pipe(
Effect.as(n)
),
{ concurrency: 5, unordered: true }
)
)
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// Порядок НЕ гарантирован, например: [5, 4, 3, 2, 1]
Конкурентный flatMap
Stream.flatMap с concurrency
import { Stream, Effect } from "effect"
const stream = Stream.make(1, 2, 3).pipe(
Stream.flatMap(
(n) =>
Stream.fromEffect(
Effect.sleep(`${n * 100} millis`).pipe(Effect.as(n * 10))
),
{ concurrency: 3 }
)
)
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
Switch-режим — отмена предыдущих
При появлении нового элемента текущий внутренний поток отменяется:
import { Stream, Effect, Schedule } from "effect"
// Автокомплит: при каждом новом символе предыдущий поиск отменяется
const searchStream = Stream.make("a", "ab", "abc").pipe(
Stream.schedule(Schedule.spaced("300 millis")),
Stream.flatMap(
(query) =>
Stream.fromEffect(
Effect.gen(function* () {
console.log(`Searching: ${query}`)
yield* Effect.sleep("500 millis")
return `Results for: ${query}`
})
),
{ switch: true }
)
)
Effect.runPromise(Stream.runCollect(searchStream)).then(console.log)
// Только последний поиск "abc" завершится успешно
Merge — объединение потоков
Stream.merge — объединение двух потоков
merge чередует элементы по мере их доступности (не ждёт один поток):
import { Schedule, Stream, Effect } from "effect"
const s1 = Stream.make(1, 2, 3).pipe(
Stream.schedule(Schedule.spaced("100 millis"))
)
const s2 = Stream.make(4, 5, 6).pipe(
Stream.schedule(Schedule.spaced("200 millis"))
)
const merged = Stream.merge(s1, s2)
Effect.runPromise(Stream.runCollect(merged)).then(console.log)
// { _id: 'Chunk', values: [ 1, 4, 2, 3, 5, 6 ] }
Стратегия завершения (haltStrategy)
По умолчанию merged-поток ждёт завершения обоих потоков. Это можно изменить:
| Стратегия | Описание |
|---|---|
"both" (default) | Ждёт завершения обоих потоков |
"left" | Завершается при завершении левого |
"right" | Завершается при завершении правого |
"either" | Завершается при завершении любого |
import { Stream, Schedule, Effect } from "effect"
const fast = Stream.range(1, 5).pipe(
Stream.schedule(Schedule.spaced("100 millis"))
)
const slow = Stream.repeatValue(0).pipe(
Stream.schedule(Schedule.spaced("200 millis"))
)
// Завершаемся когда fast закончится
const merged = Stream.merge(fast, slow, { haltStrategy: "left" })
Effect.runPromise(Stream.runCollect(merged)).then(console.log)
Stream.mergeWith — объединение с трансформацией
import { Schedule, Stream, Effect } from "effect"
const strings = Stream.make("1", "2", "3").pipe(
Stream.schedule(Schedule.spaced("100 millis"))
)
const numbers = Stream.make(4.1, 5.3, 6.2).pipe(
Stream.schedule(Schedule.spaced("200 millis"))
)
const merged = Stream.mergeWith(strings, numbers, {
onSelf: (s) => parseInt(s), // string → number
onOther: (n) => Math.floor(n) // float → int
})
Effect.runPromise(Stream.runCollect(merged)).then(console.log)
// { _id: 'Chunk', values: [ 1, 4, 2, 3, 5, 6 ] }
Stream.mergeAll — объединение множества потоков
import { Stream, Effect } from "effect"
const streams = [
Stream.make(1, 2),
Stream.make(3, 4),
Stream.make(5, 6)
]
const merged = Stream.mergeAll(streams, { concurrency: 3 })
Effect.runPromise(Stream.runCollect(merged)).then(console.log)
Interleaving — чередование
Stream.interleave — строгое чередование
Берёт по одному элементу из каждого потока поочерёдно:
import { Stream, Effect } from "effect"
const s1 = Stream.make(1, 2, 3)
const s2 = Stream.make(4, 5, 6)
const stream = Stream.interleave(s1, s2)
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 1, 4, 2, 5, 3, 6 ] }
Stream.interleaveWith — управляемое чередование
Использует поток boolean для управления, из какого потока брать:
import { Stream, Effect } from "effect"
const s1 = Stream.make(1, 3, 5, 7, 9)
const s2 = Stream.make(2, 4, 6, 8, 10)
// true → s1, false → s2
const pattern = Stream.make(true, false, false).pipe(Stream.forever)
const stream = Stream.interleaveWith(s1, s2, pattern)
Effect.runPromise(Stream.runCollect(stream)).then(console.log)
// { _id: 'Chunk', values: [ 1, 2, 4, 3, 6, 8, 5, 10, 7, 9 ] }
Buffering — буферизация
Stream.buffer — буфер между producer и consumer
Позволяет producer работать с опережением consumer:
import { Stream, Console, Schedule, Effect } from "effect"
const stream = Stream.range(1, 10).pipe(
Stream.tap((n) => Console.log(`produced: ${n}`)),
Stream.buffer({ capacity: 4 }),
Stream.tap((n) => Console.log(`consumed: ${n}`)),
Stream.schedule(Schedule.spaced("5 seconds"))
)
Стратегии буферизации:
| Тип | Конфигурация | Поведение при заполнении |
|---|---|---|
| Bounded | { capacity: N } | Back-pressure (producer ждёт) |
| Unbounded | { capacity: "unbounded" } | Без лимита (⚠️ OOM risk) |
| Sliding | { capacity: N, strategy: "sliding" } | Отбрасывает старые |
| Dropping | { capacity: N, strategy: "dropping" } | Отбрасывает новые |
Throttling и Debouncing
Stream.throttle — ограничение скорости
import { Stream, Effect } from "effect"
const stream = Stream.range(1, 100).pipe(
Stream.throttle({
cost: () => 1, // стоимость каждого элемента
duration: "1 second", // период
units: 10, // максимум единиц за период
strategy: "enforce" // enforce | shape
})
)
Stream.debounce — антидребезг
Эмитирует значение только после паузы в указанную длительность:
import { Stream, Effect, Schedule } from "effect"
const rapidEvents = Stream.make(1, 2, 3, 4, 5).pipe(
Stream.schedule(Schedule.spaced("50 millis"))
)
const debounced = rapidEvents.pipe(
Stream.debounce("200 millis")
)
// Только последнее значение после 200мс паузы
Scheduling — расписание эмиссии
Stream.schedule — управление скоростью
import { Stream, Schedule, Console, Effect } from "effect"
const stream = Stream.make(1, 2, 3, 4, 5).pipe(
Stream.schedule(Schedule.spaced("1 second")),
Stream.tap((n) => Console.log(`Emitted: ${n}`))
)
Effect.runPromise(Stream.runCollect(stream))
// Элементы эмитируются с интервалом 1 секунда
API Reference
| Функция | Описание |
|---|---|
mapEffect(f, { concurrency }) | Параллельный маппинг |
flatMap(f, { concurrency }) | Параллельный flatMap |
flatMap(f, { switch: true }) | Switch-режим flatMap |
merge(other) | Объединение двух потоков |
merge(other, { haltStrategy }) | С стратегией завершения |
mergeWith(other, { onSelf, onOther }) | Объединение с трансформацией |
mergeAll(streams, { concurrency }) | Объединение множества потоков |
interleave(other) | Строгое чередование 1:1 |
interleaveWith(other, boolStream) | Управляемое чередование |
buffer({ capacity }) | Буфер для back-pressure |
throttle(config) | Ограничение скорости |
debounce(duration) | Антидребезг |
schedule(schedule) | Управление скоростью эмиссии |
Примеры
Production: параллельный HTTP crawler
import { Stream, Effect, Schedule, Console, Chunk } from "effect"
interface CrawlResult {
readonly url: string
readonly status: number
readonly contentLength: number
}
const crawlUrl = (url: string): Effect.Effect<CrawlResult, Error> =>
Effect.gen(function* () {
yield* Effect.sleep("100 millis") // симуляция HTTP
return {
url,
status: 200,
contentLength: Math.floor(Math.random() * 10000)
}
})
const urls = Stream.fromIterable([
"https://api.example.com/users",
"https://api.example.com/products",
"https://api.example.com/orders",
"https://api.example.com/reviews",
"https://api.example.com/categories"
])
const crawler = urls.pipe(
// Параллельный crawl с лимитом 3 одновременных запросов
Stream.mapEffect(crawlUrl, { concurrency: 3 }),
// Логирование
Stream.tap((result) =>
Console.log(`Crawled ${result.url}: ${result.status} (${result.contentLength}b)`)
),
// Сбор результатов
Stream.runCollect
)
Effect.runPromise(crawler).then((results) =>
console.log(`Crawled ${Chunk.size(results)} pages`)
)
Production: merge нескольких источников событий
import { Stream, Effect, Schedule, Random } from "effect"
// Три источника событий с разной скоростью
const userEvents = Stream.repeatEffect(
Effect.succeed({ type: "user" as const, ts: Date.now() })
).pipe(Stream.schedule(Schedule.spaced("500 millis")))
const systemEvents = Stream.repeatEffect(
Effect.succeed({ type: "system" as const, ts: Date.now() })
).pipe(Stream.schedule(Schedule.spaced("2 seconds")))
const auditEvents = Stream.repeatEffect(
Effect.succeed({ type: "audit" as const, ts: Date.now() })
).pipe(Stream.schedule(Schedule.spaced("5 seconds")))
// Объединяем все источники в единый поток
const allEvents = Stream.mergeAll(
[userEvents, systemEvents, auditEvents],
{ concurrency: 3 }
).pipe(
Stream.take(20)
)
Упражнения
🟢 Basic
Упражнение 1: Создайте два потока чисел (чётные и нечётные). Объедините их с помощью merge. Соберите первые 10 элементов.
Решение:
import { Stream, Effect } from "effect"
const evens = Stream.iterate(0, (n) => n + 2)
const odds = Stream.iterate(1, (n) => n + 2)
const program = Stream.merge(evens, odds).pipe(
Stream.take(10),
Stream.runCollect
)
Effect.runPromise(program).then(console.log)
🟡 Intermediate
Упражнение 2: Реализуйте параллельный процессор заказов: поток заказов обрабатывается с concurrency=5, каждый заказ «обрабатывается» 200мс. Измерьте общее время обработки 20 заказов (должно быть ~800мс, а не 4000мс).
🔴 Advanced
Упражнение 3: Реализуйте rate-limited API client: merge 3 источника запросов, общий throttle не более 10 запросов в секунду, buffer для сглаживания burst-нагрузок. Каждый запрос — mapEffect с concurrency=5.
🔗 Далее: 08-broadcasting.md — broadcast, distributedWith, toPubSub