Effect Курс SynchronizedRef

SynchronizedRef

Эффективные обновления.

Теория

Ограничение обычного Ref

Ref.update принимает чистую функцию A => A:

// Ref.update требует чистую функцию
Ref.update(ref, (current) => current + 1) // ✅ OK

// Но что если нам нужно выполнить эффект внутри обновления?
Ref.update(ref, async (current) => {
  const extra = await fetchFromDB()  // ❌ Нельзя!
  return current + extra
})

Это ограничение существует по важной причине: атомарность Ref.update основана на оптимистичном CAS (compare-and-swap), который может повторять функцию при конфликтах. Если функция имеет побочные эффекты, они выполнятся несколько раз.

Что такое SynchronizedRef?

SynchronizedRef<A> решает эту проблему через пессимистичную блокировку (семафор):

┌────────────────────────────────────────────────────────────┐
│                    SynchronizedRef<A>                       │
├────────────────────────────────────────────────────────────┤
│                                                            │
│  ┌──────────────────┐     ┌──────────────────────────┐    │
│  │    Semaphore     │────▶│      Внутренний Ref      │    │
│  │   (permits: 1)   │     │        value: A          │    │
│  └──────────────────┘     └──────────────────────────┘    │
│          │                            │                    │
│          │                            │                    │
│          ▼                            ▼                    │
│   ┌─────────────────────────────────────────────────┐     │
│   │              updateEffect(f: A => Effect<A>)     │     │
│   │                                                  │     │
│   │  1. Acquire semaphore (блокировка)              │     │
│   │  2. Read current value                           │     │
│   │  3. Execute effect: f(current)                   │     │
│   │  4. Write new value                              │     │
│   │  5. Release semaphore                            │     │
│   └─────────────────────────────────────────────────┘     │
│                                                            │
│  ⚠️ Только ОДИН fiber может выполнять updateEffect        │
│     в любой момент времени                                 │
└────────────────────────────────────────────────────────────┘

Семантика блокировки

┌──────────────────────────────────────────────────────────────┐
│           Concurrent Updates to SynchronizedRef              │
├──────────────────────────────────────────────────────────────┤
│                                                              │
│   Fiber A                    Fiber B                         │
│   ───────                    ───────                         │
│      │                          │                            │
│      ├─ acquire semaphore ✓     │                            │
│      │                          ├─ acquire semaphore ⏳      │
│      ├─ read: 10                │      (waiting...)          │
│      │                          │                            │
│      ├─ effect: fetchData()     │                            │
│      │   ... async work ...     │                            │
│      │                          │                            │
│      ├─ write: 15               │                            │
│      ├─ release semaphore       │                            │
│      │                          ├─ acquire semaphore ✓       │
│      │                          ├─ read: 15                  │
│      │                          ├─ effect: compute()         │
│      │                          ├─ write: 20                 │
│      │                          ├─ release semaphore         │
│      ▼                          ▼                            │
│                                                              │
│  Гарантия: эффекты выполняются ПОСЛЕДОВАТЕЛЬНО              │
│            значение КОНСИСТЕНТНО                             │
└──────────────────────────────────────────────────────────────┘

Когда использовать SynchronizedRef

СценарийRefSynchronizedRef
Чистые обновления A => A✅ Оптимально⚠️ Работает, но overhead
Эффективные обновления A => Effect<A>❌ Невозможно✅ Единственный выбор
Высокая конкурентность, короткие операции✅ Лучше (CAS)⚠️ Возможен contention
Длинные асинхронные обновления❌ Невозможно✅ Необходимо
Read-heavy workload✅ Одинаково✅ Одинаково

Проблема обычного Ref

Пример: Кэш с загрузкой данных


// ❌ Неправильный подход с Ref
const badCache = Effect.gen(function* () {
  const cache = yield* Ref.make<Option.Option<string>>(Option.none())
  
  const getData = Effect.gen(function* () {
    const cached = yield* Ref.get(cache)
    
    if (Option.isSome(cached)) {
      return cached.value
    }
    
    // ПРОБЛЕМА: между get и set другой fiber может
    // тоже обнаружить пустой кэш и начать загрузку!
    const data = yield* fetchFromDatabase()
    yield* Ref.set(cache, Option.some(data))
    
    return data
  })
  
  // Два конкурентных вызова → две загрузки из БД!
  yield* Effect.all([getData, getData], { concurrency: 2 })
})

