Effect Курс Комбинаторы Sink
Глава

Комбинаторы Sink

Композиция потребителей — параллельное, последовательное и конкурентное потребление потоков через алгебраические комбинаторы.

Теория

Три режима композиции

Комбинаторы Sink реализуют три фундаментальных паттерна композиции потребителей:

┌──────────────────────────────────────────────────────┐
│                  Композиция Sink                     │
├──────────────┬──────────────┬────────────────────────┤
│  flatMap     │  zip         │  race                  │
│  (sequential)│  (parallel)  │  (competitive)         │
├──────────────┼──────────────┼────────────────────────┤
│  Sink A ──>  │  Sink A ═══> │  Sink A ═══>           │
│  then Sink B │  Sink B ═══> │  Sink B ═══>           │
│              │  combine     │  first wins            │
├──────────────┼──────────────┼────────────────────────┤
│  Leftovers   │  Broadcast   │  Cancel loser          │
│  pass to     │  stream to   │  take winner           │
│  next sink   │  both sinks  │  result                │
└──────────────┴──────────────┴────────────────────────┘
  • flatMapпоследовательная: первый Sink потребляет часть потока, его leftovers передаются второму Sink. Позволяет строить зависимые цепочки.
  • zipпараллельная: оба Sink работают одновременно над одним потоком (через broadcast). Результаты объединяются в кортеж.
  • raceконкурентная: оба Sink работают одновременно, побеждает первый завершившийся. Проигравший отменяется.

Алгебраические свойства

Комбинаторы Sink обладают важными алгебраическими свойствами:

flatMap — ассоциативность (монада):

flatMap(flatMap(s, f), g) ≡ flatMap(s, x => flatMap(f(x), g))

zip — коммутативность результата при concurrent:

zip(a, b) ≡ map(zip(b, a), ([b, a]) => [a, b])

race — идемпотентность:

race(a, a) ≡ a   (один и тот же Sink всегда финиширует одновременно)

Концепция ФП

Монадическая композиция через flatMap

flatMap для Sink реализует монадическое связывание (bind, >>=). В контексте потоков это означает: результат первого потребителя определяет, какой потребитель будет следующим.

Монада Sink:

return a = Sink.succeed(a)            -- Sink, немедленно возвращающий a
m >>= f  = Sink.flatMap(m, f)         -- Связывание

                ┌─────────┐
Stream: [1, 2, 3, 4, 5, 6, 7, 8]
                │         │
         ┌──────────┐    │
         │ Sink A   │    │
         │ take(3)  │    │
         └────┬─────┘    │
              │ result: Chunk(1,2,3)
              │ leftovers: [4, 5, 6, 7, 8]

         f(Chunk(1,2,3)) → Sink B
         ┌──────────────┐
         │ Sink B       │
         │ take(2)      │  ← потребляет leftovers
         └──────┬───────┘
                │ result: Chunk(4, 5)
                │ leftovers: [6, 7, 8]

         Final: Chunk(4, 5)

Ключевое свойство: функция f имеет доступ к результату первого Sink и может конструировать второй Sink на его основе. Это делает flatMap строго мощнее, чем zip.

Аппликативная композиция через zip

zip реализует аппликативную композицию — оба Sink работают независимо друг от друга. В отличие от flatMap, второй Sink не зависит от результата первого:

Аппликатив:
<*> : Sink<A → B> × Sink<A> → Sink<B>

zip = liftA2((,))   -- поднимает конструктор кортежа
zip(sa, sb) = Sink<[A, B]>

Семигруппа через race

race определяет семигруппу на Sink: ассоциативная бинарная операция выбора “первого завершившегося”:

race(a, race(b, c)) ≡ race(race(a, b), c)   -- ассоциативность

Это не коммутативная операция в строгом смысле: при одновременном завершении выбор может быть недетерминированным.


Sink.flatMap — последовательная композиция

Базовый flatMap

flatMap позволяет цепочку потребителей, где каждый следующий зависит от результата предыдущего:

import { Stream, Sink, Effect, Chunk, Option } from "effect"

