Circuit Breaker Pattern

Provide a way to prevent cascading failures in distributed systems by monitoring for failures and temporarily blocking requests to failing services.


Problem

In distributed systems, services depend on each other. When one service fails, it can cause cascading failures across the entire system. Repeated calls to failing services waste resources.

Solution

The Circuit Breaker pattern monitors for failures and automatically stops sending requests to a failing service, returning an error immediately instead of waiting for timeout.


Implementation

import time
from enum import Enum
from typing import Callable, Any, Optional
from datetime import datetime, timedelta

# ============ CIRCUIT BREAKER STATES ============

class CircuitState(Enum):
    CLOSED = "closed"      # Normal operation, requests pass through
    OPEN = "open"          # Failing, requests blocked
    HALF_OPEN = "half_open"  # Testing if service recovered

# ============ BASIC CIRCUIT BREAKER ============

class CircuitBreaker:
    """Simple circuit breaker implementation"""

    def __init__(self, failure_threshold: int = 5, 
                 timeout_seconds: int = 60, 
                 success_threshold: int = 2):
        self.failure_threshold = failure_threshold
        self.timeout_seconds = timeout_seconds
        self.success_threshold = success_threshold

        self.failure_count = 0
        self.success_count = 0
        self.last_failure_time: Optional[datetime] = None
        self.state = CircuitState.CLOSED

    def call(self, func: Callable, *args, **kwargs) -> Any:
        """Execute function through circuit breaker"""

        if self.state == CircuitState.OPEN:
            if self._should_attempt_reset():
                self.state = CircuitState.HALF_OPEN
                self.success_count = 0
            else:
                raise Exception("Circuit breaker is OPEN")

        try:
            result = func(*args, **kwargs)
            self._on_success()
            return result
        except Exception as e:
            self._on_failure()
            raise e

    def _on_success(self):
        """Handle successful call"""
        self.failure_count = 0

        if self.state == CircuitState.HALF_OPEN:
            self.success_count += 1
            if self.success_count >= self.success_threshold:
                self.state = CircuitState.CLOSED
                print(f"✅ Circuit breaker CLOSED (recovered)")

    def _on_failure(self):
        """Handle failed call"""
        self.failure_count += 1
        self.last_failure_time = datetime.now()

        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN
            print(f"🔴 Circuit breaker OPEN (too many failures)")

    def _should_attempt_reset(self) -> bool:
        """Check if timeout has elapsed"""
        if self.last_failure_time is None:
            return False

        elapsed = (datetime.now() - self.last_failure_time).total_seconds()
        return elapsed >= self.timeout_seconds

    def get_state(self) -> str:
        return self.state.value

# ============ ADVANCED CIRCUIT BREAKER WITH METRICS ============

class CircuitBreakerMetrics:
    """Track circuit breaker metrics"""

    def __init__(self):
        self.total_calls = 0
        self.successful_calls = 0
        self.failed_calls = 0
        self.blocked_calls = 0

    def record_call(self, success: bool, blocked: bool = False):
        self.total_calls += 1
        if blocked:
            self.blocked_calls += 1
        elif success:
            self.successful_calls += 1
        else:
            self.failed_calls += 1

    def get_success_rate(self) -> float:
        if self.total_calls == 0:
            return 0.0
        return self.successful_calls / self.total_calls

    def __str__(self) -> str:
        return (f"Metrics(total={self.total_calls}, "
                f"success={self.successful_calls}, "
                f"failed={self.failed_calls}, "
                f"blocked={self.blocked_calls}, "
                f"rate={self.get_success_rate():.2%})")

