Effect Курс Запуск Effect

Запуск Effect

Effect — это описание вычисления, которое само по себе ничего не делает.

Теория

Модель выполнения Effect

Effect использует модель lazy evaluation — создание Effect не запускает вычисление. Запуск происходит только при вызове раннера:

┌────────────────────┐      ┌──────────────────────┐      ┌────────────────┐
│  Описание Effect   │ ───► │   Runtime System     │ ───► │    Результат   │
│  (программа)       │      │   (интерпретатор)    │      │    (Exit<A,E>) │
└────────────────────┘      └──────────────────────┘      └────────────────┘
         │                           │                            │
    Pure Data                   Execution                    Success/Failure

Классификация раннеров

КатегорияРаннерыХарактеристика
СинхронныеrunSync, runSyncExitБлокирующие, только для sync-эффектов
АсинхронныеrunPromise, runPromiseExitВозвращают Promise
ForkrunForkВозвращают Fiber для управления
CallbackrunCallbackДля интеграции с callback API

Правило “Entry Point”

В идеальном Effect-приложении раннер вызывается один раз в точке входа:


// ✅ Хорошо: один раннер в main
const main = Effect.gen(function* () {
  yield* Effect.log("Starting application")
  yield* businessLogic()
  yield* cleanup()
})

Effect.runPromise(main)

// ❌ Плохо: раннеры внутри бизнес-логики
const badPractice = async () => {
  const result = await Effect.runPromise(someEffect) // Анти-паттерн
  return result
}

Синхронные раннеры

Effect.runSync

Выполняет Effect синхронно и возвращает результат. Выбрасывает исключение при ошибке или асинхронном эффекте.


// Сигнатура:
// runSync: <A, E>(effect: Effect<A, E, never>) => A

// Успешное выполнение
const result = Effect.runSync(Effect.succeed(42))
console.log(result) // 42

// Синхронное вычисление
const computed = Effect.runSync(
  Effect.sync(() => Math.pow(2, 10))
)
console.log(computed) // 1024

// ⚠️ При ошибке выбрасывается FiberFailure
try {
  Effect.runSync(Effect.fail(new Error("oops")))
} catch (e) {
  console.log(e) // FiberFailure containing Error("oops")
}

// ⚠️ При асинхронном эффекте выбрасывается исключение
try {
  Effect.runSync(Effect.promise(() => Promise.resolve(42)))
} catch (e) {
  console.log(e) // AsyncFiberException
}

Effect.runSyncExit

Выполняет Effect синхронно и возвращает Exit<A, E> вместо выброса исключения:


// Сигнатура:
// runSyncExit: <A, E>(effect: Effect<A, E, never>) => Exit<A, E>

// Успешный результат
const successExit = Effect.runSyncExit(Effect.succeed(42))
if (Exit.isSuccess(successExit)) {
  console.log(successExit.value) // 42
}

// Ошибка как Exit
const failureExit = Effect.runSyncExit(Effect.fail("error"))
if (Exit.isFailure(failureExit)) {
  console.log(Cause.pretty(failureExit.cause))
}

// Pattern matching через Exit.match
const message = Exit.match(successExit, {
  onSuccess: (value) => `Success: ${value}`,
  onFailure: (cause) => `Failure: ${Cause.pretty(cause)}`
})

Когда использовать синхронные раннеры


// ✅ CLI утилиты с детерминированной логикой
const parseArgs = (args: ReadonlyArray<string>) =>
  Effect.sync(() => {
    // Парсинг аргументов — чисто синхронная операция
    return { verbose: args.includes("-v") }
  })

const cliMain = Effect.runSync(parseArgs(process.argv))

// ✅ Тесты с предсказуемым поведением

describe("Calculator", () => {
  it("adds numbers", () => {
    const result = Effect.runSync(
      Effect.succeed(2).pipe(Effect.map((x) => x + 2))
    )
    expect(result).toBe(4)
  })
})

