Effect Курс Создание собственных Sink
Глава

Создание собственных Sink

Паттерны конструирования пользовательских потребителей потоков — от простых обёрток над fold до сложных stateful Sink с зависимостями.

Теория

Зачем создавать свои Sink

Встроенные Sink покрывают большинство типовых сценариев: сбор, свёртка, подсчёт. Но production-системы требуют специализированных потребителей:

  • Агрегация с бизнес-логикой — вычисление метрик, специфичных для домена (percentile, moving average, custom scoring)
  • Sink с побочными эффектами — запись в БД, отправка в очередь, HTTP-вызовы
  • Sink с зависимостями — потребитель, требующий сервисы из контекста (Logger, MetricsService, DatabaseClient)
  • Sink с ресурсами — потребитель, управляющий жизненным циклом ресурсов (файловые дескрипторы, сетевые соединения)
  • Валидирующие Sink — потребитель с проверкой инвариантов и ранним завершением по ошибке

Стратегии конструирования

Существует несколько подходов к созданию пользовательских Sink, от простого к сложному:

Стратегии создания Sink

├── 1. Композиция встроенных (самый простой)
│   └── pipe(Sink.sum, Sink.map(...), Sink.filterInput(...))

├── 2. fold / foldLeft (аккумулятор)
│   └── Sink.foldLeft(initial, reducer)

├── 3. forEach + Ref (stateful с побочными эффектами)
│   └── Sink.unwrap(Ref.make(...).pipe(...))

├── 4. flatMap цепочки (протоколы, парсеры)
│   └── Sink.head().pipe(Sink.flatMap(...))

├── 5. Sink.fromEffect / Sink.unwrap (эффективное конструирование)
│   └── Sink.unwrap(Effect.gen(...))

└── 6. Sink.unwrapScoped (с управлением ресурсами)
    └── Sink.unwrapScoped(Effect.acquireRelease(...))

Концепция ФП

Sink как Free Monad

Пользовательские Sink можно рассматривать как программы в свободной монаде потребления. Каждая операция — это инструкция:

  • Consume(n) — потребить n элементов
  • Emit(result) — завершиться с результатом
  • Fail(error) — завершиться ошибкой
  • Access(service) — получить зависимость

Комбинаторы (flatMap, map, zip) интерпретируют эти инструкции, создавая конкретные стратегии потребления.

Church-кодирование потребителей

Другой взгляд — Sink как Church-кодированный тип данных, определяемый через его элиминатор:

-- Sink определяется тем, как его "разрушить"
type Sink a = forall r.
  (a -> r)           -- обработка успешного результата
  -> (e -> r)        -- обработка ошибки
  -> (In -> Sink a)  -- потребление следующего элемента
  -> r

Это означает, что любой Sink полностью определяется тремя вещами:

  1. Как обработать финальный результат
  2. Как обработать ошибку
  3. Как обработать очередной входной элемент

Алгебра F-coalgebra для Sink

Sink можно формализовать как коалгебру — двойственное к алгебре понятие:

Алгебра (fold):   F(A) → A    (уничтожает структуру)
Коалгебра (unfold): A → F(A)  (генерирует структуру)

Sink-коалгебра:
  step: State → Input → State + Result + Error

Каждый шаг Sink — это функция из текущего состояния и входного элемента в новое состояние (или результат, или ошибку). Это в точности описывает fold.


Конструирование через fold

Паттерн: fold как универсальный конструктор

Большинство пользовательских Sink можно выразить через foldLeft или fold:

import { Stream, Sink, Effect } from "effect"

// ── Среднее значение через foldLeft ────────────────────
interface AvgState {
  readonly sum: number
  readonly count: number
}

const averageSink: Sink.Sink<number, number, never, never, never> =
  Sink.foldLeft(
    { sum: 0, count: 0 } satisfies AvgState,
    (state, n: number) => ({
      sum: state.sum + n,
      count: state.count + 1
    })
  ).pipe(
    Sink.map(({ sum, count }) => count > 0 ? sum / count : 0)
  )

const stream = Stream.make(10, 20, 30, 40, 50)

Effect.runPromise(Stream.run(stream, averageSink)).then(console.log)
// Output: 30

Паттерн: fold с ранним завершением

import { Stream, Sink, Effect } from "effect"

// Sink, который ищет первый элемент, удовлетворяющий предикату
const findFirst = <A>(
  predicate: (a: A) => boolean
): Sink.Sink<A | null, A, A, never, never> =>
  Sink.fold(
    null as A | null,
    (result) => result === null,  // Продолжаем пока не нашли
    (_found, element: A) => predicate(element) ? element : null
  )

const stream = Stream.make(1, 3, 5, 8, 11, 14)