// Симуляция загрузки из БД
const fetchFromDatabase = Effect.gen(function* () {
  yield* Effect.log("Fetching from database...") // Увидим дважды!
  yield* Effect.sleep("1 second")
  return "expensive data"
})

Решение с SynchronizedRef


// ✅ Правильный подход с SynchronizedRef
const goodCache = Effect.gen(function* () {
  const cache = yield* SynchronizedRef.make<Option.Option<string>>(
    Option.none()
  )
  
  const getData = SynchronizedRef.updateEffect(cache, (cached) => {
    if (Option.isSome(cached)) {
      // Уже есть данные — ничего не делаем
      return Effect.succeed(cached)
    }
    
    // Загружаем атомарно — другие файберы будут ждать
    return Effect.gen(function* () {
      yield* Effect.log("Fetching from database...")
      yield* Effect.sleep("1 second")
      return Option.some("expensive data")
    })
  })
  
  // Два конкурентных вызова → ОДНА загрузка из БД
  yield* Effect.all([getData, getData], { concurrency: 2 })
  // Output: "Fetching from database..." (только один раз!)
})

API Reference

Создание SynchronizedRef

SynchronizedRef.make [STABLE]

Создаёт SynchronizedRef с начальным значением.

declare const make: <A>(value: A) => Effect.Effect<SynchronizedRef<A>>

const program = Effect.gen(function* () {
  // SynchronizedRef с примитивным типом
  const counter = yield* SynchronizedRef.make(0)
  
  // SynchronizedRef со сложным типом
  const state = yield* SynchronizedRef.make({
    users: [] as ReadonlyArray<string>,
    lastUpdated: Date.now()
  })
})

Чтение и запись (унаследовано от Ref)

SynchronizedRef имеет все методы обычного Ref:


const basicOps = Effect.gen(function* () {
  const ref = yield* SynchronizedRef.make(10)
  
  // Чтение
  const value = yield* SynchronizedRef.get(ref)
  
  // Запись
  yield* SynchronizedRef.set(ref, 20)
  
  // Чистое обновление (как в Ref)
  yield* SynchronizedRef.update(ref, (n) => n + 1)
  
  // Чистая модификация
  const old = yield* SynchronizedRef.getAndUpdate(ref, (n) => n * 2)
})

Эффективное обновление

SynchronizedRef.updateEffect [STABLE]

Главная операция — атомарное обновление с эффектом.

declare const updateEffect: <A, E, R>(
  f: (a: A) => Effect.Effect<A, E, R>
) => (self: SynchronizedRef<A>) => Effect.Effect<void, E, R>

interface CacheEntry {
  readonly data: string
  readonly fetchedAt: number
}

const program = Effect.gen(function* () {
  const cache = yield* SynchronizedRef.make<CacheEntry | null>(null)
  
  // Обновление с эффектом — атомарно!
  yield* SynchronizedRef.updateEffect(cache, (current) => {
    // Проверяем актуальность кэша
    if (current && Date.now() - current.fetchedAt < 60_000) {
      return Effect.succeed(current) // Кэш свежий
    }
    
    // Загружаем новые данные
    return Effect.gen(function* () {
      const data = yield* fetchData()
      return { data, fetchedAt: Date.now() }
    })
  })
})

const fetchData = Effect.gen(function* () {
  yield* Effect.sleep("100 millis")
  return "fresh data"
})

SynchronizedRef.modifyEffect [STABLE]

Атомарная модификация с эффектом и возвратом результата.

declare const modifyEffect: <A, B, E, R>(
  f: (a: A) => Effect.Effect<readonly [B, A], E, R>
) => (self: SynchronizedRef<A>) => Effect.Effect<B, E, R>

interface AccountState {
  readonly balance: number
  readonly transactions: ReadonlyArray<string>
}