// ✅ Инициализация при старте (blocking)
const loadStaticConfig = Effect.sync(() => ({
  port: parseInt(process.env.PORT ?? "3000", 10),
  host: process.env.HOST ?? "localhost"
}))

const config = Effect.runSync(loadStaticConfig)

Асинхронные раннеры

Effect.runPromise

Выполняет Effect и возвращает Promise<A>. При ошибке Promise отклоняется:


// Сигнатура:
// runPromise: <A, E>(effect: Effect<A, E, never>, options?: RunOptions) => Promise<A>

// Базовое использование
const result = await Effect.runPromise(Effect.succeed(42))
console.log(result) // 42

// Асинхронный эффект
const asyncResult = await Effect.runPromise(
  Effect.promise(() => fetch("/api/data").then(r => r.json()))
)

// Обработка ошибок через try/catch
try {
  await Effect.runPromise(Effect.fail(new Error("failed")))
} catch (error) {
  // error — это FiberFailure, содержащий Cause
  console.log(error)
}

// С options — AbortSignal
const controller = new AbortController()
const promise = Effect.runPromise(
  Effect.sleep("10 seconds").pipe(Effect.map(() => "done")),
  { signal: controller.signal }
)

// Через 1 секунду отменяем
setTimeout(() => controller.abort(), 1000)

Effect.runPromiseExit

Возвращает Promise<Exit<A, E>> — никогда не отклоняется:


// Сигнатура:
// runPromiseExit: <A, E>(effect: Effect<A, E, never>) => Promise<Exit<A, E>>

// Всегда успешный Promise
const exit = await Effect.runPromiseExit(Effect.fail("error"))
// Promise resolved (не rejected!)

if (Exit.isFailure(exit)) {
  const error = Cause.failureOption(exit.cause)
  console.log(error) // Option.some("error")
}

// Полезно для интеграции с системами, где rejection нежелателен
const safeApiCall = async <A, E>(effect: Effect.Effect<A, E>): Promise<{
  success: boolean
  data?: A
  error?: E
}> => {
  const exit = await Effect.runPromiseExit(effect)
  
  return Exit.match(exit, {
    onSuccess: (data) => ({ success: true, data }),
    onFailure: (cause) => ({
      success: false,
      error: Cause.failureOption(cause).pipe(
        (opt) => opt._tag === "Some" ? opt.value : undefined
      )
    })
  })
}

Effect.runFork

Запускает Effect в Fiber и возвращает управление немедленно:


// Сигнатура:
// runFork: <A, E>(effect: Effect<A, E, never>, options?: RunOptions) => RuntimeFiber<A, E>

// Запуск в background
const fiber = Effect.runFork(
  Effect.gen(function* () {
    yield* Effect.log("Starting long task...")
    yield* Effect.sleep("5 seconds")
    yield* Effect.log("Done!")
    return 42
  })
)

// Можно продолжить выполнение немедленно
console.log("Fiber started, continuing...")

// Позже — получить результат
const result = await Effect.runPromise(Fiber.join(fiber))
console.log(result) // 42

// Или прервать
await Effect.runPromise(Fiber.interrupt(fiber))

Effect.runCallback

Для интеграции с callback-based API:


// Сигнатура:
// runCallback: <A, E>(effect: Effect<A, E, never>, options?: RunCallbackOptions<A, E>) => Cancel

// Базовое использование
const cancel = Effect.runCallback(
  Effect.succeed(42),
  {
    onExit: (exit) => {
      if (Exit.isSuccess(exit)) {
        console.log("Success:", exit.value)
      } else {
        console.log("Failure:", Cause.pretty(exit.cause))
      }
    }
  }
)

// cancel() — отменяет выполнение

// Пример интеграции с Express middleware

const effectMiddleware = <A>(
  handler: (req: Request) => Effect.Effect<A>
) => (req: Request, res: Response, next: NextFunction) => {
  Effect.runCallback(handler(req), {
    onExit: (exit) => {
      if (Exit.isSuccess(exit)) {
        res.json(exit.value)
      } else {
        next(Cause.squash(exit.cause))
      }
    }
  })
}

