Создание собственных Sink
Паттерны конструирования пользовательских потребителей потоков — от простых обёрток над fold до сложных stateful Sink с зависимостями.
Теория
Зачем создавать свои Sink
Встроенные Sink покрывают большинство типовых сценариев: сбор, свёртка, подсчёт. Но production-системы требуют специализированных потребителей:
- Агрегация с бизнес-логикой — вычисление метрик, специфичных для домена (percentile, moving average, custom scoring)
- Sink с побочными эффектами — запись в БД, отправка в очередь, HTTP-вызовы
- Sink с зависимостями — потребитель, требующий сервисы из контекста (Logger, MetricsService, DatabaseClient)
- Sink с ресурсами — потребитель, управляющий жизненным циклом ресурсов (файловые дескрипторы, сетевые соединения)
- Валидирующие Sink — потребитель с проверкой инвариантов и ранним завершением по ошибке
Стратегии конструирования
Существует несколько подходов к созданию пользовательских Sink, от простого к сложному:
Стратегии создания Sink
│
├── 1. Композиция встроенных (самый простой)
│ └── pipe(Sink.sum, Sink.map(...), Sink.filterInput(...))
│
├── 2. fold / foldLeft (аккумулятор)
│ └── Sink.foldLeft(initial, reducer)
│
├── 3. forEach + Ref (stateful с побочными эффектами)
│ └── Sink.unwrap(Ref.make(...).pipe(...))
│
├── 4. flatMap цепочки (протоколы, парсеры)
│ └── Sink.head().pipe(Sink.flatMap(...))
│
├── 5. Sink.fromEffect / Sink.unwrap (эффективное конструирование)
│ └── Sink.unwrap(Effect.gen(...))
│
└── 6. Sink.unwrapScoped (с управлением ресурсами)
└── Sink.unwrapScoped(Effect.acquireRelease(...))
Концепция ФП
Sink как Free Monad
Пользовательские Sink можно рассматривать как программы в свободной монаде потребления. Каждая операция — это инструкция:
Consume(n)— потребить n элементовEmit(result)— завершиться с результатомFail(error)— завершиться ошибкойAccess(service)— получить зависимость
Комбинаторы (flatMap, map, zip) интерпретируют эти инструкции, создавая конкретные стратегии потребления.
Church-кодирование потребителей
Другой взгляд — Sink как Church-кодированный тип данных, определяемый через его элиминатор:
-- Sink определяется тем, как его "разрушить"
type Sink a = forall r.
(a -> r) -- обработка успешного результата
-> (e -> r) -- обработка ошибки
-> (In -> Sink a) -- потребление следующего элемента
-> r
Это означает, что любой Sink полностью определяется тремя вещами:
- Как обработать финальный результат
- Как обработать ошибку
- Как обработать очередной входной элемент
Алгебра F-coalgebra для Sink
Sink можно формализовать как коалгебру — двойственное к алгебре понятие:
Алгебра (fold): F(A) → A (уничтожает структуру)
Коалгебра (unfold): A → F(A) (генерирует структуру)
Sink-коалгебра:
step: State → Input → State + Result + Error
Каждый шаг Sink — это функция из текущего состояния и входного элемента в новое состояние (или результат, или ошибку). Это в точности описывает fold.
Конструирование через fold
Паттерн: fold как универсальный конструктор
Большинство пользовательских Sink можно выразить через foldLeft или fold:
import { Stream, Sink, Effect } from "effect"
// ── Среднее значение через foldLeft ────────────────────
interface AvgState {
readonly sum: number
readonly count: number
}
const averageSink: Sink.Sink<number, number, never, never, never> =
Sink.foldLeft(
{ sum: 0, count: 0 } satisfies AvgState,
(state, n: number) => ({
sum: state.sum + n,
count: state.count + 1
})
).pipe(
Sink.map(({ sum, count }) => count > 0 ? sum / count : 0)
)
const stream = Stream.make(10, 20, 30, 40, 50)
Effect.runPromise(Stream.run(stream, averageSink)).then(console.log)
// Output: 30
Паттерн: fold с ранним завершением
import { Stream, Sink, Effect } from "effect"
// Sink, который ищет первый элемент, удовлетворяющий предикату
const findFirst = <A>(
predicate: (a: A) => boolean
): Sink.Sink<A | null, A, A, never, never> =>
Sink.fold(
null as A | null,
(result) => result === null, // Продолжаем пока не нашли
(_found, element: A) => predicate(element) ? element : null
)
const stream = Stream.make(1, 3, 5, 8, 11, 14)
Effect.runPromise(
Stream.run(stream, findFirst<number>((n) => n > 10))
).then(console.log)
// Output: 11
Паттерн: foldLeft с аккумулятором-записью
import { Stream, Sink, Effect } from "effect"
// Полная статистика потока за один проход
interface Stats {
readonly min: number
readonly max: number
readonly sum: number
readonly count: number
readonly sumOfSquares: number
}
const statsSink: Sink.Sink<Stats & { readonly avg: number; readonly stddev: number }, number, never, never, never> =
Sink.foldLeft(
{
min: Infinity,
max: -Infinity,
sum: 0,
count: 0,
sumOfSquares: 0
} satisfies Stats,
(stats, n: number): Stats => ({
min: Math.min(stats.min, n),
max: Math.max(stats.max, n),
sum: stats.sum + n,
count: stats.count + 1,
sumOfSquares: stats.sumOfSquares + n * n
})
).pipe(
Sink.map((s) => {
const avg = s.count > 0 ? s.sum / s.count : 0
const variance = s.count > 0
? s.sumOfSquares / s.count - avg * avg
: 0
return { ...s, avg, stddev: Math.sqrt(variance) }
})
)
const stream = Stream.make(4, 8, 15, 16, 23, 42)
Effect.runPromise(Stream.run(stream, statsSink)).then(console.log)
// Output: { min: 4, max: 42, sum: 108, count: 6, avg: 18, stddev: ~12.34, ... }
Паттерн: Running window через fold
import { Stream, Sink, Chunk, Effect } from "effect"
// Скользящее окно размера N с вычислением среднего
const movingAverageSink = (windowSize: number) =>
Sink.foldLeft(
{
window: Chunk.empty<number>(),
averages: Chunk.empty<number>()
},
(state, n: number) => {
const newWindow = Chunk.size(state.window) >= windowSize
? Chunk.append(Chunk.drop(state.window, 1), n)
: Chunk.append(state.window, n)
const avg = Chunk.size(newWindow) >= windowSize
? Chunk.reduce(newWindow, 0, (a, b) => a + b) / windowSize
: NaN
const newAverages = isNaN(avg)
? state.averages
: Chunk.append(state.averages, avg)
return { window: newWindow, averages: newAverages }
}
).pipe(
Sink.map((state) => Chunk.toReadonlyArray(state.averages))
)
const stream = Stream.make(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
Effect.runPromise(
Stream.run(stream, movingAverageSink(3))
).then(console.log)
// Output: [2, 3, 4, 5, 6, 7, 8, 9]
// (скользящее среднее по окну размера 3)
Конструирование через forEach
Паттерн: forEach + Ref для stateful обработки
Когда нужны побочные эффекты на каждый элемент с накоплением состояния:
import { Stream, Sink, Effect, Ref } from "effect"
// Sink, который считает и логирует каждый N-й элемент
const everyNthLogger = (n: number) =>
Sink.unwrap(
Effect.gen(function* () {
const counter = yield* Ref.make(0)
return Sink.forEach((element: number) =>
Effect.gen(function* () {
const current = yield* Ref.updateAndGet(counter, (c) => c + 1)
if (current % n === 0) {
yield* Effect.log(`[${current}] element: ${element}`)
}
})
).pipe(
Sink.mapEffect(() => Ref.get(counter))
)
})
)
const stream = Stream.range(1, 21) // 1..20
Effect.runPromise(Stream.run(stream, everyNthLogger(5))).then(console.log)
// Output:
// [5] element: 5
// [10] element: 10
// [15] element: 15
// [20] element: 20
// 20
Паттерн: forEach с аккумулятором через Ref
import { Stream, Sink, Effect, Ref, HashMap } from "effect"
// Sink-гистограмма: считает частоту каждого значения
const histogramSink = <A>() =>
Sink.unwrap(
Effect.gen(function* () {
const mapRef = yield* Ref.make(HashMap.empty<A, number>())
return Sink.forEach((element: A) =>
Ref.update(mapRef, (map) =>
HashMap.set(
map,
element,
(HashMap.get(map, element).pipe(
// Если ключ есть — инкрементируем, иначе 1
(opt) => opt._tag === "Some" ? opt.value + 1 : 1
))
)
)
).pipe(
Sink.mapEffect(() => Ref.get(mapRef))
)
})
)
const stream = Stream.make("a", "b", "a", "c", "b", "a", "d", "b")
Effect.runPromise(Stream.run(stream, histogramSink())).then(console.log)
// Output: HashMap { a: 3, b: 3, c: 1, d: 1 }
Sink из Effect
Sink.fromEffect — одноразовый эффект
Sink.fromEffect создаёт Sink, который не потребляет элементов, а выполняет единственный эффект:
import { Stream, Sink, Effect } from "effect"
// Sink, инициализирующий ресурс перед потреблением
const initSink = Sink.fromEffect(
Effect.gen(function* () {
yield* Effect.log("Initializing consumer...")
return Date.now()
})
)
const stream = Stream.make(1, 2, 3)
Effect.runPromise(Stream.run(stream, initSink)).then(console.log)
// Output:
// Initializing consumer...
// <timestamp>
Sink.unwrap — Sink из Effect
Sink.unwrap позволяет сконструировать Sink на основе эффекта. Это ключевой паттерн для динамических Sink:
import { Stream, Sink, Effect, Ref } from "effect"
// Sink, конфигурация которого зависит от внешнего эффекта
const configurableSink = (config: {
readonly maxElements: number
readonly minValue: number
}) =>
Sink.unwrap(
Effect.gen(function* () {
yield* Effect.log(`Configured: max=${config.maxElements}, min=${config.minValue}`)
return Sink.foldUntil(
0,
config.maxElements,
(acc: number, n: number) => n >= config.minValue ? acc + n : acc
)
})
)
const stream = Stream.make(1, 5, 2, 8, 3, 10, 4, 7)
Effect.runPromise(
Stream.run(stream, configurableSink({ maxElements: 5, minValue: 4 }))
).then(console.log)
// Output: 13 (5 + 8, первые 5 элементов, только >= 4)
Паттерн: Sink из конфигурации
import { Stream, Sink, Effect, Context } from "effect"
// Сервис конфигурации
class AggregationConfig extends Context.Tag("AggregationConfig")<
AggregationConfig,
{
readonly batchSize: number
readonly threshold: number
}
>() {}
// Sink, который использует конфигурацию из контекста
const configuredSink = Sink.unwrap(
Effect.gen(function* () {
const config = yield* AggregationConfig
return Sink.fold(
0,
(sum: number) => sum < config.threshold,
(acc: number, n: number) => acc + n
)
})
)
const stream = Stream.make(10, 20, 30, 40, 50)
const program = stream.pipe(
Stream.run(configuredSink),
Effect.provideService(AggregationConfig, {
batchSize: 10,
threshold: 50
})
)
Effect.runPromise(program).then(console.log)
// Output: 60 (10 + 20 + 30, после чего сумма >= 50)
Stateful Sink с Ref
Паттерн: Sink с mutable состоянием
Для сложных stateful-потребителей используется Ref внутри Sink.unwrap:
import { Stream, Sink, Effect, Ref, Chunk } from "effect"
// Sink для вычисления экспоненциального скользящего среднего (EMA)
const emaSink = (alpha: number) =>
Sink.unwrap(
Effect.gen(function* () {
const emaRef = yield* Ref.make<number | null>(null)
const historyRef = yield* Ref.make(Chunk.empty<number>())
return Sink.forEach((value: number) =>
Effect.gen(function* () {
const currentEma = yield* Ref.get(emaRef)
const newEma = currentEma === null
? value
: alpha * value + (1 - alpha) * currentEma
yield* Ref.set(emaRef, newEma)
yield* Ref.update(historyRef, (h) => Chunk.append(h, newEma))
})
).pipe(
Sink.mapEffect(() =>
Effect.gen(function* () {
const history = yield* Ref.get(historyRef)
const finalEma = yield* Ref.get(emaRef)
return {
ema: finalEma ?? 0,
history: Chunk.toReadonlyArray(history)
}
})
)
)
})
)
const prices = Stream.make(100, 102, 101, 105, 110, 108, 112, 115)
Effect.runPromise(Stream.run(prices, emaSink(0.3))).then(console.log)
// Output: { ema: ~109.7, history: [100, 100.6, 100.72, ...] }
Паттерн: Sink с множественными Ref
import { Stream, Sink, Effect, Ref } from "effect"
interface WindowedStats {
readonly windowAvg: number
readonly globalAvg: number
readonly totalProcessed: number
}
// Sink с двумя уровнями агрегации: окно и глобальные
const dualAggregateSink = (windowSize: number) =>
Sink.unwrap(
Effect.gen(function* () {
const windowRef = yield* Ref.make<ReadonlyArray<number>>([])
const globalSumRef = yield* Ref.make(0)
const globalCountRef = yield* Ref.make(0)
return Sink.forEach((n: number) =>
Effect.gen(function* () {
// Обновляем окно
yield* Ref.update(windowRef, (w) => {
const newWindow = [...w, n]
return newWindow.length > windowSize
? newWindow.slice(-windowSize)
: newWindow
})
// Обновляем глобальные
yield* Ref.update(globalSumRef, (s) => s + n)
yield* Ref.update(globalCountRef, (c) => c + 1)
})
).pipe(
Sink.mapEffect(() =>
Effect.gen(function* () {
const window = yield* Ref.get(windowRef)
const globalSum = yield* Ref.get(globalSumRef)
const globalCount = yield* Ref.get(globalCountRef)
const windowAvg = window.length > 0
? window.reduce((a, b) => a + b, 0) / window.length
: 0
return {
windowAvg,
globalAvg: globalCount > 0 ? globalSum / globalCount : 0,
totalProcessed: globalCount
} satisfies WindowedStats
})
)
)
})
)
const stream = Stream.make(10, 20, 30, 40, 50, 60, 70, 80, 90, 100)
Effect.runPromise(Stream.run(stream, dualAggregateSink(3))).then(console.log)
// Output: { windowAvg: 90, globalAvg: 55, totalProcessed: 10 }
Sink с зависимостями (Services)
Паттерн: Sink, требующий сервис
import { Stream, Sink, Effect, Context } from "effect"
// Определяем сервис логирования метрик
class MetricsService extends Context.Tag("MetricsService")<
MetricsService,
{
readonly recordValue: (name: string, value: number) => Effect.Effect<void>
readonly getTotal: (name: string) => Effect.Effect<number>
}
>() {}
// Sink, использующий MetricsService
const metricsSink = (metricName: string) =>
Sink.unwrap(
Effect.gen(function* () {
const metrics = yield* MetricsService
return Sink.forEach((n: number) =>
metrics.recordValue(metricName, n)
).pipe(
Sink.mapEffect(() => metrics.getTotal(metricName))
)
})
)
// Использование
const stream = Stream.make(10, 20, 30, 40, 50)
const program = stream.pipe(
Stream.run(metricsSink("request_latency")),
Effect.provideService(MetricsService, {
recordValue: (name, value) =>
Effect.log(`[${name}] recorded: ${value}`),
getTotal: (_name) => Effect.succeed(150)
})
)
Effect.runPromise(program).then(console.log)
// Output:
// [request_latency] recorded: 10
// [request_latency] recorded: 20
// ...
// 150
Паттерн: Sink с множественными зависимостями
import { Stream, Sink, Effect, Context, Ref } from "effect"
class Logger extends Context.Tag("Logger")<
Logger,
{ readonly info: (msg: string) => Effect.Effect<void> }
>() {}
class Database extends Context.Tag("Database")<
Database,
{ readonly insert: (data: unknown) => Effect.Effect<void> }
>() {}
// Sink, требующий Logger и Database
const persistSink = <A>() =>
Sink.unwrap(
Effect.gen(function* () {
const logger = yield* Logger
const db = yield* Database
const countRef = yield* Ref.make(0)
return Sink.forEach((element: A) =>
Effect.gen(function* () {
yield* logger.info(`Persisting: ${JSON.stringify(element)}`)
yield* db.insert(element)
yield* Ref.update(countRef, (c) => c + 1)
})
).pipe(
Sink.mapEffect(() =>
Effect.gen(function* () {
const count = yield* Ref.get(countRef)
yield* logger.info(`Total persisted: ${count}`)
return count
})
)
)
})
)
// Тип Sink: Sink<number, A, never, never, Logger | Database>
Sink с ресурсами (Scope)
Sink.unwrapScoped — Sink с управлением жизненным циклом
Sink.unwrapScoped позволяет создать Sink, который управляет ресурсами (файлы, соединения, и т.д.):
import { Stream, Sink, Effect, Scope } from "effect"
// Симуляция файлового ресурса
const createFileWriter = (path: string) =>
Effect.acquireRelease(
Effect.gen(function* () {
yield* Effect.log(`Opening file: ${path}`)
const lines: string[] = []
return {
write: (line: string) =>
Effect.sync(() => { lines.push(line) }),
getLines: () => Effect.sync(() => [...lines])
}
}),
(writer) =>
Effect.gen(function* () {
yield* Effect.log(`Closing file: ${path}`)
})
)
// Sink, который пишет в файл и закрывает его по завершении
const fileWriterSink = (path: string) =>
Sink.unwrapScoped(
Effect.gen(function* () {
const writer = yield* createFileWriter(path)
return Sink.forEach((line: string) =>
writer.write(line)
).pipe(
Sink.mapEffect(() => writer.getLines())
)
})
)
const logStream = Stream.make(
"2024-01-01 INFO Server started",
"2024-01-01 WARN High memory",
"2024-01-01 ERROR Timeout"
)
const program = logStream.pipe(
Stream.run(fileWriterSink("/tmp/app.log")),
Effect.scoped
)
Effect.runPromise(program).then(console.log)
// Output:
// Opening file: /tmp/app.log
// Closing file: /tmp/app.log
// ["2024-01-01 INFO Server started", ...]
Паттерны композитного конструирования
Паттерн: Sink-фабрика
Создайте фабричную функцию, возвращающую настроенный Sink:
import { Stream, Sink, Effect, Chunk } from "effect"
// Фабрика Sink для пакетной обработки
const batchProcessor = <A, B>(config: {
readonly batchSize: number
readonly process: (batch: ReadonlyArray<A>) => Effect.Effect<B>
}) => {
const batchSink = Sink.foldUntil(
Chunk.empty<A>(),
config.batchSize,
(acc: Chunk.Chunk<A>, el: A) => Chunk.append(acc, el)
)
// transduce для повторного применения + mapEffect для обработки
return (stream: Stream.Stream<A>) =>
stream.pipe(
Stream.transduce(batchSink),
Stream.mapEffect((batch) =>
config.process(Chunk.toReadonlyArray(batch))
)
)
}
// Использование
const items = Stream.range(1, 21)
const processed = batchProcessor({
batchSize: 5,
process: (batch) =>
Effect.gen(function* () {
yield* Effect.log(`Processing batch of ${batch.length}: [${batch}]`)
return batch.reduce((a, b) => a + b, 0)
})
})(items)
Effect.runPromise(Stream.runCollect(processed)).then(console.log)
// Output:
// Processing batch of 5: [1,2,3,4,5]
// Processing batch of 5: [6,7,8,9,10]
// Processing batch of 5: [11,12,13,14,15]
// Processing batch of 5: [16,17,18,19,20]
// Chunk(15, 50, 65, 90)
Паттерн: Sink с валидацией
import { Stream, Sink, Effect, Either } from "effect"
class ValidationError {
readonly _tag = "ValidationError"
constructor(
readonly field: string,
readonly message: string
) {}
}
// Sink, который валидирует каждый элемент и собирает ошибки
const validatingSink = <A>(
validate: (a: A) => Either.Either<A, ValidationError>
) =>
Sink.foldLeft(
{ valid: [] as ReadonlyArray<A>, errors: [] as ReadonlyArray<ValidationError> },
(state, element: A) => {
const result = validate(element)
return Either.isRight(result)
? { ...state, valid: [...state.valid, result.right] }
: { ...state, errors: [...state.errors, result.left] }
}
)
// Использование
interface UserInput {
readonly name: string
readonly age: number
}
const validateUser = (input: UserInput): Either.Either<UserInput, ValidationError> => {
if (input.name.length < 2) {
return Either.left(new ValidationError("name", `Too short: ${input.name}`))
}
if (input.age < 0 || input.age > 150) {
return Either.left(new ValidationError("age", `Invalid age: ${input.age}`))
}
return Either.right(input)
}
const userStream = Stream.make(
{ name: "Alice", age: 30 },
{ name: "B", age: 25 }, // name too short
{ name: "Charlie", age: -5 }, // invalid age
{ name: "Diana", age: 28 }
)
Effect.runPromise(
Stream.run(userStream, validatingSink(validateUser))
).then((result) => {
console.log(`Valid: ${result.valid.length}`)
console.log(`Errors: ${result.errors.length}`)
result.errors.forEach((e) =>
console.log(` ${e.field}: ${e.message}`)
)
})
// Output:
// Valid: 2
// Errors: 2
// name: Too short: B
// age: Invalid age: -5
Паттерн: Sink с rate-limiting
import { Stream, Sink, Effect, Ref } from "effect"
// Sink, который замедляет потребление до N элементов в секунду
const rateLimitedSink = <A>(
elementsPerSecond: number,
innerSink: Sink.Sink<void, A, never, never, never>
) =>
Sink.unwrap(
Effect.gen(function* () {
const intervalMs = Math.floor(1000 / elementsPerSecond)
return Sink.forEach((element: A) =>
Effect.gen(function* () {
yield* Effect.sleep(`${intervalMs} millis`)
// Делегируем обработку внутреннему потребителю
yield* Stream.run(Stream.make(element), innerSink)
})
)
})
)
Паттерн: Composable Sink через pipe
import { Stream, Sink, Effect, pipe } from "effect"
// Строим сложный Sink пошагово через pipe
const createAnalyticsSink = () =>
pipe(
Sink.foldLeft(
{
count: 0,
sum: 0,
min: Infinity,
max: -Infinity,
values: [] as ReadonlyArray<number>
},
(state, n: number) => ({
count: state.count + 1,
sum: state.sum + n,
min: Math.min(state.min, n),
max: Math.max(state.max, n),
values: [...state.values, n]
})
),
Sink.map((state) => {
const avg = state.count > 0 ? state.sum / state.count : 0
const sorted = [...state.values].sort((a, b) => a - b)
const median = sorted.length > 0
? sorted[Math.floor(sorted.length / 2)] ?? 0
: 0
return {
count: state.count,
sum: state.sum,
avg,
min: state.min,
max: state.max,
median,
range: state.max - state.min
}
})
)
const dataStream = Stream.make(5, 2, 8, 1, 9, 3, 7, 4, 6, 10)
Effect.runPromise(
Stream.run(dataStream, createAnalyticsSink())
).then(console.log)
// Output: { count: 10, sum: 55, avg: 5.5, min: 1, max: 10, median: 6, range: 9 }
API Reference
Конструкторы пользовательских Sink
| Функция | Описание |
|---|---|
Sink.foldLeft(init, f) | Полная свёртка без остановки |
Sink.fold(init, cont, f) | Свёртка с предикатом продолжения |
Sink.foldUntil(init, max, f) | Свёртка до N элементов |
Sink.foldWeighted({...}) | Свёртка по весам |
Sink.forEach(f) | Эффективная обработка каждого элемента |
Sink.fromEffect(effect) | Sink из единичного Effect |
Sink.unwrap(effect) | Sink из Effect, возвращающего Sink |
Sink.unwrapScoped(effect) | Sink с управлением ресурсами |
Sink.succeed(value) | Немедленный успех |
Sink.fail(error) | Немедленная ошибка |
Модификаторы
| Функция | Описание |
|---|---|
Sink.map(f) | Трансформация результата |
Sink.mapEffect(f) | Эффективная трансформация результата |
Sink.mapInput(f) | Трансформация входа |
Sink.filterInput(pred) | Фильтрация входа |
Sink.dimap({onInput, onDone}) | Трансформация входа и выхода |
Примеры
💻 Пример 1: Real-time metrics aggregator
import { Stream, Sink, Effect, Ref, Chunk, Schedule } from "effect"
interface MetricEvent {
readonly name: string
readonly value: number
readonly timestamp: number
}
interface AggregatedMetrics {
readonly byName: Record<string, {
readonly count: number
readonly sum: number
readonly avg: number
readonly min: number
readonly max: number
}>
readonly totalEvents: number
}
const metricsAggregator = Sink.unwrap(
Effect.gen(function* () {
const stateRef = yield* Ref.make<Map<string, {
count: number
sum: number
min: number
max: number
}>>(new Map())
const totalRef = yield* Ref.make(0)
return Sink.forEach((event: MetricEvent) =>
Effect.gen(function* () {
yield* Ref.update(stateRef, (map) => {
const existing = map.get(event.name)
const updated = existing
? {
count: existing.count + 1,
sum: existing.sum + event.value,
min: Math.min(existing.min, event.value),
max: Math.max(existing.max, event.value)
}
: {
count: 1,
sum: event.value,
min: event.value,
max: event.value
}
return new Map(map).set(event.name, updated)
})
yield* Ref.update(totalRef, (t) => t + 1)
})
).pipe(
Sink.mapEffect(() =>
Effect.gen(function* () {
const state = yield* Ref.get(stateRef)
const total = yield* Ref.get(totalRef)
const byName: Record<string, any> = {}
for (const [name, data] of state) {
byName[name] = {
...data,
avg: data.count > 0 ? data.sum / data.count : 0
}
}
return { byName, totalEvents: total } satisfies AggregatedMetrics
})
)
)
})
)
const eventStream: Stream.Stream<MetricEvent> = Stream.make(
{ name: "latency", value: 45, timestamp: 1 },
{ name: "latency", value: 52, timestamp: 2 },
{ name: "throughput", value: 1000, timestamp: 3 },
{ name: "latency", value: 38, timestamp: 4 },
{ name: "throughput", value: 1200, timestamp: 5 },
{ name: "errors", value: 2, timestamp: 6 }
)
Effect.runPromise(Stream.run(eventStream, metricsAggregator)).then((m) => {
console.log(`Total events: ${m.totalEvents}`)
Object.entries(m.byName).forEach(([name, data]) => {
console.log(` ${name}: avg=${data.avg.toFixed(1)}, min=${data.min}, max=${data.max}`)
})
})
// Output:
// Total events: 6
// latency: avg=45.0, min=38, max=52
// throughput: avg=1100.0, min=1000, max=1200
// errors: avg=2.0, min=2, max=2
💻 Пример 2: Chunked JSON writer
import { Stream, Sink, Effect, Chunk } from "effect"
// Sink, который формирует JSON Array из потока объектов
const jsonArraySink = <A>(): Sink.Sink<string, A, never, never, never> =>
Sink.collectAll<A>().pipe(
Sink.map((chunk) => {
const items = Chunk.toReadonlyArray(chunk)
return JSON.stringify(items, null, 2)
})
)
// Sink, который формирует NDJSON (newline-delimited JSON)
const ndjsonSink = <A>(): Sink.Sink<string, A, never, never, never> =>
Sink.foldLeft(
[] as ReadonlyArray<string>,
(lines, element: A) => [...lines, JSON.stringify(element)]
).pipe(
Sink.map((lines) => lines.join("\n"))
)
interface User {
readonly id: number
readonly name: string
}
const users = Stream.make(
{ id: 1, name: "Alice" },
{ id: 2, name: "Bob" },
{ id: 3, name: "Charlie" }
)
Effect.runPromise(Stream.run(users, ndjsonSink())).then(console.log)
// Output:
// {"id":1,"name":"Alice"}
// {"id":2,"name":"Bob"}
// {"id":3,"name":"Charlie"}
Упражнения
🟢 Basic
Упражнение 1: MinMax Sink
Создайте пользовательский Sink minMax, который за один проход вычисляет минимум и максимум потока числовых значений.
import { Stream, Sink, Effect } from "effect"
const minMax: Sink.Sink<
{ readonly min: number; readonly max: number },
number,
never,
never,
never
> = // Ваш код здесь
const stream = Stream.make(5, 2, 8, 1, 9, 3, 7)
Решение:
import { Stream, Sink, Effect } from "effect"
const minMax: Sink.Sink<
{ readonly min: number; readonly max: number },
number,
never,
never,
never
> = Sink.foldLeft(
{ min: Infinity, max: -Infinity },
(state, n: number) => ({
min: Math.min(state.min, n),
max: Math.max(state.max, n)
})
)
const stream = Stream.make(5, 2, 8, 1, 9, 3, 7)
Effect.runPromise(Stream.run(stream, minMax)).then(console.log)
// Output: { min: 1, max: 9 }
Упражнение 2: Top-N Sink
Создайте Sink, который находит N наибольших элементов потока.
Решение:
import { Stream, Sink, Effect } from "effect"
const topN = (n: number): Sink.Sink<ReadonlyArray<number>, number, never, never, never> =>
Sink.foldLeft(
[] as ReadonlyArray<number>,
(top, value: number) => {
const withNew = [...top, value].sort((a, b) => b - a)
return withNew.slice(0, n)
}
)
const stream = Stream.make(5, 2, 8, 1, 9, 3, 7, 4, 10, 6)
Effect.runPromise(Stream.run(stream, topN(3))).then(console.log)
// Output: [10, 9, 8]
🟡 Intermediate
Упражнение 3: Sink с дедупликацией
Создайте Sink, который собирает элементы потока, но пропускает последовательные дубликаты (аналогично Unix uniq).
Решение:
import { Stream, Sink, Effect } from "effect"
const uniqueConsecutive = <A>(): Sink.Sink<ReadonlyArray<A>, A, never, never, never> =>
Sink.foldLeft(
{ result: [] as ReadonlyArray<A>, last: undefined as A | undefined },
(state, element: A) => {
if (state.last === element) {
return state // Пропускаем дубликат
}
return {
result: [...state.result, element],
last: element
}
}
).pipe(
Sink.map((state) => state.result)
)
const stream = Stream.make(1, 1, 2, 2, 2, 3, 1, 1, 4, 4, 3, 3)
Effect.runPromise(Stream.run(stream, uniqueConsecutive())).then(console.log)
// Output: [1, 2, 3, 1, 4, 3]
Упражнение 4: Sink с таймаутом состояния
Создайте Sink, который аккумулирует числа, но завершается если сумма превышает заданный лимит ИЛИ обработано более N элементов. Вычислите финальную статистику.
Решение:
import { Stream, Sink, Effect } from "effect"
interface BoundedResult {
readonly sum: number
readonly count: number
readonly reason: "limit_reached" | "count_exceeded" | "stream_ended"
}
const boundedAccumulator = (
sumLimit: number,
maxCount: number
): Sink.Sink<BoundedResult, number, number, never, never> =>
Sink.fold(
{ sum: 0, count: 0, reason: "stream_ended" as const } satisfies BoundedResult,
(state) => state.sum < sumLimit && state.count < maxCount,
(state, n: number) => {
const newSum = state.sum + n
const newCount = state.count + 1
return {
sum: newSum,
count: newCount,
reason: newSum >= sumLimit
? "limit_reached" as const
: newCount >= maxCount
? "count_exceeded" as const
: "stream_ended" as const
}
}
)
const stream = Stream.iterate(1, (n) => n + 1) // 1, 2, 3, 4, ...
Effect.runPromise(
Stream.run(stream, boundedAccumulator(50, 20))
).then(console.log)
// Output: { sum: 55, count: 10, reason: "limit_reached" }
// (1+2+...+10 = 55 >= 50)
🔴 Advanced
Упражнение 5: Event Sourcing Sink
Реализуйте Sink в паттерне Event Sourcing: принимает поток событий, применяет их к начальному состоянию и возвращает финальный snapshot.
import { Stream, Sink, Effect } from "effect"
// Domain events
type AccountEvent =
| { readonly _tag: "Opened"; readonly initialBalance: number }
| { readonly _tag: "Deposited"; readonly amount: number }
| { readonly _tag: "Withdrawn"; readonly amount: number }
| { readonly _tag: "Closed" }
// State
interface AccountState {
readonly balance: number
readonly isOpen: boolean
readonly transactionCount: number
}
// Ваш код: создайте Sink, обрабатывающий поток AccountEvent
Решение:
import { Stream, Sink, Effect } from "effect"
type AccountEvent =
| { readonly _tag: "Opened"; readonly initialBalance: number }
| { readonly _tag: "Deposited"; readonly amount: number }
| { readonly _tag: "Withdrawn"; readonly amount: number }
| { readonly _tag: "Closed" }
interface AccountState {
readonly balance: number
readonly isOpen: boolean
readonly transactionCount: number
}
const initialState: AccountState = {
balance: 0,
isOpen: false,
transactionCount: 0
}
const applyEvent = (state: AccountState, event: AccountEvent): AccountState => {
switch (event._tag) {
case "Opened":
return {
balance: event.initialBalance,
isOpen: true,
transactionCount: state.transactionCount + 1
}
case "Deposited":
return state.isOpen
? {
...state,
balance: state.balance + event.amount,
transactionCount: state.transactionCount + 1
}
: state
case "Withdrawn":
return state.isOpen && state.balance >= event.amount
? {
...state,
balance: state.balance - event.amount,
transactionCount: state.transactionCount + 1
}
: state
case "Closed":
return {
...state,
isOpen: false,
transactionCount: state.transactionCount + 1
}
}
}
// Event Sourcing Sink
const eventSourcingSink: Sink.Sink<AccountState, AccountEvent, never, never, never> =
Sink.foldLeft(initialState, applyEvent)
// С ранним завершением (остановка после закрытия)
const eventSourcingSinkWithEarlyStop: Sink.Sink<AccountState, AccountEvent, AccountEvent, never, never> =
Sink.fold(
initialState,
(state) => state.isOpen || state.transactionCount === 0,
applyEvent
)
// Тест
const events: Stream.Stream<AccountEvent> = Stream.make(
{ _tag: "Opened", initialBalance: 1000 },
{ _tag: "Deposited", amount: 500 },
{ _tag: "Withdrawn", amount: 200 },
{ _tag: "Deposited", amount: 300 },
{ _tag: "Withdrawn", amount: 100 },
{ _tag: "Closed" }
)
Effect.runPromise(Stream.run(events, eventSourcingSink)).then(console.log)
// Output: { balance: 1500, isOpen: false, transactionCount: 6 }
🔗 Модуль завершён. Далее: Модуль 12: Channel — Двунаправленные потоки →