SynchronizedRef
Эффективные обновления.
Теория
Ограничение обычного Ref
Ref.update принимает чистую функцию A => A:
// Ref.update требует чистую функцию
Ref.update(ref, (current) => current + 1) // ✅ OK
// Но что если нам нужно выполнить эффект внутри обновления?
Ref.update(ref, async (current) => {
const extra = await fetchFromDB() // ❌ Нельзя!
return current + extra
})
Это ограничение существует по важной причине: атомарность Ref.update основана на оптимистичном CAS (compare-and-swap), который может повторять функцию при конфликтах. Если функция имеет побочные эффекты, они выполнятся несколько раз.
Что такое SynchronizedRef?
SynchronizedRef<A> решает эту проблему через пессимистичную блокировку (семафор):
┌────────────────────────────────────────────────────────────┐
│ SynchronizedRef<A> │
├────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────────┐ ┌──────────────────────────┐ │
│ │ Semaphore │────▶│ Внутренний Ref │ │
│ │ (permits: 1) │ │ value: A │ │
│ └──────────────────┘ └──────────────────────────┘ │
│ │ │ │
│ │ │ │
│ ▼ ▼ │
│ ┌─────────────────────────────────────────────────┐ │
│ │ updateEffect(f: A => Effect<A>) │ │
│ │ │ │
│ │ 1. Acquire semaphore (блокировка) │ │
│ │ 2. Read current value │ │
│ │ 3. Execute effect: f(current) │ │
│ │ 4. Write new value │ │
│ │ 5. Release semaphore │ │
│ └─────────────────────────────────────────────────┘ │
│ │
│ ⚠️ Только ОДИН fiber может выполнять updateEffect │
│ в любой момент времени │
└────────────────────────────────────────────────────────────┘
Семантика блокировки
┌──────────────────────────────────────────────────────────────┐
│ Concurrent Updates to SynchronizedRef │
├──────────────────────────────────────────────────────────────┤
│ │
│ Fiber A Fiber B │
│ ─────── ─────── │
│ │ │ │
│ ├─ acquire semaphore ✓ │ │
│ │ ├─ acquire semaphore ⏳ │
│ ├─ read: 10 │ (waiting...) │
│ │ │ │
│ ├─ effect: fetchData() │ │
│ │ ... async work ... │ │
│ │ │ │
│ ├─ write: 15 │ │
│ ├─ release semaphore │ │
│ │ ├─ acquire semaphore ✓ │
│ │ ├─ read: 15 │
│ │ ├─ effect: compute() │
│ │ ├─ write: 20 │
│ │ ├─ release semaphore │
│ ▼ ▼ │
│ │
│ Гарантия: эффекты выполняются ПОСЛЕДОВАТЕЛЬНО │
│ значение КОНСИСТЕНТНО │
└──────────────────────────────────────────────────────────────┘
Когда использовать SynchronizedRef
| Сценарий | Ref | SynchronizedRef |
|---|---|---|
Чистые обновления A => A | ✅ Оптимально | ⚠️ Работает, но overhead |
Эффективные обновления A => Effect<A> | ❌ Невозможно | ✅ Единственный выбор |
| Высокая конкурентность, короткие операции | ✅ Лучше (CAS) | ⚠️ Возможен contention |
| Длинные асинхронные обновления | ❌ Невозможно | ✅ Необходимо |
| Read-heavy workload | ✅ Одинаково | ✅ Одинаково |
Проблема обычного Ref
Пример: Кэш с загрузкой данных
// ❌ Неправильный подход с Ref
const badCache = Effect.gen(function* () {
const cache = yield* Ref.make<Option.Option<string>>(Option.none())
const getData = Effect.gen(function* () {
const cached = yield* Ref.get(cache)
if (Option.isSome(cached)) {
return cached.value
}
// ПРОБЛЕМА: между get и set другой fiber может
// тоже обнаружить пустой кэш и начать загрузку!
const data = yield* fetchFromDatabase()
yield* Ref.set(cache, Option.some(data))
return data
})
// Два конкурентных вызова → две загрузки из БД!
yield* Effect.all([getData, getData], { concurrency: 2 })
})
// Симуляция загрузки из БД
const fetchFromDatabase = Effect.gen(function* () {
yield* Effect.log("Fetching from database...") // Увидим дважды!
yield* Effect.sleep("1 second")
return "expensive data"
})
Решение с SynchronizedRef
// ✅ Правильный подход с SynchronizedRef
const goodCache = Effect.gen(function* () {
const cache = yield* SynchronizedRef.make<Option.Option<string>>(
Option.none()
)
const getData = SynchronizedRef.updateEffect(cache, (cached) => {
if (Option.isSome(cached)) {
// Уже есть данные — ничего не делаем
return Effect.succeed(cached)
}
// Загружаем атомарно — другие файберы будут ждать
return Effect.gen(function* () {
yield* Effect.log("Fetching from database...")
yield* Effect.sleep("1 second")
return Option.some("expensive data")
})
})
// Два конкурентных вызова → ОДНА загрузка из БД
yield* Effect.all([getData, getData], { concurrency: 2 })
// Output: "Fetching from database..." (только один раз!)
})
API Reference
Создание SynchronizedRef
SynchronizedRef.make [STABLE]
Создаёт SynchronizedRef с начальным значением.
declare const make: <A>(value: A) => Effect.Effect<SynchronizedRef<A>>
const program = Effect.gen(function* () {
// SynchronizedRef с примитивным типом
const counter = yield* SynchronizedRef.make(0)
// SynchronizedRef со сложным типом
const state = yield* SynchronizedRef.make({
users: [] as ReadonlyArray<string>,
lastUpdated: Date.now()
})
})
Чтение и запись (унаследовано от Ref)
SynchronizedRef имеет все методы обычного Ref:
const basicOps = Effect.gen(function* () {
const ref = yield* SynchronizedRef.make(10)
// Чтение
const value = yield* SynchronizedRef.get(ref)
// Запись
yield* SynchronizedRef.set(ref, 20)
// Чистое обновление (как в Ref)
yield* SynchronizedRef.update(ref, (n) => n + 1)
// Чистая модификация
const old = yield* SynchronizedRef.getAndUpdate(ref, (n) => n * 2)
})
Эффективное обновление
SynchronizedRef.updateEffect [STABLE]
Главная операция — атомарное обновление с эффектом.
declare const updateEffect: <A, E, R>(
f: (a: A) => Effect.Effect<A, E, R>
) => (self: SynchronizedRef<A>) => Effect.Effect<void, E, R>
interface CacheEntry {
readonly data: string
readonly fetchedAt: number
}
const program = Effect.gen(function* () {
const cache = yield* SynchronizedRef.make<CacheEntry | null>(null)
// Обновление с эффектом — атомарно!
yield* SynchronizedRef.updateEffect(cache, (current) => {
// Проверяем актуальность кэша
if (current && Date.now() - current.fetchedAt < 60_000) {
return Effect.succeed(current) // Кэш свежий
}
// Загружаем новые данные
return Effect.gen(function* () {
const data = yield* fetchData()
return { data, fetchedAt: Date.now() }
})
})
})
const fetchData = Effect.gen(function* () {
yield* Effect.sleep("100 millis")
return "fresh data"
})
SynchronizedRef.modifyEffect [STABLE]
Атомарная модификация с эффектом и возвратом результата.
declare const modifyEffect: <A, B, E, R>(
f: (a: A) => Effect.Effect<readonly [B, A], E, R>
) => (self: SynchronizedRef<A>) => Effect.Effect<B, E, R>
interface AccountState {
readonly balance: number
readonly transactions: ReadonlyArray<string>
}
const program = Effect.gen(function* () {
const account = yield* SynchronizedRef.make<AccountState>({
balance: 1000,
transactions: []
})
// Снятие денег с проверкой и логированием
const withdraw = (amount: number) =>
SynchronizedRef.modifyEffect(account, (state) => {
if (state.balance < amount) {
// Возвращаем ошибку
return Effect.fail(new Error("Insufficient funds"))
}
// Логируем транзакцию (побочный эффект!)
return Effect.gen(function* () {
yield* Effect.log(`Withdrawing ${amount}`)
const newState: AccountState = {
balance: state.balance - amount,
transactions: [...state.transactions, `Withdraw: ${amount}`]
}
return [true, newState] as const // [результат, новое состояние]
})
})
const success = yield* withdraw(300)
console.log(`Success: ${success}`) // true
const finalState = yield* SynchronizedRef.get(account)
console.log(`Balance: ${finalState.balance}`) // 700
})
SynchronizedRef.getAndUpdateEffect [STABLE]
Возвращает текущее значение и обновляет с эффектом.
declare const getAndUpdateEffect: <A, E, R>(
f: (a: A) => Effect.Effect<A, E, R>
) => (self: SynchronizedRef<A>) => Effect.Effect<A, E, R>
const program = Effect.gen(function* () {
const ref = yield* SynchronizedRef.make(10)
// Получаем старое значение, применяем асинхронное обновление
const oldValue = yield* SynchronizedRef.getAndUpdateEffect(
ref,
(current) => Effect.gen(function* () {
yield* Effect.sleep("100 millis")
return current * 2
})
)
console.log(oldValue) // 10
console.log(yield* SynchronizedRef.get(ref)) // 20
})
SynchronizedRef.updateAndGetEffect [STABLE]
Обновляет с эффектом и возвращает новое значение.
declare const updateAndGetEffect: <A, E, R>(
f: (a: A) => Effect.Effect<A, E, R>
) => (self: SynchronizedRef<A>) => Effect.Effect<A, E, R>
const program = Effect.gen(function* () {
const ref = yield* SynchronizedRef.make(10)
const newValue = yield* SynchronizedRef.updateAndGetEffect(
ref,
(current) => Effect.gen(function* () {
const multiplier = yield* getMultiplier()
return current * multiplier
})
)
console.log(newValue) // Новое значение после обновления
})
const getMultiplier = Effect.succeed(3)
Примеры
Ленивый кэш с TTL
interface CacheEntry<A> {
readonly value: A
readonly expiresAt: number
}
interface LazyCache<A> {
readonly get: Effect.Effect<A>
readonly invalidate: Effect.Effect<void>
}
const makeCache = <A, E, R>(
load: Effect.Effect<A, E, R>,
ttl: Duration.DurationInput
): Effect.Effect<LazyCache<A>, never, R> =>
Effect.gen(function* () {
const ref = yield* SynchronizedRef.make<Option.Option<CacheEntry<A>>>(
Option.none()
)
const ttlMs = Duration.toMillis(Duration.decode(ttl))
return {
get: SynchronizedRef.modifyEffect(ref, (cached) => {
const now = Date.now()
// Проверяем наличие и актуальность
if (Option.isSome(cached) && cached.value.expiresAt > now) {
return Effect.succeed([cached.value.value, cached] as const)
}
// Загружаем новые данные
return Effect.gen(function* () {
const value = yield* load
const entry: CacheEntry<A> = {
value,
expiresAt: now + ttlMs
}
return [value, Option.some(entry)] as const
})
}),
invalidate: SynchronizedRef.set(ref, Option.none())
}
})
// Использование
const program = Effect.gen(function* () {
let fetchCount = 0
const expensiveFetch = Effect.gen(function* () {
fetchCount++
yield* Effect.log(`Fetching... (call #${fetchCount})`)
yield* Effect.sleep("500 millis")
return { data: "important", fetchedAt: Date.now() }
})
const cache = yield* makeCache(expensiveFetch, "5 seconds")
// Первый вызов — загрузка
const data1 = yield* cache.get
yield* Effect.log(`Got: ${JSON.stringify(data1)}`)
// Второй вызов — из кэша
const data2 = yield* cache.get
yield* Effect.log(`Got again: ${JSON.stringify(data2)}`)
// Инвалидация
yield* cache.invalidate
// После инвалидации — снова загрузка
const data3 = yield* cache.get
yield* Effect.log(`After invalidate: ${JSON.stringify(data3)}`)
console.log(`Total fetches: ${fetchCount}`) // 2
})
Effect.runPromise(program)
Rate Limiter с SynchronizedRef
interface RateLimiter {
readonly acquire: Effect.Effect<void>
}
const makeRateLimiter = (
maxRequests: number,
windowMs: number
): Effect.Effect<RateLimiter> =>
Effect.gen(function* () {
interface State {
readonly tokens: number
readonly lastRefill: number
}
const ref = yield* SynchronizedRef.make<State>({
tokens: maxRequests,
lastRefill: Date.now()
})
return {
acquire: SynchronizedRef.updateEffect(ref, (state) =>
Effect.gen(function* () {
const now = Date.now()
const elapsed = now - state.lastRefill
// Пополняем токены если прошло время
let newTokens = state.tokens
let newLastRefill = state.lastRefill
if (elapsed >= windowMs) {
const periods = Math.floor(elapsed / windowMs)
newTokens = Math.min(
maxRequests,
state.tokens + periods * maxRequests
)
newLastRefill = state.lastRefill + periods * windowMs
}
// Проверяем доступность токена
if (newTokens > 0) {
return {
tokens: newTokens - 1,
lastRefill: newLastRefill
}
}
// Ждём до следующего окна
const waitTime = windowMs - (now - newLastRefill)
yield* Effect.sleep(waitTime)
// После ожидания — рекурсивно пробуем снова
// (через updateEffect — атомарно!)
return {
tokens: maxRequests - 1,
lastRefill: Date.now()
}
})
)
}
})
// Использование
const program = Effect.gen(function* () {
// Максимум 3 запроса в секунду
const limiter = yield* makeRateLimiter(3, 1000)
// Симулируем 10 запросов
yield* Effect.forEach(
Array.from({ length: 10 }, (_, i) => i),
(i) =>
Effect.gen(function* () {
yield* limiter.acquire
yield* Effect.log(`Request ${i + 1} processed`)
}),
{ concurrency: "unbounded" }
)
})
Effect.runPromise(program)
Connection Pool State
interface Connection {
readonly id: string
readonly createdAt: number
}
interface PoolState {
readonly available: ReadonlyArray<Connection>
readonly inUse: ReadonlyArray<Connection>
readonly maxSize: number
}
interface ConnectionPool {
readonly acquire: Effect.Effect<Connection>
readonly release: (conn: Connection) => Effect.Effect<void>
readonly stats: Effect.Effect<{ available: number; inUse: number }>
}
const makeConnectionPool = (
maxSize: number,
createConnection: Effect.Effect<Connection>
): Effect.Effect<ConnectionPool> =>
Effect.gen(function* () {
const state = yield* SynchronizedRef.make<PoolState>({
available: [],
inUse: [],
maxSize
})
return {
acquire: SynchronizedRef.modifyEffect(state, (s) => {
// Есть доступные соединения
if (s.available.length > 0) {
const [conn, ...rest] = s.available
return Effect.succeed([
conn!,
{ ...s, available: rest, inUse: [...s.inUse, conn!] }
] as const)
}
// Можем создать новое
if (s.available.length + s.inUse.length < s.maxSize) {
return Effect.gen(function* () {
const conn = yield* createConnection
return [
conn,
{ ...s, inUse: [...s.inUse, conn] }
] as const
})
}
// Пул полон — ждём (в реальности тут была бы очередь)
return Effect.fail(new Error("Pool exhausted"))
}),
release: (conn: Connection) =>
SynchronizedRef.update(state, (s) => ({
...s,
available: [...s.available, conn],
inUse: s.inUse.filter((c) => c.id !== conn.id)
})),
stats: Effect.map(SynchronizedRef.get(state), (s) => ({
available: s.available.length,
inUse: s.inUse.length
}))
}
})
// Использование
const program = Effect.gen(function* () {
let connId = 0
const createConnection = Effect.gen(function* () {
connId++
yield* Effect.log(`Creating connection ${connId}`)
yield* Effect.sleep("100 millis")
return { id: `conn-${connId}`, createdAt: Date.now() }
})
const pool = yield* makeConnectionPool(3, createConnection)
// Симулируем конкурентное использование
yield* Effect.forEach(
Array.range(1, 5),
(i) =>
Effect.gen(function* () {
const conn = yield* pool.acquire
yield* Effect.log(`Task ${i} acquired ${conn.id}`)
yield* Effect.sleep("200 millis")
yield* pool.release(conn)
yield* Effect.log(`Task ${i} released ${conn.id}`)
}),
{ concurrency: 3 }
)
const stats = yield* pool.stats
console.log("Final stats:", stats)
})
Effect.runPromise(program)
Паттерн “Memoization with Effects”
interface Memoized<K, V, E, R> {
readonly get: (key: K) => Effect.Effect<V, E, R>
readonly invalidate: (key: K) => Effect.Effect<void>
readonly clear: Effect.Effect<void>
}
const memoize = <K, V, E, R>(
compute: (key: K) => Effect.Effect<V, E, R>
): Effect.Effect<Memoized<K, V, E, R>> =>
Effect.gen(function* () {
const cache = yield* SynchronizedRef.make<HashMap.HashMap<K, V>>(
HashMap.empty()
)
return {
get: (key: K) =>
SynchronizedRef.modifyEffect(cache, (map) => {
const cached = HashMap.get(map, key)
if (Option.isSome(cached)) {
return Effect.succeed([cached.value, map] as const)
}
return Effect.gen(function* () {
const value = yield* compute(key)
const newMap = HashMap.set(map, key, value)
return [value, newMap] as const
})
}),
invalidate: (key: K) =>
SynchronizedRef.update(cache, (map) =>
HashMap.remove(map, key)
),
clear: SynchronizedRef.set(cache, HashMap.empty())
}
})
// Использование: мемоизация Fibonacci
const program = Effect.gen(function* () {
const fibMemo = yield* memoize((n: number): Effect.Effect<bigint> =>
Effect.gen(function* () {
yield* Effect.log(`Computing fib(${n})`)
if (n <= 1) return BigInt(n)
// Здесь в реальности рекурсивный вызов через memo
// Для примера — простая формула
yield* Effect.sleep("10 millis")
return BigInt(n) // упрощённо
})
)
// Первый вызов — вычисление
yield* fibMemo.get(10)
// Второй вызов — из кэша
yield* fibMemo.get(10)
// Третий вызов для другого ключа — вычисление
yield* fibMemo.get(20)
})
Effect.runPromise(program)
Сравнение с Ref
Таблица сравнения
| Характеристика | Ref<A> | SynchronizedRef<A> |
|---|---|---|
| Обновление | A => A (чистое) | A => Effect<A> (эффективное) |
| Механизм | Optimistic CAS | Pessimistic lock (семафор) |
| Retry при конфликте | Да (автоматически) | Нет (блокировка) |
| Побочные эффекты | ❌ Повторятся | ✅ Один раз |
| Производительность | Выше (lock-free) | Ниже (contention) |
| API | get, set, update, modify | + updateEffect, modifyEffect |
Когда что использовать
// ✅ Ref: чистые обновления
const counterRef = Ref.make(0)
// Ref.update(ref, n => n + 1) — оптимально
// ✅ SynchronizedRef: обновления с эффектами
const cacheRef = SynchronizedRef.make<string | null>(null)
// SynchronizedRef.updateEffect(ref, current =>
// current ? Effect.succeed(current) : fetchFromAPI()
// )
// ⚠️ Анти-паттерн: SynchronizedRef для чистых обновлений
// Работает, но неэффективно из-за overhead семафора
const badUsage = Effect.gen(function* () {
const ref = yield* SynchronizedRef.make(0)
// Лучше использовать обычный Ref!
yield* SynchronizedRef.update(ref, (n) => n + 1)
})
Миграция с Ref на SynchronizedRef
// Было: Ref с race condition
const oldCode = Effect.gen(function* () {
const cache = yield* Ref.make<Option.Option<string>>(Option.none())
const getData = Effect.gen(function* () {
const cached = yield* Ref.get(cache)
if (Option.isSome(cached)) return cached.value
// ⚠️ Race condition здесь!
const data = yield* fetchData()
yield* Ref.set(cache, Option.some(data))
return data
})
})
// Стало: SynchronizedRef без race condition
const newCode = Effect.gen(function* () {
const cache = yield* SynchronizedRef.make<Option.Option<string>>(
Option.none()
)
const getData = SynchronizedRef.modifyEffect(cache, (cached) => {
if (Option.isSome(cached)) {
return Effect.succeed([cached.value, cached] as const)
}
// ✅ Атомарно!
return Effect.gen(function* () {
const data = yield* fetchData()
return [data, Option.some(data)] as const
})
})
})
const fetchData = Effect.succeed("data")
Упражнения
Упражнение 1: Счётчик с логированием
Создайте счётчик, который логирует каждое изменение (логирование — побочный эффект).
import { Effect, SynchronizedRef } from "effect"
interface LoggingCounter {
readonly increment: Effect.Effect<number> // возвращает новое значение
readonly decrement: Effect.Effect<number>
readonly get: Effect.Effect<number>
}
const makeLoggingCounter = (): Effect.Effect<LoggingCounter> =>
// Ваш код здесь
Effect.gen(function* () {
// ???
})import { Effect, SynchronizedRef } from "effect"
interface LoggingCounter {
readonly increment: Effect.Effect<number>
readonly decrement: Effect.Effect<number>
readonly get: Effect.Effect<number>
}
const makeLoggingCounter = (): Effect.Effect<LoggingCounter> =>
Effect.gen(function* () {
const ref = yield* SynchronizedRef.make(0)
const change = (delta: number, op: string) =>
SynchronizedRef.modifyEffect(ref, (current) =>
Effect.gen(function* () {
const newValue = current + delta
yield* Effect.log(`${op}: ${current} -> ${newValue}`)
return [newValue, newValue] as const
})
)
return {
increment: change(1, "INCREMENT"),
decrement: change(-1, "DECREMENT"),
get: SynchronizedRef.get(ref)
}
})Упражнение 2: Debounced Writer
Создайте writer, который записывает данные в “файл” (симуляция) только после того, как прошло 500мс без новых записей.
import { Effect, SynchronizedRef, Fiber } from "effect"
interface DebouncedWriter {
readonly write: (data: string) => Effect.Effect<void>
readonly flush: Effect.Effect<void>
}
const makeDebouncedWriter = (
sink: (data: string) => Effect.Effect<void>
): Effect.Effect<DebouncedWriter> =>
// Ваш код здесь
Effect.gen(function* () {
// Подсказка: храните pending data и scheduled fiber
// ???
})import { Effect, SynchronizedRef, Fiber, Option } from "effect"
interface DebouncedWriter {
readonly write: (data: string) => Effect.Effect<void>
readonly flush: Effect.Effect<void>
}
interface WriterState {
readonly pending: Option.Option<string>
readonly scheduledFlush: Option.Option<Fiber.RuntimeFiber<void, never>>
}
const makeDebouncedWriter = (
sink: (data: string) => Effect.Effect<void>
): Effect.Effect<DebouncedWriter> =>
Effect.gen(function* () {
const state = yield* SynchronizedRef.make<WriterState>({
pending: Option.none(),
scheduledFlush: Option.none()
})
const doFlush = SynchronizedRef.modifyEffect(state, (s) =>
Effect.gen(function* () {
if (Option.isSome(s.pending)) {
yield* sink(s.pending.value)
}
return [
undefined as void,
{ pending: Option.none(), scheduledFlush: Option.none() }
] as const
})
)
return {
write: (data: string) =>
SynchronizedRef.updateEffect(state, (s) =>
Effect.gen(function* () {
// Отменяем предыдущий scheduled flush
if (Option.isSome(s.scheduledFlush)) {
yield* Fiber.interrupt(s.scheduledFlush.value)
}
// Планируем новый flush через 500мс
const flushFiber = yield* Effect.fork(
Effect.delay(doFlush, "500 millis")
)
return {
pending: Option.some(data),
scheduledFlush: Option.some(flushFiber)
}
})
),
flush: doFlush
}
})Упражнение 3: Read-Write Lock
Реализуйте read-write lock: множество читателей ИЛИ один писатель.
import { Effect, SynchronizedRef } from "effect"
interface ReadWriteLock {
readonly readLock: <A, E, R>(
effect: Effect.Effect<A, E, R>
) => Effect.Effect<A, E, R>
readonly writeLock: <A, E, R>(
effect: Effect.Effect<A, E, R>
) => Effect.Effect<A, E, R>
}
const makeReadWriteLock = (): Effect.Effect<ReadWriteLock> =>
// Ваш код здесь
Effect.gen(function* () {
// Подсказка: state = { readers: number, writer: boolean }
// ???
})import { Effect, SynchronizedRef, Deferred } from "effect"
interface ReadWriteLock {
readonly readLock: <A, E, R>(
effect: Effect.Effect<A, E, R>
) => Effect.Effect<A, E, R>
readonly writeLock: <A, E, R>(
effect: Effect.Effect<A, E, R>
) => Effect.Effect<A, E, R>
}
interface LockState {
readonly readers: number
readonly writer: boolean
readonly waitingWriters: number
}
const makeReadWriteLock = (): Effect.Effect<ReadWriteLock> =>
Effect.gen(function* () {
const state = yield* SynchronizedRef.make<LockState>({
readers: 0,
writer: false,
waitingWriters: 0
})
const acquireRead: Effect.Effect<void> =
SynchronizedRef.modifyEffect(state, (s) => {
// Можем читать если нет активного или ожидающего писателя
if (!s.writer && s.waitingWriters === 0) {
return Effect.succeed([
undefined as void,
{ ...s, readers: s.readers + 1 }
] as const)
}
// Ждём и пробуем снова
return Effect.gen(function* () {
yield* Effect.sleep("10 millis")
yield* acquireRead
return [undefined as void, s] as const
})
})
const releaseRead: Effect.Effect<void> =
SynchronizedRef.update(state, (s) => ({
...s,
readers: s.readers - 1
}))
const acquireWrite: Effect.Effect<void> =
SynchronizedRef.modifyEffect(state, (s) => {
// Можем писать если нет читателей и писателей
if (s.readers === 0 && !s.writer) {
return Effect.succeed([
undefined as void,
{ ...s, writer: true, waitingWriters: s.waitingWriters - 1 }
] as const)
}
// Регистрируемся как ожидающий писатель и ждём
return Effect.gen(function* () {
if (s.waitingWriters === 0) {
yield* SynchronizedRef.update(state, (s2) => ({
...s2,
waitingWriters: s2.waitingWriters + 1
}))
}
yield* Effect.sleep("10 millis")
yield* acquireWrite
return [undefined as void, s] as const
})
})
const releaseWrite: Effect.Effect<void> =
SynchronizedRef.update(state, (s) => ({
...s,
writer: false
}))
return {
readLock: <A, E, R>(effect: Effect.Effect<A, E, R>) =>
Effect.acquireUseRelease(
acquireRead,
() => effect,
() => releaseRead
),
writeLock: <A, E, R>(effect: Effect.Effect<A, E, R>) =>
Effect.acquireUseRelease(
acquireWrite,
() => effect,
() => releaseWrite
)
}
})Резюме
SynchronizedRef<A> — расширение Ref для случаев, когда обновление само требует выполнения эффектов:
| Аспект | Описание |
|---|---|
| Назначение | Атомарные эффективные обновления |
| Механизм | Пессимистичная блокировка (семафор) |
| Главная операция | updateEffect, modifyEffect |
| Use case | Кэши, ленивая инициализация, rate limiters |
| Ограничение | Один updater в момент времени |
Ключевые отличия от Ref
- ✅ Поддержка
A => Effect<A>обновлений - ✅ Гарантия: эффект выполнится только один раз
- ⚠️ Более высокий contention при высокой конкурентности
- ⚠️ Возможны deadlocks при неправильном использовании