Runtime System

Что такое Runtime

Runtime — это среда выполнения Effect, которая содержит:

  1. Context — сервисы и зависимости
  2. FiberRefs — локальные переменные файберов
  3. RuntimeFlags — флаги конфигурации

// Стандартный runtime
const defaultRuntime = Runtime.defaultRuntime

// Runtime с кастомным контекстом
class Logger extends Context.Tag("Logger")<Logger, {
  readonly log: (msg: string) => Effect.Effect<void>
}>() {}

const customRuntime = Runtime.make({
  context: Context.make(Logger, {
    log: (msg) => Effect.sync(() => console.log(`[LOG] ${msg}`))
  })
})

// Использование runtime
const program: Effect.Effect<void, never, Logger> = Effect.gen(function* () {
  const logger = yield* Logger
  yield* logger.log("Hello from custom runtime!")
})

// Запуск с кастомным runtime
Runtime.runPromise(customRuntime)(program)

ManagedRuntime

Для приложений с жизненным циклом — создание и уничтожение runtime:


// Определяем сервисы с ресурсами
class Database extends Context.Tag("Database")<Database, {
  readonly query: (sql: string) => Effect.Effect<unknown>
}>() {}

const DatabaseLive = Layer.scoped(
  Database,
  Effect.gen(function* () {
    // Acquire connection
    yield* Effect.log("Connecting to database...")
    const connection = { connected: true }
    
    // Добавляем финализатор
    yield* Effect.addFinalizer(() => 
      Effect.log("Disconnecting from database...")
    )
    
    return {
      query: (sql) => Effect.succeed({ sql, result: [] })
    }
  })
)

// Создаём managed runtime
const runtime = ManagedRuntime.make(DatabaseLive)

// Использование
const program = Effect.gen(function* () {
  const db = yield* Database
  const result = yield* db.query("SELECT * FROM users")
  yield* Effect.log(`Query result: ${JSON.stringify(result)}`)
})

// Запуск
await runtime.runPromise(program)

// Когда приложение завершается — освобождаем ресурсы
await runtime.dispose()
// Output: "Disconnecting from database..."

Runtime для тестов


// Сервис
class UserService extends Context.Tag("UserService")<UserService, {
  readonly getUser: (id: string) => Effect.Effect<{ name: string }>
}>() {}

// Production implementation
const UserServiceLive = Layer.succeed(UserService, {
  getUser: (id) => Effect.succeed({ name: `User ${id}` })
})

// Test implementation с mock
const UserServiceTest = Layer.succeed(UserService, {
  getUser: (id) => Effect.succeed({ name: `MockUser ${id}` })
})

// Бизнес-логика
const greetUser = (id: string): Effect.Effect<string, never, UserService> =>
  Effect.gen(function* () {
    const service = yield* UserService
    const user = yield* service.getUser(id)
    return `Hello, ${user.name}!`
  })

describe("greetUser", () => {
  it("returns greeting with user name", async () => {
    const testLayer = UserServiceTest
    const runtime = Runtime.make({
      context: await Effect.runPromise(
        Layer.toRuntime(testLayer).pipe(Effect.scoped)
      )
    })
    
    const result = await Runtime.runPromise(runtime)(greetUser("123"))
    expect(result).toBe("Hello, MockUser 123!")
  })
})

Exit и результаты выполнения

Структура Exit

Exit<A, E> — это результат выполнения Effect:


// Exit = Success<A> | Failure<E>

type Exit<A, E> = 
  | { readonly _tag: "Success"; readonly value: A }
  | { readonly _tag: "Failure"; readonly cause: Cause<E> }

Работа с Exit


const exit = await Effect.runPromiseExit(
  Effect.gen(function* () {
    const random = yield* Effect.sync(() => Math.random())
    if (random < 0.5) {
      yield* Effect.fail(new Error("Bad luck"))
    }
    return random
  })
)

// Проверка типа
if (Exit.isSuccess(exit)) {
  console.log("Value:", exit.value)
}