const program = Effect.gen(function* () {
  const account = yield* SynchronizedRef.make<AccountState>({
    balance: 1000,
    transactions: []
  })
  
  // Снятие денег с проверкой и логированием
  const withdraw = (amount: number) =>
    SynchronizedRef.modifyEffect(account, (state) => {
      if (state.balance < amount) {
        // Возвращаем ошибку
        return Effect.fail(new Error("Insufficient funds"))
      }
      
      // Логируем транзакцию (побочный эффект!)
      return Effect.gen(function* () {
        yield* Effect.log(`Withdrawing ${amount}`)
        
        const newState: AccountState = {
          balance: state.balance - amount,
          transactions: [...state.transactions, `Withdraw: ${amount}`]
        }
        
        return [true, newState] as const // [результат, новое состояние]
      })
    })
  
  const success = yield* withdraw(300)
  console.log(`Success: ${success}`) // true
  
  const finalState = yield* SynchronizedRef.get(account)
  console.log(`Balance: ${finalState.balance}`) // 700
})

SynchronizedRef.getAndUpdateEffect [STABLE]

Возвращает текущее значение и обновляет с эффектом.

declare const getAndUpdateEffect: <A, E, R>(
  f: (a: A) => Effect.Effect<A, E, R>
) => (self: SynchronizedRef<A>) => Effect.Effect<A, E, R>

const program = Effect.gen(function* () {
  const ref = yield* SynchronizedRef.make(10)
  
  // Получаем старое значение, применяем асинхронное обновление
  const oldValue = yield* SynchronizedRef.getAndUpdateEffect(
    ref,
    (current) => Effect.gen(function* () {
      yield* Effect.sleep("100 millis")
      return current * 2
    })
  )
  
  console.log(oldValue) // 10
  console.log(yield* SynchronizedRef.get(ref)) // 20
})

SynchronizedRef.updateAndGetEffect [STABLE]

Обновляет с эффектом и возвращает новое значение.

declare const updateAndGetEffect: <A, E, R>(
  f: (a: A) => Effect.Effect<A, E, R>
) => (self: SynchronizedRef<A>) => Effect.Effect<A, E, R>

const program = Effect.gen(function* () {
  const ref = yield* SynchronizedRef.make(10)
  
  const newValue = yield* SynchronizedRef.updateAndGetEffect(
    ref,
    (current) => Effect.gen(function* () {
      const multiplier = yield* getMultiplier()
      return current * multiplier
    })
  )
  
  console.log(newValue) // Новое значение после обновления
})

const getMultiplier = Effect.succeed(3)

Примеры

Ленивый кэш с TTL


interface CacheEntry<A> {
  readonly value: A
  readonly expiresAt: number
}

interface LazyCache<A> {
  readonly get: Effect.Effect<A>
  readonly invalidate: Effect.Effect<void>
}

const makeCache = <A, E, R>(
  load: Effect.Effect<A, E, R>,
  ttl: Duration.DurationInput
): Effect.Effect<LazyCache<A>, never, R> =>
  Effect.gen(function* () {
    const ref = yield* SynchronizedRef.make<Option.Option<CacheEntry<A>>>(
      Option.none()
    )
    
    const ttlMs = Duration.toMillis(Duration.decode(ttl))
    
    return {
      get: SynchronizedRef.modifyEffect(ref, (cached) => {
        const now = Date.now()
        
        // Проверяем наличие и актуальность
        if (Option.isSome(cached) && cached.value.expiresAt > now) {
          return Effect.succeed([cached.value.value, cached] as const)
        }
        
        // Загружаем новые данные
        return Effect.gen(function* () {
          const value = yield* load
          const entry: CacheEntry<A> = {
            value,
            expiresAt: now + ttlMs
          }
          return [value, Option.some(entry)] as const
        })
      }),
      
      invalidate: SynchronizedRef.set(ref, Option.none())
    }
  })