Effect.runPromise(
  Stream.run(stream, findFirst<number>((n) => n > 10))
).then(console.log)
// Output: 11

Паттерн: foldLeft с аккумулятором-записью

import { Stream, Sink, Effect } from "effect"

// Полная статистика потока за один проход
interface Stats {
  readonly min: number
  readonly max: number
  readonly sum: number
  readonly count: number
  readonly sumOfSquares: number
}

const statsSink: Sink.Sink<Stats & { readonly avg: number; readonly stddev: number }, number, never, never, never> =
  Sink.foldLeft(
    {
      min: Infinity,
      max: -Infinity,
      sum: 0,
      count: 0,
      sumOfSquares: 0
    } satisfies Stats,
    (stats, n: number): Stats => ({
      min: Math.min(stats.min, n),
      max: Math.max(stats.max, n),
      sum: stats.sum + n,
      count: stats.count + 1,
      sumOfSquares: stats.sumOfSquares + n * n
    })
  ).pipe(
    Sink.map((s) => {
      const avg = s.count > 0 ? s.sum / s.count : 0
      const variance = s.count > 0
        ? s.sumOfSquares / s.count - avg * avg
        : 0
      return { ...s, avg, stddev: Math.sqrt(variance) }
    })
  )

const stream = Stream.make(4, 8, 15, 16, 23, 42)

Effect.runPromise(Stream.run(stream, statsSink)).then(console.log)
// Output: { min: 4, max: 42, sum: 108, count: 6, avg: 18, stddev: ~12.34, ... }

Паттерн: Running window через fold

import { Stream, Sink, Chunk, Effect } from "effect"

// Скользящее окно размера N с вычислением среднего
const movingAverageSink = (windowSize: number) =>
  Sink.foldLeft(
    {
      window: Chunk.empty<number>(),
      averages: Chunk.empty<number>()
    },
    (state, n: number) => {
      const newWindow = Chunk.size(state.window) >= windowSize
        ? Chunk.append(Chunk.drop(state.window, 1), n)
        : Chunk.append(state.window, n)

      const avg = Chunk.size(newWindow) >= windowSize
        ? Chunk.reduce(newWindow, 0, (a, b) => a + b) / windowSize
        : NaN

      const newAverages = isNaN(avg)
        ? state.averages
        : Chunk.append(state.averages, avg)

      return { window: newWindow, averages: newAverages }
    }
  ).pipe(
    Sink.map((state) => Chunk.toReadonlyArray(state.averages))
  )