if (Exit.isFailure(exit)) {
  console.log("Cause:", Cause.pretty(exit.cause))
}

// Pattern matching
const message = Exit.match(exit, {
  onSuccess: (value) => `Random: ${value.toFixed(2)}`,
  onFailure: (cause) => {
    // Извлекаем ошибку из Cause
    const failure = Cause.failureOption(cause)
    return Option.match(failure, {
      onNone: () => "Unknown error",
      onSome: (error) => `Error: ${error.message}`
    })
  }
})

// Трансформация Exit
const mapped: Exit.Exit<string, Error> = Exit.map(exit, (n) => n.toString())

// Создание Exit вручную
const success: Exit.Exit<number, never> = Exit.succeed(42)
const failure: Exit.Exit<never, Error> = Exit.fail(new Error("manual failure"))

Cause — детальная информация об ошибке


// Cause может содержать:
// - Fail<E> — типизированная ошибка
// - Die — дефект (unexpected error)
// - Interrupt — прерывание
// - Sequential — последовательные ошибки
// - Parallel — параллельные ошибки

const complexExit = await Effect.runPromiseExit(
  Effect.all([
    Effect.fail("error1"),
    Effect.fail("error2")
  ], { concurrency: "unbounded" })
)

if (Exit.isFailure(complexExit)) {
  // Анализ Cause
  const failures = Cause.failures(complexExit.cause)
  console.log("All failures:", [...failures]) // ["error1", "error2"]
  
  const defects = Cause.defects(complexExit.cause)
  console.log("Defects:", [...defects])
  
  // Красивый вывод
  console.log(Cause.pretty(complexExit.cause))
}

Bun-специфичные паттерны

Entry Point для Bun

// src/main.ts

// Создаём managed runtime с полным графом зависимостей
const runtime = ManagedRuntime.make(AppLayer)

// Graceful shutdown
const shutdown = async () => {
  console.log("Shutting down...")
  await runtime.dispose()
  process.exit(0)
}

process.on("SIGINT", shutdown)
process.on("SIGTERM", shutdown)

// Запуск
const main = async () => {
  try {
    await runtime.runPromise(mainProgram)
  } catch (error) {
    console.error("Application error:", error)
    await runtime.dispose()
    process.exit(1)
  }
}

main()

Bun.serve с Effect


// HTTP Handler сервис
class HttpHandler extends Context.Tag("HttpHandler")<HttpHandler, {
  readonly handle: (req: Request) => Effect.Effect<Response>
}>() {}

const HttpHandlerLive = Layer.succeed(HttpHandler, {
  handle: (req) => Effect.gen(function* () {
    const url = new URL(req.url)
    
    if (url.pathname === "/health") {
      return new Response("OK")
    }
    
    if (url.pathname === "/api/data") {
      yield* Effect.sleep("100 millis") // Simulate work
      return Response.json({ message: "Hello from Effect!" })
    }
    
    return new Response("Not Found", { status: 404 })
  })
})

// Runtime
const runtime = ManagedRuntime.make(HttpHandlerLive)

// Bun.serve
const server = Bun.serve({
  port: 3000,
  async fetch(req) {
    const handler = await runtime.runPromise(HttpHandler)
    const exit = await runtime.runPromiseExit(handler.handle(req))
    
    return Exit.match(exit, {
      onSuccess: (response) => response,
      onFailure: () => new Response("Internal Error", { status: 500 })
    })
  }
})

console.log(`Server running at http://localhost:${server.port}`)

// Cleanup
process.on("SIGINT", async () => {
  server.stop()
  await runtime.dispose()
  process.exit(0)
})

Bun Worker с Effect

// worker.ts

// Определяем сообщения
interface WorkerMessage {
  readonly type: "task"
  readonly payload: string
}

interface WorkerResponse {
  readonly type: "result"
  readonly data: string
}