// Использование
const program = Effect.gen(function* () {
  let fetchCount = 0
  
  const expensiveFetch = Effect.gen(function* () {
    fetchCount++
    yield* Effect.log(`Fetching... (call #${fetchCount})`)
    yield* Effect.sleep("500 millis")
    return { data: "important", fetchedAt: Date.now() }
  })
  
  const cache = yield* makeCache(expensiveFetch, "5 seconds")
  
  // Первый вызов — загрузка
  const data1 = yield* cache.get
  yield* Effect.log(`Got: ${JSON.stringify(data1)}`)
  
  // Второй вызов — из кэша
  const data2 = yield* cache.get
  yield* Effect.log(`Got again: ${JSON.stringify(data2)}`)
  
  // Инвалидация
  yield* cache.invalidate
  
  // После инвалидации — снова загрузка
  const data3 = yield* cache.get
  yield* Effect.log(`After invalidate: ${JSON.stringify(data3)}`)
  
  console.log(`Total fetches: ${fetchCount}`) // 2
})

Effect.runPromise(program)

Rate Limiter с SynchronizedRef


interface RateLimiter {
  readonly acquire: Effect.Effect<void>
}

const makeRateLimiter = (
  maxRequests: number,
  windowMs: number
): Effect.Effect<RateLimiter> =>
  Effect.gen(function* () {
    interface State {
      readonly tokens: number
      readonly lastRefill: number
    }
    
    const ref = yield* SynchronizedRef.make<State>({
      tokens: maxRequests,
      lastRefill: Date.now()
    })
    
    return {
      acquire: SynchronizedRef.updateEffect(ref, (state) =>
        Effect.gen(function* () {
          const now = Date.now()
          const elapsed = now - state.lastRefill
          
          // Пополняем токены если прошло время
          let newTokens = state.tokens
          let newLastRefill = state.lastRefill
          
          if (elapsed >= windowMs) {
            const periods = Math.floor(elapsed / windowMs)
            newTokens = Math.min(
              maxRequests,
              state.tokens + periods * maxRequests
            )
            newLastRefill = state.lastRefill + periods * windowMs
          }
          
          // Проверяем доступность токена
          if (newTokens > 0) {
            return {
              tokens: newTokens - 1,
              lastRefill: newLastRefill
            }
          }
          
          // Ждём до следующего окна
          const waitTime = windowMs - (now - newLastRefill)
          yield* Effect.sleep(waitTime)
          
          // После ожидания — рекурсивно пробуем снова
          // (через updateEffect — атомарно!)
          return {
            tokens: maxRequests - 1,
            lastRefill: Date.now()
          }
        })
      )
    }
  })

// Использование
const program = Effect.gen(function* () {
  // Максимум 3 запроса в секунду
  const limiter = yield* makeRateLimiter(3, 1000)
  
  // Симулируем 10 запросов
  yield* Effect.forEach(
    Array.from({ length: 10 }, (_, i) => i),
    (i) =>
      Effect.gen(function* () {
        yield* limiter.acquire
        yield* Effect.log(`Request ${i + 1} processed`)
      }),
    { concurrency: "unbounded" }
  )
})

Effect.runPromise(program)

Connection Pool State


interface Connection {
  readonly id: string
  readonly createdAt: number
}

interface PoolState {
  readonly available: ReadonlyArray<Connection>
  readonly inUse: ReadonlyArray<Connection>
  readonly maxSize: number
}

interface ConnectionPool {
  readonly acquire: Effect.Effect<Connection>
  readonly release: (conn: Connection) => Effect.Effect<void>
  readonly stats: Effect.Effect<{ available: number; inUse: number }>
}

const makeConnectionPool = (
  maxSize: number,
  createConnection: Effect.Effect<Connection>
): Effect.Effect<ConnectionPool> =>
  Effect.gen(function* () {
    const state = yield* SynchronizedRef.make<PoolState>({
      available: [],
      inUse: [],
      maxSize
    })
    
    return {
      acquire: SynchronizedRef.modifyEffect(state, (s) => {
        // Есть доступные соединения
        if (s.available.length > 0) {
          const [conn, ...rest] = s.available
          return Effect.succeed([
            conn!,
            { ...s, available: rest, inUse: [...s.inUse, conn!] }
          ] as const)
        }
        
        // Можем создать новое
        if (s.available.length + s.inUse.length < s.maxSize) {
          return Effect.gen(function* () {
            const conn = yield* createConnection
            return [
              conn,
              { ...s, inUse: [...s.inUse, conn] }
            ] as const
          })
        }
        
        // Пул полон — ждём (в реальности тут была бы очередь)
        return Effect.fail(new Error("Pool exhausted"))
      }),
      
      release: (conn: Connection) =>
        SynchronizedRef.update(state, (s) => ({
          ...s,
          available: [...s.available, conn],
          inUse: s.inUse.filter((c) => c.id !== conn.id)
        })),
      
      stats: Effect.map(SynchronizedRef.get(state), (s) => ({
        available: s.available.length,
        inUse: s.inUse.length
      }))
    }
  })

