Комбинаторы Sink
Композиция потребителей — параллельное, последовательное и конкурентное потребление потоков через алгебраические комбинаторы.
Теория
Три режима композиции
Комбинаторы Sink реализуют три фундаментальных паттерна композиции потребителей:
┌──────────────────────────────────────────────────────┐
│ Композиция Sink │
├──────────────┬──────────────┬────────────────────────┤
│ flatMap │ zip │ race │
│ (sequential)│ (parallel) │ (competitive) │
├──────────────┼──────────────┼────────────────────────┤
│ Sink A ──> │ Sink A ═══> │ Sink A ═══> │
│ then Sink B │ Sink B ═══> │ Sink B ═══> │
│ │ combine │ first wins │
├──────────────┼──────────────┼────────────────────────┤
│ Leftovers │ Broadcast │ Cancel loser │
│ pass to │ stream to │ take winner │
│ next sink │ both sinks │ result │
└──────────────┴──────────────┴────────────────────────┘
- flatMap — последовательная: первый Sink потребляет часть потока, его leftovers передаются второму Sink. Позволяет строить зависимые цепочки.
- zip — параллельная: оба Sink работают одновременно над одним потоком (через broadcast). Результаты объединяются в кортеж.
- race — конкурентная: оба Sink работают одновременно, побеждает первый завершившийся. Проигравший отменяется.
Алгебраические свойства
Комбинаторы Sink обладают важными алгебраическими свойствами:
flatMap — ассоциативность (монада):
flatMap(flatMap(s, f), g) ≡ flatMap(s, x => flatMap(f(x), g))
zip — коммутативность результата при concurrent:
zip(a, b) ≡ map(zip(b, a), ([b, a]) => [a, b])
race — идемпотентность:
race(a, a) ≡ a (один и тот же Sink всегда финиширует одновременно)
Концепция ФП
Монадическая композиция через flatMap
flatMap для Sink реализует монадическое связывание (bind, >>=). В контексте потоков это означает: результат первого потребителя определяет, какой потребитель будет следующим.
Монада Sink:
return a = Sink.succeed(a) -- Sink, немедленно возвращающий a
m >>= f = Sink.flatMap(m, f) -- Связывание
┌─────────┐
Stream: [1, 2, 3, 4, 5, 6, 7, 8]
│ │
┌──────────┐ │
│ Sink A │ │
│ take(3) │ │
└────┬─────┘ │
│ result: Chunk(1,2,3)
│ leftovers: [4, 5, 6, 7, 8]
▼
f(Chunk(1,2,3)) → Sink B
┌──────────────┐
│ Sink B │
│ take(2) │ ← потребляет leftovers
└──────┬───────┘
│ result: Chunk(4, 5)
│ leftovers: [6, 7, 8]
▼
Final: Chunk(4, 5)
Ключевое свойство: функция f имеет доступ к результату первого Sink и может конструировать второй Sink на его основе. Это делает flatMap строго мощнее, чем zip.
Аппликативная композиция через zip
zip реализует аппликативную композицию — оба Sink работают независимо друг от друга. В отличие от flatMap, второй Sink не зависит от результата первого:
Аппликатив:
<*> : Sink<A → B> × Sink<A> → Sink<B>
zip = liftA2((,)) -- поднимает конструктор кортежа
zip(sa, sb) = Sink<[A, B]>
Семигруппа через race
race определяет семигруппу на Sink: ассоциативная бинарная операция выбора “первого завершившегося”:
race(a, race(b, c)) ≡ race(race(a, b), c) -- ассоциативность
Это не коммутативная операция в строгом смысле: при одновременном завершении выбор может быть недетерминированным.
Sink.flatMap — последовательная композиция
Базовый flatMap
flatMap позволяет цепочку потребителей, где каждый следующий зависит от результата предыдущего:
import { Stream, Sink, Effect, Chunk, Option } from "effect"
const stream = Stream.make(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
// Шаг 1: берём первые 3 элемента
// Шаг 2: на основе их суммы решаем, сколько ещё взять
const adaptiveSink = Sink.take<number>(3).pipe(
Sink.flatMap((firstThree) => {
const sum = Chunk.toReadonlyArray(firstThree).reduce((a, b) => a + b, 0)
// Если сумма > 5, берём ещё 2, иначе — ещё 4
const remaining = sum > 5 ? 2 : 4
return Sink.take<number>(remaining)
})
)
Effect.runPromise(Stream.run(stream, adaptiveSink)).then(console.log)
// firstThree = [1, 2, 3], sum = 6 > 5, берём ещё 2
// Output: { _id: 'Chunk', values: [ 4, 5 ] }
Механика передачи leftovers
Когда первый Sink в flatMap завершается, его leftovers автоматически становятся входом для следующего Sink:
import { Stream, Sink, Effect, Chunk } from "effect"
const stream = Stream.range(1, 11) // [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
// Цепочка: take(3) → foldLeft → вернуть сумму остатка
const chainedSink = Sink.take<number>(3).pipe(
Sink.flatMap((taken) => {
// taken = Chunk(1, 2, 3)
// Leftovers = [4, 5, 6, 7, 8, 9, 10]
// Второй Sink потребляет leftovers
return Sink.foldLeft(0, (acc: number, n: number) => acc + n).pipe(
Sink.map((restSum) => ({
taken: Chunk.toReadonlyArray(taken),
restSum
}))
)
})
)
Effect.runPromise(Stream.run(stream, chainedSink)).then(console.log)
// Output: { taken: [ 1, 2, 3 ], restSum: 49 }
// restSum = 4+5+6+7+8+9+10 = 49
Многошаговые цепочки
flatMap можно вызывать последовательно, строя длинные цепочки потребителей:
import { Stream, Sink, Effect, Option } from "effect"
const stream = Stream.make("header", "v1", "Alice", "30", "Bob", "25")
// Парсинг протокола: header → version → пары (name, age)
const protocolSink = Sink.head<string>().pipe(
Sink.flatMap((header) =>
Sink.head<string>().pipe(
Sink.flatMap((version) =>
Sink.collectAll<string>().pipe(
Sink.map((data) => ({
header: Option.getOrElse(header, () => "unknown"),
version: Option.getOrElse(version, () => "0"),
records: data
}))
)
)
)
)
)
Effect.runPromise(Stream.run(stream, protocolSink)).then(console.log)
// Output:
// {
// header: "header",
// version: "v1",
// records: Chunk("Alice", "30", "Bob", "25")
// }
flatMap через Effect.gen
Для удобства можно использовать Effect.gen-стиль через вложенные flatMap:
import { Stream, Sink, Effect, Option, Chunk, pipe } from "effect"
const stream = Stream.range(1, 21) // 1..20
const multiStepSink = pipe(
Sink.take<number>(5),
Sink.flatMap((first5) =>
pipe(
Sink.take<number>(5),
Sink.flatMap((second5) =>
pipe(
Sink.sum,
Sink.map((restSum) => ({
batch1: Chunk.toReadonlyArray(first5),
batch2: Chunk.toReadonlyArray(second5),
restSum
}))
)
)
)
)
)
Effect.runPromise(Stream.run(stream, multiStepSink)).then(console.log)
// Output:
// {
// batch1: [1, 2, 3, 4, 5],
// batch2: [6, 7, 8, 9, 10],
// restSum: 155 (11+12+...+20)
// }
Sink.zip — параллельная композиция
Базовый zip
Sink.zip запускает два Sink параллельно на одном потоке и комбинирует их результаты в кортеж. Под капотом поток broadcast’ится на две копии:
import { Sink, Console, Stream, Schedule, Effect } from "effect"
const stream = Stream.make("1", "2", "3", "4", "5").pipe(
Stream.schedule(Schedule.spaced("10 millis"))
)
const sink1 = Sink.forEach((s: string) =>
Console.log(`sink 1: ${s}`)
).pipe(Sink.as(1))
const sink2 = Sink.forEach((s: string) =>
Console.log(`sink 2: ${s}`)
).pipe(Sink.as(2))
// Оба Sink работают параллельно, результаты в кортеже
const combined = Sink.zip(sink1, sink2, { concurrent: true })
Effect.runPromise(Stream.run(stream, combined)).then(console.log)
// Output:
// sink 1: 1
// sink 2: 1
// sink 1: 2
// sink 2: 2
// ...
// [ 1, 2 ]
Опция concurrent
По умолчанию Sink.zip работает последовательно. Для параллельного выполнения нужно явно указать { concurrent: true }:
import { Sink, Stream, Effect } from "effect"
const stream = Stream.make(1, 2, 3, 4, 5)
// Последовательный zip (по умолчанию)
const sequential = Sink.zip(Sink.sum, Sink.count)
// Параллельный zip
const parallel = Sink.zip(Sink.sum, Sink.count, { concurrent: true })
Effect.runPromise(Stream.run(stream, parallel)).then(console.log)
// Output: [ 15, 5 ]
Sink.zipWith — zip с трансформацией
Sink.zipWith позволяет сразу применить функцию к результатам обоих Sink:
import { Sink, Stream, Effect } from "effect"
const stream = Stream.make(1, 2, 3, 4, 5)
// Вычисляем среднее: sum / count
const averageSink = Sink.zipWith(
Sink.sum,
Sink.count,
(sum, count) => count > 0 ? sum / count : 0,
{ concurrent: true }
)
Effect.runPromise(Stream.run(stream, averageSink)).then(console.log)
// Output: 3
Sink.zipLeft и Sink.zipRight
Для случаев, когда нужен результат только одного из Sink:
import { Sink, Console, Stream, Effect } from "effect"
const stream = Stream.make(1, 2, 3, 4, 5)
// zipLeft: возвращает результат левого Sink, игнорируя правый
const sumOnly = Sink.zipLeft(
Sink.sum,
Sink.forEach((n: number) => Console.log(`Processed: ${n}`)),
{ concurrent: true }
)
Effect.runPromise(Stream.run(stream, sumOnly)).then(console.log)
// Output:
// Processed: 1
// Processed: 2
// ... (побочные эффекты от правого Sink)
// 15 (результат от левого Sink)
Множественный zip
Для объединения более двух Sink используйте вложенный zip или Sink.zipWith:
import { Sink, Stream, Effect } from "effect"
const stream = Stream.make(1, 2, 3, 4, 5)
// Три метрики параллельно
const metricsSink = Sink.zip(
Sink.zip(Sink.sum, Sink.count, { concurrent: true }),
Sink.zip(Sink.head<number>(), Sink.last<number>(), { concurrent: true }),
{ concurrent: true }
).pipe(
Sink.map(([[sum, count], [first, last]]) => ({
sum,
count,
average: count > 0 ? sum / count : 0,
first,
last
}))
)
Effect.runPromise(Stream.run(stream, metricsSink)).then(console.log)
// Output: { sum: 15, count: 5, average: 3, first: Some(1), last: Some(5) }
Sink.race — конкурентная композиция
Базовый race
Sink.race запускает два Sink параллельно, и побеждает первый завершившийся. Проигравший Sink отменяется:
import { Sink, Console, Stream, Schedule, Effect } from "effect"
const stream = Stream.make("1", "2", "3", "4", "5").pipe(
Stream.schedule(Schedule.spaced("10 millis"))
)
const sink1 = Sink.forEach((s: string) =>
Console.log(`sink 1: ${s}`)
).pipe(Sink.as(1))
const sink2 = Sink.forEach((s: string) =>
Console.log(`sink 2: ${s}`)
).pipe(Sink.as(2))
// Оба Sink обрабатывают все элементы, но кто первый — тот и победил
const racing = Sink.race(sink1, sink2)
Effect.runPromise(Stream.run(stream, racing)).then(console.log)
// Output: 1 (или 2, в зависимости от порядка завершения)
Практическое применение race
Race полезен когда у вас есть несколько стратегий потребления, и вы хотите использовать самую быструю:
import { Sink, Stream, Effect, Chunk } from "effect"
const dataStream = Stream.range(1, 1001) // 1..1000
// Стратегия 1: Собрать первые 100 элементов
const strategy1 = Sink.take<number>(100).pipe(
Sink.map((chunk) => ({
strategy: "take100" as const,
result: Chunk.size(chunk)
}))
)
// Стратегия 2: Считать сумму пока она < 5000
const strategy2 = Sink.fold(
0,
(sum: number) => sum < 5000,
(acc: number, n: number) => acc + n
).pipe(
Sink.map((sum) => ({
strategy: "sumTo5000" as const,
result: sum
}))
)
// Кто первый завершится — тот и победит
const winner = Sink.race(strategy1, strategy2)
Effect.runPromise(Stream.run(dataStream, winner)).then(console.log)
// strategy2 завершится раньше (при сумме ~5050 после ~100 элементов)
// или strategy1 (ровно после 100 элементов)
Типизация race
При использовании race типы результатов обоих Sink должны быть совместимы (union type):
import { Sink, Stream, Effect } from "effect"
const stream = Stream.make(1, 2, 3, 4, 5)
// Оба Sink возвращают number — union type = number
const race1 = Sink.race(Sink.sum, Sink.count)
// Sink<number, number, ...>
// Разные типы — будет union
const race2 = Sink.race(
Sink.sum.pipe(Sink.map((n) => ({ type: "sum" as const, value: n }))),
Sink.count.pipe(Sink.map((n) => ({ type: "count" as const, value: n })))
)
// Sink<{ type: "sum", value: number } | { type: "count", value: number }, ...>
Дополнительные комбинаторы
Sink.mapEffect — эффективная трансформация результата
Sink.mapEffect позволяет применить эффект к результату Sink:
import { Stream, Sink, Effect, Console } from "effect"
const stream = Stream.make(1, 2, 3, 4, 5)
const sink = Sink.sum.pipe(
Sink.mapEffect((total) =>
Effect.gen(function* () {
yield* Console.log(`Computing result for total: ${total}`)
return total * 2
})
)
)
Effect.runPromise(Stream.run(stream, sink)).then(console.log)
// Output:
// Computing result for total: 15
// 30
Sink.mapError — трансформация ошибки
import { Stream, Sink, Effect } from "effect"
class SinkError {
readonly _tag = "SinkError"
constructor(readonly message: string) {}
}
const failingSink = Sink.fail("raw error").pipe(
Sink.mapError((err) => new SinkError(err))
)
// Sink<never, unknown, never, SinkError, never>
Sink.orElse — fallback при ошибке
import { Stream, Sink, Effect } from "effect"
const stream = Stream.make(1, 2, 3)
// Если первый Sink завершится ошибкой, используем запасной
const primarySink = Sink.fail("primary failed" as const)
const withFallback = primarySink.pipe(
Sink.orElse(() => Sink.sum)
)
Effect.runPromise(Stream.run(stream, withFallback)).then(console.log)
// Output: 6 (fallback Sink.sum отработал)
Sink.unwrap и Sink.unwrapScoped — создание Sink из Effect
Sink.unwrap позволяет создать Sink, конфигурация которого зависит от эффекта:
import { Stream, Sink, Effect, Ref } from "effect"
const stream = Stream.make(1, 2, 3, 4, 5)
// Sink, поведение которого зависит от конфигурации
const dynamicSink = Sink.unwrap(
Effect.gen(function* () {
// Можно читать конфигурацию, обращаться к сервисам и т.д.
const threshold = 3
return Sink.fold(
0,
(sum: number) => sum <= threshold * 10,
(acc: number, n: number) => acc + n
)
})
)
Effect.runPromise(Stream.run(stream, dynamicSink)).then(console.log)
Sink.fromEffect — Sink из единичного Effect
Создаёт Sink, который не потребляет элементов, а просто выполняет Effect:
import { Stream, Sink, Effect, Console } from "effect"
const stream = Stream.make(1, 2, 3)
const setupSink = Sink.fromEffect(
Console.log("Sink initialized").pipe(
Effect.as(0)
)
)
Effect.runPromise(Stream.run(stream, setupSink)).then(console.log)
// Output:
// Sink initialized
// 0 (элементы потока не потреблены)
API Reference
Монадические комбинаторы
| Функция | Тип композиции | Описание |
|---|---|---|
flatMap(sink, f) | Последовательная | Цепочка Sink с передачей leftovers |
tap(sink, f) | Последовательная | flatMap без изменения результата |
Аппликативные комбинаторы
| Функция | Опция concurrent | Описание |
|---|---|---|
zip(a, b, opts?) | Да | Комбинирует результаты в кортеж |
zipWith(a, b, f, opts?) | Да | zip с трансформацией |
zipLeft(a, b, opts?) | Да | Результат левого Sink |
zipRight(a, b, opts?) | Да | Результат правого Sink |
Конкурентные комбинаторы
| Функция | Описание |
|---|---|
race(a, b) | Первый завершившийся побеждает |
Обработка ошибок
| Функция | Описание |
|---|---|
mapError(sink, f) | Трансформация ошибки |
orElse(sink, fallback) | Запасной Sink при ошибке |
Конструкторы из Effect
| Функция | Описание |
|---|---|
unwrap(effect) | Создаёт Sink из Effect |
fromEffect(effect) | Sink, выполняющий единичный Effect |
Примеры
💻 Пример 1: Парсер бинарного протокола
import { Stream, Sink, Effect, Chunk, Option } from "effect"
// Бинарный протокол:
// [1 byte: version] [2 bytes: length] [N bytes: payload] [4 bytes: checksum]
const rawBytes = Stream.make(
2, // version
0, 5, // length = 5
72, 101, 108, 108, 111, // payload = "Hello"
0, 0, 0, 42 // checksum
)
interface ParsedMessage {
readonly version: number
readonly payload: ReadonlyArray<number>
readonly checksum: number
}
// Последовательная композиция через flatMap
const protocolParser = Sink.head<number>().pipe(
Sink.flatMap((versionOpt) => {
const version = Option.getOrElse(versionOpt, () => 0)
// Читаем 2 байта длины
return Sink.take<number>(2).pipe(
Sink.flatMap((lengthBytes) => {
const arr = Chunk.toReadonlyArray(lengthBytes)
const length = (arr[0] ?? 0) * 256 + (arr[1] ?? 0)
// Читаем payload указанной длины
return Sink.take<number>(length).pipe(
Sink.flatMap((payload) =>
// Читаем 4 байта checksum
Sink.take<number>(4).pipe(
Sink.map((checksumBytes) => {
const csArr = Chunk.toReadonlyArray(checksumBytes)
const checksum = csArr.reduce(
(acc, b, i) => acc + b * Math.pow(256, 3 - i), 0
)
return {
version,
payload: Chunk.toReadonlyArray(payload),
checksum
} satisfies ParsedMessage
})
)
)
)
})
)
})
)
Effect.runPromise(Stream.run(rawBytes, protocolParser)).then(console.log)
// Output: { version: 2, payload: [72, 101, 108, 108, 111], checksum: 42 }
💻 Пример 2: Параллельные метрики с zip
import { Stream, Sink, Effect, Chunk, Option } from "effect"
interface SensorReading {
readonly sensorId: string
readonly value: number
readonly timestamp: number
}
const readings: Stream.Stream<SensorReading> = Stream.make(
{ sensorId: "temp-1", value: 22.5, timestamp: 1000 },
{ sensorId: "temp-2", value: 23.1, timestamp: 1001 },
{ sensorId: "temp-1", value: 22.8, timestamp: 1002 },
{ sensorId: "temp-2", value: 24.0, timestamp: 1003 },
{ sensorId: "temp-1", value: 21.9, timestamp: 1004 },
{ sensorId: "temp-2", value: 23.5, timestamp: 1005 }
)
// Три параллельных Sink для разных метрик
// 1. Средняя температура
const avgSink = Sink.zipWith(
Sink.foldLeft(0, (acc: number, r: SensorReading) => acc + r.value),
Sink.count,
(sum, count) => count > 0 ? sum / count : 0,
{ concurrent: true }
)
// 2. Максимальная температура
const maxSink = Sink.foldLeft(
-Infinity,
(max: number, r: SensorReading) => Math.max(max, r.value)
)
// 3. Количество уникальных сенсоров
const uniqueSensorsSink = Sink.collectAllToSet<SensorReading>().pipe(
Sink.mapInput((r: SensorReading) => r.sensorId),
// Нужно адаптировать — collectAllToSet работает с sensorId
Sink.map(() => 0) // placeholder
)
// Объединяем avg и max параллельно
const combinedMetrics = Sink.zipWith(
avgSink,
maxSink,
(avg, max) => ({ averageTemp: avg, maxTemp: max }),
{ concurrent: true }
)
Effect.runPromise(Stream.run(readings, combinedMetrics)).then(console.log)
// Output: { averageTemp: 22.96..., maxTemp: 24.0 }
💻 Пример 3: Обработка CSV с header + body
import { Stream, Sink, Effect, Chunk, Option } from "effect"
const csvStream = Stream.make(
"id,name,email",
"1,Alice,alice@example.com",
"2,Bob,bob@example.com",
"3,Charlie,charlie@example.com"
)
interface CsvData {
readonly headers: ReadonlyArray<string>
readonly rows: ReadonlyArray<ReadonlyArray<string>>
}
// flatMap: сначала читаем заголовок, потом все строки
const csvParser = Sink.head<string>().pipe(
Sink.flatMap((headerOpt) => {
const headerLine = Option.getOrElse(headerOpt, () => "")
const headers = headerLine.split(",")
return Sink.collectAll<string>().pipe(
Sink.map((bodyChunk) => ({
headers,
rows: Chunk.toReadonlyArray(bodyChunk).map((line) => line.split(","))
} satisfies CsvData))
)
})
)
Effect.runPromise(Stream.run(csvStream, csvParser)).then((csv) => {
console.log("Headers:", csv.headers)
csv.rows.forEach((row) => console.log("Row:", row))
})
// Output:
// Headers: [ 'id', 'name', 'email' ]
// Row: [ '1', 'Alice', 'alice@example.com' ]
// Row: [ '2', 'Bob', 'bob@example.com' ]
// Row: [ '3', 'Charlie', 'charlie@example.com' ]
Упражнения
🟢 Basic
Упражнение 1: Параллельные sum и count
Используйте Sink.zip с { concurrent: true } для одновременного вычисления суммы и количества элементов потока. Вычислите среднее.
import { Stream, Sink, Effect } from "effect"
const numbers = Stream.make(10, 20, 30, 40, 50)
const program = Effect.gen(function* () {
// Ваш код здесь
})
Effect.runPromise(program)
Решение:
import { Stream, Sink, Effect } from "effect"
const numbers = Stream.make(10, 20, 30, 40, 50)
const program = Effect.gen(function* () {
const [sum, count] = yield* Stream.run(
numbers,
Sink.zip(Sink.sum, Sink.count, { concurrent: true })
)
const average = count > 0 ? sum / count : 0
yield* Effect.log(`Sum: ${sum}, Count: ${count}, Average: ${average}`)
})
Effect.runPromise(program)
// Output: Sum: 150, Count: 5, Average: 30
Упражнение 2: Последовательное чтение
Используйте flatMap для чтения заголовка (1 элемент) и тела (все остальные) из потока строк.
Решение:
import { Stream, Sink, Effect, Option, Chunk } from "effect"
const lines = Stream.make("HEADER", "line1", "line2", "line3")
const program = Effect.gen(function* () {
const result = yield* Stream.run(
lines,
Sink.head<string>().pipe(
Sink.flatMap((headerOpt) => {
const header = Option.getOrElse(headerOpt, () => "")
return Sink.collectAll<string>().pipe(
Sink.map((body) => ({ header, body: Chunk.toReadonlyArray(body) }))
)
})
)
)
yield* Effect.log(`Header: ${result.header}`)
yield* Effect.log(`Body: ${result.body}`)
})
Effect.runPromise(program)
// Output:
// Header: HEADER
// Body: line1,line2,line3
🟡 Intermediate
Упражнение 3: Race — быстрый vs полный
Создайте два Sink:
- “Быстрый” — берёт первые 5 элементов и считает их сумму
- “Полный” — считает сумму всех элементов
Используйте race чтобы получить результат первого завершившегося.
Решение:
import { Stream, Sink, Effect, Chunk } from "effect"
const stream = Stream.range(1, 101) // 1..100
const fastSink = Sink.take<number>(5).pipe(
Sink.map((chunk) => ({
strategy: "fast" as const,
result: Chunk.toReadonlyArray(chunk).reduce((a, b) => a + b, 0)
}))
)
const fullSink = Sink.sum.pipe(
Sink.map((total) => ({
strategy: "full" as const,
result: total
}))
)
const program = Effect.gen(function* () {
const winner = yield* Stream.run(stream, Sink.race(fastSink, fullSink))
yield* Effect.log(`Winner: ${winner.strategy}, Result: ${winner.result}`)
})
Effect.runPromise(program)
// Output: Winner: fast, Result: 15
// (fast завершился после 5 элементов, а full ещё ждёт все 100)
🔴 Advanced
Упражнение 4: Протокол с переменной длиной
Реализуйте парсер потокового протокола, где каждое сообщение имеет формат:
- 1 элемент: тип сообщения (1 = text, 2 = binary)
- 1 элемент: длина payload
- N элементов: payload (N = длина)
Используйте flatMap + transduce для парсинга потока сообщений.
Решение:
import { Stream, Sink, Effect, Chunk, Option } from "effect"
interface Message {
readonly type: "text" | "binary"
readonly payload: ReadonlyArray<number>
}
const rawStream = Stream.make(
1, 3, 65, 66, 67, // text message: "ABC"
2, 2, 255, 0, // binary message: [255, 0]
1, 5, 72, 101, 108, 108, 111 // text message: "Hello"
)
const messageSink: Sink.Sink<Message, number, number, never, never> =
Sink.head<number>().pipe(
Sink.flatMap((typeOpt) => {
const typeNum = Option.getOrElse(typeOpt, () => 0)
const type = typeNum === 1 ? "text" as const : "binary" as const
return Sink.head<number>().pipe(
Sink.flatMap((lengthOpt) => {
const length = Option.getOrElse(lengthOpt, () => 0)
return Sink.take<number>(length).pipe(
Sink.map((payload) => ({
type,
payload: Chunk.toReadonlyArray(payload)
} satisfies Message))
)
})
)
})
)
// transduce применяет messageSink повторно
const messages = rawStream.pipe(Stream.transduce(messageSink))
const program = Effect.gen(function* () {
const result = yield* Stream.runCollect(messages)
Chunk.toReadonlyArray(result).forEach((msg, i) => {
const display = msg.type === "text"
? String.fromCharCode(...msg.payload)
: msg.payload.join(", ")
console.log(`Message ${i}: [${msg.type}] ${display}`)
})
})
Effect.runPromise(program)
// Output:
// Message 0: [text] ABC
// Message 1: [binary] 255, 0
// Message 2: [text] Hello
🔗 Далее: Создание собственных Sink →