// Worker effect
const workerEffect = Effect.gen(function* () {
  const messageQueue = yield* Queue.unbounded<WorkerMessage>()
  
  // Listener для сообщений от main thread
  self.onmessage = (event: MessageEvent<WorkerMessage>) => {
    Effect.runSync(Queue.offer(messageQueue, event.data))
  }
  
  // Процессинг сообщений
  yield* Effect.forever(
    Effect.gen(function* () {
      const message = yield* Queue.take(messageQueue)
      
      // Обработка
      yield* Effect.sleep("100 millis")
      const result = `Processed: ${message.payload.toUpperCase()}`
      
      // Отправка результата
      const response: WorkerResponse = { type: "result", data: result }
      self.postMessage(response)
    })
  )
})

Effect.runFork(workerEffect)

Примеры

Пример 1: CLI приложение


// Определяем CLI программу
const cliProgram = Effect.gen(function* () {
  const args = process.argv.slice(2)
  
  if (args.length === 0) {
    yield* Console.log("Usage: bun run cli.ts <command>")
    return yield* Effect.fail("No command provided")
  }
  
  const [command, ...rest] = args
  
  switch (command) {
    case "greet":
      const name = rest[0] ?? "World"
      yield* Console.log(`Hello, ${name}!`)
      return 0
    
    case "add":
      const [a, b] = rest.map(Number)
      if (isNaN(a) || isNaN(b)) {
        return yield* Effect.fail("Invalid numbers")
      }
      yield* Console.log(`${a} + ${b} = ${a + b}`)
      return 0
    
    default:
      yield* Console.error(`Unknown command: ${command}`)
      return yield* Effect.fail(`Unknown command: ${command}`)
  }
})

// Запуск с правильной обработкой exit code
const main = async () => {
  const exit = await Effect.runPromiseExit(cliProgram)
  
  Exit.match(exit, {
    onSuccess: (code) => process.exit(code),
    onFailure: (cause) => {
      const error = Cause.failureOption(cause)
      if (error._tag === "Some") {
        console.error(`Error: ${error.value}`)
      }
      process.exit(1)
    }
  })
}

main()

Пример 2: Параллельные задачи с timeout


// Задача с симуляцией работы
const task = (id: number, duration: number) =>
  Effect.gen(function* () {
    yield* Effect.log(`Task ${id} started`)
    yield* Effect.sleep(Duration.millis(duration))
    yield* Effect.log(`Task ${id} completed`)
    return id * 10
  })

// Запуск параллельно с таймаутом
const parallelWithTimeout = Effect.gen(function* () {
  // Fork всех задач
  const fibers = yield* Effect.all([
    Effect.fork(task(1, 100)),
    Effect.fork(task(2, 200)),
    Effect.fork(task(3, 5000)) // Эта задача долгая
  ])
  
  // Ждём с таймаутом
  const timeout = Effect.sleep("300 millis").pipe(
    Effect.flatMap(() => Effect.log("Timeout reached!"))
  )
  
  yield* Effect.race([
    Effect.all(fibers.map(Fiber.join)),
    timeout
  ])
  
  // Прерываем все оставшиеся файберы
  yield* Effect.all(fibers.map(Fiber.interrupt))
  
  yield* Effect.log("All done")
})

Effect.runPromise(parallelWithTimeout)

Пример 3: Graceful HTTP Server


// Состояние сервера
interface ServerState {
  readonly isShuttingDown: boolean
  readonly activeRequests: number
}

const ServerState = Ref.make<ServerState>({
  isShuttingDown: false,
  activeRequests: 0
})

// HTTP обработчик с graceful shutdown
const handleRequest = (req: Request, state: Ref.Ref<ServerState>) =>
  Effect.gen(function* () {
    const current = yield* Ref.get(state)
    
    // Reject если shutting down
    if (current.isShuttingDown) {
      return new Response("Service Unavailable", { status: 503 })
    }
    
    // Increment active requests
    yield* Ref.update(state, (s) => ({ 
      ...s, 
      activeRequests: s.activeRequests + 1 
    }))
    
    try {
      // Обработка запроса
      yield* Effect.sleep("50 millis")
      return Response.json({ path: new URL(req.url).pathname })
    } finally {
      // Decrement active requests
      yield* Ref.update(state, (s) => ({
        ...s,
        activeRequests: s.activeRequests - 1
      }))
    }
  })