// Использование
const program = Effect.gen(function* () {
  let connId = 0
  
  const createConnection = Effect.gen(function* () {
    connId++
    yield* Effect.log(`Creating connection ${connId}`)
    yield* Effect.sleep("100 millis")
    return { id: `conn-${connId}`, createdAt: Date.now() }
  })
  
  const pool = yield* makeConnectionPool(3, createConnection)
  
  // Симулируем конкурентное использование
  yield* Effect.forEach(
    Array.range(1, 5),
    (i) =>
      Effect.gen(function* () {
        const conn = yield* pool.acquire
        yield* Effect.log(`Task ${i} acquired ${conn.id}`)
        yield* Effect.sleep("200 millis")
        yield* pool.release(conn)
        yield* Effect.log(`Task ${i} released ${conn.id}`)
      }),
    { concurrency: 3 }
  )
  
  const stats = yield* pool.stats
  console.log("Final stats:", stats)
})

Effect.runPromise(program)

Паттерн “Memoization with Effects”


interface Memoized<K, V, E, R> {
  readonly get: (key: K) => Effect.Effect<V, E, R>
  readonly invalidate: (key: K) => Effect.Effect<void>
  readonly clear: Effect.Effect<void>
}

const memoize = <K, V, E, R>(
  compute: (key: K) => Effect.Effect<V, E, R>
): Effect.Effect<Memoized<K, V, E, R>> =>
  Effect.gen(function* () {
    const cache = yield* SynchronizedRef.make<HashMap.HashMap<K, V>>(
      HashMap.empty()
    )
    
    return {
      get: (key: K) =>
        SynchronizedRef.modifyEffect(cache, (map) => {
          const cached = HashMap.get(map, key)
          
          if (Option.isSome(cached)) {
            return Effect.succeed([cached.value, map] as const)
          }
          
          return Effect.gen(function* () {
            const value = yield* compute(key)
            const newMap = HashMap.set(map, key, value)
            return [value, newMap] as const
          })
        }),
      
      invalidate: (key: K) =>
        SynchronizedRef.update(cache, (map) =>
          HashMap.remove(map, key)
        ),
      
      clear: SynchronizedRef.set(cache, HashMap.empty())
    }
  })

// Использование: мемоизация Fibonacci
const program = Effect.gen(function* () {
  const fibMemo = yield* memoize((n: number): Effect.Effect<bigint> =>
    Effect.gen(function* () {
      yield* Effect.log(`Computing fib(${n})`)
      if (n <= 1) return BigInt(n)
      
      // Здесь в реальности рекурсивный вызов через memo
      // Для примера — простая формула
      yield* Effect.sleep("10 millis")
      return BigInt(n) // упрощённо
    })
  )
  
  // Первый вызов — вычисление
  yield* fibMemo.get(10)
  
  // Второй вызов — из кэша
  yield* fibMemo.get(10)
  
  // Третий вызов для другого ключа — вычисление
  yield* fibMemo.get(20)
})

Effect.runPromise(program)

Сравнение с Ref

Таблица сравнения

ХарактеристикаRef<A>SynchronizedRef<A>
ОбновлениеA => A (чистое)A => Effect<A> (эффективное)
МеханизмOptimistic CASPessimistic lock (семафор)
Retry при конфликтеДа (автоматически)Нет (блокировка)
Побочные эффекты❌ Повторятся✅ Один раз
ПроизводительностьВыше (lock-free)Ниже (contention)
APIget, set, update, modify+ updateEffect, modifyEffect

Когда что использовать


// ✅ Ref: чистые обновления
const counterRef = Ref.make(0)
// Ref.update(ref, n => n + 1) — оптимально

// ✅ SynchronizedRef: обновления с эффектами
const cacheRef = SynchronizedRef.make<string | null>(null)
// SynchronizedRef.updateEffect(ref, current =>
//   current ? Effect.succeed(current) : fetchFromAPI()
// )

