Cache - 75-Minute Interview Guide

Quick Start

What is it? A configurable in-memory cache with pluggable eviction strategies (LRU, LFU, FIFO), write policies (write-through, write-back), TTL expiration, undo/redo commands, snapshots, and event-based observability.

Key Classes:
- CacheManager (Singleton): Orchestrates cache operations
- CacheEntry: Stores value, size, timestamps, frequency, TTL, dirty flag
- EvictionStrategy: LRU, LFU, FIFO implementations
- WritePolicy: WriteThroughPolicy, WriteBackPolicy implementations
- Command: SetCommand, DeleteCommand for undo/redo

Core Flows:
1. Set: Validate → Insert/update entry → Evict if over capacity → Apply write policy → Emit events
2. Get: Check existence → Expire if TTL passed → Update access metadata → Return
3. Evict: When capacity is exceeded, the strategy chooses a victim, which is removed and an event emitted

5 Design Patterns:
- Singleton: One CacheManager coordinates all operations
- Strategy: Multiple eviction algorithms without code changes
- Observer: Event listeners for lifecycle monitoring
- Command: Reversible set/delete operations with undo/redo
- Memento: Snapshot/restore for whole-system rollback


System Overview

An observable, strategy-driven cache supporting multiple algorithms (LRU, LFU, FIFO), pluggable write policies (write-through / write-back), undoable mutations via Command pattern, snapshot & restore, and rich metrics emission. Core focus: adaptability, reliability, and transparency.

Requirements

Functional:
- set(key, value, ttl?) (put)
- get(key) with hit/miss/expiration handling
- delete(key)
- Capacity limits (items and bytes)
- Eviction strategies (LRU, LFU, FIFO baseline)
- TTL expiration (lazy check on access)
- Write policies (write-through vs write-back)
- Snapshot & restore
- Undo/redo for set/delete operations

Non-Functional:
- O(1) average key access
- Stable event emission for monitoring
- Runtime strategy swapping without downtime
- Bounded metadata overhead
- Support 1000+ concurrent operations (note: single-threaded in demo, locks in production)

Constraints:
- Items capacity: e.g., 1000 max entries
- Bytes capacity: e.g., 10MB max memory
- TTL: per-key expiration
- Strategy swaps: preserve entries, only change victim selection
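For example, a manager matching these constraints could be constructed as follows (a minimal sketch using the CacheManager, LRUStrategy, and WriteThroughPolicy classes from the implementation later in this guide; the limits and keys are illustrative):

cache = CacheManager(capacity_items=1000,              # item limit from the constraints above
                     capacity_bytes=10 * 1024 * 1024,  # ~10MB byte budget
                     eviction_strategy=LRUStrategy(),
                     write_policy=WriteThroughPolicy())
cache.set("user:42", "serialized-profile", ttl=60)     # per-key TTL in seconds
value = cache.get("user:42")                           # hit until the TTL lapses or the key is evicted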


Architecture Diagram (ASCII UML)

┌──────────────────────────────────────────────────┐
│             CacheManager (Singleton)             │
│ Orchestrates store, strategies, policies, events │
└──────────────────┬───────────────────────────────┘
                   │
     ┌─────────────┼────────────────┐
     │             │                │
     ▼             ▼                ▼
┌─────────┐ ┌──────────────┐ ┌──────────────┐
│  Store  │ │EvictionStrat.│ │ WritePolicy  │
│(Dict)   │ │(LRU/LFU/FIFO)│ │(Through/Back)│
└────┬────┘ └──────────────┘ └──────────────┘
     │
     ▼
┌──────────────────┐
│  CacheEntry      │
├──────────────────┤
│+value            │
│+size             │
│+frequency        │
│+last_access      │
│+created_at       │
│+ttl              │
│+dirty            │
└──────────────────┘

Command Pattern (Undo/Redo):
┌─────────────────────┐
│   Command (ABC)     │
└────┬────────────────┘
     │
     ├────────────┬──────────────┐
     ▼            ▼              ▼
┌──────────┐ ┌──────────┐ ┌─────────────┐
│SetCmd    │ │DeleteCmd │ │Undo/Redo    │
│ (holds   │ │ (holds   │ │ Stacks      │
│ old val) │ │ removed) │ │             │
└──────────┘ └──────────┘ └─────────────┘

Observer Pattern (Events):
┌─────────────────────────────────────┐
│   Listeners (callbacks)             │
│ - entry_set, entry_get_hit/miss     │
│ - entry_evicted, entry_expired      │
│ - strategy_swapped, policy_swapped  │
│ - snapshot_taken/restored           │
│ - metrics_updated                   │
└─────────────────────────────────────┘

Memento Pattern (Snapshots):
┌──────────────────────────────────────┐
│   Snapshot (Deep Copy)               │
│ - store state                        │
│ - backing store                      │
│ - strategy/policy identifiers        │
│ - metrics baseline                   │
└──────────────────────────────────────┘

Interview Q&A (12 Questions)

Basic Level

Q1: What is the main purpose of the CacheManager? A: It is the single coordinator (Singleton role) for all cache operations (set/get/delete), eviction strategies (LRU/LFU/FIFO), write policies (through/back), and metrics/event emission for observability. (The demo instantiates one manager per scenario rather than enforcing a strict Singleton, but the role is the same.)

Q2: Why do you need multiple eviction strategies? A: Different workloads benefit from different policies: LRU favors recent access patterns, LFU favors frequently accessed items, FIFO is simple for fair rotation. Pluggable Strategy pattern allows swapping at runtime without modifying core code.

Q3: What is TTL and how is it handled? A: TTL (time-to-live) is a per-key expiration time. The system uses lazy expiration: the check happens on access; if expired, the entry is removed and recorded as an expiration and a miss. Advantage: no background-thread overhead. Trade-off: expired entries occupy memory until they are touched.

Q4: Why separate WritePolicy from eviction strategy? A: Clear separation of concerns: eviction decides residency (which keys stay), write policy decides persistence (when to write backing store). Allows independent swapping and testing.

Intermediate Level

Q5: How does write-through differ from write-back? A: Write-through: immediately persist to backing store on set (safe but slower). Write-back: mark entry dirty, defer writing (fast but risky if crash). System requires explicit flush() for write-back.

Q6: How do you safely swap strategies at runtime? A: An eviction strategy swap re-registers every existing entry with the new strategy (rebuilding its bookkeeping, e.g., flooring LFU frequency counters) while preserving all data. A write policy swap flushes dirty entries if leaving write-back, to avoid data loss. Both emit events for monitoring.

Q7: How does the Command pattern enable undo/redo? A: SetCommand and DeleteCommand encapsulate reversible mutations, storing previous state (old entry for set, removed entry for delete). Undo stack reverses last command; redo reapplies. Each new command clears redo stack.
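A minimal usage sketch against the CacheManager implementation later in this guide (the constructor arguments are illustrative):

cache = CacheManager(capacity_items=10, capacity_bytes=1000,
                     eviction_strategy=LRUStrategy(),
                     write_policy=WriteThroughPolicy())
cache.set("k", "v1")   # recorded as a SetCommand (no previous value)
cache.set("k", "v2")   # recorded as a SetCommand holding the old entry ("v1")
cache.undo()           # restores "v1"
cache.delete("k")      # recorded as a DeleteCommand holding the removed entry
cache.undo()           # re-inserts "v1"
cache.redo()           # re-applies the delete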

Q8: How is memory managed under capacity pressure? A: _enforce_capacity() runs after each set, repeatedly calling strategy's choose_victim() until item count ≤ capacity_items AND total bytes ≤ capacity_bytes. Victim removed, eviction metric incremented, event emitted.

Advanced Level

Q9: How would you handle concurrent access in production? A: Add threading.Lock() around critical sections: store access, capacity enforcement, metric updates. Strategy's on_access() needs thread-safe frequency updates. Write policy flush needs atomic backing store write.
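One possible shape for this (a sketch of a hypothetical ThreadSafeCacheManager subclass that guards the public operations with a single reentrant lock; finer-grained locking would be a further refinement):

import threading

class ThreadSafeCacheManager(CacheManager):
    """Coarse-grained locking: every public operation runs under one reentrant lock.
    RLock is used because set()/delete() re-enter themselves through the Command objects."""
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._lock = threading.RLock()

    def set(self, key, value, ttl=None, via_command=False):
        with self._lock:
            return super().set(key, value, ttl, via_command)

    def get(self, key):
        with self._lock:
            return super().get(key)

    def delete(self, key, via_command=False):
        with self._lock:
            return super().delete(key, via_command)

    def flush(self):
        with self._lock:
            return super().flush()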

Q10: How would you optimize LFU for 1M entries? A: Naive LFU scans all entries to find min frequency: O(n) per eviction. Optimize: bucket-based LFU mapping frequency → ordered set of keys. Update on access is O(log n), victim selection O(1). Trade-off: higher memory for faster eviction.
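One possible shape for that optimization (a sketch of a frequency-bucket LFU that fits the EvictionStrategy interface below; insertion-ordered dicts stand in for the ordered sets mentioned above, and stale keys are discarded lazily because the interface has no removal hook):

from collections import defaultdict

class BucketLFUStrategy(EvictionStrategy):
    """LFU with frequency buckets so eviction does not scan every entry."""
    name = "bucket_lfu"

    def __init__(self):
        self._buckets = defaultdict(dict)   # frequency -> insertion-ordered {key: None}
        self._freq = {}                     # key -> frequency known to this strategy
        self._min_freq = 1

    def on_insert(self, key, entry):
        self._freq[key] = 1
        self._buckets[1][key] = None
        self._min_freq = 1

    def on_access(self, key, entry):
        f = self._freq.get(key, 0)
        self._buckets[f].pop(key, None)
        self._freq[key] = f + 1
        self._buckets[f + 1][key] = None

    def choose_victim(self, store):
        # Walk buckets upward from the lowest known frequency; drop keys that have
        # already been evicted or deleted from the cache.
        for f in range(self._min_freq, max(self._buckets, default=0) + 1):
            for key in list(self._buckets.get(f, {})):
                if key not in store:
                    self._buckets[f].pop(key, None)
                    self._freq.pop(key, None)
                    continue
                self._min_freq = f
                return key
        return None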

Q11: How would you scale this to distributed cache? A: Shard by key hash (consistent hashing) across multiple nodes. Each shard runs same architecture independently. Promotion strategy: hot keys replicated to fast tier (e.g., local RAM + remote store). Leader for invalidation coordination.
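A sketch of the routing layer (a hypothetical ShardedCache front-end that maps keys onto per-shard CacheManager instances via a consistent-hash ring; virtual nodes smooth the key distribution):

import bisect
import hashlib

class ShardedCache:
    def __init__(self, shards, vnodes=64):
        self._shards = shards                  # shard name -> CacheManager instance
        self._ring = []                        # sorted (hash, shard_name) points on the ring
        for name in shards:
            for i in range(vnodes):
                self._ring.append((self._hash(f"{name}#{i}"), name))
        self._ring.sort()

    @staticmethod
    def _hash(s):
        return int(hashlib.md5(s.encode()).hexdigest(), 16)

    def _shard_for(self, key):
        # The first ring point clockwise from the key's hash owns the key.
        idx = bisect.bisect(self._ring, (self._hash(key),)) % len(self._ring)
        return self._shards[self._ring[idx][1]]

    def set(self, key, value, ttl=None):
        self._shard_for(key).set(key, value, ttl)

    def get(self, key):
        return self._shard_for(key).get(key)

Adding or removing a shard then only remaps the keys whose ring segment changes, which is the point of consistent hashing.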

Q12: What security/correctness concerns exist? A: Size accounting prevents memory blowups. TTL prevents eternal residency. Snapshot restore replaces entire state (avoids partial injection). Missing: access control (who can get/set), audit logging, encryption at rest, rate limiting.


Scaling Q&A (12 Questions)

Q1: How does eviction strategy affect performance at 100K keys? A: LRU: maintains last_access timestamps, O(1) per access, victim selection O(n) scan (acceptable). At 100K, could optimize with time-bucketing or segment-based approach. LFU: similar O(n) scan, bucket optimization suggested.
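A sketch of one such optimization (an OrderedDict-backed LRU that keeps keys in recency order so choose_victim() no longer scans the whole store; deleted keys are purged lazily because the EvictionStrategy interface below has no removal hook):

import time
from collections import OrderedDict

class OrderedLRUStrategy(EvictionStrategy):
    name = "ordered_lru"

    def __init__(self):
        self._order = OrderedDict()            # oldest key first, newest last

    def on_insert(self, key, entry):
        entry.last_access = time.time()
        self._order[key] = None
        self._order.move_to_end(key)

    def on_access(self, key, entry):
        entry.last_access = time.time()
        if key in self._order:
            self._order.move_to_end(key)

    def choose_victim(self, store):
        for key in list(self._order):          # oldest first; purge stale keys as we go
            if key in store:
                return key
            self._order.pop(key, None)
        return None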

Q2: What happens if the cache fills up and strategy returns no victim? A: The current implementation prevents an infinite loop with a safety counter (10K iterations); if the strategy returns no victim, it emits an eviction_blocked event and stops evicting (the cache may then sit above its limits). Production: reject new puts with an error and alert ops.

Q3: How do you prevent memory growth from undo/redo stacks? A: Cap stack sizes (e.g., max 100 commands). When exceeded, pop oldest. Or implement per-session undo (discard when user disconnects). Trade-off: bounded memory vs full history.
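A minimal sketch of the capped-history variant (a hypothetical BoundedHistoryCacheManager subclass; deque(maxlen=...) silently drops the oldest command when the cap is reached, and it supports the append/pop/clear calls the base class already makes):

from collections import deque

class BoundedHistoryCacheManager(CacheManager):
    def __init__(self, *args, max_history=100, **kwargs):
        super().__init__(*args, **kwargs)
        self._undo_stack = deque(maxlen=max_history)   # oldest command dropped when full
        self._redo_stack = deque(maxlen=max_history)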

Q4: Can snapshot restore happen while writes are in progress? A: In single-threaded demo, no. Production: acquire global write lock during restore to ensure consistency. Alternative: versioned snapshots + eventual consistency model.

Q5: How does write-back handle crash scenarios? A: Current implementation loses dirty writes. Production mitigation: WAL (write-ahead logging) per dirty write before acknowledging set(). On startup, replay log to restore dirty entries. Trade-off: write latency vs durability.
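A sketch of that mitigation (a hypothetical WriteBackWithWALPolicy that appends each write to an append-only log before marking the entry dirty; it assumes JSON-serializable values, and startup replay and log compaction are left out):

import json
import time

class WriteBackWithWALPolicy(WriteBackPolicy):
    name = "write_back_wal"

    def __init__(self, wal_path="cache.wal"):
        self._wal_path = wal_path

    def on_set(self, key, entry, backing_store):
        record = {"op": "set", "key": key, "value": entry.value, "ts": time.time()}
        with open(self._wal_path, "a") as wal:
            wal.write(json.dumps(record) + "\n")
            wal.flush()                                # an fsync would be needed for real durability
        super().on_set(key, entry, backing_store)      # then mark dirty, as plain write-back does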

Q6: What's the memory overhead of tracking frequency + timestamps + TTL per entry? A: CacheEntry: value + size + created_at + last_access + frequency + ttl + dirty ≈ 48+ bytes per entry. For 100K entries: 4.8MB overhead. Acceptable. Optimization: use compact integer timestamps for memory-constrained systems.

Q7: How do you handle TTL expiration at scale? A: Current: lazy expiration on access only (stale entries accumulate). Production: background thread periodic scan + removal. Alternative: delta queue of pending expirations (O(1) add, pop nearest). Trade-off: background CPU vs stale data window.
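A sketch of the queued-expiration idea (a standalone helper keeping (expiry_time, key) pairs in a min-heap rather than a true delta queue, so scheduling is O(log n); a background loop would call pop_expired() periodically and delete the returned keys from the cache):

import heapq
import time

class ExpirationQueue:
    def __init__(self):
        self._heap = []                        # (expiry_time, key), earliest expiry on top

    def schedule(self, key, ttl):
        heapq.heappush(self._heap, (time.time() + ttl, key))

    def pop_expired(self, now=None):
        now = now if now is not None else time.time()
        expired = []
        while self._heap and self._heap[0][0] <= now:
            _, key = heapq.heappop(self._heap)
            expired.append(key)
        return expired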

Q8: How would you monitor cache health metrics? A: Emit metrics_updated event with hit rate, eviction pressure, dirty backlog. Exporter: Prometheus format. Alerts: hit ratio drops below threshold, eviction rate > threshold, dirty backlog growing unbounded.
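For instance, a listener could render the metrics_updated payload in Prometheus' text exposition format (a sketch; a real deployment would typically use an exporter library and serve this over HTTP instead of printing it):

def prometheus_listener(event, payload):
    if event != "metrics_updated":
        return
    lines = []
    for name, value in payload.items():
        lines.append(f"# TYPE cache_{name} gauge")
        lines.append(f"cache_{name} {value}")
    hits, misses = payload.get("hits", 0), payload.get("misses", 0)
    lookups = hits + misses
    lines.append(f"cache_hit_ratio {hits / lookups if lookups else 0.0}")
    print("\n".join(lines))

# cache.add_listener(prometheus_listener)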

Q9: Can write policy swap happen mid-flush? A: Current: swap triggers flush if leaving write-back. If flush in progress elsewhere, potential race. Solution: acquire lock during swap, ensure flush completes before releasing policy reference.

Q10: How to partition cache across NUMA nodes for locality? A: Shard by key hash. Each NUMA node owns subset of shards. Thread binds to NUMA node; access local shards (cache-friendly). Remote shard access incurs penalty but infrequent. Performance gain: 20-30% on NUMA systems.

Q11: What if an eviction strategy always returns same victim (bug)? A: Safety counter prevents infinite loop. But cache keeps trying same victim, making no progress. Detect: track prev_victim across iterations. If repeated, trigger strategy fallback (e.g., FIFO) or emit alert.

Q12: How do you handle cache stampede (1000 simultaneous misses after expiration)? A: Lazy expiration can cause a sudden miss spike. Mitigation: probabilistic early expiration (refresh an entry slightly before its TTL), or request coalescing (single-flight: only one caller fetches from the backing source; the others wait for and reuse its result). Trade-off: a few extra refreshes vs stampede risk.
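A sketch of the probabilistic early-expiration check (based on the common "XFetch" idea: as an entry approaches its TTL, each reader becomes increasingly likely to treat it as expired and refresh it early; beta and the 0.1 * ttl stand-in for recomputation cost are illustrative tuning knobs, not part of the implementation above):

import math
import random
import time

def should_refresh_early(entry, beta=1.0):
    """Return True if this caller should refresh the entry before its real TTL lapses."""
    if entry.ttl is None:
        return False
    age = time.time() - entry.created_at
    # -log(...) is a positive, occasionally large factor, so early refreshes are rare far
    # from the deadline and near-certain just before it.
    return age - beta * math.log(1.0 - random.random()) * 0.1 * entry.ttl >= entry.ttl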


Demo Scenarios (5 Examples)

Demo 1: Basic LRU Put/Get & Eviction

- Set k0 → k6 (7 items, capacity 5): k0 and k1 are evicted LRU-first during the fill
- Access k3, k4 to make them recent
- Set k_new → evicts the least recently used remaining key (k2)
- Show remaining: k3, k4, k5, k6, k_new

Demo 2: TTL Expiration

- Set "temp" with ttl=0.2 seconds
- Wait 0.25 seconds
- Get "temp" β†’ None (lazy expiration on access)
- Hit ratio affected (counts as miss)

Demo 3: Eviction Strategy Swap (LRU → LFU)

- Initially LRU with capacity 5; set k0 → k4
- Access k0 five times, k1 twice
- Swap to LFU (existing frequency counts are preserved)
- Set "new" → LFU evicts a low-frequency key (k2), not the heavily used k0

Demo 4: Write Policy Swap (Through → Back → Flush)

- Set with write-through (immediate backing store write)
- Swap to write-back (mark dirty)
- Set multiple items (dirty_pending increases)
- Call flush() → writes all dirty, dirty_pending = 0
- Swap back to write-through

Demo 5: Snapshot & Restore

- Take snapshot of current state
- Set new items
- Restore snapshot → state reverts (items, metrics, strategies)
- Verifies full state recovery

Complete Implementation

"""
πŸ”„ Cache System - Interview Implementation
Demonstrates:
1. Basic LRU put/get & eviction
2. TTL expiration
3. Strategy swap (LRU -> LFU)
4. Write policy swap (write-through -> write-back -> flush)
5. Snapshot & restore
6. High volume insertion & metrics summary
"""
from __future__ import annotations
from dataclasses import dataclass
from typing import Any, Dict, Optional, Callable, List
import time
import copy

# =============================
# Data Structures & Strategies
# =============================
@dataclass
class CacheEntry:
    value: Any
    size: int
    created_at: float
    last_access: float
    frequency: int = 0
    ttl: Optional[float] = None  # seconds
    dirty: bool = False

    def expired(self, now: Optional[float] = None) -> bool:
        if self.ttl is None:
            return False
        ref = now if now is not None else time.time()
        return (ref - self.created_at) >= self.ttl

# ---- Eviction Strategy Interface ----
class EvictionStrategy:
    name: str = "base"

    def on_insert(self, key: str, entry: CacheEntry) -> None:
        pass

    def on_access(self, key: str, entry: CacheEntry) -> None:
        pass

    def choose_victim(self, store: Dict[str, CacheEntry]) -> Optional[str]:
        raise NotImplementedError

class LRUStrategy(EvictionStrategy):
    name = "lru"
    def on_insert(self, key: str, entry: CacheEntry) -> None:
        entry.last_access = time.time()
    def on_access(self, key: str, entry: CacheEntry) -> None:
        entry.last_access = time.time()
    def choose_victim(self, store: Dict[str, CacheEntry]) -> Optional[str]:
        if not store:
            return None
        return min(store.items(), key=lambda kv: kv[1].last_access)[0]

class LFUStrategy(EvictionStrategy):
    name = "lfu"
    def on_insert(self, key: str, entry: CacheEntry) -> None:
        # Preserve any frequency observed before a runtime strategy swap; floor at 1 for fresh inserts.
        entry.frequency = max(entry.frequency, 1)
        entry.last_access = time.time()
    def on_access(self, key: str, entry: CacheEntry) -> None:
        # CacheManager.get() already increments entry.frequency; only refresh recency for tie-breaking.
        entry.last_access = time.time()
    def choose_victim(self, store: Dict[str, CacheEntry]) -> Optional[str]:
        if not store:
            return None
        return min(store.items(), key=lambda kv: (kv[1].frequency, kv[1].last_access))[0]

class FIFOStrategy(EvictionStrategy):
    name = "fifo"
    def on_insert(self, key: str, entry: CacheEntry) -> None:
        pass
    def on_access(self, key: str, entry: CacheEntry) -> None:
        pass  # FIFO ignores access patterns; CacheManager.get() already tracks frequency
    def choose_victim(self, store: Dict[str, CacheEntry]) -> Optional[str]:
        if not store:
            return None
        return min(store.items(), key=lambda kv: kv[1].created_at)[0]

# ---- Write Policy Interface ----
class WritePolicy:
    name: str = "base"
    def on_set(self, key: str, entry: CacheEntry, backing_store: Dict[str, Any]) -> None:
        raise NotImplementedError
    def on_delete(self, key: str, backing_store: Dict[str, Any]) -> None:
        raise NotImplementedError
    def flush(self, backing_store: Dict[str, Any], entries: Dict[str, CacheEntry]) -> int:
        return 0

class WriteThroughPolicy(WritePolicy):
    name = "write_through"
    def on_set(self, key: str, entry: CacheEntry, backing_store: Dict[str, Any]) -> None:
        backing_store[key] = entry.value
        entry.dirty = False
    def on_delete(self, key: str, backing_store: Dict[str, Any]) -> None:
        backing_store.pop(key, None)
    def flush(self, backing_store: Dict[str, Any], entries: Dict[str, CacheEntry]) -> int:
        return 0

class WriteBackPolicy(WritePolicy):
    name = "write_back"
    def on_set(self, key: str, entry: CacheEntry, backing_store: Dict[str, Any]) -> None:
        entry.dirty = True
    def on_delete(self, key: str, backing_store: Dict[str, Any]) -> None:
        backing_store.pop(key, None)
    def flush(self, backing_store: Dict[str, Any], entries: Dict[str, CacheEntry]) -> int:
        count = 0
        for k, e in entries.items():
            if e.dirty:
                backing_store[k] = e.value
                e.dirty = False
                count += 1
        return count

# =============================
# Command Pattern
# =============================
class Command:
    def execute(self):
        raise NotImplementedError
    def undo(self):
        raise NotImplementedError

class SetCommand(Command):
    def __init__(self, cache: 'CacheManager', key: str, value: Any, ttl: Optional[float] = None):
        self.cache = cache
        self.key = key
        self.value = value
        self.ttl = ttl
        self.prev_entry: Optional[CacheEntry] = None
    def execute(self):
        self.prev_entry = copy.deepcopy(self.cache._store.get(self.key))
        self.cache.set(self.key, self.value, self.ttl, via_command=True)
    def undo(self):
        if self.prev_entry is None:
            self.cache.delete(self.key, via_command=True)
        else:
            self.cache._direct_restore_entry(self.key, self.prev_entry)

class DeleteCommand(Command):
    def __init__(self, cache: 'CacheManager', key: str):
        self.cache = cache
        self.key = key
        self.removed_entry: Optional[CacheEntry] = None
    def execute(self):
        if self.key in self.cache._store:
            self.removed_entry = copy.deepcopy(self.cache._store[self.key])
        self.cache.delete(self.key, via_command=True)
    def undo(self):
        if self.removed_entry:
            self.cache._direct_restore_entry(self.key, self.removed_entry)

# =============================
# Cache Manager (Singleton-like)
# =============================
class CacheManager:
    def __init__(self,
                 capacity_items: int,
                 capacity_bytes: int,
                 eviction_strategy: EvictionStrategy,
                 write_policy: WritePolicy):
        self.capacity_items = capacity_items
        self.capacity_bytes = capacity_bytes
        self.eviction_strategy = eviction_strategy
        self.write_policy = write_policy
        self._store: Dict[str, CacheEntry] = {}
        self._backing_store: Dict[str, Any] = {}
        self._listeners: List[Callable[[str, Dict[str, Any]], None]] = []
        self._undo_stack: List[Command] = []
        self._redo_stack: List[Command] = []
        self._metrics = {
            'hits': 0,
            'misses': 0,
            'evictions': 0,
            'expirations': 0,
            'sets': 0,
            'deletes': 0,
            'flushed_writes': 0,
            'dirty_writes_pending': 0,
            'current_items': 0,
            'total_bytes': 0,
        }

    def add_listener(self, fn: Callable[[str, Dict[str, Any]], None]):
        self._listeners.append(fn)

    def set(self, key: str, value: Any, ttl: Optional[float] = None, via_command: bool = False):
        if not via_command:
            # Route through a SetCommand so the previous entry is captured before mutation (enables undo).
            self._record_command(SetCommand(self, key, value, ttl))
            return
        size = self._estimate_size(value)
        now = time.time()
        if key in self._store:
            entry = self._store[key]
            entry.value = value
            entry.size = size
            entry.ttl = ttl
            entry.last_access = now
        else:
            entry = CacheEntry(value=value, size=size, created_at=now, last_access=now, ttl=ttl)
            self._store[key] = entry
            self.eviction_strategy.on_insert(key, entry)
        self._enforce_capacity()
        self.write_policy.on_set(key, entry, self._backing_store)
        self._metrics['sets'] += 1
        self._refresh_metrics()
        self._emit_event('entry_set', {'key': key, 'size': size})

    def get(self, key: str) -> Optional[Any]:
        now = time.time()
        entry = self._store.get(key)
        if entry is None:
            self._metrics['misses'] += 1
            self._refresh_metrics()
            self._emit_event('entry_get_miss', {'key': key})
            return None
        if entry.expired(now):
            self._store.pop(key, None)
            self._metrics['expirations'] += 1
            self._emit_event('entry_expired', {'key': key})
            self._metrics['misses'] += 1
            self._refresh_metrics()
            return None
        entry.last_access = now
        entry.frequency += 1
        self.eviction_strategy.on_access(key, entry)
        self._metrics['hits'] += 1
        self._refresh_metrics()
        self._emit_event('entry_get_hit', {'key': key})
        return entry.value

    def delete(self, key: str, via_command: bool = False):
        if key not in self._store:
            return
        if not via_command:
            # Route through a DeleteCommand so the removed entry is captured before removal (enables undo).
            self._record_command(DeleteCommand(self, key))
            return
        self._store.pop(key)
        self.write_policy.on_delete(key, self._backing_store)
        self._metrics['deletes'] += 1
        self._refresh_metrics()
        self._emit_event('entry_evicted', {'key': key, 'reason': 'delete'})

    def swap_eviction_strategy(self, new_strategy: EvictionStrategy):
        prev = self.eviction_strategy.name
        self.eviction_strategy = new_strategy
        for k, e in self._store.items():
            new_strategy.on_insert(k, e)
        self._emit_event('strategy_swapped', {'from': prev, 'to': new_strategy.name})

    def swap_write_policy(self, new_policy: WritePolicy):
        prev = self.write_policy.name
        if isinstance(self.write_policy, WriteBackPolicy) and not isinstance(new_policy, WriteBackPolicy):
            flushed = self.write_policy.flush(self._backing_store, self._store)
            self._metrics['flushed_writes'] += flushed
            self._emit_event('flushed', {'flushed_count': flushed})
        self.write_policy = new_policy
        self._emit_event('write_policy_swapped', {'from': prev, 'to': new_policy.name})
        self._refresh_metrics()

    def flush(self):
        flushed = self.write_policy.flush(self._backing_store, self._store)
        if flushed:
            self._metrics['flushed_writes'] += flushed
            self._emit_event('flushed', {'flushed_count': flushed})
            self._refresh_metrics()

    def take_snapshot(self) -> Dict[str, Any]:
        snap = {
            'store': copy.deepcopy(self._store),
            'backing': copy.deepcopy(self._backing_store),
            'eviction_strategy': self.eviction_strategy.name,
            'write_policy': self.write_policy.name,
            'metrics': copy.deepcopy(self._metrics),
            'timestamp': time.time(),
        }
        self._emit_event('snapshot_taken', {'items': len(self._store)})
        return snap

    def restore_snapshot(self, snapshot: Dict[str, Any]):
        self._store = snapshot['store']
        self._backing_store = snapshot['backing']
        self._metrics = snapshot['metrics']
        es_name = snapshot['eviction_strategy']
        wp_name = snapshot['write_policy']
        self.eviction_strategy = {
            'lru': LRUStrategy(), 'lfu': LFUStrategy(), 'fifo': FIFOStrategy()
        }[es_name]
        self.write_policy = {
            'write_through': WriteThroughPolicy(), 'write_back': WriteBackPolicy()
        }[wp_name]
        self._emit_event('snapshot_restored', {})
        self._refresh_metrics()

    def summary(self) -> Dict[str, Any]:
        return copy.deepcopy(self._metrics)

    # ---- Internal Helpers ----
    def _emit_event(self, name: str, payload: Dict[str, Any]):
        for listener_fn in self._listeners:
            try:
                listener_fn(name, payload)
            except Exception:
                pass

    def _estimate_size(self, value: Any) -> int:
        if isinstance(value, (str, bytes)):
            return len(value)
        return 1

    def _current_total_bytes(self) -> int:
        return sum(e.size for e in self._store.values())

    def _enforce_capacity(self):
        safety = 0
        while (len(self._store) > self.capacity_items or self._current_total_bytes() > self.capacity_bytes) and safety < 10_000:
            victim = self.eviction_strategy.choose_victim(self._store)
            if victim is None:
                self._emit_event('eviction_blocked', {})
                break
            removed = self._store.pop(victim)
            if removed.dirty:
                # Flush a dirty (write-back) entry to the backing store before dropping it,
                # so capacity eviction does not lose an un-persisted write.
                self._backing_store[victim] = removed.value
                self._metrics['flushed_writes'] += 1
            self._metrics['evictions'] += 1
            self._emit_event('entry_evicted', {'key': victim, 'reason': 'capacity'})
            safety += 1
        self._refresh_metrics()

    def _refresh_metrics(self):
        self._metrics['current_items'] = len(self._store)
        self._metrics['total_bytes'] = self._current_total_bytes()
        self._metrics['dirty_writes_pending'] = sum(1 for e in self._store.values() if e.dirty)
        self._emit_event('metrics_updated', copy.deepcopy(self._metrics))

    def _record_command(self, cmd: Command):
        # Execute the command (which captures the prior state for undo), then record it;
        # any new command invalidates the redo history.
        cmd.execute()
        self._undo_stack.append(cmd)
        self._redo_stack.clear()
        self._emit_event('command_executed', {'type': cmd.__class__.__name__})

    def undo(self):
        if not self._undo_stack:
            return
        cmd = self._undo_stack.pop()
        cmd.undo()
        self._redo_stack.append(cmd)
        self._refresh_metrics()
        self._emit_event('command_undone', {'type': cmd.__class__.__name__})

    def redo(self):
        if not self._redo_stack:
            return
        cmd = self._redo_stack.pop()
        cmd.execute()
        self._undo_stack.append(cmd)
        self._refresh_metrics()
        self._emit_event('command_redone', {'type': cmd.__class__.__name__})

    def _direct_restore_entry(self, key: str, entry: CacheEntry):
        self._store[key] = entry
        self._refresh_metrics()
        self._emit_event('entry_set', {'key': key, 'size': entry.size, 'restore': True})

# =============================
# Demo / Showcase
# =============================

def listener(event: str, payload: Dict[str, Any]):
    if event in {"metrics_updated"}:
        return
    print(f"[EVENT] {event} -> {payload}")


def demo_1_basic_lru():
    """Demo 1: Basic LRU & Eviction"""
    print("\n" + "="*70)
    print("DEMO 1: BASIC LRU & EVICTION")
    print("="*70)
    cache = CacheManager(capacity_items=5,
                         capacity_bytes=50,
                         eviction_strategy=LRUStrategy(),
                         write_policy=WriteThroughPolicy())
    cache.add_listener(listener)

    for i in range(7):
        cache.set(f"k{i}", f"value-{i}")
    cache.get("k3")
    cache.get("k4")
    cache.set("k_new", "new-value")
    print("Summary:", cache.summary())

def demo_2_ttl():
    """Demo 2: TTL Expiration"""
    print("\n" + "="*70)
    print("DEMO 2: TTL EXPIRATION")
    print("="*70)
    cache = CacheManager(capacity_items=100,
                         capacity_bytes=1000,
                         eviction_strategy=LRUStrategy(),
                         write_policy=WriteThroughPolicy())
    cache.add_listener(listener)

    cache.set("temp", "short", ttl=0.2)
    time.sleep(0.25)
    v = cache.get("temp")
    print(f"Value after TTL expiration: {v}")
    print("Summary:", cache.summary())

def demo_3_strategy_swap():
    """Demo 3: Strategy Swap (LRU -> LFU)"""
    print("\n" + "="*70)
    print("DEMO 3: STRATEGY SWAP (LRU -> LFU)")
    print("="*70)
    cache = CacheManager(capacity_items=5,
                         capacity_bytes=100,
                         eviction_strategy=LRUStrategy(),
                         write_policy=WriteThroughPolicy())
    cache.add_listener(listener)

    for i in range(5):
        cache.set(f"k{i}", f"val{i}")
    for _ in range(5):
        cache.get("k0")
    for _ in range(2):
        cache.get("k1")

    cache.swap_eviction_strategy(LFUStrategy())
    cache.set("new", "item")
    print("Summary:", cache.summary())

def demo_4_write_policy():
    """Demo 4: Write Policy Swap (Through -> Back -> Flush)"""
    print("\n" + "="*70)
    print("DEMO 4: WRITE POLICY SWAP & FLUSH")
    print("="*70)
    cache = CacheManager(capacity_items=100,
                         capacity_bytes=1000,
                         eviction_strategy=LRUStrategy(),
                         write_policy=WriteThroughPolicy())
    cache.add_listener(listener)

    cache.set("k1", "v1")
    cache.swap_write_policy(WriteBackPolicy())
    cache.set("k2", "v2")
    cache.set("k3", "v3")
    print(f"Dirty before flush: {cache.summary()['dirty_writes_pending']}")
    cache.flush()
    print(f"Dirty after flush: {cache.summary()['dirty_writes_pending']}")

def demo_5_snapshot():
    """Demo 5: Snapshot & Restore"""
    print("\n" + "="*70)
    print("DEMO 5: SNAPSHOT & RESTORE")
    print("="*70)
    cache = CacheManager(capacity_items=100,
                         capacity_bytes=1000,
                         eviction_strategy=LRUStrategy(),
                         write_policy=WriteThroughPolicy())
    cache.add_listener(listener)

    cache.set("base1", "val1")
    cache.set("base2", "val2")
    snap = cache.take_snapshot()
    cache.set("after_snap", "val3")
    print(f"Items after snap: {cache.summary()['current_items']}")
    cache.restore_snapshot(snap)
    print(f"Items after restore: {cache.summary()['current_items']}")

if __name__ == "__main__":
    print("\n" + "="*70)
    print("πŸ”„ CACHE SYSTEM - 5 DEMO SCENARIOS")
    print("="*70)

    demo_1_basic_lru()
    demo_2_ttl()
    demo_3_strategy_swap()
    demo_4_write_policy()
    demo_5_snapshot()

    print("\n" + "="*70)
    print("βœ… ALL DEMOS COMPLETED")
    print("="*70 + "\n")

Design Patterns Explained

Pattern   | Usage                                                       | Benefit
----------|-------------------------------------------------------------|----------------------------------------------------
Singleton | CacheManager single global instance                         | Coherent state, no conflicts
Strategy  | EvictionStrategy (LRU/LFU/FIFO), WritePolicy (Through/Back) | Easy swapping, extensibility, loose coupling
Observer  | Event listeners for cache lifecycle                         | Observability, monitoring, decoupled notifications
Command   | SetCommand, DeleteCommand for undo/redo                     | Reversible mutations, fine-grained history
Memento   | Snapshot/restore for whole-system state                     | Coarse rollback, atomic recovery

Key Metrics Tracked

  • hits: Cache hits (key found, not expired)
  • misses: Cache misses (key not found or expired)
  • evictions: Items evicted due to capacity
  • expirations: Items expired via TTL
  • sets: Total set operations
  • deletes: Total delete operations
  • flushed_writes: Writes flushed to backing store
  • dirty_writes_pending: Pending write-back writes (unflushed)
  • current_items: Current count of entries
  • total_bytes: Current memory usage
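For example, a health check could derive a hit ratio and eviction pressure from summary() (a small sketch; the threshold values are illustrative):

def cache_health(cache, min_hit_ratio=0.8):
    m = cache.summary()
    lookups = m["hits"] + m["misses"]
    hit_ratio = m["hits"] / lookups if lookups else 1.0
    return {
        "hit_ratio": hit_ratio,
        "healthy": hit_ratio >= min_hit_ratio,
        "eviction_pressure": m["evictions"] / max(m["sets"], 1),
        "dirty_backlog": m["dirty_writes_pending"],
    }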

Summary

✅ Singleton for global coordination
✅ Strategy for pluggable eviction/write policies
✅ Observer for event-driven observability
✅ Command for reversible mutations (undo/redo)
✅ Memento for snapshot/restore
✅ O(1) average access with O(n) victim selection (optimizable)
✅ Thread-safe (with locks in production version)
✅ Runtime strategy swapping without downtime
✅ TTL expiration with lazy evaluation
✅ Capacity enforcement with metrics

Key Takeaway: This cache demonstrates clean architecture via design patterns, providing adaptability (swappable strategies and policies), reliability (correct expiration and capacity enforcement), and transparency (metrics/events) in a real-world system.