// Graceful shutdown
const gracefulShutdown = (state: Ref.Ref<ServerState>) =>
  Effect.gen(function* () {
    yield* Effect.log("Initiating graceful shutdown...")
    
    // Mark as shutting down
    yield* Ref.update(state, (s) => ({ ...s, isShuttingDown: true }))
    
    // Wait for active requests to complete
    yield* Effect.iterate(0, {
      while: () => true,
      body: () => 
        Effect.gen(function* () {
          const current = yield* Ref.get(state)
          if (current.activeRequests === 0) {
            yield* Effect.log("All requests completed")
            return -1 // Exit loop
          }
          yield* Effect.log(`Waiting for ${current.activeRequests} requests...`)
          yield* Effect.sleep("100 millis")
          return 0
        })
    }).pipe(Effect.timeout("10 seconds"))
    
    yield* Effect.log("Shutdown complete")
  })

// Main
const main = Effect.gen(function* () {
  const state = yield* ServerState
  
  const server = Bun.serve({
    port: 3000,
    async fetch(req) {
      const exit = await Effect.runPromiseExit(handleRequest(req, state))
      return Exit.match(exit, {
        onSuccess: (res) => res,
        onFailure: () => new Response("Error", { status: 500 })
      })
    }
  })
  
  yield* Effect.log(`Server running on port ${server.port}`)
  
  // Setup signal handlers
  const shutdownFiber = yield* Effect.fork(
    Effect.async<void>((resume) => {
      const handler = () => resume(Effect.void)
      process.on("SIGINT", handler)
      process.on("SIGTERM", handler)
    }).pipe(
      Effect.flatMap(() => gracefulShutdown(state)),
      Effect.flatMap(() => Effect.sync(() => {
        server.stop()
        process.exit(0)
      }))
    )
  )
  
  // Keep alive
  yield* Fiber.join(shutdownFiber)
})

Effect.runPromise(main)

Упражнения

Basic

Упражнение 3.1: Выбор раннера

Для каждого сценария выберите подходящий раннер:


// 1. Синхронное вычисление в тесте
const testCase = Effect.succeed(2 + 2)
// Раннер: ???

// 2. HTTP запрос в async handler
const httpCall = Effect.promise(() => fetch("/api"))
// Раннер: ???

// 3. Background task, который нужно отменить позже
const bgTask = Effect.forever(Effect.log("tick"))
// Раннер: ???

// 4. Получить Exit без исключений
const mayFail = Effect.fail("error")
// Раннер: ???

Решение


// 1. Effect.runSync — синхронный результат в тестах
const result1 = Effect.runSync(Effect.succeed(2 + 2))
// 4

// 2. Effect.runPromise — для async контекста
const result2 = await Effect.runPromise(Effect.promise(() => fetch("/api")))

// 3. Effect.runFork — получаем Fiber для управления
const fiber = Effect.runFork(Effect.forever(Effect.log("tick")))
// Позже: await Effect.runPromise(Fiber.interrupt(fiber))

// 4. Effect.runPromiseExit — Exit без исключений
const exit: Exit.Exit<never, string> = await Effect.runPromiseExit(Effect.fail("error"))

Упражнение 3.2: Обработка Exit

Напишите функцию, которая выполняет Effect и возвращает стандартизированный результат:


interface Result<A, E> {
  readonly success: boolean
  readonly data?: A
  readonly error?: E
  readonly isInterrupted: boolean
}

const executeEffect = <A, E>(
  effect: Effect.Effect<A, E>
): Promise<Result<A, E>> => /* ??? */

Решение


interface Result<A, E> {
  readonly success: boolean
  readonly data?: A
  readonly error?: E
  readonly isInterrupted: boolean
}