// ⚠️ Анти-паттерн: SynchronizedRef для чистых обновлений
// Работает, но неэффективно из-за overhead семафора
const badUsage = Effect.gen(function* () {
  const ref = yield* SynchronizedRef.make(0)
  // Лучше использовать обычный Ref!
  yield* SynchronizedRef.update(ref, (n) => n + 1)
})

Миграция с Ref на SynchronizedRef


// Было: Ref с race condition
const oldCode = Effect.gen(function* () {
  const cache = yield* Ref.make<Option.Option<string>>(Option.none())
  
  const getData = Effect.gen(function* () {
    const cached = yield* Ref.get(cache)
    if (Option.isSome(cached)) return cached.value
    
    // ⚠️ Race condition здесь!
    const data = yield* fetchData()
    yield* Ref.set(cache, Option.some(data))
    return data
  })
})

// Стало: SynchronizedRef без race condition
const newCode = Effect.gen(function* () {
  const cache = yield* SynchronizedRef.make<Option.Option<string>>(
    Option.none()
  )
  
  const getData = SynchronizedRef.modifyEffect(cache, (cached) => {
    if (Option.isSome(cached)) {
      return Effect.succeed([cached.value, cached] as const)
    }
    
    // ✅ Атомарно!
    return Effect.gen(function* () {
      const data = yield* fetchData()
      return [data, Option.some(data)] as const
    })
  })
})

const fetchData = Effect.succeed("data")

Упражнения

Упражнение

Упражнение 1: Счётчик с логированием

Легко

Создайте счётчик, который логирует каждое изменение (логирование — побочный эффект).

import { Effect, SynchronizedRef } from "effect"

interface LoggingCounter {
  readonly increment: Effect.Effect<number> // возвращает новое значение
  readonly decrement: Effect.Effect<number>
  readonly get: Effect.Effect<number>
}

const makeLoggingCounter = (): Effect.Effect<LoggingCounter> =>
  // Ваш код здесь
  Effect.gen(function* () {
    // ???
  })
Упражнение

Упражнение 2: Debounced Writer

Средне

Создайте writer, который записывает данные в “файл” (симуляция) только после того, как прошло 500мс без новых записей.

import { Effect, SynchronizedRef, Fiber } from "effect"

interface DebouncedWriter {
  readonly write: (data: string) => Effect.Effect<void>
  readonly flush: Effect.Effect<void>
}

const makeDebouncedWriter = (
  sink: (data: string) => Effect.Effect<void>
): Effect.Effect<DebouncedWriter> =>
  // Ваш код здесь
  Effect.gen(function* () {
    // Подсказка: храните pending data и scheduled fiber
    // ???
  })
Упражнение

Упражнение 3: Read-Write Lock

Сложно

Реализуйте read-write lock: множество читателей ИЛИ один писатель.

import { Effect, SynchronizedRef } from "effect"

interface ReadWriteLock {
  readonly readLock: <A, E, R>(
    effect: Effect.Effect<A, E, R>
  ) => Effect.Effect<A, E, R>
  
  readonly writeLock: <A, E, R>(
    effect: Effect.Effect<A, E, R>
  ) => Effect.Effect<A, E, R>
}

const makeReadWriteLock = (): Effect.Effect<ReadWriteLock> =>
  // Ваш код здесь
  Effect.gen(function* () {
    // Подсказка: state = { readers: number, writer: boolean }
    // ???
  })

Резюме

SynchronizedRef<A> — расширение Ref для случаев, когда обновление само требует выполнения эффектов:

АспектОписание
НазначениеАтомарные эффективные обновления
МеханизмПессимистичная блокировка (семафор)
Главная операцияupdateEffect, modifyEffect
Use caseКэши, ленивая инициализация, rate limiters
ОграничениеОдин updater в момент времени

Ключевые отличия от Ref

  • ✅ Поддержка A => Effect<A> обновлений
  • ✅ Гарантия: эффект выполнится только один раз
  • ⚠️ Более высокий contention при высокой конкурентности
  • ⚠️ Возможны deadlocks при неправильном использовании