const stream = Stream.make(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

Effect.runPromise(
  Stream.run(stream, movingAverageSink(3))
).then(console.log)
// Output: [2, 3, 4, 5, 6, 7, 8, 9]
// (скользящее среднее по окну размера 3)

Конструирование через forEach

Паттерн: forEach + Ref для stateful обработки

Когда нужны побочные эффекты на каждый элемент с накоплением состояния:

import { Stream, Sink, Effect, Ref } from "effect"

// Sink, который считает и логирует каждый N-й элемент
const everyNthLogger = (n: number) =>
  Sink.unwrap(
    Effect.gen(function* () {
      const counter = yield* Ref.make(0)

      return Sink.forEach((element: number) =>
        Effect.gen(function* () {
          const current = yield* Ref.updateAndGet(counter, (c) => c + 1)
          if (current % n === 0) {
            yield* Effect.log(`[${current}] element: ${element}`)
          }
        })
      ).pipe(
        Sink.mapEffect(() => Ref.get(counter))
      )
    })
  )

const stream = Stream.range(1, 21) // 1..20

Effect.runPromise(Stream.run(stream, everyNthLogger(5))).then(console.log)
// Output:
// [5] element: 5
// [10] element: 10
// [15] element: 15
// [20] element: 20
// 20

Паттерн: forEach с аккумулятором через Ref

import { Stream, Sink, Effect, Ref, HashMap } from "effect"

// Sink-гистограмма: считает частоту каждого значения
const histogramSink = <A>() =>
  Sink.unwrap(
    Effect.gen(function* () {
      const mapRef = yield* Ref.make(HashMap.empty<A, number>())

      return Sink.forEach((element: A) =>
        Ref.update(mapRef, (map) =>
          HashMap.set(
            map,
            element,
            (HashMap.get(map, element).pipe(
              // Если ключ есть — инкрементируем, иначе 1
              (opt) => opt._tag === "Some" ? opt.value + 1 : 1
            ))
          )
        )
      ).pipe(
        Sink.mapEffect(() => Ref.get(mapRef))
      )
    })
  )

const stream = Stream.make("a", "b", "a", "c", "b", "a", "d", "b")

Effect.runPromise(Stream.run(stream, histogramSink())).then(console.log)
// Output: HashMap { a: 3, b: 3, c: 1, d: 1 }

Sink из Effect

Sink.fromEffect — одноразовый эффект

Sink.fromEffect создаёт Sink, который не потребляет элементов, а выполняет единственный эффект:

import { Stream, Sink, Effect } from "effect"

// Sink, инициализирующий ресурс перед потреблением
const initSink = Sink.fromEffect(
  Effect.gen(function* () {
    yield* Effect.log("Initializing consumer...")
    return Date.now()
  })
)

const stream = Stream.make(1, 2, 3)

Effect.runPromise(Stream.run(stream, initSink)).then(console.log)
// Output:
// Initializing consumer...
// <timestamp>

Sink.unwrap — Sink из Effect

Sink.unwrap позволяет сконструировать Sink на основе эффекта. Это ключевой паттерн для динамических Sink:

import { Stream, Sink, Effect, Ref } from "effect"

// Sink, конфигурация которого зависит от внешнего эффекта
const configurableSink = (config: {
  readonly maxElements: number
  readonly minValue: number
}) =>
  Sink.unwrap(
    Effect.gen(function* () {
      yield* Effect.log(`Configured: max=${config.maxElements}, min=${config.minValue}`)

      return Sink.foldUntil(
        0,
        config.maxElements,
        (acc: number, n: number) => n >= config.minValue ? acc + n : acc
      )
    })
  )

const stream = Stream.make(1, 5, 2, 8, 3, 10, 4, 7)

Effect.runPromise(
  Stream.run(stream, configurableSink({ maxElements: 5, minValue: 4 }))
).then(console.log)
// Output: 13 (5 + 8, первые 5 элементов, только >= 4)

Паттерн: Sink из конфигурации

import { Stream, Sink, Effect, Context } from "effect"

// Сервис конфигурации
class AggregationConfig extends Context.Tag("AggregationConfig")<
  AggregationConfig,
  {
    readonly batchSize: number
    readonly threshold: number
  }
>() {}

// Sink, который использует конфигурацию из контекста
const configuredSink = Sink.unwrap(
  Effect.gen(function* () {
    const config = yield* AggregationConfig

    return Sink.fold(
      0,
      (sum: number) => sum < config.threshold,
      (acc: number, n: number) => acc + n
    )
  })
)

const stream = Stream.make(10, 20, 30, 40, 50)

const program = stream.pipe(
  Stream.run(configuredSink),
  Effect.provideService(AggregationConfig, {
    batchSize: 10,
    threshold: 50
  })
)

Effect.runPromise(program).then(console.log)
// Output: 60 (10 + 20 + 30, после чего сумма >= 50)

Stateful Sink с Ref

Паттерн: Sink с mutable состоянием

Для сложных stateful-потребителей используется Ref внутри Sink.unwrap:

import { Stream, Sink, Effect, Ref, Chunk } from "effect"

// Sink для вычисления экспоненциального скользящего среднего (EMA)
const emaSink = (alpha: number) =>
  Sink.unwrap(
    Effect.gen(function* () {
      const emaRef = yield* Ref.make<number | null>(null)
      const historyRef = yield* Ref.make(Chunk.empty<number>())

      return Sink.forEach((value: number) =>
        Effect.gen(function* () {
          const currentEma = yield* Ref.get(emaRef)
          const newEma = currentEma === null
            ? value
            : alpha * value + (1 - alpha) * currentEma

          yield* Ref.set(emaRef, newEma)
          yield* Ref.update(historyRef, (h) => Chunk.append(h, newEma))
        })
      ).pipe(
        Sink.mapEffect(() =>
          Effect.gen(function* () {
            const history = yield* Ref.get(historyRef)
            const finalEma = yield* Ref.get(emaRef)
            return {
              ema: finalEma ?? 0,
              history: Chunk.toReadonlyArray(history)
            }
          })
        )
      )
    })
  )

const prices = Stream.make(100, 102, 101, 105, 110, 108, 112, 115)

Effect.runPromise(Stream.run(prices, emaSink(0.3))).then(console.log)
// Output: { ema: ~109.7, history: [100, 100.6, 100.72, ...] }

Паттерн: Sink с множественными Ref

import { Stream, Sink, Effect, Ref } from "effect"

interface WindowedStats {
  readonly windowAvg: number
  readonly globalAvg: number
  readonly totalProcessed: number
}

// Sink с двумя уровнями агрегации: окно и глобальные
const dualAggregateSink = (windowSize: number) =>
  Sink.unwrap(
    Effect.gen(function* () {
      const windowRef = yield* Ref.make<ReadonlyArray<number>>([])
      const globalSumRef = yield* Ref.make(0)
      const globalCountRef = yield* Ref.make(0)

      return Sink.forEach((n: number) =>
        Effect.gen(function* () {
          // Обновляем окно
          yield* Ref.update(windowRef, (w) => {
            const newWindow = [...w, n]
            return newWindow.length > windowSize
              ? newWindow.slice(-windowSize)
              : newWindow
          })

          // Обновляем глобальные
          yield* Ref.update(globalSumRef, (s) => s + n)
          yield* Ref.update(globalCountRef, (c) => c + 1)
        })
      ).pipe(
        Sink.mapEffect(() =>
          Effect.gen(function* () {
            const window = yield* Ref.get(windowRef)
            const globalSum = yield* Ref.get(globalSumRef)
            const globalCount = yield* Ref.get(globalCountRef)

            const windowAvg = window.length > 0
              ? window.reduce((a, b) => a + b, 0) / window.length
              : 0

            return {
              windowAvg,
              globalAvg: globalCount > 0 ? globalSum / globalCount : 0,
              totalProcessed: globalCount
            } satisfies WindowedStats
          })
        )
      )
    })
  )

const stream = Stream.make(10, 20, 30, 40, 50, 60, 70, 80, 90, 100)

Effect.runPromise(Stream.run(stream, dualAggregateSink(3))).then(console.log)
// Output: { windowAvg: 90, globalAvg: 55, totalProcessed: 10 }

Sink с зависимостями (Services)

Паттерн: Sink, требующий сервис

import { Stream, Sink, Effect, Context } from "effect"

// Определяем сервис логирования метрик
class MetricsService extends Context.Tag("MetricsService")<
  MetricsService,
  {
    readonly recordValue: (name: string, value: number) => Effect.Effect<void>
    readonly getTotal: (name: string) => Effect.Effect<number>
  }
>() {}

// Sink, использующий MetricsService
const metricsSink = (metricName: string) =>
  Sink.unwrap(
    Effect.gen(function* () {
      const metrics = yield* MetricsService

      return Sink.forEach((n: number) =>
        metrics.recordValue(metricName, n)
      ).pipe(
        Sink.mapEffect(() => metrics.getTotal(metricName))
      )
    })
  )

// Использование
const stream = Stream.make(10, 20, 30, 40, 50)

const program = stream.pipe(
  Stream.run(metricsSink("request_latency")),
  Effect.provideService(MetricsService, {
    recordValue: (name, value) =>
      Effect.log(`[${name}] recorded: ${value}`),
    getTotal: (_name) => Effect.succeed(150)
  })
)

Effect.runPromise(program).then(console.log)
// Output:
// [request_latency] recorded: 10
// [request_latency] recorded: 20
// ...
// 150

Паттерн: Sink с множественными зависимостями

import { Stream, Sink, Effect, Context, Ref } from "effect"

class Logger extends Context.Tag("Logger")<
  Logger,
  { readonly info: (msg: string) => Effect.Effect<void> }
>() {}

class Database extends Context.Tag("Database")<
  Database,
  { readonly insert: (data: unknown) => Effect.Effect<void> }
>() {}

// Sink, требующий Logger и Database
const persistSink = <A>() =>
  Sink.unwrap(
    Effect.gen(function* () {
      const logger = yield* Logger
      const db = yield* Database
      const countRef = yield* Ref.make(0)

      return Sink.forEach((element: A) =>
        Effect.gen(function* () {
          yield* logger.info(`Persisting: ${JSON.stringify(element)}`)
          yield* db.insert(element)
          yield* Ref.update(countRef, (c) => c + 1)
        })
      ).pipe(
        Sink.mapEffect(() =>
          Effect.gen(function* () {
            const count = yield* Ref.get(countRef)
            yield* logger.info(`Total persisted: ${count}`)
            return count
          })
        )
      )
    })
  )

// Тип Sink: Sink<number, A, never, never, Logger | Database>

Sink с ресурсами (Scope)

Sink.unwrapScoped — Sink с управлением жизненным циклом

Sink.unwrapScoped позволяет создать Sink, который управляет ресурсами (файлы, соединения, и т.д.):

import { Stream, Sink, Effect, Scope } from "effect"

// Симуляция файлового ресурса
const createFileWriter = (path: string) =>
  Effect.acquireRelease(
    Effect.gen(function* () {
      yield* Effect.log(`Opening file: ${path}`)
      const lines: string[] = []
      return {
        write: (line: string) =>
          Effect.sync(() => { lines.push(line) }),
        getLines: () => Effect.sync(() => [...lines])
      }
    }),
    (writer) =>
      Effect.gen(function* () {
        yield* Effect.log(`Closing file: ${path}`)
      })
  )

// Sink, который пишет в файл и закрывает его по завершении
const fileWriterSink = (path: string) =>
  Sink.unwrapScoped(
    Effect.gen(function* () {
      const writer = yield* createFileWriter(path)

      return Sink.forEach((line: string) =>
        writer.write(line)
      ).pipe(
        Sink.mapEffect(() => writer.getLines())
      )
    })
  )

const logStream = Stream.make(
  "2024-01-01 INFO Server started",
  "2024-01-01 WARN High memory",
  "2024-01-01 ERROR Timeout"
)

const program = logStream.pipe(
  Stream.run(fileWriterSink("/tmp/app.log")),
  Effect.scoped
)

Effect.runPromise(program).then(console.log)
// Output:
// Opening file: /tmp/app.log
// Closing file: /tmp/app.log
// ["2024-01-01 INFO Server started", ...]

Паттерны композитного конструирования

Паттерн: Sink-фабрика

Создайте фабричную функцию, возвращающую настроенный Sink:

import { Stream, Sink, Effect, Chunk } from "effect"

// Фабрика Sink для пакетной обработки
const batchProcessor = <A, B>(config: {
  readonly batchSize: number
  readonly process: (batch: ReadonlyArray<A>) => Effect.Effect<B>
}) => {
  const batchSink = Sink.foldUntil(
    Chunk.empty<A>(),
    config.batchSize,
    (acc: Chunk.Chunk<A>, el: A) => Chunk.append(acc, el)
  )

  // transduce для повторного применения + mapEffect для обработки
  return (stream: Stream.Stream<A>) =>
    stream.pipe(
      Stream.transduce(batchSink),
      Stream.mapEffect((batch) =>
        config.process(Chunk.toReadonlyArray(batch))
      )
    )
}

// Использование
const items = Stream.range(1, 21)

const processed = batchProcessor({
  batchSize: 5,
  process: (batch) =>
    Effect.gen(function* () {
      yield* Effect.log(`Processing batch of ${batch.length}: [${batch}]`)
      return batch.reduce((a, b) => a + b, 0)
    })
})(items)

Effect.runPromise(Stream.runCollect(processed)).then(console.log)
// Output:
// Processing batch of 5: [1,2,3,4,5]
// Processing batch of 5: [6,7,8,9,10]
// Processing batch of 5: [11,12,13,14,15]
// Processing batch of 5: [16,17,18,19,20]
// Chunk(15, 50, 65, 90)

Паттерн: Sink с валидацией

import { Stream, Sink, Effect, Either } from "effect"

class ValidationError {
  readonly _tag = "ValidationError"
  constructor(
    readonly field: string,
    readonly message: string
  ) {}
}

// Sink, который валидирует каждый элемент и собирает ошибки
const validatingSink = <A>(
  validate: (a: A) => Either.Either<A, ValidationError>
) =>
  Sink.foldLeft(
    { valid: [] as ReadonlyArray<A>, errors: [] as ReadonlyArray<ValidationError> },
    (state, element: A) => {
      const result = validate(element)
      return Either.isRight(result)
        ? { ...state, valid: [...state.valid, result.right] }
        : { ...state, errors: [...state.errors, result.left] }
    }
  )

// Использование
interface UserInput {
  readonly name: string
  readonly age: number
}

const validateUser = (input: UserInput): Either.Either<UserInput, ValidationError> => {
  if (input.name.length < 2) {
    return Either.left(new ValidationError("name", `Too short: ${input.name}`))
  }
  if (input.age < 0 || input.age > 150) {
    return Either.left(new ValidationError("age", `Invalid age: ${input.age}`))
  }
  return Either.right(input)
}

const userStream = Stream.make(
  { name: "Alice", age: 30 },
  { name: "B", age: 25 },         // name too short
  { name: "Charlie", age: -5 },   // invalid age
  { name: "Diana", age: 28 }
)

Effect.runPromise(
  Stream.run(userStream, validatingSink(validateUser))
).then((result) => {
  console.log(`Valid: ${result.valid.length}`)
  console.log(`Errors: ${result.errors.length}`)
  result.errors.forEach((e) =>
    console.log(`  ${e.field}: ${e.message}`)
  )
})
// Output:
// Valid: 2
// Errors: 2
//   name: Too short: B
//   age: Invalid age: -5

Паттерн: Sink с rate-limiting

import { Stream, Sink, Effect, Ref } from "effect"

// Sink, который замедляет потребление до N элементов в секунду
const rateLimitedSink = <A>(
  elementsPerSecond: number,
  innerSink: Sink.Sink<void, A, never, never, never>
) =>
  Sink.unwrap(
    Effect.gen(function* () {
      const intervalMs = Math.floor(1000 / elementsPerSecond)

      return Sink.forEach((element: A) =>
        Effect.gen(function* () {
          yield* Effect.sleep(`${intervalMs} millis`)
          // Делегируем обработку внутреннему потребителю
          yield* Stream.run(Stream.make(element), innerSink)
        })
      )
    })
  )

Паттерн: Composable Sink через pipe

import { Stream, Sink, Effect, pipe } from "effect"

// Строим сложный Sink пошагово через pipe
const createAnalyticsSink = () =>
  pipe(
    Sink.foldLeft(
      {
        count: 0,
        sum: 0,
        min: Infinity,
        max: -Infinity,
        values: [] as ReadonlyArray<number>
      },
      (state, n: number) => ({
        count: state.count + 1,
        sum: state.sum + n,
        min: Math.min(state.min, n),
        max: Math.max(state.max, n),
        values: [...state.values, n]
      })
    ),
    Sink.map((state) => {
      const avg = state.count > 0 ? state.sum / state.count : 0
      const sorted = [...state.values].sort((a, b) => a - b)
      const median = sorted.length > 0
        ? sorted[Math.floor(sorted.length / 2)] ?? 0
        : 0

      return {
        count: state.count,
        sum: state.sum,
        avg,
        min: state.min,
        max: state.max,
        median,
        range: state.max - state.min
      }
    })
  )

const dataStream = Stream.make(5, 2, 8, 1, 9, 3, 7, 4, 6, 10)

Effect.runPromise(
  Stream.run(dataStream, createAnalyticsSink())
).then(console.log)
// Output: { count: 10, sum: 55, avg: 5.5, min: 1, max: 10, median: 6, range: 9 }

API Reference

Конструкторы пользовательских Sink

ФункцияОписание
Sink.foldLeft(init, f)Полная свёртка без остановки
Sink.fold(init, cont, f)Свёртка с предикатом продолжения
Sink.foldUntil(init, max, f)Свёртка до N элементов
Sink.foldWeighted({...})Свёртка по весам
Sink.forEach(f)Эффективная обработка каждого элемента
Sink.fromEffect(effect)Sink из единичного Effect
Sink.unwrap(effect)Sink из Effect, возвращающего Sink
Sink.unwrapScoped(effect)Sink с управлением ресурсами
Sink.succeed(value)Немедленный успех
Sink.fail(error)Немедленная ошибка

Модификаторы

ФункцияОписание
Sink.map(f)Трансформация результата
Sink.mapEffect(f)Эффективная трансформация результата
Sink.mapInput(f)Трансформация входа
Sink.filterInput(pred)Фильтрация входа
Sink.dimap({onInput, onDone})Трансформация входа и выхода

Примеры

💻 Пример 1: Real-time metrics aggregator

import { Stream, Sink, Effect, Ref, Chunk, Schedule } from "effect"

interface MetricEvent {
  readonly name: string
  readonly value: number
  readonly timestamp: number
}

interface AggregatedMetrics {
  readonly byName: Record<string, {
    readonly count: number
    readonly sum: number
    readonly avg: number
    readonly min: number
    readonly max: number
  }>
  readonly totalEvents: number
}

const metricsAggregator = Sink.unwrap(
  Effect.gen(function* () {
    const stateRef = yield* Ref.make<Map<string, {
      count: number
      sum: number
      min: number
      max: number
    }>>(new Map())
    const totalRef = yield* Ref.make(0)

    return Sink.forEach((event: MetricEvent) =>
      Effect.gen(function* () {
        yield* Ref.update(stateRef, (map) => {
          const existing = map.get(event.name)
          const updated = existing
            ? {
                count: existing.count + 1,
                sum: existing.sum + event.value,
                min: Math.min(existing.min, event.value),
                max: Math.max(existing.max, event.value)
              }
            : {
                count: 1,
                sum: event.value,
                min: event.value,
                max: event.value
              }
          return new Map(map).set(event.name, updated)
        })
        yield* Ref.update(totalRef, (t) => t + 1)
      })
    ).pipe(
      Sink.mapEffect(() =>
        Effect.gen(function* () {
          const state = yield* Ref.get(stateRef)
          const total = yield* Ref.get(totalRef)

          const byName: Record<string, any> = {}
          for (const [name, data] of state) {
            byName[name] = {
              ...data,
              avg: data.count > 0 ? data.sum / data.count : 0
            }
          }

          return { byName, totalEvents: total } satisfies AggregatedMetrics
        })
      )
    )
  })
)

const eventStream: Stream.Stream<MetricEvent> = Stream.make(
  { name: "latency", value: 45, timestamp: 1 },
  { name: "latency", value: 52, timestamp: 2 },
  { name: "throughput", value: 1000, timestamp: 3 },
  { name: "latency", value: 38, timestamp: 4 },
  { name: "throughput", value: 1200, timestamp: 5 },
  { name: "errors", value: 2, timestamp: 6 }
)

Effect.runPromise(Stream.run(eventStream, metricsAggregator)).then((m) => {
  console.log(`Total events: ${m.totalEvents}`)
  Object.entries(m.byName).forEach(([name, data]) => {
    console.log(`  ${name}: avg=${data.avg.toFixed(1)}, min=${data.min}, max=${data.max}`)
  })
})
// Output:
// Total events: 6
//   latency: avg=45.0, min=38, max=52
//   throughput: avg=1100.0, min=1000, max=1200
//   errors: avg=2.0, min=2, max=2

💻 Пример 2: Chunked JSON writer

import { Stream, Sink, Effect, Chunk } from "effect"

// Sink, который формирует JSON Array из потока объектов
const jsonArraySink = <A>(): Sink.Sink<string, A, never, never, never> =>
  Sink.collectAll<A>().pipe(
    Sink.map((chunk) => {
      const items = Chunk.toReadonlyArray(chunk)
      return JSON.stringify(items, null, 2)
    })
  )

// Sink, который формирует NDJSON (newline-delimited JSON)
const ndjsonSink = <A>(): Sink.Sink<string, A, never, never, never> =>
  Sink.foldLeft(
    [] as ReadonlyArray<string>,
    (lines, element: A) => [...lines, JSON.stringify(element)]
  ).pipe(
    Sink.map((lines) => lines.join("\n"))
  )

interface User {
  readonly id: number
  readonly name: string
}

const users = Stream.make(
  { id: 1, name: "Alice" },
  { id: 2, name: "Bob" },
  { id: 3, name: "Charlie" }
)

Effect.runPromise(Stream.run(users, ndjsonSink())).then(console.log)
// Output:
// {"id":1,"name":"Alice"}
// {"id":2,"name":"Bob"}
// {"id":3,"name":"Charlie"}

Упражнения

🟢 Basic

Упражнение 1: MinMax Sink

Создайте пользовательский Sink minMax, который за один проход вычисляет минимум и максимум потока числовых значений.

import { Stream, Sink, Effect } from "effect"

const minMax: Sink.Sink<
  { readonly min: number; readonly max: number },
  number,
  never,
  never,
  never
> = // Ваш код здесь

const stream = Stream.make(5, 2, 8, 1, 9, 3, 7)

Решение:

import { Stream, Sink, Effect } from "effect"

const minMax: Sink.Sink<
  { readonly min: number; readonly max: number },
  number,
  never,
  never,
  never
> = Sink.foldLeft(
  { min: Infinity, max: -Infinity },
  (state, n: number) => ({
    min: Math.min(state.min, n),
    max: Math.max(state.max, n)
  })
)

const stream = Stream.make(5, 2, 8, 1, 9, 3, 7)

Effect.runPromise(Stream.run(stream, minMax)).then(console.log)
// Output: { min: 1, max: 9 }

Упражнение 2: Top-N Sink

Создайте Sink, который находит N наибольших элементов потока.

Решение:

import { Stream, Sink, Effect } from "effect"

const topN = (n: number): Sink.Sink<ReadonlyArray<number>, number, never, never, never> =>
  Sink.foldLeft(
    [] as ReadonlyArray<number>,
    (top, value: number) => {
      const withNew = [...top, value].sort((a, b) => b - a)
      return withNew.slice(0, n)
    }
  )

const stream = Stream.make(5, 2, 8, 1, 9, 3, 7, 4, 10, 6)

Effect.runPromise(Stream.run(stream, topN(3))).then(console.log)
// Output: [10, 9, 8]

🟡 Intermediate

Упражнение 3: Sink с дедупликацией

Создайте Sink, который собирает элементы потока, но пропускает последовательные дубликаты (аналогично Unix uniq).

Решение:

import { Stream, Sink, Effect } from "effect"

const uniqueConsecutive = <A>(): Sink.Sink<ReadonlyArray<A>, A, never, never, never> =>
  Sink.foldLeft(
    { result: [] as ReadonlyArray<A>, last: undefined as A | undefined },
    (state, element: A) => {
      if (state.last === element) {
        return state // Пропускаем дубликат
      }
      return {
        result: [...state.result, element],
        last: element
      }
    }
  ).pipe(
    Sink.map((state) => state.result)
  )

const stream = Stream.make(1, 1, 2, 2, 2, 3, 1, 1, 4, 4, 3, 3)

Effect.runPromise(Stream.run(stream, uniqueConsecutive())).then(console.log)
// Output: [1, 2, 3, 1, 4, 3]

Упражнение 4: Sink с таймаутом состояния

Создайте Sink, который аккумулирует числа, но завершается если сумма превышает заданный лимит ИЛИ обработано более N элементов. Вычислите финальную статистику.

Решение:

import { Stream, Sink, Effect } from "effect"

interface BoundedResult {
  readonly sum: number
  readonly count: number
  readonly reason: "limit_reached" | "count_exceeded" | "stream_ended"
}

const boundedAccumulator = (
  sumLimit: number,
  maxCount: number
): Sink.Sink<BoundedResult, number, number, never, never> =>
  Sink.fold(
    { sum: 0, count: 0, reason: "stream_ended" as const } satisfies BoundedResult,
    (state) => state.sum < sumLimit && state.count < maxCount,
    (state, n: number) => {
      const newSum = state.sum + n
      const newCount = state.count + 1
      return {
        sum: newSum,
        count: newCount,
        reason: newSum >= sumLimit
          ? "limit_reached" as const
          : newCount >= maxCount
            ? "count_exceeded" as const
            : "stream_ended" as const
      }
    }
  )

const stream = Stream.iterate(1, (n) => n + 1) // 1, 2, 3, 4, ...

Effect.runPromise(
  Stream.run(stream, boundedAccumulator(50, 20))
).then(console.log)
// Output: { sum: 55, count: 10, reason: "limit_reached" }
// (1+2+...+10 = 55 >= 50)

🔴 Advanced

Упражнение 5: Event Sourcing Sink

Реализуйте Sink в паттерне Event Sourcing: принимает поток событий, применяет их к начальному состоянию и возвращает финальный snapshot.

import { Stream, Sink, Effect } from "effect"

// Domain events
type AccountEvent =
  | { readonly _tag: "Opened"; readonly initialBalance: number }
  | { readonly _tag: "Deposited"; readonly amount: number }
  | { readonly _tag: "Withdrawn"; readonly amount: number }
  | { readonly _tag: "Closed" }

// State
interface AccountState {
  readonly balance: number
  readonly isOpen: boolean
  readonly transactionCount: number
}

// Ваш код: создайте Sink, обрабатывающий поток AccountEvent

Решение:

import { Stream, Sink, Effect } from "effect"

type AccountEvent =
  | { readonly _tag: "Opened"; readonly initialBalance: number }
  | { readonly _tag: "Deposited"; readonly amount: number }
  | { readonly _tag: "Withdrawn"; readonly amount: number }
  | { readonly _tag: "Closed" }

interface AccountState {
  readonly balance: number
  readonly isOpen: boolean
  readonly transactionCount: number
}

const initialState: AccountState = {
  balance: 0,
  isOpen: false,
  transactionCount: 0
}

const applyEvent = (state: AccountState, event: AccountEvent): AccountState => {
  switch (event._tag) {
    case "Opened":
      return {
        balance: event.initialBalance,
        isOpen: true,
        transactionCount: state.transactionCount + 1
      }
    case "Deposited":
      return state.isOpen
        ? {
            ...state,
            balance: state.balance + event.amount,
            transactionCount: state.transactionCount + 1
          }
        : state
    case "Withdrawn":
      return state.isOpen && state.balance >= event.amount
        ? {
            ...state,
            balance: state.balance - event.amount,
            transactionCount: state.transactionCount + 1
          }
        : state
    case "Closed":
      return {
        ...state,
        isOpen: false,
        transactionCount: state.transactionCount + 1
      }
  }
}

// Event Sourcing Sink
const eventSourcingSink: Sink.Sink<AccountState, AccountEvent, never, never, never> =
  Sink.foldLeft(initialState, applyEvent)

// С ранним завершением (остановка после закрытия)
const eventSourcingSinkWithEarlyStop: Sink.Sink<AccountState, AccountEvent, AccountEvent, never, never> =
  Sink.fold(
    initialState,
    (state) => state.isOpen || state.transactionCount === 0,
    applyEvent
  )

// Тест
const events: Stream.Stream<AccountEvent> = Stream.make(
  { _tag: "Opened", initialBalance: 1000 },
  { _tag: "Deposited", amount: 500 },
  { _tag: "Withdrawn", amount: 200 },
  { _tag: "Deposited", amount: 300 },
  { _tag: "Withdrawn", amount: 100 },
  { _tag: "Closed" }
)

Effect.runPromise(Stream.run(events, eventSourcingSink)).then(console.log)
// Output: { balance: 1500, isOpen: false, transactionCount: 6 }

🔗 Модуль завершён. Далее: Модуль 12: Channel — Двунаправленные потоки →