const executeEffect = async <A, E>(
  effect: Effect.Effect<A, E>
): Promise<Result<A, E>> => {
  const exit = await Effect.runPromiseExit(effect)
  
  return Exit.match(exit, {
    onSuccess: (data) => ({
      success: true,
      data,
      isInterrupted: false
    }),
    onFailure: (cause) => ({
      success: false,
      error: Option.getOrUndefined(Cause.failureOption(cause)),
      isInterrupted: Cause.isInterruptedOnly(cause)
    })
  })
}

// Тест
const test = async () => {
  const success = await executeEffect(Effect.succeed(42))
  console.log(success) // { success: true, data: 42, isInterrupted: false }
  
  const failure = await executeEffect(Effect.fail("oops"))
  console.log(failure) // { success: false, error: "oops", isInterrupted: false }
  
  const interrupted = await executeEffect(Effect.interrupt)
  console.log(interrupted) // { success: false, isInterrupted: true }
}

Intermediate

Упражнение 3.3: Custom Runtime

Создайте runtime с кастомным Logger сервисом:


// Logger сервис
class Logger extends Context.Tag("Logger")<Logger, {
  readonly info: (msg: string) => Effect.Effect<void>
  readonly error: (msg: string) => Effect.Effect<void>
}>() {}

// Реализуйте:
// 1. ConsoleLogger — выводит в console с timestamps
// 2. Создайте Runtime с этим logger
// 3. Запустите программу, которая использует Logger

const program: Effect.Effect<void, never, Logger> = /* ??? */

Решение


class Logger extends Context.Tag("Logger")<Logger, {
  readonly info: (msg: string) => Effect.Effect<void>
  readonly error: (msg: string) => Effect.Effect<void>
}>() {}

// Console Logger реализация
const ConsoleLogger = Layer.succeed(Logger, {
  info: (msg) => Effect.sync(() => {
    console.log(`[${new Date().toISOString()}] INFO: ${msg}`)
  }),
  error: (msg) => Effect.sync(() => {
    console.error(`[${new Date().toISOString()}] ERROR: ${msg}`)
  })
})

// Программа
const program: Effect.Effect<void, never, Logger> = Effect.gen(function* () {
  const logger = yield* Logger
  yield* logger.info("Application started")
  yield* logger.info("Processing data...")
  yield* logger.error("Something went wrong!")
  yield* logger.info("Recovered, continuing...")
})

// Создаём runtime
const createRuntime = async () => {
  const context = await Effect.runPromise(
    Layer.toRuntime(ConsoleLogger).pipe(Effect.scoped)
  )
  return Runtime.make({ context })
}

// Запуск
const main = async () => {
  const runtime = await createRuntime()
  await Runtime.runPromise(runtime)(program)
}

main()

Advanced

Упражнение 3.4: ManagedRuntime с ресурсами

Создайте ManagedRuntime для приложения с database connection pool:


// Database connection
interface Connection {
  readonly id: string
  readonly query: (sql: string) => Effect.Effect<unknown>
}

// Connection Pool сервис
class ConnectionPool extends Context.Tag("ConnectionPool")<ConnectionPool, {
  readonly acquire: Effect.Effect<Connection>
  readonly release: (conn: Connection) => Effect.Effect<void>
  readonly withConnection: <A, E>(
    use: (conn: Connection) => Effect.Effect<A, E>
  ) => Effect.Effect<A, E>
}>() {}

// Реализуйте:
// 1. ConnectionPoolLive Layer с пулом из 5 соединений
// 2. Логирование acquire/release
// 3. Graceful shutdown — ожидание освобождения всех соединений
// 4. Программу, которая делает 10 параллельных запросов

Решение


interface Connection {
  readonly id: string
  readonly query: (sql: string) => Effect.Effect<unknown>
}

class ConnectionPool extends Context.Tag("ConnectionPool")<ConnectionPool, {
  readonly acquire: Effect.Effect<Connection>
  readonly release: (conn: Connection) => Effect.Effect<void>
  readonly withConnection: <A, E>(
    use: (conn: Connection) => Effect.Effect<A, E>
  ) => Effect.Effect<A, E>
}>() {}

