
Troubleshooting and Error Handling

Section Overview

Comprehensive strategies for logging, error handling, debugging, and incident management that ensure system reliability and rapid problem resolution.


Quick Navigation

Logging Architecture

Build robust logging systems that provide complete observability

| Topic | Focus Area |
|---|---|
| Structured Logging | Machine-parseable log formats |
| Log Levels | Appropriate severity classification |
| Contextual Logging | Request tracing and correlation |

Error Management

Systematic approaches to handling and recovering from errors

| Topic | Focus Area |
|---|---|
| Error Classification | Categorizing error types |
| Retry Patterns | Resilient failure handling |
| Circuit Breakers | Preventing cascade failures |

Debugging Tools

Effective debugging techniques across different environments

| Topic | Focus Area |
|---|---|
| IDE Debugging | Development environment tools |
| Production Debugging | Safe production troubleshooting |
| Performance Profiling | Identifying bottlenecks |

Incident Management

Structured approaches to incident response and resolution

| Topic | Focus Area |
|---|---|
| Severity Levels | Incident classification |
| Response Procedures | Escalation and resolution |
| Post-mortems | Learning from incidents |

Core Principles

Observability First

Design systems with observability built-in from the start. Logs, metrics, and traces should be first-class citizens, not afterthoughts.

Fail Gracefully

Systems should degrade gracefully under failure conditions. Provide reduced functionality rather than complete outages whenever possible.

Learn and Improve

Every incident is a learning opportunity. Conduct thorough post-mortems and implement preventive measures systematically.


Key Concepts

The Three Pillars of Observability

Logs
Timestamped records of discrete events in your system
Metrics
Numerical measurements of system behavior over time
Traces
Records of request flows through distributed systems

Error Handling Hierarchy

graph TD
    A[Error Occurs] --> B{Can Recover?}
    B -->|Yes| C[Handle Locally]
    B -->|No| D[Propagate Up]
    C --> E[Log and Continue]
    D --> F{Critical?}
    F -->|Yes| G[Alert and Escalate]
    F -->|No| H[Log and Track]
    G --> I[Incident Response]
    H --> J[Monitor Patterns]
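
The decision flow above can be sketched in Python as a single top-level boundary that catches whatever propagated up. This is a minimal illustration, not a prescribed implementation: `RecoverableError`, `CriticalError`, `handle`, and the `alert` callback are hypothetical names introduced only for this example.

```python
import logging
from typing import Callable

log = logging.getLogger("error_flow")


class RecoverableError(Exception):
    """Hypothetical: an error the caller can handle locally."""


class CriticalError(Exception):
    """Hypothetical: an error that must trigger incident response."""


def handle(operation: Callable[[], object], alert: Callable[[str], None]) -> str:
    """Run operation and report which branch of the flow was taken."""
    try:
        operation()
        return "ok"
    except RecoverableError as exc:
        # Can recover: handle locally, log, and continue
        log.warning("recovered locally: %s", exc)
        return "handled_locally"
    except CriticalError as exc:
        # Cannot recover and critical: alert and escalate
        log.error("critical failure: %s", exc)
        alert(str(exc))
        return "escalated"
    except Exception as exc:
        # Cannot recover, not critical: log and track so patterns can be monitored
        log.error("unhandled failure: %s", exc)
        return "tracked"
```

In practice the `alert` callback would page an on-call engineer or open an incident; here it is just a callable so the flow can be exercised in isolation.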

Getting Started

For New Team Members

  1. Start with Logging - Understand our structured logging approach
  2. Learn Error Patterns - Review common error scenarios and handling
  3. Practice Debugging - Set up your debugging environment
  4. Study Incidents - Review past incident post-mortems

For Experienced Developers

Jump directly to a specific topic using the navigation above, or search for detailed guidance on a particular scenario.


Best Practices Summary

| Practice | Why It Matters |
|---|---|
| Structured Logging | Enables automated analysis and alerting |
| Correlation IDs | Traces requests across distributed systems |
| Graceful Degradation | Maintains service availability during failures |
| Post-mortem Reviews | Prevents incident recurrence |
| Monitoring Thresholds | Detects issues before they become critical |

Common Scenarios

Scenario: Intermittent API Failures

Problem: External API occasionally times out

Solution: Implement retry logic with exponential backoff, protected by a circuit breaker

Reference: Retry Patterns | Circuit Breakers

Scenario: High Memory Usage

Problem: Application memory consumption growing over time

Solution: Profile application, identify leaks, implement memory monitoring

Reference: Performance Profiling

Scenario: Production Issue Investigation

Problem: Need to debug production issue without disrupting service

Solution: Use structured logging, distributed tracing, and safe debugging techniques

Reference: Production Debugging


Logging Architecture and Best Practices

Core Logging Principles

Fundamental Principle: Implement a comprehensive logging system that provides complete system observability while maintaining performance and security.

Key Guidelines

  • Structure all logs in machine-parseable formats
  • Implement consistent logging patterns across services
  • Ensure logs provide full request context
  • Maintain security and compliance in log content
  • Optimize log storage and retention

Why This Matters

A well-designed logging architecture is fundamental for rapid problem diagnosis, performance monitoring, security auditing, compliance requirements, and system behavior analysis.


Structured Logging Implementation

Standard Log Entry Fields

Every log entry must include required fields for consistency and traceability.

Required Fields:

{
  "timestamp": "2024-12-15T10:30:45Z",
  "level": "ERROR",
  "service": "payment-service",
  "environment": "production",
  "requestId": "req-abc-123",
  "correlationId": "trace-xyz-789",
  "message": "Payment processing failed",
  "sourceLocation": {
    "file": "payment_processor.py",
    "line": 145,
    "function": "process_payment"
  }
}

Conditional Fields:

{
  "userId": "user_12345",
  "tenantId": "tenant_abc",
  "errorDetails": {
    "code": "PAYMENT_GATEWAY_ERROR",
    "type": "GatewayTimeoutError",
    "stack": "..."
  },
  "performance": {
    "duration": 2500,
    "memoryUsage": 128000000
  }
}
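
As one possible way to produce the required fields without any third-party dependency, the sketch below uses a custom formatter for Python's standard `logging` module. The class name `JsonFormatter` and the convention of passing `requestId` and `correlationId` through `extra=` are assumptions made for this example; the conditional fields are left out for brevity.

```python
import json
import logging
from datetime import datetime, timezone


class JsonFormatter(logging.Formatter):
    """Render each record as one JSON object containing the required fields."""

    def __init__(self, service: str, environment: str):
        super().__init__()
        self.service = service
        self.environment = environment

    def format(self, record: logging.LogRecord) -> str:
        # record.created is a Unix timestamp; render it as UTC with a "Z" suffix
        timestamp = datetime.fromtimestamp(record.created, tz=timezone.utc)
        entry = {
            "timestamp": timestamp.strftime("%Y-%m-%dT%H:%M:%SZ"),
            "level": record.levelname,
            "service": self.service,
            "environment": self.environment,
            # Assumed to arrive via logger.error(..., extra={...}); None if absent
            "requestId": getattr(record, "requestId", None),
            "correlationId": getattr(record, "correlationId", None),
            "message": record.getMessage(),
            "sourceLocation": {
                "file": record.filename,
                "line": record.lineno,
                "function": record.funcName,
            },
        }
        return json.dumps(entry)
```

Attach it with `handler.setFormatter(JsonFormatter("payment-service", "production"))` and log with `extra={"requestId": ..., "correlationId": ...}` to fill the tracing fields.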

Implementation Examples

import structlog
import time
from typing import Dict, Any
from contextlib import contextmanager

class StructuredLogger:
    def __init__(self):
        self.logger = structlog.get_logger()
        self.context: Dict[str, Any] = {}

    def set_context(self, **kwargs):
        """Set context that will be included in all subsequent log entries"""
        self.context.update(kwargs)

    @contextmanager
    def operation_logger(self, operation_name: str):
        """Context manager for logging operation duration and status"""
        # perf_counter is monotonic, so durations are unaffected by wall-clock adjustments
        start_time = time.perf_counter()
        try:
            yield
            duration = (time.perf_counter() - start_time) * 1000
            self.logger.info(
                f"{operation_name}_completed",
                duration_ms=duration,
                status="success",
                **self.context
            )
        except Exception as e:
            duration = (time.perf_counter() - start_time) * 1000
            self.logger.error(
                f"{operation_name}_failed",
                duration_ms=duration,
                status="error",
                error_type=type(e).__name__,
                error_message=str(e),
                **self.context
            )
            raise

    def audit_log(self, action: str, resource: str, changes: Dict[str, Any]):
        """Special logging for audit events"""
        self.logger.info(
            "audit_event",
            action=action,
            resource=resource,
            changes=changes,
            timestamp=time.time(),
            **self.context
        )

# Usage Example
logger = StructuredLogger()

def process_order(order_id: str, user_id: str):
    logger.set_context(order_id=order_id, user_id=user_id)

    with logger.operation_logger("order_processing"):
        # Order processing logic here
        order = fetch_order(order_id)
        validate_order(order)
        process_payment(order)

    logger.audit_log(
        action="order_processed",
        resource=f"order/{order_id}",
        changes={"status": "completed", "processed_at": time.time()}
    )

Java:

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.time.Instant;
import java.util.Map;
import java.util.HashMap;
import java.util.concurrent.ConcurrentHashMap;

public class EnhancedLogger {
    private static final Logger logger = LogManager.getLogger();
    private static final ObjectMapper mapper = new ObjectMapper();
    private final ThreadLocal<Map<String, Object>> context = 
        ThreadLocal.withInitial(ConcurrentHashMap::new);

    public class LogBuilder {
        private final Map<String, Object> logData = new HashMap<>();

        public LogBuilder withField(String key, Object value) {
            logData.put(key, value);
            return this;
        }

        public LogBuilder withException(Exception e) {
            logData.put("errorType", e.getClass().getSimpleName());
            logData.put("errorMessage", e.getMessage());
            logData.put("stackTrace", e.getStackTrace());
            return this;
        }

        public void info(String message) {
            log("INFO", message, logData);
        }

        public void error(String message) {
            log("ERROR", message, logData);
        }

        private void log(String level, String message, Map<String, Object> data) {
            Map<String, Object> fullLogEntry = new HashMap<>(context.get());
            fullLogEntry.putAll(data);
            fullLogEntry.put("timestamp", Instant.now().toString());
            fullLogEntry.put("level", level);
            fullLogEntry.put("message", message);

            try {
                String jsonLog = mapper.writeValueAsString(fullLogEntry);
                if ("ERROR".equals(level)) {
                    logger.error(jsonLog);
                } else {
                    logger.info(jsonLog);
                }
            } catch (Exception e) {
                logger.error("Failed to serialize log entry", e);
            }
        }
    }

    public void setContext(String key, Object value) {
        context.get().put(key, value);
    }

    public LogBuilder log() {
        return new LogBuilder();
    }
}

JavaScript:

const winston = require('winston');

class StructuredLogger {
  constructor() {
    this.logger = winston.createLogger({
      format: winston.format.combine(
        winston.format.timestamp(),
        winston.format.json()
      ),
      transports: [
        new winston.transports.Console(),
        new winston.transports.File({ filename: 'application.log' })
      ]
    });
    this.context = {};
  }

  setContext(key, value) {
    this.context[key] = value;
  }

  async withOperation(operationName, fn) {
    const startTime = Date.now();
    try {
      const result = await fn();
      const duration = Date.now() - startTime;

      this.logger.info({
        message: `${operationName}_completed`,
        duration,
        status: 'success',
        ...this.context
      });

      return result;
    } catch (error) {
      const duration = Date.now() - startTime;

      this.logger.error({
        message: `${operationName}_failed`,
        duration,
        status: 'error',
        errorType: error.constructor.name,
        errorMessage: error.message,
        ...this.context
      });

      throw error;
    }
  }
}

// Usage Example
const logger = new StructuredLogger();

async function processOrder(orderId, userId) {
  logger.setContext('orderId', orderId);
  logger.setContext('userId', userId);

  await logger.withOperation('order_processing', async () => {
    // Order processing logic
  });
}

Log Levels and Their Application

Comprehensive Log Level Guide

Understanding when to use each log level is crucial for effective system observability.

| Level | Purpose | When to Use | Example Scenarios |
|---|---|---|---|
| TRACE | Extremely detailed debugging | Function entry/exit, loop iterations | trace("Entering calculateTotal", items=items) |
| DEBUG | Detailed development info | Database queries, API calls, cache operations | debug("Query executed", query=sql, duration=ms) |
| INFO | Normal application flow | Service startup, user actions, scheduled tasks | info("Application started", version=v, env=prod) |
| WARN | Potentially harmful situations | Deprecated features, resource limits, recoverable errors | warn("Rate limit approaching", current=90, limit=100) |
| ERROR | Error events allowing continuation | Failed operations, integration errors | error("Payment failed", order_id=id, error=msg) |
| FATAL | Severe errors causing shutdown | Database unavailable, critical resource failure | fatal("Cannot initialize", reason=db_error) |

Practical Examples by Log Level

logger.trace("Entering calculateTotal with items", items=items)
logger.trace("Loop iteration", iteration=i, currentSum=total)
logger.trace("Exiting calculateTotal with result", total=total)

Use for: Granular debugging during development

logger.debug("Database query executed",
    query=query_string,
    parameters=params,
    execution_time=duration)

logger.debug("Cache hit",
    key=cache_key,
    ttl_remaining=ttl)

Use for: Development and staging diagnostics

logger.info("Application started",
    version=app_version,
    environment=env,
    config=config_summary)

logger.info("User logged in",
    user_id=user.id,
    auth_method="oauth")

Use for: Production operational visibility

logger.warn("API rate limit approaching",
    current_rate=current,
    limit=max_limit,
    time_window="1 minute")

logger.warn("Deprecated API used",
    endpoint="/api/v1/users",
    replacement="/api/v2/users")

Use for: Potential issues requiring attention

logger.error("Payment processing failed",
    order_id=order.id,
    error_code=e.code,
    error_message=str(e),
    user_id=user.id)

Use for: Recoverable failures affecting operations

logger.fatal("Unable to initialize application",
    reason="Database connection failed",
    connection_string=masked_conn_string,
    retry_attempts=max_retries)

Use for: Critical failures requiring immediate intervention
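
Python's standard `logging` module has no built-in TRACE level, and its `FATAL` is only an alias for `CRITICAL`. If the examples above are implemented on top of the standard library, a TRACE level could be registered roughly as follows. The numeric value `5` and the helper name `trace` are assumptions for this sketch; any value below `logging.DEBUG` (10) would work.

```python
import logging

# Assumption: 5 is an arbitrary choice below logging.DEBUG (10)
TRACE = 5
logging.addLevelName(TRACE, "TRACE")


def trace(logger: logging.Logger, msg: str, *args, **kwargs) -> None:
    """Log at the custom TRACE level, skipping all work when it is disabled."""
    if logger.isEnabledFor(TRACE):
        logger.log(TRACE, msg, *args, **kwargs)
```

A call such as `trace(log, "Entering %s", "calculateTotal")` is emitted only when the logger's effective level is `TRACE` or lower, which keeps the level cheap to leave in production code.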


Contextual Logging Implementation

Request Context Tracking

Core Principle: Capture and maintain request-level context throughout the entire transaction lifecycle.

import functools
import threading
import uuid
import time
from typing import Dict, Any, Optional

class RequestContextManager:
    def __init__(self):
        # Thread-local storage for context
        self._context = threading.local()

    def set_context(self, **kwargs):
        """Set context values for the current request/thread"""
        if not hasattr(self._context, 'data'):
            self._context.data = {}
        self._context.data.update(kwargs)

    def get_context(self, key: Optional[str] = None, default=None):
        """Retrieve context values"""
        if not hasattr(self._context, 'data'):
            return default
        return self._context.data.get(key, default) if key else self._context.data

    def clear_context(self):
        """Clear context for the current request/thread"""
        if hasattr(self._context, 'data'):
            del self._context.data

    def create_context_decorator(self):
        """Decorator to manage context for function calls"""
        def decorator(func):
            @functools.wraps(func)
            def wrapper(*args, **kwargs):
                request_id = str(uuid.uuid4())
                self.set_context(
                    request_id=request_id,
                    start_time=time.time()
                )
                try:
                    return func(*args, **kwargs)
                finally:
                    self.clear_context()
            return wrapper
        return decorator

# Global context manager
request_context = RequestContextManager()

# Usage example
@request_context.create_context_decorator()
def process_order(order_id):
    context = request_context.get_context()
    logger.info(f"Processing order {order_id}",
        extra={
            'request_id': context.get('request_id'),
            'start_time': context.get('start_time')
        })
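
Thread-local storage isolates threads, but concurrent `asyncio` tasks usually share one thread and would overwrite each other's context. For async services, the standard-library `contextvars` module is one alternative. The sketch below is illustrative only; `set_context`, `get_context`, and `handle_request` are hypothetical helpers, not part of the code above.

```python
import asyncio
import contextvars
import uuid
from typing import Any, Dict

# Each asyncio task runs in its own copy of the context,
# so values set in one task are invisible to the others
_request_context = contextvars.ContextVar("request_context", default=None)


def set_context(**kwargs: Any) -> None:
    """Merge values into the current context without mutating the previous dict."""
    current = _request_context.get() or {}
    _request_context.set({**current, **kwargs})


def get_context() -> Dict[str, Any]:
    """Return a copy of the current context (empty if nothing was set)."""
    return dict(_request_context.get() or {})


async def handle_request(name: str) -> Dict[str, Any]:
    set_context(request_id=str(uuid.uuid4()), handler=name)
    await asyncio.sleep(0)  # yield control so the other task runs in between
    return get_context()


async def main():
    # gather() wraps each coroutine in its own task, and therefore its own context
    return await asyncio.gather(handle_request("a"), handle_request("b"))
```

Running `asyncio.run(main())` returns two dictionaries with different `request_id` values, showing that the two requests did not leak context into each other.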

Logging Best Practices

Performance Considerations

  • Use asynchronous logging for high-throughput systems
  • Implement log sampling for very verbose operations
  • Avoid logging in tight loops without sampling
  • Use structured logging to enable efficient log queries
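
Log sampling can be approximated with a standard `logging.Filter`. The sketch below assumes a policy of always keeping `WARNING` and above while passing only a fraction of lower-severity records; the class name `SamplingFilter` and that policy are illustrative choices, not a requirement.

```python
import logging
import random
from typing import Optional


class SamplingFilter(logging.Filter):
    """Pass every record at WARNING or above, and a random fraction of the rest."""

    def __init__(self, sample_rate: float, rng: Optional[random.Random] = None):
        super().__init__()
        if not 0.0 <= sample_rate <= 1.0:
            raise ValueError("sample_rate must be between 0 and 1")
        self.sample_rate = sample_rate
        self._rng = rng or random.Random()

    def filter(self, record: logging.LogRecord) -> bool:
        if record.levelno >= logging.WARNING:
            return True  # never drop warnings or errors
        # random() is in [0, 1), so rate 0 drops everything and rate 1 keeps everything
        return self._rng.random() < self.sample_rate
```

For example, `logger.addFilter(SamplingFilter(0.01))` would keep roughly 1% of DEBUG and INFO records emitted through that logger.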

Security Considerations

  • Never log sensitive data (passwords, tokens, PII)
  • Mask or hash identifiable information
  • Implement access controls for log data
  • Comply with data retention policies
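
One way to enforce the masking rule is to pass structured payloads through a redaction step before they reach the logger. In this sketch the key names in `SENSITIVE_KEYS` and the `MASK` string are assumptions; a real deny-list depends on the data your services handle.

```python
from typing import Any

# Assumption: illustrative key names only; extend to match your own data model
SENSITIVE_KEYS = {"password", "token", "authorization", "ssn", "card_number"}
MASK = "***REDACTED***"


def redact(value: Any) -> Any:
    """Return a copy of value with sensitive keys masked, recursing into dicts and lists."""
    if isinstance(value, dict):
        return {
            key: MASK if str(key).lower() in SENSITIVE_KEYS else redact(item)
            for key, item in value.items()
        }
    if isinstance(value, (list, tuple)):
        return [redact(item) for item in value]
    return value
```

Because `redact` builds a new structure, the original payload is left untouched and can still be used by the application after logging.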

Operational Guidelines

  • Centralize logs using tools like ELK, Splunk, or CloudWatch
  • Set up log rotation to manage disk space
  • Implement log level configuration without redeployment
  • Use correlation IDs to trace requests across services

Error Handling Strategies

Error Classification System

Error Categories

Classification Benefits

Proper error classification enables appropriate handling strategies, correct severity levels, and effective monitoring.

| Category | Description | Example | Response |
|---|---|---|---|
| Validation | Input validation failures | Invalid email format | Return 400, clear message |
| Business Logic | Business rule violations | Insufficient balance | Return 422, explain constraint |
| Integration | External service errors | API timeout | Retry, then fallback |
| Security | Authentication/authorization failures | Invalid token | Return 401/403, log attempt |
| Infrastructure | System-level issues | Database down | Alert, return 503 |
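
The response column can be turned into a small lookup so that handlers do not hard-code status codes. This is a sketch under stated assumptions: the table gives no status for Integration errors, so `502` is a guess for the case where retries and fallbacks are exhausted, and the `authenticated` flag used to choose between 401 and 403 is a hypothetical parameter.

```python
from enum import Enum


class ErrorCategory(Enum):
    VALIDATION = "VALIDATION"
    BUSINESS_LOGIC = "BUSINESS_LOGIC"
    INTEGRATION = "INTEGRATION"
    SECURITY = "SECURITY"
    INFRASTRUCTURE = "INFRASTRUCTURE"


_STATUS_BY_CATEGORY = {
    ErrorCategory.VALIDATION: 400,
    ErrorCategory.BUSINESS_LOGIC: 422,
    # Assumption: the table only says "retry, then fallback"; 502 is used
    # here for the case where both have already failed
    ErrorCategory.INTEGRATION: 502,
    ErrorCategory.INFRASTRUCTURE: 503,
}


def http_status_for(category: ErrorCategory, authenticated: bool = False) -> int:
    """Map an error category to the HTTP status suggested by the table."""
    if category is ErrorCategory.SECURITY:
        # 401: caller is not authenticated; 403: authenticated but not allowed
        return 403 if authenticated else 401
    return _STATUS_BY_CATEGORY[category]
```

Keeping the mapping in one place makes it easier to audit that every category has a deliberate response.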

Implementation Framework

from enum import Enum
from dataclasses import dataclass
from typing import Dict, Optional

class ErrorSeverity(Enum):
    CRITICAL = "CRITICAL"  # System unusable
    HIGH = "HIGH"         # Major functionality impacted
    MEDIUM = "MEDIUM"     # Partial functionality impacted
    LOW = "LOW"           # Minimal impact

class ErrorCategory(Enum):
    VALIDATION = "VALIDATION"
    BUSINESS_LOGIC = "BUSINESS_LOGIC"
    INTEGRATION = "INTEGRATION"
    SECURITY = "SECURITY"
    INFRASTRUCTURE = "INFRASTRUCTURE"

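@dataclass
class ApplicationError(Exception):
    message: str
    error_code: str
    severity: ErrorSeverity
    category: ErrorCategory
    details: Optional[Dict] = None

    def __post_init__(self):
        # The dataclass-generated __init__ does not call Exception.__init__,
        # so pass the message on explicitly; otherwise str(error) can be
        # empty or show the raw constructor arguments
        super().__init__(self.message)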

    def to_dict(self) -> Dict:
        return {
            "error": {
                "message": self.message,
                "code": self.error_code,
                "severity": self.severity.value,
                "category": self.category.value,
                "details": self.details or {}
            }
        }

# Specific error types
class ValidationError(ApplicationError):
    def __init__(self, message: str, details: Optional[Dict] = None):
        super().__init__(
            message=message,
            error_code="VAL_001",
            severity=ErrorSeverity.LOW,
            category=ErrorCategory.VALIDATION,
            details=details
        )

class InsufficientBalanceError(ApplicationError):
    def __init__(self, required: float, available: float):
        super().__init__(
            message="Insufficient account balance",
            error_code="BUS_002",
            severity=ErrorSeverity.MEDIUM,
            category=ErrorCategory.BUSINESS_LOGIC,
            details={
                "required_amount": required,
                "available_balance": available,
                "shortfall": required - available
            }
        )

# Usage Example
def process_payment(order: Order, account: Account):
    if order.total > account.balance:
        raise InsufficientBalanceError(
            required=order.total,
            available=account.balance
        )

    try:
        result = payment_gateway.charge(order)
        return result
    except GatewayTimeout:
        raise ApplicationError(
            message="Payment gateway timeout",
            error_code="INT_001",
            severity=ErrorSeverity.HIGH,
            category=ErrorCategory.INTEGRATION,
            details={"order_id": order.id}
        )

JavaScript:

class ApplicationError extends Error {
  constructor(message, errorCode, severity, category, details = {}) {
    super(message);
    this.errorCode = errorCode;
    this.severity = severity;
    this.category = category;
    this.details = details;
  }

  toJSON() {
    return {
      error: {
        message: this.message,
        code: this.errorCode,
        severity: this.severity,
        category: this.category,
        details: this.details
      }
    };
  }
}

class ValidationError extends ApplicationError {
  constructor(message, details = {}) {
    super(message, 'VAL_001', 'LOW', 'VALIDATION', details);
  }
}

class InsufficientBalanceError extends ApplicationError {
  constructor(required, available) {
    super(
      'Insufficient account balance',
      'BUS_002',
      'MEDIUM',
      'BUSINESS_LOGIC',
      {
        required_amount: required,
        available_balance: available,
        shortfall: required - available
      }
    );
  }
}

// Usage Example
async function processPayment(order, account) {
  if (order.total > account.balance) {
    throw new InsufficientBalanceError(order.total, account.balance);
  }

  try {
    const result = await paymentGateway.charge(order);
    return result;
  } catch (error) {
    if (error.code === 'ETIMEDOUT') {
      throw new ApplicationError(
        'Payment gateway timeout',
        'INT_001',
        'HIGH',
        'INTEGRATION',
        { order_id: order.id }
      );
    }
    throw error;
  }
}

Java:

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public abstract class ApplicationError extends Exception {
    private final String errorCode;
    private final ErrorSeverity severity;
    private final ErrorCategory category;
    private final Map<String, Object> details;

    public ApplicationError(String message, String errorCode,
                          ErrorSeverity severity, ErrorCategory category,
                          Map<String, Object> details) {
        super(message);
        this.errorCode = errorCode;
        this.severity = severity;
        this.category = category;
        this.details = details != null ? details : new HashMap<>();
    }

    public Map<String, Object> toMap() {
        Map<String, Object> error = new HashMap<>();
        error.put("message", getMessage());
        error.put("code", errorCode);
        error.put("severity", severity.name());
        error.put("category", category.name());
        error.put("details", details);

        return Collections.singletonMap("error", error);
    }
}

public class InsufficientBalanceError extends ApplicationError {
    public InsufficientBalanceError(double required, double available) {
        super(
            "Insufficient account balance",
            "BUS_002",
            ErrorSeverity.MEDIUM,
            ErrorCategory.BUSINESS_LOGIC,
            Map.of(
                "required_amount", required,
                "available_balance", available,
                "shortfall", required - available
            )
        );
    }
}

Retry Patterns

Exponential Backoff Strategy

When to Use

Implement retry logic for transient failures like network timeouts, rate limiting, or temporary service unavailability.

Key Concepts:

  • Initial Delay: Start with small delay (e.g., 1 second)
  • Exponential Growth: Double delay after each retry
  • Max Delay: Cap at reasonable maximum (e.g., 60 seconds)
  • Jitter: Add randomness to prevent thundering herd

import asyncio
import random
from typing import TypeVar, Callable, Awaitable
from dataclasses import dataclass

import aiohttp  # third-party HTTP client used by the usage example

T = TypeVar('T')

@dataclass
class RetryConfig:
    max_attempts: int = 3
    initial_delay: float = 1.0
    max_delay: float = 60.0
    exponential_base: float = 2.0
    jitter: float = 0.1

class RetryableError(Exception):
    """Errors that should trigger retry logic"""
    pass

async def retry_with_backoff(
    func: Callable[..., Awaitable[T]],
    config: RetryConfig = RetryConfig(),
    *args,
    **kwargs
) -> T:
    """
    Execute function with exponential backoff retry logic
    """
    attempt = 0
    delay = config.initial_delay

    while True:
        attempt += 1
        try:
            return await func(*args, **kwargs)

        except RetryableError as e:
            if attempt >= config.max_attempts:
                logger.error(
                    "Max retry attempts reached",
                    attempts=attempt,
                    error=str(e)
                )
                raise

            # Calculate delay with jitter
            jitter_amount = delay * config.jitter
            actual_delay = delay + random.uniform(-jitter_amount, jitter_amount)

            logger.warning(
                "Operation failed, retrying",
                attempt=attempt,
                delay=actual_delay,
                error=str(e)
            )

            await asyncio.sleep(actual_delay)

            # Increase delay for next attempt
            delay = min(
                delay * config.exponential_base,
                config.max_delay
            )

# Usage Example
async def fetch_user_data(user_id: str):
    """Fetch user data with automatic retry"""
    async def _fetch():
        async with aiohttp.ClientSession() as session:
            async with session.get(f'/api/users/{user_id}') as response:
                # Retry on rate limiting (429) and server errors (5xx) only
                if response.status == 429 or response.status >= 500:
                    raise RetryableError(f"Retryable response: {response.status}")
                return await response.json()

    return await retry_with_backoff(_fetch)
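
To make the delay schedule concrete, the sketch below computes the pre-jitter delays implied by the key concepts (initial delay, exponential growth, maximum delay) and then applies symmetric jitter. `backoff_schedule` and `apply_jitter` are hypothetical helper names introduced only for this illustration.

```python
import random
from typing import List


def backoff_schedule(
    retries: int,
    initial_delay: float = 1.0,
    base: float = 2.0,
    max_delay: float = 60.0,
) -> List[float]:
    """Delays in seconds (before jitter) slept ahead of each retry."""
    return [min(initial_delay * base ** n, max_delay) for n in range(retries)]


def apply_jitter(delay: float, jitter: float = 0.1) -> float:
    """Spread a delay by up to +/- jitter (as a fraction) to avoid synchronized retries."""
    spread = delay * jitter
    return delay + random.uniform(-spread, spread)


print(backoff_schedule(8))
# [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 60.0, 60.0]
```

The seventh delay would be 64 seconds without the cap, so it is clamped to 60. With three attempts in total, only the first two delays (1 and 2 seconds) are ever slept, because the third failure is raised to the caller.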

Retry Decision Matrix

Which Errors to Retry?

Not all errors should trigger retries. Use this matrix to decide:

| Error Type | Retry? | Reason |
|---|---|---|
| Network timeout | Yes | Transient network issue |
| Rate limit (429) | Yes | Temporary capacity constraint |
| Server error (5xx) | Yes | Temporary service issue |
| Bad request (400) | No | Invalid input won't change |
| Unauthorized (401) | No | Credentials won't auto-fix |
| Not found (404) | No | Resource doesn't exist |
| Validation error | No | Data problem needs fixing |
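
The matrix can be encoded as a single predicate that retry logic consults before sleeping. This is a minimal sketch: `should_retry` is a hypothetical helper, and treating the built-in `TimeoutError` and `ConnectionError` as the "network timeout" row is an assumption, since the exception types raised depend on the HTTP client in use.

```python
from typing import Optional


def should_retry(
    status: Optional[int] = None,
    error: Optional[BaseException] = None,
) -> bool:
    """Decide whether a failed call is worth retrying, following the matrix above."""
    if error is not None:
        # Assumption: transient network problems surface as these built-in types
        return isinstance(error, (TimeoutError, ConnectionError))
    if status is None:
        return False
    # Rate limiting and server errors are transient; other 4xx responses are not
    return status == 429 or 500 <= status <= 599
```

Validation failures, which typically surface as `ValueError` or a 400 response, fall through to `False` because resending the same data cannot succeed.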

Circuit Breaker Pattern

Preventing Cascade Failures

The Problem

When a service fails, continuous retry attempts can overwhelm the failing service and cascade to dependent services.

The Solution

Circuit breakers automatically stop requests to failing services, allowing them to recover.

Circuit States:

stateDiagram-v2
    [*] --> Closed
    Closed --> Open: Failure threshold reached
    Open --> HalfOpen: Timeout expires
    HalfOpen --> Closed: Success
    HalfOpen --> Open: Failure

    note right of Closed
        Normal operation
        All requests pass through
    end note

    note right of Open
        Failing fast
        Requests immediately rejected
    end note

    note right of HalfOpen
        Testing recovery
        Limited requests allowed
    end note

Implementation

from datetime import datetime
from enum import Enum
from typing import Awaitable, Callable, TypeVar

T = TypeVar('T')

class CircuitState(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"

class CircuitBreakerError(Exception):
    """Raised when circuit breaker is open"""
    pass

class CircuitBreaker:
    def __init__(
        self,
        failure_threshold: int = 5,
        reset_timeout: float = 60.0,
        half_open_max_calls: int = 3
    ):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.half_open_max_calls = half_open_max_calls

        self.state = CircuitState.CLOSED
        self.failures = 0
        self.last_failure_time = None
        self.half_open_calls = 0

    async def call(
        self,
        func: Callable[..., Awaitable[T]],
        *args,
        **kwargs
    ) -> T:
        """Execute function through circuit breaker"""

        # Check if we should transition to HALF_OPEN
        if self.state == CircuitState.OPEN:
            if self._should_attempt_reset():
                self.state = CircuitState.HALF_OPEN
                self.half_open_calls = 0
            else:
                raise CircuitBreakerError("Circuit breaker is OPEN")

        # Check HALF_OPEN call limit
        if self.state == CircuitState.HALF_OPEN:
            if self.half_open_calls >= self.half_open_max_calls:
                raise CircuitBreakerError("Circuit breaker HALF_OPEN limit reached")
            self.half_open_calls += 1

        # Execute the function
        try:
            result = await func(*args, **kwargs)
            self._on_success()
            return result

        except Exception:
            self._on_failure()
            raise

    def _should_attempt_reset(self) -> bool:
        """Check if enough time has passed to attempt reset"""
        if not self.last_failure_time:
            return False

        elapsed = (datetime.now() - self.last_failure_time).total_seconds()
        return elapsed >= self.reset_timeout

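    def _on_success(self):
        """Handle successful call"""
        if self.state == CircuitState.HALF_OPEN:
            # Success in HALF_OPEN -> back to CLOSED
            self.state = CircuitState.CLOSED
            logger.info("Circuit breaker closed after successful test")

        # Count consecutive failures only: without this reset, occasional
        # failures spread over a long period would eventually open the circuit
        self.failures = 0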

    def _on_failure(self):
        """Handle failed call"""
        self.failures += 1
        self.last_failure_time = datetime.now()

        if self.state == CircuitState.HALF_OPEN:
            # Failure in HALF_OPEN -> back to OPEN
            self.state = CircuitState.OPEN
            logger.warning("Circuit breaker reopened after failure in HALF_OPEN")

        elif self.failures >= self.failure_threshold:
            # Too many failures -> OPEN
            self.state = CircuitState.OPEN
            logger.error(
                "Circuit breaker opened",
                failures=self.failures,
                threshold=self.failure_threshold
            )

# Usage Example
payment_circuit = CircuitBreaker(
    failure_threshold=5,
    reset_timeout=30.0,
    half_open_max_calls=3
)

async def process_payment(order_id: str):
    """Process payment with circuit breaker protection"""
    try:
        return await payment_circuit.call(
            payment_service.charge,
            order_id
        )
    except CircuitBreakerError:
        logger.warning("Payment service unavailable (circuit open)")
        # Fall back: queue the payment so it can be retried later
        await queue_payment_for_retry(order_id)
        raise ServiceUnavailableError("Payment processing temporarily unavailable")

Graceful Degradation

Feature Flags and Fallbacks

Maintain Availability

Instead of complete failure, provide reduced functionality when systems are degraded.

from datetime import datetime
from enum import Enum
from typing import Optional, Callable, Any

class FeatureState(Enum):
    ENABLED = "enabled"
    DISABLED = "disabled"
    DEGRADED = "degraded"

class FeatureFlag:
    def __init__(
        self,
        name: str,
        default_state: FeatureState = FeatureState.ENABLED,
        fallback_function: Optional[Callable] = None
    ):
        self.name = name
        self.state = default_state
        self.fallback = fallback_function
        self.error_count = 0
        self.last_error_time = None

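    def should_degrade(self) -> bool:
        """Check if feature should enter degraded mode"""
        if not self.last_error_time:
            return False

        time_since_error = (
            datetime.now() - self.last_error_time
        ).total_seconds()

        # Once the last error is more than 60 seconds old, clear the count and
        # let the main function run again; otherwise a feature that reached
        # 5 errors would stay on the fallback permanently
        if time_since_error >= 60:
            self.error_count = 0
            return False

        # Degrade while errors are recent
        return True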

    async def execute(
        self,
        main_function: Callable,
        *args,
        **kwargs
    ) -> Any:
        """Execute feature with fallback support"""

        if self.state == FeatureState.DISABLED:
            raise FeatureDisabledError(f"Feature {self.name} is disabled")

        try:
            # Use fallback if degraded
            if self.should_degrade() and self.fallback:
                logger.warning(
                    f"Feature {self.name} degraded, using fallback"
                )
                return await self.fallback(*args, **kwargs)

            # Normal execution
            result = await main_function(*args, **kwargs)

            # Reset error count on success
            if self.error_count > 0:
                self.error_count = max(0, self.error_count - 1)

            return result

        except Exception as e:
            self.error_count += 1
            self.last_error_time = datetime.now()

            # Try fallback if available
            if self.fallback:
                logger.error(
                    f"Error in feature {self.name}, using fallback",
                    error=str(e)
                )
                return await self.fallback(*args, **kwargs)

            raise

# Usage Example

# Fallback: popular items instead of personalized
async def fallback_recommendations(user_id: str):
    return await get_popular_items(limit=10)

# Create the flag once at module level so that error counts persist
# across calls; a flag built inside the request handler would start
# from zero on every request and never degrade
recommendations_feature = FeatureFlag(
    name="user_recommendations",
    fallback_function=fallback_recommendations
)

async def get_ml_recommendations(user_id: str):
    # Complex ML-based recommendation logic
    return await ml_service.get_recommendations(user_id)

async def get_user_recommendations(user_id: str):
    """Get personalized recommendations with fallback"""
    return await recommendations_feature.execute(
        get_ml_recommendations,
        user_id
    )

Error Response Patterns

User-Facing Error Messages

Security First

Never expose internal system details, stack traces, or sensitive information to users.

Error Response Structure:

{
  "error": {
    "code": "PAYMENT_FAILED",
    "message": "Unable to process payment",
    "details": {
      "reason": "Insufficient funds",
      "action": "Please add funds and try again"
    },
    "request_id": "req_abc123",
    "timestamp": "2025-10-23T14:30:00Z"
  }
}

Implementation

from typing import Dict, Any, Optional
from datetime import datetime, timezone

class ErrorResponseBuilder:
    @staticmethod
    def build_response(
        error: ApplicationError,
        request_id: str,
        include_details: bool = True
    ) -> Dict[str, Any]:
        """Build standardized error response"""

        response = {
            "error": {
                "code": error.error_code,
                "message": error.message,
                "request_id": request_id,
                # Timezone-aware UTC; datetime.utcnow() is deprecated since Python 3.12
                "timestamp": datetime.now(timezone.utc).isoformat()
            }
        }

        # Only include details if appropriate
        if include_details and error.details:
            # Sanitize details for user consumption
            response["error"]["details"] = ErrorResponseBuilder._sanitize_details(
                error.details
            )

        return response

    @staticmethod
    def _sanitize_details(details: Dict) -> Dict:
        """Remove sensitive information from error details"""
        safe_keys = {'reason', 'action', 'field', 'constraint'}
        return {
            k: v for k, v in details.items()
            if k in safe_keys
        }

# Usage in API handler
@app.exception_handler(ApplicationError)
async def handle_application_error(request, exc: ApplicationError):
    response = ErrorResponseBuilder.build_response(
        error=exc,
        request_id=request.state.request_id,
        include_details=True
    )

    # Log internally with full context
    logger.error(
        "Application error occurred",
        error_code=exc.error_code,
        category=exc.category.value,
        severity=exc.severity.value,
        details=exc.details,
        request_id=request.state.request_id
    )

    # Determine HTTP status code
    status_code = {
        ErrorCategory.VALIDATION: 400,
        ErrorCategory.BUSINESS_LOGIC: 422,
        ErrorCategory.SECURITY: 403,
        ErrorCategory.INTEGRATION: 503,
        ErrorCategory.INFRASTRUCTURE: 503
    }.get(exc.category, 500)

    return JSONResponse(
        content=response,
        status_code=status_code
    )

Error Handling Best Practices

  • Classify errors into appropriate categories
  • Implement retry logic for transient failures
  • Use circuit breakers for external dependencies
  • Provide fallback mechanisms for critical features
  • Return clear, actionable error messages
  • Log errors with full context
  • Never expose sensitive data in errors
  • Use appropriate HTTP status codes
  • Track error rates and patterns
  • Implement error budgets for SLOs
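
The last item, error budgets, comes down to simple arithmetic. A minimal sketch, assuming a request-based availability SLO; the function name and the 99.9% target are illustrative and not taken from this guide:

```python
def error_budget_remaining(
    slo_target: float, total_requests: int, failed_requests: int
) -> float:
    """Return the fraction of the error budget still unspent (negative if overspent).

    slo_target is the required success ratio, e.g. 0.999 for "three nines".
    """
    if not 0 < slo_target < 1:
        raise ValueError("slo_target must be between 0 and 1 (exclusive)")
    if total_requests <= 0:
        return 1.0  # no traffic yet, so nothing has been spent

    allowed_failures = (1 - slo_target) * total_requests
    return 1 - failed_requests / allowed_failures


# 1,000,000 requests at a 99.9% target allow about 1,000 failures,
# so 250 failures leave roughly three quarters of the budget.
remaining = error_budget_remaining(0.999, 1_000_000, 250)
print(f"{remaining:.0%} of the error budget remains")
```

A negative result means the budget is overspent, which is a common signal to pause risky releases until reliability recovers.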

Debugging Techniques and Tools

IDE Debugging

Development Environment Setup

Master Your Tools

Effective debugging starts with properly configured development tools. Invest time in learning your IDE's debugging capabilities.

VS Code Debug Configuration

Python:

{
  "version": "0.2.0",
  "configurations": [
    {
      "name": "Python: Current File",
      "type": "python",
      "request": "launch",
      "program": "${file}",
      "console": "integratedTerminal",
      "justMyCode": false,
      "env": {
        "PYTHONPATH": "${workspaceFolder}",
        "DEBUG": "true"
      }
    },
    {
      "name": "Python: FastAPI",
      "type": "python",
      "request": "launch",
      "module": "uvicorn",
      "args": [
        "main:app",
        "--reload",
        "--port",
        "8000"
      ],
      "jinja": true
    },
    {
      "name": "Python: Pytest",
      "type": "python",
      "request": "launch",
      "module": "pytest",
      "args": [
        "${file}",
        "-v",
        "-s"
      ]
    }
  ]
}

Node.js:

{
  "version": "0.2.0",
  "configurations": [
    {
      "name": "Node: Current File",
      "type": "node",
      "request": "launch",
      "program": "${file}",
      "skipFiles": ["<node_internals>/**"],
      "outFiles": ["${workspaceFolder}/dist/**/*.js"],
      "sourceMaps": true
    },
    {
      "name": "Node: Express Server",
      "type": "node",
      "request": "launch",
      "program": "${workspaceFolder}/server.js",
      "restart": true,
      "runtimeExecutable": "nodemon",
      "console": "integratedTerminal"
    },
    {
      "name": "Node: Jest Tests",
      "type": "node",
      "request": "launch",
      "program": "${workspaceFolder}/node_modules/.bin/jest",
      "args": ["--runInBand", "--no-cache"],
      "console": "integratedTerminal"
    }
  ]
}

Java:

{
  "version": "0.2.0",
  "configurations": [
    {
      "type": "java",
      "name": "Debug (Launch) - Current File",
      "request": "launch",
      "mainClass": "${file}"
    },
    {
      "type": "java",
      "name": "Debug Spring Boot",
      "request": "launch",
      "mainClass": "com.company.Application",
      "projectName": "my-project",
      "args": "--spring.profiles.active=dev"
    }
  ]
}

Strategic Breakpoint Placement

Breakpoint Strategy

Place breakpoints at critical decision points, not every line. Focus on where state changes or decisions are made.

Effective Breakpoint Locations:

def process_order(order: Order) -> OrderResult:
    # Breakpoint 1: Verify input
    validate_order(order)  # Check validation logic

    # Breakpoint 2: Before external call
    result = payment_processor.charge(order)  # Check before payment

    # Breakpoint 3: After business logic
    updated_order = update_order_status(result)  # Verify state change

    # Breakpoint 4: Before return
    return create_response(updated_order)  # Check final output

Conditional Breakpoints:

# Break only when specific conditions are met
for item in items:
    # Condition: item.price > 1000
    process_item(item)  # Breakpoint here with condition

# Condition: user_id == "debug_user"
user_data = fetch_user(user_id)  # Breakpoint with user condition
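
Where no IDE is attached, the same idea can be written in code with Python's built-in `breakpoint()`, which calls `sys.breakpointhook` (pdb by default) and does nothing when the environment variable `PYTHONBREAKPOINT=0` is set. A sketch: `process_item` and the 1000 threshold follow the example above, while the `Item` class and the function body are hypothetical stand-ins:

```python
from dataclasses import dataclass


@dataclass
class Item:  # hypothetical stand-in for the items being processed
    name: str
    price: float


def process_item(item: Item) -> float:
    # Pause only for the case under investigation; cheaper items run straight through
    if item.price > 1000:
        breakpoint()  # drops into pdb unless PYTHONBREAKPOINT=0 is set
    return item.price  # placeholder for the real processing
```

Remove the call once the investigation is over; a stray `breakpoint()` blocks the process wherever a debugger hook is active.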

Watch Expressions

Track key variables during execution:

Expression | Purpose
len(items) | Monitor collection size
total_amount > threshold | Watch for threshold crossing
error_count | Track error accumulation
user.is_authenticated | Monitor auth state
response.status_code | Track API responses

Browser Developer Tools

Console Debugging Patterns

Advanced Console Usage

Modern browsers provide powerful console APIs beyond simple console.log.

// Group related logs
console.group('User Authentication');
console.log('Username:', username);
console.time('loginDuration');

// Display data in table format
console.table(userData);

// Show call stack
console.trace('Auth flow');

console.timeEnd('loginDuration');
console.groupEnd();

// Conditional logging
console.assert(user.isValid, 'User validation failed', user);

// Count occurrences
console.count('API calls');
console.countReset('API calls');

// Performance markers
performance.mark('startOperation');
// ... operation code ...
performance.mark('endOperation');
performance.measure('operationDuration', 'startOperation', 'endOperation');

Network Debugging

Request Inspection:

// Enhanced fetch with debugging
async function debugFetch(url, options = {}) {
  console.group(`${options.method || 'GET'} ${url}`);
  console.time('request');

  try {
    const response = await fetch(url, options);

    console.log('Status:', response.status);
    console.log('Headers:', Object.fromEntries(response.headers));

    // Assumes a JSON body; response.json() rejects on anything else
    const data = await response.json();
    console.table(data);

    return data;
  } catch (error) {
    console.error('Request failed:', error);
    throw error;
  } finally {
    // Runs on both paths, so the timer and group are always closed
    console.timeEnd('request');
    console.groupEnd();
  }
}

// Usage
const userData = await debugFetch('/api/user/123');

Performance Profiling

Memory Leak Detection:

// Log current heap usage. performance.memory is non-standard and
// Chromium-only; use the DevTools Memory panel for real heap snapshots.
if (window.performance.memory) {
  const toMB = (bytes) => (bytes / 1048576).toFixed(1) + ' MB';
  console.log('Memory usage:', {
    totalJSHeapSize: toMB(window.performance.memory.totalJSHeapSize),
    usedJSHeapSize: toMB(window.performance.memory.usedJSHeapSize),
    jsHeapSizeLimit: toMB(window.performance.memory.jsHeapSizeLimit)
  });
}

// Profile function execution
console.profile('heavyComputation');
heavyComputation();
console.profileEnd('heavyComputation');

Production Debugging

Safe Production Debugging Practices

Production Safety

Production debugging requires extreme caution. Never compromise security or stability.

Safety Checklist:

  • Use feature flags to enable/disable debug mode
  • Implement automatic timeout for debug sessions
  • Sanitize all logged data
  • Monitor performance impact
  • Maintain audit trails
  • Require authorization for debug access
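
Two of the checklist items, automatic timeout and an audit trail, can be combined in one small object. A minimal sketch under stated assumptions: `DebugWindow` and its method names are hypothetical, and the clock is injectable only so that expiry can be tested without waiting:

```python
import time
from typing import Callable, List, Optional, Tuple


class DebugWindow:
    """Debug mode that switches itself off after a fixed duration."""

    def __init__(self, clock: Callable[[], float] = time.monotonic):
        self._clock = clock
        self._expires_at: Optional[float] = None
        self.audit_trail: List[Tuple[str, str]] = []  # (event, actor)

    def enable(self, actor: str, duration_seconds: float) -> None:
        self._expires_at = self._clock() + duration_seconds
        self.audit_trail.append(("enabled", actor))

    def disable(self, actor: str) -> None:
        self._expires_at = None
        self.audit_trail.append(("disabled", actor))

    @property
    def active(self) -> bool:
        # Expiry is checked on read, so no background timer is needed
        return self._expires_at is not None and self._clock() < self._expires_at
```

Checking expiry on read means a forgotten session cannot outlive its window even if nobody calls `disable`.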

Diagnostic Logging Framework

import logging
import contextvars
import time
from typing import Optional, Dict, Any

request_id = contextvars.ContextVar('request_id', default=None)

class DiagnosticLogger:
    def __init__(self, name: str):
        self.logger = logging.getLogger(name)
        self.start_time = None

    def start_operation(self, operation_name: str, **context):
        """Start timing an operation with context"""
        self.start_time = time.perf_counter()  # monotonic, unaffected by clock changes
        self.logger.debug(
            f"Starting {operation_name}",
            extra={
                'operation': operation_name,
                'request_id': request_id.get(),
                **context
            }
        )

    def end_operation(self, operation_name: str, **context):
        """End timing with results"""
        if self.start_time is not None:
            duration = time.perf_counter() - self.start_time
            self.logger.debug(
                f"Completed {operation_name}",
                extra={
                    'operation': operation_name,
                    'duration_ms': duration * 1000,
                    'request_id': request_id.get(),
                    **context
                }
            )

    def debug_state(self, obj: object, attributes: list):
        """Log object state for debugging"""
        state = {
            attr: getattr(obj, attr, None)
            for attr in attributes
        }
        self.logger.debug(
            f"State: {obj.__class__.__name__}",
            extra={
                'object_type': obj.__class__.__name__,
                'state': state,
                'request_id': request_id.get()
            }
        )

# Usage Example
logger = DiagnosticLogger(__name__)

def process_payment(payment_data: Dict):
    logger.start_operation('payment_processing', 
                          amount=payment_data['amount'])

    try:
        # Processing logic
        result = charge_payment(payment_data)
        logger.debug_state(result, ['status', 'transaction_id', 'amount'])

        logger.end_operation('payment_processing',
                           status='success',
                           transaction_id=result.transaction_id)
        return result

    except Exception as e:
        logger.end_operation('payment_processing',
                           status='failed',
                           error=str(e))
        raise

Remote Debugging Setup

import debugpy
import logging
import time
from contextlib import contextmanager
from datetime import datetime, timedelta, timezone

class SecureRemoteDebugger:
    def __init__(self, host: str = 'localhost', port: int = 5678):
        self.host = host
        self.port = port
        self.logger = logging.getLogger(__name__)
        self.session_timeout = timedelta(minutes=30)

    def setup(self, require_auth: bool = True):
        """Configure remote debugging with security"""
        try:
            if require_auth:
                # Implement authentication here
                if not self._authenticate():
                    raise PermissionError("Authentication required")

            debugpy.listen((self.host, self.port))
            self.logger.info(
                f"Debug server listening on {self.host}:{self.port}"
            )

            # Log security event
            self._audit_log("debug_server_started")

        except Exception as e:
            self.logger.error(f"Failed to start debug server: {e}")
            raise

    @contextmanager
    def debug_session(self, session_id: str, timeout: int = 300):
        """Create temporary debugging session with timeout"""
        try:
            self.logger.info(f"Starting debug session: {session_id}")
            self._audit_log("debug_session_started", session_id=session_id)

            # debugpy.wait_for_client() blocks indefinitely and takes no
            # timeout, so poll is_client_connected() until the deadline
            deadline = time.monotonic() + timeout
            while not debugpy.is_client_connected():
                if time.monotonic() >= deadline:
                    raise TimeoutError(
                        f"No debugger attached within {timeout} seconds"
                    )
                time.sleep(0.5)

            yield

        finally:
            self.logger.info(f"Ending debug session: {session_id}")
            self._audit_log("debug_session_ended", session_id=session_id)
            # debugpy exposes no public call to close the listener or drop
            # the client; the debugger must detach from its side

    def _authenticate(self) -> bool:
        """Authenticate debug session"""
        # Placeholder that always succeeds; implement real authentication
        # before enabling this anywhere near production
        return True

    def _audit_log(self, event: str, **context):
        """Log security audit events"""
        self.logger.info(
            f"Security Audit: {event}",
            extra={
                'event_type': 'security_audit',
                'event': event,
                'timestamp': datetime.now(timezone.utc).isoformat(),
                **context
            }
        )

# Usage Example (with feature flag)
if feature_flags.is_enabled('remote_debugging'):
    debugger = SecureRemoteDebugger()
    debugger.setup(require_auth=True)

    with debugger.debug_session('incident_investigation_123'):
        # Debug critical operation
        investigate_issue()

Performance Profiling

CPU Profiling

import cProfile
import pstats
import io
from functools import wraps
from typing import Callable, Any

class CodeProfiler:
    def __init__(self, enabled: bool = True):
        self.enabled = enabled
        self.profiler = cProfile.Profile()

    def profile(self, func: Callable) -> Callable:
        """Decorator for profiling functions"""
        @wraps(func)
        def wrapper(*args, **kwargs) -> Any:
            if not self.enabled:
                return func(*args, **kwargs)

            self.profiler.enable()
            try:
                result = func(*args, **kwargs)
                return result
            finally:
                self.profiler.disable()
                self._print_stats()

        return wrapper

    def _print_stats(self):
        """Print profiling statistics"""
        s = io.StringIO()
        stats = pstats.Stats(self.profiler, stream=s)
        stats.sort_stats('cumulative')
        stats.print_stats(20)  # Top 20 functions

        print("\n" + "="*80)
        print("PROFILING RESULTS")
        print("="*80)
        print(s.getvalue())

# Usage Example
profiler = CodeProfiler()

@profiler.profile
def expensive_operation(data):
    # Complex computation
    result = process_large_dataset(data)
    return result

Memory Profiling

import logging
import tracemalloc
from typing import Dict, List

logger = logging.getLogger(__name__)

class MemoryProfiler:
    def __init__(self):
        self.snapshot = None
        self.previous_snapshot = None

    def start_tracking(self):
        """Begin memory tracking"""
        tracemalloc.start()
        self.snapshot = tracemalloc.take_snapshot()
        logger.info("Memory tracking started")

    def analyze_memory(self) -> Dict:
        """Analyze memory usage changes"""
        self.previous_snapshot = self.snapshot
        self.snapshot = tracemalloc.take_snapshot()

        # Compare snapshots
        stats = self.snapshot.compare_to(
            self.previous_snapshot,
            'lineno'
        )

        analysis = {
            "total_increase_kb": sum(
                stat.size_diff for stat in stats if stat.size_diff > 0
            ) / 1024,
            "top_increases": []
        }

        # Top 10 memory increases (filter first so decreases do not use up slots)
        increases = [stat for stat in stats if stat.size_diff > 0]
        for stat in increases[:10]:
            analysis["top_increases"].append({
                "location": str(stat.traceback),
                "size_increase_kb": stat.size_diff / 1024,
                "count_increase": stat.count_diff
            })

        logger.info(
            "Memory analysis",
            extra=analysis
        )

        return analysis

    def stop_tracking(self):
        """Stop memory tracking"""
        tracemalloc.stop()
        logger.info("Memory tracking stopped")

# Usage Example
memory_profiler = MemoryProfiler()
memory_profiler.start_tracking()

# Run operations
process_large_dataset()

# Analyze
analysis = memory_profiler.analyze_memory()
memory_profiler.stop_tracking()
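
To see what `compare_to` reports, the following self-contained sketch allocates a list between two snapshots and sums the positive differences. The variable names are illustrative, and the exact byte counts vary by Python version and platform:

```python
import tracemalloc

tracemalloc.start()
before = tracemalloc.take_snapshot()

# Simulate a leak: keep a large structure alive between the two snapshots
retained = [bytes(1024) for _ in range(1000)]  # about 1 MB of payload

after = tracemalloc.take_snapshot()
tracemalloc.stop()

# compare_to returns StatisticDiff entries, largest absolute change first
diffs = after.compare_to(before, "lineno")
growth_kb = sum(d.size_diff for d in diffs if d.size_diff > 0) / 1024
print(f"Retained memory grew by about {growth_kb:.0f} KB")
```

The first entry in `diffs` should point at the line that builds `retained`, which is how a real leak is traced back to its allocation site.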

Debugging Best Practices

Systematic Investigation

Follow a Process

Debugging is most effective when you follow a systematic approach rather than random trial and error.

Debugging Workflow:

graph TD
    A[Observe Issue] --> B[Form Hypothesis]
    B --> C[Gather Evidence]
    C --> D{Hypothesis Correct?}
    D -->|No| B
    D -->|Yes| E[Implement Fix]
    E --> F[Verify Solution]
    F --> G{Issue Resolved?}
    G -->|No| B
    G -->|Yes| H[Document Finding]

Debug Session Documentation

from dataclasses import dataclass, field
from datetime import datetime
from typing import List, Dict, Any

@dataclass
class DebugSession:
    issue_id: str
    start_time: datetime = field(default_factory=datetime.now)
    hypothesis: str = ""
    evidence: List[Dict] = field(default_factory=list)
    steps: List[Dict] = field(default_factory=list)
    resolution: str = ""

    def set_hypothesis(self, hypothesis: str):
        """Document initial hypothesis"""
        self.hypothesis = hypothesis
        self.log_step("Hypothesis formed", {"hypothesis": hypothesis})

    def add_evidence(self, description: str, data: Dict[str, Any]):
        """Collect supporting evidence"""
        evidence_entry = {
            "timestamp": datetime.now().isoformat(),  # string keeps the export JSON-serializable
            "description": description,
            "data": data
        }
        self.evidence.append(evidence_entry)
        self.log_step("Evidence collected", evidence_entry)

    def log_step(self, action: str, details: Dict):
        """Log debugging step"""
        self.steps.append({
            "timestamp": datetime.now().isoformat(),  # string keeps the export JSON-serializable
            "action": action,
            "details": details
        })

    def export_session(self) -> Dict:
        """Export session for documentation"""
        return {
            "issue_id": self.issue_id,
            "start_time": self.start_time.isoformat(),
            "duration": (datetime.now() - self.start_time).total_seconds(),
            "hypothesis": self.hypothesis,
            "evidence_count": len(self.evidence),
            "steps_count": len(self.steps),
            "steps": self.steps,
            "resolution": self.resolution
        }

# Usage Example
session = DebugSession(issue_id="ISSUE-123")
session.set_hypothesis("Payment timeout due to database connection pool exhaustion")

# Gather evidence
session.add_evidence(
    "Database connection metrics",
    {
        "active_connections": 95,
        "max_connections": 100,
        "wait_count": 15
    }
)

# Log steps
session.log_step("Increased connection pool size", {"from": 100, "to": 150})
session.add_evidence("After pool increase", {"active_connections": 78, "wait_count": 0})

# Document resolution
session.resolution = "Increased connection pool size and added connection timeout alerts"
final_report = session.export_session()

Common Debugging Scenarios

Scenario 1: Intermittent Failures

Problem

A feature fails intermittently: the same request succeeds on some attempts and fails on others.

Debugging Approach:

  1. Add comprehensive logging

    logger.info("Attempting operation", attempt=attempt_number, context=ctx)
    

  2. Check for race conditions

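    # Add synchronization. Create the lock once and share it between
    # callers; a new asyncio.Lock() per call protects nothing.
    lock = asyncio.Lock()

    async with lock:
        # Critical section
        pass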
    

  3. Monitor resource states

    logger.debug("Resource state", 
                 connections=pool.active,
                 memory_mb=get_memory_usage())
    

  4. Implement retry with logging

    for attempt in range(max_retries):
        try:
            result = await operation()
            logger.info("Operation succeeded", attempt=attempt)
            break
        except Exception as e:
            logger.warning("Attempt failed",
                          attempt=attempt,
                          error=str(e))
            if attempt == max_retries - 1:
                raise  # out of retries: surface the last error
    


Scenario 2: Performance Degradation

Problem

Application response time increases the longer the process runs.

Debugging Approach:

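import time
import psutil
import gc
from collections import deque

class PerformanceMonitor:
    def __init__(self, max_samples: int = 1000):
        # Bounded buffer, so the monitor itself cannot grow without limit
        self.metrics = deque(maxlen=max_samples)
        # Reuse one handle: cpu_percent() measures since the previous call
        # on the same Process object and returns 0.0 on a fresh one
        self.process = psutil.Process()

    def capture_metrics(self, operation: str):
        """Capture current performance metrics"""
        process = self.process

        metrics = {
            "timestamp": time.time(),
            "operation": operation,
            "memory_mb": process.memory_info().rss / 1024 / 1024,
            "cpu_percent": process.cpu_percent(),
            "thread_count": process.num_threads(),
            "open_files": len(process.open_files()),
            "connections": len(process.connections())
        }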

        self.metrics.append(metrics)

        # Log if concerning
        if metrics["memory_mb"] > 1000:  # > 1GB
            logger.warning("High memory usage", **metrics)

        return metrics

    def analyze_trends(self):
        """Analyze metric trends"""
        if len(self.metrics) < 2:
            return

        first = self.metrics[0]
        last = self.metrics[-1]

        memory_growth = last["memory_mb"] - first["memory_mb"]
        time_elapsed = last["timestamp"] - first["timestamp"]

        if memory_growth > 100 and time_elapsed > 0:  # >100MB growth; guard the division below
            logger.error(
                "Memory leak suspected",
                memory_growth_mb=memory_growth,
                time_elapsed_sec=time_elapsed,
                growth_rate_mb_per_min=memory_growth / (time_elapsed / 60)
            )

        # Suggest garbage collection
        if memory_growth > 50:
            gc.collect()
            logger.info("Triggered garbage collection")

# Usage
monitor = PerformanceMonitor()

@app.middleware("http")
async def performance_monitoring(request, call_next):
    monitor.capture_metrics(f"{request.method} {request.url.path}")
    response = await call_next(request)
    monitor.analyze_trends()
    return response

Scenario 3: Production Data Issues

Problem

The issue is reproducible only with production data.

Safe Production Investigation:

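import asyncio
from typing import Dict

class ProductionDebugger:
    def __init__(self):
        self.debug_enabled = False
        self.target_user_ids = set()
        # Hold references so pending auto-disable tasks are not garbage-collected
        self._tasks = set()

    def enable_for_user(self, user_id: str, duration_minutes: int = 30):
        """Enable debugging for specific user"""
        self.target_user_ids.add(user_id)
        self.debug_enabled = True

        # Schedule auto-disable (requires a running event loop)
        task = asyncio.create_task(
            self._auto_disable(user_id, duration_minutes)
        )
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)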

        logger.info(
            "Debug enabled for user",
            user_id=user_id,
            duration_minutes=duration_minutes
        )

    async def _auto_disable(self, user_id: str, minutes: int):
        """Automatically disable after timeout"""
        await asyncio.sleep(minutes * 60)
        self.target_user_ids.discard(user_id)
        logger.info("Debug auto-disabled for user", user_id=user_id)

    def should_debug(self, user_id: str) -> bool:
        """Check if debugging enabled for user"""
        return self.debug_enabled and user_id in self.target_user_ids

    def debug_operation(self, user_id: str, operation: str, data: Dict):
        """Conditionally log debug information"""
        if self.should_debug(user_id):
            # Sanitize sensitive data
            safe_data = self._sanitize(data)

            logger.debug(
                f"DEBUG: {operation}",
                user_id=user_id,
                data=safe_data
            )

    def _sanitize(self, data: Dict) -> Dict:
        """Remove sensitive fields"""
        sensitive_keys = {'password', 'token', 'ssn', 'credit_card'}
        return {
            k: '***REDACTED***' if k in sensitive_keys else v
            for k, v in data.items()
        }

# Usage
prod_debugger = ProductionDebugger()

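# Enable for specific user. Call this from code running inside the event
# loop (for example an admin endpoint), because enable_for_user schedules
# an asyncio task.
prod_debugger.enable_for_user("user_123", duration_minutes=15)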

# In your code
def process_order(user_id: str, order_data: Dict):
    prod_debugger.debug_operation(user_id, "process_order", order_data)

    # Normal processing
    result = process(order_data)

    prod_debugger.debug_operation(user_id, "order_result", result)
    return result
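
The `_sanitize` method above inspects only top-level keys, so a token nested inside another dict or list would still be logged. A recursive variant might look like the sketch below; `sanitize_deep` is a hypothetical helper, and the key set mirrors the one above:

```python
from typing import Any

SENSITIVE_KEYS = {"password", "token", "ssn", "credit_card"}


def sanitize_deep(value: Any) -> Any:
    """Return a copy of value with sensitive dict keys redacted at any depth."""
    if isinstance(value, dict):
        return {
            k: "***REDACTED***" if str(k).lower() in SENSITIVE_KEYS else sanitize_deep(v)
            for k, v in value.items()
        }
    if isinstance(value, (list, tuple)):
        # Tuples come back as lists, which is acceptable for log output
        return [sanitize_deep(v) for v in value]
    return value


order = {"id": 7, "payment": {"credit_card": "4111111111111111", "amount": 20}}
print(sanitize_deep(order))
```

Because the function builds new containers instead of mutating its input, the original order data stays intact for normal processing.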

Debugging Tools Comparison

Tool | Best For | Pros | Cons
IDE Debugger | Development | Interactive, full control | Not for production
Structured Logs | All environments | Always available | Requires planning
Remote Debugger | Staging/Production | Real environment | Security risk
APM Tools | Production monitoring | Automatic instrumentation | Cost, overhead
Profilers | Performance issues | Detailed metrics | Performance impact
Browser DevTools | Frontend issues | Built-in, powerful | Browser-only

Emergency Debugging Checklist

When facing a critical production issue:

  • Enable enhanced logging for affected component
  • Check recent deployments and changes
  • Review error rates and patterns in monitoring
  • Inspect system resources (CPU, memory, disk, network)
  • Check external dependencies status
  • Review recent data changes that might trigger bugs
  • Enable debugging for specific users if needed
  • Collect diagnostic information before making changes
  • Document findings in real-time
  • Set up alerts to prevent recurrence
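
The item about collecting diagnostic information before making changes can be partly automated. A sketch using only the standard library; `collect_diagnostics` and the chosen fields are assumptions, and a real service would add its own application metrics:

```python
import gc
import os
import platform
import shutil
import sys
import threading
from datetime import datetime, timezone
from typing import Any, Dict


def collect_diagnostics(path: str = ".") -> Dict[str, Any]:
    """Capture a point-in-time snapshot of process and host state."""
    disk = shutil.disk_usage(path)
    return {
        "captured_at": datetime.now(timezone.utc).isoformat(),
        "python_version": platform.python_version(),
        "platform": platform.platform(),
        "pid": os.getpid(),
        "thread_count": threading.active_count(),
        "gc_tracked_objects": len(gc.get_objects()),
        "disk_free_gb": round(disk.free / 1024**3, 2),
        "argv": sys.argv[:1],  # program name only; later arguments may hold secrets
    }


snapshot = collect_diagnostics()
print(snapshot["captured_at"], snapshot["thread_count"])
```

Capturing the snapshot before a restart or rollback preserves evidence that the fix itself would otherwise erase.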

Advanced Debugging Techniques

Conditional Compilation for Debug Code

import os
from typing import Any, Callable

DEBUG = os.getenv('DEBUG', 'false').lower() == 'true'

def debug_only(func: Callable) -> Callable:
    """Decorator to execute function only in debug mode"""
    if DEBUG:
        return func
    else:
        return lambda *args, **kwargs: None

@debug_only
def validate_assumptions(data: Any):
    """Expensive validation only in debug mode"""
    assert isinstance(data, dict), "Data must be dict"
    assert 'id' in data, "Data must have id"
    # More expensive checks...

# Usage - no-op in production, runs in debug
validate_assumptions(user_data)

Debugging Decorators

from functools import wraps
import inspect

def debug_calls(func):
    """Log all function calls with arguments"""
    @wraps(func)
    def wrapper(*args, **kwargs):
        # Get argument names
        sig = inspect.signature(func)
        bound_args = sig.bind(*args, **kwargs)
        bound_args.apply_defaults()

        logger.debug(
            f"Calling {func.__name__}",
            arguments=dict(bound_args.arguments)
        )

        try:
            result = func(*args, **kwargs)
            logger.debug(
                f"{func.__name__} returned",
                result=result
            )
            return result
        except Exception as e:
            logger.error(
                f"{func.__name__} raised exception",
                exception=str(e)
            )
            raise

    return wrapper

# Usage
@debug_calls
def calculate_discount(price: float, discount_rate: float) -> float:
    return price * (1 - discount_rate)

Debugging Best Practices Summary

Debugging Effectively

  1. Start with logs - Check existing logs before adding breakpoints
  2. Form hypothesis - Don't debug randomly, have a theory
  3. Collect evidence - Gather data to support/refute hypothesis
  4. Use appropriate tools - IDE for dev, logs for production
  5. Document findings - Help future debugging efforts
  6. Fix root cause - Don't just patch symptoms
  7. Add tests - Prevent regression
  8. Improve observability - Make future debugging easier

Incident Management and Response

Severity Levels and Classification

Incident Severity Definitions

Clear Classification

Well-defined severity levels ensure appropriate resource allocation and response times.

Severity | Impact | Response Time | Examples
P0 (Critical) | Complete service outage; data loss/corruption; security breach | < 15 minutes | Production database down; payment system offline; data breach detected
P1 (High) | Major feature unavailable; significant degradation; revenue impact | < 30 minutes | Authentication failing; orders not processing; API errors > 25%
P2 (Medium) | Non-critical feature down; minor performance issues; small user subset affected | < 2 hours | Search not working; email delays; non-critical API slow
P3 (Low) | Cosmetic issues; minor bugs; no user impact | < 24 hours | UI formatting issues; non-critical logs; documentation errors

Classification Decision Tree

graph TD
    A[Incident Detected] --> B{Service Available?}
    B -->|No| C[P0 - Critical]
    B -->|Yes| D{Major Feature Down?}
    D -->|Yes| E{Revenue Impact?}
    E -->|Yes| F[P1 - High]
    E -->|No| G{User Impact?}
    G -->|High| F
    G -->|Low| H[P2 - Medium]
    D -->|No| I{Performance Issue?}
    I -->|Severe| H
    I -->|Minor| J[P3 - Low]

Severity Classification Implementation

from enum import Enum
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import List, Optional

class IncidentSeverity(Enum):
    P0 = ("P0", "Critical", timedelta(minutes=15))
    P1 = ("P1", "High", timedelta(minutes=30))
    P2 = ("P2", "Medium", timedelta(hours=2))
    P3 = ("P3", "Low", timedelta(hours=24))

    def __init__(self, code: str, label: str, response_time: timedelta):
        self.code = code
        self.label = label
        self.response_time = response_time

class SeverityClassifier:
    @staticmethod
    def classify(
        service_available: bool,
        error_rate: float,
        affected_users: int,
        revenue_impact: bool,
        security_issue: bool
    ) -> IncidentSeverity:
        """Automatically classify incident severity"""

        # Critical conditions
        if not service_available or security_issue:
            return IncidentSeverity.P0

        # High severity conditions
        if error_rate > 0.25 or revenue_impact:
            return IncidentSeverity.P1

        # Medium severity conditions
        if error_rate > 0.10 or affected_users > 100:
            return IncidentSeverity.P2

        # Default to low
        return IncidentSeverity.P3

    @staticmethod
    def should_escalate(
        incident_age: timedelta,
        current_severity: IncidentSeverity,
        resolved: bool
    ) -> bool:
        """Check if incident should be escalated"""
        if resolved:
            return False

        # Escalate if not resolved within response time
        return incident_age > current_severity.response_time

# Usage Example
severity = SeverityClassifier.classify(
    service_available=True,
    error_rate=0.30,
    affected_users=500,
    revenue_impact=True,
    security_issue=False
)  # Returns P1
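
The escalation rule in `should_escalate` is plain `timedelta` arithmetic. The sketch below restates the response targets from the severity table in a dict and computes the time left before an incident is overdue; `time_until_escalation` is a hypothetical helper, not part of the classifier above:

```python
from datetime import datetime, timedelta

# Response targets copied from the severity table
RESPONSE_TARGETS = {
    "P0": timedelta(minutes=15),
    "P1": timedelta(minutes=30),
    "P2": timedelta(hours=2),
    "P3": timedelta(hours=24),
}


def time_until_escalation(
    severity: str, created_at: datetime, now: datetime
) -> timedelta:
    """Return the time left before escalation; zero or negative means overdue."""
    return created_at + RESPONSE_TARGETS[severity] - now


created = datetime(2025, 10, 23, 14, 30)
remaining = time_until_escalation("P1", created, now=datetime(2025, 10, 23, 14, 50))
print(remaining)  # 0:10:00
```

Passing `now` explicitly keeps the calculation deterministic, which makes the escalation logic easy to unit test.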

Incident Response Procedures

Response Workflow

Incident Lifecycle

Every incident follows a structured lifecycle from detection to resolution.

Incident States:

stateDiagram-v2
    [*] --> Detected
    Detected --> Investigating: Responder assigned
    Investigating --> Identified: Root cause found
    Identified --> Fixing: Fix being implemented
    Fixing --> Resolved: Fix deployed
    Resolved --> Monitoring: Verifying stability
    Monitoring --> Closed: Stable for 24h
    Monitoring --> Investigating: Issue recurs
    Closed --> [*]
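The transitions in the diagram can also be enforced in code. The following is a minimal sketch, assuming states are represented by the lowercase strings used as `IncidentStatus` values; `ALLOWED_TRANSITIONS`, `can_transition`, and `validate_transition` are hypothetical names, not part of the implementation in this guide.

```python
from typing import Dict, Set

# Allowed transitions, transcribed from the state diagram above.
ALLOWED_TRANSITIONS: Dict[str, Set[str]] = {
    "detected": {"investigating"},
    "investigating": {"identified"},
    "identified": {"fixing"},
    "fixing": {"resolved"},
    "resolved": {"monitoring"},
    # A recurrence during monitoring reopens the investigation.
    "monitoring": {"closed", "investigating"},
    "closed": set(),
}


def can_transition(current: str, new: str) -> bool:
    """Return True if the lifecycle diagram permits current -> new."""
    return new in ALLOWED_TRANSITIONS.get(current, set())


def validate_transition(current: str, new: str) -> None:
    """Raise ValueError for a transition the diagram does not allow."""
    if not can_transition(current, new):
        raise ValueError(f"Illegal transition: {current} -> {new}")
```

A status update method could call `validate_transition` before mutating the incident, so that a jump such as `fixing` to `closed` is rejected early.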

Incident Manager Implementation

from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from typing import Dict, List, Optional
import uuid

class IncidentStatus(Enum):
    DETECTED = "detected"
    INVESTIGATING = "investigating"
    IDENTIFIED = "identified"
    FIXING = "fixing"
    RESOLVED = "resolved"
    MONITORING = "monitoring"
    CLOSED = "closed"

@dataclass
class Incident:
    id: str
    title: str
    severity: IncidentSeverity
    status: IncidentStatus
    description: str
    affected_services: List[str]
    created_at: datetime
    updated_at: datetime
    assigned_to: Optional[str] = None
    timeline: Optional[List[Dict]] = None  # replaced with [] in __post_init__

    def __post_init__(self):
        if self.timeline is None:
            self.timeline = []

class IncidentManager:
    def __init__(self):
        self.incidents: Dict[str, Incident] = {}
        self.notification_service = NotificationService()

    async def create_incident(
        self,
        title: str,
        severity: IncidentSeverity,
        description: str,
        affected_services: List[str]
    ) -> str:
        """Create and initialize new incident"""

        incident_id = f"INC-{uuid.uuid4().hex[:8].upper()}"

        incident = Incident(
            id=incident_id,
            title=title,
            severity=severity,
            status=IncidentStatus.DETECTED,
            description=description,
            affected_services=affected_services,
            created_at=datetime.now(),
            updated_at=datetime.now()
        )

        self.incidents[incident_id] = incident

        # Log incident creation
        self._add_timeline_entry(
            incident,
            "Incident detected and created",
            {"severity": severity.code}
        )

        # Trigger initial response
        await self._trigger_initial_response(incident)

        logger.critical(
            f"{severity.code} Incident Created",
            incident_id=incident_id,
            title=title,
            severity=severity.code,
            affected_services=affected_services
        )

        return incident_id

    async def _trigger_initial_response(self, incident: Incident):
        """Initiate incident response procedures"""

        # Get on-call team
        responders = await self._get_on_call_team(
            incident.severity,
            incident.affected_services
        )

        # Send notifications
        await self._send_notifications(incident, responders)

        # Auto-assign primary responder
        if responders:
            await self.assign_incident(incident.id, responders[0])

        # Start monitoring
        await self._start_incident_monitoring(incident)

    async def _send_notifications(
        self,
        incident: Incident,
        responders: List[str]
    ):
        """Send notifications through appropriate channels"""

        notification = {
            "incident_id": incident.id,
            "title": incident.title,
            "severity": incident.severity.code,
            "affected_services": incident.affected_services,
            "link": f"https://incident-dashboard.company.com/{incident.id}"
        }

        # P0/P1: pager, SMS, and the critical Slack channel
        if incident.severity in [IncidentSeverity.P0, IncidentSeverity.P1]:
            await self.notification_service.send_pager(responders, notification)
            await self.notification_service.send_sms(responders, notification)
            await self.notification_service.send_slack(
                channel="#incidents-critical",
                message=notification
            )
        else:
            # P2/P3 get Slack + email
            await self.notification_service.send_slack(
                channel="#incidents",
                message=notification
            )
            await self.notification_service.send_email(responders, notification)

    async def update_status(
        self,
        incident_id: str,
        new_status: IncidentStatus,
        update_message: str,
        updated_by: str
    ):
        """Update incident status with timeline entry"""

        incident = self.incidents.get(incident_id)
        if not incident:
            raise ValueError(f"Incident {incident_id} not found")

        old_status = incident.status
        incident.status = new_status
        incident.updated_at = datetime.now()

        # Add timeline entry
        self._add_timeline_entry(
            incident,
            f"Status changed: {old_status.value} → {new_status.value}",
            {
                "message": update_message,
                "updated_by": updated_by
            }
        )

        # Handle status-specific actions
        await self._handle_status_change(incident, old_status, new_status)

        logger.info(
            "Incident status updated",
            incident_id=incident_id,
            old_status=old_status.value,
            new_status=new_status.value,
            updated_by=updated_by
        )

    def _add_timeline_entry(
        self,
        incident: Incident,
        event: str,
        details: Optional[Dict] = None
    ):
        """Add entry to incident timeline"""
        incident.timeline.append({
            "timestamp": datetime.now().isoformat(),
            "event": event,
            "details": details or {}
        })

    async def _handle_status_change(
        self,
        incident: Incident,
        old_status: IncidentStatus,
        new_status: IncidentStatus
    ):
        """Execute actions based on status changes"""

        # When resolved, start monitoring period
        if new_status == IncidentStatus.RESOLVED:
            await self._start_resolution_monitoring(incident)

        # When closed, trigger post-mortem
        elif new_status == IncidentStatus.CLOSED:
            await self._trigger_postmortem_creation(incident)

# Usage Example
import asyncio

async def main():
    incident_mgr = IncidentManager()

    # Create incident
    incident_id = await incident_mgr.create_incident(
        title="Payment Gateway Timeout",
        severity=IncidentSeverity.P1,
        description="Payment processing experiencing timeouts > 5s",
        affected_services=["payment-gateway", "order-service"]
    )

    # Update status as investigation progresses
    await incident_mgr.update_status(
        incident_id=incident_id,
        new_status=IncidentStatus.INVESTIGATING,
        update_message="Platform team investigating database connection pool",
        updated_by="name@atlancis.com"
    )

asyncio.run(main())
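`IncidentManager` depends on a `NotificationService` that this guide does not define. For local experiments, a stand-in that records calls instead of sending them may be enough. The sketch below is an assumption: `InMemoryNotificationService` is a hypothetical name, and its method signatures are inferred only from how `_send_notifications` calls them.

```python
import asyncio
from typing import Any, List, Tuple


class InMemoryNotificationService:
    """Hypothetical stand-in that records notifications instead of sending them."""

    def __init__(self) -> None:
        # Each entry is (channel type, target, payload).
        self.sent: List[Tuple[str, Any, Any]] = []

    async def send_pager(self, responders: List[str], notification: Any) -> None:
        self.sent.append(("pager", tuple(responders), notification))

    async def send_sms(self, responders: List[str], notification: Any) -> None:
        self.sent.append(("sms", tuple(responders), notification))

    async def send_email(self, responders: List[str], notification: Any) -> None:
        self.sent.append(("email", tuple(responders), notification))

    async def send_slack(self, channel: str, message: Any) -> None:
        self.sent.append(("slack", channel, message))


async def demo() -> List[str]:
    service = InMemoryNotificationService()
    note = {"incident_id": "INC-EXAMPLE", "severity": "P1"}
    await service.send_pager(["oncall@example.com"], note)
    await service.send_slack(channel="#incidents-critical", message=note)
    # Return only the channel types, in the order they were used.
    return [kind for kind, _, _ in service.sent]


channels = asyncio.run(demo())
print(channels)
```

Recording calls this way makes it straightforward to assert in tests which channels a given severity triggers.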

Escalation Procedures

Escalation Paths

Know When to Escalate

Don't hesitate to escalate when needed. It's better to escalate early than to let an incident grow.

Escalation Triggers:

  • Incident not acknowledged within response time
  • Root cause not identified within 1 hour (P0/P1)
  • Resolution attempts failing
  • Scope or impact increasing
  • Customer escalations
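The triggers above can be evaluated mechanically. This is a minimal sketch under stated assumptions: `escalation_reasons` and all of its parameters are hypothetical, and the threshold of two failed fix attempts is an illustrative choice rather than a rule from this guide.

```python
from datetime import timedelta
from typing import List


def escalation_reasons(
    acknowledged: bool,
    incident_age: timedelta,
    response_time: timedelta,
    severity_code: str,
    root_cause_identified: bool,
    failed_fix_attempts: int,
    impact_increasing: bool,
    customer_escalated: bool,
) -> List[str]:
    """Return the escalation triggers that currently apply (empty list = none)."""
    reasons = []
    if not acknowledged and incident_age > response_time:
        reasons.append("not acknowledged within response time")
    if (
        severity_code in ("P0", "P1")
        and not root_cause_identified
        and incident_age > timedelta(hours=1)
    ):
        reasons.append("root cause not identified within 1 hour")
    if failed_fix_attempts >= 2:  # assumed threshold for "attempts failing"
        reasons.append("resolution attempts failing")
    if impact_increasing:
        reasons.append("scope or impact increasing")
    if customer_escalated:
        reasons.append("customer escalation")
    return reasons


reasons = escalation_reasons(
    acknowledged=True,
    incident_age=timedelta(minutes=90),
    response_time=timedelta(minutes=30),
    severity_code="P1",
    root_cause_identified=False,
    failed_fix_attempts=0,
    impact_increasing=False,
    customer_escalated=True,
)
print(reasons)
```

Returning the list of reasons, rather than a bare boolean, lets the escalation notice state why the incident was escalated.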

Escalation Implementation

@dataclass
class EscalationLevel:
    level: int
    roles: List[str]
    notification_channels: List[str]
    timeout_minutes: int

class EscalationManager:
    def __init__(self):
        self.escalation_paths = {
            IncidentSeverity.P0: [
                EscalationLevel(
                    level=1,
                    roles=["on-call-engineer"],
                    notification_channels=["pager", "sms", "slack"],
                    timeout_minutes=5
                ),
                EscalationLevel(
                    level=2,
                    roles=["team-lead", "senior-engineer"],
                    notification_channels=["pager", "sms", "slack", "phone"],
                    timeout_minutes=10
                ),
                EscalationLevel(
                    level=3,
                    roles=["engineering-manager", "cto"],
                    notification_channels=["phone", "sms"],
                    timeout_minutes=15
                )
            ],
            # Define paths for other severities...
        }

    async def check_escalation(self, incident: Incident):
        """Check if incident should be escalated"""

        time_since_creation = datetime.now() - incident.created_at
        current_level = self._get_current_escalation_level(incident)

        path = self.escalation_paths.get(incident.severity, [])
        if current_level < len(path):
            next_level = path[current_level]

            if time_since_creation.total_seconds() > (next_level.timeout_minutes * 60):
                await self._escalate(incident, next_level)

    async def _escalate(
        self,
        incident: Incident,
        escalation_level: EscalationLevel
    ):
        """Execute escalation"""

        logger.critical(
            f"Escalating incident to level {escalation_level.level}",
            incident_id=incident.id,
            severity=incident.severity.code,
            level=escalation_level.level,
            roles=escalation_level.roles
        )

        # Notify escalation contacts
        for role in escalation_level.roles:
            contacts = await self._get_contacts_for_role(role)
            await self._notify_escalation(
                contacts,
                incident,
                escalation_level
            )
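`check_escalation` relies on `_get_current_escalation_level`, which is not shown. As a cross-check, the number of levels that should already have fired can be derived from elapsed time alone. The sketch below assumes, as `check_escalation` does, that each `timeout_minutes` is measured from incident creation; `levels_due` and `P0_TIMEOUTS` are hypothetical names.

```python
from typing import Sequence


def levels_due(elapsed_minutes: float, timeouts: Sequence[int]) -> int:
    """Count escalation levels whose timeout (minutes since creation) has passed."""
    return sum(1 for timeout in timeouts if elapsed_minutes > timeout)


# timeout_minutes of the three P0 levels defined above
P0_TIMEOUTS = [5, 10, 15]

print(levels_due(12, P0_TIMEOUTS))
```

If the stored escalation level for an unresolved incident is lower than `levels_due`, an escalation was probably missed.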

Post-Mortem Analysis

Post-Mortem Template

Learn from Every Incident

Post-mortems are blameless learning opportunities, not finger-pointing sessions.

Standard Template:

# Post-Mortem: [Incident Title]

**Incident ID**: INC-12345678  
**Date**: 2025-10-23  
**Severity**: P1  
**Duration**: 2h 15m  
**Author**: name@atlancis.com  

---

## Executive Summary

Brief 2-3 sentence summary of what happened and impact.

---

## Impact

**User Impact**:
- 45% of payment transactions failed
- Approximately 1,200 users affected

**Business Impact**:
- Estimated revenue loss: $25,000
- 127 customer support tickets

**Technical Impact**:
- Payment service degraded
- Order processing delayed by average 15 minutes

---

## Timeline

| Time | Event |
|------|-------|
| 14:00 | Alert triggered: Payment error rate > 10% |
| 14:05 | Incident declared (P1) |
| 14:15 | Root cause identified: Database connection pool exhaustion |
| 14:30 | Temporary fix deployed: Increased pool size |
| 15:45 | Permanent fix deployed: Connection leak patched |
| 16:15 | Monitoring period complete, incident closed |

---

## Root Cause Analysis

### Primary Cause
Database connection leak in payment service v2.3.1

### Contributing Factors
1. Insufficient connection pool monitoring
2. Load testing didn't catch the leak
3. Connection timeout too long (60s)

### Why It Wasn't Caught
- New code path only triggered under high load
- Integration tests used mocked database
- Staging environment has lower traffic

---

## Resolution

**Immediate Actions**:
- Increased connection pool size from 100 to 200
- Restarted payment service instances

**Permanent Fix**:
- Patched connection leak in ORM query
- Added connection pool metrics
- Reduced connection timeout to 10s

---

## Action Items

| Action | Owner | Due Date | Status |
|--------|-------|----------|--------|
| Add connection pool monitoring | Platform Team | 2025-10-25 | Done |
| Update integration tests | QA Team | 2025-10-27 | In Progress |
| Conduct load testing | DevOps | 2025-10-30 | Pending |
| Document connection best practices | name@ | 2025-10-26 | Done |

---

## Lessons Learned

**What Went Well**:
- Quick incident detection (<5min from issue start)
- Clear communication in incident channel
- Temporary fix deployed quickly

**What Could Be Improved**:
- Connection pool metrics should have existed
- Load testing should include connection lifecycle
- Staging environment should mirror production load

**Future Prevention**:
- Implement connection pool monitoring
- Add circuit breakers for database calls
- Improve load testing coverage

---

## Related Incidents
- INC-12340987: Similar connection pool issue (2024-08-15)

Post-Mortem Manager Implementation

class PostmortemManager:
    def __init__(self):
        self.postmortems: Dict[str, Dict] = {}
        self.action_items: Dict[str, List] = {}

    async def create_postmortem(
        self,
        incident: Incident,
        facilitator: str
    ) -> str:
        """Create post-mortem from incident"""

        postmortem_id = f"PM-{uuid.uuid4().hex[:8].upper()}"

        postmortem = {
            "id": postmortem_id,
            "incident_id": incident.id,
            "created_at": datetime.now(),
            "due_date": datetime.now() + timedelta(days=5),
            "facilitator": facilitator,
            "status": "draft",
            "sections": {
                "summary": self._generate_summary(incident),
                "impact": self._generate_impact_section(incident),
                "timeline": self._format_timeline(incident.timeline),
                "root_cause": "",  # To be filled
                "resolution": "",  # To be filled
                "action_items": [],  # To be added
                "lessons_learned": ""  # To be filled
            },
            "contributors": [],
            "reviews_required": ["team-lead", "engineering-manager"]
        }

        self.postmortems[postmortem_id] = postmortem

        # Schedule post-mortem meeting
        await self._schedule_postmortem_meeting(postmortem)

        logger.info(
            "Post-mortem created",
            postmortem_id=postmortem_id,
            incident_id=incident.id,
            due_date=postmortem["due_date"].isoformat()
        )

        return postmortem_id

    def _generate_summary(self, incident: Incident) -> str:
        """Generate executive summary"""
        duration = incident.updated_at - incident.created_at
        # Concatenate explicitly so the summary carries no source indentation
        return (
            f"{incident.severity.code} incident affecting "
            f"{', '.join(incident.affected_services)}. "
            f"Duration: {self._format_duration(duration)}. "
            f"Status: {incident.status.value}."
        )

    def _format_duration(self, duration: timedelta) -> str:
        """Format duration in human-readable format"""
        hours = int(duration.total_seconds() // 3600)
        minutes = int((duration.total_seconds() % 3600) // 60)
        return f"{hours}h {minutes}m"

    async def add_action_item(
        self,
        postmortem_id: str,
        title: str,
        description: str,
        owner: str,
        due_date: datetime,
        priority: str
    ) -> str:
        """Add action item to post-mortem"""

        action_item_id = f"AI-{uuid.uuid4().hex[:6].upper()}"

        action_item = {
            "id": action_item_id,
            "title": title,
            "description": description,
            "owner": owner,
            "due_date": due_date,
            "priority": priority,
            "status": "pending",
            "created_at": datetime.now()
        }

        if postmortem_id not in self.action_items:
            self.action_items[postmortem_id] = []

        self.action_items[postmortem_id].append(action_item)

        # Add to postmortem
        postmortem = self.postmortems[postmortem_id]
        postmortem["sections"]["action_items"].append(action_item)

        # Create tracking ticket
        await self._create_tracking_ticket(action_item)

        logger.info(
            "Action item added",
            postmortem_id=postmortem_id,
            action_item_id=action_item_id,
            owner=owner
        )

        return action_item_id

    async def _create_tracking_ticket(self, action_item: Dict):
        """Create JIRA/Linear ticket for action item"""
        # Integration with project management tools
        pass

# Usage Example
import asyncio

async def main():
    postmortem_mgr = PostmortemManager()

    # Create post-mortem after incident
    # (resolved_incident is an Incident that has already been closed)
    postmortem_id = await postmortem_mgr.create_postmortem(
        incident=resolved_incident,
        facilitator="alice@company.com"
    )

    # Add action items
    await postmortem_mgr.add_action_item(
        postmortem_id=postmortem_id,
        title="Implement connection pool monitoring",
        description="Add Prometheus metrics for database connection pool usage",
        owner="platform-team",
        due_date=datetime.now() + timedelta(days=7),
        priority="high"
    )

asyncio.run(main())
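`create_postmortem` calls `_format_timeline`, which is not shown. One possible shape, assuming timeline entries look like those written by `_add_timeline_entry` (an ISO 8601 `timestamp` string plus an `event` string), is to render them as the Time/Event table used in the template. `format_timeline` here is a hypothetical standalone function, not the guide's implementation.

```python
from datetime import datetime
from typing import Dict, List


def format_timeline(entries: List[Dict]) -> str:
    """Render timeline entries as a Markdown table, oldest first."""
    lines = ["| Time | Event |", "|------|-------|"]
    # ISO 8601 strings in the same format sort chronologically.
    for entry in sorted(entries, key=lambda e: e["timestamp"]):
        time_label = datetime.fromisoformat(entry["timestamp"]).strftime("%H:%M")
        # Escape pipes so an event description cannot break the table.
        event = entry["event"].replace("|", "\\|")
        lines.append(f"| {time_label} | {event} |")
    return "\n".join(lines)


timeline = [
    {"timestamp": "2025-10-23T14:05:00", "event": "Incident declared (P1)", "details": {}},
    {"timestamp": "2025-10-23T14:00:00", "event": "Alert triggered", "details": {}},
]
table = format_timeline(timeline)
print(table)
```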

On-Call Management

On-Call Rotation Structure

Fair Distribution

Rotate on-call duties fairly to prevent burnout while maintaining 24/7 coverage.

Rotation Structure:

from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import List, Dict, Optional

@dataclass
class OnCallSchedule:
    team: str
    primary: str
    secondary: str
    start_time: datetime
    end_time: datetime

class OnCallManager:
    def __init__(self):
        self.schedules: Dict[str, List[OnCallSchedule]] = {}
        self.handoff_notes: List[Dict] = []

    def get_current_oncall(self, team: str) -> Dict[str, str]:
        """Get current on-call engineers"""
        now = datetime.now()

        schedule = self._find_active_schedule(team, now)
        if not schedule:
            raise ValueError(f"No active on-call schedule for {team}")

        return {
            "team": team,
            "primary": schedule.primary,
            "secondary": schedule.secondary,
            "start_time": schedule.start_time.isoformat(),
            "end_time": schedule.end_time.isoformat()
        }

    def _find_active_schedule(
        self,
        team: str,
        timestamp: datetime
    ) -> Optional[OnCallSchedule]:
        """Find active schedule for timestamp"""
        team_schedules = self.schedules.get(team, [])

        for schedule in team_schedules:
            if schedule.start_time <= timestamp < schedule.end_time:
                return schedule

        return None

    async def perform_handoff(
        self,
        team: str,
        from_engineer: str,
        to_engineer: str,
        notes: str,
        active_incidents: List[str]
    ):
        """Document on-call handoff"""

        handoff = {
            "timestamp": datetime.now(),
            "team": team,
            "from": from_engineer,
            "to": to_engineer,
            "notes": notes,
            "active_incidents": active_incidents,
            "outstanding_issues": await self._get_outstanding_issues(team)
        }

        self.handoff_notes.append(handoff)

        # Notify team
        await self._notify_handoff(handoff)

        logger.info(
            "On-call handoff completed",
            team=team,
            from_engineer=from_engineer,
            to_engineer=to_engineer,
            active_incidents_count=len(active_incidents)
        )

    async def _notify_handoff(self, handoff: Dict):
        """Send handoff notification to team"""
        # Build the message line by line so it carries no source indentation
        message = "\n".join([
            f"On-Call Handoff - {handoff['team']}",
            "",
            f"From: {handoff['from']}",
            f"To: {handoff['to']}",
            "",
            f"Active Incidents: {len(handoff['active_incidents'])}",
            f"Outstanding Issues: {len(handoff['outstanding_issues'])}",
            "",
            f"Notes: {handoff['notes']}",
        ])

        # Send to team channel (assumes a module-level notification_service instance)
        await notification_service.send_slack(
            channel=f"#{handoff['team']}-oncall",
            message=message
        )

On-Call Best Practices

On-Call Health

Maintain healthy on-call practices to ensure responder effectiveness and prevent burnout.

Guidelines:

  • Rotation Length: 1 week maximum
  • Backup Coverage: Always have secondary on-call
  • Post-Incident Rest: Take time off after major incidents
  • Handoff Protocol: Document active issues and concerns
  • Compensation: Provide on-call compensation or time-off
  • Load Balancing: Distribute incidents fairly across team
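The rotation-length and backup-coverage guidelines can be expressed as a simple round-robin schedule. This is a minimal sketch, assuming one-week shifts where next week's primary serves as this week's secondary; `build_weekly_rotation` and the engineer names are hypothetical.

```python
from datetime import datetime, timedelta
from typing import Dict, List


def build_weekly_rotation(
    engineers: List[str], start: datetime, weeks: int
) -> List[Dict]:
    """Build consecutive one-week shifts, each with a primary and a secondary."""
    if len(engineers) < 2:
        raise ValueError("Need at least two engineers for primary and secondary")
    shifts = []
    for week in range(weeks):
        shift_start = start + timedelta(weeks=week)
        shifts.append({
            "primary": engineers[week % len(engineers)],
            # The secondary is whoever becomes primary the following week.
            "secondary": engineers[(week + 1) % len(engineers)],
            "start_time": shift_start,
            "end_time": shift_start + timedelta(weeks=1),
        })
    return shifts


rotation = build_weekly_rotation(
    ["alice", "bob", "carol"], datetime(2025, 10, 20, 9, 0), weeks=4
)
for shift in rotation:
    print(shift["start_time"].date(), shift["primary"], shift["secondary"])
```

Each shift ends exactly when the next begins, so a lookup such as `_find_active_schedule` finds one active schedule for any timestamp in the covered range.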

Incident Communication

Status Page Updates

class StatusPageManager:
    def __init__(self):
        self.status_page_api = StatusPageAPI()
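        self.incident_status_map = {
            IncidentStatus.DETECTED: "investigating",
            IncidentStatus.INVESTIGATING: "investigating",
            IncidentStatus.IDENTIFIED: "identified",
            IncidentStatus.FIXING: "monitoring",
            IncidentStatus.RESOLVED: "resolved",
            # Map the remaining internal states so lookups never raise KeyError
            IncidentStatus.MONITORING: "monitoring",
            IncidentStatus.CLOSED: "resolved"
        }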

    async def create_status_page_incident(
        self,
        incident: Incident
    ) -> str:
        """Create public-facing status page incident"""

        # Sanitize information for public consumption
        public_title = self._sanitize_title(incident.title)
        public_description = self._create_public_message(incident)

        status_incident = await self.status_page_api.create_incident(
            name=public_title,
            status=self.incident_status_map[incident.status],
            impact=self._map_severity_to_impact(incident.severity),
            components=incident.affected_services,
            message=public_description
        )

        logger.info(
            "Status page incident created",
            incident_id=incident.id,
            status_page_id=status_incident["id"]
        )

        return status_incident["id"]

    async def update_status_page(
        self,
        status_page_id: str,
        incident: Incident,
        message: str
    ):
        """Post update to status page"""

        await self.status_page_api.post_update(
            incident_id=status_page_id,
            status=self.incident_status_map[incident.status],
            message=message
        )

    def _sanitize_title(self, title: str) -> str:
        """Remove internal jargon from title"""
        # Replace internal service names with user-facing names
        replacements = {
            "payment-gateway": "Payment Processing",
            "auth-service": "Login System",
            "order-service": "Order Management"
        }

        sanitized = title
        for internal, public in replacements.items():
            sanitized = sanitized.replace(internal, public)

        return sanitized

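    def _create_public_message(self, incident: Incident) -> str:
        """Create user-friendly incident message"""
        # Reuse the title sanitizer so internal service names stay out of public text
        services = ', '.join(
            self._sanitize_title(service) for service in incident.affected_services
        )
        return (
            f"We are investigating an issue affecting {services}. "
            "Our team is actively working on a resolution. "
            "We will provide updates as we learn more."
        )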

    def _map_severity_to_impact(self, severity: IncidentSeverity) -> str:
        """Map internal severity to public impact level"""
        mapping = {
            IncidentSeverity.P0: "critical",
            IncidentSeverity.P1: "major",
            IncidentSeverity.P2: "minor",
            IncidentSeverity.P3: "none"
        }
        return mapping.get(severity, "minor")

Internal Communication Templates

Communication Cadence

Regular updates prevent an information vacuum and maintain stakeholder confidence.

Initial Notification Template:

**INCIDENT DECLARED** - [Severity]

**Incident ID**: INC-12345678
**Severity**: P1
**Status**: Investigating
**Affected Services**: Payment Gateway, Order Processing

**Summary**: 
Users experiencing payment processing delays. Error rate elevated to 30%.

**Impact**:
- Approximately 500 users affected
- Payment completion time increased from 2s to 15s

**Current Actions**:
- Platform team investigating database connection issues
- Temporary rate limiting applied to stabilize service

**Next Update**: In 30 minutes or when status changes

**Incident Commander**: name@atlancis.com
**Communication Lead**: name@atlancis.com

Progress Update Template:

**INCIDENT UPDATE** - [Incident ID]

**Time**: 14:45 UTC
**Status**: Identified → Fixing

**Update**:
Root cause identified: Database connection pool exhaustion due to connection leak.

**Actions Taken**:
- Increased connection pool size (immediate mitigation)
- Identified problematic code path
- Deploying fix to production (ETA: 15:00 UTC)

**Current Impact**:
Error rate reduced from 30% to 8%

**Next Update**: 15:15 UTC or when resolved

Resolution Template:

**INCIDENT RESOLVED** - [Incident ID]

**Resolution Time**: 15:30 UTC
**Total Duration**: 2h 15m

**Final Status**:
Issue has been resolved. All services operating normally.

**Resolution**:
- Patched connection leak in payment service
- Connection pool metrics added for future monitoring
- Service fully restored at 15:30 UTC

**Impact Summary**:
- 1,200 users affected
- Estimated revenue impact: $25,000
- 127 support tickets created

**Next Steps**:
- Post-mortem scheduled for 2025-10-24
- Action items tracked in PM-ABC12345

**Questions**: Contact name@atlancis.com
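The "Next Update" lines in the templates above imply a fixed cadence. A small helper can compute when the next update is due. In this sketch, the 30-minute interval for P0/P1 matches the templates; the P2 and P3 intervals, and the names `UPDATE_INTERVALS`, `next_update_due`, and `is_update_overdue`, are assumptions.

```python
from datetime import datetime, timedelta

UPDATE_INTERVALS = {
    "P0": timedelta(minutes=30),
    "P1": timedelta(minutes=30),
    "P2": timedelta(hours=2),   # assumed interval, not stated in this guide
    "P3": timedelta(hours=24),  # assumed interval, not stated in this guide
}


def next_update_due(last_update: datetime, severity_code: str) -> datetime:
    """Return the time by which the next stakeholder update should be posted."""
    return last_update + UPDATE_INTERVALS[severity_code]


def is_update_overdue(
    last_update: datetime, severity_code: str, now: datetime
) -> bool:
    """Return True once the cadence for this severity has been missed."""
    return now > next_update_due(last_update, severity_code)


due = next_update_due(datetime(2025, 10, 23, 14, 45), "P1")
print(due.strftime("%H:%M"))
```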

Incident Metrics and Monitoring

Key Metrics to Track

Measure to Improve

Track incident metrics to identify trends and improve response effectiveness.

| Metric | Definition | Target |
|--------|------------|--------|
| MTTD | Mean Time To Detect | < 5 minutes |
| MTTA | Mean Time To Acknowledge | < 15 minutes (P0/P1) |
| MTTI | Mean Time To Investigate | < 1 hour (P0/P1) |
| MTTR | Mean Time To Resolution | < 4 hours (P0/P1) |
| Incident Frequency | Incidents per week | Trending down |
| Repeat Incidents | Same root cause | < 5% |
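To make the time-based metrics concrete, the sketch below computes MTTD, MTTA, and MTTR from per-incident timestamps. It is an illustration under assumptions: `IncidentTimestamps` and its fields are hypothetical, MTTA is measured from detection to acknowledgement, and MTTR from detection to resolution (teams define these boundaries differently).

```python
from dataclasses import dataclass
from datetime import datetime
from statistics import mean
from typing import List


@dataclass
class IncidentTimestamps:
    started_at: datetime       # when the fault began
    detected_at: datetime      # when monitoring or a person noticed it
    acknowledged_at: datetime  # when a responder took ownership
    resolved_at: datetime      # when service was restored


def _minutes(start: datetime, end: datetime) -> float:
    return (end - start).total_seconds() / 60


def mttd(records: List[IncidentTimestamps]) -> float:
    """Mean minutes from fault start to detection."""
    return mean(_minutes(r.started_at, r.detected_at) for r in records) if records else 0.0


def mtta(records: List[IncidentTimestamps]) -> float:
    """Mean minutes from detection to acknowledgement."""
    return mean(_minutes(r.detected_at, r.acknowledged_at) for r in records) if records else 0.0


def mttr(records: List[IncidentTimestamps]) -> float:
    """Mean minutes from detection to resolution."""
    return mean(_minutes(r.detected_at, r.resolved_at) for r in records) if records else 0.0


records = [
    IncidentTimestamps(
        datetime(2025, 10, 23, 14, 0), datetime(2025, 10, 23, 14, 4),
        datetime(2025, 10, 23, 14, 10), datetime(2025, 10, 23, 15, 30),
    ),
    IncidentTimestamps(
        datetime(2025, 10, 24, 9, 0), datetime(2025, 10, 24, 9, 2),
        datetime(2025, 10, 24, 9, 20), datetime(2025, 10, 24, 11, 2),
    ),
]
print(mttd(records), mtta(records), mttr(records))
```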

Metrics Dashboard Implementation

from collections import defaultdict
from datetime import datetime, timedelta
from typing import Dict, List

class IncidentMetrics:
    def __init__(self):
        self.metrics = defaultdict(list)

    def calculate_mttd(self, incidents: List[Incident]) -> float:
        """Calculate Mean Time To Detect (minutes)"""
        # Assumes incidents carry a numeric detection_time in minutes;
        # the Incident dataclass above does not define it, so incidents
        # without the attribute are skipped
        detection_times = [
            i.detection_time for i in incidents
            if hasattr(i, 'detection_time')
        ]
        return sum(detection_times) / len(detection_times) if detection_times else 0

    def calculate_mttr(self, incidents: List[Incident]) -> float:
        """Calculate Mean Time To Resolution in minutes (creation to last update of closed incidents)"""
        resolution_times = [
            (i.updated_at - i.created_at).total_seconds() / 60
            for i in incidents
            if i.status == IncidentStatus.CLOSED
        ]
        return sum(resolution_times) / len(resolution_times) if resolution_times else 0

    def get_incident_trends(
        self,
        incidents: List[Incident],
        days: int = 30
    ) -> Dict:
        """Analyze incident trends"""

        cutoff_date = datetime.now() - timedelta(days=days)
        recent_incidents = [
            i for i in incidents
            if i.created_at >= cutoff_date
        ]

        # Group by severity
        by_severity = defaultdict(int)
        for incident in recent_incidents:
            by_severity[incident.severity.code] += 1

        # Group by affected service
        by_service = defaultdict(int)
        for incident in recent_incidents:
            for service in incident.affected_services:
                by_service[service] += 1

        return {
            "total_incidents": len(recent_incidents),
            "by_severity": dict(by_severity),
            "by_service": dict(by_service),
            "mttr_minutes": self.calculate_mttr(recent_incidents),
            "incidents_per_week": len(recent_incidents) / (days / 7)
        }

    def identify_repeat_incidents(
        self,
        incidents: List[Incident],
        days: int = 90
    ) -> List[Dict]:
        """Identify recurring incidents"""

        cutoff_date = datetime.now() - timedelta(days=days)
        recent_incidents = [
            i for i in incidents
            if i.created_at >= cutoff_date
        ]

        # Group by title/root cause
        incident_groups = defaultdict(list)
        for incident in recent_incidents:
            # Simple grouping by exact title match (case-insensitive)
            key = incident.title.lower()
            incident_groups[key].append(incident)

        # Find repeats
        repeats = []
        for title, group in incident_groups.items():
            if len(group) > 1:
                repeats.append({
                    "title": title,
                    "occurrences": len(group),
                    "incidents": [i.id for i in group],
                    "severity": group[0].severity.code
                })

        return sorted(repeats, key=lambda x: x["occurrences"], reverse=True)

# Usage Example
metrics = IncidentMetrics()
trends = metrics.get_incident_trends(all_incidents, days=30)
repeats = metrics.identify_repeat_incidents(all_incidents)

logger.info("Incident trends", **trends)
if repeats:
    logger.warning("Repeat incidents detected", repeats=repeats)
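`identify_repeat_incidents` groups incidents only when their titles match exactly after lowercasing. Where titles vary slightly between occurrences, a fuzzy comparison may catch more repeats. The sketch below uses `difflib.SequenceMatcher` from the standard library; `group_similar_titles` and the 0.8 threshold are assumptions to be tuned against real incident titles.

```python
from difflib import SequenceMatcher
from typing import List


def group_similar_titles(titles: List[str], threshold: float = 0.8) -> List[List[str]]:
    """Greedily group titles whose similarity to a group's first title meets the threshold."""
    groups: List[List[str]] = []
    for title in titles:
        normalized = title.lower().strip()
        for group in groups:
            representative = group[0].lower().strip()
            if SequenceMatcher(None, normalized, representative).ratio() >= threshold:
                group.append(title)
                break
        else:
            # No existing group was similar enough, so start a new one.
            groups.append([title])
    return groups


titles = [
    "Payment Gateway Timeout",
    "payment gateway timeouts",
    "Auth service login failures",
]
groups = group_similar_titles(titles)
repeat_groups = [g for g in groups if len(g) > 1]
print(repeat_groups)
```

Grouping by title remains a heuristic. Matching on the root cause recorded in the post-mortem is likely to be more reliable once that data exists.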

Incident Response Checklist

During an Incident

  • Declare incident with appropriate severity
  • Assign incident commander to coordinate response
  • Create communication channel (#incident-[id])
  • Notify stakeholders based on severity
  • Update status page if customer-facing
  • Document timeline as events occur
  • Communicate regularly (every 30min for P0/P1)
  • Focus on mitigation before root cause
  • Escalate if the incident is not acknowledged within the response time
  • Monitor for recurrence after resolution

After an Incident

  • Update final status page message
  • Close communication channels
  • Schedule post-mortem meeting
  • Create post-mortem document
  • Identify action items with owners
  • Track action items to completion
  • Share learnings with broader team
  • Update documentation and runbooks
  • Review incident metrics
  • Thank responders for their work

Incident Management Best Practices

Incident Management Excellence

  1. Prepare in advance - Have runbooks and procedures ready
  2. Communicate clearly - Keep stakeholders informed
  3. Act decisively - Make decisions quickly with available information
  4. Document everything - Timeline and actions are critical
  5. Focus on resolution - Root cause analysis comes after mitigation
  6. Learn from incidents - Conduct thorough post-mortems
  7. Track action items - Follow through on improvements
  8. Support responders - Provide rest and recognition
  9. Measure performance - Track metrics and trends
  10. Continuous improvement - Iterate on processes

Summary

This guide covers troubleshooting and error handling practices in four areas:

Logging: Build observable systems with structured logging, appropriate log levels, and contextual information that enables rapid diagnosis.

Error Handling: Implement systematic error classification, retry patterns with exponential backoff, circuit breakers for resilience, and graceful degradation strategies.

Debugging: Master IDE debugging tools, browser developer tools, production-safe debugging techniques, and performance profiling for identifying bottlenecks.

Incident Management: Establish clear severity levels, structured response procedures, escalation paths, comprehensive post-mortems, and healthy on-call practices.

Together, these practices create a foundation for maintaining system reliability, resolving issues quickly, and continuously improving operations.


Last updated: October 2025