Запуск Effect
Effect — это описание вычисления, которое само по себе ничего не делает.
Теория
Модель выполнения Effect
Effect использует модель lazy evaluation — создание Effect не запускает вычисление. Запуск происходит только при вызове раннера:
┌────────────────────┐ ┌──────────────────────┐ ┌────────────────┐
│ Описание Effect │ ───► │ Runtime System │ ───► │ Результат │
│ (программа) │ │ (интерпретатор) │ │ (Exit<A,E>) │
└────────────────────┘ └──────────────────────┘ └────────────────┘
│ │ │
Pure Data Execution Success/Failure
Классификация раннеров
| Категория | Раннеры | Характеристика |
|---|---|---|
| Синхронные | runSync, runSyncExit | Блокирующие, только для sync-эффектов |
| Асинхронные | runPromise, runPromiseExit | Возвращают Promise |
| Fork | runFork | Возвращают Fiber для управления |
| Callback | runCallback | Для интеграции с callback API |
Правило “Entry Point”
В идеальном Effect-приложении раннер вызывается один раз в точке входа:
// ✅ Хорошо: один раннер в main
const main = Effect.gen(function* () {
yield* Effect.log("Starting application")
yield* businessLogic()
yield* cleanup()
})
Effect.runPromise(main)
// ❌ Плохо: раннеры внутри бизнес-логики
const badPractice = async () => {
const result = await Effect.runPromise(someEffect) // Анти-паттерн
return result
}
Синхронные раннеры
Effect.runSync
Выполняет Effect синхронно и возвращает результат. Выбрасывает исключение при ошибке или асинхронном эффекте.
// Сигнатура:
// runSync: <A, E>(effect: Effect<A, E, never>) => A
// Успешное выполнение
const result = Effect.runSync(Effect.succeed(42))
console.log(result) // 42
// Синхронное вычисление
const computed = Effect.runSync(
Effect.sync(() => Math.pow(2, 10))
)
console.log(computed) // 1024
// ⚠️ При ошибке выбрасывается FiberFailure
try {
Effect.runSync(Effect.fail(new Error("oops")))
} catch (e) {
console.log(e) // FiberFailure containing Error("oops")
}
// ⚠️ При асинхронном эффекте выбрасывается исключение
try {
Effect.runSync(Effect.promise(() => Promise.resolve(42)))
} catch (e) {
console.log(e) // AsyncFiberException
}
Effect.runSyncExit
Выполняет Effect синхронно и возвращает Exit<A, E> вместо выброса исключения:
// Сигнатура:
// runSyncExit: <A, E>(effect: Effect<A, E, never>) => Exit<A, E>
// Успешный результат
const successExit = Effect.runSyncExit(Effect.succeed(42))
if (Exit.isSuccess(successExit)) {
console.log(successExit.value) // 42
}
// Ошибка как Exit
const failureExit = Effect.runSyncExit(Effect.fail("error"))
if (Exit.isFailure(failureExit)) {
console.log(Cause.pretty(failureExit.cause))
}
// Pattern matching через Exit.match
const message = Exit.match(successExit, {
onSuccess: (value) => `Success: ${value}`,
onFailure: (cause) => `Failure: ${Cause.pretty(cause)}`
})
Когда использовать синхронные раннеры
// ✅ CLI утилиты с детерминированной логикой
const parseArgs = (args: ReadonlyArray<string>) =>
Effect.sync(() => {
// Парсинг аргументов — чисто синхронная операция
return { verbose: args.includes("-v") }
})
const cliMain = Effect.runSync(parseArgs(process.argv))
// ✅ Тесты с предсказуемым поведением
describe("Calculator", () => {
it("adds numbers", () => {
const result = Effect.runSync(
Effect.succeed(2).pipe(Effect.map((x) => x + 2))
)
expect(result).toBe(4)
})
})
// ✅ Инициализация при старте (blocking)
const loadStaticConfig = Effect.sync(() => ({
port: parseInt(process.env.PORT ?? "3000", 10),
host: process.env.HOST ?? "localhost"
}))
const config = Effect.runSync(loadStaticConfig)
Асинхронные раннеры
Effect.runPromise
Выполняет Effect и возвращает Promise<A>. При ошибке Promise отклоняется:
// Сигнатура:
// runPromise: <A, E>(effect: Effect<A, E, never>, options?: RunOptions) => Promise<A>
// Базовое использование
const result = await Effect.runPromise(Effect.succeed(42))
console.log(result) // 42
// Асинхронный эффект
const asyncResult = await Effect.runPromise(
Effect.promise(() => fetch("/api/data").then(r => r.json()))
)
// Обработка ошибок через try/catch
try {
await Effect.runPromise(Effect.fail(new Error("failed")))
} catch (error) {
// error — это FiberFailure, содержащий Cause
console.log(error)
}
// С options — AbortSignal
const controller = new AbortController()
const promise = Effect.runPromise(
Effect.sleep("10 seconds").pipe(Effect.map(() => "done")),
{ signal: controller.signal }
)
// Через 1 секунду отменяем
setTimeout(() => controller.abort(), 1000)
Effect.runPromiseExit
Возвращает Promise<Exit<A, E>> — никогда не отклоняется:
// Сигнатура:
// runPromiseExit: <A, E>(effect: Effect<A, E, never>) => Promise<Exit<A, E>>
// Всегда успешный Promise
const exit = await Effect.runPromiseExit(Effect.fail("error"))
// Promise resolved (не rejected!)
if (Exit.isFailure(exit)) {
const error = Cause.failureOption(exit.cause)
console.log(error) // Option.some("error")
}
// Полезно для интеграции с системами, где rejection нежелателен
const safeApiCall = async <A, E>(effect: Effect.Effect<A, E>): Promise<{
success: boolean
data?: A
error?: E
}> => {
const exit = await Effect.runPromiseExit(effect)
return Exit.match(exit, {
onSuccess: (data) => ({ success: true, data }),
onFailure: (cause) => ({
success: false,
error: Cause.failureOption(cause).pipe(
(opt) => opt._tag === "Some" ? opt.value : undefined
)
})
})
}
Effect.runFork
Запускает Effect в Fiber и возвращает управление немедленно:
// Сигнатура:
// runFork: <A, E>(effect: Effect<A, E, never>, options?: RunOptions) => RuntimeFiber<A, E>
// Запуск в background
const fiber = Effect.runFork(
Effect.gen(function* () {
yield* Effect.log("Starting long task...")
yield* Effect.sleep("5 seconds")
yield* Effect.log("Done!")
return 42
})
)
// Можно продолжить выполнение немедленно
console.log("Fiber started, continuing...")
// Позже — получить результат
const result = await Effect.runPromise(Fiber.join(fiber))
console.log(result) // 42
// Или прервать
await Effect.runPromise(Fiber.interrupt(fiber))
Effect.runCallback
Для интеграции с callback-based API:
// Сигнатура:
// runCallback: <A, E>(effect: Effect<A, E, never>, options?: RunCallbackOptions<A, E>) => Cancel
// Базовое использование
const cancel = Effect.runCallback(
Effect.succeed(42),
{
onExit: (exit) => {
if (Exit.isSuccess(exit)) {
console.log("Success:", exit.value)
} else {
console.log("Failure:", Cause.pretty(exit.cause))
}
}
}
)
// cancel() — отменяет выполнение
// Пример интеграции с Express middleware
const effectMiddleware = <A>(
handler: (req: Request) => Effect.Effect<A>
) => (req: Request, res: Response, next: NextFunction) => {
Effect.runCallback(handler(req), {
onExit: (exit) => {
if (Exit.isSuccess(exit)) {
res.json(exit.value)
} else {
next(Cause.squash(exit.cause))
}
}
})
}
Runtime System
Что такое Runtime
Runtime — это среда выполнения Effect, которая содержит:
- Context — сервисы и зависимости
- FiberRefs — локальные переменные файберов
- RuntimeFlags — флаги конфигурации
// Стандартный runtime
const defaultRuntime = Runtime.defaultRuntime
// Runtime с кастомным контекстом
class Logger extends Context.Tag("Logger")<Logger, {
readonly log: (msg: string) => Effect.Effect<void>
}>() {}
const customRuntime = Runtime.make({
context: Context.make(Logger, {
log: (msg) => Effect.sync(() => console.log(`[LOG] ${msg}`))
})
})
// Использование runtime
const program: Effect.Effect<void, never, Logger> = Effect.gen(function* () {
const logger = yield* Logger
yield* logger.log("Hello from custom runtime!")
})
// Запуск с кастомным runtime
Runtime.runPromise(customRuntime)(program)
ManagedRuntime
Для приложений с жизненным циклом — создание и уничтожение runtime:
// Определяем сервисы с ресурсами
class Database extends Context.Tag("Database")<Database, {
readonly query: (sql: string) => Effect.Effect<unknown>
}>() {}
const DatabaseLive = Layer.scoped(
Database,
Effect.gen(function* () {
// Acquire connection
yield* Effect.log("Connecting to database...")
const connection = { connected: true }
// Добавляем финализатор
yield* Effect.addFinalizer(() =>
Effect.log("Disconnecting from database...")
)
return {
query: (sql) => Effect.succeed({ sql, result: [] })
}
})
)
// Создаём managed runtime
const runtime = ManagedRuntime.make(DatabaseLive)
// Использование
const program = Effect.gen(function* () {
const db = yield* Database
const result = yield* db.query("SELECT * FROM users")
yield* Effect.log(`Query result: ${JSON.stringify(result)}`)
})
// Запуск
await runtime.runPromise(program)
// Когда приложение завершается — освобождаем ресурсы
await runtime.dispose()
// Output: "Disconnecting from database..."
Runtime для тестов
// Сервис
class UserService extends Context.Tag("UserService")<UserService, {
readonly getUser: (id: string) => Effect.Effect<{ name: string }>
}>() {}
// Production implementation
const UserServiceLive = Layer.succeed(UserService, {
getUser: (id) => Effect.succeed({ name: `User ${id}` })
})
// Test implementation с mock
const UserServiceTest = Layer.succeed(UserService, {
getUser: (id) => Effect.succeed({ name: `MockUser ${id}` })
})
// Бизнес-логика
const greetUser = (id: string): Effect.Effect<string, never, UserService> =>
Effect.gen(function* () {
const service = yield* UserService
const user = yield* service.getUser(id)
return `Hello, ${user.name}!`
})
describe("greetUser", () => {
it("returns greeting with user name", async () => {
const testLayer = UserServiceTest
const runtime = Runtime.make({
context: await Effect.runPromise(
Layer.toRuntime(testLayer).pipe(Effect.scoped)
)
})
const result = await Runtime.runPromise(runtime)(greetUser("123"))
expect(result).toBe("Hello, MockUser 123!")
})
})
Exit и результаты выполнения
Структура Exit
Exit<A, E> — это результат выполнения Effect:
// Exit = Success<A> | Failure<E>
type Exit<A, E> =
| { readonly _tag: "Success"; readonly value: A }
| { readonly _tag: "Failure"; readonly cause: Cause<E> }
Работа с Exit
const exit = await Effect.runPromiseExit(
Effect.gen(function* () {
const random = yield* Effect.sync(() => Math.random())
if (random < 0.5) {
yield* Effect.fail(new Error("Bad luck"))
}
return random
})
)
// Проверка типа
if (Exit.isSuccess(exit)) {
console.log("Value:", exit.value)
}
if (Exit.isFailure(exit)) {
console.log("Cause:", Cause.pretty(exit.cause))
}
// Pattern matching
const message = Exit.match(exit, {
onSuccess: (value) => `Random: ${value.toFixed(2)}`,
onFailure: (cause) => {
// Извлекаем ошибку из Cause
const failure = Cause.failureOption(cause)
return Option.match(failure, {
onNone: () => "Unknown error",
onSome: (error) => `Error: ${error.message}`
})
}
})
// Трансформация Exit
const mapped: Exit.Exit<string, Error> = Exit.map(exit, (n) => n.toString())
// Создание Exit вручную
const success: Exit.Exit<number, never> = Exit.succeed(42)
const failure: Exit.Exit<never, Error> = Exit.fail(new Error("manual failure"))
Cause — детальная информация об ошибке
// Cause может содержать:
// - Fail<E> — типизированная ошибка
// - Die — дефект (unexpected error)
// - Interrupt — прерывание
// - Sequential — последовательные ошибки
// - Parallel — параллельные ошибки
const complexExit = await Effect.runPromiseExit(
Effect.all([
Effect.fail("error1"),
Effect.fail("error2")
], { concurrency: "unbounded" })
)
if (Exit.isFailure(complexExit)) {
// Анализ Cause
const failures = Cause.failures(complexExit.cause)
console.log("All failures:", [...failures]) // ["error1", "error2"]
const defects = Cause.defects(complexExit.cause)
console.log("Defects:", [...defects])
// Красивый вывод
console.log(Cause.pretty(complexExit.cause))
}
Bun-специфичные паттерны
Entry Point для Bun
// src/main.ts
// Создаём managed runtime с полным графом зависимостей
const runtime = ManagedRuntime.make(AppLayer)
// Graceful shutdown
const shutdown = async () => {
console.log("Shutting down...")
await runtime.dispose()
process.exit(0)
}
process.on("SIGINT", shutdown)
process.on("SIGTERM", shutdown)
// Запуск
const main = async () => {
try {
await runtime.runPromise(mainProgram)
} catch (error) {
console.error("Application error:", error)
await runtime.dispose()
process.exit(1)
}
}
main()
Bun.serve с Effect
// HTTP Handler сервис
class HttpHandler extends Context.Tag("HttpHandler")<HttpHandler, {
readonly handle: (req: Request) => Effect.Effect<Response>
}>() {}
const HttpHandlerLive = Layer.succeed(HttpHandler, {
handle: (req) => Effect.gen(function* () {
const url = new URL(req.url)
if (url.pathname === "/health") {
return new Response("OK")
}
if (url.pathname === "/api/data") {
yield* Effect.sleep("100 millis") // Simulate work
return Response.json({ message: "Hello from Effect!" })
}
return new Response("Not Found", { status: 404 })
})
})
// Runtime
const runtime = ManagedRuntime.make(HttpHandlerLive)
// Bun.serve
const server = Bun.serve({
port: 3000,
async fetch(req) {
const handler = await runtime.runPromise(HttpHandler)
const exit = await runtime.runPromiseExit(handler.handle(req))
return Exit.match(exit, {
onSuccess: (response) => response,
onFailure: () => new Response("Internal Error", { status: 500 })
})
}
})
console.log(`Server running at http://localhost:${server.port}`)
// Cleanup
process.on("SIGINT", async () => {
server.stop()
await runtime.dispose()
process.exit(0)
})
Bun Worker с Effect
// worker.ts
// Определяем сообщения
interface WorkerMessage {
readonly type: "task"
readonly payload: string
}
interface WorkerResponse {
readonly type: "result"
readonly data: string
}
// Worker effect
const workerEffect = Effect.gen(function* () {
const messageQueue = yield* Queue.unbounded<WorkerMessage>()
// Listener для сообщений от main thread
self.onmessage = (event: MessageEvent<WorkerMessage>) => {
Effect.runSync(Queue.offer(messageQueue, event.data))
}
// Процессинг сообщений
yield* Effect.forever(
Effect.gen(function* () {
const message = yield* Queue.take(messageQueue)
// Обработка
yield* Effect.sleep("100 millis")
const result = `Processed: ${message.payload.toUpperCase()}`
// Отправка результата
const response: WorkerResponse = { type: "result", data: result }
self.postMessage(response)
})
)
})
Effect.runFork(workerEffect)
Примеры
Пример 1: CLI приложение
// Определяем CLI программу
const cliProgram = Effect.gen(function* () {
const args = process.argv.slice(2)
if (args.length === 0) {
yield* Console.log("Usage: bun run cli.ts <command>")
return yield* Effect.fail("No command provided")
}
const [command, ...rest] = args
switch (command) {
case "greet":
const name = rest[0] ?? "World"
yield* Console.log(`Hello, ${name}!`)
return 0
case "add":
const [a, b] = rest.map(Number)
if (isNaN(a) || isNaN(b)) {
return yield* Effect.fail("Invalid numbers")
}
yield* Console.log(`${a} + ${b} = ${a + b}`)
return 0
default:
yield* Console.error(`Unknown command: ${command}`)
return yield* Effect.fail(`Unknown command: ${command}`)
}
})
// Запуск с правильной обработкой exit code
const main = async () => {
const exit = await Effect.runPromiseExit(cliProgram)
Exit.match(exit, {
onSuccess: (code) => process.exit(code),
onFailure: (cause) => {
const error = Cause.failureOption(cause)
if (error._tag === "Some") {
console.error(`Error: ${error.value}`)
}
process.exit(1)
}
})
}
main()
Пример 2: Параллельные задачи с timeout
// Задача с симуляцией работы
const task = (id: number, duration: number) =>
Effect.gen(function* () {
yield* Effect.log(`Task ${id} started`)
yield* Effect.sleep(Duration.millis(duration))
yield* Effect.log(`Task ${id} completed`)
return id * 10
})
// Запуск параллельно с таймаутом
const parallelWithTimeout = Effect.gen(function* () {
// Fork всех задач
const fibers = yield* Effect.all([
Effect.fork(task(1, 100)),
Effect.fork(task(2, 200)),
Effect.fork(task(3, 5000)) // Эта задача долгая
])
// Ждём с таймаутом
const timeout = Effect.sleep("300 millis").pipe(
Effect.flatMap(() => Effect.log("Timeout reached!"))
)
yield* Effect.race([
Effect.all(fibers.map(Fiber.join)),
timeout
])
// Прерываем все оставшиеся файберы
yield* Effect.all(fibers.map(Fiber.interrupt))
yield* Effect.log("All done")
})
Effect.runPromise(parallelWithTimeout)
Пример 3: Graceful HTTP Server
// Состояние сервера
interface ServerState {
readonly isShuttingDown: boolean
readonly activeRequests: number
}
const ServerState = Ref.make<ServerState>({
isShuttingDown: false,
activeRequests: 0
})
// HTTP обработчик с graceful shutdown
const handleRequest = (req: Request, state: Ref.Ref<ServerState>) =>
Effect.gen(function* () {
const current = yield* Ref.get(state)
// Reject если shutting down
if (current.isShuttingDown) {
return new Response("Service Unavailable", { status: 503 })
}
// Increment active requests
yield* Ref.update(state, (s) => ({
...s,
activeRequests: s.activeRequests + 1
}))
try {
// Обработка запроса
yield* Effect.sleep("50 millis")
return Response.json({ path: new URL(req.url).pathname })
} finally {
// Decrement active requests
yield* Ref.update(state, (s) => ({
...s,
activeRequests: s.activeRequests - 1
}))
}
})
// Graceful shutdown
const gracefulShutdown = (state: Ref.Ref<ServerState>) =>
Effect.gen(function* () {
yield* Effect.log("Initiating graceful shutdown...")
// Mark as shutting down
yield* Ref.update(state, (s) => ({ ...s, isShuttingDown: true }))
// Wait for active requests to complete
yield* Effect.iterate(0, {
while: () => true,
body: () =>
Effect.gen(function* () {
const current = yield* Ref.get(state)
if (current.activeRequests === 0) {
yield* Effect.log("All requests completed")
return -1 // Exit loop
}
yield* Effect.log(`Waiting for ${current.activeRequests} requests...`)
yield* Effect.sleep("100 millis")
return 0
})
}).pipe(Effect.timeout("10 seconds"))
yield* Effect.log("Shutdown complete")
})
// Main
const main = Effect.gen(function* () {
const state = yield* ServerState
const server = Bun.serve({
port: 3000,
async fetch(req) {
const exit = await Effect.runPromiseExit(handleRequest(req, state))
return Exit.match(exit, {
onSuccess: (res) => res,
onFailure: () => new Response("Error", { status: 500 })
})
}
})
yield* Effect.log(`Server running on port ${server.port}`)
// Setup signal handlers
const shutdownFiber = yield* Effect.fork(
Effect.async<void>((resume) => {
const handler = () => resume(Effect.void)
process.on("SIGINT", handler)
process.on("SIGTERM", handler)
}).pipe(
Effect.flatMap(() => gracefulShutdown(state)),
Effect.flatMap(() => Effect.sync(() => {
server.stop()
process.exit(0)
}))
)
)
// Keep alive
yield* Fiber.join(shutdownFiber)
})
Effect.runPromise(main)
Упражнения
Basic
Упражнение 3.1: Выбор раннера
Для каждого сценария выберите подходящий раннер:
// 1. Синхронное вычисление в тесте
const testCase = Effect.succeed(2 + 2)
// Раннер: ???
// 2. HTTP запрос в async handler
const httpCall = Effect.promise(() => fetch("/api"))
// Раннер: ???
// 3. Background task, который нужно отменить позже
const bgTask = Effect.forever(Effect.log("tick"))
// Раннер: ???
// 4. Получить Exit без исключений
const mayFail = Effect.fail("error")
// Раннер: ???
Решение
// 1. Effect.runSync — синхронный результат в тестах
const result1 = Effect.runSync(Effect.succeed(2 + 2))
// 4
// 2. Effect.runPromise — для async контекста
const result2 = await Effect.runPromise(Effect.promise(() => fetch("/api")))
// 3. Effect.runFork — получаем Fiber для управления
const fiber = Effect.runFork(Effect.forever(Effect.log("tick")))
// Позже: await Effect.runPromise(Fiber.interrupt(fiber))
// 4. Effect.runPromiseExit — Exit без исключений
const exit: Exit.Exit<never, string> = await Effect.runPromiseExit(Effect.fail("error"))
Упражнение 3.2: Обработка Exit
Напишите функцию, которая выполняет Effect и возвращает стандартизированный результат:
interface Result<A, E> {
readonly success: boolean
readonly data?: A
readonly error?: E
readonly isInterrupted: boolean
}
const executeEffect = <A, E>(
effect: Effect.Effect<A, E>
): Promise<Result<A, E>> => /* ??? */
Решение
interface Result<A, E> {
readonly success: boolean
readonly data?: A
readonly error?: E
readonly isInterrupted: boolean
}
const executeEffect = async <A, E>(
effect: Effect.Effect<A, E>
): Promise<Result<A, E>> => {
const exit = await Effect.runPromiseExit(effect)
return Exit.match(exit, {
onSuccess: (data) => ({
success: true,
data,
isInterrupted: false
}),
onFailure: (cause) => ({
success: false,
error: Option.getOrUndefined(Cause.failureOption(cause)),
isInterrupted: Cause.isInterruptedOnly(cause)
})
})
}
// Тест
const test = async () => {
const success = await executeEffect(Effect.succeed(42))
console.log(success) // { success: true, data: 42, isInterrupted: false }
const failure = await executeEffect(Effect.fail("oops"))
console.log(failure) // { success: false, error: "oops", isInterrupted: false }
const interrupted = await executeEffect(Effect.interrupt)
console.log(interrupted) // { success: false, isInterrupted: true }
}
Intermediate
Упражнение 3.3: Custom Runtime
Создайте runtime с кастомным Logger сервисом:
// Logger сервис
class Logger extends Context.Tag("Logger")<Logger, {
readonly info: (msg: string) => Effect.Effect<void>
readonly error: (msg: string) => Effect.Effect<void>
}>() {}
// Реализуйте:
// 1. ConsoleLogger — выводит в console с timestamps
// 2. Создайте Runtime с этим logger
// 3. Запустите программу, которая использует Logger
const program: Effect.Effect<void, never, Logger> = /* ??? */
Решение
class Logger extends Context.Tag("Logger")<Logger, {
readonly info: (msg: string) => Effect.Effect<void>
readonly error: (msg: string) => Effect.Effect<void>
}>() {}
// Console Logger реализация
const ConsoleLogger = Layer.succeed(Logger, {
info: (msg) => Effect.sync(() => {
console.log(`[${new Date().toISOString()}] INFO: ${msg}`)
}),
error: (msg) => Effect.sync(() => {
console.error(`[${new Date().toISOString()}] ERROR: ${msg}`)
})
})
// Программа
const program: Effect.Effect<void, never, Logger> = Effect.gen(function* () {
const logger = yield* Logger
yield* logger.info("Application started")
yield* logger.info("Processing data...")
yield* logger.error("Something went wrong!")
yield* logger.info("Recovered, continuing...")
})
// Создаём runtime
const createRuntime = async () => {
const context = await Effect.runPromise(
Layer.toRuntime(ConsoleLogger).pipe(Effect.scoped)
)
return Runtime.make({ context })
}
// Запуск
const main = async () => {
const runtime = await createRuntime()
await Runtime.runPromise(runtime)(program)
}
main()
Advanced
Упражнение 3.4: ManagedRuntime с ресурсами
Создайте ManagedRuntime для приложения с database connection pool:
// Database connection
interface Connection {
readonly id: string
readonly query: (sql: string) => Effect.Effect<unknown>
}
// Connection Pool сервис
class ConnectionPool extends Context.Tag("ConnectionPool")<ConnectionPool, {
readonly acquire: Effect.Effect<Connection>
readonly release: (conn: Connection) => Effect.Effect<void>
readonly withConnection: <A, E>(
use: (conn: Connection) => Effect.Effect<A, E>
) => Effect.Effect<A, E>
}>() {}
// Реализуйте:
// 1. ConnectionPoolLive Layer с пулом из 5 соединений
// 2. Логирование acquire/release
// 3. Graceful shutdown — ожидание освобождения всех соединений
// 4. Программу, которая делает 10 параллельных запросов
Решение
interface Connection {
readonly id: string
readonly query: (sql: string) => Effect.Effect<unknown>
}
class ConnectionPool extends Context.Tag("ConnectionPool")<ConnectionPool, {
readonly acquire: Effect.Effect<Connection>
readonly release: (conn: Connection) => Effect.Effect<void>
readonly withConnection: <A, E>(
use: (conn: Connection) => Effect.Effect<A, E>
) => Effect.Effect<A, E>
}>() {}
// Реализация пула
const ConnectionPoolLive = Layer.scoped(
ConnectionPool,
Effect.gen(function* () {
const poolSize = 5
const connections = yield* Queue.bounded<Connection>(poolSize)
const activeConnections = yield* Ref.make(0)
// Создаём соединения
yield* Effect.forEach(
Array.from({ length: poolSize }, (_, i) => i),
(i) => {
const conn: Connection = {
id: `conn-${i}`,
query: (sql) => Effect.gen(function* () {
yield* Effect.sleep("50 millis") // Simulate query
return { sql, rows: [] }
})
}
return Queue.offer(connections, conn)
},
{ discard: true }
)
yield* Effect.log(`Connection pool created with ${poolSize} connections`)
// Финализатор — ждём освобождения
yield* Effect.addFinalizer(() =>
Effect.gen(function* () {
yield* Effect.log("Shutting down connection pool...")
// Ждём пока все соединения вернутся
yield* Effect.iterate(0, {
while: () => true,
body: () =>
Effect.gen(function* () {
const active = yield* Ref.get(activeConnections)
const available = yield* Queue.size(connections)
if (available === poolSize) {
yield* Effect.log("All connections returned")
return -1 // Exit
}
yield* Effect.log(`Waiting: ${active} active, ${available} available`)
yield* Effect.sleep("100 millis")
return 0
})
}).pipe(Effect.timeout("5 seconds"))
yield* Queue.shutdown(connections)
yield* Effect.log("Connection pool shut down")
})
)
return {
acquire: Effect.gen(function* () {
const conn = yield* Queue.take(connections)
yield* Ref.update(activeConnections, (n) => n + 1)
yield* Effect.log(`Acquired ${conn.id}`)
return conn
}),
release: (conn) =>
Effect.gen(function* () {
yield* Queue.offer(connections, conn)
yield* Ref.update(activeConnections, (n) => n - 1)
yield* Effect.log(`Released ${conn.id}`)
}),
withConnection: <A, E>(use: (conn: Connection) => Effect.Effect<A, E>) =>
Effect.acquireUseRelease(
Effect.gen(function* () {
const pool = yield* ConnectionPool
return yield* pool.acquire
}),
use,
(conn) =>
Effect.gen(function* () {
const pool = yield* ConnectionPool
yield* pool.release(conn)
})
)
}
})
)
// Программа
const program = Effect.gen(function* () {
const pool = yield* ConnectionPool
// 10 параллельных запросов с пулом из 5
yield* Effect.all(
Array.from({ length: 10 }, (_, i) =>
pool.withConnection((conn) =>
Effect.gen(function* () {
yield* Effect.log(`Request ${i} using ${conn.id}`)
const result = yield* conn.query(`SELECT * FROM table_${i}`)
yield* Effect.sleep("100 millis") // Simulate work
return result
})
)
),
{ concurrency: "unbounded" }
)
yield* Effect.log("All requests completed")
})
// Runtime
const runtime = ManagedRuntime.make(ConnectionPoolLive)
const main = async () => {
await runtime.runPromise(program)
await runtime.dispose()
}
main()
Ключевые выводы
- runSync — для чисто синхронных эффектов; выбрасывает исключение при ошибке
- runPromise — основной раннер для async кода; отклоняется при ошибке
- runPromiseExit — никогда не отклоняется; возвращает Exit
- runFork — запуск в Fiber для управления выполнением
- Runtime — кастомизация среды выполнения с сервисами
- ManagedRuntime — для приложений с ресурсами и lifecycle
- Один раннер в entry point — best practice для Effect-приложений