// Реализация пула
const ConnectionPoolLive = Layer.scoped(
  ConnectionPool,
  Effect.gen(function* () {
    const poolSize = 5
    const connections = yield* Queue.bounded<Connection>(poolSize)
    const activeConnections = yield* Ref.make(0)
    
    // Создаём соединения
    yield* Effect.forEach(
      Array.from({ length: poolSize }, (_, i) => i),
      (i) => {
        const conn: Connection = {
          id: `conn-${i}`,
          query: (sql) => Effect.gen(function* () {
            yield* Effect.sleep("50 millis") // Simulate query
            return { sql, rows: [] }
          })
        }
        return Queue.offer(connections, conn)
      },
      { discard: true }
    )
    
    yield* Effect.log(`Connection pool created with ${poolSize} connections`)
    
    // Финализатор — ждём освобождения
    yield* Effect.addFinalizer(() =>
      Effect.gen(function* () {
        yield* Effect.log("Shutting down connection pool...")
        
        // Ждём пока все соединения вернутся
        yield* Effect.iterate(0, {
          while: () => true,
          body: () =>
            Effect.gen(function* () {
              const active = yield* Ref.get(activeConnections)
              const available = yield* Queue.size(connections)
              
              if (available === poolSize) {
                yield* Effect.log("All connections returned")
                return -1 // Exit
              }
              
              yield* Effect.log(`Waiting: ${active} active, ${available} available`)
              yield* Effect.sleep("100 millis")
              return 0
            })
        }).pipe(Effect.timeout("5 seconds"))
        
        yield* Queue.shutdown(connections)
        yield* Effect.log("Connection pool shut down")
      })
    )
    
    return {
      acquire: Effect.gen(function* () {
        const conn = yield* Queue.take(connections)
        yield* Ref.update(activeConnections, (n) => n + 1)
        yield* Effect.log(`Acquired ${conn.id}`)
        return conn
      }),
      
      release: (conn) =>
        Effect.gen(function* () {
          yield* Queue.offer(connections, conn)
          yield* Ref.update(activeConnections, (n) => n - 1)
          yield* Effect.log(`Released ${conn.id}`)
        }),
      
      withConnection: <A, E>(use: (conn: Connection) => Effect.Effect<A, E>) =>
        Effect.acquireUseRelease(
          Effect.gen(function* () {
            const pool = yield* ConnectionPool
            return yield* pool.acquire
          }),
          use,
          (conn) =>
            Effect.gen(function* () {
              const pool = yield* ConnectionPool
              yield* pool.release(conn)
            })
        )
    }
  })
)

// Программа
const program = Effect.gen(function* () {
  const pool = yield* ConnectionPool
  
  // 10 параллельных запросов с пулом из 5
  yield* Effect.all(
    Array.from({ length: 10 }, (_, i) =>
      pool.withConnection((conn) =>
        Effect.gen(function* () {
          yield* Effect.log(`Request ${i} using ${conn.id}`)
          const result = yield* conn.query(`SELECT * FROM table_${i}`)
          yield* Effect.sleep("100 millis") // Simulate work
          return result
        })
      )
    ),
    { concurrency: "unbounded" }
  )
  
  yield* Effect.log("All requests completed")
})

// Runtime
const runtime = ManagedRuntime.make(ConnectionPoolLive)

const main = async () => {
  await runtime.runPromise(program)
  await runtime.dispose()
}

main()

Ключевые выводы

  1. runSync — для чисто синхронных эффектов; выбрасывает исключение при ошибке
  2. runPromise — основной раннер для async кода; отклоняется при ошибке
  3. runPromiseExit — никогда не отклоняется; возвращает Exit
  4. runFork — запуск в Fiber для управления выполнением
  5. Runtime — кастомизация среды выполнения с сервисами
  6. ManagedRuntime — для приложений с ресурсами и lifecycle
  7. Один раннер в entry point — best practice для Effect-приложений