class AdvancedCircuitBreaker:
    """Circuit breaker with metrics and detailed tracking"""

    def __init__(self, failure_threshold: int = 5, 
                 timeout_seconds: int = 60,
                 success_threshold: int = 2):
        self.failure_threshold = failure_threshold
        self.timeout_seconds = timeout_seconds
        self.success_threshold = success_threshold

        self.failure_count = 0
        self.success_count = 0
        self.last_failure_time: Optional[datetime] = None
        self.state = CircuitState.CLOSED
        self.metrics = CircuitBreakerMetrics()

    def call(self, func: Callable, *args, **kwargs) -> Any:
        """Execute function with metrics"""

        if self.state == CircuitState.OPEN:
            if self._should_attempt_reset():
                self.state = CircuitState.HALF_OPEN
                self.success_count = 0
                print(f"🟡 Circuit breaker HALF_OPEN (testing recovery)")
            else:
                self.metrics.record_call(False, blocked=True)
                raise Exception(f"Circuit breaker OPEN - service unavailable")

        try:
            result = func(*args, **kwargs)
            self.metrics.record_call(success=True)
            self._on_success()
            return result
        except Exception as e:
            self.metrics.record_call(success=False)
            self._on_failure()
            raise e

    def _on_success(self):
        self.failure_count = 0

        if self.state == CircuitState.HALF_OPEN:
            self.success_count += 1
            if self.success_count >= self.success_threshold:
                self.state = CircuitState.CLOSED
                print(f"✅ Circuit breaker CLOSED")

    def _on_failure(self):
        self.failure_count += 1
        self.last_failure_time = datetime.now()

        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN
            print(f"🔴 Circuit breaker OPEN")

    def _should_attempt_reset(self) -> bool:
        if self.last_failure_time is None:
            return False

        elapsed = (datetime.now() - self.last_failure_time).total_seconds()
        return elapsed >= self.timeout_seconds

    def get_metrics(self) -> CircuitBreakerMetrics:
        return self.metrics

# ============ REAL-WORLD EXAMPLE: Payment Service ============

class PaymentService:
    """External payment service that can fail"""

    def __init__(self, fail_rate: float = 0.0):
        self.fail_rate = fail_rate
        self.call_count = 0

    def process_payment(self, amount: float) -> dict:
        """Simulate payment processing with potential failures"""
        self.call_count += 1

        import random
        if random.random() < self.fail_rate:
            raise Exception("Payment service temporarily unavailable")

        return {"status": "success", "amount": amount, "txn_id": f"TXN{self.call_count}"}

class OrderService:
    """Service that depends on payment service"""

    def __init__(self, payment_service: PaymentService):
        self.payment_service = payment_service
        self.circuit_breaker = AdvancedCircuitBreaker(
            failure_threshold=3,
            timeout_seconds=10,
            success_threshold=2
        )

    def place_order(self, order_id: str, amount: float) -> dict:
        """Place order with circuit breaker protection"""
        try:
            result = self.circuit_breaker.call(
                self.payment_service.process_payment,
                amount
            )
            return {
                "order_id": order_id,
                "status": "success",
                "payment": result
            }
        except Exception as e:
            return {
                "order_id": order_id,
                "status": "failed",
                "error": str(e)
            }

    def get_metrics(self):
        return self.circuit_breaker.get_metrics()

# ============ DECORATOR-BASED CIRCUIT BREAKER ============

def circuit_breaker_decorator(failure_threshold: int = 5, 
                               timeout_seconds: int = 60):
    """Decorator to add circuit breaker to any function"""

    def decorator(func: Callable) -> Callable:
        cb = CircuitBreaker(
            failure_threshold=failure_threshold,
            timeout_seconds=timeout_seconds
        )

        def wrapper(*args, **kwargs):
            return cb.call(func, *args, **kwargs)

        wrapper.circuit_breaker = cb
        return wrapper

    return decorator

@circuit_breaker_decorator(failure_threshold=2, timeout_seconds=5)
def external_api_call(endpoint: str) -> str:
    """Simulated external API call"""
    import random
    if random.random() < 0.6:
        raise Exception(f"Failed to reach {endpoint}")
    return f"Response from {endpoint}"

# ============ FALLBACK STRATEGY ============

class CircuitBreakerWithFallback:
    """Circuit breaker with fallback behavior"""

    def __init__(self, primary_func: Callable, 
                 fallback_func: Callable,
                 failure_threshold: int = 5,
                 timeout_seconds: int = 60):
        self.primary_func = primary_func
        self.fallback_func = fallback_func
        self.circuit_breaker = CircuitBreaker(
            failure_threshold=failure_threshold,
            timeout_seconds=timeout_seconds
        )

    def call(self, *args, **kwargs) -> Any:
        """Try primary, fall back on circuit open"""
        try:
            return self.circuit_breaker.call(self.primary_func, *args, **kwargs)
        except Exception as e:
            print(f"⚠️ Falling back due to: {e}")
            return self.fallback_func(*args, **kwargs)

