Observer Pattern

Define a one-to-many dependency between objects so that when one object changes state, all its dependents are notified automatically.


Problem

You need to notify multiple objects about state changes without coupling them tightly. For example, when a YouTube channel uploads a video, all subscribers should be notified.

Solution

The Observer pattern defines a Subject that maintains a list of Observers. When the Subject's state changes, it calls each attached Observer's update method, so Observers never have to poll for changes.


Implementation

from abc import ABC, abstractmethod
from typing import List, Dict, Any, Callable

# Observer interface
class Observer(ABC):
    @abstractmethod
    def update(self, event: str, data: Any):
        pass

# Subject base class: owns the observer list and the notification loop
class Subject:
    def __init__(self):
        self._observers: List[Observer] = []

    def attach(self, observer: Observer):
        if observer not in self._observers:
            self._observers.append(observer)

    def detach(self, observer: Observer):
        if observer in self._observers:
            self._observers.remove(observer)

    def notify(self, event: str, data: Any):
        # Iterate over a copy so an observer can detach itself during notification
        for observer in list(self._observers):
            observer.update(event, data)

# Concrete Subject: YouTube Channel
class YouTubeChannel(Subject):
    def __init__(self, name: str):
        super().__init__()
        self.name = name
        self.videos: List[str] = []

    def upload_video(self, title: str):
        self.videos.append(title)
        self.notify("video_uploaded", {"channel": self.name, "title": title})

    def get_subscriber_count(self) -> int:
        return len(self._observers)

# Concrete Observer: User
class YouTubeUser(Observer):
    def __init__(self, name: str):
        self.name = name

    def update(self, event: str, data: Any):
        if event == "video_uploaded":
            print(f"🔔 {self.name}: New video from {data['channel']}: {data['title']}")
        elif event == "live_stream":
            print(f"🔴 {self.name}: Live stream started: {data['title']}")

# Concrete Observer: Email Notifier
class EmailNotifier(Observer):
    def __init__(self, email: str):
        self.email = email

    def update(self, event: str, data: Any):
        print(f"📧 Email sent to {self.email}: {event} - {data}")

# Concrete Observer: Analytics Logger
class AnalyticsLogger(Observer):
    def update(self, event: str, data: Any):
        print(f"📊 Logged event: {event} with data: {data}")

# Functional approach (using callbacks)
class EventEmitter:
    def __init__(self):
        self._listeners: Dict[str, List[Callable]] = {}

    def on(self, event: str, callback: Callable):
        if event not in self._listeners:
            self._listeners[event] = []
        self._listeners[event].append(callback)

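    def off(self, event: str, callback: Callable):
        # Ignore unknown events or callbacks instead of raising ValueError
        if callback in self._listeners.get(event, []):
            self._listeners[event].remove(callback)

    def emit(self, event: str, *args, **kwargs):
        # Copy the list so listeners can call on()/off() while the event is being emitted
        for callback in list(self._listeners.get(event, [])):
            callback(*args, **kwargs)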

# Demo
if __name__ == "__main__":
    # Observer pattern (class-based)
    channel = YouTubeChannel("TechTalk")

    user1 = YouTubeUser("Alice")
    user2 = YouTubeUser("Bob")
    email_notifier = EmailNotifier("admin@example.com")
    analytics = AnalyticsLogger()

    channel.attach(user1)
    channel.attach(user2)
    channel.attach(email_notifier)
    channel.attach(analytics)

    print("=== Class-based Observer ===")
    channel.upload_video("Design Patterns Explained")
    print(f"Subscribers: {channel.get_subscriber_count()}\n")

    # Event Emitter (functional approach)
    emitter = EventEmitter()

    def on_login(username):
        print(f"✅ {username} logged in")

    def log_event(username):
        print(f"📝 Event logged for {username}")

    emitter.on("login", on_login)
    emitter.on("login", log_event)

    print("=== Functional Observer ===")
    emitter.emit("login", "charlie")

Key Concepts

  • Subject: Maintains list of observers, notifies on state change
  • Observer: Interface for receiving notifications
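  • Loose Coupling: The Subject depends only on the Observer interface, not on any concrete observer class
  • Push vs Pull: The Subject can push the changed data with the notification, or Observers can pull what they need from the Subject after being notified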
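
The push and pull styles can be sketched side by side. The names `Thermometer`, `PushDisplay`, and `PullDisplay` are hypothetical and not part of the implementation above; this is a minimal illustration in which the subject passes both itself and the new value, so each observer can choose its style.

```python
class Thermometer:
    """Hypothetical subject that supports both notification styles."""

    def __init__(self):
        self._observers = []
        self.temperature = 0.0

    def attach(self, observer):
        self._observers.append(observer)

    def set_temperature(self, value: float):
        self.temperature = value
        for observer in list(self._observers):
            # Push: the new value travels with the notification.
            # Pull: the subject passes itself so the observer can query it.
            observer.update(self, value)


class PushDisplay:
    """Push style: uses only the data handed to it."""

    def __init__(self):
        self.last_seen = None

    def update(self, subject, value):
        self.last_seen = value


class PullDisplay:
    """Pull style: ignores the payload and reads state from the subject."""

    def __init__(self):
        self.last_seen = None

    def update(self, subject, value):
        self.last_seen = subject.temperature


thermometer = Thermometer()
push, pull = PushDisplay(), PullDisplay()
thermometer.attach(push)
thermometer.attach(pull)
thermometer.set_temperature(21.5)
print(push.last_seen, pull.last_seen)  # 21.5 21.5
```

Push keeps observers independent of the subject's API but fixes the payload shape; pull lets each observer read only what it needs at the cost of a dependency on the subject.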

When to Use

✅ One-to-many dependencies between objects
✅ Need to notify multiple objects without knowing who they are
✅ Building event-driven systems
✅ Publish-subscribe messaging


Interview Q&A

Q1: What's the difference between Observer and Pub/Sub?

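A:

  • Observer: More tightly coupled. The Subject holds direct references to its Observers and calls them itself, and each Observer registers with a specific Subject.
  • Pub/Sub: More loosely coupled. Publishers and Subscribers never reference each other; a message broker sits between them and routes events by name.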

# Observer: the observer registers directly with the subject
subject.attach(observer)

# Pub/Sub: both sides talk only to the broker
message_broker.subscribe("event_name", handler)  # Subscriber registers interest in an event
message_broker.publish("event_name", data)  # Publisher sends to the broker, which calls handler
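
To make the broker side of that comparison concrete, here is a minimal in-process sketch. `MessageBroker` is a hypothetical class written for illustration, not the API of any real broker; production systems add queues, persistence, and network transport on top of this idea.

```python
from collections import defaultdict
from typing import Any, Callable, DefaultDict, List


class MessageBroker:
    """Hypothetical in-process broker that routes events by topic name."""

    def __init__(self):
        self._handlers: DefaultDict[str, List[Callable[[Any], None]]] = defaultdict(list)

    def subscribe(self, topic: str, handler: Callable[[Any], None]) -> None:
        self._handlers[topic].append(handler)

    def publish(self, topic: str, data: Any) -> None:
        # The publisher only names a topic; it never sees who is listening.
        for handler in list(self._handlers.get(topic, [])):
            handler(data)


received = []
broker = MessageBroker()
broker.subscribe("video_uploaded", received.append)
broker.publish("video_uploaded", {"title": "Design Patterns Explained"})
broker.publish("live_stream", {"title": "Q&A"})  # no subscribers, so nothing happens
print(received)  # [{'title': 'Design Patterns Explained'}]
```

The publisher and the subscriber share only the topic string, which is the decoupling the answer describes.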

Q2: How do you handle exceptions in Observer notifications?

A: Wrap each update call in try/except so that one failing observer does not stop the others from being notified:

def notify(self, event: str, data: Any):
    for observer in self._observers:
        try:
            observer.update(event, data)
        except Exception as e:
            print(f"Error notifying observer: {e}")
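
A slightly fuller sketch of the same idea, assuming you want the traceback logged and the failures reported back to the caller. `SafeSubject`, `BrokenObserver`, and `RecordingObserver` are hypothetical names introduced for this example.

```python
import logging
from typing import Any, List

logger = logging.getLogger(__name__)


class SafeSubject:
    """Hypothetical subject that isolates observer failures and reports them."""

    def __init__(self):
        self._observers = []

    def attach(self, observer) -> None:
        self._observers.append(observer)

    def notify(self, event: str, data: Any) -> List[Exception]:
        errors: List[Exception] = []
        for observer in list(self._observers):
            try:
                observer.update(event, data)
            except Exception as exc:
                # logger.exception records the traceback along with the message
                logger.exception("Observer %r failed on event %r", observer, event)
                errors.append(exc)
        return errors


class BrokenObserver:
    def update(self, event: str, data: Any) -> None:
        raise RuntimeError("mail server unreachable")


class RecordingObserver:
    def __init__(self):
        self.events = []

    def update(self, event: str, data: Any) -> None:
        self.events.append(event)


subject = SafeSubject()
recorder = RecordingObserver()
subject.attach(BrokenObserver())  # attached first, fails on every event
subject.attach(recorder)
errors = subject.notify("video_uploaded", {"title": "Design Patterns Explained"})
print(len(errors), recorder.events)  # 1 ['video_uploaded']
```

Returning the collected exceptions is one design choice among several; the caller could equally ignore them, retry, or detach the failing observer.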

Q3: How would you implement weak references to prevent memory leaks?

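A: Store weakref.ref objects instead of the observers themselves, and skip any reference whose target has already been garbage-collected:

import weakref
from typing import Any

class Subject:
    def __init__(self):
        self._observers = []  # Stores weakref.ref objects, not the observers themselves

    def attach(self, observer: Observer):
        # The callback fires when the observer is garbage-collected
        weak_observer = weakref.ref(observer, self._on_observer_deleted)
        if weak_observer not in self._observers:
            self._observers.append(weak_observer)

    def _on_observer_deleted(self, weak_ref):
        if weak_ref in self._observers:
            self._observers.remove(weak_ref)

    def notify(self, event: str, data: Any):
        # Iterate over a copy: the list can shrink if an observer is collected mid-loop
        for weak_observer in list(self._observers):
            observer = weak_observer()
            if observer is not None:
                observer.update(event, data)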
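
If notification order does not matter, the standard library's `weakref.WeakSet` does the bookkeeping for you. The sketch below uses hypothetical `WeakSubject` and `Recorder` classes to show an observer disappearing from the subject once its last strong reference is dropped.

```python
import gc
import weakref


class WeakSubject:
    """Hypothetical subject that holds its observers in a WeakSet."""

    def __init__(self):
        self._observers = weakref.WeakSet()

    def attach(self, observer):
        self._observers.add(observer)

    def notify(self, event, data):
        # Copying to a list takes strong references for the duration of the loop
        for observer in list(self._observers):
            observer.update(event, data)

    def observer_count(self):
        return len(self._observers)


class Recorder:
    def __init__(self):
        self.events = []

    def update(self, event, data):
        self.events.append(event)


subject = WeakSubject()
recorder = Recorder()
subject.attach(recorder)
subject.notify("video_uploaded", None)
seen = list(recorder.events)
count_before = subject.observer_count()

del recorder  # drop the only strong reference
gc.collect()  # CPython frees the object on del; collect() covers other runtimes
count_after = subject.observer_count()
print(seen, count_before, count_after)
```

A weak reference to a bound method dies as soon as it is created, so callback-style observers need `weakref.WeakMethod` rather than a plain `weakref.ref` or a `WeakSet`.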

Q4: How do you prevent infinite notification loops?

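A: Use a re-entrancy flag so that a notify() call triggered from inside an observer's update() returns immediately. Note that this drops the nested notification rather than deferring it: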

class Subject:
    def __init__(self):
        self._observers = []
        self._notifying = False

    def notify(self, event: str, data: Any):
        if self._notifying:
            return  # Prevent recursive notifications

        self._notifying = True
        try:
            for observer in self._observers:
                observer.update(event, data)
        finally:
            self._notifying = False
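
Because the flag returns early, any event raised while a notification is in progress is lost. One alternative, sketched here under the assumption that nested events should still be delivered, is to queue them and cap how many a single cycle may process. `QueueingSubject`, `Chainer`, `Recorder`, and the `max_events_per_cycle` parameter are hypothetical names for this example.

```python
from collections import deque
from typing import Any


class QueueingSubject:
    """Hypothetical subject that defers nested notifications instead of dropping them."""

    def __init__(self, max_events_per_cycle: int = 100):
        self._observers = []
        self._pending = deque()
        self._draining = False
        self._max_events = max_events_per_cycle

    def attach(self, observer):
        self._observers.append(observer)

    def notify(self, event: str, data: Any):
        self._pending.append((event, data))
        if self._draining:
            return  # the outermost notify() call will deliver this event
        self._draining = True
        delivered = 0
        try:
            while self._pending:
                queued_event, queued_data = self._pending.popleft()
                delivered += 1
                if delivered > self._max_events:
                    self._pending.clear()
                    raise RuntimeError("notification cycle exceeded the event limit")
                for observer in list(self._observers):
                    observer.update(queued_event, queued_data)
        finally:
            self._draining = False


class Chainer:
    """Reacts to 'saved' by raising a follow-up 'indexed' event."""

    def __init__(self, subject):
        self.subject = subject

    def update(self, event, data):
        if event == "saved":
            self.subject.notify("indexed", data)


class Recorder:
    def __init__(self):
        self.events = []

    def update(self, event, data):
        self.events.append(event)


subject = QueueingSubject()
recorder = Recorder()
subject.attach(Chainer(subject))
subject.attach(recorder)
subject.notify("saved", {"id": 1})
print(recorder.events)  # ['saved', 'indexed']
```

Every observer sees "saved" before anyone sees "indexed", and a runaway chain ends in a `RuntimeError` instead of unbounded recursion.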

Q5: How would you implement observer priorities (some observers get notified first)?

A: Store a priority on each observer and keep the list sorted, highest priority first:

class PriorityObserver(Observer):
    def __init__(self, priority: int = 0):
        self.priority = priority

class PrioritySubject(Subject):
    def attach(self, observer: PriorityObserver):
        if observer in self._observers:
            return
        self._observers.append(observer)
        # Highest priority first; the sort is stable, so ties keep attach order
        self._observers.sort(key=lambda o: o.priority, reverse=True)

    # notify() is inherited from Subject and walks the already-sorted list
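
A variation worth considering is to pass the priority at attach time, so the same observer can have different priorities on different subjects. This is a hedged sketch with hypothetical names (`PrioritizedSubject`, `Named`), not a replacement for the answer above.

```python
from typing import Any, List, Tuple


class PrioritizedSubject:
    """Hypothetical variant: priority is given at attach time, not stored on the observer."""

    def __init__(self):
        self._entries: List[Tuple[int, Any]] = []

    def attach(self, observer, priority: int = 0):
        self._entries.append((priority, observer))
        # list.sort is stable, so equal priorities keep their attach order
        self._entries.sort(key=lambda entry: entry[0], reverse=True)

    def notify(self, event: str, data: Any):
        for _, observer in list(self._entries):
            observer.update(event, data)


order = []


class Named:
    def __init__(self, name: str):
        self.name = name

    def update(self, event, data):
        order.append(self.name)


subject = PrioritizedSubject()
subject.attach(Named("logger"), priority=0)
subject.attach(Named("billing"), priority=10)
subject.attach(Named("audit"), priority=0)
subject.notify("order_placed", None)
print(order)  # ['billing', 'logger', 'audit']
```

Sorting on every attach is simple and fine for small observer lists, since attaching is usually far rarer than notifying.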

Trade-offs

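Pros: Loose coupling, observers can be attached and detached at runtime, supports any number of observers
Cons: Memory leaks if observers are never detached (the Subject keeps them alive), notification order should not be relied on unless the Subject guarantees it, synchronous notification cost grows with the number of observers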


Real-World Examples

  • Event listeners (React component updates)
  • Message queues (Kafka, RabbitMQ), which apply the related Pub/Sub variant with a broker in between
  • Database triggers (notify on INSERT/UPDATE)
  • Notification systems (email, SMS, push notifications)