const stream = Stream.make(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

// Шаг 1: берём первые 3 элемента
// Шаг 2: на основе их суммы решаем, сколько ещё взять
const adaptiveSink = Sink.take<number>(3).pipe(
  Sink.flatMap((firstThree) => {
    const sum = Chunk.toReadonlyArray(firstThree).reduce((a, b) => a + b, 0)
    // Если сумма > 5, берём ещё 2, иначе — ещё 4
    const remaining = sum > 5 ? 2 : 4
    return Sink.take<number>(remaining)
  })
)

Effect.runPromise(Stream.run(stream, adaptiveSink)).then(console.log)
// firstThree = [1, 2, 3], sum = 6 > 5, берём ещё 2
// Output: { _id: 'Chunk', values: [ 4, 5 ] }

Механика передачи leftovers

Когда первый Sink в flatMap завершается, его leftovers автоматически становятся входом для следующего Sink:

import { Stream, Sink, Effect, Chunk } from "effect"

const stream = Stream.range(1, 11) // [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

// Цепочка: take(3) → foldLeft → вернуть сумму остатка
const chainedSink = Sink.take<number>(3).pipe(
  Sink.flatMap((taken) => {
    // taken = Chunk(1, 2, 3)
    // Leftovers = [4, 5, 6, 7, 8, 9, 10]
    // Второй Sink потребляет leftovers
    return Sink.foldLeft(0, (acc: number, n: number) => acc + n).pipe(
      Sink.map((restSum) => ({
        taken: Chunk.toReadonlyArray(taken),
        restSum
      }))
    )
  })
)

Effect.runPromise(Stream.run(stream, chainedSink)).then(console.log)
// Output: { taken: [ 1, 2, 3 ], restSum: 49 }
// restSum = 4+5+6+7+8+9+10 = 49

Многошаговые цепочки

flatMap можно вызывать последовательно, строя длинные цепочки потребителей:

import { Stream, Sink, Effect, Option } from "effect"

const stream = Stream.make("header", "v1", "Alice", "30", "Bob", "25")

// Парсинг протокола: header → version → пары (name, age)
const protocolSink = Sink.head<string>().pipe(
  Sink.flatMap((header) =>
    Sink.head<string>().pipe(
      Sink.flatMap((version) =>
        Sink.collectAll<string>().pipe(
          Sink.map((data) => ({
            header: Option.getOrElse(header, () => "unknown"),
            version: Option.getOrElse(version, () => "0"),
            records: data
          }))
        )
      )
    )
  )
)

Effect.runPromise(Stream.run(stream, protocolSink)).then(console.log)
// Output:
// {
//   header: "header",
//   version: "v1",
//   records: Chunk("Alice", "30", "Bob", "25")
// }

flatMap через Effect.gen

Для удобства можно использовать Effect.gen-стиль через вложенные flatMap:

import { Stream, Sink, Effect, Option, Chunk, pipe } from "effect"

const stream = Stream.range(1, 21) // 1..20

const multiStepSink = pipe(
  Sink.take<number>(5),
  Sink.flatMap((first5) =>
    pipe(
      Sink.take<number>(5),
      Sink.flatMap((second5) =>
        pipe(
          Sink.sum,
          Sink.map((restSum) => ({
            batch1: Chunk.toReadonlyArray(first5),
            batch2: Chunk.toReadonlyArray(second5),
            restSum
          }))
        )
      )
    )
  )
)

Effect.runPromise(Stream.run(stream, multiStepSink)).then(console.log)
// Output:
// {
//   batch1: [1, 2, 3, 4, 5],
//   batch2: [6, 7, 8, 9, 10],
//   restSum: 155  (11+12+...+20)
// }

Sink.zip — параллельная композиция

Базовый zip

Sink.zip запускает два Sink параллельно на одном потоке и комбинирует их результаты в кортеж. Под капотом поток broadcast’ится на две копии:

import { Sink, Console, Stream, Schedule, Effect } from "effect"

const stream = Stream.make("1", "2", "3", "4", "5").pipe(
  Stream.schedule(Schedule.spaced("10 millis"))
)

const sink1 = Sink.forEach((s: string) =>
  Console.log(`sink 1: ${s}`)
).pipe(Sink.as(1))

const sink2 = Sink.forEach((s: string) =>
  Console.log(`sink 2: ${s}`)
).pipe(Sink.as(2))

// Оба Sink работают параллельно, результаты в кортеже
const combined = Sink.zip(sink1, sink2, { concurrent: true })

Effect.runPromise(Stream.run(stream, combined)).then(console.log)
// Output:
// sink 1: 1
// sink 2: 1
// sink 1: 2
// sink 2: 2
// ...
// [ 1, 2 ]

Опция concurrent

По умолчанию Sink.zip работает последовательно. Для параллельного выполнения нужно явно указать { concurrent: true }:

import { Sink, Stream, Effect } from "effect"

const stream = Stream.make(1, 2, 3, 4, 5)

// Последовательный zip (по умолчанию)
const sequential = Sink.zip(Sink.sum, Sink.count)

// Параллельный zip
const parallel = Sink.zip(Sink.sum, Sink.count, { concurrent: true })

Effect.runPromise(Stream.run(stream, parallel)).then(console.log)
// Output: [ 15, 5 ]

Sink.zipWith — zip с трансформацией

Sink.zipWith позволяет сразу применить функцию к результатам обоих Sink:

import { Sink, Stream, Effect } from "effect"

const stream = Stream.make(1, 2, 3, 4, 5)

// Вычисляем среднее: sum / count
const averageSink = Sink.zipWith(
  Sink.sum,
  Sink.count,
  (sum, count) => count > 0 ? sum / count : 0,
  { concurrent: true }
)

Effect.runPromise(Stream.run(stream, averageSink)).then(console.log)
// Output: 3

Sink.zipLeft и Sink.zipRight

Для случаев, когда нужен результат только одного из Sink:

import { Sink, Console, Stream, Effect } from "effect"

const stream = Stream.make(1, 2, 3, 4, 5)

// zipLeft: возвращает результат левого Sink, игнорируя правый
const sumOnly = Sink.zipLeft(
  Sink.sum,
  Sink.forEach((n: number) => Console.log(`Processed: ${n}`)),
  { concurrent: true }
)

Effect.runPromise(Stream.run(stream, sumOnly)).then(console.log)
// Output:
// Processed: 1
// Processed: 2
// ... (побочные эффекты от правого Sink)
// 15 (результат от левого Sink)

Множественный zip

Для объединения более двух Sink используйте вложенный zip или Sink.zipWith:

import { Sink, Stream, Effect } from "effect"

const stream = Stream.make(1, 2, 3, 4, 5)

// Три метрики параллельно
const metricsSink = Sink.zip(
  Sink.zip(Sink.sum, Sink.count, { concurrent: true }),
  Sink.zip(Sink.head<number>(), Sink.last<number>(), { concurrent: true }),
  { concurrent: true }
).pipe(
  Sink.map(([[sum, count], [first, last]]) => ({
    sum,
    count,
    average: count > 0 ? sum / count : 0,
    first,
    last
  }))
)

Effect.runPromise(Stream.run(stream, metricsSink)).then(console.log)
// Output: { sum: 15, count: 5, average: 3, first: Some(1), last: Some(5) }

Sink.race — конкурентная композиция

Базовый race

Sink.race запускает два Sink параллельно, и побеждает первый завершившийся. Проигравший Sink отменяется:

import { Sink, Console, Stream, Schedule, Effect } from "effect"

const stream = Stream.make("1", "2", "3", "4", "5").pipe(
  Stream.schedule(Schedule.spaced("10 millis"))
)

const sink1 = Sink.forEach((s: string) =>
  Console.log(`sink 1: ${s}`)
).pipe(Sink.as(1))

const sink2 = Sink.forEach((s: string) =>
  Console.log(`sink 2: ${s}`)
).pipe(Sink.as(2))

// Оба Sink обрабатывают все элементы, но кто первый — тот и победил
const racing = Sink.race(sink1, sink2)

Effect.runPromise(Stream.run(stream, racing)).then(console.log)
// Output: 1 (или 2, в зависимости от порядка завершения)

Практическое применение race

Race полезен когда у вас есть несколько стратегий потребления, и вы хотите использовать самую быструю:

import { Sink, Stream, Effect, Chunk } from "effect"

const dataStream = Stream.range(1, 1001) // 1..1000

// Стратегия 1: Собрать первые 100 элементов
const strategy1 = Sink.take<number>(100).pipe(
  Sink.map((chunk) => ({
    strategy: "take100" as const,
    result: Chunk.size(chunk)
  }))
)

// Стратегия 2: Считать сумму пока она < 5000
const strategy2 = Sink.fold(
  0,
  (sum: number) => sum < 5000,
  (acc: number, n: number) => acc + n
).pipe(
  Sink.map((sum) => ({
    strategy: "sumTo5000" as const,
    result: sum
  }))
)

// Кто первый завершится — тот и победит
const winner = Sink.race(strategy1, strategy2)

Effect.runPromise(Stream.run(dataStream, winner)).then(console.log)
// strategy2 завершится раньше (при сумме ~5050 после ~100 элементов)
// или strategy1 (ровно после 100 элементов)

Типизация race

При использовании race типы результатов обоих Sink должны быть совместимы (union type):

import { Sink, Stream, Effect } from "effect"

const stream = Stream.make(1, 2, 3, 4, 5)

// Оба Sink возвращают number — union type = number
const race1 = Sink.race(Sink.sum, Sink.count)
// Sink<number, number, ...>

// Разные типы — будет union
const race2 = Sink.race(
  Sink.sum.pipe(Sink.map((n) => ({ type: "sum" as const, value: n }))),
  Sink.count.pipe(Sink.map((n) => ({ type: "count" as const, value: n })))
)
// Sink<{ type: "sum", value: number } | { type: "count", value: number }, ...>

Дополнительные комбинаторы

Sink.mapEffect — эффективная трансформация результата

Sink.mapEffect позволяет применить эффект к результату Sink:

import { Stream, Sink, Effect, Console } from "effect"

const stream = Stream.make(1, 2, 3, 4, 5)

const sink = Sink.sum.pipe(
  Sink.mapEffect((total) =>
    Effect.gen(function* () {
      yield* Console.log(`Computing result for total: ${total}`)
      return total * 2
    })
  )
)

Effect.runPromise(Stream.run(stream, sink)).then(console.log)
// Output:
// Computing result for total: 15
// 30

Sink.mapError — трансформация ошибки

import { Stream, Sink, Effect } from "effect"

class SinkError {
  readonly _tag = "SinkError"
  constructor(readonly message: string) {}
}

const failingSink = Sink.fail("raw error").pipe(
  Sink.mapError((err) => new SinkError(err))
)
// Sink<never, unknown, never, SinkError, never>

Sink.orElse — fallback при ошибке

import { Stream, Sink, Effect } from "effect"

const stream = Stream.make(1, 2, 3)

// Если первый Sink завершится ошибкой, используем запасной
const primarySink = Sink.fail("primary failed" as const)

const withFallback = primarySink.pipe(
  Sink.orElse(() => Sink.sum)
)

Effect.runPromise(Stream.run(stream, withFallback)).then(console.log)
// Output: 6 (fallback Sink.sum отработал)

Sink.unwrap и Sink.unwrapScoped — создание Sink из Effect

Sink.unwrap позволяет создать Sink, конфигурация которого зависит от эффекта:

import { Stream, Sink, Effect, Ref } from "effect"

const stream = Stream.make(1, 2, 3, 4, 5)

// Sink, поведение которого зависит от конфигурации
const dynamicSink = Sink.unwrap(
  Effect.gen(function* () {
    // Можно читать конфигурацию, обращаться к сервисам и т.д.
    const threshold = 3
    return Sink.fold(
      0,
      (sum: number) => sum <= threshold * 10,
      (acc: number, n: number) => acc + n
    )
  })
)

Effect.runPromise(Stream.run(stream, dynamicSink)).then(console.log)

Sink.fromEffect — Sink из единичного Effect

Создаёт Sink, который не потребляет элементов, а просто выполняет Effect:

import { Stream, Sink, Effect, Console } from "effect"

const stream = Stream.make(1, 2, 3)

const setupSink = Sink.fromEffect(
  Console.log("Sink initialized").pipe(
    Effect.as(0)
  )
)

Effect.runPromise(Stream.run(stream, setupSink)).then(console.log)
// Output:
// Sink initialized
// 0 (элементы потока не потреблены)

API Reference

Монадические комбинаторы

ФункцияТип композицииОписание
flatMap(sink, f)ПоследовательнаяЦепочка Sink с передачей leftovers
tap(sink, f)ПоследовательнаяflatMap без изменения результата

Аппликативные комбинаторы

ФункцияОпция concurrentОписание
zip(a, b, opts?)ДаКомбинирует результаты в кортеж
zipWith(a, b, f, opts?)Даzip с трансформацией
zipLeft(a, b, opts?)ДаРезультат левого Sink
zipRight(a, b, opts?)ДаРезультат правого Sink

Конкурентные комбинаторы

ФункцияОписание
race(a, b)Первый завершившийся побеждает

Обработка ошибок

ФункцияОписание
mapError(sink, f)Трансформация ошибки
orElse(sink, fallback)Запасной Sink при ошибке

Конструкторы из Effect

ФункцияОписание
unwrap(effect)Создаёт Sink из Effect
fromEffect(effect)Sink, выполняющий единичный Effect

Примеры

💻 Пример 1: Парсер бинарного протокола

import { Stream, Sink, Effect, Chunk, Option } from "effect"

// Бинарный протокол:
// [1 byte: version] [2 bytes: length] [N bytes: payload] [4 bytes: checksum]

const rawBytes = Stream.make(
  2,           // version
  0, 5,        // length = 5
  72, 101, 108, 108, 111,  // payload = "Hello"
  0, 0, 0, 42  // checksum
)

interface ParsedMessage {
  readonly version: number
  readonly payload: ReadonlyArray<number>
  readonly checksum: number
}

// Последовательная композиция через flatMap
const protocolParser = Sink.head<number>().pipe(
  Sink.flatMap((versionOpt) => {
    const version = Option.getOrElse(versionOpt, () => 0)

    // Читаем 2 байта длины
    return Sink.take<number>(2).pipe(
      Sink.flatMap((lengthBytes) => {
        const arr = Chunk.toReadonlyArray(lengthBytes)
        const length = (arr[0] ?? 0) * 256 + (arr[1] ?? 0)

        // Читаем payload указанной длины
        return Sink.take<number>(length).pipe(
          Sink.flatMap((payload) =>
            // Читаем 4 байта checksum
            Sink.take<number>(4).pipe(
              Sink.map((checksumBytes) => {
                const csArr = Chunk.toReadonlyArray(checksumBytes)
                const checksum = csArr.reduce(
                  (acc, b, i) => acc + b * Math.pow(256, 3 - i), 0
                )
                return {
                  version,
                  payload: Chunk.toReadonlyArray(payload),
                  checksum
                } satisfies ParsedMessage
              })
            )
          )
        )
      })
    )
  })
)

Effect.runPromise(Stream.run(rawBytes, protocolParser)).then(console.log)
// Output: { version: 2, payload: [72, 101, 108, 108, 111], checksum: 42 }

💻 Пример 2: Параллельные метрики с zip

import { Stream, Sink, Effect, Chunk, Option } from "effect"

interface SensorReading {
  readonly sensorId: string
  readonly value: number
  readonly timestamp: number
}

const readings: Stream.Stream<SensorReading> = Stream.make(
  { sensorId: "temp-1", value: 22.5, timestamp: 1000 },
  { sensorId: "temp-2", value: 23.1, timestamp: 1001 },
  { sensorId: "temp-1", value: 22.8, timestamp: 1002 },
  { sensorId: "temp-2", value: 24.0, timestamp: 1003 },
  { sensorId: "temp-1", value: 21.9, timestamp: 1004 },
  { sensorId: "temp-2", value: 23.5, timestamp: 1005 }
)

// Три параллельных Sink для разных метрик

// 1. Средняя температура
const avgSink = Sink.zipWith(
  Sink.foldLeft(0, (acc: number, r: SensorReading) => acc + r.value),
  Sink.count,
  (sum, count) => count > 0 ? sum / count : 0,
  { concurrent: true }
)

// 2. Максимальная температура
const maxSink = Sink.foldLeft(
  -Infinity,
  (max: number, r: SensorReading) => Math.max(max, r.value)
)

// 3. Количество уникальных сенсоров
const uniqueSensorsSink = Sink.collectAllToSet<SensorReading>().pipe(
  Sink.mapInput((r: SensorReading) => r.sensorId),
  // Нужно адаптировать — collectAllToSet работает с sensorId
  Sink.map(() => 0) // placeholder
)

// Объединяем avg и max параллельно
const combinedMetrics = Sink.zipWith(
  avgSink,
  maxSink,
  (avg, max) => ({ averageTemp: avg, maxTemp: max }),
  { concurrent: true }
)

Effect.runPromise(Stream.run(readings, combinedMetrics)).then(console.log)
// Output: { averageTemp: 22.96..., maxTemp: 24.0 }

💻 Пример 3: Обработка CSV с header + body

import { Stream, Sink, Effect, Chunk, Option } from "effect"

const csvStream = Stream.make(
  "id,name,email",
  "1,Alice,alice@example.com",
  "2,Bob,bob@example.com",
  "3,Charlie,charlie@example.com"
)

interface CsvData {
  readonly headers: ReadonlyArray<string>
  readonly rows: ReadonlyArray<ReadonlyArray<string>>
}

// flatMap: сначала читаем заголовок, потом все строки
const csvParser = Sink.head<string>().pipe(
  Sink.flatMap((headerOpt) => {
    const headerLine = Option.getOrElse(headerOpt, () => "")
    const headers = headerLine.split(",")

    return Sink.collectAll<string>().pipe(
      Sink.map((bodyChunk) => ({
        headers,
        rows: Chunk.toReadonlyArray(bodyChunk).map((line) => line.split(","))
      } satisfies CsvData))
    )
  })
)

Effect.runPromise(Stream.run(csvStream, csvParser)).then((csv) => {
  console.log("Headers:", csv.headers)
  csv.rows.forEach((row) => console.log("Row:", row))
})
// Output:
// Headers: [ 'id', 'name', 'email' ]
// Row: [ '1', 'Alice', 'alice@example.com' ]
// Row: [ '2', 'Bob', 'bob@example.com' ]
// Row: [ '3', 'Charlie', 'charlie@example.com' ]

Упражнения

🟢 Basic

Упражнение 1: Параллельные sum и count

Используйте Sink.zip с { concurrent: true } для одновременного вычисления суммы и количества элементов потока. Вычислите среднее.

import { Stream, Sink, Effect } from "effect"

const numbers = Stream.make(10, 20, 30, 40, 50)

const program = Effect.gen(function* () {
  // Ваш код здесь
})

Effect.runPromise(program)

Решение:

import { Stream, Sink, Effect } from "effect"

const numbers = Stream.make(10, 20, 30, 40, 50)

const program = Effect.gen(function* () {
  const [sum, count] = yield* Stream.run(
    numbers,
    Sink.zip(Sink.sum, Sink.count, { concurrent: true })
  )

  const average = count > 0 ? sum / count : 0
  yield* Effect.log(`Sum: ${sum}, Count: ${count}, Average: ${average}`)
})

Effect.runPromise(program)
// Output: Sum: 150, Count: 5, Average: 30

Упражнение 2: Последовательное чтение

Используйте flatMap для чтения заголовка (1 элемент) и тела (все остальные) из потока строк.

Решение:

import { Stream, Sink, Effect, Option, Chunk } from "effect"

const lines = Stream.make("HEADER", "line1", "line2", "line3")

const program = Effect.gen(function* () {
  const result = yield* Stream.run(
    lines,
    Sink.head<string>().pipe(
      Sink.flatMap((headerOpt) => {
        const header = Option.getOrElse(headerOpt, () => "")
        return Sink.collectAll<string>().pipe(
          Sink.map((body) => ({ header, body: Chunk.toReadonlyArray(body) }))
        )
      })
    )
  )

  yield* Effect.log(`Header: ${result.header}`)
  yield* Effect.log(`Body: ${result.body}`)
})

Effect.runPromise(program)
// Output:
// Header: HEADER
// Body: line1,line2,line3

🟡 Intermediate

Упражнение 3: Race — быстрый vs полный

Создайте два Sink:

  1. “Быстрый” — берёт первые 5 элементов и считает их сумму
  2. “Полный” — считает сумму всех элементов

Используйте race чтобы получить результат первого завершившегося.

Решение:

import { Stream, Sink, Effect, Chunk } from "effect"

const stream = Stream.range(1, 101) // 1..100

const fastSink = Sink.take<number>(5).pipe(
  Sink.map((chunk) => ({
    strategy: "fast" as const,
    result: Chunk.toReadonlyArray(chunk).reduce((a, b) => a + b, 0)
  }))
)

const fullSink = Sink.sum.pipe(
  Sink.map((total) => ({
    strategy: "full" as const,
    result: total
  }))
)

const program = Effect.gen(function* () {
  const winner = yield* Stream.run(stream, Sink.race(fastSink, fullSink))
  yield* Effect.log(`Winner: ${winner.strategy}, Result: ${winner.result}`)
})

Effect.runPromise(program)
// Output: Winner: fast, Result: 15
// (fast завершился после 5 элементов, а full ещё ждёт все 100)

🔴 Advanced

Упражнение 4: Протокол с переменной длиной

Реализуйте парсер потокового протокола, где каждое сообщение имеет формат:

  • 1 элемент: тип сообщения (1 = text, 2 = binary)
  • 1 элемент: длина payload
  • N элементов: payload (N = длина)

Используйте flatMap + transduce для парсинга потока сообщений.

Решение:

import { Stream, Sink, Effect, Chunk, Option } from "effect"

interface Message {
  readonly type: "text" | "binary"
  readonly payload: ReadonlyArray<number>
}

const rawStream = Stream.make(
  1, 3, 65, 66, 67,     // text message: "ABC"
  2, 2, 255, 0,          // binary message: [255, 0]
  1, 5, 72, 101, 108, 108, 111  // text message: "Hello"
)

const messageSink: Sink.Sink<Message, number, number, never, never> =
  Sink.head<number>().pipe(
    Sink.flatMap((typeOpt) => {
      const typeNum = Option.getOrElse(typeOpt, () => 0)
      const type = typeNum === 1 ? "text" as const : "binary" as const

      return Sink.head<number>().pipe(
        Sink.flatMap((lengthOpt) => {
          const length = Option.getOrElse(lengthOpt, () => 0)

          return Sink.take<number>(length).pipe(
            Sink.map((payload) => ({
              type,
              payload: Chunk.toReadonlyArray(payload)
            } satisfies Message))
          )
        })
      )
    })
  )

// transduce применяет messageSink повторно
const messages = rawStream.pipe(Stream.transduce(messageSink))

const program = Effect.gen(function* () {
  const result = yield* Stream.runCollect(messages)
  Chunk.toReadonlyArray(result).forEach((msg, i) => {
    const display = msg.type === "text"
      ? String.fromCharCode(...msg.payload)
      : msg.payload.join(", ")
    console.log(`Message ${i}: [${msg.type}] ${display}`)
  })
})

Effect.runPromise(program)
// Output:
// Message 0: [text] ABC
// Message 1: [binary] 255, 0
// Message 2: [text] Hello

🔗 Далее: Создание собственных Sink →