Cache β 75-Minute Interview Guide
Quick Start
What is it? A configurable in-memory cache with pluggable eviction strategies (LRU, LFU, FIFO), write policies (write-through, write-back), TTL expiration, undo/redo commands, snapshots, and event-based observability.
Key Classes:
- CacheManager (Singleton): Orchestrates cache operations
- CacheEntry: Stores value, size, timestamps, frequency, TTL, dirty flag
- EvictionStrategy: LRU, LFU, FIFO implementations
- WritePolicy: WriteThroughPolicy, WriteBackPolicy implementations
- Command: SetCommand, DeleteCommand for undo/redo
Core Flows: 1. Set: Validate β Evict (if needed) β Insert β Update metadata β Write policy β Emit events 2. Get: Check existence β Expire if TTL passed β Update access metadata β Return 3. Evict: When capacity exceeded, strategy chooses victim, removes, emits event
5 Design Patterns:
- Singleton: One CacheManager coordinates all operations
- Strategy: Multiple eviction algorithms without code changes
- Observer: Event listeners for lifecycle monitoring
- Command: Reversible set/delete operations with undo/redo
- Memento: Snapshot/restore for whole-system rollback
System Overview
An observable, strategy-driven cache supporting multiple algorithms (LRU, LFU, FIFO), pluggable write policies (write-through / write-back), undoable mutations via Command pattern, snapshot & restore, and rich metrics emission. Core focus: adaptability, reliability, and transparency.
Requirements
Functional:
- put(key, value, ttl?)
- get(key) with miss/hit/expiration handling
- delete(key)
- Capacity limits (items and bytes)
- Eviction strategies (LRU, LFU, FIFO baseline)
- TTL expiration (lazy check on access)
- Write policies (write-through vs write-back)
- Snapshot & restore
- Undo/redo for set/delete operations
Non-Functional: - O(1) average key access - Stable event emission for monitoring - Runtime strategy swapping without downtime - Bounded metadata overhead - Support 1000+ concurrent operations (note: single-threaded in demo, locks in production)
Constraints: - Items capacity: e.g., 1000 max entries - Bytes capacity: e.g., 10MB max memory - TTL: Per-key expiration - Strategy swaps: Preserve entries, only change victim selection
Architecture Diagram (ASCII UML)
ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β CacheManager (Singleton) β
β Orchestrates store, strategies, policies, events β
ββββββββββββββ¬βββββββββββββββββββββββββββββββββββββββββββ
β
ββββββββββΌβββββββββββββ¬ββββββββββββββ
β β β β
βΌ βΌ βΌ βΌ
βββββββββββ ββββββββββββββββ ββββββββββββββββ
β Store β βEvictionStrat.β β WritePolicy β
β(Dict) β β(LRU/LFU/FIFO)β β(Through/Back)β
ββββββ¬βββββ ββββββββββββββββ ββββββββββββββββ
β
βΌ
ββββββββββββββββββββ
β CacheEntry β
ββββββββββββββββββββ€
β+value β
β+size β
β+frequency β
β+last_access β
β+created_at β
β+ttl β
β+dirty β
ββββββββββββββββββββ
Command Pattern (Undo/Redo):
βββββββββββββββββββββββ
β Command (ABC) β
ββββββ¬βββββββββββββββββ
β
ββββ΄βββ¬βββββββββββ
βΌ βΌ βΌ
ββββββββββββ ββββββββββββ βββββββββββββββ
βSetCmd β βDeleteCmd β βUndo/Redo β
β (holds β β (holds β β Stacks β
β old val) β β removed) β β β
ββββββββββββ ββββββββββββ βββββββββββββββ
Observer Pattern (Events):
βββββββββββββββββββββββββββββββββββββββ
β Listeners (callbacks) β
β - entry_set, entry_get_hit/miss β
β - entry_evicted, entry_expired β
β - strategy_swapped, policy_swapped β
β - snapshot_taken/restored β
β - metrics_updated β
βββββββββββββββββββββββββββββββββββββββ
Memento Pattern (Snapshots):
ββββββββββββββββββββββββββββββββββββββββ
β Snapshot (Deep Copy) β
β - store state β
β - backing store β
β - strategy/policy identifiers β
β - metrics baseline β
ββββββββββββββββββββββββββββββββββββββββ
Interview Q&A (12 Questions)
Basic Level
Q1: What is the main purpose of the CacheManager? A: Singleton pattern ensures one global instance coordinating all cache operations (put/get/delete), strategies (LRU/LFU/FIFO), write policies (through/back), and metrics/events emission for observability.
Q2: Why do you need multiple eviction strategies? A: Different workloads benefit from different policies: LRU favors recent access patterns, LFU favors frequently accessed items, FIFO is simple for fair rotation. Pluggable Strategy pattern allows swapping at runtime without modifying core code.
Q3: What is TTL and how is it handled? A: TTL (time-to-live) is per-key expiration time. System uses lazy expiration: checks on access. If expired, entry removed, recorded as miss/eviction. Advantage: no background thread overhead. Trade-off: stale entries briefly visible.
Q4: Why separate WritePolicy from eviction strategy? A: Clear separation of concerns: eviction decides residency (which keys stay), write policy decides persistence (when to write backing store). Allows independent swapping and testing.
Intermediate Level
Q5: How does write-through differ from write-back? A: Write-through: immediately persist to backing store on set (safe but slower). Write-back: mark entry dirty, defer writing (fast but risky if crash). System requires explicit flush() for write-back.
Q6: How do you safely swap strategies at runtime? A: Eviction strategy swap rebuilds metadata (LRU frequency counters) on existing entries but preserves all data. Write policy swap flushes if leaving write-back to avoid data loss. Both emit events for monitoring.
Q7: How does the Command pattern enable undo/redo? A: SetCommand and DeleteCommand encapsulate reversible mutations, storing previous state (old entry for set, removed entry for delete). Undo stack reverses last command; redo reapplies. Each new command clears redo stack.
Q8: How is memory managed under capacity pressure?
A: _enforce_capacity() runs after each set, repeatedly calling strategy's choose_victim() until item count β€ capacity_items AND total bytes β€ capacity_bytes. Victim removed, eviction metric incremented, event emitted.
Advanced Level
Q9: How would you handle concurrent access in production? A: Add threading.Lock() around critical sections: store access, capacity enforcement, metric updates. Strategy's on_access() needs thread-safe frequency updates. Write policy flush needs atomic backing store write.
Q10: How would you optimize LFU for 1M entries? A: Naive LFU scans all entries to find min frequency: O(n) per eviction. Optimize: bucket-based LFU mapping frequency β ordered set of keys. Update on access is O(log n), victim selection O(1). Trade-off: higher memory for faster eviction.
Q11: How would you scale this to distributed cache? A: Shard by key hash (consistent hashing) across multiple nodes. Each shard runs same architecture independently. Promotion strategy: hot keys replicated to fast tier (e.g., local RAM + remote store). Leader for invalidation coordination.
Q12: What security/correctness concerns exist? A: Size accounting prevents memory blowups. TTL prevents eternal residency. Snapshot restore replaces entire state (avoids partial injection). Missing: access control (who can get/set), audit logging, encryption at rest, rate limiting.
Scaling Q&A (12 Questions)
Q1: How does eviction strategy affect performance at 100K keys? A: LRU: maintains last_access timestamps, O(1) per access, victim selection O(n) scan (acceptable). At 100K, could optimize with time-bucketing or segment-based approach. LFU: similar O(n) scan, bucket optimization suggested.
Q2: What happens if the cache fills up and strategy returns no victim?
A: Current implementation prevents infinite loop with safety counter (10K iterations). If no victim found, emit eviction_blocked event and stop accepting new keys. Production: reject new puts with error, alert ops.
Q3: How do you prevent memory growth from undo/redo stacks? A: Cap stack sizes (e.g., max 100 commands). When exceeded, pop oldest. Or implement per-session undo (discard when user disconnects). Trade-off: bounded memory vs full history.
Q4: Can snapshot restore happen while writes are in progress? A: In single-threaded demo, no. Production: acquire global write lock during restore to ensure consistency. Alternative: versioned snapshots + eventual consistency model.
Q5: How does write-back handle crash scenarios? A: Current implementation loses dirty writes. Production mitigation: WAL (write-ahead logging) per dirty write before acknowledging set(). On startup, replay log to restore dirty entries. Trade-off: write latency vs durability.
Q6: What's the memory overhead of tracking frequency + timestamps + TTL per entry? A: CacheEntry: value + size + created_at + last_access + frequency + ttl + dirty β 48+ bytes per entry. For 100K entries: 4.8MB overhead. Acceptable. Optimization: use compact integer timestamps for memory-constrained systems.
Q7: How do you handle TTL expiration at scale? A: Current: lazy expiration on access only (stale entries accumulate). Production: background thread periodic scan + removal. Alternative: delta queue of pending expirations (O(1) add, pop nearest). Trade-off: background CPU vs stale data window.
Q8: How would you monitor cache health metrics?
A: Emit metrics_updated event with hit rate, eviction pressure, dirty backlog. Exporter: Prometheus format. Alerts: hit ratio drops below threshold, eviction rate > threshold, dirty backlog growing unbounded.
Q9: Can write policy swap happen mid-flush? A: Current: swap triggers flush if leaving write-back. If flush in progress elsewhere, potential race. Solution: acquire lock during swap, ensure flush completes before releasing policy reference.
Q10: How to partition cache across NUMA nodes for locality? A: Shard by key hash. Each NUMA node owns subset of shards. Thread binds to NUMA node; access local shards (cache-friendly). Remote shard access incurs penalty but infrequent. Performance gain: 20-30% on NUMA systems.
Q11: What if an eviction strategy always returns same victim (bug)? A: Safety counter prevents infinite loop. But cache keeps trying same victim, making no progress. Detect: track prev_victim across iterations. If repeated, trigger strategy fallback (e.g., FIFO) or emit alert.
Q12: How do you handle cache stampede (1000 simultaneous misses after expiration)? A: Lazy expiration causes sudden miss spike. Mitigation: probabilistic early expiration (remove entry slightly before TTL), or lock-free read during fetch (wait for one fetch result, all others use it). Trade-off: extra reads vs stampede risk.
Demo Scenarios (5 Examples)
Demo 1: Basic LRU Put/Get & Eviction
- Set k0 β k6 (7 items, capacity 5)
- Access k3, k4 to make them recent
- Set k_new β triggers eviction of oldest (k0, k1, k2)
- Show remaining: k3, k4, k5, k6, k_new
Demo 2: TTL Expiration
- Set "temp" with ttl=0.2 seconds
- Wait 0.25 seconds
- Get "temp" β None (lazy expiration on access)
- Hit ratio affected (counts as miss)
Demo 3: Eviction Strategy Swap (LRU β LFU)
- Initially LRU
- Access k3 five times, k4 twice
- Swap to LFU
- Set new_item β LFU evicts k4 (lower frequency) not k3
Demo 4: Write Policy Swap (Through β Back β Flush)
- Set with write-through (immediate backing store write)
- Swap to write-back (mark dirty)
- Set multiple items (dirty_pending increases)
- Call flush() β writes all dirty, dirty_pending = 0
- Swap back to write-through
Demo 5: Snapshot & Restore
- Take snapshot of current state
- Set new items
- Restore snapshot β state reverts (items, metrics, strategies)
- Verifies full state recovery
Complete Implementation
"""
π Cache System - Interview Implementation
Demonstrates:
1. Basic LRU put/get & eviction
2. TTL expiration
3. Strategy swap (LRU -> LFU)
4. Write policy swap (write-through -> write-back -> flush)
5. Snapshot & restore
6. High volume insertion & metrics summary
"""
from __future__ import annotations
from dataclasses import dataclass
from typing import Any, Dict, Optional, Callable, List
import time
import copy
# =============================
# Data Structures & Strategies
# =============================
@dataclass
class CacheEntry:
value: Any
size: int
created_at: float
last_access: float
frequency: int = 0
ttl: Optional[float] = None # seconds
dirty: bool = False
def expired(self, now: Optional[float] = None) -> bool:
if self.ttl is None:
return False
ref = now or time.time()
return (ref - self.created_at) >= self.ttl
# ---- Eviction Strategy Interface ----
class EvictionStrategy:
name: str = "base"
def on_insert(self, key: str, entry: CacheEntry) -> None:
pass
def on_access(self, key: str, entry: CacheEntry) -> None:
pass
def choose_victim(self, store: Dict[str, CacheEntry]) -> Optional[str]:
raise NotImplementedError
class LRUStrategy(EvictionStrategy):
name = "lru"
def on_insert(self, key: str, entry: CacheEntry) -> None:
entry.last_access = time.time()
def on_access(self, key: str, entry: CacheEntry) -> None:
entry.last_access = time.time()
def choose_victim(self, store: Dict[str, CacheEntry]) -> Optional[str]:
if not store:
return None
return min(store.items(), key=lambda kv: kv[1].last_access)[0]
class LFUStrategy(EvictionStrategy):
name = "lfu"
def on_insert(self, key: str, entry: CacheEntry) -> None:
entry.frequency = 1
entry.last_access = time.time()
def on_access(self, key: str, entry: CacheEntry) -> None:
entry.frequency += 1
entry.last_access = time.time()
def choose_victim(self, store: Dict[str, CacheEntry]) -> Optional[str]:
if not store:
return None
return min(store.items(), key=lambda kv: (kv[1].frequency, kv[1].last_access))[0]
class FIFOStrategy(EvictionStrategy):
name = "fifo"
def on_insert(self, key: str, entry: CacheEntry) -> None:
pass
def on_access(self, key: str, entry: CacheEntry) -> None:
entry.frequency += 1
def choose_victim(self, store: Dict[str, CacheEntry]) -> Optional[str]:
if not store:
return None
return min(store.items(), key=lambda kv: kv[1].created_at)[0]
# ---- Write Policy Interface ----
class WritePolicy:
name: str = "base"
def on_set(self, key: str, entry: CacheEntry, backing_store: Dict[str, Any]) -> None:
raise NotImplementedError
def on_delete(self, key: str, backing_store: Dict[str, Any]) -> None:
raise NotImplementedError
def flush(self, backing_store: Dict[str, Any], entries: Dict[str, CacheEntry]) -> int:
return 0
class WriteThroughPolicy(WritePolicy):
name = "write_through"
def on_set(self, key: str, entry: CacheEntry, backing_store: Dict[str, Any]) -> None:
backing_store[key] = entry.value
entry.dirty = False
def on_delete(self, key: str, backing_store: Dict[str, Any]) -> None:
backing_store.pop(key, None)
def flush(self, backing_store: Dict[str, Any], entries: Dict[str, CacheEntry]) -> int:
return 0
class WriteBackPolicy(WritePolicy):
name = "write_back"
def on_set(self, key: str, entry: CacheEntry, backing_store: Dict[str, Any]) -> None:
entry.dirty = True
def on_delete(self, key: str, backing_store: Dict[str, Any]) -> None:
backing_store.pop(key, None)
def flush(self, backing_store: Dict[str, Any], entries: Dict[str, CacheEntry]) -> int:
count = 0
for k, e in entries.items():
if e.dirty:
backing_store[k] = e.value
e.dirty = False
count += 1
return count
# =============================
# Command Pattern
# =============================
class Command:
def execute(self):
raise NotImplementedError
def undo(self):
raise NotImplementedError
class SetCommand(Command):
def __init__(self, cache: 'CacheManager', key: str, value: Any, ttl: Optional[float] = None):
self.cache = cache
self.key = key
self.value = value
self.ttl = ttl
self.prev_entry: Optional[CacheEntry] = None
def execute(self):
self.prev_entry = copy.deepcopy(self.cache._store.get(self.key))
self.cache.set(self.key, self.value, self.ttl, via_command=True)
def undo(self):
if self.prev_entry is None:
self.cache.delete(self.key, via_command=True)
else:
self.cache._direct_restore_entry(self.key, self.prev_entry)
class DeleteCommand(Command):
def __init__(self, cache: 'CacheManager', key: str):
self.cache = cache
self.key = key
self.removed_entry: Optional[CacheEntry] = None
def execute(self):
if self.key in self.cache._store:
self.removed_entry = copy.deepcopy(self.cache._store[self.key])
self.cache.delete(self.key, via_command=True)
def undo(self):
if self.removed_entry:
self.cache._direct_restore_entry(self.key, self.removed_entry)
# =============================
# Cache Manager (Singleton-like)
# =============================
class CacheManager:
def __init__(self,
capacity_items: int,
capacity_bytes: int,
eviction_strategy: EvictionStrategy,
write_policy: WritePolicy):
self.capacity_items = capacity_items
self.capacity_bytes = capacity_bytes
self.eviction_strategy = eviction_strategy
self.write_policy = write_policy
self._store: Dict[str, CacheEntry] = {}
self._backing_store: Dict[str, Any] = {}
self._listeners: List[Callable[[str, Dict[str, Any]], None]] = []
self._undo_stack: List[Command] = []
self._redo_stack: List[Command] = []
self._metrics = {
'hits': 0,
'misses': 0,
'evictions': 0,
'expirations': 0,
'sets': 0,
'deletes': 0,
'flushed_writes': 0,
'dirty_writes_pending': 0,
'current_items': 0,
'total_bytes': 0,
}
def add_listener(self, fn: Callable[[str, Dict[str, Any]], None]):
self._listeners.append(fn)
def set(self, key: str, value: Any, ttl: Optional[float] = None, via_command: bool = False):
size = self._estimate_size(value)
now = time.time()
if key in self._store:
entry = self._store[key]
entry.value = value
entry.size = size
entry.ttl = ttl
entry.last_access = now
else:
entry = CacheEntry(value=value, size=size, created_at=now, last_access=now, ttl=ttl)
self._store[key] = entry
self.eviction_strategy.on_insert(key, entry)
self._enforce_capacity()
self.write_policy.on_set(key, entry, self._backing_store)
self._metrics['sets'] += 1
self._refresh_metrics()
self._emit_event('entry_set', {'key': key, 'size': size})
if not via_command:
self._record_command(SetCommand(self, key, value, ttl), executed=True)
def get(self, key: str) -> Optional[Any]:
now = time.time()
entry = self._store.get(key)
if entry is None:
self._metrics['misses'] += 1
self._refresh_metrics()
self._emit_event('entry_get_miss', {'key': key})
return None
if entry.expired(now):
self._store.pop(key, None)
self._metrics['expirations'] += 1
self._emit_event('entry_expired', {'key': key})
self._metrics['misses'] += 1
self._refresh_metrics()
return None
entry.last_access = now
entry.frequency += 1
self.eviction_strategy.on_access(key, entry)
self._metrics['hits'] += 1
self._refresh_metrics()
self._emit_event('entry_get_hit', {'key': key})
return entry.value
def delete(self, key: str, via_command: bool = False):
if key not in self._store:
return
self._store.pop(key)
self.write_policy.on_delete(key, self._backing_store)
self._metrics['deletes'] += 1
self._refresh_metrics()
self._emit_event('entry_evicted', {'key': key, 'reason': 'delete'})
if not via_command:
self._record_command(DeleteCommand(self, key), executed=True)
def swap_eviction_strategy(self, new_strategy: EvictionStrategy):
prev = self.eviction_strategy.name
self.eviction_strategy = new_strategy
for k, e in self._store.items():
new_strategy.on_insert(k, e)
self._emit_event('strategy_swapped', {'from': prev, 'to': new_strategy.name})
def swap_write_policy(self, new_policy: WritePolicy):
prev = self.write_policy.name
if isinstance(self.write_policy, WriteBackPolicy) and not isinstance(new_policy, WriteBackPolicy):
flushed = self.write_policy.flush(self._backing_store, self._store)
self._metrics['flushed_writes'] += flushed
self._emit_event('flushed', {'flushed_count': flushed})
self.write_policy = new_policy
self._emit_event('write_policy_swapped', {'from': prev, 'to': new_policy.name})
self._refresh_metrics()
def flush(self):
flushed = self.write_policy.flush(self._backing_store, self._store)
if flushed:
self._metrics['flushed_writes'] += flushed
self._emit_event('flushed', {'flushed_count': flushed})
self._refresh_metrics()
def take_snapshot(self) -> Dict[str, Any]:
snap = {
'store': copy.deepcopy(self._store),
'backing': copy.deepcopy(self._backing_store),
'eviction_strategy': self.eviction_strategy.name,
'write_policy': self.write_policy.name,
'metrics': copy.deepcopy(self._metrics),
'timestamp': time.time(),
}
self._emit_event('snapshot_taken', {'items': len(self._store)})
return snap
def restore_snapshot(self, snapshot: Dict[str, Any]):
self._store = snapshot['store']
self._backing_store = snapshot['backing']
self._metrics = snapshot['metrics']
es_name = snapshot['eviction_strategy']
wp_name = snapshot['write_policy']
self.eviction_strategy = {
'lru': LRUStrategy(), 'lfu': LFUStrategy(), 'fifo': FIFOStrategy()
}[es_name]
self.write_policy = {
'write_through': WriteThroughPolicy(), 'write_back': WriteBackPolicy()
}[wp_name]
self._emit_event('snapshot_restored', {})
self._refresh_metrics()
def summary(self) -> Dict[str, Any]:
return copy.deepcopy(self._metrics)
# ---- Internal Helpers ----
def _emit_event(self, name: str, payload: Dict[str, Any]):
for listener_fn in self._listeners:
try:
listener_fn(name, payload)
except Exception:
pass
def _estimate_size(self, value: Any) -> int:
if isinstance(value, (str, bytes)):
return len(value)
return 1
def _current_total_bytes(self) -> int:
return sum(e.size for e in self._store.values())
def _enforce_capacity(self):
safety = 0
while (len(self._store) > self.capacity_items or self._current_total_bytes() > self.capacity_bytes) and safety < 10_000:
victim = self.eviction_strategy.choose_victim(self._store)
if victim is None:
self._emit_event('eviction_blocked', {})
break
removed = self._store.pop(victim)
if removed.dirty:
self.write_policy.on_delete(victim, self._backing_store)
self._metrics['evictions'] += 1
self._emit_event('entry_evicted', {'key': victim, 'reason': 'capacity'})
safety += 1
self._refresh_metrics()
def _refresh_metrics(self):
self._metrics['current_items'] = len(self._store)
self._metrics['total_bytes'] = self._current_total_bytes()
self._metrics['dirty_writes_pending'] = sum(1 for e in self._store.values() if e.dirty)
self._emit_event('metrics_updated', copy.deepcopy(self._metrics))
def _record_command(self, cmd: Command, executed: bool = False):
if executed:
self._undo_stack.append(cmd)
self._redo_stack.clear()
self._emit_event('command_executed', {'type': cmd.__class__.__name__})
else:
cmd.execute()
self._undo_stack.append(cmd)
self._redo_stack.clear()
self._emit_event('command_executed', {'type': cmd.__class__.__name__})
def undo(self):
if not self._undo_stack:
return
cmd = self._undo_stack.pop()
cmd.undo()
self._redo_stack.append(cmd)
self._refresh_metrics()
self._emit_event('command_undone', {'type': cmd.__class__.__name__})
def redo(self):
if not self._redo_stack:
return
cmd = self._redo_stack.pop()
cmd.execute()
self._undo_stack.append(cmd)
self._refresh_metrics()
self._emit_event('command_redone', {'type': cmd.__class__.__name__})
def _direct_restore_entry(self, key: str, entry: CacheEntry):
self._store[key] = entry
self._refresh_metrics()
self._emit_event('entry_set', {'key': key, 'size': entry.size, 'restore': True})
# =============================
# Demo / Showcase
# =============================
def listener(event: str, payload: Dict[str, Any]):
if event in {"metrics_updated"}:
return
print(f"[EVENT] {event} -> {payload}")
def demo_1_basic_lru():
"""Demo 1: Basic LRU & Eviction"""
print("\n" + "="*70)
print("DEMO 1: BASIC LRU & EVICTION")
print("="*70)
cache = CacheManager(capacity_items=5,
capacity_bytes=50,
eviction_strategy=LRUStrategy(),
write_policy=WriteThroughPolicy())
cache.add_listener(listener)
for i in range(7):
cache.set(f"k{i}", f"value-{i}")
cache.get("k3")
cache.get("k4")
cache.set("k_new", "new-value")
print("Summary:", cache.summary())
def demo_2_ttl():
"""Demo 2: TTL Expiration"""
print("\n" + "="*70)
print("DEMO 2: TTL EXPIRATION")
print("="*70)
cache = CacheManager(capacity_items=100,
capacity_bytes=1000,
eviction_strategy=LRUStrategy(),
write_policy=WriteThroughPolicy())
cache.add_listener(listener)
cache.set("temp", "short", ttl=0.2)
time.sleep(0.25)
v = cache.get("temp")
print(f"Value after TTL expiration: {v}")
print("Summary:", cache.summary())
def demo_3_strategy_swap():
"""Demo 3: Strategy Swap (LRU -> LFU)"""
print("\n" + "="*70)
print("DEMO 3: STRATEGY SWAP (LRU -> LFU)")
print("="*70)
cache = CacheManager(capacity_items=10,
capacity_bytes=100,
eviction_strategy=LRUStrategy(),
write_policy=WriteThroughPolicy())
cache.add_listener(listener)
for i in range(5):
cache.set(f"k{i}", f"val{i}")
for _ in range(5):
cache.get("k0")
for _ in range(2):
cache.get("k1")
cache.swap_eviction_strategy(LFUStrategy())
cache.set("new", "item")
print("Summary:", cache.summary())
def demo_4_write_policy():
"""Demo 4: Write Policy Swap (Through -> Back -> Flush)"""
print("\n" + "="*70)
print("DEMO 4: WRITE POLICY SWAP & FLUSH")
print("="*70)
cache = CacheManager(capacity_items=100,
capacity_bytes=1000,
eviction_strategy=LRUStrategy(),
write_policy=WriteThroughPolicy())
cache.add_listener(listener)
cache.set("k1", "v1")
cache.swap_write_policy(WriteBackPolicy())
cache.set("k2", "v2")
cache.set("k3", "v3")
print(f"Dirty before flush: {cache.summary()['dirty_writes_pending']}")
cache.flush()
print(f"Dirty after flush: {cache.summary()['dirty_writes_pending']}")
def demo_5_snapshot():
"""Demo 5: Snapshot & Restore"""
print("\n" + "="*70)
print("DEMO 5: SNAPSHOT & RESTORE")
print("="*70)
cache = CacheManager(capacity_items=100,
capacity_bytes=1000,
eviction_strategy=LRUStrategy(),
write_policy=WriteThroughPolicy())
cache.add_listener(listener)
cache.set("base1", "val1")
cache.set("base2", "val2")
snap = cache.take_snapshot()
cache.set("after_snap", "val3")
print(f"Items after snap: {cache.summary()['current_items']}")
cache.restore_snapshot(snap)
print(f"Items after restore: {cache.summary()['current_items']}")
if __name__ == "__main__":
print("\n" + "="*70)
print("π CACHE SYSTEM - 5 DEMO SCENARIOS")
print("="*70)
demo_1_basic_lru()
demo_2_ttl()
demo_3_strategy_swap()
demo_4_write_policy()
demo_5_snapshot()
print("\n" + "="*70)
print("β
ALL DEMOS COMPLETED")
print("="*70 + "\n")
Design Patterns Explained
| Pattern | Usage | Benefit |
|---|---|---|
| Singleton | CacheManager single global instance |
Coherent state, no conflicts |
| Strategy | EvictionStrategy (LRU/LFU/FIFO), WritePolicy (Through/Back) | Easy swapping, extensibility, loose coupling |
| Observer | Event listeners for cache lifecycle | Observability, monitoring, decoupled notifications |
| Command | SetCommand, DeleteCommand for undo/redo | Reversible mutations, fine-grained history |
| Memento | Snapshot/restore for whole-system state | Coarse rollback, atomic recovery |
Key Metrics Tracked
- hits: Cache hits (key found, not expired)
- misses: Cache misses (key not found or expired)
- evictions: Items evicted due to capacity
- expirations: Items expired via TTL
- sets: Total set operations
- deletes: Total delete operations
- flushed_writes: Writes flushed to backing store
- dirty_writes_pending: Pending write-back writes (unflushed)
- current_items: Current count of entries
- total_bytes: Current memory usage
Summary
β Singleton for global coordination β Strategy for pluggable eviction/write policies β Observer for event-driven observability β Command for reversible mutations (undo/redo) β Memento for snapshot/restore β O(1) average access with O(n) victim selection (optimizable) β Thread-safe (with locks in production version) β Runtime strategy swapping without downtime β TTL expiration with lazy evaluation β Capacity enforcement with metrics
Key Takeaway: Cache demonstrates clean architecture via design patterns, providing adaptability (swap strategies), reliability (correct expiration), and transparency (metrics/events) in a real-world system.