# Demo
if __name__ == "__main__":
    print("=" * 60)
    print("=== BASIC CIRCUIT BREAKER ===")
    print("=" * 60)

    def unreliable_service(should_fail: bool = False):
        if should_fail:
            raise Exception("Service failed")
        return "Success"

    cb = CircuitBreaker(failure_threshold=3, timeout_seconds=5)

    # Simulate failures
    for i in range(5):
        try:
            result = cb.call(unreliable_service, should_fail=(i < 3))
            print(f"Call {i+1}: {result} - State: {cb.get_state()}")
        except Exception as e:
            print(f"Call {i+1}: {e} - State: {cb.get_state()}")

    print("\n" + "=" * 60)
    print("=== ADVANCED CIRCUIT BREAKER WITH METRICS ===")
    print("=" * 60)

    payment_service = PaymentService(fail_rate=0.0)
    order_service = OrderService(payment_service)

    # Successful orders
    for i in range(3):
        result = order_service.place_order(f"ORD{i+1}", 100.0)
        print(f"Order {i+1}: {result['status']}")

    print(f"Metrics: {order_service.get_metrics()}\n")

    # Simulate service failure
    payment_service.fail_rate = 0.9
    print("Service failure rate increased to 90%...\n")

    for i in range(5):
        result = order_service.place_order(f"ORD{i+4}", 100.0)
        print(f"Order {i+4}: {result['status']}")

    print(f"Metrics: {order_service.get_metrics()}\n")

    print("=" * 60)
    print("=== CIRCUIT BREAKER WITH FALLBACK ===")
    print("=" * 60)

    def primary_service():
        import random
        if random.random() < 0.8:
            raise Exception("Primary failed")
        return "Primary response"

    def fallback_service():
        return "Fallback response"

    cb_fallback = CircuitBreakerWithFallback(
        primary_service, fallback_service,
        failure_threshold=2, timeout_seconds=5
    )

    for i in range(5):
        result = cb_fallback.call()
        print(f"Call {i+1}: {result}")

Key Concepts

  • CLOSED State: Normal operation, requests pass through
  • OPEN State: Service failing, requests blocked immediately
  • HALF_OPEN State: Testing if service recovered, limited requests allowed
  • Failure Threshold: Number of failures before opening circuit
  • Timeout: Duration before attempting recovery
  • Success Threshold: Successful calls needed in HALF_OPEN to close circuit

When to Use

✅ Microservices communication
✅ External API calls
✅ Database connections
✅ Prevent cascading failures
✅ Resource protection


Interview Q&A

Q1: What are the three states of a circuit breaker and transitions?

A: - CLOSED → OPEN: Failure count exceeds threshold - OPEN → HALF_OPEN: Timeout period expires - HALF_OPEN → CLOSED: Enough successful calls - HALF_OPEN → OPEN: Failure occurs


Q2: How do you handle timeouts in circuit breaker?

A:

def _should_attempt_reset(self):
    if not self.last_failure_time:
        return False
    elapsed = (datetime.now() - self.last_failure_time).total_seconds()
    return elapsed >= self.timeout_seconds

Q3: What metrics should you track?

A: Success rate, failure rate, blocked calls, state changes, call latency

class CircuitBreakerMetrics:
    def __init__(self):
        self.total_calls = 0
        self.successful_calls = 0
        self.failed_calls = 0
        self.blocked_calls = 0

Q4: How do you combine circuit breaker with fallback?

A:

try:
    return circuit_breaker.call(primary_service)
except Exception:
    return fallback_service()

Q5: How do you test circuit breaker code?

A:

def test_circuit_opens_after_failures():
    cb = CircuitBreaker(failure_threshold=2)

    for _ in range(2):
        try:
            cb.call(failing_function)
        except:
            pass

    assert cb.get_state() == "open"

Trade-offs

Pros: Prevents cascading failures, fast failure, resource protection
Cons: Complexity, requires fallback strategy, state management


Real-World Examples

  • Netflix Hystrix (now Resilience4j)
  • Spring Cloud Circuit Breaker
  • AWS AppConfig
  • Kubernetes liveness probes