Troubleshooting and Error Handling¶
Section Overview
Comprehensive strategies for logging, error handling, debugging, and incident management that ensure system reliability and rapid problem resolution.
Quick Navigation¶
Logging Architecture¶
Build robust logging systems that provide complete observability
| Topic | Focus Area |
|---|---|
| Structured Logging | Machine-parseable log formats |
| Log Levels | Appropriate severity classification |
| Contextual Logging | Request tracing and correlation |
Error Management¶
Systematic approaches to handling and recovering from errors
| Topic | Focus Area |
|---|---|
| Error Classification | Categorizing error types |
| Retry Patterns | Resilient failure handling |
| Circuit Breakers | Preventing cascade failures |
Debugging Tools¶
Effective debugging techniques across different environments
| Topic | Focus Area |
|---|---|
| IDE Debugging | Development environment tools |
| Production Debugging | Safe production troubleshooting |
| Performance Profiling | Identifying bottlenecks |
Incident Management¶
Structured approaches to incident response and resolution
| Topic | Focus Area |
|---|---|
| Severity Levels | Incident classification |
| Response Procedures | Escalation and resolution |
| Post-mortems | Learning from incidents |
Core Principles¶
Observability First
Design systems with observability built-in from the start. Logs, metrics, and traces should be first-class citizens, not afterthoughts.
Fail Gracefully
Systems should degrade gracefully under failure conditions. Provide reduced functionality rather than complete outages whenever possible.
Learn and Improve
Every incident is a learning opportunity. Conduct thorough post-mortems and implement preventive measures systematically.
Key Concepts¶
The Three Pillars of Observability¶
- Logs
- Timestamped records of discrete events in your system
- Metrics
- Numerical measurements of system behavior over time
- Traces
- Records of request flows through distributed systems
Error Handling Hierarchy¶
graph TD
A[Error Occurs] --> B{Can Recover?}
B -->|Yes| C[Handle Locally]
B -->|No| D[Propagate Up]
C --> E[Log and Continue]
D --> F{Critical?}
F -->|Yes| G[Alert and Escalate]
F -->|No| H[Log and Track]
G --> I[Incident Response]
H --> J[Monitor Patterns]
Getting Started¶
For New Team Members¶
- Start with Logging - Understand our structured logging approach
- Learn Error Patterns - Review common error scenarios and handling
- Practice Debugging - Set up your debugging environment
- Study Incidents - Review past incident post-mortems
For Experienced Developers¶
Jump directly to specific topics using the navigation tabs above or use the search function to find detailed guidance on particular scenarios.
Best Practices Summary¶
| Practice | Why It Matters |
|---|---|
| Structured Logging | Enables automated analysis and alerting |
| Correlation IDs | Traces requests across distributed systems |
| Graceful Degradation | Maintains service availability during failures |
| Post-mortem Reviews | Prevents incident recurrence |
| Monitoring Thresholds | Detects issues before they become critical |
Common Scenarios¶
Scenario: Intermittent API Failures
Problem: External API occasionally times out
Solution: Implement retry logic with exponential backoff, and wrap the call in a circuit breaker
Reference: Retry Patterns | Circuit Breakers
Scenario: High Memory Usage
Problem: Application memory consumption growing over time
Solution: Profile application, identify leaks, implement memory monitoring
Reference: Performance Profiling
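One way to start locating a leak is to compare heap snapshots taken before and after a suspect workload. The sketch below uses Python's standard `tracemalloc` module; `leaky_workload`, `_cache`, and `top_growth` are hypothetical names used only for illustration, not part of any service described here.

```python
import tracemalloc

_cache = []  # hypothetical module-level list that grows without bound


def leaky_workload(n: int) -> None:
    """Simulate a leak by retaining every allocated buffer."""
    for _ in range(n):
        _cache.append(bytearray(1024))


def top_growth(workload, limit: int = 3):
    """Run workload and return the source lines whose allocations grew most."""
    tracemalloc.start()
    try:
        before = tracemalloc.take_snapshot()
        workload()
        after = tracemalloc.take_snapshot()
    finally:
        tracemalloc.stop()
    # compare_to returns StatisticDiff entries, largest change first
    return after.compare_to(before, "lineno")[:limit]


if __name__ == "__main__":
    for stat in top_growth(lambda: leaky_workload(1000)):
        print(stat)
```

Each printed entry names a file and line together with the size and count difference, which is usually enough to point at the structure that keeps growing.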
Scenario: Production Issue Investigation
Problem: Need to debug production issue without disrupting service
Solution: Use structured logging, distributed tracing, and safe debugging techniques
Reference: Production Debugging
Logging Architecture and Best Practices¶
Core Logging Principles¶
Fundamental Principle: Implement a comprehensive logging system that provides complete system observability while maintaining performance and security.
Key Guidelines
- Structure all logs in machine-parseable formats
- Implement consistent logging patterns across services
- Ensure logs provide full request context
- Maintain security and compliance in log content
- Optimize log storage and retention
Why This Matters
A well-designed logging architecture is fundamental for rapid problem diagnosis, performance monitoring, security auditing, compliance requirements, and system behavior analysis.
Structured Logging Implementation¶
Standard Log Entry Fields¶
Every log entry must include the required fields below so that entries stay consistent and traceable across services.
Required Fields:
{
"timestamp": "2024-12-15T10:30:45Z",
"level": "ERROR",
"service": "payment-service",
"environment": "production",
"requestId": "req-abc-123",
"correlationId": "trace-xyz-789",
"message": "Payment processing failed",
"sourceLocation": {
"file": "payment_processor.py",
"line": 145,
"function": "process_payment"
}
}
Conditional Fields:
{
"userId": "user_12345",
"tenantId": "tenant_abc",
"errorDetails": {
"code": "PAYMENT_GATEWAY_ERROR",
"type": "GatewayTimeoutError",
"stack": "..."
},
"performance": {
"duration": 2500,
"memoryUsage": 128000000
}
}
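The required fields above can be produced with the standard library alone. The following is a minimal sketch, assuming the stdlib `logging` module rather than any particular logging framework; `JsonFormatter` and `build_logger` are illustrative names, and `requestId` and `correlationId` are only emitted when the caller supplies them through `extra=`.

```python
import json
import logging
from datetime import datetime, timezone


class JsonFormatter(logging.Formatter):
    """Render each log record as one JSON object per line."""

    def __init__(self, service: str, environment: str):
        super().__init__()
        self.service = service
        self.environment = environment

    def format(self, record: logging.LogRecord) -> str:
        created = datetime.fromtimestamp(record.created, tz=timezone.utc)
        entry = {
            "timestamp": created.isoformat(),
            "level": record.levelname,
            "service": self.service,
            "environment": self.environment,
            "message": record.getMessage(),
            "sourceLocation": {
                "file": record.filename,
                "line": record.lineno,
                "function": record.funcName,
            },
        }
        # Values passed via `extra=` become attributes on the record
        for key in ("requestId", "correlationId"):
            if hasattr(record, key):
                entry[key] = getattr(record, key)
        if record.exc_info:
            entry["errorDetails"] = {"stack": self.formatException(record.exc_info)}
        return json.dumps(entry)


def build_logger(name: str, service: str, environment: str) -> logging.Logger:
    """Attach a stream handler that writes JSON lines."""
    handler = logging.StreamHandler()
    handler.setFormatter(JsonFormatter(service, environment))
    logger = logging.getLogger(name)
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)
    return logger
```

A call such as `build_logger("payments", "payment-service", "production").error("Payment processing failed", extra={"requestId": "req-abc-123"})` would then emit a single JSON line carrying the fields listed above.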
Implementation Examples¶
import structlog
import time
from typing import Dict, Any
from contextlib import contextmanager
class StructuredLogger:
def __init__(self):
self.logger = structlog.get_logger()
self.context: Dict[str, Any] = {}
def set_context(self, **kwargs):
"""Set context that will be included in all subsequent log entries"""
self.context.update(kwargs)
@contextmanager
def operation_logger(self, operation_name: str):
"""Context manager for logging operation duration and status"""
start_time = time.time()
try:
yield
duration = (time.time() - start_time) * 1000
self.logger.info(
f"{operation_name}_completed",
duration_ms=duration,
status="success",
**self.context
)
except Exception as e:
duration = (time.time() - start_time) * 1000
self.logger.error(
f"{operation_name}_failed",
duration_ms=duration,
status="error",
error_type=type(e).__name__,
error_message=str(e),
**self.context
)
raise
def audit_log(self, action: str, resource: str, changes: Dict[str, Any]):
"""Special logging for audit events"""
self.logger.info(
"audit_event",
action=action,
resource=resource,
changes=changes,
timestamp=time.time(),
**self.context
)
# Usage Example
logger = StructuredLogger()
def process_order(order_id: str, user_id: str):
logger.set_context(order_id=order_id, user_id=user_id)
with logger.operation_logger("order_processing"):
# Order processing logic here
order = fetch_order(order_id)
validate_order(order)
process_payment(order)
logger.audit_log(
action="order_processed",
resource=f"order/{order_id}",
changes={"status": "completed", "processed_at": time.time()}
)
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.time.Instant;
import java.util.Map;
import java.util.HashMap;
import java.util.concurrent.ConcurrentHashMap;
public class EnhancedLogger {
private static final Logger logger = LogManager.getLogger();
private static final ObjectMapper mapper = new ObjectMapper();
private final ThreadLocal<Map<String, Object>> context =
ThreadLocal.withInitial(ConcurrentHashMap::new);
public class LogBuilder {
private final Map<String, Object> logData = new HashMap<>();
public LogBuilder withField(String key, Object value) {
logData.put(key, value);
return this;
}
public LogBuilder withException(Exception e) {
logData.put("errorType", e.getClass().getSimpleName());
logData.put("errorMessage", e.getMessage());
logData.put("stackTrace", e.getStackTrace());
return this;
}
public void info(String message) {
log("INFO", message, logData);
}
public void error(String message) {
log("ERROR", message, logData);
}
private void log(String level, String message, Map<String, Object> data) {
Map<String, Object> fullLogEntry = new HashMap<>(context.get());
fullLogEntry.putAll(data);
fullLogEntry.put("timestamp", Instant.now().toString());
fullLogEntry.put("level", level);
fullLogEntry.put("message", message);
try {
String jsonLog = mapper.writeValueAsString(fullLogEntry);
if ("ERROR".equals(level)) {
logger.error(jsonLog);
} else {
logger.info(jsonLog);
}
} catch (Exception e) {
logger.error("Failed to serialize log entry", e);
}
}
}
public void setContext(String key, Object value) {
context.get().put(key, value);
}
public LogBuilder log() {
return new LogBuilder();
}
}
const winston = require('winston');
class StructuredLogger {
constructor() {
this.logger = winston.createLogger({
format: winston.format.combine(
winston.format.timestamp(),
winston.format.json()
),
transports: [
new winston.transports.Console(),
new winston.transports.File({ filename: 'application.log' })
]
});
this.context = {};
}
setContext(key, value) {
this.context[key] = value;
}
async withOperation(operationName, fn) {
const startTime = Date.now();
try {
const result = await fn();
const duration = Date.now() - startTime;
this.logger.info({
message: `${operationName}_completed`,
duration,
status: 'success',
...this.context
});
return result;
} catch (error) {
const duration = Date.now() - startTime;
this.logger.error({
message: `${operationName}_failed`,
duration,
status: 'error',
errorType: error.constructor.name,
errorMessage: error.message,
...this.context
});
throw error;
}
}
}
// Usage Example
const logger = new StructuredLogger();
async function processOrder(orderId, userId) {
logger.setContext('orderId', orderId);
logger.setContext('userId', userId);
await logger.withOperation('order_processing', async () => {
// Order processing logic
});
}
Log Levels and Their Application¶
Comprehensive Log Level Guide¶
Understanding when to use each log level is crucial for effective system observability.
| Level | Purpose | When to Use | Example Scenarios |
|---|---|---|---|
| TRACE | Extremely detailed debugging | Function entry/exit, loop iterations | trace("Entering calculateTotal", items=items) |
| DEBUG | Detailed development info | Database queries, API calls, cache operations | debug("Query executed", query=sql, duration=ms) |
| INFO | Normal application flow | Service startup, user actions, scheduled tasks | info("Application started", version=v, env=prod) |
| WARN | Potentially harmful situations | Deprecated features, resource limits, recoverable errors | warn("Rate limit approaching", current=90, limit=100) |
| ERROR | Error events allowing continuation | Failed operations, integration errors | error("Payment failed", order_id=id, error=msg) |
| FATAL | Severe errors causing shutdown | Database unavailable, critical resource failure | fatal("Cannot initialize", reason=db_error) |
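Python's standard `logging` module has no TRACE level, and its `FATAL` is simply an alias for `CRITICAL`, so the table above needs a small amount of setup there. The sketch below registers TRACE at numeric value 5, which is an assumption (any value below `logging.DEBUG`, which is 10, would work); `trace` and `level_for_environment` are hypothetical helpers.

```python
import logging

TRACE = 5  # assumed value; anything below logging.DEBUG (10) is more verbose
logging.addLevelName(TRACE, "TRACE")


def trace(logger: logging.Logger, message: str, *args, **kwargs) -> None:
    """Log at TRACE only when the logger is configured that verbosely."""
    if logger.isEnabledFor(TRACE):
        logger.log(TRACE, message, *args, **kwargs)


def level_for_environment(environment: str) -> int:
    """Hypothetical mapping from deployment environment to minimum level."""
    return {
        "development": TRACE,
        "staging": logging.DEBUG,
        "production": logging.INFO,
    }.get(environment, logging.INFO)
```

Falling back to INFO for unknown environments keeps an unconfigured deployment from flooding log storage with debug output.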
Practical Examples by Log Level¶
TRACE
logger.trace("Entering calculateTotal with items", items=items)
logger.trace("Loop iteration", iteration=i, currentSum=total)
logger.trace("Exiting calculateTotal with result", total=total)
Use for: Granular debugging during development
DEBUG
logger.debug("Database query executed",
             query=query_string,
             parameters=params,
             execution_time=duration)
logger.debug("Cache hit",
             key=cache_key,
             ttl_remaining=ttl)
Use for: Development and staging diagnostics
INFO
logger.info("Application started",
            version=app_version,
            environment=env,
            config=config_summary)
logger.info("User logged in",
            user_id=user.id,
            auth_method="oauth")
Use for: Production operational visibility
WARN
logger.warn("API rate limit approaching",
            current_rate=current,
            limit=max_limit,
            time_window="1 minute")
logger.warn("Deprecated API used",
            endpoint="/api/v1/users",
            replacement="/api/v2/users")
Use for: Potential issues requiring attention
Contextual Logging Implementation¶
Request Context Tracking¶
Core Principle: Capture and maintain request-level context throughout the entire transaction lifecycle.
import functools
import logging
import threading
import time
import uuid
from typing import Optional

logger = logging.getLogger(__name__)

class RequestContextManager:
    def __init__(self):
        # Thread-local storage for context
        self._context = threading.local()

    def set_context(self, **kwargs):
        """Set context values for the current request/thread"""
        if not hasattr(self._context, 'data'):
            self._context.data = {}
        self._context.data.update(kwargs)

    def get_context(self, key: Optional[str] = None, default=None):
        """Retrieve one context value, or the whole context when no key is given"""
        if not hasattr(self._context, 'data'):
            return default
        return self._context.data.get(key, default) if key else self._context.data

    def clear_context(self):
        """Clear context for the current request/thread"""
        if hasattr(self._context, 'data'):
            del self._context.data

    def create_context_decorator(self):
        """Decorator to manage context for function calls"""
        def decorator(func):
            @functools.wraps(func)
            def wrapper(*args, **kwargs):
                request_id = str(uuid.uuid4())
                self.set_context(
                    request_id=request_id,
                    start_time=time.time()
                )
                try:
                    return func(*args, **kwargs)
                finally:
                    # Always clear, so a reused worker thread never sees stale context
                    self.clear_context()
            return wrapper
        return decorator

# Global context manager
request_context = RequestContextManager()

# Usage example
@request_context.create_context_decorator()
def process_order(order_id):
    context = request_context.get_context()
    logger.info(f"Processing order {order_id}",
                extra={
                    'request_id': context.get('request_id'),
                    'start_time': context.get('start_time')
                })
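Thread-local storage separates requests only when each request runs on its own thread. Under `asyncio`, many requests share one thread, so the standard `contextvars` module is the usual alternative. The following is a minimal sketch; `handle_request`, `current_request_id`, and the `"-"` default are hypothetical.

```python
import asyncio
import contextvars
import uuid
from typing import List, Tuple

# Each asyncio task runs in its own copy of the context
_request_id = contextvars.ContextVar("request_id", default="-")


def current_request_id() -> str:
    """Return the ID of the request being handled, or "-" outside a request."""
    return _request_id.get()


async def handle_request(payload: str) -> Tuple[str, str]:
    token = _request_id.set(str(uuid.uuid4()))
    try:
        await asyncio.sleep(0)  # yield so the other tasks interleave
        return payload, current_request_id()
    finally:
        _request_id.reset(token)


async def main() -> List[Tuple[str, str]]:
    # gather wraps each coroutine in a task, giving it a separate context
    return await asyncio.gather(*(handle_request(p) for p in ("a", "b", "c")))


if __name__ == "__main__":
    for payload, request_id in asyncio.run(main()):
        print(payload, request_id)
```

Each task sees only the ID it set, even though all three run interleaved on the same thread.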
Logging Best Practices¶
Performance Considerations
- Use asynchronous logging for high-throughput systems
- Implement log sampling for very verbose operations
- Avoid logging in tight loops without sampling
- Use structured logging to enable efficient log queries
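The first two points above can be combined using the standard library: `QueueHandler` and `QueueListener` move handler I/O onto a background thread, and a filter drops a share of low-severity records. This is a sketch under stated assumptions; `SamplingFilter`, `build_async_logger`, and the 10% default rate are illustrative choices, not recommendations from this guide.

```python
import logging
import logging.handlers
import queue
import random
from typing import Optional, Tuple


class SamplingFilter(logging.Filter):
    """Pass every record at WARNING or above, and a fraction of the rest."""

    def __init__(self, sample_rate: float, rng: Optional[random.Random] = None):
        super().__init__()
        self.sample_rate = sample_rate
        self.rng = rng or random.Random()

    def filter(self, record: logging.LogRecord) -> bool:
        if record.levelno >= logging.WARNING:
            return True
        return self.rng.random() < self.sample_rate


def build_async_logger(
    name: str, target_handler: logging.Handler, sample_rate: float = 0.1
) -> Tuple[logging.Logger, logging.handlers.QueueListener]:
    """Route records through an in-memory queue so callers do not wait on I/O."""
    log_queue: queue.Queue = queue.Queue(-1)  # unbounded
    queue_handler = logging.handlers.QueueHandler(log_queue)
    queue_handler.addFilter(SamplingFilter(sample_rate))
    listener = logging.handlers.QueueListener(log_queue, target_handler)
    logger = logging.getLogger(name)
    logger.setLevel(logging.DEBUG)
    logger.addHandler(queue_handler)
    logger.propagate = False
    listener.start()
    return logger, listener
```

Call `listener.stop()` during shutdown so that records still in the queue are flushed to the target handler.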
Security Considerations
- Never log sensitive data (passwords, tokens, PII)
- Mask or hash identifiable information
- Implement access controls for log data
- Comply with data retention policies
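Masking is easiest to enforce at a single choke point just before fields are written. The sketch below shows one possible shape; the key lists, the `[REDACTED]` marker, and the 12-character digest length are assumptions to adapt to local policy. A keyed hash (HMAC) is used so that pseudonymized values still correlate across entries without being reversible by simple lookup.

```python
import hashlib
import hmac
from typing import Any, Dict

SENSITIVE_KEYS = {"password", "token", "authorization", "card_number"}  # assumed list
PSEUDONYMIZED_KEYS = {"email", "phone", "ip_address"}  # assumed list


def pseudonymize(value: Any, key: bytes) -> str:
    """Keyed hash: stable for equal inputs, unreadable without the key."""
    digest = hmac.new(key, str(value).encode("utf-8"), hashlib.sha256).hexdigest()
    return digest[:12]


def redact(fields: Dict[str, Any], key: bytes) -> Dict[str, Any]:
    """Return a copy of fields that is safe to write to logs."""
    safe: Dict[str, Any] = {}
    for name, value in fields.items():
        lowered = name.lower()
        if lowered in SENSITIVE_KEYS:
            safe[name] = "[REDACTED]"
        elif lowered in PSEUDONYMIZED_KEYS:
            safe[name] = pseudonymize(value, key)
        elif isinstance(value, dict):
            safe[name] = redact(value, key)  # recurse into nested structures
        else:
            safe[name] = value
    return safe
```

The HMAC key should itself be treated as a secret and kept out of the logs and the repository.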
Operational Guidelines
- Centralize logs using tools like ELK, Splunk, or CloudWatch
- Set up log rotation to manage disk space
- Implement log level configuration without redeployment
- Use correlation IDs to trace requests across services
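Rotation and runtime level changes are both available in the standard library. The following is a sketch, assuming a `LOG_LEVEL` environment variable and a 10 MB / 5 backup rotation policy, both of which are illustrative values rather than settings prescribed here.

```python
import logging
import logging.handlers
import os


def resolve_level(name: str) -> int:
    """Translate a level name to its numeric value, falling back to INFO."""
    level = logging.getLevelName(name.upper())
    # getLevelName returns a string such as "Level FOO" for unknown names
    return level if isinstance(level, int) else logging.INFO


def configure_logging(log_path: str) -> logging.Logger:
    """Attach a size-rotated file handler; the level comes from LOG_LEVEL."""
    handler = logging.handlers.RotatingFileHandler(
        log_path, maxBytes=10 * 1024 * 1024, backupCount=5
    )
    root = logging.getLogger()
    root.addHandler(handler)
    root.setLevel(resolve_level(os.environ.get("LOG_LEVEL", "INFO")))
    return root


def set_level_at_runtime(name: str) -> None:
    """Call from an admin endpoint or signal handler to change verbosity live."""
    logging.getLogger().setLevel(resolve_level(name))
```

Because `set_level_at_runtime` only touches the root logger, child loggers that have not set their own level pick up the change immediately.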
Error Handling Strategies¶
Error Classification System¶
Error Categories¶
Classification Benefits
Proper error classification enables appropriate handling strategies, correct severity levels, and effective monitoring.
| Category | Description | Example | Response |
|---|---|---|---|
| Validation | Input validation failures | Invalid email format | Return 400, clear message |
| Business Logic | Business rule violations | Insufficient balance | Return 422, explain constraint |
| Integration | External service errors | API timeout | Retry, then fallback |
| Security | Authentication/authorization failures | Invalid token | Return 401/403, log attempt |
| Infrastructure | System-level issues | Database down | Alert, return 503 |
Implementation Framework¶
from dataclasses import dataclass
from enum import Enum
from typing import Dict, Optional

class ErrorSeverity(Enum):
    CRITICAL = "CRITICAL"  # System unusable
    HIGH = "HIGH"          # Major functionality impacted
    MEDIUM = "MEDIUM"      # Partial functionality impacted
    LOW = "LOW"            # Minimal impact

class ErrorCategory(Enum):
    VALIDATION = "VALIDATION"
    BUSINESS_LOGIC = "BUSINESS_LOGIC"
    INTEGRATION = "INTEGRATION"
    SECURITY = "SECURITY"
    INFRASTRUCTURE = "INFRASTRUCTURE"

# eq=False keeps identity-based equality and hashing, as for other exceptions
@dataclass(eq=False)
class ApplicationError(Exception):
    message: str
    error_code: str
    severity: ErrorSeverity
    category: ErrorCategory
    details: Optional[Dict] = None

    def __post_init__(self):
        # The generated __init__ does not call Exception.__init__, so pass the
        # message on explicitly; otherwise str(error) would be empty
        super().__init__(self.message)

    def to_dict(self) -> Dict:
        return {
            "error": {
                "message": self.message,
                "code": self.error_code,
                "severity": self.severity.value,
                "category": self.category.value,
                "details": self.details or {}
            }
        }
# Specific error types
class ValidationError(ApplicationError):
def __init__(self, message: str, details: Optional[Dict] = None):
super().__init__(
message=message,
error_code="VAL_001",
severity=ErrorSeverity.LOW,
category=ErrorCategory.VALIDATION,
details=details
)
class InsufficientBalanceError(ApplicationError):
def __init__(self, required: float, available: float):
super().__init__(
message="Insufficient account balance",
error_code="BUS_002",
severity=ErrorSeverity.MEDIUM,
category=ErrorCategory.BUSINESS_LOGIC,
details={
"required_amount": required,
"available_balance": available,
"shortfall": required - available
}
)
# Usage Example
def process_payment(order: Order, account: Account):
if order.total > account.balance:
raise InsufficientBalanceError(
required=order.total,
available=account.balance
)
try:
result = payment_gateway.charge(order)
return result
except GatewayTimeout:
raise ApplicationError(
message="Payment gateway timeout",
error_code="INT_001",
severity=ErrorSeverity.HIGH,
category=ErrorCategory.INTEGRATION,
details={"order_id": order.id}
)
class ApplicationError extends Error {
constructor(message, errorCode, severity, category, details = {}) {
super(message);
this.errorCode = errorCode;
this.severity = severity;
this.category = category;
this.details = details;
}
toJSON() {
return {
error: {
message: this.message,
code: this.errorCode,
severity: this.severity,
category: this.category,
details: this.details
}
};
}
}
class ValidationError extends ApplicationError {
constructor(message, details = {}) {
super(message, 'VAL_001', 'LOW', 'VALIDATION', details);
}
}
class InsufficientBalanceError extends ApplicationError {
constructor(required, available) {
super(
'Insufficient account balance',
'BUS_002',
'MEDIUM',
'BUSINESS_LOGIC',
{
required_amount: required,
available_balance: available,
shortfall: required - available
}
);
}
}
// Usage Example
async function processPayment(order, account) {
if (order.total > account.balance) {
throw new InsufficientBalanceError(order.total, account.balance);
}
try {
const result = await paymentGateway.charge(order);
return result;
} catch (error) {
if (error.code === 'ETIMEDOUT') {
throw new ApplicationError(
'Payment gateway timeout',
'INT_001',
'HIGH',
'INTEGRATION',
{ order_id: order.id }
);
}
throw error;
}
}
public abstract class ApplicationError extends Exception {
private final String errorCode;
private final ErrorSeverity severity;
private final ErrorCategory category;
private final Map<String, Object> details;
public ApplicationError(String message, String errorCode,
ErrorSeverity severity, ErrorCategory category,
Map<String, Object> details) {
super(message);
this.errorCode = errorCode;
this.severity = severity;
this.category = category;
this.details = details != null ? details : new HashMap<>();
}
public Map<String, Object> toMap() {
Map<String, Object> error = new HashMap<>();
error.put("message", getMessage());
error.put("code", errorCode);
error.put("severity", severity.name());
error.put("category", category.name());
error.put("details", details);
return Collections.singletonMap("error", error);
}
}
public class InsufficientBalanceError extends ApplicationError {
public InsufficientBalanceError(double required, double available) {
super(
"Insufficient account balance",
"BUS_002",
ErrorSeverity.MEDIUM,
ErrorCategory.BUSINESS_LOGIC,
Map.of(
"required_amount", required,
"available_balance", available,
"shortfall", required - available
)
);
}
}
Retry Patterns¶
Exponential Backoff Strategy¶
When to Use
Implement retry logic for transient failures like network timeouts, rate limiting, or temporary service unavailability.
Key Concepts:
- Initial Delay: Start with small delay (e.g., 1 second)
- Exponential Growth: Double delay after each retry
- Max Delay: Cap at reasonable maximum (e.g., 60 seconds)
- Jitter: Add randomness to prevent thundering herd
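To make the arithmetic behind these four concepts concrete, the sketch below computes the wait before each retry. `backoff_delays` is a hypothetical helper written for illustration; its defaults mirror the example values above (1 second initial delay, doubling, 60 second cap). With jitter disabled, eight retries wait 1, 2, 4, 8, 16, 32, 60 and 60 seconds.

```python
import random
from typing import List, Optional


def backoff_delays(
    attempts: int,
    initial_delay: float = 1.0,
    base: float = 2.0,
    max_delay: float = 60.0,
    jitter: float = 0.0,
    rng: Optional[random.Random] = None,
) -> List[float]:
    """Return the wait before each retry: initial * base**n, capped, then jittered."""
    rng = rng or random.Random()
    delays = []
    for n in range(attempts):
        delay = min(initial_delay * base ** n, max_delay)
        # Jitter is a fraction of the capped delay, applied in both directions,
        # so a jittered value can exceed max_delay by up to that fraction
        spread = delay * jitter
        delays.append(delay + rng.uniform(-spread, spread))
    return delays


if __name__ == "__main__":
    print(backoff_delays(8))
    print(backoff_delays(4, jitter=0.1))
```

Jitter matters most when many clients fail at the same moment: without it they would all retry in lockstep and hit the recovering service simultaneously.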
import asyncio
import random
import aiohttp  # third-party; needed by the usage example at the end of this block
from typing import TypeVar, Callable, Awaitable
from dataclasses import dataclass
T = TypeVar('T')
@dataclass
class RetryConfig:
max_attempts: int = 3
initial_delay: float = 1.0
max_delay: float = 60.0
exponential_base: float = 2.0
jitter: float = 0.1
class RetryableError(Exception):
"""Errors that should trigger retry logic"""
pass
async def retry_with_backoff(
func: Callable[..., Awaitable[T]],
config: RetryConfig = RetryConfig(),
*args,
**kwargs
) -> T:
"""
Execute function with exponential backoff retry logic
"""
attempt = 0
delay = config.initial_delay
while True:
attempt += 1
try:
return await func(*args, **kwargs)
except RetryableError as e:
if attempt >= config.max_attempts:
logger.error(
"Max retry attempts reached",
attempts=attempt,
error=str(e)
)
raise
# Calculate delay with jitter
jitter_amount = delay * config.jitter
actual_delay = delay + random.uniform(-jitter_amount, jitter_amount)
logger.warning(
"Operation failed, retrying",
attempt=attempt,
delay=actual_delay,
error=str(e)
)
await asyncio.sleep(actual_delay)
# Increase delay for next attempt
delay = min(
delay * config.exponential_base,
config.max_delay
)
# Usage Example
async def fetch_user_data(user_id: str):
"""Fetch user data with automatic retry"""
async def _fetch():
async with aiohttp.ClientSession() as session:
async with session.get(f'/api/users/{user_id}') as response:
if response.status >= 500:
raise RetryableError(f"Server error: {response.status}")
return await response.json()
return await retry_with_backoff(_fetch)
Retry Decision Matrix¶
Which Errors to Retry?
Not all errors should trigger retries. Use this matrix to decide:
| Error Type | Retry? | Reason |
|---|---|---|
| Network timeout | Yes | Transient network issue |
| Rate limit (429) | Yes | Temporary capacity constraint |
| Server error (5xx) | Yes | Temporary service issue |
| Bad request (400) | No | Invalid input won't change |
| Unauthorized (401) | No | Credentials won't auto-fix |
| Not found (404) | No | Resource doesn't exist |
| Validation error | No | Data problem needs fixing |
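The matrix translates directly into a small predicate. This is a sketch; `should_retry` and `retry_after_seconds` are hypothetical helpers, and the header lookup assumes a plain mapping with the exact key `Retry-After`. Only the integer-seconds form of that header is handled, while the HTTP-date form is ignored.

```python
from typing import Mapping, Optional


def should_retry(status_code: Optional[int] = None, timed_out: bool = False) -> bool:
    """Apply the matrix: retry timeouts, 429 and 5xx; never other 4xx."""
    if timed_out:
        return True
    if status_code is None:
        return False
    return status_code == 429 or 500 <= status_code <= 599


def retry_after_seconds(headers: Mapping[str, str]) -> Optional[float]:
    """Read a Retry-After header given in whole seconds, if present."""
    value = headers.get("Retry-After", "").strip()
    return float(value) if value.isdigit() else None
```

When a 429 response carries `Retry-After`, honoring that value instead of the computed backoff delay avoids retrying before the server is ready to accept the request.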
Circuit Breaker Pattern¶
Preventing Cascade Failures¶
The Problem
When a service fails, continuous retry attempts can overwhelm the failing service and cascade to dependent services.
The Solution
Circuit breakers automatically stop requests to failing services, allowing them to recover.
Circuit States:
stateDiagram-v2
[*] --> Closed
Closed --> Open: Failure threshold reached
Open --> HalfOpen: Timeout expires
HalfOpen --> Closed: Success
HalfOpen --> Open: Failure
note right of Closed
Normal operation
All requests pass through
end note
note right of Open
Failing fast
Requests immediately rejected
end note
note right of HalfOpen
Testing recovery
Limited requests allowed
end note
Implementation¶
from datetime import datetime
from enum import Enum
from typing import Awaitable, Callable, TypeVar

import structlog

T = TypeVar('T')
logger = structlog.get_logger()  # structlog-style logger, as in the logging examples

class CircuitState(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"

class CircuitBreakerError(Exception):
    """Raised when circuit breaker is open"""
    pass

class CircuitBreaker:
    def __init__(
        self,
        failure_threshold: int = 5,
        reset_timeout: float = 60.0,
        half_open_max_calls: int = 3
    ):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.half_open_max_calls = half_open_max_calls
        self.state = CircuitState.CLOSED
        self.failures = 0
        self.last_failure_time = None
        self.half_open_calls = 0

    async def call(
        self,
        func: Callable[..., Awaitable[T]],
        *args,
        **kwargs
    ) -> T:
        """Execute function through circuit breaker"""
        # Check if we should transition to HALF_OPEN
        if self.state == CircuitState.OPEN:
            if self._should_attempt_reset():
                self.state = CircuitState.HALF_OPEN
                self.half_open_calls = 0
            else:
                raise CircuitBreakerError("Circuit breaker is OPEN")
        # Check HALF_OPEN call limit
        if self.state == CircuitState.HALF_OPEN:
            if self.half_open_calls >= self.half_open_max_calls:
                raise CircuitBreakerError("Circuit breaker HALF_OPEN limit reached")
            self.half_open_calls += 1
        # Execute the function
        try:
            result = await func(*args, **kwargs)
            self._on_success()
            return result
        except Exception:
            self._on_failure()
            raise

    def _should_attempt_reset(self) -> bool:
        """Check if enough time has passed to attempt reset"""
        if not self.last_failure_time:
            return False
        elapsed = (datetime.now() - self.last_failure_time).total_seconds()
        return elapsed >= self.reset_timeout

    def _on_success(self):
        """Handle successful call"""
        if self.state == CircuitState.HALF_OPEN:
            # Success in HALF_OPEN -> back to CLOSED
            self.state = CircuitState.CLOSED
            logger.info("Circuit breaker closed after successful test")
        # Reset on every success so the threshold counts consecutive failures;
        # otherwise isolated failures would accumulate indefinitely
        self.failures = 0

    def _on_failure(self):
        """Handle failed call"""
        self.failures += 1
        self.last_failure_time = datetime.now()
        if self.state == CircuitState.HALF_OPEN:
            # Failure in HALF_OPEN -> back to OPEN
            self.state = CircuitState.OPEN
            logger.warning("Circuit breaker reopened after failure in HALF_OPEN")
        elif self.failures >= self.failure_threshold:
            # Too many failures -> OPEN
            self.state = CircuitState.OPEN
            logger.error(
                "Circuit breaker opened",
                failures=self.failures,
                threshold=self.failure_threshold
            )

# Usage Example
payment_circuit = CircuitBreaker(
    failure_threshold=5,
    reset_timeout=30.0,
    half_open_max_calls=3
)

async def process_payment(order_id: str):
    """Process payment with circuit breaker protection"""
    try:
        return await payment_circuit.call(
            payment_service.charge,
            order_id
        )
    except CircuitBreakerError:
        logger.warning("Payment service unavailable (circuit open)")
        # Queue the payment for a later retry, then report the outage to the caller
        await queue_payment_for_retry(order_id)
        raise ServiceUnavailableError("Payment processing temporarily unavailable")
Graceful Degradation¶
Feature Flags and Fallbacks¶
Maintain Availability
Instead of complete failure, provide reduced functionality when systems are degraded.
from datetime import datetime
from enum import Enum
from typing import Optional, Callable, Any
class FeatureState(Enum):
ENABLED = "enabled"
DISABLED = "disabled"
DEGRADED = "degraded"
class FeatureFlag:
def __init__(
self,
name: str,
default_state: FeatureState = FeatureState.ENABLED,
fallback_function: Optional[Callable] = None
):
self.name = name
self.state = default_state
self.fallback = fallback_function
self.error_count = 0
self.last_error_time = None
def should_degrade(self) -> bool:
"""Check if feature should enter degraded mode"""
# Degrade after 5 errors
if self.error_count >= 5:
return True
# Degrade if recent errors
if self.last_error_time:
time_since_error = (
datetime.now() - self.last_error_time
).total_seconds()
return time_since_error < 60
return False
async def execute(
self,
main_function: Callable,
*args,
**kwargs
) -> Any:
"""Execute feature with fallback support"""
if self.state == FeatureState.DISABLED:
raise FeatureDisabledError(f"Feature {self.name} is disabled")
try:
# Use fallback if degraded
if self.should_degrade() and self.fallback:
logger.warning(
f"Feature {self.name} degraded, using fallback"
)
return await self.fallback(*args, **kwargs)
# Normal execution
result = await main_function(*args, **kwargs)
# Reset error count on success
if self.error_count > 0:
self.error_count = max(0, self.error_count - 1)
return result
except Exception as e:
self.error_count += 1
self.last_error_time = datetime.now()
# Try fallback if available
if self.fallback:
logger.error(
f"Error in feature {self.name}, using fallback",
error=str(e)
)
return await self.fallback(*args, **kwargs)
raise
# Usage Example
async def get_user_recommendations(user_id: str):
"""Get personalized recommendations with fallback"""
# Fallback: popular items instead of personalized
async def fallback_recommendations(user_id: str):
return await get_popular_items(limit=10)
recommendations_feature = FeatureFlag(
name="user_recommendations",
fallback_function=fallback_recommendations
)
async def get_ml_recommendations(user_id: str):
# Complex ML-based recommendation logic
return await ml_service.get_recommendations(user_id)
return await recommendations_feature.execute(
get_ml_recommendations,
user_id
)
Error Response Patterns¶
User-Facing Error Messages¶
Security First
Never expose internal system details, stack traces, or sensitive information to users.
Error Response Structure:
{
"error": {
"code": "PAYMENT_FAILED",
"message": "Unable to process payment",
"details": {
"reason": "Insufficient funds",
"action": "Please add funds and try again"
},
"request_id": "req_abc123",
"timestamp": "2025-10-23T14:30:00Z"
}
}
Implementation¶
from datetime import datetime, timezone
from typing import Any, Dict

class ErrorResponseBuilder:
    @staticmethod
    def build_response(
        error: ApplicationError,
        request_id: str,
        include_details: bool = True
    ) -> Dict[str, Any]:
        """Build standardized error response"""
        # Timezone-aware UTC timestamp with a "Z" suffix, matching the structure above;
        # datetime.utcnow() is deprecated since Python 3.12
        timestamp = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
        response = {
            "error": {
                "code": error.error_code,
                "message": error.message,
                "request_id": request_id,
                "timestamp": timestamp
            }
        }
        # Only include details if appropriate
        if include_details and error.details:
            # Sanitize details for user consumption
            response["error"]["details"] = ErrorResponseBuilder._sanitize_details(
                error.details
            )
        return response

    @staticmethod
    def _sanitize_details(details: Dict) -> Dict:
        """Keep only allow-listed keys so internal details never reach the user"""
        safe_keys = {'reason', 'action', 'field', 'constraint'}
        return {
            k: v for k, v in details.items()
            if k in safe_keys
        }
# Usage in API handler
@app.exception_handler(ApplicationError)
async def handle_application_error(request, exc: ApplicationError):
response = ErrorResponseBuilder.build_response(
error=exc,
request_id=request.state.request_id,
include_details=True
)
# Log internally with full context
logger.error(
"Application error occurred",
error_code=exc.error_code,
category=exc.category.value,
severity=exc.severity.value,
details=exc.details,
request_id=request.state.request_id
)
# Determine HTTP status code
status_code = {
ErrorCategory.VALIDATION: 400,
ErrorCategory.BUSINESS_LOGIC: 422,
ErrorCategory.SECURITY: 403,
ErrorCategory.INTEGRATION: 503,
ErrorCategory.INFRASTRUCTURE: 503
}.get(exc.category, 500)
return JSONResponse(
content=response,
status_code=status_code
)
Error Handling Best Practices¶
- Classify errors into appropriate categories
- Implement retry logic for transient failures
- Use circuit breakers for external dependencies
- Provide fallback mechanisms for critical features
- Return clear, actionable error messages
- Log errors with full context
- Never expose sensitive data in errors
- Use appropriate HTTP status codes
- Track error rates and patterns
- Implement error budgets for SLOs
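The last point is simple arithmetic: an SLO target implies an allowance of failures, and the budget is what remains of it. The sketch below shows the calculation; `ErrorBudget` and `error_budget` are hypothetical names. For a 99.9% target over one million requests the allowance is roughly 1,000 failures, so 250 failures consume about a quarter of the budget.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ErrorBudget:
    allowed_failures: float    # failures the SLO tolerates in the window
    remaining_failures: float  # allowance left; negative once overspent
    consumed_fraction: float   # 1.0 means the budget is fully used

    @property
    def exhausted(self) -> bool:
        return self.remaining_failures <= 0


def error_budget(slo_target: float, total_requests: int, failed_requests: int) -> ErrorBudget:
    """Compute how much of the failure allowance implied by an SLO has been used."""
    if not 0 < slo_target < 1:
        raise ValueError("slo_target must be between 0 and 1, e.g. 0.999")
    if total_requests <= 0:
        raise ValueError("total_requests must be positive")
    allowed = (1 - slo_target) * total_requests
    return ErrorBudget(
        allowed_failures=allowed,
        remaining_failures=allowed - failed_requests,
        consumed_fraction=failed_requests / allowed,
    )


if __name__ == "__main__":
    print(error_budget(0.999, 1_000_000, 250))
```

A common use is to gate risky releases on `exhausted`: while budget remains, ship; once it is spent, prioritize reliability work.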
Debugging Techniques and Tools¶
IDE Debugging¶
Development Environment Setup¶
Master Your Tools
Effective debugging starts with properly configured development tools. Invest time in learning your IDE's debugging capabilities.
VS Code Debug Configuration¶
{
"version": "0.2.0",
"configurations": [
{
"name": "Python: Current File",
"type": "python",
"request": "launch",
"program": "${file}",
"console": "integratedTerminal",
"justMyCode": false,
"env": {
"PYTHONPATH": "${workspaceFolder}",
"DEBUG": "true"
}
},
{
"name": "Python: FastAPI",
"type": "python",
"request": "launch",
"module": "uvicorn",
"args": [
"main:app",
"--reload",
"--port",
"8000"
],
"jinja": true
},
{
"name": "Python: Pytest",
"type": "python",
"request": "launch",
"module": "pytest",
"args": [
"${file}",
"-v",
"-s"
]
}
]
}
{
"version": "0.2.0",
"configurations": [
{
"name": "Node: Current File",
"type": "node",
"request": "launch",
"program": "${file}",
"skipFiles": ["<node_internals>/**"],
"outFiles": ["${workspaceFolder}/dist/**/*.js"],
"sourceMaps": true
},
{
"name": "Node: Express Server",
"type": "node",
"request": "launch",
"program": "${workspaceFolder}/server.js",
"restart": true,
"runtimeExecutable": "nodemon",
"console": "integratedTerminal"
},
{
"name": "Node: Jest Tests",
"type": "node",
"request": "launch",
"program": "${workspaceFolder}/node_modules/.bin/jest",
"args": ["--runInBand", "--no-cache"],
"console": "integratedTerminal"
}
]
}
{
"version": "0.2.0",
"configurations": [
{
"type": "java",
"name": "Debug (Launch) - Current File",
"request": "launch",
"mainClass": "${file}"
},
{
"type": "java",
"name": "Debug Spring Boot",
"request": "launch",
"mainClass": "com.company.Application",
"projectName": "my-project",
"args": "--spring.profiles.active=dev"
}
]
}
Strategic Breakpoint Placement¶
Breakpoint Strategy
Place breakpoints at critical decision points, not every line. Focus on where state changes or decisions are made.
Effective Breakpoint Locations:
def process_order(order: Order) -> OrderResult:
    # Breakpoint 1: Verify input
    validate_order(order)  # Check validation logic
    # Breakpoint 2: Before external call
    result = payment_processor.charge(order)  # Check before payment
    # Breakpoint 3: After business logic
    updated_order = update_order_status(result)  # Verify state change
    # Breakpoint 4: Before return
    return create_response(updated_order)  # Check final output
Conditional Breakpoints:
# Break only when specific conditions are met
for item in items:
# Condition: item.price > 1000
process_item(item) # Breakpoint here with condition
# Condition: user_id == "debug_user"
user_data = fetch_user(user_id) # Breakpoint with user condition
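Where no IDE can be attached, the same conditional-breakpoint idea can be expressed in code with Python's built-in `breakpoint()`. The sketch below is only an illustration: `process_items`, the dictionary shape, and the `threshold` parameter are assumptions made for the example, not part of the code above.

```python
from typing import Dict, List


def process_items(items: List[Dict], threshold: float = 1000) -> List[str]:
    """Process items, pausing in the debugger only for expensive ones."""
    processed = []
    for item in items:
        # Equivalent of an IDE conditional breakpoint with the
        # condition `item.price > 1000`.
        if item["price"] > threshold:
            # breakpoint() calls sys.breakpointhook, which starts pdb by default.
            # Setting PYTHONBREAKPOINT=0 in the environment disables every call.
            breakpoint()
        processed.append(item["name"])
    return processed
```

Because `breakpoint()` honours the `PYTHONBREAKPOINT` environment variable, the pause can be switched off without touching the code, though such calls are usually removed before merging.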
Watch Expressions¶
Track key variables during execution:
| Expression | Purpose |
|---|---|
| len(items) | Monitor collection size |
| total_amount > threshold | Watch for threshold crossing |
| error_count | Track error accumulation |
| user.is_authenticated | Monitor auth state |
| response.status_code | Track API responses |
Browser Developer Tools¶
Console Debugging Patterns¶
Advanced Console Usage
Modern browsers provide powerful console APIs beyond simple console.log.
// Group related logs
console.group('User Authentication');
console.log('Username:', username);
console.time('loginDuration');
// Display data in table format
console.table(userData);
// Show call stack
console.trace('Auth flow');
console.timeEnd('loginDuration');
console.groupEnd();
// Conditional logging
console.assert(user.isValid, 'User validation failed', user);
// Count occurrences
console.count('API calls');
console.countReset('API calls');
// Performance markers
performance.mark('startOperation');
// ... operation code ...
performance.mark('endOperation');
performance.measure('operationDuration', 'startOperation', 'endOperation');
Network Debugging¶
Request Inspection:
// Enhanced fetch with debugging
async function debugFetch(url, options = {}) {
console.group(`${options.method || 'GET'} ${url}`);
console.time('request');
try {
const response = await fetch(url, options);
console.log('Status:', response.status);
console.log('Headers:', Object.fromEntries(response.headers));
const data = await response.json();
console.table(data);
console.timeEnd('request');
console.groupEnd();
return data;
} catch (error) {
console.error('Request failed:', error);
console.timeEnd('request');
console.groupEnd();
throw error;
}
}
// Usage
const userData = await debugFetch('/api/user/123');
Performance Profiling¶
Memory Leak Detection:
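// Log current JS heap usage (performance.memory is non-standard and Chromium-only;
// use the DevTools Memory panel to take actual heap snapshots)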
if (window.performance.memory) {
console.log('Memory usage:', {
totalJSHeapSize: window.performance.memory.totalJSHeapSize / 1048576 + ' MB',
usedJSHeapSize: window.performance.memory.usedJSHeapSize / 1048576 + ' MB',
jsHeapSizeLimit: window.performance.memory.jsHeapSizeLimit / 1048576 + ' MB'
});
}
// Profile function execution
console.profile('heavyComputation');
heavyComputation();
console.profileEnd('heavyComputation');
Production Debugging¶
Safe Production Debugging Practices¶
Production Safety
Production debugging requires extreme caution. Never compromise security or stability.
Safety Checklist:
- Use feature flags to enable/disable debug mode
- Implement automatic timeout for debug sessions
- Sanitize all logged data
- Monitor performance impact
- Maintain audit trails
- Require authorization for debug access
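Several items on this checklist (feature flag, automatic timeout, authorization, audit trail) can be combined in one small gate object. The following is a minimal sketch under stated assumptions: `DebugGate`, its field names, and the `flag_enabled` callable are hypothetical and stand in for whatever feature-flag and identity systems are actually in use.

```python
import time
from dataclasses import dataclass, field
from typing import Callable, List, Optional, Set, Tuple


@dataclass
class DebugGate:
    """Decides whether a debug session may run (illustrative only)."""
    flag_enabled: Callable[[], bool]        # feature-flag lookup (assumed interface)
    authorized_users: Set[str]              # users allowed to open a session
    max_session_seconds: float = 900.0      # automatic timeout
    clock: Callable[[], float] = time.monotonic
    audit_trail: List[Tuple[str, Optional[str]]] = field(default_factory=list, init=False)
    _started_at: Optional[float] = field(default=None, init=False)
    _opened_by: Optional[str] = field(default=None, init=False)

    def open(self, user: str) -> bool:
        """Start a session if the flag is on and the user is authorized."""
        if not self.flag_enabled() or user not in self.authorized_users:
            self.audit_trail.append(("denied", user))
            return False
        self._started_at = self.clock()
        self._opened_by = user
        self.audit_trail.append(("opened", user))
        return True

    def is_active(self) -> bool:
        """True while a session is open, the flag is on, and the timeout has not passed."""
        if self._started_at is None or not self.flag_enabled():
            return False
        if self.clock() - self._started_at > self.max_session_seconds:
            self.audit_trail.append(("expired", self._opened_by))
            self._started_at = None
            return False
        return True
```

Injecting `clock` keeps the timeout testable without sleeping; in production the default `time.monotonic` avoids surprises from wall-clock adjustments.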
Diagnostic Logging Framework¶
import logging
import contextvars
import time
from typing import Optional, Dict, Any
request_id = contextvars.ContextVar('request_id', default=None)
class DiagnosticLogger:
def __init__(self, name: str):
self.logger = logging.getLogger(name)
self.start_time = None
def start_operation(self, operation_name: str, **context):
"""Start timing an operation with context"""
self.start_time = time.time()
self.logger.debug(
f"Starting {operation_name}",
extra={
'operation': operation_name,
'request_id': request_id.get(),
**context
}
)
def end_operation(self, operation_name: str, **context):
"""End timing with results"""
if self.start_time:
duration = time.time() - self.start_time
self.logger.debug(
f"Completed {operation_name}",
extra={
'operation': operation_name,
'duration_ms': duration * 1000,
'request_id': request_id.get(),
**context
}
)
def debug_state(self, obj: object, attributes: list):
"""Log object state for debugging"""
state = {
attr: getattr(obj, attr, None)
for attr in attributes
}
self.logger.debug(
f"State: {obj.__class__.__name__}",
extra={
'object_type': obj.__class__.__name__,
'state': state,
'request_id': request_id.get()
}
)
# Usage Example
logger = DiagnosticLogger(__name__)
def process_payment(payment_data: Dict):
logger.start_operation('payment_processing',
amount=payment_data['amount'])
try:
# Processing logic
result = charge_payment(payment_data)
logger.debug_state(result, ['status', 'transaction_id', 'amount'])
logger.end_operation('payment_processing',
status='success',
transaction_id=result.transaction_id)
return result
except Exception as e:
logger.end_operation('payment_processing',
status='failed',
error=str(e))
raise
Remote Debugging Setup¶
import logging
import threading
from contextlib import contextmanager
from datetime import datetime, timedelta, timezone

import debugpy


class SecureRemoteDebugger:
    def __init__(self, host: str = 'localhost', port: int = 5678):
        self.host = host
        self.port = port
        self.logger = logging.getLogger(__name__)
        self.session_timeout = timedelta(minutes=30)

    def setup(self, require_auth: bool = True):
        """Configure remote debugging with security"""
        try:
            if require_auth:
                # Implement authentication here
                if not self._authenticate():
                    raise PermissionError("Authentication required")
            debugpy.listen((self.host, self.port))
            self.logger.info(
                f"Debug server listening on {self.host}:{self.port}"
            )
            # Log security event
            self._audit_log("debug_server_started")
        except Exception as e:
            self.logger.error(f"Failed to start debug server: {e}")
            raise

    @contextmanager
    def debug_session(self, session_id: str, timeout: int = 300):
        """Create temporary debugging session with timeout"""
        self.logger.info(f"Starting debug session: {session_id}")
        self._audit_log("debug_session_started", session_id=session_id)
        # debugpy.wait_for_client() takes no timeout argument, so a timer
        # cancels the wait if no client attaches within `timeout` seconds.
        timer = threading.Timer(timeout, debugpy.wait_for_client.cancel)
        try:
            timer.start()
            try:
                debugpy.wait_for_client()
            finally:
                timer.cancel()
            if not debugpy.is_client_connected():
                raise TimeoutError(f"No debugger attached within {timeout}s")
            yield
        finally:
            # debugpy exposes no public call to close the listener;
            # the session ends when the client detaches.
            self.logger.info(f"Ending debug session: {session_id}")
            self._audit_log("debug_session_ended", session_id=session_id)

    def _authenticate(self) -> bool:
        """Authenticate debug session"""
        # Placeholder: replace with actual authentication logic before use
        return True

    def _audit_log(self, event: str, **context):
        """Log security audit events"""
        self.logger.info(
            f"Security Audit: {event}",
            extra={
                'event_type': 'security_audit',
                'event': event,
                'timestamp': datetime.now(timezone.utc).isoformat(),
                **context
            }
        )


# Usage Example (with feature flag; `feature_flags` is assumed to be your flag client)
if feature_flags.is_enabled('remote_debugging'):
    debugger = SecureRemoteDebugger()
    debugger.setup(require_auth=True)
    with debugger.debug_session('incident_investigation_123'):
        # Debug critical operation
        investigate_issue()
Performance Profiling¶
CPU Profiling¶
import cProfile
import pstats
import io
from functools import wraps
from typing import Callable, Any
class CodeProfiler:
def __init__(self, enabled: bool = True):
self.enabled = enabled
self.profiler = cProfile.Profile()
def profile(self, func: Callable) -> Callable:
"""Decorator for profiling functions"""
@wraps(func)
def wrapper(*args, **kwargs) -> Any:
if not self.enabled:
return func(*args, **kwargs)
self.profiler.enable()
try:
result = func(*args, **kwargs)
return result
finally:
self.profiler.disable()
self._print_stats()
return wrapper
def _print_stats(self):
"""Print profiling statistics"""
s = io.StringIO()
stats = pstats.Stats(self.profiler, stream=s)
stats.sort_stats('cumulative')
stats.print_stats(20) # Top 20 functions
print("\n" + "="*80)
print("PROFILING RESULTS")
print("="*80)
print(s.getvalue())
# Usage Example
profiler = CodeProfiler()
@profiler.profile
def expensive_operation(data):
# Complex computation
result = process_large_dataset(data)
return result
Memory Profiling¶
import tracemalloc
from typing import Dict, List
class MemoryProfiler:
def __init__(self):
self.snapshot = None
self.previous_snapshot = None
def start_tracking(self):
"""Begin memory tracking"""
tracemalloc.start()
self.snapshot = tracemalloc.take_snapshot()
logger.info("Memory tracking started")
def analyze_memory(self) -> Dict:
"""Analyze memory usage changes"""
self.previous_snapshot = self.snapshot
self.snapshot = tracemalloc.take_snapshot()
# Compare snapshots
stats = self.snapshot.compare_to(
self.previous_snapshot,
'lineno'
)
analysis = {
"total_increase_kb": sum(
stat.size_diff for stat in stats if stat.size_diff > 0
) / 1024,
"top_increases": []
}
# Top 10 memory increases
for stat in stats[:10]:
if stat.size_diff > 0:
analysis["top_increases"].append({
"location": str(stat.traceback),
"size_increase_kb": stat.size_diff / 1024,
"count_increase": stat.count_diff
})
logger.info(
"Memory analysis",
extra=analysis
)
return analysis
def stop_tracking(self):
"""Stop memory tracking"""
tracemalloc.stop()
logger.info("Memory tracking stopped")
# Usage Example
memory_profiler = MemoryProfiler()
memory_profiler.start_tracking()
# Run operations
process_large_dataset()
# Analyze
analysis = memory_profiler.analyze_memory()
memory_profiler.stop_tracking()
Debugging Best Practices¶
Systematic Investigation¶
Follow a Process
Debugging is most effective when you follow a systematic approach rather than random trial and error.
Debugging Workflow:
graph TD
A[Observe Issue] --> B[Form Hypothesis]
B --> C[Gather Evidence]
C --> D{Hypothesis Correct?}
D -->|No| B
D -->|Yes| E[Implement Fix]
E --> F[Verify Solution]
F --> G{Issue Resolved?}
G -->|No| B
G -->|Yes| H[Document Finding]
Debug Session Documentation¶
from dataclasses import dataclass, field
from datetime import datetime
from typing import List, Dict, Any
@dataclass
class DebugSession:
issue_id: str
start_time: datetime = field(default_factory=datetime.now)
hypothesis: str = ""
evidence: List[Dict] = field(default_factory=list)
steps: List[Dict] = field(default_factory=list)
resolution: str = ""
def set_hypothesis(self, hypothesis: str):
"""Document initial hypothesis"""
self.hypothesis = hypothesis
self.log_step("Hypothesis formed", {"hypothesis": hypothesis})
def add_evidence(self, description: str, data: Dict[str, Any]):
"""Collect supporting evidence"""
evidence_entry = {
"timestamp": datetime.now(),
"description": description,
"data": data
}
self.evidence.append(evidence_entry)
self.log_step("Evidence collected", evidence_entry)
def log_step(self, action: str, details: Dict):
"""Log debugging step"""
self.steps.append({
"timestamp": datetime.now(),
"action": action,
"details": details
})
def export_session(self) -> Dict:
"""Export session for documentation"""
return {
"issue_id": self.issue_id,
"start_time": self.start_time.isoformat(),
"duration": (datetime.now() - self.start_time).total_seconds(),
"hypothesis": self.hypothesis,
"evidence_count": len(self.evidence),
"steps_count": len(self.steps),
"steps": self.steps,
"resolution": self.resolution
}
# Usage Example
session = DebugSession(issue_id="ISSUE-123")
session.set_hypothesis("Payment timeout due to database connection pool exhaustion")
# Gather evidence
session.add_evidence(
"Database connection metrics",
{
"active_connections": 95,
"max_connections": 100,
"wait_count": 15
}
)
# Log steps
session.log_step("Increased connection pool size", {"from": 100, "to": 150})
session.add_evidence("After pool increase", {"active_connections": 78, "wait_count": 0})
# Document resolution
session.resolution = "Increased connection pool size and added connection timeout alerts"
final_report = session.export_session()
Common Debugging Scenarios¶
Scenario 1: Intermittent Failures¶
Problem
Feature works sometimes but fails intermittently
Debugging Approach:
- Add comprehensive logging
- Check for race conditions
- Monitor resource states
- Implement retry with logging
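The last step, retry with logging, can be sketched as a decorator that records every failed attempt so intermittent failures leave a trail. This is an illustrative sketch, not a prescribed implementation: `retry_with_logging`, its parameters, and the exponential backoff policy are assumptions chosen for the example.

```python
import logging
import time
from functools import wraps
from typing import Callable, Tuple, Type

logger = logging.getLogger("retry")


def retry_with_logging(
    attempts: int = 3,
    delay_seconds: float = 0.5,
    exceptions: Tuple[Type[BaseException], ...] = (Exception,),
    sleep: Callable[[float], None] = time.sleep,
):
    """Retry the wrapped function, logging each failure and the final outcome."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, attempts + 1):
                try:
                    result = func(*args, **kwargs)
                except exceptions as exc:
                    logger.warning(
                        "%s failed on attempt %d/%d: %r",
                        func.__name__, attempt, attempts, exc,
                    )
                    if attempt == attempts:
                        raise  # out of attempts: surface the last error
                    # Exponential backoff: delay, 2*delay, 4*delay, ...
                    sleep(delay_seconds * 2 ** (attempt - 1))
                else:
                    if attempt > 1:
                        logger.info("%s succeeded on attempt %d", func.__name__, attempt)
                    return result
        return wrapper
    return decorator
```

The attempt number in each log line is what makes the pattern useful for diagnosis: a function that regularly succeeds on attempt 2 points to a transient dependency problem rather than a logic bug.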
Scenario 2: Performance Degradation¶
Problem
Application response time increasing over time
Debugging Approach:
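import gc
import time
from collections import deque
import psutil
class PerformanceMonitor:
def __init__(self, max_samples: int = 1000):
# Bounded buffer: an unbounded list here would itself grow without limit under steady traffic
self.metrics = deque(maxlen=max_samples)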
def capture_metrics(self, operation: str):
"""Capture current performance metrics"""
process = psutil.Process()
metrics = {
"timestamp": time.time(),
"operation": operation,
"memory_mb": process.memory_info().rss / 1024 / 1024,
"cpu_percent": process.cpu_percent(),
"thread_count": process.num_threads(),
"open_files": len(process.open_files()),
"connections": len(process.connections())
}
self.metrics.append(metrics)
# Log if concerning
if metrics["memory_mb"] > 1000: # > 1GB
logger.warning("High memory usage", **metrics)
return metrics
def analyze_trends(self):
"""Analyze metric trends"""
if len(self.metrics) < 2:
return
first = self.metrics[0]
last = self.metrics[-1]
memory_growth = last["memory_mb"] - first["memory_mb"]
time_elapsed = last["timestamp"] - first["timestamp"]
if memory_growth > 100: # >100MB growth
logger.error(
"Memory leak suspected",
memory_growth_mb=memory_growth,
time_elapsed_sec=time_elapsed,
growth_rate_mb_per_min=memory_growth / (time_elapsed / 60)
)
# Force a garbage collection pass (if memory stays high afterwards, live references are the likely cause)
if memory_growth > 50:
gc.collect()
logger.info("Triggered garbage collection")
# Usage
monitor = PerformanceMonitor()
@app.middleware("http")
async def performance_monitoring(request, call_next):
monitor.capture_metrics(f"{request.method} {request.url.path}")
response = await call_next(request)
monitor.analyze_trends()
return response
Scenario 3: Production Data Issues¶
Problem
Issue only reproducible with production data
Safe Production Investigation:
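import asyncio
from typing import Dict
class ProductionDebugger:
def __init__(self):
self.debug_enabled = False
self.target_user_ids = set()
# Keep references so pending auto-disable tasks are not garbage-collected
self._disable_tasks = set()
def enable_for_user(self, user_id: str, duration_minutes: int = 30):
"""Enable debugging for specific user (call while an event loop is running)"""
self.target_user_ids.add(user_id)
self.debug_enabled = True
# Schedule auto-disable; asyncio.create_task raises RuntimeError without a running loop
task = asyncio.create_task(
self._auto_disable(user_id, duration_minutes)
)
self._disable_tasks.add(task)
task.add_done_callback(self._disable_tasks.discard)
logger.info(
"Debug enabled for user",
user_id=user_id,
duration_minutes=duration_minutes
)
async def _auto_disable(self, user_id: str, minutes: int):
"""Automatically disable after timeout"""
await asyncio.sleep(minutes * 60)
self.target_user_ids.discard(user_id)
# Turn the global switch off once no users remain targeted
self.debug_enabled = bool(self.target_user_ids)
logger.info("Debug auto-disabled for user", user_id=user_id)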
def should_debug(self, user_id: str) -> bool:
"""Check if debugging enabled for user"""
return self.debug_enabled and user_id in self.target_user_ids
def debug_operation(self, user_id: str, operation: str, data: Dict):
"""Conditionally log debug information"""
if self.should_debug(user_id):
# Sanitize sensitive data
safe_data = self._sanitize(data)
logger.debug(
f"DEBUG: {operation}",
user_id=user_id,
data=safe_data
)
def _sanitize(self, data: Dict) -> Dict:
"""Remove sensitive fields"""
sensitive_keys = {'password', 'token', 'ssn', 'credit_card'}
return {
k: '***REDACTED***' if k in sensitive_keys else v
for k, v in data.items()
}
# Usage
prod_debugger = ProductionDebugger()
# Enable for specific user (call from async code, e.g. an admin endpoint handler, so an event loop is running)
prod_debugger.enable_for_user("user_123", duration_minutes=15)
# In your code
def process_order(user_id: str, order_data: Dict):
prod_debugger.debug_operation(user_id, "process_order", order_data)
# Normal processing
result = process(order_data)
prod_debugger.debug_operation(user_id, "order_result", result)
return result
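The `_sanitize` method above only inspects top-level keys, so a sensitive field nested inside a list or a sub-dictionary would still be logged. A recursive variant is sketched below; `sanitize_deep` and the module-level constants are hypothetical names introduced for this example, and the case-insensitive key match is an assumption about how keys should be compared.

```python
from typing import Any, FrozenSet

SENSITIVE_KEYS: FrozenSet[str] = frozenset({"password", "token", "ssn", "credit_card"})
REDACTED = "***REDACTED***"


def sanitize_deep(value: Any, sensitive_keys: FrozenSet[str] = SENSITIVE_KEYS) -> Any:
    """Return a copy of `value` with sensitive fields redacted at any depth."""
    if isinstance(value, dict):
        return {
            key: REDACTED
            if str(key).lower() in sensitive_keys
            else sanitize_deep(item, sensitive_keys)
            for key, item in value.items()
        }
    if isinstance(value, list):
        return [sanitize_deep(item, sensitive_keys) for item in value]
    if isinstance(value, tuple):
        # Named tuples come back as plain tuples in this sketch.
        return tuple(sanitize_deep(item, sensitive_keys) for item in value)
    return value  # scalars and unknown types pass through unchanged
```

The function builds new containers instead of mutating its input, so the original payload stays intact for normal processing. Key-based redaction does not catch secrets embedded in free-text values; that needs pattern matching on the values themselves.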
Debugging Tools Comparison¶
| Tool | Best For | Pros | Cons |
|---|---|---|---|
| IDE Debugger | Development | Interactive, full control | Not for production |
| Structured Logs | All environments | Always available | Requires planning |
| Remote Debugger | Staging/Production | Real environment | Security risk |
| APM Tools | Production monitoring | Automatic instrumentation | Cost, overhead |
| Profilers | Performance issues | Detailed metrics | Performance impact |
| Browser DevTools | Frontend issues | Built-in, powerful | Browser-only |
Emergency Debugging Checklist¶
When facing a critical production issue:
- Enable enhanced logging for affected component
- Check recent deployments and changes
- Review error rates and patterns in monitoring
- Inspect system resources (CPU, memory, disk, network)
- Check external dependencies status
- Review recent data changes that might trigger bugs
- Enable debugging for specific users if needed
- Collect diagnostic information before making changes
- Document findings in real-time
- Set up alerts to prevent recurrence
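For the "collect diagnostic information before making changes" step, a snapshot of basic process state can be captured with the standard library alone. This is a rough sketch: `collect_diagnostics` and the chosen fields are assumptions for illustration, and a real service would likely add application-specific data such as queue depths or pool sizes.

```python
import gc
import os
import platform
import sys
import threading
import time
from typing import Any, Dict


def collect_diagnostics() -> Dict[str, Any]:
    """Capture a JSON-serializable snapshot of the current process."""
    return {
        "captured_at": time.time(),              # Unix timestamp of the snapshot
        "pid": os.getpid(),
        "python": platform.python_version(),
        "platform": platform.platform(),
        "thread_count": threading.active_count(),
        "gc_counts": list(gc.get_count()),       # pending objects per GC generation
        "loaded_modules": len(sys.modules),
    }
```

Saving the snapshot alongside the incident record before restarting or redeploying preserves evidence that the fix itself would otherwise erase.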
Advanced Debugging Techniques¶
Conditional Compilation for Debug Code¶
import os
from typing import Any, Callable
DEBUG = os.getenv('DEBUG', 'false').lower() == 'true'
def debug_only(func: Callable) -> Callable:
"""Decorator to execute function only in debug mode"""
if DEBUG:
return func
else:
return lambda *args, **kwargs: None
@debug_only
def validate_assumptions(data: Any):
"""Expensive validation only in debug mode"""
assert isinstance(data, dict), "Data must be dict"
assert 'id' in data, "Data must have id"
# More expensive checks...
# Usage - no-op in production, runs in debug
validate_assumptions(user_data)
Debugging Decorators¶
from functools import wraps
import inspect
def debug_calls(func):
"""Log all function calls with arguments"""
@wraps(func)
def wrapper(*args, **kwargs):
# Get argument names
sig = inspect.signature(func)
bound_args = sig.bind(*args, **kwargs)
bound_args.apply_defaults()
logger.debug(
f"Calling {func.__name__}",
arguments=dict(bound_args.arguments)
)
try:
result = func(*args, **kwargs)
logger.debug(
f"{func.__name__} returned",
result=result
)
return result
except Exception as e:
logger.error(
f"{func.__name__} raised exception",
exception=str(e)
)
raise
return wrapper
# Usage
@debug_calls
def calculate_discount(price: float, discount_rate: float) -> float:
return price * (1 - discount_rate)
Debugging Best Practices Summary¶
Debugging Effectively
- Start with logs - Check existing logs before adding breakpoints
- Form hypothesis - Don't debug randomly, have a theory
- Collect evidence - Gather data to support/refute hypothesis
- Use appropriate tools - IDE for dev, logs for production
- Document findings - Help future debugging efforts
- Fix root cause - Don't just patch symptoms
- Add tests - Prevent regression
- Improve observability - Make future debugging easier
Incident Management and Response¶
Severity Levels and Classification¶
Incident Severity Definitions¶
Clear Classification
Well-defined severity levels ensure appropriate resource allocation and response times.
| Severity | Impact | Response Time | Examples |
|---|---|---|---|
| P0 (Critical) | Complete service outage; data loss/corruption; security breach | < 15 minutes | Production database down; payment system offline; data breach detected |
| P1 (High) | Major feature unavailable; significant degradation; revenue impact | < 30 minutes | Authentication failing; orders not processing; API errors > 25% |
| P2 (Medium) | Non-critical feature down; minor performance issues; small user subset affected | < 2 hours | Search not working; email delays; non-critical API slow |
| P3 (Low) | Cosmetic issues; minor bugs; no user impact | < 24 hours | UI formatting issues; non-critical logs; documentation errors |
Classification Decision Tree¶
graph TD
A[Incident Detected] --> B{Service Available?}
B -->|No| C[P0 - Critical]
B -->|Yes| D{Major Feature Down?}
D -->|Yes| E{Revenue Impact?}
E -->|Yes| F[P1 - High]
E -->|No| G{User Impact?}
G -->|High| F
G -->|Low| H[P2 - Medium]
D -->|No| I{Performance Issue?}
I -->|Severe| H
I -->|Minor| J[P3 - Low]
Severity Classification Implementation¶
from enum import Enum
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import List, Optional
class IncidentSeverity(Enum):
P0 = ("P0", "Critical", timedelta(minutes=15))
P1 = ("P1", "High", timedelta(minutes=30))
P2 = ("P2", "Medium", timedelta(hours=2))
P3 = ("P3", "Low", timedelta(hours=24))
def __init__(self, code: str, label: str, response_time: timedelta):
self.code = code
self.label = label
self.response_time = response_time
class SeverityClassifier:
@staticmethod
def classify(
service_available: bool,
error_rate: float,
affected_users: int,
revenue_impact: bool,
security_issue: bool
) -> IncidentSeverity:
"""Automatically classify incident severity"""
# Critical conditions
if not service_available or security_issue:
return IncidentSeverity.P0
# High severity conditions
if error_rate > 0.25 or revenue_impact:
return IncidentSeverity.P1
# Medium severity conditions
if error_rate > 0.10 or affected_users > 100:
return IncidentSeverity.P2
# Default to low
return IncidentSeverity.P3
@staticmethod
def should_escalate(
incident_age: timedelta,
current_severity: IncidentSeverity,
resolved: bool
) -> bool:
"""Check if incident should be escalated"""
if resolved:
return False
# Escalate if not resolved within response time
return incident_age > current_severity.response_time
# Usage Example
severity = SeverityClassifier.classify(
service_available=True,
error_rate=0.30,
affected_users=500,
revenue_impact=True,
security_issue=False
) # Returns P1
Incident Response Procedures¶
Response Workflow¶
Incident Lifecycle
Every incident follows a structured lifecycle from detection to resolution.
Incident States:
stateDiagram-v2
[*] --> Detected
Detected --> Investigating: Responder assigned
Investigating --> Identified: Root cause found
Identified --> Fixing: Fix being implemented
Fixing --> Resolved: Fix deployed
Resolved --> Monitoring: Verifying stability
Monitoring --> Closed: Stable for 24h
Monitoring --> Investigating: Issue recurs
Closed --> [*]
Incident Manager Implementation¶
from typing import Dict, List, Optional
import uuid
from datetime import datetime
class IncidentStatus(Enum):
DETECTED = "detected"
INVESTIGATING = "investigating"
IDENTIFIED = "identified"
FIXING = "fixing"
RESOLVED = "resolved"
MONITORING = "monitoring"
CLOSED = "closed"
@dataclass
class Incident:
id: str
title: str
severity: IncidentSeverity
status: IncidentStatus
description: str
affected_services: List[str]
created_at: datetime
updated_at: datetime
assigned_to: Optional[str] = None
timeline: List[Dict] = None
def __post_init__(self):
if self.timeline is None:
self.timeline = []
class IncidentManager:
def __init__(self):
self.incidents: Dict[str, Incident] = {}
self.notification_service = NotificationService()
async def create_incident(
self,
title: str,
severity: IncidentSeverity,
description: str,
affected_services: List[str]
) -> str:
"""Create and initialize new incident"""
incident_id = f"INC-{uuid.uuid4().hex[:8].upper()}"
incident = Incident(
id=incident_id,
title=title,
severity=severity,
status=IncidentStatus.DETECTED,
description=description,
affected_services=affected_services,
created_at=datetime.now(),
updated_at=datetime.now()
)
self.incidents[incident_id] = incident
# Log incident creation
self._add_timeline_entry(
incident,
"Incident detected and created",
{"severity": severity.code}
)
# Trigger initial response
await self._trigger_initial_response(incident)
logger.critical(
f"{severity.code} Incident Created",
incident_id=incident_id,
title=title,
severity=severity.code,
affected_services=affected_services
)
return incident_id
async def _trigger_initial_response(self, incident: Incident):
"""Initiate incident response procedures"""
# Get on-call team
responders = await self._get_on_call_team(
incident.severity,
incident.affected_services
)
# Send notifications
await self._send_notifications(incident, responders)
# Auto-assign primary responder
if responders:
await self.assign_incident(incident.id, responders[0])
# Start monitoring
await self._start_incident_monitoring(incident)
async def _send_notifications(
self,
incident: Incident,
responders: List[str]
):
"""Send notifications through appropriate channels"""
notification = {
"incident_id": incident.id,
"title": incident.title,
"severity": incident.severity.code,
"affected_services": incident.affected_services,
"link": f"https://incident-dashboard.company.com/{incident.id}"
}
# P0/P1 get all channels
if incident.severity in [IncidentSeverity.P0, IncidentSeverity.P1]:
await self.notification_service.send_pager(responders, notification)
await self.notification_service.send_sms(responders, notification)
await self.notification_service.send_slack(
channel="#incidents-critical",
message=notification
)
else:
# P2/P3 get Slack + email
await self.notification_service.send_slack(
channel="#incidents",
message=notification
)
await self.notification_service.send_email(responders, notification)
async def update_status(
self,
incident_id: str,
new_status: IncidentStatus,
update_message: str,
updated_by: str
):
"""Update incident status with timeline entry"""
incident = self.incidents.get(incident_id)
if not incident:
raise ValueError(f"Incident {incident_id} not found")
old_status = incident.status
incident.status = new_status
incident.updated_at = datetime.now()
# Add timeline entry
self._add_timeline_entry(
incident,
f"Status changed: {old_status.value} → {new_status.value}",
{
"message": update_message,
"updated_by": updated_by
}
)
# Handle status-specific actions
await self._handle_status_change(incident, old_status, new_status)
logger.info(
"Incident status updated",
incident_id=incident_id,
old_status=old_status.value,
new_status=new_status.value,
updated_by=updated_by
)
def _add_timeline_entry(
self,
incident: Incident,
event: str,
details: Optional[Dict] = None
):
"""Add entry to incident timeline"""
incident.timeline.append({
"timestamp": datetime.now().isoformat(),
"event": event,
"details": details or {}
})
async def _handle_status_change(
self,
incident: Incident,
old_status: IncidentStatus,
new_status: IncidentStatus
):
"""Execute actions based on status changes"""
# When resolved, start monitoring period
if new_status == IncidentStatus.RESOLVED:
await self._start_resolution_monitoring(incident)
# When closed, trigger post-mortem
elif new_status == IncidentStatus.CLOSED:
await self._trigger_postmortem_creation(incident)
# Usage Example
incident_mgr = IncidentManager()
# Create incident
incident_id = await incident_mgr.create_incident(
title="Payment Gateway Timeout",
severity=IncidentSeverity.P1,
description="Payment processing experiencing timeouts > 5s",
affected_services=["payment-gateway", "order-service"]
)
# Update status as investigation progresses
await incident_mgr.update_status(
incident_id=incident_id,
new_status=IncidentStatus.INVESTIGATING,
update_message="Platform team investigating database connection pool",
updated_by="name@atlancis.com"
)
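The `update_status` method above accepts any new status, so nothing stops an incident from jumping straight from detected to closed. One way to enforce the lifecycle from the state diagram is a transition table, sketched below. `ALLOWED_TRANSITIONS` and `validate_transition` are hypothetical names, and the enum is repeated only so the example runs on its own.

```python
from enum import Enum
from typing import Dict, Set


class IncidentStatus(Enum):
    """Mirror of the IncidentStatus enum above, repeated for self-containment."""
    DETECTED = "detected"
    INVESTIGATING = "investigating"
    IDENTIFIED = "identified"
    FIXING = "fixing"
    RESOLVED = "resolved"
    MONITORING = "monitoring"
    CLOSED = "closed"


# Each key maps to the states reachable from it in the state diagram.
ALLOWED_TRANSITIONS: Dict[IncidentStatus, Set[IncidentStatus]] = {
    IncidentStatus.DETECTED: {IncidentStatus.INVESTIGATING},
    IncidentStatus.INVESTIGATING: {IncidentStatus.IDENTIFIED},
    IncidentStatus.IDENTIFIED: {IncidentStatus.FIXING},
    IncidentStatus.FIXING: {IncidentStatus.RESOLVED},
    IncidentStatus.RESOLVED: {IncidentStatus.MONITORING},
    # Monitoring either closes the incident or reopens the investigation.
    IncidentStatus.MONITORING: {IncidentStatus.CLOSED, IncidentStatus.INVESTIGATING},
    IncidentStatus.CLOSED: set(),
}


def validate_transition(current: IncidentStatus, new: IncidentStatus) -> None:
    """Raise ValueError if `new` is not reachable from `current`."""
    allowed = ALLOWED_TRANSITIONS[current]
    if new not in allowed:
        options = ", ".join(sorted(s.value for s in allowed)) or "none"
        raise ValueError(
            f"Cannot move incident from {current.value} to {new.value}; allowed: {options}"
        )
```

Calling `validate_transition(incident.status, new_status)` at the top of `update_status` would reject out-of-order updates before the timeline is modified. Teams that need shortcuts (for example, closing a false alarm directly) can add those edges to the table explicitly.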
Escalation Procedures¶
Escalation Paths¶
Know When to Escalate
Don't hesitate to escalate when needed. It's better to escalate early than to let an incident grow.
Escalation Triggers:
- Incident not acknowledged within response time
- Root cause not identified within 1 hour (P0/P1)
- Resolution attempts failing
- Scope or impact increasing
- Customer escalations
Escalation Implementation¶
@dataclass
class EscalationLevel:
level: int
roles: List[str]
notification_channels: List[str]
timeout_minutes: int
class EscalationManager:
def __init__(self):
self.escalation_paths = {
IncidentSeverity.P0: [
EscalationLevel(
level=1,
roles=["on-call-engineer"],
notification_channels=["pager", "sms", "slack"],
timeout_minutes=5
),
EscalationLevel(
level=2,
roles=["team-lead", "senior-engineer"],
notification_channels=["pager", "sms", "slack", "phone"],
timeout_minutes=10
),
EscalationLevel(
level=3,
roles=["engineering-manager", "cto"],
notification_channels=["phone", "sms"],
timeout_minutes=15
)
],
# Define paths for other severities...
}
async def check_escalation(self, incident: Incident):
"""Check if incident should be escalated"""
time_since_creation = datetime.now() - incident.created_at
current_level = self._get_current_escalation_level(incident)
path = self.escalation_paths.get(incident.severity, [])
if current_level < len(path):
next_level = path[current_level]
if time_since_creation.total_seconds() > (next_level.timeout_minutes * 60):
await self._escalate(incident, next_level)
async def _escalate(
self,
incident: Incident,
escalation_level: EscalationLevel
):
"""Execute escalation"""
logger.critical(
f"Escalating incident to level {escalation_level.level}",
incident_id=incident.id,
severity=incident.severity.code,
level=escalation_level.level,
roles=escalation_level.roles
)
# Notify escalation contacts
for role in escalation_level.roles:
contacts = await self._get_contacts_for_role(role)
await self._notify_escalation(
contacts,
incident,
escalation_level
)
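`check_escalation` relies on `_get_current_escalation_level`, which is not shown. As a rough sketch of the timing logic only, the function below computes how many levels are due from the elapsed time. It assumes, as `check_escalation` does, that each `timeout_minutes` is measured from incident creation and that the list is in ascending order; `escalation_levels_due` is a hypothetical helper name.

```python
from datetime import timedelta
from typing import Sequence


def escalation_levels_due(elapsed: timedelta, timeouts_minutes: Sequence[int]) -> int:
    """Return how many escalation levels should have been notified by now.

    `timeouts_minutes` holds each level's timeout, measured from incident
    creation and sorted ascending (for the P0 path above: [5, 10, 15]).
    """
    elapsed_minutes = elapsed.total_seconds() / 60
    # Strict comparison mirrors check_escalation, which escalates only
    # once the timeout has been exceeded.
    return sum(1 for timeout in timeouts_minutes if elapsed_minutes > timeout)
```

For the P0 path, an unresolved incident that is 12 minutes old has two levels due, so the on-call engineer and the team lead tier should both have been notified.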
Post-Mortem Analysis¶
Post-Mortem Template¶
Learn from Every Incident
Post-mortems are blameless learning opportunities, not finger-pointing sessions.
Standard Template:
# Post-Mortem: [Incident Title]
**Incident ID**: INC-12345678
**Date**: 2025-10-23
**Severity**: P1
**Duration**: 2h 15m
**Author**: name@atlancis.com
---
## Executive Summary
Brief 2-3 sentence summary of what happened and impact.
---
## Impact
**User Impact**:
- 45% of payment transactions failed
- Approximately 1,200 users affected
**Business Impact**:
- Estimated revenue loss: $25,000
- 127 customer support tickets
**Technical Impact**:
- Payment service degraded
- Order processing delayed by average 15 minutes
---
## Timeline
| Time | Event |
|------|-------|
| 14:00 | Alert triggered: Payment error rate > 10% |
| 14:05 | Incident declared (P1) |
| 14:15 | Root cause identified: Database connection pool exhaustion |
| 14:30 | Temporary fix deployed: Increased pool size |
| 15:45 | Permanent fix deployed: Connection leak patched |
| 16:15 | Monitoring period complete, incident closed |
---
## Root Cause Analysis
### Primary Cause
Database connection leak in payment service v2.3.1
### Contributing Factors
1. Insufficient connection pool monitoring
2. Load testing didn't catch the leak
3. Connection timeout too long (60s)
### Why It Wasn't Caught
- New code path only triggered under high load
- Integration tests used mocked database
- Staging environment has lower traffic
---
## Resolution
**Immediate Actions**:
- Increased connection pool size from 100 to 200
- Restarted payment service instances
**Permanent Fix**:
- Patched connection leak in ORM query
- Added connection pool metrics
- Reduced connection timeout to 10s
---
## Action Items
| Action | Owner | Due Date | Status |
|--------|-------|----------|--------|
| Add connection pool monitoring | Platform Team | 2025-10-25 | Done |
| Update integration tests | QA Team | 2025-10-27 | In Progress |
| Conduct load testing | DevOps | 2025-10-30 | Pending |
| Document connection best practices | name@ | 2025-10-26 | Done |
---
## Lessons Learned
**What Went Well**:
- Quick incident detection (<5min from issue start)
- Clear communication in incident channel
- Temporary fix deployed quickly
**What Could Be Improved**:
- Connection pool metrics should have existed
- Load testing should include connection lifecycle
- Staging environment should mirror production load
**Future Prevention**:
- Implement connection pool monitoring
- Add circuit breakers for database calls
- Improve load testing coverage
---
## Related Incidents
- INC-12340987: Similar connection pool issue (2024-08-15)
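The Timeline table in the template can be generated from recorded timeline entries instead of being typed by hand. The sketch below assumes entries shaped like those written by `_add_timeline_entry` (an ISO 8601 `timestamp` string and an `event` string); `format_timeline` is a hypothetical stand-alone function, not the manager's own helper.

```python
from datetime import datetime
from typing import Dict, List


def format_timeline(entries: List[Dict]) -> str:
    """Render timeline entries as a Markdown table sorted by time."""
    lines = ["| Time | Event |", "|------|-------|"]
    ordered = sorted(entries, key=lambda e: datetime.fromisoformat(e["timestamp"]))
    for entry in ordered:
        time_label = datetime.fromisoformat(entry["timestamp"]).strftime("%H:%M")
        # Escape pipes so event text cannot break the table layout.
        event = entry["event"].replace("|", "\\|")
        lines.append(f"| {time_label} | {event} |")
    return "\n".join(lines)
```

Incidents that span more than one day would need the date in the first column as well; the `%H:%M` format here matches the single-day example in the template.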
Post-Mortem Manager Implementation¶
import uuid
from datetime import datetime, timedelta
from typing import Dict, List

# Incident and logger are assumed to be in scope from the earlier examples.
# The helpers _generate_impact_section, _format_timeline and
# _schedule_postmortem_meeting are not shown here.

class PostmortemManager:
    def __init__(self):
        self.postmortems: Dict[str, Dict] = {}
        self.action_items: Dict[str, List] = {}

    async def create_postmortem(
        self,
        incident: Incident,
        facilitator: str
    ) -> str:
        """Create post-mortem from incident"""
        postmortem_id = f"PM-{uuid.uuid4().hex[:8].upper()}"
        created_at = datetime.now()
        postmortem = {
            "id": postmortem_id,
            "incident_id": incident.id,
            "created_at": created_at,
            "due_date": created_at + timedelta(days=5),
            "facilitator": facilitator,
            "status": "draft",
            "sections": {
                "summary": self._generate_summary(incident),
                "impact": self._generate_impact_section(incident),
                "timeline": self._format_timeline(incident.timeline),
                "root_cause": "",  # To be filled
                "resolution": "",  # To be filled
                "action_items": [],  # To be added
                "lessons_learned": ""  # To be filled
            },
            "contributors": [],
            "reviews_required": ["team-lead", "engineering-manager"]
        }
        self.postmortems[postmortem_id] = postmortem

        # Schedule post-mortem meeting
        await self._schedule_postmortem_meeting(postmortem)

        logger.info(
            "Post-mortem created",
            postmortem_id=postmortem_id,
            incident_id=incident.id,
            due_date=postmortem["due_date"].isoformat()
        )
        return postmortem_id

    def _generate_summary(self, incident: Incident) -> str:
        """Generate executive summary"""
        duration = incident.updated_at - incident.created_at
        return (
            f"{incident.severity.code} incident affecting "
            f"{', '.join(incident.affected_services)}. "
            f"Duration: {self._format_duration(duration)}. "
            f"Status: {incident.status.value}."
        )

    def _format_duration(self, duration: timedelta) -> str:
        """Format duration in human-readable format"""
        hours = int(duration.total_seconds() // 3600)
        minutes = int((duration.total_seconds() % 3600) // 60)
        return f"{hours}h {minutes}m"

    async def add_action_item(
        self,
        postmortem_id: str,
        title: str,
        description: str,
        owner: str,
        due_date: datetime,
        priority: str
    ) -> str:
        """Add action item to post-mortem"""
        action_item_id = f"AI-{uuid.uuid4().hex[:6].upper()}"
        action_item = {
            "id": action_item_id,
            "title": title,
            "description": description,
            "owner": owner,
            "due_date": due_date,
            "priority": priority,
            "status": "pending",
            "created_at": datetime.now()
        }
        if postmortem_id not in self.action_items:
            self.action_items[postmortem_id] = []
        self.action_items[postmortem_id].append(action_item)

        # Add to postmortem
        postmortem = self.postmortems[postmortem_id]
        postmortem["sections"]["action_items"].append(action_item)

        # Create tracking ticket
        await self._create_tracking_ticket(action_item)

        logger.info(
            "Action item added",
            postmortem_id=postmortem_id,
            action_item_id=action_item_id,
            owner=owner
        )
        return action_item_id

    async def _create_tracking_ticket(self, action_item: Dict):
        """Create JIRA/Linear ticket for action item"""
        # Integration with project management tools
        pass
# Usage Example (run inside an async function, since it uses await)
postmortem_mgr = PostmortemManager()

# Create post-mortem after incident
postmortem_id = await postmortem_mgr.create_postmortem(
    incident=resolved_incident,
    facilitator="alice@company.com"
)

# Add action items
await postmortem_mgr.add_action_item(
    postmortem_id=postmortem_id,
    title="Implement connection pool monitoring",
    description="Add Prometheus metrics for database connection pool usage",
    owner="platform-team",
    due_date=datetime.now() + timedelta(days=7),
    priority="high"
)
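Action items only reduce repeat incidents if someone follows up on them. The following is a minimal sketch of an overdue check, assuming action items are dictionaries with the `status` and `due_date` keys that `add_action_item` stores. The helper name `find_overdue_action_items` and the `"done"` and `"in_progress"` status values are illustrative assumptions; the class above only ever sets `"pending"`.

```python
from datetime import datetime, timedelta
from typing import Dict, List, Optional


def find_overdue_action_items(
    action_items: List[Dict],
    now: Optional[datetime] = None,
) -> List[Dict]:
    """Return open action items whose due date has passed, oldest first."""
    now = now or datetime.now()
    overdue = [
        item for item in action_items
        # "done" is an assumed terminal status, not defined by the class above
        if item["status"] != "done" and item["due_date"] < now
    ]
    return sorted(overdue, key=lambda item: item["due_date"])


# Example data using the same keys that add_action_item stores
now = datetime(2025, 10, 28, 9, 0)
items = [
    {"id": "AI-1", "status": "pending", "due_date": now - timedelta(days=3)},
    {"id": "AI-2", "status": "done", "due_date": now - timedelta(days=2)},
    {"id": "AI-3", "status": "pending", "due_date": now + timedelta(days=2)},
    {"id": "AI-4", "status": "in_progress", "due_date": now - timedelta(days=1)},
]
overdue_ids = [item["id"] for item in find_overdue_action_items(items, now)]
print(overdue_ids)
```

A check like this could run on a schedule and post its result to the team channel, so overdue items surface without anyone having to open the post-mortem document.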
On-Call Management¶
On-Call Rotation Structure¶
Fair Distribution
Rotate on-call duties fairly to prevent burnout while maintaining 24/7 coverage.
Rotation Structure:
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import List, Dict, Optional

# logger and notification_service are assumed to be in scope.
# The helper _get_outstanding_issues is not shown here.

@dataclass
class OnCallSchedule:
    team: str
    primary: str
    secondary: str
    start_time: datetime
    end_time: datetime


class OnCallManager:
    def __init__(self):
        self.schedules: Dict[str, List[OnCallSchedule]] = {}
        self.handoff_notes: List[Dict] = []

    def get_current_oncall(self, team: str) -> Dict[str, str]:
        """Get current on-call engineers"""
        now = datetime.now()
        schedule = self._find_active_schedule(team, now)
        if not schedule:
            raise ValueError(f"No active on-call schedule for {team}")
        return {
            "team": team,
            "primary": schedule.primary,
            "secondary": schedule.secondary,
            "start_time": schedule.start_time.isoformat(),
            "end_time": schedule.end_time.isoformat()
        }

    def _find_active_schedule(
        self,
        team: str,
        timestamp: datetime
    ) -> Optional[OnCallSchedule]:
        """Find active schedule for timestamp"""
        team_schedules = self.schedules.get(team, [])
        for schedule in team_schedules:
            # Half-open interval: a shift ends exactly when the next one starts
            if schedule.start_time <= timestamp < schedule.end_time:
                return schedule
        return None

    async def perform_handoff(
        self,
        team: str,
        from_engineer: str,
        to_engineer: str,
        notes: str,
        active_incidents: List[str]
    ):
        """Document on-call handoff"""
        handoff = {
            "timestamp": datetime.now(),
            "team": team,
            "from": from_engineer,
            "to": to_engineer,
            "notes": notes,
            "active_incidents": active_incidents,
            "outstanding_issues": await self._get_outstanding_issues(team)
        }
        self.handoff_notes.append(handoff)

        # Notify team
        await self._notify_handoff(handoff)

        logger.info(
            "On-call handoff completed",
            team=team,
            from_engineer=from_engineer,
            to_engineer=to_engineer,
            active_incidents_count=len(active_incidents)
        )

    async def _notify_handoff(self, handoff: Dict):
        """Send handoff notification to team"""
        message = (
            f"On-Call Handoff - {handoff['team']}\n"
            f"From: {handoff['from']}\n"
            f"To: {handoff['to']}\n"
            f"Active Incidents: {len(handoff['active_incidents'])}\n"
            f"Outstanding Issues: {len(handoff['outstanding_issues'])}\n"
            f"Notes: {handoff['notes']}\n"
        )
        # Send to team channel
        await notification_service.send_slack(
            channel=f"#{handoff['team']}-oncall",
            message=message
        )
On-Call Best Practices¶
On-Call Health
Maintain healthy on-call practices to ensure responder effectiveness and prevent burnout.
Guidelines:
- Rotation Length: 1 week maximum
- Backup Coverage: Always have secondary on-call
- Post-Incident Rest: Take time off after major incidents
- Handoff Protocol: Document active issues and concerns
- Compensation: Provide on-call compensation or time off
- Load Balancing: Distribute incidents fairly across team
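The rotation-length and backup-coverage guidelines above can be sketched as a simple round-robin schedule generator. This is one possible approach under stated assumptions, not a prescribed algorithm: the names `RotationSlot` and `build_weekly_rotation` are hypothetical, every shift is exactly one week, and the secondary is the engineer who becomes primary the following week.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import List


@dataclass
class RotationSlot:
    primary: str
    secondary: str
    start_time: datetime
    end_time: datetime


def build_weekly_rotation(
    engineers: List[str],
    start: datetime,
    weeks: int,
) -> List[RotationSlot]:
    """Build a round-robin rotation with one-week shifts.

    The secondary for each week is the engineer who takes over as
    primary the following week.
    """
    if len(engineers) < 2:
        raise ValueError("Need at least two engineers for primary and secondary")
    slots = []
    for week in range(weeks):
        shift_start = start + timedelta(weeks=week)
        slots.append(RotationSlot(
            primary=engineers[week % len(engineers)],
            secondary=engineers[(week + 1) % len(engineers)],
            start_time=shift_start,
            end_time=shift_start + timedelta(weeks=1),
        ))
    return slots


rotation = build_weekly_rotation(
    ["alice", "bob", "carol"], datetime(2025, 10, 6, 9, 0), weeks=4
)
for slot in rotation:
    print(slot.start_time.date(), slot.primary, slot.secondary)
```

Making next week's primary this week's secondary means the incoming engineer already has context on open issues at handoff. A real schedule would also need to handle holidays, swaps, and time zones, which this sketch ignores.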
Incident Communication¶
Status Page Updates¶
# Incident, IncidentStatus, IncidentSeverity, StatusPageAPI and logger are
# assumed to be in scope from the earlier examples.

class StatusPageManager:
    def __init__(self):
        self.status_page_api = StatusPageAPI()
        self.incident_status_map = {
            IncidentStatus.DETECTED: "investigating",
            IncidentStatus.INVESTIGATING: "investigating",
            IncidentStatus.IDENTIFIED: "identified",
            IncidentStatus.FIXING: "monitoring",
            IncidentStatus.RESOLVED: "resolved"
        }

    async def create_status_page_incident(
        self,
        incident: Incident
    ) -> str:
        """Create public-facing status page incident"""
        # Sanitize information for public consumption
        public_title = self._sanitize_title(incident.title)
        public_description = self._create_public_message(incident)

        status_incident = await self.status_page_api.create_incident(
            name=public_title,
            status=self.incident_status_map[incident.status],
            impact=self._map_severity_to_impact(incident.severity),
            components=incident.affected_services,
            message=public_description
        )

        logger.info(
            "Status page incident created",
            incident_id=incident.id,
            status_page_id=status_incident["id"]
        )
        return status_incident["id"]

    async def update_status_page(
        self,
        status_page_id: str,
        incident: Incident,
        message: str
    ):
        """Post update to status page"""
        await self.status_page_api.post_update(
            incident_id=status_page_id,
            status=self.incident_status_map[incident.status],
            message=message
        )

    def _sanitize_title(self, title: str) -> str:
        """Remove internal jargon from title"""
        # Replace internal service names with user-facing names
        replacements = {
            "payment-gateway": "Payment Processing",
            "auth-service": "Login System",
            "order-service": "Order Management"
        }
        sanitized = title
        for internal, public in replacements.items():
            sanitized = sanitized.replace(internal, public)
        return sanitized

    def _create_public_message(self, incident: Incident) -> str:
        """Create user-friendly incident message"""
        # Use public-facing names so internal service names are not exposed
        services = ", ".join(
            self._sanitize_title(s) for s in incident.affected_services
        )
        return (
            f"We are investigating an issue affecting {services}.\n"
            "Our team is actively working on a resolution.\n"
            "We will provide updates as we learn more."
        )

    def _map_severity_to_impact(self, severity: IncidentSeverity) -> str:
        """Map internal severity to public impact level"""
        mapping = {
            IncidentSeverity.P0: "critical",
            IncidentSeverity.P1: "major",
            IncidentSeverity.P2: "minor",
            IncidentSeverity.P3: "none"
        }
        return mapping.get(severity, "minor")
Internal Communication Templates¶
Communication Cadence
Regular updates prevent an information vacuum and maintain stakeholder confidence.
Initial Notification Template:
**INCIDENT DECLARED** - [Severity]
**Incident ID**: INC-12345678
**Severity**: P1
**Status**: Investigating
**Affected Services**: Payment Gateway, Order Processing
**Summary**:
Users experiencing payment processing delays. Error rate elevated to 30%.
**Impact**:
- Approximately 500 users affected
- Payment completion time increased from 2s to 15s
**Current Actions**:
- Platform team investigating database connection issues
- Temporary rate limiting applied to stabilize service
**Next Update**: In 30 minutes or when status changes
**Incident Commander**: name@atlancis.com
**Communication Lead**: name@atlancis.com
Progress Update Template:
**INCIDENT UPDATE** - [Incident ID]
**Time**: 14:45 UTC
**Status**: Identified → Fixing
**Update**:
Root cause identified: Database connection pool exhaustion due to connection leak.
**Actions Taken**:
- Increased connection pool size (immediate mitigation)
- Identified problematic code path
- Deploying fix to production (ETA: 15:00 UTC)
**Current Impact**:
Error rate reduced from 30% to 8%
**Next Update**: 15:15 UTC or when resolved
Resolution Template:
**INCIDENT RESOLVED** - [Incident ID]
**Resolution Time**: 15:30 UTC
**Total Duration**: 2h 15m
**Final Status**:
Issue has been resolved. All services operating normally.
**Resolution**:
- Patched connection leak in payment service
- Connection pool metrics added for future monitoring
- Service fully restored at 15:30 UTC
**Impact Summary**:
- 1,200 users affected
- Estimated revenue impact: $25,000
- 127 support tickets created
**Next Steps**:
- Post-mortem scheduled for 2025-10-24
- Action items tracked in PM-ABC12345
**Questions**: Contact name@atlancis.com
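Each template above commits to a "Next Update" time, and that time is easy to miscalculate under pressure. The following is a minimal sketch that derives it from the incident severity. The 30-minute interval for P0/P1 follows this guide; the P2 and P3 intervals, the `UPDATE_INTERVAL_MINUTES` table, and both function names are illustrative assumptions.

```python
from datetime import datetime, timedelta

# Update intervals in minutes. The P0/P1 value follows this guide;
# the P2 and P3 values are assumptions chosen for illustration.
UPDATE_INTERVAL_MINUTES = {"P0": 30, "P1": 30, "P2": 120, "P3": 480}


def next_update_due(severity: str, last_update: datetime) -> datetime:
    """Return the time by which the next stakeholder update is due."""
    return last_update + timedelta(minutes=UPDATE_INTERVAL_MINUTES[severity])


def format_next_update(severity: str, last_update: datetime) -> str:
    """Render the 'Next Update' line in the style of the templates."""
    due = next_update_due(severity, last_update)
    return f"**Next Update**: {due:%H:%M} UTC or when status changes"


# A P1 update posted at 14:45 UTC commits to the next one at 15:15 UTC
line = format_next_update("P1", datetime(2025, 10, 20, 14, 45))
print(line)
```

The timestamps are treated as already being in UTC; a production version would use timezone-aware datetimes.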
Incident Metrics and Monitoring¶
Key Metrics to Track¶
Measure to Improve
Track incident metrics to identify trends and improve response effectiveness.
| Metric | Definition | Target |
|---|---|---|
| MTTD | Mean Time To Detect | < 5 minutes |
| MTTA | Mean Time To Acknowledge | < 15 minutes (P0/P1) |
| MTTI | Mean Time To Investigate | < 1 hour (P0/P1) |
| MTTR | Mean Time To Resolution | < 4 hours (P0/P1) |
| Incident Frequency | Incidents per week | Trending down |
| Repeat Incidents | Share of incidents with a previously seen root cause | < 5% |
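The time-based targets in the table can be checked automatically once the metrics are measured. The sketch below assumes all values are expressed in minutes and uses the P0/P1 targets from the table; the names `TARGETS_MINUTES` and `find_target_breaches` are hypothetical.

```python
from typing import Dict, List

# Targets in minutes, taken from the table above (P0/P1 incidents).
TARGETS_MINUTES = {"MTTD": 5, "MTTA": 15, "MTTI": 60, "MTTR": 240}


def find_target_breaches(measured: Dict[str, float]) -> List[str]:
    """Return a message for every measured metric that misses its target.

    Targets are strict upper bounds ("< N"), so a value equal to the
    target counts as a breach. Metrics missing from `measured` are skipped.
    """
    breaches = []
    for name, target in TARGETS_MINUTES.items():
        value = measured.get(name)
        if value is not None and value >= target:
            breaches.append(f"{name}: {value:.0f} min (target < {target} min)")
    return breaches


measured = {"MTTD": 4.0, "MTTA": 22.0, "MTTR": 310.0}
breaches = find_target_breaches(measured)
for line in breaches:
    print(line)
```

Incident frequency and repeat rate are trend and ratio metrics rather than durations, so they are left out of this check.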
Metrics Dashboard Implementation¶
from collections import defaultdict
from datetime import datetime, timedelta
from typing import Dict, List

# Incident, IncidentStatus and logger are assumed to be in scope
# from the earlier examples.

class IncidentMetrics:
    def __init__(self):
        self.metrics = defaultdict(list)

    def calculate_mttd(self, incidents: List[Incident]) -> float:
        """Calculate Mean Time To Detect"""
        # Assumes each incident tracks detection_time as a number (e.g. minutes)
        detection_times = [
            i.detection_time for i in incidents
            if hasattr(i, 'detection_time')
        ]
        return sum(detection_times) / len(detection_times) if detection_times else 0

    def calculate_mttr(self, incidents: List[Incident]) -> float:
        """Calculate Mean Time To Resolution in minutes"""
        resolution_times = [
            (i.updated_at - i.created_at).total_seconds() / 60
            for i in incidents
            if i.status == IncidentStatus.CLOSED
        ]
        return sum(resolution_times) / len(resolution_times) if resolution_times else 0

    def get_incident_trends(
        self,
        incidents: List[Incident],
        days: int = 30
    ) -> Dict:
        """Analyze incident trends"""
        cutoff_date = datetime.now() - timedelta(days=days)
        recent_incidents = [
            i for i in incidents
            if i.created_at >= cutoff_date
        ]

        # Group by severity
        by_severity = defaultdict(int)
        for incident in recent_incidents:
            by_severity[incident.severity.code] += 1

        # Group by affected service
        by_service = defaultdict(int)
        for incident in recent_incidents:
            for service in incident.affected_services:
                by_service[service] += 1

        return {
            "total_incidents": len(recent_incidents),
            "by_severity": dict(by_severity),
            "by_service": dict(by_service),
            "mttr_minutes": self.calculate_mttr(recent_incidents),
            "incidents_per_week": len(recent_incidents) / (days / 7)
        }

    def identify_repeat_incidents(
        self,
        incidents: List[Incident],
        days: int = 90
    ) -> List[Dict]:
        """Identify recurring incidents"""
        cutoff_date = datetime.now() - timedelta(days=days)
        recent_incidents = [
            i for i in incidents
            if i.created_at >= cutoff_date
        ]

        # Group by title as a rough proxy for root cause
        incident_groups = defaultdict(list)
        for incident in recent_incidents:
            # Simple grouping: exact match on the lower-cased title
            key = incident.title.lower()
            incident_groups[key].append(incident)

        # Find repeats
        repeats = []
        for title, group in incident_groups.items():
            if len(group) > 1:
                repeats.append({
                    "title": title,
                    "occurrences": len(group),
                    "incidents": [i.id for i in group],
                    "severity": group[0].severity.code
                })
        return sorted(repeats, key=lambda x: x["occurrences"], reverse=True)


# Usage Example
metrics = IncidentMetrics()
trends = metrics.get_incident_trends(all_incidents, days=30)
repeats = metrics.identify_repeat_incidents(all_incidents)
logger.info("Incident trends", **trends)
if repeats:
    logger.warning("Repeat incidents detected", repeats=repeats)
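`identify_repeat_incidents` groups incidents by their exact lower-cased title, so two reports of the same problem with slightly different wording are counted separately. A minimal sketch of fuzzier grouping follows, using `difflib.SequenceMatcher` from the Python standard library. The function name `group_similar_titles` and the 0.8 threshold are assumptions to tune against real incident data, and the greedy strategy is one simple option rather than a recommended algorithm.

```python
from difflib import SequenceMatcher
from typing import List


def group_similar_titles(
    titles: List[str],
    threshold: float = 0.8,
) -> List[List[str]]:
    """Greedily group titles by textual similarity.

    A title joins the first group whose first member is at least
    `threshold` similar to it; otherwise it starts a new group.
    """
    groups: List[List[str]] = []
    for title in titles:
        normalized = title.lower().strip()
        for group in groups:
            representative = group[0].lower().strip()
            ratio = SequenceMatcher(None, normalized, representative).ratio()
            if ratio >= threshold:
                group.append(title)
                break
        else:
            # No existing group was similar enough
            groups.append([title])
    return groups


titles = [
    "Database connection pool exhausted",
    "Database connection pool exhaustion",
    "Login page returns 500",
]
groups = group_similar_titles(titles)
for group in groups:
    print(len(group), group[0])
```

Title similarity remains a proxy. Grouping by the root cause recorded in the post-mortem would be more reliable where that field is filled in.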
Incident Response Checklist¶
During an Incident¶
- Declare incident with appropriate severity
- Assign incident commander to coordinate response
- Create communication channel (#incident-[id])
- Notify stakeholders based on severity
- Update status page if customer-facing
- Document timeline as events occur
- Communicate regularly (every 30min for P0/P1)
- Focus on mitigation before root cause
- Escalate if response-time targets are not being met
- Monitor for recurrence after resolution
After an Incident¶
- Update final status page message
- Close communication channels
- Schedule post-mortem meeting
- Create post-mortem document
- Identify action items with owners
- Track action items to completion
- Share learnings with broader team
- Update documentation and runbooks
- Review incident metrics
- Thank responders for their work
Incident Management Best Practices¶
Incident Management Excellence
- Prepare in advance - Have runbooks and procedures ready
- Communicate clearly - Keep stakeholders informed
- Act decisively - Make decisions quickly with available information
- Document everything - Timeline and actions are critical
- Focus on resolution - Root cause analysis comes after mitigation
- Learn from incidents - Conduct thorough post-mortems
- Track action items - Follow through on improvements
- Support responders - Provide rest and recognition
- Measure performance - Track metrics and trends
- Improve continuously - Iterate on processes
Summary¶
This guide covers four areas of troubleshooting and error handling:
Logging: Build observable systems with structured logging, appropriate log levels, and contextual information that enables rapid diagnosis.
Error Handling: Implement systematic error classification, retry patterns with exponential backoff, circuit breakers for resilience, and graceful degradation strategies.
Debugging: Master IDE debugging tools, browser developer tools, production-safe debugging techniques, and performance profiling for identifying bottlenecks.
Incident Management: Establish clear severity levels, structured response procedures, escalation paths, comprehensive post-mortems, and healthy on-call practices.
Together, these practices provide a foundation for keeping systems reliable, resolving issues quickly, and improving operations over time.
Last updated: October 2025