12-Security-Best-Practices
Security Best Practices
Table of Contents
- Authentication & Authorization
- API Security Best Practices
- Data Security
- Network Security
- Application Security
- Infrastructure Security
- Compliance & Privacy
- Interview Questions & Answers
Authentication & Authorization
Authentication Mechanisms
1. Password-Based Authentication
Best Practices:
Click to view code (python)
import bcrypt
import secrets
from datetime import datetime, timedelta
class AuthService:
def __init__(self):
self.min_password_length = 12
self.password_requirements = {
'uppercase': True,
'lowercase': True,
'digits': True,
'special_chars': True
}
def hash_password(self, password: str) -> bytes:
"""Hash password using bcrypt with salt"""
# bcrypt automatically generates and stores salt
salt = bcrypt.gensalt(rounds=12) # Cost factor 12
return bcrypt.hashpw(password.encode('utf-8'), salt)
def verify_password(self, password: str, hashed: bytes) -> bool:
"""Verify password against hash"""
return bcrypt.checkpw(password.encode('utf-8'), hashed)
def validate_password_strength(self, password: str) -> tuple[bool, str]:
"""Validate password meets security requirements"""
if len(password) < self.min_password_length:
return False, f"Password must be at least {self.min_password_length} characters"
if not any(c.isupper() for c in password):
return False, "Password must contain uppercase letter"
if not any(c.islower() for c in password):
return False, "Password must contain lowercase letter"
if not any(c.isdigit() for c in password):
return False, "Password must contain digit"
special = set('!@#$%^&*()_+-=[]{}|;:,.<>?')
if not any(c in special for c in password):
return False, "Password must contain special character"
# Check against common passwords
if self.is_common_password(password):
return False, "Password is too common"
return True, "Password is strong"
def is_common_password(self, password: str) -> bool:
"""Check against list of common passwords"""
common_passwords = [
'password', '123456', '12345678', 'qwerty',
'abc123', 'monkey', '1234567', 'letmein'
]
return password.lower() in common_passwords
Key Points:
- Use bcrypt, scrypt, or Argon2 for password hashing (NOT MD5/SHA1)
- Minimum 12 characters
- Require uppercase, lowercase, digits, special characters
- Check against common password lists
- Implement account lockout after N failed attempts
- Use rate limiting on login endpoints
2. Multi-Factor Authentication (MFA)
TOTP (Time-based One-Time Password) Implementation:
Click to view code (python)
import pyotp
import qrcode
from io import BytesIO
class MFAService:
def generate_secret(self, user_email: str) -> dict:
"""Generate MFA secret for user"""
secret = pyotp.random_base32()
# Generate QR code
totp = pyotp.TOTP(secret)
provisioning_uri = totp.provisioning_uri(
name=user_email,
issuer_name="MyApp"
)
# Create QR code
qr = qrcode.QRCode(version=1, box_size=10, border=5)
qr.add_data(provisioning_uri)
qr.make(fit=True)
img = qr.make_image(fill_color="black", back_color="white")
return {
'secret': secret,
'qr_code': img,
'provisioning_uri': provisioning_uri
}
def verify_totp(self, secret: str, token: str) -> bool:
"""Verify TOTP token"""
totp = pyotp.TOTP(secret)
# Allow 1 time step before/after (30 seconds window)
return totp.verify(token, valid_window=1)
def generate_backup_codes(self, count: int = 10) -> list[str]:
"""Generate backup codes for MFA recovery"""
codes = []
for _ in range(count):
code = secrets.token_hex(4).upper() # 8 character code
codes.append(code)
return codes
# Usage
mfa = MFAService()
# Setup MFA for user
mfa_data = mfa.generate_secret("user@example.com")
print(f"Secret: {mfa_data['secret']}")
print(f"Scan QR code with authenticator app")
# Verify user's token
is_valid = mfa.verify_totp(mfa_data['secret'], "123456")
# Generate backup codes
backup_codes = mfa.generate_backup_codes()
print(f"Backup codes: {backup_codes}")
MFA Best Practices:
- Require MFA for admin accounts
- Offer multiple MFA methods (TOTP, SMS, email, hardware keys)
- Provide backup codes for account recovery
- Don't allow MFA bypass without proper verification
- Log all MFA events
3. Token-Based Authentication
JWT (JSON Web Token) Implementation
Click to view code (python)
import jwt
from datetime import datetime, timedelta
from typing import Optional
class JWTService:
def __init__(self, secret_key: str):
self.secret_key = secret_key
self.algorithm = 'HS256'
self.access_token_expire = timedelta(minutes=15)
self.refresh_token_expire = timedelta(days=7)
def create_access_token(
self,
user_id: str,
additional_claims: dict = None
) -> str:
"""Create short-lived access token"""
expire = datetime.utcnow() + self.access_token_expire
payload = {
'sub': user_id,
'exp': expire,
'iat': datetime.utcnow(),
'type': 'access'
}
if additional_claims:
payload.update(additional_claims)
return jwt.encode(payload, self.secret_key, algorithm=self.algorithm)
def create_refresh_token(self, user_id: str) -> str:
"""Create long-lived refresh token"""
expire = datetime.utcnow() + self.refresh_token_expire
payload = {
'sub': user_id,
'exp': expire,
'iat': datetime.utcnow(),
'type': 'refresh',
'jti': secrets.token_hex(16) # Token ID for revocation
}
return jwt.encode(payload, self.secret_key, algorithm=self.algorithm)
def verify_token(self, token: str) -> Optional[dict]:
"""Verify and decode token"""
try:
payload = jwt.decode(
token,
self.secret_key,
algorithms=[self.algorithm]
)
return payload
except jwt.ExpiredSignatureError:
raise Exception("Token expired")
except jwt.InvalidTokenError:
raise Exception("Invalid token")
def refresh_access_token(self, refresh_token: str) -> str:
"""Generate new access token from refresh token"""
payload = self.verify_token(refresh_token)
if payload.get('type') != 'refresh':
raise Exception("Invalid token type")
# Check if refresh token is blacklisted
if self.is_token_blacklisted(payload.get('jti')):
raise Exception("Token revoked")
return self.create_access_token(payload['sub'])
def is_token_blacklisted(self, token_id: str) -> bool:
"""Check if token is blacklisted (implement with Redis)"""
# Implementation with Redis
return False
# Usage
jwt_service = JWTService(secret_key="your-secret-key-min-32-chars")
# Login
access_token = jwt_service.create_access_token(
user_id="user123",
additional_claims={'role': 'admin'}
)
refresh_token = jwt_service.create_refresh_token("user123")
# Verify token
payload = jwt_service.verify_token(access_token)
print(f"User: {payload['sub']}, Role: {payload['role']}")
# Refresh token
new_access_token = jwt_service.refresh_access_token(refresh_token)
JWT Best Practices:
- Use strong secret keys (min 256 bits)
- Use RS256 for microservices (asymmetric keys)
- Short expiration for access tokens (15 min)
- Implement token refresh mechanism
- Store refresh tokens securely (HttpOnly cookies)
- Implement token blacklist for logout
- Include minimal claims (don't store sensitive data)
- Validate all claims (exp, iat, iss, aud)
OAuth 2.0 / OpenID Connect
Click to view code (python)
from authlib.integrations.flask_client import OAuth
from flask import Flask, redirect, url_for, session
app = Flask(__name__)
app.secret_key = 'secret-key'
oauth = OAuth(app)
# Configure OAuth provider (Google example)
google = oauth.register(
name='google',
client_id='YOUR_GOOGLE_CLIENT_ID',
client_secret='YOUR_GOOGLE_CLIENT_SECRET',
server_metadata_url='https://accounts.google.com/.well-known/openid-configuration',
client_kwargs={
'scope': 'openid email profile'
}
)
@app.route('/login')
def login():
"""Redirect to OAuth provider"""
redirect_uri = url_for('authorize', _external=True)
return google.authorize_redirect(redirect_uri)
@app.route('/authorize')
def authorize():
"""OAuth callback"""
token = google.authorize_access_token()
user_info = google.parse_id_token(token)
# Store user info in session
session['user'] = {
'id': user_info['sub'],
'email': user_info['email'],
'name': user_info['name']
}
return redirect('/dashboard')
@app.route('/logout')
def logout():
"""Logout user"""
session.pop('user', None)
return redirect('/')
OAuth 2.0 Best Practices:
- Use authorization code flow (not implicit flow)
- Implement PKCE (Proof Key for Code Exchange)
- Validate redirect URIs strictly
- Use state parameter to prevent CSRF
- Store tokens securely
- Implement token refresh
- Use HTTPS only
- Validate ID tokens properly
Authorization (Access Control)
Role-Based Access Control (RBAC)
Click to view code (python)
from enum import Enum
from typing import Set
class Role(Enum):
ADMIN = "admin"
USER = "user"
MODERATOR = "moderator"
GUEST = "guest"
class Permission(Enum):
READ = "read"
WRITE = "write"
DELETE = "delete"
ADMIN = "admin"
class RBACService:
def __init__(self):
# Define role permissions
self.role_permissions = {
Role.ADMIN: {
Permission.READ,
Permission.WRITE,
Permission.DELETE,
Permission.ADMIN
},
Role.MODERATOR: {
Permission.READ,
Permission.WRITE,
Permission.DELETE
},
Role.USER: {
Permission.READ,
Permission.WRITE
},
Role.GUEST: {
Permission.READ
}
}
def has_permission(
self,
user_role: Role,
required_permission: Permission
) -> bool:
"""Check if role has permission"""
permissions = self.role_permissions.get(user_role, set())
return required_permission in permissions
def has_any_permission(
self,
user_role: Role,
required_permissions: Set[Permission]
) -> bool:
"""Check if role has any of the required permissions"""
role_permissions = self.role_permissions.get(user_role, set())
return bool(role_permissions & required_permissions)
# Decorator for permission checking
from functools import wraps
from flask import request, jsonify
def require_permission(permission: Permission):
def decorator(f):
@wraps(f)
def decorated_function(*args, **kwargs):
# Get user from token/session
user_role = get_user_role_from_token()
rbac = RBACService()
if not rbac.has_permission(user_role, permission):
return jsonify({'error': 'Forbidden'}), 403
return f(*args, **kwargs)
return decorated_function
return decorator
# Usage
@app.route('/api/users/<user_id>', methods=['DELETE'])
@require_permission(Permission.DELETE)
def delete_user(user_id):
# Only roles with DELETE permission can access
db.delete_user(user_id)
return jsonify({'status': 'deleted'})
Attribute-Based Access Control (ABAC)
Click to view code (python)
from typing import Dict, Any
class ABACService:
def evaluate_policy(
self,
user_attributes: Dict[str, Any],
resource_attributes: Dict[str, Any],
action: str,
environment: Dict[str, Any]
) -> bool:
"""Evaluate access based on attributes"""
# Rule 1: Admin can do anything
if user_attributes.get('role') == 'admin':
return True
# Rule 2: Users can read their own data
if action == 'read' and user_attributes['id'] == resource_attributes.get('owner_id'):
return True
# Rule 3: Users can write during business hours
if action == 'write':
if user_attributes.get('role') == 'user':
hour = environment.get('hour', 0)
if 9 <= hour <= 17: # Business hours
return True
# Rule 4: Department-based access
if user_attributes.get('department') == resource_attributes.get('department'):
if action in ['read', 'write']:
return True
return False
# Usage
abac = ABACService()
user = {'id': 'user123', 'role': 'user', 'department': 'engineering'}
resource = {'id': 'doc456', 'owner_id': 'user123', 'department': 'engineering'}
environment = {'hour': 14, 'ip': '192.168.1.1'}
can_write = abac.evaluate_policy(user, resource, 'write', environment)
Authorization Best Practices:
- Implement least privilege principle
- Use RBAC for simple scenarios
- Use ABAC for complex policies
- Centralize authorization logic
- Log all authorization decisions
- Regularly audit permissions
- Implement deny by default
API Security Best Practices
1. Input Validation & Sanitization
Click to view code (python)
from typing import Any
import re
from html import escape
class InputValidator:
@staticmethod
def validate_email(email: str) -> bool:
"""Validate email format"""
pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
return bool(re.match(pattern, email))
@staticmethod
def validate_phone(phone: str) -> bool:
"""Validate phone number"""
pattern = r'^\+?1?\d{9,15}$'
return bool(re.match(pattern, phone))
@staticmethod
def sanitize_string(text: str) -> str:
"""Sanitize string for XSS prevention"""
# Remove HTML tags
text = re.sub(r'<[^>]*>', '', text)
# Escape special characters
text = escape(text)
return text.strip()
@staticmethod
def validate_integer(value: Any, min_val: int = None, max_val: int = None) -> bool:
"""Validate integer with range"""
try:
val = int(value)
if min_val is not None and val < min_val:
return False
if max_val is not None and val > max_val:
return False
return True
except (ValueError, TypeError):
return False
@staticmethod
def validate_uuid(uuid_string: str) -> bool:
"""Validate UUID format"""
pattern = r'^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$'
return bool(re.match(pattern, uuid_string.lower()))
# SQL Injection Prevention
from sqlalchemy import text
class SafeDatabase:
def get_user(self, user_id: str):
"""SAFE: Use parameterized queries"""
query = text("SELECT * FROM users WHERE id = :user_id")
result = db.execute(query, {'user_id': user_id})
return result.fetchone()
def search_users_unsafe(self, search: str):
"""UNSAFE: String concatenation"""
# NEVER DO THIS
query = f"SELECT * FROM users WHERE name = '{search}'"
# Attacker can inject: ' OR '1'='1
def search_users_safe(self, search: str):
"""SAFE: Parameterized query"""
query = text("SELECT * FROM users WHERE name = :search")
result = db.execute(query, {'search': search})
return result.fetchall()
# NoSQL Injection Prevention
class MongoDBSafe:
def get_user(self, username: str):
"""SAFE: Validate input"""
if not isinstance(username, str):
raise ValueError("Username must be string")
# Use exact match, not query operators
return db.users.find_one({'username': username})
def get_user_unsafe(self, username_data):
"""UNSAFE: Accepting dict allows injection"""
# NEVER DO THIS
# Attacker can send: {'$gt': ''}
return db.users.find_one({'username': username_data})
Input Validation Best Practices:
- Validate all inputs (type, format, range, length)
- Use allowlist validation (not blocklist)
- Sanitize output to prevent XSS
- Use parameterized queries (prevent SQL injection)
- Validate JSON schema
- Reject unexpected fields
- Use strong typing (TypeScript, Pydantic)
2. Rate Limiting & Throttling
Click to view code (python)
from datetime import datetime, timedelta
from collections import defaultdict
import time
class RateLimiter:
def __init__(self):
self.requests = defaultdict(list)
self.limits = {
'default': (100, 60), # 100 requests per minute
'login': (5, 300), # 5 requests per 5 minutes
'api': (1000, 3600), # 1000 requests per hour
'premium': (10000, 3600) # 10000 requests per hour
}
def is_allowed(
self,
client_id: str,
endpoint_type: str = 'default'
) -> tuple[bool, dict]:
"""Check if request is allowed"""
max_requests, window_seconds = self.limits.get(
endpoint_type,
self.limits['default']
)
now = time.time()
window_start = now - window_seconds
# Get client's recent requests
client_requests = self.requests[client_id]
# Remove old requests outside window
client_requests[:] = [
req_time for req_time in client_requests
if req_time > window_start
]
# Check if limit exceeded
if len(client_requests) >= max_requests:
retry_after = int(client_requests[0] + window_seconds - now)
return False, {
'retry_after': retry_after,
'limit': max_requests,
'remaining': 0
}
# Add current request
client_requests.append(now)
return True, {
'limit': max_requests,
'remaining': max_requests - len(client_requests),
'reset': int(now + window_seconds)
}
# Redis-based Rate Limiter (Production)
import redis
class RedisRateLimiter:
def __init__(self, redis_client: redis.Redis):
self.redis = redis_client
def is_allowed(
self,
key: str,
max_requests: int = 100,
window_seconds: int = 60
) -> bool:
"""Rate limit using Redis"""
now = time.time()
window_key = f"ratelimit:{key}:{int(now / window_seconds)}"
# Increment counter
count = self.redis.incr(window_key)
# Set expiration on first request
if count == 1:
self.redis.expire(window_key, window_seconds)
return count <= max_requests
# Flask middleware
from flask import Flask, request, jsonify
app = Flask(__name__)
rate_limiter = RateLimiter()
@app.before_request
def rate_limit_middleware():
"""Apply rate limiting to all requests"""
client_id = request.headers.get('X-API-Key') or request.remote_addr
endpoint_type = 'api'
# Special rate limit for login
if request.path == '/api/login':
endpoint_type = 'login'
allowed, info = rate_limiter.is_allowed(client_id, endpoint_type)
# Add rate limit headers
response_headers = {
'X-RateLimit-Limit': str(info['limit']),
'X-RateLimit-Remaining': str(info['remaining'])
}
if 'reset' in info:
response_headers['X-RateLimit-Reset'] = str(info['reset'])
if not allowed:
response_headers['Retry-After'] = str(info['retry_after'])
return jsonify({
'error': 'Rate limit exceeded',
'retry_after': info['retry_after']
}), 429, response_headers
# Store headers for response
request.rate_limit_headers = response_headers
@app.after_request
def add_rate_limit_headers(response):
"""Add rate limit headers to response"""
if hasattr(request, 'rate_limit_headers'):
for key, value in request.rate_limit_headers.items():
response.headers[key] = value
return response
Rate Limiting Best Practices:
- Implement per-user/IP rate limiting
- Different limits for different endpoints
- Use Redis for distributed rate limiting
- Return proper headers (X-RateLimit-*)
- Return 429 status code
- Implement exponential backoff
- Monitor rate limit metrics
3. API Authentication & API Keys
Click to view code (python)
import secrets
import hashlib
from datetime import datetime
class APIKeyService:
def generate_api_key(self, prefix: str = "sk") -> tuple[str, str]:
"""Generate API key and its hash"""
# Generate random key
random_part = secrets.token_urlsafe(32)
api_key = f"{prefix}_{random_part}"
# Hash for storage
key_hash = hashlib.sha256(api_key.encode()).hexdigest()
return api_key, key_hash
def verify_api_key(self, api_key: str, stored_hash: str) -> bool:
"""Verify API key against stored hash"""
key_hash = hashlib.sha256(api_key.encode()).hexdigest()
return secrets.compare_digest(key_hash, stored_hash)
def create_api_key_record(self, user_id: str, name: str) -> dict:
"""Create API key record for database"""
api_key, key_hash = self.generate_api_key()
record = {
'user_id': user_id,
'name': name,
'key_hash': key_hash,
'created_at': datetime.utcnow(),
'last_used_at': None,
'is_active': True,
'rate_limit': 1000, # Requests per hour
'scopes': ['read', 'write'] # Permissions
}
# Return key only once (user must save it)
return {
'api_key': api_key, # Show only once
'record': record
}
# Middleware for API key authentication
from flask import request, jsonify, g
def require_api_key(f):
"""Decorator to require API key"""
@wraps(f)
def decorated_function(*args, **kwargs):
api_key = request.headers.get('X-API-Key')
if not api_key:
return jsonify({'error': 'API key required'}), 401
# Validate format
if not api_key.startswith('sk_'):
return jsonify({'error': 'Invalid API key format'}), 401
# Verify against database
key_hash = hashlib.sha256(api_key.encode()).hexdigest()
key_record = db.get_api_key(key_hash)
if not key_record:
return jsonify({'error': 'Invalid API key'}), 401
if not key_record['is_active']:
return jsonify({'error': 'API key revoked'}), 401
# Update last used
db.update_api_key_last_used(key_hash)
# Store user info for request
g.user_id = key_record['user_id']
g.api_key_scopes = key_record['scopes']
return f(*args, **kwargs)
return decorated_function
# Usage
@app.route('/api/data')
@require_api_key
def get_data():
user_id = g.user_id
return jsonify({'data': 'sensitive data', 'user': user_id})
API Key Best Practices:
- Use cryptographically secure random generation
- Store only hashed keys (like passwords)
- Use prefixes (sk, pk) for identification
- Implement key rotation
- Allow users to revoke keys
- Track key usage (last used, request count)
- Implement key scopes/permissions
- Set expiration dates
- Use HTTPS only
4. CORS (Cross-Origin Resource Sharing)
Click to view code (python)
from flask import Flask
from flask_cors import CORS
app = Flask(__name__)
# Restrictive CORS (Recommended for Production)
cors_config = {
'origins': [
'https://app.example.com',
'https://admin.example.com'
],
'methods': ['GET', 'POST', 'PUT', 'DELETE'],
'allow_headers': ['Content-Type', 'Authorization'],
'expose_headers': ['X-Total-Count'],
'supports_credentials': True,
'max_age': 3600
}
CORS(app, resources={r"/api/*": cors_config})
# Custom CORS middleware
@app.after_request
def add_cors_headers(response):
"""Add CORS headers manually"""
origin = request.headers.get('Origin')
# Allowlist of origins
allowed_origins = [
'https://app.example.com',
'https://admin.example.com'
]
if origin in allowed_origins:
response.headers['Access-Control-Allow-Origin'] = origin
response.headers['Access-Control-Allow-Credentials'] = 'true'
response.headers['Access-Control-Allow-Methods'] = 'GET, POST, PUT, DELETE'
response.headers['Access-Control-Allow-Headers'] = 'Content-Type, Authorization'
response.headers['Access-Control-Max-Age'] = '3600'
return response
CORS Best Practices:
- Never use wildcard (*) in production
- Allowlist specific origins
- Validate Origin header
- Don't reflect Origin without validation
- Use credentials only when necessary
- Set appropriate cache time (max-age)
- Restrict methods and headers
5. HTTPS & TLS
Click to view code (python)
# Force HTTPS in Flask
from flask import Flask, redirect, request
app = Flask(__name__)
@app.before_request
def force_https():
"""Redirect HTTP to HTTPS"""
if not request.is_secure and app.config.get('ENV') == 'production':
url = request.url.replace('http://', 'https://', 1)
return redirect(url, code=301)
# Security headers
@app.after_request
def add_security_headers(response):
"""Add security headers to all responses"""
response.headers['Strict-Transport-Security'] = 'max-age=31536000; includeSubDomains'
response.headers['X-Content-Type-Options'] = 'nosniff'
response.headers['X-Frame-Options'] = 'DENY'
response.headers['X-XSS-Protection'] = '1; mode=block'
response.headers['Content-Security-Policy'] = "default-src 'self'"
response.headers['Referrer-Policy'] = 'strict-origin-when-cross-origin'
response.headers['Permissions-Policy'] = 'geolocation=(), microphone=(), camera=()'
return response
HTTPS Best Practices:
- Use TLS 1.3 (minimum TLS 1.2)
- Strong cipher suites only
- HSTS header (Strict-Transport-Security)
- Valid SSL certificates
- Certificate pinning for mobile apps
- Regular certificate renewal
- Disable older protocols (SSLv3, TLS 1.0, 1.1)
6. Request Signing & HMAC
Click to view code (python)
import hmac
import hashlib
import time
class RequestSigner:
def __init__(self, secret_key: str):
self.secret_key = secret_key.encode()
def sign_request(
self,
method: str,
path: str,
body: str = '',
timestamp: int = None
) -> str:
"""Generate HMAC signature for request"""
if timestamp is None:
timestamp = int(time.time())
# Create signature payload
message = f"{method}\n{path}\n{timestamp}\n{body}"
# Generate HMAC
signature = hmac.new(
self.secret_key,
message.encode(),
hashlib.sha256
).hexdigest()
return signature
def verify_request(
self,
method: str,
path: str,
body: str,
signature: str,
timestamp: int,
max_age: int = 300 # 5 minutes
) -> bool:
"""Verify request signature"""
# Check timestamp (prevent replay attacks)
now = int(time.time())
if abs(now - timestamp) > max_age:
return False
# Compute expected signature
expected_signature = self.sign_request(method, path, body, timestamp)
# Compare signatures (constant time)
return hmac.compare_digest(signature, expected_signature)
# Flask middleware
@app.before_request
def verify_signature():
"""Verify request signature"""
signature = request.headers.get('X-Signature')
timestamp = request.headers.get('X-Timestamp')
if not signature or not timestamp:
return jsonify({'error': 'Missing signature'}), 401
try:
timestamp = int(timestamp)
except ValueError:
return jsonify({'error': 'Invalid timestamp'}), 401
signer = RequestSigner(app.config['SECRET_KEY'])
body = request.get_data(as_text=True)
is_valid = signer.verify_request(
request.method,
request.path,
body,
signature,
timestamp
)
if not is_valid:
return jsonify({'error': 'Invalid signature'}), 401
Request Signing Best Practices:
- Use HMAC-SHA256 (minimum)
- Include timestamp to prevent replay
- Sign method, path, and body
- Use constant-time comparison
- Rotate secret keys regularly
- Implement nonce for additional security
Data Security
1. Encryption at Rest
Click to view code (python)
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2
import base64
class DataEncryption:
def __init__(self, password: str, salt: bytes = None):
"""Initialize encryption with password"""
if salt is None:
salt = os.urandom(16)
self.salt = salt
# Derive key from password
kdf = PBKDF2(
algorithm=hashes.SHA256(),
length=32,
salt=salt,
iterations=100000,
)
key = base64.urlsafe_b64encode(kdf.derive(password.encode()))
self.cipher = Fernet(key)
def encrypt(self, data: str) -> str:
"""Encrypt data"""
encrypted = self.cipher.encrypt(data.encode())
return base64.urlsafe_b64encode(encrypted).decode()
def decrypt(self, encrypted_data: str) -> str:
"""Decrypt data"""
encrypted = base64.urlsafe_b64decode(encrypted_data)
decrypted = self.cipher.decrypt(encrypted)
return decrypted.decode()
# Field-level encryption for database
class User:
def __init__(self, email: str, ssn: str):
self.email = email
self.ssn_encrypted = self.encrypt_ssn(ssn)
@staticmethod
def encrypt_ssn(ssn: str) -> str:
"""Encrypt sensitive field"""
cipher = DataEncryption(os.environ['ENCRYPTION_KEY'])
return cipher.encrypt(ssn)
def get_ssn(self) -> str:
"""Decrypt sensitive field"""
cipher = DataEncryption(os.environ['ENCRYPTION_KEY'])
return cipher.decrypt(self.ssn_encrypted)
# Database encryption example
# Use transparent data encryption (TDE) for database
# PostgreSQL: Enable pgcrypto extension
# MySQL: Enable InnoDB encryption
# MongoDB: Enable encryption at rest
Encryption at Rest Best Practices:
- Encrypt sensitive data (PII, payment info, passwords)
- Use AES-256 for encryption
- Store encryption keys separately (KMS)
- Rotate encryption keys regularly
- Use database-level encryption (TDE)
- Encrypt backups
- Secure key management (AWS KMS, Azure Key Vault)
2. Encryption in Transit
Click to view code (python)
# Use TLS for all connections
import requests
# GOOD: Verify SSL certificates
response = requests.get('https://api.example.com', verify=True)
# BAD: Don't verify (NEVER DO THIS)
response = requests.get('https://api.example.com', verify=False)
# Database connections with TLS
import psycopg2
conn = psycopg2.connect(
host="db.example.com",
database="mydb",
user="user",
password="pass",
sslmode="require", # Require TLS
sslrootcert="/path/to/ca.crt"
)
# Redis with TLS
import redis
r = redis.Redis(
host='redis.example.com',
port=6380,
ssl=True,
ssl_cert_reqs='required',
ssl_ca_certs='/path/to/ca.crt'
)
3. Data Masking & Tokenization
Click to view code (python)
import re
class DataMasker:
@staticmethod
def mask_email(email: str) -> str:
"""Mask email address"""
username, domain = email.split('@')
if len(username) <= 2:
masked_username = '*' * len(username)
else:
masked_username = username[0] + '*' * (len(username) - 2) + username[-1]
return f"{masked_username}@{domain}"
@staticmethod
def mask_phone(phone: str) -> str:
"""Mask phone number"""
return re.sub(r'\d(?=\d{4})', '*', phone)
@staticmethod
def mask_credit_card(card: str) -> str:
"""Mask credit card (show last 4 digits)"""
return '*' * (len(card) - 4) + card[-4:]
@staticmethod
def mask_ssn(ssn: str) -> str:
"""Mask SSN (show last 4 digits)"""
return '***-**-' + ssn[-4:]
# Usage
masker = DataMasker()
print(masker.mask_email("john.doe@example.com")) # j******e@example.com
print(masker.mask_phone("555-123-4567")) # ***-***-4567
print(masker.mask_credit_card("4532123456789012")) # ************9012
print(masker.mask_ssn("123-45-6789")) # ***-**-6789
Network Security
1. DDoS Protection
Click to view code (python)
# Implement connection limits
from flask import Flask, request
from collections import defaultdict
import time
class DDoSProtection:
def __init__(self):
self.connections = defaultdict(list)
self.max_connections = 10 # Per IP
self.window = 60 # seconds
def is_allowed(self, ip: str) -> bool:
"""Check if IP is allowed to connect"""
now = time.time()
window_start = now - self.window
# Clean old connections
self.connections[ip] = [
t for t in self.connections[ip]
if t > window_start
]
# Check limit
if len(self.connections[ip]) >= self.max_connections:
return False
self.connections[ip].append(now)
return True
# Use CDN (Cloudflare, AWS CloudFront) for DDoS protection
# Configure at infrastructure level
DDoS Protection Best Practices:
- Use CDN with DDoS protection
- Implement rate limiting
- Use AWS Shield / Cloudflare
- Monitor traffic patterns
- Implement CAPTCHA for suspicious traffic
- Use geo-blocking if applicable
- Set connection limits
- Use load balancers
2. Firewall Rules
Click to view code (python)
# Configure security groups (AWS example)
# Allow only necessary ports
security_group_rules = [
{
'IpProtocol': 'tcp',
'FromPort': 443,
'ToPort': 443,
'IpRanges': [{'CidrIp': '0.0.0.0/0'}] # HTTPS
},
{
'IpProtocol': 'tcp',
'FromPort': 22,
'ToPort': 22,
'IpRanges': [{'CidrIp': '10.0.0.0/8'}] # SSH from private network only
},
{
'IpProtocol': 'tcp',
'FromPort': 5432,
'ToPort': 5432,
'IpRanges': [{'CidrIp': '10.0.1.0/24'}] # PostgreSQL from app servers only
}
]
Application Security
1. XSS (Cross-Site Scripting) Prevention
Click to view code (python)
from html import escape
from markupsafe import Markup
class XSSPrevention:
@staticmethod
def sanitize_html(text: str) -> str:
"""Escape HTML to prevent XSS"""
return escape(text)
@staticmethod
def sanitize_for_javascript(text: str) -> str:
"""Escape for JavaScript context"""
# Escape dangerous characters
replacements = {
'<': '\\u003C',
'>': '\\u003E',
'&': '\\u0026',
'"': '\\u0022',
"'": '\\u0027',
'/': '\\u002F'
}
for char, replacement in replacements.items():
text = text.replace(char, replacement)
return text
# Use templating engines that auto-escape
# Jinja2 (Flask)
from flask import render_template
@app.route('/profile')
def profile():
user_input = request.args.get('name', '')
# Jinja2 automatically escapes {{ name }}
return render_template('profile.html', name=user_input)
# Content Security Policy
@app.after_request
def set_csp(response):
response.headers['Content-Security-Policy'] = (
"default-src 'self'; "
"script-src 'self' 'unsafe-inline' https://trusted-cdn.com; "
"style-src 'self' 'unsafe-inline'; "
"img-src 'self' data: https:; "
"font-src 'self' data:; "
"connect-src 'self' https://api.example.com; "
"frame-ancestors 'none'; "
"base-uri 'self'; "
"form-action 'self';"
)
return response
2. CSRF (Cross-Site Request Forgery) Prevention
Click to view code (python)
from flask_wtf.csrf import CSRFProtect
import secrets
csrf = CSRFProtect(app)
# Generate CSRF token
def generate_csrf_token():
if '_csrf_token' not in session:
session['_csrf_token'] = secrets.token_hex(32)
return session['_csrf_token']
# Verify CSRF token
def verify_csrf_token(token):
return token == session.get('_csrf_token')
# Include in forms
# <form method="POST">
# <input type="hidden" name="csrf_token" value="{{ csrf_token() }}">
# </form>
# For AJAX requests
# Add X-CSRF-Token header
3. Secure Session Management
Click to view code (python)
from flask import Flask, session
from datetime import timedelta
app = Flask(__name__)
# Secure session configuration
app.config.update(
SECRET_KEY=os.environ['SECRET_KEY'], # Strong random key
SESSION_COOKIE_SECURE=True, # HTTPS only
SESSION_COOKIE_HTTPONLY=True, # No JavaScript access
SESSION_COOKIE_SAMESITE='Lax', # CSRF protection
PERMANENT_SESSION_LIFETIME=timedelta(minutes=30), # Session timeout
SESSION_REFRESH_EACH_REQUEST=True # Extend on activity
)
@app.route('/login', methods=['POST'])
def login():
# After successful authentication
session.permanent = True
session['user_id'] = user.id
session['login_time'] = time.time()
# Regenerate session ID
session.modified = True
@app.route('/logout')
def logout():
session.clear()
return redirect('/login')
# Session validation middleware
@app.before_request
def validate_session():
if 'user_id' in session:
# Check session age
login_time = session.get('login_time', 0)
if time.time() - login_time > 3600: # 1 hour max
session.clear()
return redirect('/login')
Infrastructure Security
1. Container Security
Click to view code (yaml)
# Dockerfile best practices
FROM python:3.11-slim AS base
# Don't run as root
RUN useradd -m -u 1000 appuser
# Copy only necessary files
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY --chown=appuser:appuser . /app
WORKDIR /app
# Switch to non-root user
USER appuser
# Use specific versions
# Don't use :latest
# Scan for vulnerabilities
# docker scan myimage:tag
# Limit resources
# docker run --memory=512m --cpus=0.5 myimage
2. Secrets Management
Click to view code (python)
import os
from azure.keyvault.secrets import SecretClient
from azure.identity import DefaultAzureCredential
class SecretsManager:
def __init__(self):
# Use environment variable for vault URL
vault_url = os.environ['KEY_VAULT_URL']
credential = DefaultAzureCredential()
self.client = SecretClient(vault_url=vault_url, credential=credential)
def get_secret(self, name: str) -> str:
"""Retrieve secret from Key Vault"""
secret = self.client.get_secret(name)
return secret.value
def set_secret(self, name: str, value: str):
"""Store secret in Key Vault"""
self.client.set_secret(name, value)
# Usage
secrets = SecretsManager()
db_password = secrets.get_secret('database-password')
api_key = secrets.get_secret('external-api-key')
# NEVER hardcode secrets
# BAD
db_password = "hardcoded_password"
# GOOD
db_password = os.environ['DB_PASSWORD']
3. Logging & Monitoring
Click to view code (python)
import logging
import json
from datetime import datetime
# Security event logging
class SecurityLogger:
def __init__(self):
self.logger = logging.getLogger('security')
handler = logging.FileHandler('security.log')
formatter = logging.Formatter(
'%(asctime)s - %(levelname)s - %(message)s'
)
handler.setFormatter(formatter)
self.logger.addHandler(handler)
self.logger.setLevel(logging.INFO)
def log_authentication(
self,
user_id: str,
success: bool,
ip: str,
user_agent: str
):
"""Log authentication attempt"""
event = {
'event_type': 'authentication',
'user_id': user_id,
'success': success,
'ip': ip,
'user_agent': user_agent,
'timestamp': datetime.utcnow().isoformat()
}
self.logger.info(json.dumps(event))
def log_authorization(
self,
user_id: str,
resource: str,
action: str,
granted: bool
):
"""Log authorization decision"""
event = {
'event_type': 'authorization',
'user_id': user_id,
'resource': resource,
'action': action,
'granted': granted,
'timestamp': datetime.utcnow().isoformat()
}
self.logger.info(json.dumps(event))
def log_suspicious_activity(
self,
user_id: str,
activity: str,
details: dict
):
"""Log suspicious activity"""
event = {
'event_type': 'suspicious_activity',
'user_id': user_id,
'activity': activity,
'details': details,
'timestamp': datetime.utcnow().isoformat()
}
self.logger.warning(json.dumps(event))
# Usage
security_logger = SecurityLogger()
@app.route('/login', methods=['POST'])
def login():
user = authenticate(request.form['email'], request.form['password'])
security_logger.log_authentication(
user_id=request.form['email'],
success=user is not None,
ip=request.remote_addr,
user_agent=request.headers.get('User-Agent')
)
if not user:
return jsonify({'error': 'Invalid credentials'}), 401
return jsonify({'token': create_token(user)})
Logging Best Practices:
- Log all authentication events
- Log authorization failures
- Log sensitive data access
- Don't log sensitive data (passwords, tokens)
- Use structured logging (JSON)
- Implement log aggregation (ELK, Splunk)
- Set up alerts for suspicious patterns
- Retain logs for compliance
Compliance & Privacy
1. GDPR Compliance
Click to view code (python)
class GDPRCompliance:
def anonymize_user(self, user_id: str):
"""Anonymize user data (GDPR Right to be Forgotten)"""
user = db.get_user(user_id)
# Replace with anonymous data
user.email = f"deleted_{user_id}@example.com"
user.name = "Deleted User"
user.phone = None
user.address = None
user.is_deleted = True
db.save(user)
def export_user_data(self, user_id: str) -> dict:
"""Export all user data (GDPR Right to Data Portability)"""
user = db.get_user(user_id)
posts = db.get_user_posts(user_id)
comments = db.get_user_comments(user_id)
return {
'personal_info': {
'email': user.email,
'name': user.name,
'phone': user.phone,
'address': user.address
},
'posts': [post.to_dict() for post in posts],
'comments': [comment.to_dict() for comment in comments],
'created_at': user.created_at.isoformat()
}
def get_consent(self, user_id: str, purpose: str) -> bool:
"""Check user consent for data processing"""
consent = db.get_consent(user_id, purpose)
return consent and consent.is_active
def record_consent(
self,
user_id: str,
purpose: str,
granted: bool
):
"""Record user consent"""
db.create_consent_record({
'user_id': user_id,
'purpose': purpose,
'granted': granted,
'timestamp': datetime.utcnow()
})
2. PCI DSS Compliance (Payment Card Industry)
Click to view code (python)
class PCICompliance:
def tokenize_card(self, card_number: str) -> str:
"""Tokenize credit card (never store full card number)"""
# Use payment processor's tokenization (Stripe, PayPal)
token = payment_processor.tokenize(card_number)
return token
def validate_card(self, card_number: str) -> bool:
"""Validate card using Luhn algorithm"""
def luhn_checksum(card_number):
def digits_of(n):
return [int(d) for d in str(n)]
digits = digits_of(card_number)
odd_digits = digits[-1::-2]
even_digits = digits[-2::-2]
checksum = sum(odd_digits)
for d in even_digits:
checksum += sum(digits_of(d * 2))
return checksum % 10
return luhn_checksum(card_number) == 0
Interview Questions & Answers
Q1: How would you implement authentication and authorization for a microservices architecture with 50+ services?
Answer:
Challenge:
- 50+ microservices
- Each service needs to authenticate users
- Need centralized user management
- Services need to authorize actions
- Must be scalable and secure
Solution: OAuth 2.0 + JWT + API Gateway Pattern
Architecture:
Click to view code
Client → API Gateway → Auth Service (OAuth 2.0)
↓
JWT Token
↓
Microservices (JWT validation)
Implementation:
Click to view code (python)
# 1. Centralized Auth Service
from authlib.integrations.flask_oauth2 import AuthorizationServer
from authlib.oauth2.rfc6749 import grants
class AuthService:
def __init__(self):
self.server = AuthorizationServer()
self.jwt_service = JWTService(secret_key=os.environ['JWT_SECRET'])
def authenticate(self, email: str, password: str) -> dict:
"""Authenticate user and return tokens"""
user = db.get_user_by_email(email)
if not user or not bcrypt.checkpw(password, user.password_hash):
raise Exception("Invalid credentials")
# Generate tokens
access_token = self.jwt_service.create_access_token(
user_id=user.id,
additional_claims={
'email': user.email,
'role': user.role,
'permissions': user.permissions
}
)
refresh_token = self.jwt_service.create_refresh_token(user.id)
return {
'access_token': access_token,
'refresh_token': refresh_token,
'token_type': 'Bearer',
'expires_in': 900 # 15 minutes
}
# 2. API Gateway - Token Validation
from fastapi import FastAPI, HTTPException, Depends, Header
import httpx
app = FastAPI()
async def verify_token(authorization: str = Header(None)) -> dict:
"""Verify JWT token from Authorization header"""
if not authorization or not authorization.startswith('Bearer '):
raise HTTPException(status_code=401, detail="Missing token")
token = authorization.split(' ')[1]
try:
# Verify token
jwt_service = JWTService(secret_key=os.environ['JWT_SECRET'])
payload = jwt_service.verify_token(token)
# Check if token is blacklisted
if await redis_client.exists(f"blacklist:{token}"):
raise HTTPException(status_code=401, detail="Token revoked")
return payload
except Exception as e:
raise HTTPException(status_code=401, detail=str(e))
# 3. Microservice - JWT validation
class UserService:
@app.get("/api/users/{user_id}")
async def get_user(
user_id: str,
token_payload: dict = Depends(verify_token)
):
"""Protected endpoint"""
# Extract user info from token
requesting_user = token_payload['sub']
role = token_payload.get('role')
# Authorization check
if requesting_user != user_id and role != 'admin':
raise HTTPException(status_code=403, detail="Forbidden")
user = db.get_user(user_id)
return user
# 4. Service-to-Service Authentication (mTLS)
import ssl
import httpx
class ServiceClient:
def __init__(self):
# Load service certificates
self.ssl_context = ssl.create_default_context(
ssl.Purpose.CLIENT_AUTH
)
self.ssl_context.load_cert_chain(
certfile='service-cert.pem',
keyfile='service-key.pem'
)
async def call_service(self, url: str):
"""Make authenticated request to another service"""
async with httpx.AsyncClient(verify=self.ssl_context) as client:
response = await client.get(url)
return response.json()
Key Components:
- Centralized Auth Service:
- Single source of truth for authentication - Issues JWT tokens - Manages user credentials - Handles token refresh
- API Gateway:
- Single entry point - Validates JWT tokens - Routes requests to services - Rate limiting
- Microservices:
- Validate JWT tokens locally (no round-trip to auth service) - Extract user info from token - Implement authorization logic - Use mTLS for service-to-service
- Token Strategy:
- Access token: 15 min expiry - Refresh token: 7 days expiry - Store refresh tokens in database - Implement token blacklist for logout
Benefits:
- Scalable (no auth service bottleneck)
- Secure (JWT signature verification)
- Fast (no database lookup per request)
- Centralized user management
- Service-to-service security with mTLS
Q2: Design a rate limiting system that prevents API abuse while allowing legitimate burst traffic.
Answer:
Requirements:
- Prevent abuse (DDoS, scraping)
- Allow legitimate bursts (user uploads 10 photos)
- Fair for all users
- Distributed (multiple servers)
- Different limits per plan (free, premium, enterprise)
Solution: Token Bucket Algorithm with Redis
How Token Bucket Works:
Click to view code
Bucket capacity: 100 tokens
Refill rate: 10 tokens/second
User makes request:
- Check if bucket has ≥1 token
- If yes: Remove 1 token, allow request
- If no: Reject request (429 Too Many Requests)
Tokens refill over time up to capacity
Allows bursts up to bucket capacity
Implementation:
Click to view code (python)
import redis
import time
from typing import Tuple
class TokenBucketRateLimiter:
def __init__(self, redis_client: redis.Redis):
self.redis = redis_client
# Rate limit tiers
self.rate_limits = {
'free': {
'capacity': 100, # Max burst
'refill_rate': 10, # Tokens per second
'cost': 1 # Cost per request
},
'premium': {
'capacity': 1000,
'refill_rate': 100,
'cost': 1
},
'enterprise': {
'capacity': 10000,
'refill_rate': 1000,
'cost': 1
}
}
def is_allowed(
self,
user_id: str,
tier: str = 'free',
cost: int = 1
) -> Tuple[bool, dict]:
"""
Check if request is allowed using token bucket algorithm
Returns: (allowed, info_dict)
"""
config = self.rate_limits[tier]
capacity = config['capacity']
refill_rate = config['refill_rate']
key = f"rate_limit:{tier}:{user_id}"
now = time.time()
# Lua script for atomic operation
lua_script = """
local key = KEYS[1]
local capacity = tonumber(ARGV[1])
local refill_rate = tonumber(ARGV[2])
local cost = tonumber(ARGV[3])
local now = tonumber(ARGV[4])
-- Get current bucket state
local bucket = redis.call('HMGET', key, 'tokens', 'last_refill')
local tokens = tonumber(bucket[1])
local last_refill = tonumber(bucket[2])
-- Initialize if doesn't exist
if tokens == nil then
tokens = capacity
last_refill = now
end
-- Calculate tokens to add
local time_passed = now - last_refill
local tokens_to_add = time_passed * refill_rate
tokens = math.min(capacity, tokens + tokens_to_add)
-- Check if enough tokens
if tokens >= cost then
tokens = tokens - cost
-- Update bucket
redis.call('HMSET', key, 'tokens', tokens, 'last_refill', now)
redis.call('EXPIRE', key, 3600) -- 1 hour expiry
return {1, tokens} -- Allowed
else
return {0, tokens} -- Denied
end
"""
# Execute Lua script
result = self.redis.eval(
lua_script,
1, # Number of keys
key,
capacity,
refill_rate,
cost,
now
)
allowed = bool(result[0])
remaining_tokens = result[1]
# Calculate retry after
retry_after = 0
if not allowed:
retry_after = int((cost - remaining_tokens) / refill_rate)
return allowed, {
'capacity': capacity,
'remaining': int(remaining_tokens),
'refill_rate': refill_rate,
'retry_after': retry_after
}
# Usage with Flask
from flask import Flask, request, jsonify, g
app = Flask(__name__)
redis_client = redis.Redis(host='localhost', port=6379, decode_responses=True)
rate_limiter = TokenBucketRateLimiter(redis_client)
def get_user_tier(user_id: str) -> str:
"""Get user's subscription tier"""
user = db.get_user(user_id)
return user.subscription_tier if user else 'free'
@app.before_request
def rate_limit_middleware():
"""Apply rate limiting to all requests"""
# Extract user ID from token
user_id = extract_user_id_from_token(request.headers.get('Authorization'))
if not user_id:
user_id = request.remote_addr # Use IP for anonymous
# Get user tier
tier = get_user_tier(user_id)
# Check rate limit
allowed, info = rate_limiter.is_allowed(user_id, tier)
# Store info for response headers
g.rate_limit_info = info
if not allowed:
response = jsonify({
'error': 'Rate limit exceeded',
'retry_after': info['retry_after']
})
response.status_code = 429
response.headers['Retry-After'] = str(info['retry_after'])
response.headers['X-RateLimit-Limit'] = str(info['capacity'])
response.headers['X-RateLimit-Remaining'] = '0'
return response
@app.after_request
def add_rate_limit_headers(response):
"""Add rate limit headers"""
if hasattr(g, 'rate_limit_info'):
info = g.rate_limit_info
response.headers['X-RateLimit-Limit'] = str(info['capacity'])
response.headers['X-RateLimit-Remaining'] = str(info['remaining'])
response.headers['X-RateLimit-Reset'] = str(
int(time.time() + (info['capacity'] - info['remaining']) / info['refill_rate'])
)
return response
# Different costs for different endpoints
@app.route('/api/light-operation')
def light_operation():
# Normal cost (1 token)
return jsonify({'data': 'result'})
@app.route('/api/heavy-operation')
def heavy_operation():
# Higher cost for expensive operations
user_id = g.user_id
tier = get_user_tier(user_id)
# Check with higher cost
allowed, info = rate_limiter.is_allowed(user_id, tier, cost=10)
if not allowed:
return jsonify({'error': 'Rate limit exceeded'}), 429
# Perform expensive operation
result = expensive_computation()
return jsonify({'data': result})
Advanced Features:
Click to view code (python)
# 1. Sliding Window Log (more accurate)
class SlidingWindowRateLimiter:
def is_allowed(self, user_id: str, max_requests: int, window: int):
"""
Track individual request timestamps
More accurate but uses more memory
"""
key = f"sliding:{user_id}"
now = time.time()
window_start = now - window
# Remove old requests
self.redis.zremrangebyscore(key, 0, window_start)
# Count requests in window
count = self.redis.zcard(key)
if count < max_requests:
# Add current request
self.redis.zadd(key, {str(now): now})
self.redis.expire(key, window)
return True
return False
# 2. Distributed Rate Limiting with Multiple Nodes
class DistributedRateLimiter:
def __init__(self, redis_cluster):
self.redis = redis_cluster
# Use consistent hashing to distribute load
def is_allowed(self, user_id: str):
# Rate limit check happens on Redis
# Multiple API servers share same Redis cluster
# Atomic operations ensure accuracy
pass
# 3. Adaptive Rate Limiting
class AdaptiveRateLimiter:
def adjust_rate_limit(self, user_id: str):
"""Adjust rate limit based on behavior"""
# Increase limit for good users
# Decrease for suspicious behavior
user_score = self.calculate_trust_score(user_id)
if user_score > 0.8: # Trusted user
multiplier = 2.0
elif user_score < 0.3: # Suspicious
multiplier = 0.5
else:
multiplier = 1.0
return multiplier
Best Practices:
- Use token bucket for burst traffic
- Different limits per tier
- Higher costs for expensive operations
- Return proper headers (X-RateLimit-*)
- Return 429 with Retry-After
- Use Redis for distributed system
- Monitor rate limit hits
- Allow bursts for legitimate use
Q3: How would you secure API keys and prevent them from being compromised?
Answer:
Challenge:
- API keys can be leaked (GitHub, client-side code)
- Keys can be stolen (MITM, XSS)
- Need to detect compromised keys
- Need to limit blast radius
Solution: Multi-Layered API Key Security
1. Key Generation & Storage:
Click to view code (python)
import secrets
import hashlib
from datetime import datetime, timedelta
class SecureAPIKeyManager:
def generate_key(self, user_id: str, name: str) -> dict:
"""Generate API key with multiple components"""
# Key structure: prefix_publicpart_secretpart
prefix = "sk" # Indicates secret key
public_part = secrets.token_urlsafe(8) # Identifies key
secret_part = secrets.token_urlsafe(32) # Secret component
api_key = f"{prefix}_{public_part}_{secret_part}"
# Store only hash of secret part
secret_hash = hashlib.sha256(secret_part.encode()).hexdigest()
# Create key record
key_record = {
'id': public_part,
'user_id': user_id,
'name': name,
'secret_hash': secret_hash,
'created_at': datetime.utcnow(),
'last_used': None,
'is_active': True,
# Security features
'allowed_ips': [], # IP whitelist
'allowed_origins': [], # Origin whitelist
'scopes': [], # Permissions
'rate_limit': 1000, # Per hour
'expires_at': None, # Optional expiration
# Monitoring
'total_requests': 0,
'failed_attempts': 0,
'last_failure': None
}
db.save_key(key_record)
return {
'api_key': api_key, # Show only once
'key_id': public_part,
'message': 'Save this key securely. It cannot be recovered.'
}
def verify_key(self, api_key: str, request_info: dict) -> bool:
"""Verify API key with security checks"""
try:
# Parse key
parts = api_key.split('_')
if len(parts) != 3 or parts[0] != 'sk':
return False
prefix, public_part, secret_part = parts
# Get key record
key_record = db.get_key_by_id(public_part)
if not key_record or not key_record['is_active']:
return False
# Verify secret part
secret_hash = hashlib.sha256(secret_part.encode()).hexdigest()
if not secrets.compare_digest(secret_hash, key_record['secret_hash']):
self.log_failed_attempt(public_part, request_info)
return False
# Check expiration
if key_record['expires_at']:
if datetime.utcnow() > key_record['expires_at']:
return False
# Check IP whitelist
if key_record['allowed_ips']:
if request_info['ip'] not in key_record['allowed_ips']:
self.log_suspicious_activity(
public_part,
f"Request from unauthorized IP: {request_info['ip']}"
)
return False
# Check origin whitelist
if key_record['allowed_origins']:
if request_info.get('origin') not in key_record['allowed_origins']:
return False
# Update usage
self.update_key_usage(public_part)
return True
except Exception as e:
logging.error(f"Key verification error: {e}")
return False
def log_failed_attempt(self, key_id: str, request_info: dict):
"""Log failed authentication attempt"""
key_record = db.get_key_by_id(key_id)
key_record['failed_attempts'] += 1
key_record['last_failure'] = datetime.utcnow()
# Auto-revoke after too many failures
if key_record['failed_attempts'] >= 5:
key_record['is_active'] = False
self.send_alert(key_record['user_id'], "API key auto-revoked due to failed attempts")
db.save_key(key_record)
def rotate_key(self, old_key_id: str) -> dict:
"""Rotate API key (maintain old key for grace period)"""
old_record = db.get_key_by_id(old_key_id)
# Generate new key
new_key_data = self.generate_key(
old_record['user_id'],
old_record['name'] + ' (rotated)'
)
# Mark old key for deprecation
old_record['deprecated_at'] = datetime.utcnow()
old_record['grace_period_end'] = datetime.utcnow() + timedelta(days=30)
db.save_key(old_record)
return new_key_data
2. Key Restrictions:
Click to view code (python)
class APIKeyRestrictions:
def check_ip_whitelist(self, key: dict, ip: str) -> bool:
"""Only allow from specific IPs"""
if not key['allowed_ips']:
return True # No restriction
return ip in key['allowed_ips']
def check_origin_whitelist(self, key: dict, origin: str) -> bool:
"""Only allow from specific origins (for browser requests)"""
if not key['allowed_origins']:
return True
return origin in key['allowed_origins']
def check_scopes(self, key: dict, required_scope: str) -> bool:
"""Check if key has required permission"""
if not key['scopes']:
return False # Deny by default
return required_scope in key['scopes']
def check_time_window(self, key: dict) -> bool:
"""Only allow during specific hours"""
if 'allowed_hours' not in key:
return True
current_hour = datetime.utcnow().hour
return current_hour in key['allowed_hours']
# Apply restrictions
@app.before_request
def validate_api_key():
api_key = request.headers.get('X-API-Key')
if not api_key:
return jsonify({'error': 'API key required'}), 401
key_manager = SecureAPIKeyManager()
restrictions = APIKeyRestrictions()
# Verify key
request_info = {
'ip': request.remote_addr,
'origin': request.headers.get('Origin'),
'user_agent': request.headers.get('User-Agent')
}
if not key_manager.verify_key(api_key, request_info):
return jsonify({'error': 'Invalid API key'}), 401
# Get key record
key_id = api_key.split('_')[1]
key_record = db.get_key_by_id(key_id)
# Apply restrictions
if not restrictions.check_ip_whitelist(key_record, request.remote_addr):
return jsonify({'error': 'IP not allowed'}), 403
if not restrictions.check_origin_whitelist(
key_record,
request.headers.get('Origin')
):
return jsonify({'error': 'Origin not allowed'}), 403
# Store for later use
g.api_key = key_record
3. Anomaly Detection:
Click to view code (python)
class APIKeyAnomalyDetector:
def detect_anomalies(self, key_id: str, request_info: dict) -> list:
"""Detect suspicious patterns"""
anomalies = []
key_record = db.get_key_by_id(key_id)
# 1. Unusual location
recent_locations = self.get_recent_locations(key_id)
if request_info['ip'] not in recent_locations:
geo_distance = self.calculate_distance(
self.get_location(recent_locations[0]),
self.get_location(request_info['ip'])
)
if geo_distance > 1000: # km
anomalies.append({
'type': 'unusual_location',
'severity': 'high',
'details': f'Request from {geo_distance}km away'
})
# 2. Unusual traffic pattern
request_rate = self.get_request_rate(key_id, window=60) # Last minute
avg_rate = key_record.get('avg_request_rate', 10)
if request_rate > avg_rate * 10:
anomalies.append({
'type': 'unusual_traffic',
'severity': 'medium',
'details': f'Request rate {request_rate}/min vs avg {avg_rate}/min'
})
# 3. Unusual endpoint access
endpoint = request_info['endpoint']
recent_endpoints = self.get_recent_endpoints(key_id)
if endpoint not in recent_endpoints:
anomalies.append({
'type': 'new_endpoint',
'severity': 'low',
'details': f'First access to {endpoint}'
})
# 4. Unusual time
hour = datetime.utcnow().hour
active_hours = key_record.get('typical_active_hours', [])
if active_hours and hour not in active_hours:
anomalies.append({
'type': 'unusual_time',
'severity': 'low',
'details': f'Request at unusual hour: {hour}'
})
# Take action based on severity
if any(a['severity'] == 'high' for a in anomalies):
self.auto_revoke_key(key_id)
self.send_alert(key_record['user_id'], anomalies)
return anomalies
4. Key Leak Detection:
Click to view code (python)
class APIKeyLeakDetector:
def scan_github(self, pattern: str):
"""Scan GitHub for leaked keys"""
# Use GitHub API to search for pattern
# sk_[a-zA-Z0-9_-]{40,}
pass
def scan_pastebin(self, pattern: str):
"""Scan Pastebin for leaked keys"""
pass
def handle_leaked_key(self, key_id: str):
"""Handle detected leaked key"""
# 1. Immediately revoke
key_record = db.get_key_by_id(key_id)
key_record['is_active'] = False
key_record['revoked_reason'] = 'Leaked online'
db.save_key(key_record)
# 2. Alert user
self.send_alert(
key_record['user_id'],
f"API key '{key_record['name']}' was found leaked online and has been revoked"
)
# 3. Log incident
security_logger.log_security_incident({
'type': 'key_leak',
'key_id': key_id,
'user_id': key_record['user_id']
})
Best Practices Summary:
- Key Format: Use prefixes and structure (skpublicsecret)
- Storage: Store only hashes, never plaintext
- Restrictions: IP whitelist, origin whitelist, scopes
- Monitoring: Track usage patterns, detect anomalies
- Rotation: Regular key rotation, grace periods
- Expiration: Set expiration dates
- Rate Limiting: Per-key rate limits
- Revocation: Easy revocation process
- Leak Detection: Automated scanning
- Alerts: Notify users of suspicious activity
Q4: Design end-to-end encryption for a chat application. How do you handle key management?
Answer:
Requirements:
- End-to-end encryption (server can't read messages)
- Forward secrecy (past messages safe even if key compromised)
- Multi-device support
- Group chats
- Secure key exchange
Solution: Signal Protocol (Double Ratchet Algorithm)
Architecture:
Click to view code (python)
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import x25519
from cryptography.hazmat.primitives.kdf.hkdf import HKDF
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
import os
class SignalProtocol:
def __init__(self):
self.identity_key = self.generate_identity_key()
self.signed_pre_key = self.generate_pre_key()
self.one_time_pre_keys = [self.generate_pre_key() for _ in range(100)]
def generate_identity_key(self) -> x25519.X25519PrivateKey:
"""Generate long-term identity key"""
return x25519.X25519PrivateKey.generate()
def generate_pre_key(self) -> x25519.X25519PrivateKey:
"""Generate pre-key for key exchange"""
return x25519.X25519PrivateKey.generate()
def register_user(self, user_id: str):
"""Register user and upload public keys to server"""
identity_public_key = self.identity_key.public_key()
signed_pre_key_public = self.signed_pre_key.public_key()
one_time_pre_keys_public = [
key.public_key() for key in self.one_time_pre_keys
]
# Upload to server
server_db.store_user_keys({
'user_id': user_id,
'identity_key': self.serialize_public_key(identity_public_key),
'signed_pre_key': self.serialize_public_key(signed_pre_key_public),
'one_time_pre_keys': [
self.serialize_public_key(k) for k in one_time_pre_keys_public
]
})
def initiate_session(self, sender_id: str, recipient_id: str):
"""Initiate encrypted session (X3DH key exchange)"""
# 1. Fetch recipient's public keys from server
recipient_keys = server_db.get_user_keys(recipient_id)
recipient_identity_key = self.deserialize_public_key(
recipient_keys['identity_key']
)
recipient_signed_pre_key = self.deserialize_public_key(
recipient_keys['signed_pre_key']
)
recipient_one_time_pre_key = self.deserialize_public_key(
recipient_keys['one_time_pre_keys'][0]
)
# 2. Generate ephemeral key
ephemeral_key = x25519.X25519PrivateKey.generate()
# 3. Perform Diffie-Hellman exchanges
dh1 = self.identity_key.exchange(recipient_signed_pre_key)
dh2 = ephemeral_key.exchange(recipient_identity_key)
dh3 = ephemeral_key.exchange(recipient_signed_pre_key)
dh4 = ephemeral_key.exchange(recipient_one_time_pre_key)
# 4. Derive shared secret
shared_secret = dh1 + dh2 + dh3 + dh4
# 5. Derive root key and chain key
hkdf = HKDF(
algorithm=hashes.SHA256(),
length=64,
salt=None,
info=b'Signal'
)
key_material = hkdf.derive(shared_secret)
root_key = key_material[:32]
chain_key = key_material[32:]
# Store session
session = {
'recipient_id': recipient_id,
'root_key': root_key,
'sending_chain_key': chain_key,
'receiving_chain_key': None,
'message_number': 0
}
return session
def encrypt_message(
self,
session: dict,
plaintext: str
) -> dict:
"""Encrypt message using Double Ratchet"""
# 1. Derive message key from chain key
message_key = self.derive_message_key(
session['sending_chain_key'],
session['message_number']
)
# 2. Encrypt message with AES-GCM
iv = os.urandom(12)
cipher = Cipher(
algorithms.AES(message_key),
modes.GCM(iv)
)
encryptor = cipher.encryptor()
ciphertext = encryptor.update(plaintext.encode()) + encryptor.finalize()
# 3. Advance chain key (forward secrecy)
session['sending_chain_key'] = self.advance_chain_key(
session['sending_chain_key']
)
session['message_number'] += 1
return {
'ciphertext': ciphertext,
'iv': iv,
'tag': encryptor.tag,
'message_number': session['message_number'] - 1
}
def decrypt_message(
self,
session: dict,
encrypted_message: dict
) -> str:
"""Decrypt message"""
# 1. Derive message key
message_key = self.derive_message_key(
session['receiving_chain_key'],
encrypted_message['message_number']
)
# 2. Decrypt with AES-GCM
cipher = Cipher(
algorithms.AES(message_key),
modes.GCM(encrypted_message['iv'], encrypted_message['tag'])
)
decryptor = cipher.decryptor()
plaintext = decryptor.update(encrypted_message['ciphertext']) + decryptor.finalize()
# 3. Advance chain key
session['receiving_chain_key'] = self.advance_chain_key(
session['receiving_chain_key']
)
return plaintext.decode()
def derive_message_key(self, chain_key: bytes, message_num: int) -> bytes:
"""Derive message key from chain key"""
hkdf = HKDF(
algorithm=hashes.SHA256(),
length=32,
salt=None,
info=f'message_{message_num}'.encode()
)
return hkdf.derive(chain_key)
def advance_chain_key(self, chain_key: bytes) -> bytes:
"""Advance chain key (ratchet forward)"""
hkdf = HKDF(
algorithm=hashes.SHA256(),
length=32,
salt=None,
info=b'chain_key'
)
return hkdf.derive(chain_key)
def serialize_public_key(self, public_key) -> str:
"""Serialize public key for transmission"""
return public_key.public_bytes(
encoding=serialization.Encoding.Raw,
format=serialization.PublicFormat.Raw
).hex()
def deserialize_public_key(self, key_hex: str):
"""Deserialize public key"""
key_bytes = bytes.fromhex(key_hex)
return x25519.X25519PublicKey.from_public_bytes(key_bytes)
Multi-Device Support:
Click to view code (python)
class MultiDeviceManager:
def register_device(self, user_id: str, device_id: str):
"""Register new device for user"""
protocol = SignalProtocol()
protocol.register_user(f"{user_id}:{device_id}")
# Store device keys
db.add_device({
'user_id': user_id,
'device_id': device_id,
'identity_key': protocol.identity_key,
'registered_at': datetime.utcnow()
})
def send_message_to_user(
self,
sender_id: str,
recipient_id: str,
message: str
):
"""Send message to all recipient devices"""
# Get all recipient devices
devices = db.get_user_devices(recipient_id)
encrypted_messages = []
for device in devices:
# Create session with each device
device_full_id = f"{recipient_id}:{device['device_id']}"
session = protocol.initiate_session(sender_id, device_full_id)
# Encrypt message for this device
encrypted = protocol.encrypt_message(session, message)
encrypted_messages.append({
'device_id': device['device_id'],
'encrypted_message': encrypted
})
# Send to server for delivery
server_db.store_messages(recipient_id, encrypted_messages)
Group Chat Encryption:
Click to view code (python)
class GroupChatEncryption:
def create_group(self, creator_id: str, member_ids: list) -> str:
"""Create encrypted group chat"""
group_id = secrets.token_hex(16)
# Generate group symmetric key
group_key = os.urandom(32)
# Encrypt group key for each member
encrypted_keys = {}
for member_id in member_ids:
# Get member's public key
member_keys = server_db.get_user_keys(member_id)
# Encrypt group key with member's public key
# (simplified - use proper hybrid encryption)
encrypted_key = self.encrypt_for_user(group_key, member_keys)
encrypted_keys[member_id] = encrypted_key
# Store group info
db.create_group({
'group_id': group_id,
'creator_id': creator_id,
'member_ids': member_ids,
'encrypted_keys': encrypted_keys,
'created_at': datetime.utcnow()
})
return group_id
def send_group_message(
self,
sender_id: str,
group_id: str,
message: str
):
"""Send message to group"""
# Get group key (encrypted for sender)
group = db.get_group(group_id)
group_key_encrypted = group['encrypted_keys'][sender_id]
# Decrypt group key
group_key = self.decrypt_group_key(group_key_encrypted, sender_id)
# Encrypt message with group key
iv = os.urandom(12)
cipher = Cipher(
algorithms.AES(group_key),
modes.GCM(iv)
)
encryptor = cipher.encryptor()
ciphertext = encryptor.update(message.encode()) + encryptor.finalize()
# Store encrypted message
db.store_group_message({
'group_id': group_id,
'sender_id': sender_id,
'ciphertext': ciphertext,
'iv': iv,
'tag': encryptor.tag,
'timestamp': datetime.utcnow()
})
Key Features:
- Forward Secrecy: Each message uses different key
- Future Secrecy: Compromised key doesn't affect future messages
- Multi-Device: Each device has own keys
- Group Chats: Symmetric group key encrypted per member
- Server Blind: Server can't decrypt messages
Security Properties:
- End-to-end encrypted
- Perfect forward secrecy
- Post-compromise security
- Deniability
- Asynchronous (offline message delivery)
Q5: How would you implement GDPR compliance for user data deletion and export?
Answer:
GDPR Requirements:
- Right to be Forgotten: Delete user data within 30 days
- Right to Data Portability: Export all user data in machine-readable format
- Right to Access: User can view what data is stored
- Consent Management: Track and respect user consent
- Data Breach Notification: Notify within 72 hours
Implementation:
Click to view code (python)
from datetime import datetime, timedelta
import json
import csv
from typing import Dict, List
class GDPRComplianceService:
def __init__(self):
self.deletion_grace_period = timedelta(days=30)
# 1. Right to be Forgotten
def request_data_deletion(self, user_id: str, reason: str = None):
"""
Request user data deletion (GDPR Article 17)
- Don't delete immediately (legal hold, backups)
- Mark for deletion
- Anonymize instead of hard delete
"""
user = db.get_user(user_id)
if not user:
raise Exception("User not found")
# Create deletion request
deletion_request = {
'user_id': user_id,
'requested_at': datetime.utcnow(),
'scheduled_for': datetime.utcnow() + self.deletion_grace_period,
'reason': reason,
'status': 'pending',
'completed_at': None
}
db.create_deletion_request(deletion_request)
# Send confirmation email
self.send_deletion_confirmation(user.email, deletion_request)
return deletion_request
def execute_data_deletion(self, user_id: str):
"""
Execute data deletion after grace period
Strategy: Anonymize instead of hard delete (preserve data integrity)
"""
# 1. Personal Information
user = db.get_user(user_id)
user.email = f"deleted_{user_id}@anonymized.local"
user.name = "Deleted User"
user.phone = None
user.address = None
user.date_of_birth = None
user.profile_picture = None
user.bio = None
user.is_deleted = True
user.deleted_at = datetime.utcnow()
db.save(user)
# 2. Authentication
db.delete_user_sessions(user_id)
db.delete_user_tokens(user_id)
db.delete_user_api_keys(user_id)
db.delete_user_oauth_connections(user_id)
# 3. User-generated content
# Keep content but anonymize authorship
posts = db.get_user_posts(user_id)
for post in posts:
post.author_name = "Deleted User"
post.author_id = None
db.save(post)
comments = db.get_user_comments(user_id)
for comment in comments:
comment.author_name = "Deleted User"
comment.author_id = None
db.save(comment)
# 4. Activity logs
# Keep for legal/audit but anonymize
logs = db.get_user_activity_logs(user_id)
for log in logs:
log.user_id = "anonymized"
log.ip_address = "0.0.0.0"
log.user_agent = "anonymized"
db.save(log)
# 5. Third-party integrations
# Request deletion from integrated services
self.request_third_party_deletion(user_id)
# 6. Backups
# Mark for deletion in next backup cycle
db.mark_for_backup_deletion(user_id)
# 7. Analytics
# Anonymize in analytics database
analytics_db.anonymize_user(user_id)
# 8. Billing (keep for legal requirements)
# Keep billing records but remove PII
invoices = db.get_user_invoices(user_id)
for invoice in invoices:
invoice.customer_name = "Deleted User"
invoice.email = f"deleted_{user_id}@anonymized.local"
invoice.address = None
db.save(invoice)
# Update deletion request
deletion_request = db.get_deletion_request(user_id)
deletion_request['status'] = 'completed'
deletion_request['completed_at'] = datetime.utcnow()
db.save(deletion_request)
# Log for audit
self.log_gdpr_event({
'event': 'data_deletion_completed',
'user_id': user_id,
'timestamp': datetime.utcnow()
})
# 2. Right to Data Portability
def export_user_data(self, user_id: str, format: str = 'json') -> dict:
"""
Export all user data (GDPR Article 20)
Format: JSON or CSV
"""
user = db.get_user(user_id)
if not user:
raise Exception("User not found")
# Collect all user data
export_data = {
'export_info': {
'user_id': user_id,
'exported_at': datetime.utcnow().isoformat(),
'format': format
},
'personal_information': {
'email': user.email,
'name': user.name,
'phone': user.phone,
'address': user.address,
'date_of_birth': user.date_of_birth.isoformat() if user.date_of_birth else None,
'account_created': user.created_at.isoformat(),
'profile_picture_url': user.profile_picture,
'bio': user.bio
},
'posts': [
{
'id': post.id,
'title': post.title,
'content': post.content,
'created_at': post.created_at.isoformat(),
'likes': post.like_count,
'comments': post.comment_count
}
for post in db.get_user_posts(user_id)
],
'comments': [
{
'id': comment.id,
'post_id': comment.post_id,
'content': comment.content,
'created_at': comment.created_at.isoformat()
}
for comment in db.get_user_comments(user_id)
],
'likes': [
{
'post_id': like.post_id,
'liked_at': like.created_at.isoformat()
}
for like in db.get_user_likes(user_id)
],
'followers': [
{
'user_id': follower.id,
'username': follower.username,
'followed_at': follower.followed_at.isoformat()
}
for follower in db.get_user_followers(user_id)
],
'following': [
{
'user_id': following.id,
'username': following.username,
'followed_at': following.followed_at.isoformat()
}
for following in db.get_user_following(user_id)
],
'messages': [
{
'conversation_id': msg.conversation_id,
'content': msg.content,
'sent_at': msg.created_at.isoformat(),
'recipient': msg.recipient.username
}
for msg in db.get_user_messages(user_id)
],
'activity_log': [
{
'action': log.action,
'timestamp': log.timestamp.isoformat(),
'ip_address': log.ip_address,
'user_agent': log.user_agent
}
for log in db.get_user_activity_logs(user_id, limit=1000)
],
'consent_records': [
{
'purpose': consent.purpose,
'granted': consent.granted,
'timestamp': consent.timestamp.isoformat()
}
for consent in db.get_user_consents(user_id)
],
'payment_history': [
{
'invoice_id': payment.invoice_id,
'amount': payment.amount,
'currency': payment.currency,
'date': payment.created_at.isoformat(),
'description': payment.description
}
for payment in db.get_user_payments(user_id)
]
}
# Generate export file
if format == 'json':
export_file = self.generate_json_export(export_data)
elif format == 'csv':
export_file = self.generate_csv_export(export_data)
else:
raise Exception("Unsupported format")
# Store export (temporary, auto-delete after 30 days)
export_record = {
'user_id': user_id,
'file_path': export_file,
'created_at': datetime.utcnow(),
'expires_at': datetime.utcnow() + timedelta(days=30),
'download_count': 0
}
db.create_export_record(export_record)
# Log for audit
self.log_gdpr_event({
'event': 'data_export_created',
'user_id': user_id,
'timestamp': datetime.utcnow()
})
# Send email with download link
self.send_export_ready_email(user.email, export_record)
return export_record
def generate_json_export(self, data: dict) -> str:
"""Generate JSON export file"""
filename = f"data_export_{data['export_info']['user_id']}_{int(time.time())}.json"
filepath = f"/exports/{filename}"
with open(filepath, 'w') as f:
json.dump(data, f, indent=2)
return filepath
# 3. Consent Management
def record_consent(
self,
user_id: str,
purpose: str,
granted: bool,
consent_text: str
):
"""Record user consent for data processing"""
consent = {
'user_id': user_id,
'purpose': purpose, # e.g., 'marketing', 'analytics', 'third_party'
'granted': granted,
'consent_text': consent_text,
'timestamp': datetime.utcnow(),
'ip_address': request.remote_addr,
'user_agent': request.headers.get('User-Agent')
}
db.create_consent_record(consent)
# If consent revoked, take action
if not granted:
self.handle_consent_revocation(user_id, purpose)
def handle_consent_revocation(self, user_id: str, purpose: str):
"""Handle consent revocation"""
if purpose == 'marketing':
# Unsubscribe from marketing emails
db.update_user_preferences(user_id, {
'marketing_emails': False
})
elif purpose == 'analytics':
# Stop tracking in analytics
analytics_db.opt_out_user(user_id)
elif purpose == 'third_party':
# Stop sharing with third parties
db.update_user_preferences(user_id, {
'third_party_sharing': False
})
# 4. Data Breach Notification
def handle_data_breach(
self,
affected_user_ids: List[str],
breach_details: dict
):
"""Handle data breach (GDPR Article 33/34)"""
breach_record = {
'id': secrets.token_hex(16),
'detected_at': datetime.utcnow(),
'affected_users': len(affected_user_ids),
'breach_type': breach_details['type'],
'description': breach_details['description'],
'severity': breach_details['severity'],
'notified_at': None,
'authority_notified_at': None
}
db.create_breach_record(breach_record)
# Notify supervisory authority within 72 hours
if breach_details['severity'] in ['high', 'critical']:
self.notify_supervisory_authority(breach_record)
# Notify affected users
for user_id in affected_user_ids:
user = db.get_user(user_id)
self.notify_user_of_breach(user.email, breach_record)
breach_record['notified_at'] = datetime.utcnow()
db.save(breach_record)
# 5. Data Retention Policy
def enforce_retention_policy(self):
"""Enforce data retention policies"""
# Delete old logs
db.delete_logs_older_than(days=365)
# Delete old sessions
db.delete_sessions_older_than(days=30)
# Delete expired exports
db.delete_expired_exports()
# Anonymize old activity logs
db.anonymize_logs_older_than(days=730) # 2 years
# API Endpoints
@app.route('/api/gdpr/delete-account', methods=['POST'])
@require_authentication
def request_account_deletion():
"""Request account deletion"""
user_id = g.user_id
reason = request.json.get('reason')
gdpr = GDPRComplianceService()
deletion_request = gdpr.request_data_deletion(user_id, reason)
return jsonify({
'message': 'Deletion request created',
'scheduled_for': deletion_request['scheduled_for'].isoformat()
})
@app.route('/api/gdpr/export-data', methods=['POST'])
@require_authentication
def export_my_data():
"""Export user data"""
user_id = g.user_id
format = request.json.get('format', 'json')
gdpr = GDPRComplianceService()
export_record = gdpr.export_user_data(user_id, format)
return jsonify({
'message': 'Export created',
'download_url': f"/api/gdpr/download/{export_record['id']}",
'expires_at': export_record['expires_at'].isoformat()
})
@app.route('/api/gdpr/consent', methods=['POST'])
@require_authentication
def update_consent():
"""Update consent preferences"""
user_id = g.user_id
purpose = request.json['purpose']
granted = request.json['granted']
gdpr = GDPRComplianceService()
gdpr.record_consent(user_id, purpose, granted, request.json.get('consent_text', ''))
return jsonify({'message': 'Consent updated'})
Best Practices:
- Anonymize, don't delete: Preserve data integrity
- Grace period: 30 days before deletion
- Audit trail: Log all GDPR operations
- Automation: Scheduled jobs for retention
- Third-party cleanup: Request deletion from integrations
- Backups: Include deletion in backup cycles
- Legal holds: Check before deletion
- Export format: Machine-readable (JSON, CSV)
- Consent tracking: Detailed consent records
- Breach notification: Within 72 hours
This comprehensive security guide covers:
- Authentication & Authorization (passwords, MFA, JWT, OAuth, RBAC, ABAC)
- API Security (input validation, rate limiting, API keys, CORS, HTTPS, request signing)
- Data Security (encryption at rest/transit, masking, tokenization)
- Network Security (DDoS protection, firewall rules)
- Application Security (XSS, CSRF, session management)
- Infrastructure Security (containers, secrets, logging)
- Compliance (GDPR, PCI DSS)
- Interview questions with detailed answers
All with production-ready code examples and best practices!