Library and Framework Usage¶
Section Overview
Comprehensive guidelines for evaluating, integrating, and managing third-party libraries and frameworks throughout their complete lifecycle.
Quick Navigation¶
Core Topics:
- Introduction
- Dependency Selection Criteria
- Integration Patterns
- Lifecycle Management
- Common Usage Patterns
- Best Practices Summary
Introduction¶
External libraries and frameworks are powerful tools that accelerate development and provide battle-tested solutions. However, they also introduce dependencies, potential security risks, and maintenance overhead. This section provides systematic approaches to:
- Evaluate dependencies against project requirements
- Integrate libraries using maintainable patterns
- Manage the complete dependency lifecycle
- Apply proven usage patterns across common scenarios
Balance Innovation and Stability
The goal is not to minimize dependencies but to manage them strategically: choose the right tools while maintaining control over your architecture.
Core Principles¶
1. Systematic Evaluation¶
Principle
Every dependency decision should follow a documented evaluation process that balances technical merit, organizational fit, and long-term sustainability.
Why it matters: Prevents technical debt accumulation and ensures dependencies align with project goals.
2. Strategic Encapsulation¶
Principle
External dependencies should be isolated behind domain-specific abstractions to maintain upgrade flexibility and architectural control.
Why it matters: Enables library replacement without cascading refactoring across the codebase.
3. Proactive Management¶
Principle
Dependencies require active management throughout their lifecycle, from initial evaluation through eventual replacement.
Why it matters: Prevents security vulnerabilities and maintains system health over time.
4. Measured Adoption¶
Principle
Not every problem requires a new dependency. Balance the value gained against the complexity added.
Why it matters: Each dependency increases attack surface, deployment size, and maintenance burden.
Decision Frameworks¶
When to Add a Dependency¶
Use this decision tree to evaluate new dependency proposals:
graph TD
A[New Functionality Needed] --> B{Available in Standard Library?}
B -->|Yes| C[Use Standard Library]
B -->|No| D{Can Implement in <100 LOC?}
D -->|Yes| E{High Complexity Algorithm?}
E -->|No| F[Implement Internally]
E -->|Yes| G[Evaluate Dependencies]
D -->|No| G
G --> H{Passes Evaluation Scorecard?}
H -->|No| I[Reject or Find Alternative]
H -->|Yes| J{Security Risk Acceptable?}
J -->|No| I
J -->|Yes| K[Document & Adopt]
When to Remove a Dependency¶
| Indicator | Action |
|---|---|
| No longer used | Remove immediately |
| Security vulnerabilities with no fixes | Replace urgently |
| Abandoned project (>1 year no updates) | Plan migration |
| Available in standard library | Evaluate replacement cost |
| Simpler alternative available | Consider during major refactor |
Risk Assessment Matrix¶
Understanding the risk profile of dependencies helps prioritize management efforts:
| Risk Factor | Low Risk | Medium Risk | High Risk |
|---|---|---|---|
| Maintenance | Active development, frequent releases | Occasional updates | No updates in 6+ months |
| Security | No known vulnerabilities | Minor vulnerabilities with fixes | Critical unpatched vulnerabilities |
| Usage | Used in non-critical paths | Used in important features | Core functionality dependency |
| Replaceability | Easy to replace | Moderate effort to replace | Deeply integrated, hard to replace |
| License | Permissive (MIT, Apache) | Weak copyleft (LGPL) | Strong copyleft (GPL) |
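Risk Level Calculation (rate each of the five factors above; a single High rating overrides the other rules):
- 3-5 Low risk factors and no High: Monitor quarterly
- 3+ Medium risk factors and no High: Review monthly
- Any High risk factor: Immediate attention required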
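The calculation can be sketched in a few lines of Python. This is a minimal illustration, not a prescribed tool: the function name `review_cadence` and the lowercase rating strings are assumptions, and the sketch expects exactly the five factors from the matrix.

```python
from collections import Counter
from typing import Dict


def review_cadence(ratings: Dict[str, str]) -> str:
    """Map per-factor ratings ('low', 'medium', 'high') to a review cadence.

    Hypothetical helper: assumes all five factors from the matrix are rated.
    """
    counts = Counter(rating.lower() for rating in ratings.values())
    if counts["high"] >= 1:
        return "immediate attention"  # any High factor overrides the rest
    if counts["medium"] >= 3:
        return "review monthly"
    return "monitor quarterly"  # with five factors, this means 3-5 Low


ratings = {
    "maintenance": "low",
    "security": "low",
    "usage": "medium",
    "replaceability": "medium",
    "license": "low",
}
print(review_cadence(ratings))  # monitor quarterly
```

Checking High first makes the precedence explicit, so a dependency with four Low factors and one High factor is never reported as low risk.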
Metrics and Monitoring¶
Track these metrics to maintain healthy dependency management:
Health Metrics¶
| Metric | Target | Red Flag |
|---|---|---|
| Total dependency count | Trend stable or decreasing | Continuous growth |
| Dependencies with known vulnerabilities | 0 | > 0 critical, > 3 high |
| Average dependency age | < 1 year | > 2 years |
| Outdated dependencies | < 10% | > 30% |
| Time to patch critical vulnerabilities | < 48 hours | > 1 week |
| Unused dependencies | 0 | > 5 |
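As a rough sketch, the red-flag column can be checked automatically against a metrics snapshot. The function name `red_flags` and the dictionary keys are assumptions made for this example, and the dependency-count trend is left out because it needs historical data rather than a single snapshot.

```python
from typing import Dict, List


def red_flags(metrics: Dict[str, float]) -> List[str]:
    """Return the health metrics that cross a red-flag threshold.

    Hypothetical helper; the key names are illustrative only.
    """
    checks = {
        "known vulnerabilities": metrics["critical_vulns"] > 0 or metrics["high_vulns"] > 3,
        "average dependency age": metrics["avg_age_years"] > 2,
        "outdated dependencies": metrics["outdated_pct"] > 30,
        "time to patch critical vulnerabilities": metrics["critical_patch_hours"] > 7 * 24,
        "unused dependencies": metrics["unused_count"] > 5,
    }
    return [name for name, flagged in checks.items() if flagged]


snapshot = {
    "critical_vulns": 0,
    "high_vulns": 4,
    "avg_age_years": 1.5,
    "outdated_pct": 12,
    "critical_patch_hours": 36,
    "unused_count": 6,
}
print(red_flags(snapshot))
# ['known vulnerabilities', 'unused dependencies']
```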
Process Metrics¶
| Metric | Target | Review Frequency |
|---|---|---|
| Dependencies without assigned owners | 0 | Monthly |
| Days since last dependency review | < 90 | Weekly check |
| Evaluation scorecards completed | 100% of new deps | Per addition |
| Documentation coverage | 100% | Per addition |
Resources and Tools¶
Evaluation Tools¶
- Security Scanners: Snyk, npm audit, Safety (Python), OWASP Dependency-Check
- License Checkers: FOSSA, WhiteSource, Black Duck
- Dependency Analyzers: Dependabot, Renovate, Libraries.io
Management Tools¶
- Version Locking: package-lock.json, Pipfile.lock, go.mod
- Update Management: Dependabot, Renovate Bot
- Vulnerability Databases: CVE, GitHub Advisory, OSV
Documentation Platforms¶
- Internal: Confluence, Notion, GitHub Wiki
- Code Documentation: JSDoc, Sphinx, Javadoc
- Decision Records: ADR (Architecture Decision Records)
Getting Help¶
Internal Resources¶
- Dependency Registry: Check DEPENDENCIES.md for current approved libraries
- Architecture Team: For evaluation of significant new dependencies
- Security Team: For vulnerability assessment and remediation
- DevOps Team: For deployment and infrastructure considerations
External Resources¶
- OWASP Dependency Management Cheat Sheet
- Semantic Versioning Specification
- Keep a Changelog
- Libraries.io - Dependency monitoring
Dependency Selection Criteria¶
Overview¶
Selecting the right dependencies is a critical architectural decision that impacts:
- Development velocity - How quickly can features be built?
- System stability - Will updates break functionality?
- Security posture - What vulnerabilities are introduced?
- Maintenance burden - How much ongoing work is required?
- Technical debt - What future constraints are created?
This section provides structured evaluation frameworks to make informed dependency decisions.
Evaluation Framework¶
Evaluation Scorecard¶
Every dependency candidate should be evaluated across these dimensions:
| Category | Criteria | Weight | Scoring Guide |
|---|---|---|---|
| Maintenance | Release frequency, issue resolution time, open PR count | High (0.8) | Active: 8-10; Moderate: 5-7; Stale: 0-4 |
| Documentation | Completeness, examples, API reference, tutorials | High (0.8) | Comprehensive: 8-10; Adequate: 5-7; Poor: 0-4 |
| Community | Stars/popularity, contributors, forum activity | Medium (0.5) | Vibrant: 8-10; Active: 5-7; Quiet: 0-4 |
| Security | Vulnerability history, security policy, dependency count | High (0.9) | Excellent: 8-10; Good: 5-7; Concerning: 0-4 |
| Performance | Benchmarks, resource usage, optimization | Medium (0.6) | Excellent: 8-10; Good: 5-7; Poor: 0-4 |
| Features | Completeness, extensibility, API design | High (0.8) | Complete: 8-10; Adequate: 5-7; Limited: 0-4 |
| License | Compatibility | Critical (1.0) | Compatible: 10; Review needed: 5; Incompatible: 0 |
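Weighted Score Calculation: multiply each category score by its weight, sum the results, and divide by the sum of all weights. The result stays on the same 0-10 scale as the individual scores.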
Recommendation Thresholds:
- ≥ 7.5: APPROVED - Strong candidate
- 6.0-7.4: ACCEPTABLE - Use with monitoring
- < 6.0: REQUIRES REVIEW - Consider alternatives
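A short worked example may help make the arithmetic concrete. The weights come from the scorecard; the category scores in `example` are invented for illustration, and the helper names `weighted_score` and `threshold_label` are assumptions of this sketch.

```python
WEIGHTS = {
    "maintenance": 0.8,
    "documentation": 0.8,
    "community": 0.5,
    "security": 0.9,
    "performance": 0.6,
    "features": 0.8,
    "license": 1.0,
}


def weighted_score(scores: dict) -> float:
    """Weighted average of the 0-10 category scores."""
    total = sum(scores[name] * weight for name, weight in WEIGHTS.items())
    return total / sum(WEIGHTS.values())


def threshold_label(score: float) -> str:
    """Apply the recommendation thresholds listed above."""
    if score >= 7.5:
        return "APPROVED"
    if score >= 6.0:
        return "ACCEPTABLE"
    return "REQUIRES REVIEW"


# Invented scores for a hypothetical candidate library
example = {
    "maintenance": 8,
    "documentation": 7,
    "community": 6,
    "security": 9,
    "performance": 7,
    "features": 8,
    "license": 10,
}

score = weighted_score(example)  # 43.7 / 5.4, roughly 8.09
print(f"{score:.2f} -> {threshold_label(score)}")  # 8.09 -> APPROVED
```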
Implementation: Evaluation Tools¶
from dataclasses import dataclass
from typing import Dict
@dataclass
class EvaluationResult:
    scores: Dict[str, float]
    weighted_score: float
    recommendation: str
    rationale: str
class DependencyEvaluator:
    # Category weights, matching the evaluation scorecard
    WEIGHTS = {
        'maintenance': 0.8,
        'documentation': 0.8,
        'community': 0.5,
        'security': 0.9,
        'performance': 0.6,
        'features': 0.8,
        'license': 1.0
    }
    # Not shown: _evaluate_community, _evaluate_performance, _evaluate_features,
    # _evaluate_license, _generate_rationale and _is_recent
    def __init__(self, package_registry):
        self.registry = package_registry
    def evaluate(self, package_name: str, requirements: Dict) -> EvaluationResult:
        """Evaluate a package against project requirements."""
        metadata = self.registry.fetch_metadata(package_name)
        scores = {
            'maintenance': self._evaluate_maintenance(metadata),
            'documentation': self._evaluate_documentation(metadata),
            'community': self._evaluate_community(metadata),
            'security': self._evaluate_security(metadata),
            'performance': self._evaluate_performance(metadata, requirements),
            'features': self._evaluate_features(metadata, requirements),
            'license': self._evaluate_license(metadata)
        }
        # Weighted average keeps the result on the 0-10 scale
        weighted_score = sum(
            scores[k] * self.WEIGHTS[k] for k in scores
        ) / sum(self.WEIGHTS.values())
        recommendation = self._generate_recommendation(scores, weighted_score)
        rationale = self._generate_rationale(scores, metadata, requirements)
        return EvaluationResult(scores, weighted_score, recommendation, rationale)
    def _evaluate_maintenance(self, metadata: Dict) -> float:
        """Score based on maintenance activity."""
        # Check release frequency
        releases = metadata.get('releases', [])
        recent_releases = [r for r in releases if self._is_recent(r['date'], months=6)]
        # Check issue resolution
        issues = metadata.get('issues', {})
        resolution_time = issues.get('avg_resolution_days', 999)
        score = 0.0
        score += min(3.0, len(recent_releases))
        score += 4.0 if resolution_time < 30 else 2.0 if resolution_time < 90 else 0.0
        score += 3.0 if issues.get('open_count', 999) < 50 else 1.0
        return min(10.0, score)
    def _evaluate_documentation(self, metadata: Dict) -> float:
        """Score documentation quality."""
        docs = metadata.get('documentation', {})
        score = 0.0
        score += 3.0 if docs.get('has_api_reference') else 0.0
        score += 3.0 if docs.get('has_examples') else 0.0
        score += 2.0 if docs.get('has_tutorials') else 0.0
        score += 2.0 if docs.get('has_migration_guides') else 0.0
        return score
    def _evaluate_security(self, metadata: Dict) -> float:
        """Score security posture."""
        security = metadata.get('security', {})
        vulns = security.get('vulnerabilities', [])
        # Critical failure for unpatched high/critical vulnerabilities
        critical_vulns = [v for v in vulns if v['severity'] in ['HIGH', 'CRITICAL'] and not v.get('patched')]
        if critical_vulns:
            return 0.0
        score = 10.0
        score -= len([v for v in vulns if not v.get('patched')]) * 2.0
        score += 2.0 if security.get('has_security_policy') else 0.0
        # Clamp to the 0-10 scale; the policy bonus could otherwise yield 12
        return max(0.0, min(10.0, score))
    def _generate_recommendation(self, scores: Dict, weighted_score: float) -> str:
        """Generate recommendation based on scores."""
        # Hard gates first: license and security override the weighted score
        if scores['license'] < 0.5:
            return "REJECT - License incompatible"
        if scores['security'] < 6.0:
            return "CAUTION - Security concerns"
        if weighted_score >= 7.5:
            return "APPROVED"
        if weighted_score >= 6.0:
            return "ACCEPTABLE - Monitor closely"
        return "REQUIRES REVIEW"
class DependencyEvaluator {
constructor(packageRegistry) {
this.registry = packageRegistry;
this.weights = {
maintenance: 0.8,
documentation: 0.8,
community: 0.5,
security: 0.9,
performance: 0.6,
features: 0.8,
license: 1.0
};
}
async evaluate(packageName, requirements) {
const metadata = await this.registry.fetchMetadata(packageName);
const scores = {
maintenance: this.evaluateMaintenance(metadata),
documentation: this.evaluateDocumentation(metadata),
community: this.evaluateCommunity(metadata),
security: await this.evaluateSecurity(metadata),
performance: this.evaluatePerformance(metadata, requirements),
features: this.evaluateFeatures(metadata, requirements),
license: this.evaluateLicense(metadata.license)
};
const weightedScore = Object.keys(scores).reduce(
(sum, key) => sum + (scores[key] * this.weights[key]),
0
) / Object.values(this.weights).reduce((sum, w) => sum + w, 0);
return {
scores,
weightedScore,
recommendation: this.generateRecommendation(scores, weightedScore),
rationale: this.generateRationale(scores, metadata, requirements)
};
}
evaluateMaintenance(metadata) {
const { releases = [], issues = {} } = metadata;
const recentReleases = releases.filter(r => this.isRecent(r.date, 6));
const { avgResolutionDays = 999, openCount = 999 } = issues;
let score = 0;
score += Math.min(3, recentReleases.length);
score += avgResolutionDays < 30 ? 4 : avgResolutionDays < 90 ? 2 : 0;
score += openCount < 50 ? 3 : 1;
return Math.min(10, score);
}
evaluateSecurity(metadata) {
const { security = {} } = metadata;
const { vulnerabilities = [], hasSecurityPolicy = false } = security;
// Critical failure for unpatched vulnerabilities
const criticalVulns = vulnerabilities.filter(v =>
['HIGH', 'CRITICAL'].includes(v.severity) && !v.patched
);
if (criticalVulns.length > 0) return 0;
let score = 10;
score -= vulnerabilities.filter(v => !v.patched).length * 2;
score += hasSecurityPolicy ? 2 : 0;
return Math.min(10, Math.max(0, score)); // clamp to the 0-10 scale
}
generateRecommendation(scores, weightedScore) {
if (scores.license < 0.5) return "REJECT - License incompatible";
if (scores.security < 6) return "CAUTION - Security concerns";
if (weightedScore >= 7.5) return "APPROVED";
if (weightedScore >= 6) return "ACCEPTABLE - Monitor closely";
return "REQUIRES REVIEW";
}
}
public class DependencyEvaluator {
private final PackageRegistry registry;
private final Map<EvaluationCriteria, Double> weights;
public DependencyEvaluator(PackageRegistry registry) {
this.registry = registry;
this.weights = initializeWeights();
}
public EvaluationResult evaluate(String packageName, ProjectRequirements requirements) {
LibraryMetadata metadata = registry.fetchMetadata(packageName);
Map<EvaluationCriteria, Double> scores = new EnumMap<>(EvaluationCriteria.class);
scores.put(MAINTENANCE, evaluateMaintenance(metadata));
scores.put(DOCUMENTATION, evaluateDocumentation(metadata));
scores.put(COMMUNITY, evaluateCommunity(metadata));
scores.put(SECURITY, evaluateSecurity(metadata));
scores.put(PERFORMANCE, evaluatePerformance(metadata, requirements));
scores.put(FEATURES, evaluateFeatures(metadata, requirements));
scores.put(LICENSE, evaluateLicense(metadata.getLicense()));
double weightedScore = calculateWeightedScore(scores);
Recommendation recommendation = generateRecommendation(scores, weightedScore);
String rationale = generateRationale(scores, metadata, requirements);
return new EvaluationResult(scores, weightedScore, recommendation, rationale);
}
private double evaluateMaintenance(LibraryMetadata metadata) {
List<Release> recentReleases = metadata.getReleases().stream()
.filter(r -> isRecent(r.getDate(), 6))
.collect(Collectors.toList());
int avgResolutionDays = metadata.getIssues().getAvgResolutionDays();
int openCount = metadata.getIssues().getOpenCount();
double score = 0.0;
score += Math.min(3.0, recentReleases.size());
score += avgResolutionDays < 30 ? 4.0 : avgResolutionDays < 90 ? 2.0 : 0.0;
score += openCount < 50 ? 3.0 : 1.0;
return Math.min(10.0, score);
}
private Recommendation generateRecommendation(Map<EvaluationCriteria, Double> scores,
double weightedScore) {
if (scores.get(LICENSE) < 0.5) {
return Recommendation.REJECT_LICENSE;
}
if (scores.get(SECURITY) < 6.0) {
return Recommendation.CAUTION_SECURITY;
}
if (weightedScore >= 7.5) {
return Recommendation.APPROVED;
}
if (weightedScore >= 6.0) {
return Recommendation.ACCEPTABLE;
}
return Recommendation.REQUIRES_REVIEW;
}
}
Project Requirement Alignment¶
Before evaluating specific libraries, clearly document your project's requirements:
Requirements Assessment Template¶
| Category | Questions | Your Answers |
|---|---|---|
| Functional | What specific features are required? What are nice-to-have features? | |
| Performance | What are performance targets (latency, throughput)? What are acceptable resource limits? | |
| Deployment | What are deployment constraints? Is containerization required? | |
| Security | What security standards must be met? Are there compliance requirements? | |
| Compatibility | What existing systems must integrate? What versions must be supported? | |
| Team | What is the team's expertise level? How much learning time is acceptable? | |
| Maintenance | What maintenance effort is acceptable? Who will own this dependency? | |
Example: HTTP Client Selection
Project: E-commerce API Integration
Requirements:
- Must support OAuth 2.0 authentication
- Need automatic retry with exponential backoff
- Target: < 100ms latency for typical requests
- Deploy in Docker containers
- Team has JavaScript expertise
- Must handle 1000 req/sec peak load
Evaluation: Compare axios vs got vs node-fetch against these criteria
Dependency Management Strategies¶
Version Pinning Strategy¶
Different environments require different version management approaches:
| Environment | Strategy | Format | Rationale |
|---|---|---|---|
| Production | Exact versions | package@1.2.3 | Predictability and stability |
| Staging | Exact versions | package@1.2.3 | Match production exactly |
| Development | Range (patch-level) | package@~1.2.0 | Allow patch updates within the 1.2.x line |
| CI/CD | Exact versions | package@1.2.3 | Reproducible builds |
<dependencies>
<!-- Production - exact versions -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>3.2.0</version>
</dependency>
<!-- Test - range versions acceptable -->
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<version>[5.10.0,5.11.0)</version>
<scope>test</scope>
</dependency>
</dependencies>
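One way to enforce the exact-version rule for production and CI is a small check over `package@version` specifiers, the format used in the table above. This is a sketch under stated assumptions: the helper names `is_exact_pin` and `unpinned` are hypothetical, and only plain `MAJOR.MINOR.PATCH` versions count as exact.

```python
import re
from typing import List

# Matches a plain MAJOR.MINOR.PATCH version with no range operator
EXACT_VERSION = re.compile(r"\d+\.\d+\.\d+")


def is_exact_pin(spec: str) -> bool:
    """Return True when spec looks like name@1.2.3 (no ~, ^ or wildcard)."""
    name, separator, version = spec.rpartition("@")
    return bool(separator) and bool(name) and EXACT_VERSION.fullmatch(version) is not None


def unpinned(specs: List[str]) -> List[str]:
    """Return the specifiers that are not pinned to an exact version."""
    return [spec for spec in specs if not is_exact_pin(spec)]


specs = ["package@1.2.3", "package@~1.2.0", "@scope/pkg@4.0.1", "other"]
print(unpinned(specs))  # ['package@~1.2.0', 'other']
```

Splitting on the last `@` keeps scoped names such as `@scope/pkg` intact.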
The Anti-Dependency Principle¶
When NOT to Add a Dependency¶
Critical Assessment
Before adding any dependency, ask: "Can we reasonably implement this ourselves?"
Consider internal implementation when:
- Functionality is simple (< 100 lines of code)
- Standard library provides similar capability
- Dependency adds significant size/complexity
- Security requirements are strict
- Long-term maintenance is uncertain
Decision Matrix¶
| Implementation Complexity | Maintenance | Decision |
|---|---|---|
| < 50 LOC | Low | Implement internally |
| 50-200 LOC | Low-Medium | Evaluate dependency |
| 200-1000 LOC | Medium | Prefer dependency |
| > 1000 LOC | High | Use dependency |
Exception: Even simple functionality may warrant a dependency if it involves:
- Complex algorithms (e.g., crypto, compression)
- Security-critical operations
- Specialized domain knowledge
- Standards compliance (e.g., date/time handling)
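The matrix and its exception can be expressed as a small lookup. This is a sketch only: `build_or_buy` is a hypothetical name, the boundary values (200 and 1000 LOC) are assigned to the lower band as an assumption because the table ranges overlap there, and `specialized` stands for any of the exception cases listed above.

```python
def build_or_buy(estimated_loc: int, specialized: bool = False) -> str:
    """Suggest a decision from the estimated size of an internal implementation.

    specialized=True marks crypto, compression, security-critical or
    standards-heavy functionality, which should not be hand-rolled lightly.
    """
    if estimated_loc < 50:
        decision = "Implement internally"
    elif estimated_loc <= 200:
        decision = "Evaluate dependency"
    elif estimated_loc <= 1000:
        decision = "Prefer dependency"
    else:
        decision = "Use dependency"

    # Exception: even small specialized code warrants evaluating a dependency
    if specialized and decision == "Implement internally":
        decision = "Evaluate dependency"
    return decision


print(build_or_buy(30))                    # Implement internally
print(build_or_buy(30, specialized=True))  # Evaluate dependency
print(build_or_buy(1500))                  # Use dependency
```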
Example: Dependency Audit¶
# dependency_audit.py - Identify candidates for removal
import json
import subprocess
def audit_dependencies():
    """Generate report of potentially unnecessary dependencies."""
    # count_imports() and is_simple_functionality() are project-specific
    # helpers that are not shown here. Note that a distribution name can
    # differ from its import name (for example, PyYAML is imported as yaml).
    # List installed packages (a flat list, not a dependency tree)
    result = subprocess.run(['pip', 'list', '--format=json'],
                            capture_output=True, text=True, check=True)
    packages = json.loads(result.stdout)
    candidates_for_removal = []
    for pkg in packages:
        # Check if dependency is actually used
        usage_count = count_imports(pkg['name'])
        if usage_count == 0:
            candidates_for_removal.append({
                'name': pkg['name'],
                'reason': 'Not imported anywhere',
                'action': 'Remove'
            })
        elif is_simple_functionality(pkg['name']):
            candidates_for_removal.append({
                'name': pkg['name'],
                'reason': 'Simple functionality (<100 LOC)',
                'action': 'Consider internal implementation'
            })
    return candidates_for_removal
Approval Workflow¶
Establish a clear process for dependency approval:
graph TD
A[Developer Identifies Need] --> B[Document Requirements]
B --> C[Research Alternatives]
C --> D[Complete Evaluation Scorecard]
D --> E{Score >= 7.5?}
E -->|No| F[Team Review Required]
E -->|Yes| G{Critical Dependency?}
G -->|Yes| F
G -->|No| H[Tech Lead Approval]
F --> I[Architecture Review]
I --> J{Approved?}
J -->|Yes| K[Document Decision]
J -->|No| L[Find Alternative]
H --> K
K --> M[Add to Registry]
M --> N[Integrate]
Approval Levels¶
| Dependency Type | Approval Required | Review Criteria |
|---|---|---|
| Utility (<1MB, non-critical) | Tech Lead | Scorecard only |
| Framework (>1MB or critical path) | Architecture Team | Full evaluation + POC |
| Security-related | Security Team | Security audit required |
| GPL/AGPL Licensed | Legal Team | License compatibility review |
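The table can be turned into a routing helper. In this sketch the rows are treated as additive, so a security-related GPL framework needs three sign-offs; that reading is an assumption, as are the function name `required_approvers` and the use of SPDX-style license identifiers.

```python
from typing import List


def required_approvers(size_mb: float, critical_path: bool,
                       security_related: bool, license_id: str) -> List[str]:
    """Return the teams that must approve a new dependency (hypothetical helper)."""
    # Frameworks (>1MB or on the critical path) go to architecture review;
    # small non-critical utilities only need the tech lead.
    approvers = ["Architecture Team" if size_mb > 1 or critical_path else "Tech Lead"]
    if security_related:
        approvers.append("Security Team")
    # Matches identifiers such as GPL-3.0-only or AGPL-3.0; LGPL does not match
    if license_id.upper().startswith(("GPL", "AGPL")):
        approvers.append("Legal Team")
    return approvers


print(required_approvers(0.2, False, False, "MIT"))
# ['Tech Lead']
print(required_approvers(12.0, True, True, "AGPL-3.0"))
# ['Architecture Team', 'Security Team', 'Legal Team']
```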
Common Pitfalls¶
What to Avoid¶
Dependency Hell
Adding dependencies without evaluation leads to:
- Conflicting version requirements
- Large deployment artifacts
- Complex update processes
- Security vulnerabilities
Resume-Driven Development
Choosing libraries based on popularity rather than fit results in:
- Over-engineering for actual needs
- Unnecessary complexity
- Maintenance burden
Vendor Lock-in
Deep integration without abstraction makes it:
- Difficult to upgrade safely
- Expensive to replace
- Risky for business continuity
Best Practices¶
- Document all decisions in ADRs (Architecture Decision Records)
- Set up dependency scanning in CI/CD pipelines
- Review dependencies quarterly for updates and relevance
- Assign owners for each major dependency
- Create proof-of-concepts for critical dependencies
- Maintain alternatives list for key dependencies
Integration Patterns¶
Overview¶
Poor integration practices are a leading cause of technical debt. Libraries integrated without proper abstraction become difficult to:
- Upgrade - Breaking changes cascade through the codebase
- Replace - Migration requires widespread refactoring
- Test - Business logic is tightly coupled to external implementations
- Debug - Library behavior leaks into application logic
This section provides proven patterns for clean, maintainable library integration.
Core Integration Principles¶
1. Abstraction and Encapsulation¶
Principle
External library usage should be encapsulated behind domain-specific abstractions to isolate the codebase from dependency changes.
Why it matters:
- Enables library replacement without cascading changes
- Simplifies testing through clear boundaries
- Prevents implementation details from leaking
- Maintains architectural control
Implementation approach:
graph LR
A[Application Code] --> B[Domain Interface]
B --> C[Adapter/Wrapper]
C --> D[External Library]
style A fill:#e1f5e1
style B fill:#fff3cd
style C fill:#cfe2ff
style D fill:#f8d7da
Example: HTTP Client Abstraction¶
# BAD: Direct library usage scattered across codebase
import requests
def get_user_data(user_id):
    response = requests.get(f"https://api.example.com/users/{user_id}")
    if response.status_code == 200:
        return response.json()
    raise Exception(f"Failed: {response.status_code}")
def create_user(user_data):
    response = requests.post("https://api.example.com/users", json=user_data)
    if response.status_code == 201:
        return response.json()
    raise Exception(f"Failed: {response.status_code}")
Problems:
- Library directly called from multiple locations
- Error handling inconsistent
- Hard to mock for testing
- Switching libraries requires changes everywhere
# GOOD: Abstracted behind interface
from abc import ABC, abstractmethod
from typing import Optional
import requests
class UserServiceError(Exception):
    """Domain-specific exception."""
    pass
class UserServiceClient(ABC):
    """Abstract interface for user service interactions."""
    @abstractmethod
    def get_user(self, user_id: str) -> dict:
        """Fetch user by ID."""
        pass
    @abstractmethod
    def create_user(self, user_data: dict) -> dict:
        """Create a new user."""
        pass
class HttpUserServiceClient(UserServiceClient):
    """HTTP implementation using requests library."""
    def __init__(self, base_url: str, auth_token: Optional[str] = None, timeout: int = 10):
        self.base_url = base_url
        self.timeout = timeout
        # A shared session reuses connections and default headers
        self.session = requests.Session()
        if auth_token:
            self.session.headers.update({
                "Authorization": f"Bearer {auth_token}"
            })
    def get_user(self, user_id: str) -> dict:
        try:
            response = self.session.get(
                f"{self.base_url}/users/{user_id}",
                timeout=self.timeout
            )
            response.raise_for_status()
            return response.json()
        except requests.RequestException as e:
            # Translate the library error into a domain error, keeping the cause
            raise UserServiceError(f"Failed to fetch user {user_id}: {e}") from e
    def create_user(self, user_data: dict) -> dict:
        try:
            response = self.session.post(
                f"{self.base_url}/users",
                json=user_data,
                timeout=self.timeout
            )
            response.raise_for_status()
            return response.json()
        except requests.RequestException as e:
            raise UserServiceError(f"Failed to create user: {e}") from e
# Usage with dependency injection
def process_user_workflow(user_client: UserServiceClient):
    """Business logic depends on abstraction, not implementation."""
    user = user_client.get_user("123")
    # ... business logic
    return user
# Easy to test with mock
class MockUserServiceClient(UserServiceClient):
    def get_user(self, user_id: str) -> dict:
        return {"id": user_id, "name": "Test User"}
    def create_user(self, user_data: dict) -> dict:
        return {"id": "new_id", **user_data}
Benefits:
- Single point of change for library upgrades
- Consistent error handling
- Easy to mock for testing
- Can swap implementations without touching business logic
Integration Patterns¶
Pattern 1: The Adapter Pattern¶
Use when: You need to make an external library's interface match your domain model.
Structure:
from abc import ABC, abstractmethod
from datetime import datetime
# Payment and PaymentResult are domain types defined elsewhere in the application
# Domain interface (what your application expects)
class PaymentProcessor(ABC):
    @abstractmethod
    def process_payment(self, payment: Payment) -> PaymentResult:
        pass
# Adapter for Stripe
class StripeAdapter(PaymentProcessor):
    def __init__(self, stripe_client):
        # Injected gateway object; charge() below is illustrative, not a Stripe SDK call
        self._stripe = stripe_client
    def process_payment(self, payment: Payment) -> PaymentResult:
        # Translate domain model to Stripe's model
        stripe_intent = self._create_stripe_intent(payment)
        # Call Stripe
        result = self._stripe.charge(stripe_intent)
        # Translate Stripe's response to domain model
        return self._to_payment_result(result)
    def _create_stripe_intent(self, payment: Payment):
        """Transform domain Payment to Stripe PaymentIntent."""
        return {
            # Convert to cents; round first so 19.99 becomes 1999, not 1998
            'amount': round(payment.amount * 100),
            'currency': payment.currency.lower(),
            'payment_method': payment.method_id
        }
    def _to_payment_result(self, stripe_response) -> PaymentResult:
        """Transform Stripe response to domain PaymentResult."""
        return PaymentResult(
            success=stripe_response.status == 'succeeded',
            transaction_id=stripe_response.id,
            amount=stripe_response.amount / 100,  # Convert from cents
            timestamp=datetime.fromtimestamp(stripe_response.created)
        )
Key points:
- Domain models never reference library-specific types
- Transformation logic isolated in adapter
- Easy to add adapters for other payment processors
Pattern 2: The Facade Pattern¶
Use when: You need to simplify a complex library interface.
Example: Simplifying AWS S3 Operations
import boto3
class StorageService:
    """Simplified facade for cloud storage operations."""
    def __init__(self, bucket: str, region: str = 'us-east-1'):
        self.bucket = bucket
        self._s3 = boto3.client('s3', region_name=region)
    def upload_file(self, content: bytes, path: str,
                    content_type: str = 'application/octet-stream') -> str:
        """Upload file with sensible defaults."""
        self._s3.put_object(
            Bucket=self.bucket,
            Key=path,
            Body=content,
            ContentType=content_type,
            ServerSideEncryption='AES256',
            StorageClass='STANDARD_IA'  # Cost-optimized default
        )
        return f"s3://{self.bucket}/{path}"
    def download_file(self, path: str) -> bytes:
        """Download file content."""
        response = self._s3.get_object(Bucket=self.bucket, Key=path)
        return response['Body'].read()
    def delete_file(self, path: str) -> bool:
        """Delete file."""
        self._s3.delete_object(Bucket=self.bucket, Key=path)
        return True
    def list_files(self, prefix: str = '') -> list[str]:
        """List files with optional prefix filter."""
        # list_objects_v2 returns at most 1,000 keys per call;
        # larger listings need pagination
        response = self._s3.list_objects_v2(
            Bucket=self.bucket,
            Prefix=prefix
        )
        return [obj['Key'] for obj in response.get('Contents', [])]
# Usage is much simpler
storage = StorageService('my-bucket')
storage.upload_file(b'example content', 'path/to/file.txt', 'text/plain')
files = storage.list_files('path/to/')
Pattern 3: The Repository Pattern¶
Use when: Abstracting data access libraries (ORMs, database clients).
Example: Database Access Abstraction
from abc import ABC, abstractmethod
from typing import List, Optional
# User is the domain object, defined elsewhere in the application
class UserRepository(ABC):
    """Storage-agnostic data access interface."""
    @abstractmethod
    def find_by_id(self, user_id: str) -> Optional[User]:
        pass
    @abstractmethod
    def find_by_email(self, email: str) -> Optional[User]:
        pass
    @abstractmethod
    def find_all(self, limit: int = 100, offset: int = 0) -> List[User]:
        pass
    @abstractmethod
    def save(self, user: User) -> User:
        pass
    @abstractmethod
    def delete(self, user_id: str) -> bool:
        pass
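from sqlalchemy.orm import Session
# UserModel is the ORM-mapped class, defined elsewhere in the application
class SqlAlchemyUserRepository(UserRepository):
    """SQLAlchemy implementation of UserRepository."""
    def __init__(self, session: Session):
        self.session = session
    def find_by_id(self, user_id: str) -> Optional[User]:
        db_user = self.session.query(UserModel).filter(
            UserModel.id == user_id
        ).first()
        return self._to_domain(db_user) if db_user else None
    def find_by_email(self, email: str) -> Optional[User]:
        db_user = self.session.query(UserModel).filter(
            UserModel.email == email
        ).first()
        return self._to_domain(db_user) if db_user else None
    def find_all(self, limit: int = 100, offset: int = 0) -> List[User]:
        # Required by the abstract interface; without it the class cannot be instantiated
        rows = self.session.query(UserModel).offset(offset).limit(limit).all()
        return [self._to_domain(row) for row in rows]
    def save(self, user: User) -> User:
        # merge() inserts a new row or updates the existing one with the same key
        db_user = self.session.merge(self._to_model(user))
        self.session.commit()
        self.session.refresh(db_user)
        return self._to_domain(db_user)
    def delete(self, user_id: str) -> bool:
        deleted = self.session.query(UserModel).filter(
            UserModel.id == user_id
        ).delete()
        self.session.commit()
        return deleted > 0
    def _to_domain(self, model: UserModel) -> User:
        """Convert ORM model to domain object."""
        return User(
            id=model.id,
            email=model.email,
            name=model.name,
            created_at=model.created_at
        )
    def _to_model(self, user: User) -> UserModel:
        """Convert domain object to ORM model."""
        return UserModel(
            id=user.id,
            email=user.email,
            name=user.name
        )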
from pymongo import MongoClient
class MongoUserRepository(UserRepository):
    """MongoDB implementation of UserRepository."""
    def __init__(self, client: MongoClient, database: str):
        self.db = client[database]
        self.users = self.db.users
    def find_by_id(self, user_id: str) -> Optional[User]:
        doc = self.users.find_one({"_id": user_id})
        return self._to_domain(doc) if doc else None
    def find_by_email(self, email: str) -> Optional[User]:
        doc = self.users.find_one({"email": email})
        return self._to_domain(doc) if doc else None
    def find_all(self, limit: int = 100, offset: int = 0) -> List[User]:
        # Required by the abstract interface; without it the class cannot be instantiated
        cursor = self.users.find().skip(offset).limit(limit)
        return [self._to_domain(doc) for doc in cursor]
    def save(self, user: User) -> User:
        doc = self._to_document(user)
        # Upsert: insert the document or replace the one with the same _id
        self.users.replace_one(
            {"_id": user.id},
            doc,
            upsert=True
        )
        return user
    def delete(self, user_id: str) -> bool:
        return self.users.delete_one({"_id": user_id}).deleted_count > 0
    def _to_domain(self, doc: dict) -> User:
        """Convert MongoDB document to domain object."""
        return User(
            id=doc['_id'],
            email=doc['email'],
            name=doc['name'],
            created_at=doc['created_at']
        )
    def _to_document(self, user: User) -> dict:
        """Convert domain object to MongoDB document."""
        return {
            '_id': user.id,
            'email': user.email,
            'name': user.name,
            'created_at': user.created_at
        }
Benefits:
- Application code doesn't know about SQLAlchemy or MongoDB
- Can switch databases without changing business logic
- Easy to test with in-memory implementations
- Can use different databases in different environments
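The in-memory implementation mentioned in the benefits might look like the following sketch. `InMemoryUserRepository` is not part of the original examples, and the `User` dataclass is a stand-in with the four fields the repositories above use; the interface is repeated so the block runs on its own.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Dict, List, Optional


@dataclass
class User:
    """Stand-in domain object (assumed shape)."""
    id: str
    email: str
    name: str
    created_at: datetime


class UserRepository(ABC):
    @abstractmethod
    def find_by_id(self, user_id: str) -> Optional[User]: ...
    @abstractmethod
    def find_by_email(self, email: str) -> Optional[User]: ...
    @abstractmethod
    def find_all(self, limit: int = 100, offset: int = 0) -> List[User]: ...
    @abstractmethod
    def save(self, user: User) -> User: ...
    @abstractmethod
    def delete(self, user_id: str) -> bool: ...


class InMemoryUserRepository(UserRepository):
    """Dictionary-backed repository for unit tests; no database required."""

    def __init__(self) -> None:
        self._users: Dict[str, User] = {}

    def find_by_id(self, user_id: str) -> Optional[User]:
        return self._users.get(user_id)

    def find_by_email(self, email: str) -> Optional[User]:
        return next((u for u in self._users.values() if u.email == email), None)

    def find_all(self, limit: int = 100, offset: int = 0) -> List[User]:
        return list(self._users.values())[offset:offset + limit]

    def save(self, user: User) -> User:
        self._users[user.id] = user  # insert or overwrite, like an upsert
        return user

    def delete(self, user_id: str) -> bool:
        return self._users.pop(user_id, None) is not None


repo = InMemoryUserRepository()
repo.save(User("1", "ada@example.com", "Ada", datetime.now(timezone.utc)))
print(repo.find_by_email("ada@example.com").name)  # Ada
```

Because business logic depends only on `UserRepository`, tests can pass this class wherever a database-backed repository is expected.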
Version Management¶
Semantic Versioning Strategy¶
Understanding SemVer
Version format: MAJOR.MINOR.PATCH (e.g., 2.1.3)
- MAJOR: Breaking changes (incompatible API changes)
- MINOR: New features (backward compatible)
- PATCH: Bug fixes (backward compatible)
Update strategy by version type:
| Update Type | Risk Level | Strategy | Approval |
|---|---|---|---|
| Patch (1.2.3 → 1.2.4) | Low | Auto-update with testing | Automated |
| Minor (1.2.x → 1.3.0) | Medium | Scheduled with review | Tech Lead |
| Major (1.x.x → 2.0.0) | High | Planned migration | Architecture Team |
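Classifying an update by comparing version components is straightforward to sketch. The names `parse_version`, `classify_update`, and `APPROVAL` are hypothetical, and the parser assumes plain numeric `MAJOR.MINOR.PATCH` strings; pre-release or build suffixes such as `2.0.0rc1` would raise a `ValueError` and need a real version parser.

```python
from typing import Tuple

# Approval level per update type, taken from the table above
APPROVAL = {"patch": "Automated", "minor": "Tech Lead", "major": "Architecture Team"}


def parse_version(version: str) -> Tuple[int, int, int]:
    """Parse 'MAJOR.MINOR.PATCH' into integers, padding missing parts with 0."""
    parts = [int(part) for part in version.split(".")[:3]]
    parts += [0] * (3 - len(parts))
    return parts[0], parts[1], parts[2]


def classify_update(current: str, latest: str) -> str:
    """Return 'major', 'minor' or 'patch' for an update from current to latest."""
    cur, new = parse_version(current), parse_version(latest)
    if new[0] != cur[0]:
        return "major"
    if new[1] != cur[1]:
        return "minor"
    return "patch"


for current, latest in [("1.2.3", "1.2.4"), ("1.2.9", "1.3.0"), ("1.9.9", "2.0.0")]:
    kind = classify_update(current, latest)
    print(f"{current} -> {latest}: {kind} ({APPROVAL[kind]})")
# 1.2.3 -> 1.2.4: patch (Automated)
# 1.2.9 -> 1.3.0: minor (Tech Lead)
# 1.9.9 -> 2.0.0: major (Architecture Team)
```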
Version Update Workflow¶
graph TD
A[Dependency Update Available] --> B{Update Type?}
B -->|Patch| C[Run Automated Tests]
B -->|Minor| D[Review Changelog]
B -->|Major| E[Review Breaking Changes]
C --> F{Tests Pass?}
F -->|Yes| G[Auto-deploy to Staging]
F -->|No| H[Manual Investigation]
D --> I[Test in Development]
I --> J{Regression Issues?}
J -->|No| K[Schedule Staging Deploy]
J -->|Yes| L[Document Issues]
E --> M[Create Migration Plan]
M --> N[Proof of Concept]
N --> O[Team Review]
O --> P[Scheduled Migration]
Automated Version Management¶
# dependency_updater.py
import subprocess
import json
from typing import List, Dict
class DependencyUpdater:
def __init__(self):
self.updates_by_type = {
'patch': [],
'minor': [],
'major': []
}
def check_updates(self) -> Dict[str, List]:
"""Check for available updates and categorize by type."""
result = subprocess.run(
['pip', 'list', '--outdated', '--format=json'],
capture_output=True,
text=True
)
outdated = json.loads(result.stdout)
for pkg in outdated:
update_type = self._determine_update_type(
pkg['version'],
pkg['latest_version']
)
self.updates_by_type[update_type].append(pkg)
return self.updates_by_type
def apply_patch_updates(self) -> bool:
"""Automatically apply low-risk patch updates."""
if not self.updates_by_type['patch']:
print("No patch updates available")
return True
packages = [pkg['name'] for pkg in self.updates_by_type['patch']]
try:
subprocess.run(
['pip', 'install', '--upgrade'] + packages,
check=True
)
print(f"Applied {len(packages)} patch updates")
return True
except subprocess.CalledProcessError as e:
print(f"Failed to apply updates: {e}")
return False
def generate_minor_update_plan(self) -> str:
"""Generate review plan for minor updates."""
if not self.updates_by_type['minor']:
return "No minor updates available"
plan = ["# Minor Update Plan\n"]
plan.append("## Packages to Review:\n")
for pkg in self.updates_by_type['minor']:
plan.append(f"- **{pkg['name']}**: {pkg['version']} → {pkg['latest_version']}")
plan.append(f" - Check changelog at: {self._get_changelog_url(pkg['name'])}")
plan.append(" - Test in development environment")
plan.append("")
return "\n".join(plan)
def generate_major_update_plan(self) -> str:
"""Generate migration plan for major updates."""
if not self.updates_by_type['major']:
return "No major updates available"
plan = ["# Major Update Migration Plan\n"]
plan.append("**These updates require careful planning**\n")
for pkg in self.updates_by_type['major']:
plan.append(f"## {pkg['name']}: {pkg['version']} → {pkg['latest_version']}\n")
plan.append("### Migration Steps:")
plan.append("1. Review breaking changes in changelog")
plan.append("2. Create feature branch for migration")
plan.append("3. Update code to handle breaking changes")
plan.append("4. Update tests")
plan.append("5. Test thoroughly in staging")
plan.append("6. Document changes")
plan.append("7. Schedule production deployment\n")
return "\n".join(plan)
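The class above calls `_determine_update_type`, which is not shown. A minimal sketch of what such a helper might look like follows, written as a standalone function. It assumes versions follow the MAJOR.MINOR.PATCH shape described earlier; the names `parse_version` and `determine_update_type` are illustrative, and routing unparseable versions to `'major'` is a cautious assumption rather than anything the source specifies.

```python
import re

# Assumption: versions look like MAJOR.MINOR[.PATCH]; any pre-release or
# build suffix after the numeric part is ignored by this sketch.
_VERSION_RE = re.compile(r"^(\d+)\.(\d+)(?:\.(\d+))?")


def parse_version(version: str) -> tuple:
    """Split a version string into (major, minor, patch) integers."""
    match = _VERSION_RE.match(version.strip())
    if match is None:
        raise ValueError(f"Unrecognized version: {version!r}")
    major, minor, patch = match.groups()
    return int(major), int(minor), int(patch or 0)


def determine_update_type(current: str, latest: str) -> str:
    """Return 'major', 'minor', or 'patch' for a current -> latest update."""
    try:
        cur, new = parse_version(current), parse_version(latest)
    except ValueError:
        # Unparseable versions take the most cautious review path
        return "major"
    if new[0] != cur[0]:
        return "major"
    if new[1] != cur[1]:
        return "minor"
    return "patch"
```

Because the function only ever returns one of the three keys in `updates_by_type`, the `append` call in `check_updates` cannot hit a missing bucket. Packages that use calendar versioning or other non-SemVer schemes would need a different classification rule.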
// dependency-updater.js
const { execSync } = require('child_process');
const semver = require('semver');
class DependencyUpdater {
constructor() {
this.updatesByType = {
patch: [],
minor: [],
major: []
};
}
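checkUpdates() {
let output;
try {
output = execSync('npm outdated --json', { encoding: 'utf8' });
} catch (error) {
// npm outdated exits non-zero when outdated packages exist;
// the JSON report is still available on stdout
if (!error.stdout) {
console.error('Error checking updates:', error.message);
return this.updatesByType;
}
output = error.stdout;
}
const outdated = JSON.parse(output || '{}');
Object.entries(outdated).forEach(([name, info]) => {
const updateType = this.determineUpdateType(info.current, info.latest);
// determineUpdateType can return 'unknown', so create the bucket on demand
if (!this.updatesByType[updateType]) {
this.updatesByType[updateType] = [];
}
this.updatesByType[updateType].push({ name, ...info });
});
return this.updatesByType;
}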
determineUpdateType(current, latest) {
try {
if (semver.major(latest) > semver.major(current)) {
return 'major';
}
if (semver.minor(latest) > semver.minor(current)) {
return 'minor';
}
return 'patch';
} catch {
return 'unknown';
}
}
applyPatchUpdates() {
const patches = this.updatesByType.patch;
if (patches.length === 0) {
console.log('No patch updates available');
return true;
}
const packages = patches.map(p => `${p.name}@${p.latest}`).join(' ');
try {
execSync(`npm install ${packages}`, { stdio: 'inherit' });
console.log(`Applied ${patches.length} patch updates`);
return true;
} catch (error) {
console.error('Failed to apply updates:', error.message);
return false;
}
}
generateMinorUpdatePlan() {
const minors = this.updatesByType.minor;
if (minors.length === 0) {
return 'No minor updates available';
}
let plan = '# Minor Update Plan\n\n';
plan += '## Packages to Review:\n\n';
minors.forEach(pkg => {
plan += `- **${pkg.name}**: ${pkg.current} → ${pkg.latest}\n`;
plan += ` - Review changelog\n`;
plan += ` - Test in development\n\n`;
});
return plan;
}
}
module.exports = DependencyUpdater;
Breaking Change Management¶
Detecting Breaking Changes¶
Critical Process
Always review changelogs and migration guides before updating dependencies, especially for major version changes.
Pre-update checklist:
- Read release notes and changelog
- Review migration guide (if provided)
- Check for deprecated APIs you're using
- Identify breaking changes affecting your code
- Assess impact on dependent services
- Plan rollback strategy
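One way to approach the "deprecated APIs you're using" item is to exercise your integration code while recording deprecation warnings. The sketch below uses only the standard `warnings` module; `collect_deprecations` and `legacy_call` are hypothetical names, and `legacy_call` merely stands in for a library function that warns on use.

```python
import warnings
from typing import Callable, List


def collect_deprecations(exercise: Callable[[], object]) -> List[str]:
    """Run `exercise` and return the messages of deprecation warnings it raised."""
    with warnings.catch_warnings(record=True) as caught:
        # Record every warning, even ones a filter would normally show only once
        warnings.simplefilter("always")
        exercise()
    return [
        str(w.message)
        for w in caught
        if issubclass(w.category, (DeprecationWarning, PendingDeprecationWarning))
    ]


def legacy_call() -> int:
    """Hypothetical stand-in for a library call that is deprecated upstream."""
    warnings.warn(
        "legacy_call() is deprecated; use new_call()",
        DeprecationWarning,
        stacklevel=2,
    )
    return 42


messages = collect_deprecations(legacy_call)
```

For a whole test suite, running pytest with `-W error::DeprecationWarning` turns the same warnings into failures, which may be a simpler gate before an upgrade.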
Compatibility Layers¶
When facing breaking changes, create temporary compatibility layers:
# compatibility_layer.py
import warnings
from functools import wraps

class CompatibilityLayer:
    """Manages transitions during library upgrades."""
    def __init__(self, lib_name: str, old_version: str, new_version: str):
        self.lib_name = lib_name
        self.old_version = old_version
        self.new_version = new_version
        self._warnings_shown = set()

    def deprecated_function(self, old_name: str, new_func):
        """Decorator factory for shims that keep a renamed function callable."""
        def decorator(shim):
            @wraps(shim)
            def wrapper(*args, **kwargs):
                # Warn once per old name, then delegate to the shim
                if old_name not in self._warnings_shown:
                    warnings.warn(
                        f"'{old_name}' is deprecated in {self.lib_name} "
                        f"{self.new_version}, use '{new_func.__name__}' instead",
                        DeprecationWarning,
                        stacklevel=2
                    )
                    self._warnings_shown.add(old_name)
                return shim(*args, **kwargs)
            return wrapper
        return decorator

    def transform_response(self, transform_func):
        """Decorator to transform new response format to old format."""
        def decorator(func):
            @wraps(func)
            def wrapper(*args, **kwargs):
                result = func(*args, **kwargs)
                return transform_func(result)
            return wrapper
        return decorator

# Usage example
# new_get_user_stats and fetch_user_v2 stand in for the library's v2 API
compat = CompatibilityLayer("example_lib", "1.0", "2.0")

# Function was renamed in v2
@compat.deprecated_function("get_stats", new_get_user_stats)
def get_stats(start_date, end_date):
    # Adapt old signature to new function
    return new_get_user_stats(
        user_id=None,  # Default for backward compatibility
        date_range={'start': start_date, 'end': end_date}
    )

# Response format changed in v2
@compat.transform_response(lambda new_data: new_data['user'])
def fetch_user(user_id):
    # v2 returns nested structure, we flatten for backward compatibility
    return fetch_user_v2(user_id)
Testing Integration Points¶
Test Strategy¶
Multi-Layer Testing
Test library integrations at multiple levels to ensure reliability.
| Test Type | Purpose | Example |
|---|---|---|
| Unit Tests | Test adapter logic in isolation | Mock library, test transformations |
| Integration Tests | Test actual library behavior | Real library calls with test data |
| Contract Tests | Verify library API hasn't changed | Assert response structure |
| Performance Tests | Ensure acceptable performance | Load testing with real library |
Example Test Suite¶
# test_user_client.py
import pytest
import requests  # needed for requests.HTTPError below
from unittest.mock import Mock, patch
from user_client import HttpUserServiceClient, UserServiceError

def test_get_user_success():
    """Test successful user retrieval."""
    # Arrange
    mock_session = Mock()
    mock_response = Mock()
    mock_response.status_code = 200
    mock_response.json.return_value = {'id': '123', 'name': 'Test'}
    mock_session.get.return_value = mock_response
    client = HttpUserServiceClient('http://api.test', 'token')
    client.session = mock_session
    # Act
    result = client.get_user('123')
    # Assert
    assert result['id'] == '123'
    assert result['name'] == 'Test'
    mock_session.get.assert_called_once()

def test_get_user_not_found():
    """Test handling of 404 response."""
    mock_session = Mock()
    mock_response = Mock()
    mock_response.status_code = 404
    mock_response.raise_for_status.side_effect = requests.HTTPError()
    mock_session.get.return_value = mock_response
    client = HttpUserServiceClient('http://api.test', 'token')
    client.session = mock_session
    with pytest.raises(UserServiceError):
        client.get_user('999')
# test_user_client_integration.py
import pytest
import responses
from user_client import HttpUserServiceClient
@responses.activate
def test_get_user_integration():
"""Test against actual HTTP behavior."""
# Setup mock HTTP server
responses.add(
responses.GET,
'http://api.test/users/123',
json={'id': '123', 'name': 'Integration Test'},
status=200
)
# Use real client (it will hit mocked endpoint)
client = HttpUserServiceClient('http://api.test', 'token')
result = client.get_user('123')
assert result['id'] == '123'
assert result['name'] == 'Integration Test'
# Verify request was made correctly
assert len(responses.calls) == 1
assert responses.calls[0].request.headers['Authorization'] == 'Bearer token'
# test_user_api_contract.py
import pytest
from user_client import HttpUserServiceClient
@pytest.mark.contract
def test_user_response_structure():
"""Verify API response structure hasn't changed."""
client = HttpUserServiceClient('http://api.test', 'token')
user = client.get_user('123')
# Assert contract: response must have these fields
assert 'id' in user
assert 'name' in user
assert 'email' in user
assert 'created_at' in user
# Assert types
assert isinstance(user['id'], str)
assert isinstance(user['name'], str)
assert isinstance(user['email'], str)
# If this test fails after a library update,
# the API contract may have changed
Lifecycle Management¶
Overview¶
Dependencies aren't "set and forget"; they require active management throughout their lifecycle. This section covers:
- Tracking - Maintaining a comprehensive dependency inventory
- Monitoring - Watching for security issues and updates
- Maintaining - Keeping dependencies current and healthy
- Replacing - Systematic migration when necessary
Technical Debt Risk
Neglecting dependency lifecycle management leads to:
- Accumulated security vulnerabilities
- Difficult or impossible upgrades
- Legacy systems that can't evolve
- Increased operational costs
Lifecycle Phases¶
stateDiagram-v2
[*] --> Evaluation
Evaluation --> Adoption: Approved
Evaluation --> [*]: Rejected
Adoption --> Active: Integrated
Active --> Maintained: Regular updates
Maintained --> Active: Still useful
Maintained --> DeprecationPlanning: Issues identified
DeprecationPlanning --> Migration: Plan approved
Migration --> Deprecated: Replacement complete
Deprecated --> Decommissioned: Fully removed
Decommissioned --> [*]
Phase Details¶
| Phase | Duration | Key Activities | Owner |
|---|---|---|---|
| Evaluation | 1-2 weeks | Research, scoring, POC | Developer + Tech Lead |
| Adoption | 1-2 sprints | Integration, documentation, testing | Development Team |
| Active | Ongoing | Regular usage, monitoring | Assigned Owner |
| Maintained | Ongoing | Updates, security patches | Assigned Owner |
| Deprecation Planning | 2-4 weeks | Alternative evaluation, impact analysis | Architecture Team |
| Migration | 1-3 months | Implementation, testing, rollout | Development Team |
| Decommissioned | Complete | Archive documentation, cleanup | DevOps Team |
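The lifecycle can also be enforced in tooling so that a dependency cannot skip a phase. The sketch below encodes the transitions from the state diagram as a lookup table; the `Phase` enum and the `advance` function are illustrative names, not part of any existing tool.

```python
from enum import Enum


class Phase(Enum):
    EVALUATION = "Evaluation"
    ADOPTION = "Adoption"
    ACTIVE = "Active"
    MAINTAINED = "Maintained"
    DEPRECATION_PLANNING = "DeprecationPlanning"
    MIGRATION = "Migration"
    DEPRECATED = "Deprecated"
    DECOMMISSIONED = "Decommissioned"


# Allowed moves, copied from the state diagram. A rejected evaluation
# simply ends tracking, so it has no target phase here.
TRANSITIONS = {
    Phase.EVALUATION: {Phase.ADOPTION},
    Phase.ADOPTION: {Phase.ACTIVE},
    Phase.ACTIVE: {Phase.MAINTAINED},
    Phase.MAINTAINED: {Phase.ACTIVE, Phase.DEPRECATION_PLANNING},
    Phase.DEPRECATION_PLANNING: {Phase.MIGRATION},
    Phase.MIGRATION: {Phase.DEPRECATED},
    Phase.DEPRECATED: {Phase.DECOMMISSIONED},
    Phase.DECOMMISSIONED: set(),
}


def advance(current: Phase, target: Phase) -> Phase:
    """Return `target` if the diagram allows the move, otherwise raise."""
    if target not in TRANSITIONS[current]:
        raise ValueError(f"Cannot move from {current.value} to {target.value}")
    return target
```

For example, `advance(Phase.MAINTAINED, Phase.DEPRECATION_PLANNING)` succeeds, while `advance(Phase.ACTIVE, Phase.MIGRATION)` raises because migration requires an approved deprecation plan first.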
Dependency Registry¶
Registry Structure¶
Maintain a central registry tracking all dependencies:
# Dependency Registry
Last updated: 2025-10-31
## Active Dependencies
### fastapi (v0.104.1)
- **Purpose**: Web framework for REST API
- **Owner**: @backend-team
- **Status**: ACTIVE
- **Adoption Date**: 2024-01-15
- **Last Review**: 2025-10-15
- **Next Review**: 2026-01-15
- **Security Risk**: LOW
- **Alternatives Considered**: Flask, Django REST
- **Rationale**: Modern async support, automatic OpenAPI generation
- **Usage Locations**:
- `src/api/`
- `src/routes/`
### requests (v2.31.0)
- **Purpose**: HTTP client for external API calls
- **Owner**: @integration-team
- **Status**: MAINTAINED
- **Adoption Date**: 2023-06-01
- **Last Review**: 2025-09-01
- **Next Review**: 2026-03-01
- **Security Risk**: LOW
- **Known Issues**: None
- **Usage Locations**:
- `src/integrations/payment/`
- `src/integrations/shipping/`
## Deprecated Dependencies
### flask (v2.0.3)
- **Status**: DEPRECATED
- **Deprecation Date**: 2024-12-01
- **Replacement**: fastapi
- **Migration Deadline**: 2025-03-31
- **Migration Owner**: @backend-team
- **Migration Plan**: See docs/migrations/flask-to-fastapi.md
{
"registry_version": "1.0",
"last_updated": "2025-10-31",
"dependencies": {
"fastapi": {
"version": "0.104.1",
"purpose": "Web framework for REST API",
"owner": "backend-team",
"owner_email": "backend@company.com",
"status": "ACTIVE",
"adoption_date": "2024-01-15",
"last_review": "2025-10-15",
"next_review": "2026-01-15",
"security_risk": "LOW",
"known_vulnerabilities": [],
"alternatives": ["Flask", "Django REST Framework"],
"adoption_rationale": "Modern async support, automatic OpenAPI docs",
"usage_locations": [
"src/api/",
"src/routes/"
],
"tags": ["web", "api", "async"]
},
"requests": {
"version": "2.31.0",
"purpose": "HTTP client for external APIs",
"owner": "integration-team",
"owner_email": "integrations@company.com",
"status": "MAINTAINED",
"adoption_date": "2023-06-01",
"last_review": "2025-09-01",
"next_review": "2026-03-01",
"security_risk": "LOW",
"known_vulnerabilities": [],
"usage_locations": [
"src/integrations/payment/",
"src/integrations/shipping/"
],
"tags": ["http", "integration"]
}
}
}
Registry Management Tool¶
# dependency_registry.py
import json
from dataclasses import dataclass, asdict
from datetime import datetime, timedelta
from typing import List, Optional
from enum import Enum
class DependencyStatus(Enum):
EVALUATION = "EVALUATION"
ACTIVE = "ACTIVE"
MAINTAINED = "MAINTAINED"
DEPRECATING = "DEPRECATING"
DECOMMISSIONED = "DECOMMISSIONED"
class SecurityRisk(Enum):
LOW = "LOW"
MEDIUM = "MEDIUM"
HIGH = "HIGH"
CRITICAL = "CRITICAL"
@dataclass
class Dependency:
name: str
version: str
purpose: str
owner: str
status: DependencyStatus
adoption_date: datetime
last_review: datetime
next_review: datetime
security_risk: SecurityRisk
known_vulnerabilities: List[dict]
alternatives: List[str]
adoption_rationale: str
usage_locations: List[str]
tags: List[str]
replacement_plan: Optional[str] = None
class DependencyRegistry:
"""Manages the dependency registry."""
def __init__(self, registry_file: str = "DEPENDENCIES.json"):
self.registry_file = registry_file
self.dependencies = {}
self.load()
def load(self):
"""Load registry from file."""
try:
with open(self.registry_file, 'r') as f:
data = json.load(f)
for name, dep_data in data.get('dependencies', {}).items():
# Registry entries are keyed by name, so pass the name along explicitly
self.dependencies[name] = self._deserialize_dependency({'name': name, **dep_data})
except FileNotFoundError:
print(f"Registry file not found. Starting with empty registry.")
def save(self):
"""Save registry to file."""
data = {
"registry_version": "1.0",
"last_updated": datetime.now().isoformat(),
"dependencies": {
name: self._serialize_dependency(dep)
for name, dep in self.dependencies.items()
}
}
with open(self.registry_file, 'w') as f:
json.dump(data, f, indent=2)
def add_dependency(self, dependency: Dependency):
"""Add a new dependency to the registry."""
if dependency.name in self.dependencies:
raise ValueError(f"Dependency {dependency.name} already exists")
self.dependencies[dependency.name] = dependency
self.save()
print(f"Added dependency: {dependency.name}")
def update_dependency(self, name: str, **updates):
"""Update an existing dependency."""
if name not in self.dependencies:
raise ValueError(f"Dependency {name} not found")
dep = self.dependencies[name]
for key, value in updates.items():
if hasattr(dep, key):
setattr(dep, key, value)
self.save()
print(f"Updated dependency: {name}")
def deprecate_dependency(self, name: str, replacement: str, deadline: datetime):
"""Mark a dependency for deprecation."""
if name not in self.dependencies:
raise ValueError(f"Dependency {name} not found")
self.dependencies[name].status = DependencyStatus.DEPRECATING
self.dependencies[name].replacement_plan = (
f"Replace with {replacement} by {deadline.date()}"
)
self.save()
print(f"Deprecated dependency: {name} → {replacement}")
def decommission_dependency(self, name: str):
"""Mark a dependency as fully decommissioned."""
if name not in self.dependencies:
raise ValueError(f"Dependency {name} not found")
self.dependencies[name].status = DependencyStatus.DECOMMISSIONED
self.save()
print(f"Decommissioned dependency: {name}")
def get_dependencies_needing_review(self) -> List[Dependency]:
"""Get dependencies that need review."""
now = datetime.now()
return [
dep for dep in self.dependencies.values()
if dep.next_review <= now and dep.status != DependencyStatus.DECOMMISSIONED
]
def get_dependencies_by_risk(self, risk: SecurityRisk) -> List[Dependency]:
"""Get dependencies with specified security risk level."""
return [
dep for dep in self.dependencies.values()
if dep.security_risk == risk and dep.status != DependencyStatus.DECOMMISSIONED
]
def get_vulnerable_dependencies(self) -> List[Dependency]:
"""Get dependencies with known vulnerabilities."""
return [
dep for dep in self.dependencies.values()
if len(dep.known_vulnerabilities) > 0 and
dep.status != DependencyStatus.DECOMMISSIONED
]
def generate_health_report(self) -> dict:
"""Generate a health report of the dependency ecosystem."""
active_deps = [d for d in self.dependencies.values()
if d.status in [DependencyStatus.ACTIVE, DependencyStatus.MAINTAINED]]
return {
"total_dependencies": len(active_deps),
"needs_review": len(self.get_dependencies_needing_review()),
"high_risk": len(self.get_dependencies_by_risk(SecurityRisk.HIGH)),
"critical_risk": len(self.get_dependencies_by_risk(SecurityRisk.CRITICAL)),
"with_vulnerabilities": len(self.get_vulnerable_dependencies()),
"deprecating": len([d for d in self.dependencies.values()
if d.status == DependencyStatus.DEPRECATING]),
"health_score": self._calculate_health_score()
}
def _calculate_health_score(self) -> float:
"""Calculate overall health score (0-100)."""
if not self.dependencies:
return 100.0
active_deps = [d for d in self.dependencies.values()
if d.status in [DependencyStatus.ACTIVE, DependencyStatus.MAINTAINED]]
if not active_deps:
return 100.0
penalties = 0
# Penalty for overdue reviews
overdue = len(self.get_dependencies_needing_review())
penalties += overdue * 5
# Penalty for vulnerabilities
vulns = len(self.get_vulnerable_dependencies())
penalties += vulns * 10
# Penalty for high/critical risk
high_risk = len(self.get_dependencies_by_risk(SecurityRisk.HIGH))
critical_risk = len(self.get_dependencies_by_risk(SecurityRisk.CRITICAL))
penalties += high_risk * 15
penalties += critical_risk * 25
score = max(0, 100 - penalties)
return score
def _serialize_dependency(self, dep: Dependency) -> dict:
"""Convert dependency to JSON-serializable dict."""
data = asdict(dep)
data['status'] = dep.status.value
data['security_risk'] = dep.security_risk.value
data['adoption_date'] = dep.adoption_date.isoformat()
data['last_review'] = dep.last_review.isoformat()
data['next_review'] = dep.next_review.isoformat()
return data
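def _deserialize_dependency(self, data: dict) -> Dependency:
"""Convert dict to Dependency object."""
# Work on a copy and drop keys the dataclass does not define (e.g. owner_email)
known = Dependency.__dataclass_fields__
data = {key: value for key, value in data.items() if key in known}
# Optional fields may be absent from hand-written registry entries
data.setdefault('known_vulnerabilities', [])
data.setdefault('alternatives', [])
data.setdefault('adoption_rationale', '')
data.setdefault('tags', [])
data['status'] = DependencyStatus(data['status'])
data['security_risk'] = SecurityRisk(data['security_risk'])
data['adoption_date'] = datetime.fromisoformat(data['adoption_date'])
data['last_review'] = datetime.fromisoformat(data['last_review'])
data['next_review'] = datetime.fromisoformat(data['next_review'])
return Dependency(**data)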
# CLI interface
if __name__ == "__main__":
import sys
registry = DependencyRegistry()
if len(sys.argv) < 2:
print("Usage: python dependency_registry.py [command]")
print("Commands: report, review, vulnerable")
sys.exit(1)
command = sys.argv[1]
if command == "report":
report = registry.generate_health_report()
print("\n=== Dependency Health Report ===")
print(f"Total Dependencies: {report['total_dependencies']}")
print(f"Needs Review: {report['needs_review']}")
print(f"High Risk: {report['high_risk']}")
print(f"Critical Risk: {report['critical_risk']}")
print(f"With Vulnerabilities: {report['with_vulnerabilities']}")
print(f"Deprecating: {report['deprecating']}")
print(f"\nHealth Score: {report['health_score']:.1f}/100")
elif command == "review":
deps = registry.get_dependencies_needing_review()
print(f"\n=== Dependencies Needing Review ({len(deps)}) ===")
for dep in deps:
print(f"- {dep.name} (Owner: {dep.owner})")
print(f" Last review: {dep.last_review.date()}")
elif command == "vulnerable":
deps = registry.get_vulnerable_dependencies()
print(f"\n=== Dependencies with Vulnerabilities ({len(deps)}) ===")
for dep in deps:
print(f"- {dep.name} ({len(dep.known_vulnerabilities)} vulnerabilities)")
for vuln in dep.known_vulnerabilities:
print(f"{vuln.get('id', 'Unknown')}: {vuln.get('severity', 'Unknown')}")
Security Monitoring¶
Automated Vulnerability Scanning¶
Critical Process
Automated vulnerability scanning should run on every build and at least daily in production.
Integration points:
- Pre-commit hooks - Prevent committing known vulnerable dependencies
- CI/CD pipeline - Block builds with critical vulnerabilities
- Scheduled scans - Daily checks for newly discovered issues
- Dependency updates - Automatic PRs for security patches
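The "block builds with critical vulnerabilities" step usually comes down to a small gate script that reads scanner findings and returns a non-zero exit code. The sketch below is an assumption-heavy illustration: it expects findings already normalized to dictionaries with `package`, `id`, and `severity` keys, which is not the raw output format of any particular scanner, and `blocking_findings` and `gate` are hypothetical names.

```python
from typing import Iterable, List, Mapping

# Assumption: each finding has been normalized to
# {"package": ..., "id": ..., "severity": ...}. Real scanner output
# differs by tool and version and needs its own adapter.
BLOCKING_SEVERITIES = {"critical"}


def blocking_findings(findings: Iterable[Mapping[str, str]]) -> List[Mapping[str, str]]:
    """Return only the findings severe enough to stop the build."""
    return [
        finding for finding in findings
        if str(finding.get("severity", "")).lower() in BLOCKING_SEVERITIES
    ]


def gate(findings: Iterable[Mapping[str, str]]) -> int:
    """Return a process exit code: 1 blocks the build, 0 lets it pass."""
    blockers = blocking_findings(findings)
    for finding in blockers:
        print(f"BLOCKING {finding['package']}: {finding['id']} ({finding['severity']})")
    return 1 if blockers else 0
```

A CI job would call `sys.exit(gate(findings))` after loading and normalizing the scanner report, so the pipeline fails only on the severities the team has agreed to block on.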
# .github/workflows/security-scan.yml
name: Security Scan
on:
push:
branches: [main, develop]
pull_request:
branches: [main, develop]
schedule:
- cron: '0 0 * * *' # Daily at midnight
jobs:
security-scan:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: '3.11'
- name: Install dependencies
run: |
pip install safety
pip install -r requirements.txt
- name: Run Safety check
run: safety check --json > safety-report.json
continue-on-error: true
- name: Analyze results
run: |
python scripts/analyze_vulnerabilities.py safety-report.json
- name: Upload report
uses: actions/upload-artifact@v4
with:
name: security-report
path: safety-report.json
- name: Notify on critical vulnerabilities
if: failure()
uses: 8398a7/action-slack@v3
with:
status: ${{ job.status }}
text: 'Critical security vulnerabilities detected!'
webhook_url: ${{ secrets.SLACK_WEBHOOK }}
# .github/workflows/security-scan.yml
name: Security Scan
on:
push:
branches: [main, develop]
pull_request:
branches: [main, develop]
schedule:
- cron: '0 0 * * *'
jobs:
security-scan:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Set up Node.js
uses: actions/setup-node@v3
with:
node-version: '18'
- name: Install dependencies
run: npm ci
- name: Run npm audit
run: npm audit --json > audit-report.json
continue-on-error: true
- name: Check for critical vulnerabilities
run: |
CRITICAL=$(npm audit --json | jq '.metadata.vulnerabilities.critical')
if [ "$CRITICAL" -gt 0 ]; then
echo "::error::Found $CRITICAL critical vulnerabilities"
exit 1
fi
- name: Generate readable report
run: npm audit --audit-level=moderate
<!-- pom.xml -->
<plugin>
<groupId>org.owasp</groupId>
<artifactId>dependency-check-maven</artifactId>
<version>8.4.0</version>
<configuration>
<failBuildOnCVSS>7</failBuildOnCVSS>
<format>ALL</format>
</configuration>
<executions>
<execution>
<goals>
<goal>check</goal>
</goals>
</execution>
</executions>
</plugin>
Vulnerability Response Workflow¶
graph TD
A[Vulnerability Detected] --> B{Severity?}
B -->|Critical| C[Immediate Action Required]
B -->|High| D[Urgent - 48 Hours]
B -->|Medium| E[Standard - 1 Week]
B -->|Low| F[Scheduled - Next Sprint]
C --> G[Notify Security Team]
G --> H[Assess Impact]
H --> I{Patch Available?}
I -->|Yes| J[Apply Patch]
I -->|No| K[Implement Workaround]
J --> L[Emergency Deploy]
K --> M[Plan Migration]
D --> H
E --> N[Create Ticket]
F --> N
N --> O[Schedule Fix]
O --> P[Regular Deploy]
Update Workflows¶
Regular Update Schedule¶
| Frequency | Focus | Process |
|---|---|---|
| Daily | Security patches | Automated PR creation |
| Weekly | Patch updates | Review and merge |
| Monthly | Minor updates | Testing and deployment |
| Quarterly | Major updates | Planning and migration |
Automated Update Management¶
Using tools like Dependabot or Renovate:
# .github/dependabot.yml
version: 2
updates:
# Python dependencies
- package-ecosystem: "pip"
directory: "/"
schedule:
interval: "daily"
open-pull-requests-limit: 10
reviewers:
- "backend-team"
labels:
- "dependencies"
- "python"
# Group patch updates
groups:
patch-updates:
patterns:
- "*"
update-types:
- "patch"
# Allow updates for both direct and transitive dependencies
# (Dependabot security updates are enabled in repository settings)
allow:
- dependency-type: "direct"
- dependency-type: "indirect"
# JavaScript dependencies
- package-ecosystem: "npm"
directory: "/"
schedule:
interval: "daily"
open-pull-requests-limit: 10
reviewers:
- "frontend-team"
labels:
- "dependencies"
- "javascript"
versioning-strategy: increase
{
"extends": ["config:base"],
"schedule": ["before 3am on Monday"],
"timezone": "Africa/Nairobi",
"labels": ["dependencies"],
"assignees": ["@backend-team"],
"reviewers": ["@tech-lead"],
"packageRules": [
{
"matchUpdateTypes": ["patch"],
"groupName": "patch dependencies",
"automerge": true,
"automergeType": "pr",
"platformAutomerge": true
},
{
"matchUpdateTypes": ["minor"],
"groupName": "minor dependencies",
"schedule": ["before 3am on first day of month"]
},
{
"matchUpdateTypes": ["major"],
"groupName": "major dependencies",
"schedule": ["before 3am on first day of quarter"],
"dependencyDashboardApproval": true
},
{
"matchPackagePatterns": ["^@security/"],
"schedule": ["at any time"],
"automerge": true
}
],
"vulnerabilityAlerts": {
"enabled": true,
"labels": ["security"],
"assignees": ["@security-team"]
}
}
Deprecation and Migration¶
Migration Planning Template¶
When a dependency needs replacement:
# Migration Plan: [Old Dependency] → [New Dependency]
## Overview
- **Old Dependency**: [name] v[version]
- **New Dependency**: [name] v[version]
- **Migration Owner**: @[team/person]
- **Target Completion**: [date]
- **Status**: [Planning/In Progress/Complete]
## Rationale
Why are we migrating?
- [ ] Security vulnerabilities
- [ ] Maintenance issues (abandoned project)
- [ ] Performance problems
- [ ] Missing features
- [ ] Better alternative available
- [ ] Other: [specify]
## Impact Assessment
### Affected Components
| Component | Type | Effort | Risk |
|-----------|------|--------|------|
| User API | Module | Medium | Low |
| Auth Service | Service | High | Medium |
| Admin Dashboard | UI | Low | Low |
### Dependencies
Other packages that depend on this:
- [dependency-1]
- [dependency-2]
## Migration Strategy
### Phase 1: Preparation (Week 1-2)
- [ ] Document current usage patterns
- [ ] Identify all integration points
- [ ] Create adapter layer for new dependency
- [ ] Set up feature flags
- [ ] Write compatibility tests
### Phase 2: Implementation (Week 3-6)
- [ ] Integrate new dependency
- [ ] Implement adapters
- [ ] Migrate component A
- [ ] Migrate component B
- [ ] Migrate component C
### Phase 3: Testing (Week 7-8)
- [ ] Unit tests passing
- [ ] Integration tests passing
- [ ] Performance tests passing
- [ ] Security scan clean
- [ ] UAT complete
### Phase 4: Deployment (Week 9-10)
- [ ] Deploy to dev environment
- [ ] Deploy to staging environment
- [ ] Canary deployment to production (10%)
- [ ] Full production deployment (100%)
- [ ] Monitor for 1 week
### Phase 5: Cleanup (Week 11-12)
- [ ] Remove old dependency
- [ ] Remove compatibility layer
- [ ] Remove feature flags
- [ ] Update documentation
- [ ] Archive migration notes
## Rollback Plan
If migration fails:
1. Disable feature flag
2. Revert deployment
3. Fall back to old dependency
4. Review and address issues
5. Reschedule migration attempt
## Success Criteria
- [ ] All functionality working with new dependency
- [ ] No performance regression
- [ ] No new security vulnerabilities
- [ ] Old dependency fully removed
- [ ] Documentation updated
## Risk Mitigation
| Risk | Impact | Mitigation |
|------|--------|------------|
| Breaking changes in production | High | Feature flags, canary deployment |
| Performance degradation | Medium | Load testing, monitoring |
| Team unfamiliar with new tool | Medium | Training session, documentation |
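The feature-flag steps in the template (set up flags in Phase 1, disable the flag on rollback) can be illustrated with a small router that picks the old or new implementation on every call. This is a minimal sketch: `FlaggedDependency`, the flag name, and the two render functions are hypothetical, and a real system would read flags from its own configuration service rather than a dictionary.

```python
from typing import Callable, Dict


class FlaggedDependency:
    """Routes calls to the old or new implementation behind a feature flag."""

    def __init__(self, flag_name: str, flags: Dict[str, bool],
                 old_impl: Callable, new_impl: Callable):
        self.flag_name = flag_name
        self.flags = flags  # read on every call, so a flip takes effect immediately
        self.old_impl = old_impl
        self.new_impl = new_impl

    def __call__(self, *args, **kwargs):
        # Default to the old path when the flag is missing
        use_new = self.flags.get(self.flag_name, False)
        impl = self.new_impl if use_new else self.old_impl
        return impl(*args, **kwargs)


# Hypothetical stand-ins for adapters around the old and new libraries
def render_with_old_lib(text: str) -> str:
    return f"old:{text}"


def render_with_new_lib(text: str) -> str:
    return f"new:{text}"


flags = {"use_new_renderer": False}
render = FlaggedDependency(
    "use_new_renderer", flags, render_with_old_lib, render_with_new_lib
)
```

Setting `flags["use_new_renderer"] = True` moves traffic to the new library, and setting it back to `False` is the rollback step, with no redeploy needed as long as both adapters remain installed.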
Metrics and Reporting¶
Monthly Report Template¶
# generate_dependency_report.py
from dependency_registry import DependencyRegistry, SecurityRisk
from datetime import datetime
import json
def generate_monthly_report(registry: DependencyRegistry) -> str:
"""Generate a monthly dependency health report."""
report = []
report.append("# Monthly Dependency Report")
report.append(f"Generated: {datetime.now().strftime('%Y-%m-%d')}\n")
# Health overview
health = registry.generate_health_report()
report.append("## Health Overview")
report.append(f"- **Health Score**: {health['health_score']:.1f}/100")
report.append(f"- **Total Dependencies**: {health['total_dependencies']}")
report.append(f"- **Needs Review**: {health['needs_review']}")
report.append(f"- **With Vulnerabilities**: {health['with_vulnerabilities']}")
report.append(f"- **Deprecating**: {health['deprecating']}\n")
# Critical items
critical = registry.get_dependencies_by_risk(SecurityRisk.CRITICAL)
if critical:
report.append("## Critical Issues")
for dep in critical:
report.append(f"- **{dep.name}** (Owner: {dep.owner})")
report.append(f" - {len(dep.known_vulnerabilities)} known vulnerabilities")
# Needs review
review_needed = registry.get_dependencies_needing_review()
if review_needed:
report.append("\n## Review Needed")
for dep in review_needed:
days_overdue = (datetime.now() - dep.next_review).days
report.append(f"- **{dep.name}** ({days_overdue} days overdue, Owner: {dep.owner})")
# Deprecations
deprecating = [d for d in registry.dependencies.values()
if d.status.value == "DEPRECATING"]
if deprecating:
report.append("\n## Active Migrations")
for dep in deprecating:
report.append(f"- **{dep.name}** → {dep.replacement_plan}")
return "\n".join(report)
if __name__ == "__main__":
registry = DependencyRegistry()
report = generate_monthly_report(registry)
print(report)
# Save to file
with open(f"dependency-report-{datetime.now().strftime('%Y-%m')}.md", 'w') as f:
f.write(report)
Common Usage Patterns¶
Overview¶
Beyond selecting and integrating libraries, developers need consistent patterns for using them effectively. This section covers:
- Asynchronous operations - Promises, async/await, cancellation
- State management - Predictable state updates and observation
- Data access - Repository patterns and caching strategies
- UI composition - Component patterns and communication
- Cross-cutting concerns - Logging, feature flags, error handling
Language-Agnostic Principles
While examples use specific languages, the patterns apply broadly across ecosystems.
Asynchronous Operation Patterns¶
Promise/Future Pattern¶
Principle: Consistently structure asynchronous code flow using language-appropriate patterns.
// GOOD: Consistent async/await pattern
async function fetchUserProfile(userId) {
try {
// Chain async operations naturally
const user = await userApi.fetchUser(userId);
const permissions = await permissionsApi.fetchPermissions(user.id);
const preferences = await preferencesApi.fetchPreferences(user.id);
return {
user,
permissions,
preferences
};
} catch (error) {
logger.error(`Failed to fetch user profile: ${error.message}`);
throw new UserProfileError('Unable to load user profile', error);
}
}
// BAD: Callback hell
function fetchUserProfileBad(userId, callback) {
userApi.fetchUser(userId, (err, user) => {
if (err) return callback(err);
permissionsApi.fetchPermissions(user.id, (err, permissions) => {
if (err) return callback(err);
preferencesApi.fetchPreferences(user.id, (err, preferences) => {
if (err) return callback(err);
callback(null, { user, permissions, preferences });
});
});
});
}
// Parallel execution when operations are independent
async function fetchUserDashboard(userId) {
try {
// Execute independent operations in parallel
const [user, notifications, analytics] = await Promise.all([
userApi.fetchUser(userId),
notificationApi.fetchRecent(userId),
analyticsApi.fetchStats(userId)
]);
return { user, notifications, analytics };
} catch (error) {
logger.error('Dashboard fetch failed:', error);
throw error;
}
}
import asyncio
from typing import Dict, Any
# GOOD: Async/await pattern
async def fetch_user_profile(user_id: str) -> Dict[str, Any]:
"""Fetch complete user profile with related data."""
try:
# Sequential async operations
user = await user_api.fetch_user(user_id)
permissions = await permissions_api.fetch_permissions(user['id'])
preferences = await preferences_api.fetch_preferences(user['id'])
return {
'user': user,
'permissions': permissions,
'preferences': preferences
}
except Exception as e:
logger.error(f"Failed to fetch user profile: {str(e)}")
raise UserProfileError('Unable to load user profile') from e

# Parallel execution
async def fetch_user_dashboard(user_id: str) -> Dict[str, Any]:
    """Fetch dashboard data in parallel."""
    try:
        # Create the coroutines; gather() runs them concurrently
        user_task = user_api.fetch_user(user_id)
        notifications_task = notification_api.fetch_recent(user_id)
        analytics_task = analytics_api.fetch_stats(user_id)
        user, notifications, analytics = await asyncio.gather(
            user_task,
            notifications_task,
            analytics_task
        )
        return {
            'user': user,
            'notifications': notifications,
            'analytics': analytics
        }
    except Exception as e:
        logger.error(f"Dashboard fetch failed: {e}")
        raise
// GOOD: CompletableFuture pattern
public CompletableFuture<UserProfile> fetchUserProfile(String userId) {
    return userApi.fetchUser(userId)
        .thenCompose(user ->
            permissionsApi.fetchPermissions(user.getId())
                .thenCombine(
                    preferencesApi.fetchPreferences(user.getId()),
                    (permissions, preferences) ->
                        new UserProfile(user, permissions, preferences)
                )
        )
        .exceptionally(error -> {
            logger.error("Failed to fetch user profile", error);
            throw new UserProfileException("Unable to load user profile", error);
        });
}

// Parallel execution
public CompletableFuture<Dashboard> fetchUserDashboard(String userId) {
    CompletableFuture<User> userFuture = userApi.fetchUser(userId);
    CompletableFuture<List<Notification>> notificationsFuture =
        notificationApi.fetchRecent(userId);
    CompletableFuture<Analytics> analyticsFuture =
        analyticsApi.fetchStats(userId);

    return CompletableFuture.allOf(userFuture, notificationsFuture, analyticsFuture)
        .thenApply(v -> new Dashboard(
            userFuture.join(),
            notificationsFuture.join(),
            analyticsFuture.join()
        ))
        .exceptionally(error -> {
            logger.error("Dashboard fetch failed", error);
            throw new DashboardException(error);
        });
}
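The parallel examples above fail as a whole when any single call fails. Where partial results are acceptable, `Promise.allSettled` is one option. The sketch below is illustrative only: `loadDashboard`, the `sources` map, and the `fallback` value are hypothetical names, not part of the API clients shown above.

```javascript
// Sketch: tolerate partial failure when fetching independent data in parallel.
// `sources` maps a result key to a function that returns a promise (assumed shape).
async function loadDashboard(sources, fallback = null) {
  const keys = Object.keys(sources);
  // allSettled never rejects; each outcome is 'fulfilled' or 'rejected'
  const settled = await Promise.allSettled(keys.map((key) => sources[key]()));

  const data = {};
  const errors = {};
  settled.forEach((outcome, index) => {
    const key = keys[index];
    if (outcome.status === 'fulfilled') {
      data[key] = outcome.value;
    } else {
      // Substitute the fallback and record why this source failed
      data[key] = fallback;
      errors[key] = outcome.reason instanceof Error
        ? outcome.reason.message
        : String(outcome.reason);
    }
  });
  return { data, errors };
}
```

A caller can render whatever arrived in `data` and surface `errors` separately, instead of discarding the successful responses.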
Cancellation Pattern¶
Principle: Support cancellation for long-running operations to prevent wasted resources.
class DataFetcher {
  constructor(apiClient) {
    this.apiClient = apiClient;
    this.activeRequests = new Map();
  }

  async fetchWithCancellation(url, requestId, options = {}) {
    // Create abort controller for this request
    const abortController = new AbortController();
    this.activeRequests.set(requestId, abortController);
    try {
      const response = await this.apiClient.get(url, {
        ...options,
        signal: abortController.signal
      });
      return response.data;
    } catch (error) {
      // fetch rejects with 'AbortError'; axios rejects with 'CanceledError'
      if (error.name === 'AbortError' || error.name === 'CanceledError') {
        console.log(`Request ${requestId} was cancelled`);
        return null;
      }
      throw error;
    } finally {
      this.activeRequests.delete(requestId);
    }
  }

  cancelRequest(requestId) {
    const controller = this.activeRequests.get(requestId);
    if (controller) {
      controller.abort();
      this.activeRequests.delete(requestId);
    }
  }

  cancelAll() {
    for (const controller of this.activeRequests.values()) {
      controller.abort();
    }
    this.activeRequests.clear();
  }
}

// Usage in React component (the search term arrives as the `query` prop)
function UserSearch({ query }) {
  const [results, setResults] = useState([]);
  const fetcher = useRef(new DataFetcher(apiClient));

  useEffect(() => {
    const requestId = 'search-' + Date.now();
    fetcher.current.fetchWithCancellation(
      `/users/search?q=${encodeURIComponent(query)}`,
      requestId
    ).then(data => {
      if (data) setResults(data);
    });
    // Cleanup: cancel on unmount or when query changes
    return () => fetcher.current.cancelRequest(requestId);
  }, [query]);

  return <ResultsList results={results} />;
}
import asyncio
from typing import Optional, Any

class DataFetcher:
    def __init__(self, api_client):
        self.api_client = api_client
        self.active_tasks = {}

    async def fetch_with_timeout(
        self,
        url: str,
        request_id: str,
        timeout: float = 30.0
    ) -> Optional[Any]:
        """Fetch data with timeout and cancellation support."""
        async def fetch_task():
            # Let CancelledError propagate: swallowing it here would also
            # hide the timeout that wait_for() raises below
            return await self.api_client.get(url)

        # Create task and store reference so it can be cancelled by ID
        task = asyncio.create_task(fetch_task())
        self.active_tasks[request_id] = task
        try:
            # Wait with timeout; wait_for() cancels the task if it expires
            return await asyncio.wait_for(task, timeout=timeout)
        except asyncio.TimeoutError:
            print(f"Request {request_id} timed out after {timeout}s")
            return None
        except asyncio.CancelledError:
            # The task was cancelled via cancel_request() or cancel_all()
            print(f"Request {request_id} was cancelled")
            return None
        finally:
            self.active_tasks.pop(request_id, None)

    def cancel_request(self, request_id: str):
        """Cancel a specific request."""
        task = self.active_tasks.get(request_id)
        if task and not task.done():
            task.cancel()

    def cancel_all(self):
        """Cancel all active requests."""
        for task in self.active_tasks.values():
            if not task.done():
                task.cancel()
        self.active_tasks.clear()
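The Python `DataFetcher` above combines cancellation with a timeout, while the JavaScript version only supports explicit cancellation. A deadline can be layered on top of `AbortController` with a timer, as in the minimal sketch below. `withTimeout` and `slowOperation` are hypothetical helpers introduced for illustration; `slowOperation` merely stands in for a real cancellable request.

```javascript
// Sketch: abort an operation automatically once a deadline passes.
// `operation` is any function that accepts an AbortSignal and returns a promise.
async function withTimeout(operation, timeoutMs) {
  const controller = new AbortController();
  const timer = setTimeout(() => controller.abort(), timeoutMs);
  try {
    return await operation(controller.signal);
  } finally {
    clearTimeout(timer); // always release the timer, on success or failure
  }
}

// Hypothetical cancellable operation: resolves after `ms` unless aborted first.
function slowOperation(ms, signal) {
  return new Promise((resolve, reject) => {
    const timer = setTimeout(() => resolve('done'), ms);
    signal.addEventListener('abort', () => {
      clearTimeout(timer);
      const error = new Error('Operation aborted');
      error.name = 'AbortError';
      reject(error);
    }, { once: true });
  });
}
```

Newer runtimes also provide `AbortSignal.timeout(ms)`, which may replace the manual timer where it is available.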
State Management Patterns¶
Unidirectional Data Flow¶
Principle: Maintain predictable state changes through explicit, traceable updates.
// Actions - What happened
const actionTypes = {
  ADD_TODO: 'ADD_TODO',
  TOGGLE_TODO: 'TOGGLE_TODO',
  DELETE_TODO: 'DELETE_TODO',
  LOAD_TODOS: 'LOAD_TODOS',
  LOAD_TODOS_SUCCESS: 'LOAD_TODOS_SUCCESS',
  LOAD_TODOS_ERROR: 'LOAD_TODOS_ERROR'
};

// Action creators
const addTodo = (text) => ({
  type: actionTypes.ADD_TODO,
  payload: {
    id: Date.now(),
    text,
    completed: false
  }
});

const toggleTodo = (id) => ({
  type: actionTypes.TOGGLE_TODO,
  payload: { id }
});

// Async action with thunk
const loadTodos = () => async (dispatch) => {
  dispatch({ type: actionTypes.LOAD_TODOS });
  try {
    const todos = await todoApi.fetchAll();
    dispatch({
      type: actionTypes.LOAD_TODOS_SUCCESS,
      payload: todos
    });
  } catch (error) {
    dispatch({
      type: actionTypes.LOAD_TODOS_ERROR,
      payload: error.message
    });
  }
};

// Reducer - How state changes
const initialState = {
  items: [],
  loading: false,
  error: null
};

function todosReducer(state = initialState, action) {
  switch (action.type) {
    case actionTypes.ADD_TODO:
      return {
        ...state,
        items: [...state.items, action.payload]
      };
    case actionTypes.TOGGLE_TODO:
      return {
        ...state,
        items: state.items.map(todo =>
          todo.id === action.payload.id
            ? { ...todo, completed: !todo.completed }
            : todo
        )
      };
    case actionTypes.DELETE_TODO:
      return {
        ...state,
        items: state.items.filter(todo => todo.id !== action.payload.id)
      };
    case actionTypes.LOAD_TODOS:
      return {
        ...state,
        loading: true,
        error: null
      };
    case actionTypes.LOAD_TODOS_SUCCESS:
      return {
        ...state,
        loading: false,
        items: action.payload
      };
    case actionTypes.LOAD_TODOS_ERROR:
      return {
        ...state,
        loading: false,
        error: action.payload
      };
    default:
      return state;
  }
}
// Selectors - How to read state
const selectAllTodos = (state) => state.todos.items;
const selectActiveTodos = (state) => state.todos.items.filter(t => !t.completed);
const selectCompletedTodos = (state) => state.todos.items.filter(t => t.completed);
const selectTodoById = (state, id) => state.todos.items.find(t => t.id === id);
from dataclasses import dataclass, replace
from typing import List, Optional
from enum import Enum

class TodoAction(Enum):
    ADD = "ADD"
    TOGGLE = "TOGGLE"
    DELETE = "DELETE"

@dataclass(frozen=True)
class Todo:
    id: int
    text: str
    completed: bool = False

@dataclass(frozen=True)
class TodoState:
    """Immutable state container for todos."""
    todos: List[Todo]
    loading: bool = False
    error: Optional[str] = None

    def add_todo(self, text: str) -> 'TodoState':
        """Add a new todo, returning new state."""
        new_todo = Todo(
            id=max([t.id for t in self.todos], default=0) + 1,
            text=text,
            completed=False
        )
        return replace(self, todos=[*self.todos, new_todo])

    def toggle_todo(self, todo_id: int) -> 'TodoState':
        """Toggle todo completion, returning new state."""
        new_todos = [
            replace(todo, completed=not todo.completed)
            if todo.id == todo_id
            else todo
            for todo in self.todos
        ]
        return replace(self, todos=new_todos)

    def delete_todo(self, todo_id: int) -> 'TodoState':
        """Delete a todo, returning new state."""
        new_todos = [todo for todo in self.todos if todo.id != todo_id]
        return replace(self, todos=new_todos)

    def set_loading(self, loading: bool) -> 'TodoState':
        """Update loading state."""
        return replace(self, loading=loading, error=None)

    def set_error(self, error: str) -> 'TodoState':
        """Set error state."""
        return replace(self, loading=False, error=error)

# Store manages state transitions
class TodoStore:
    def __init__(self):
        self._state = TodoState(todos=[])
        self._listeners = []

    @property
    def state(self) -> TodoState:
        return self._state

    def dispatch(self, action: TodoAction, **kwargs):
        """Dispatch an action to update state."""
        old_state = self._state
        if action == TodoAction.ADD:
            self._state = self._state.add_todo(kwargs['text'])
        elif action == TodoAction.TOGGLE:
            self._state = self._state.toggle_todo(kwargs['id'])
        elif action == TodoAction.DELETE:
            self._state = self._state.delete_todo(kwargs['id'])
        # Notify listeners if state changed
        if self._state is not old_state:
            self._notify_listeners()

    def subscribe(self, listener):
        """Subscribe to state changes; returns an unsubscribe function."""
        self._listeners.append(listener)
        return lambda: self._listeners.remove(listener)

    def _notify_listeners(self):
        """Notify all listeners of state change."""
        # Iterate over a copy so a listener can unsubscribe while being notified
        for listener in list(self._listeners):
            listener(self._state)
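The JavaScript example above defines actions, a reducer, and selectors, but not the store that connects them. The sketch below shows one minimal way to wire a reducer to `dispatch` and `subscribe`. `createStore` here is a hand-written illustration rather than any library's implementation, `counterReducer` is a hypothetical reducer used only to exercise it, and middleware such as thunks is out of scope.

```javascript
// Sketch: a minimal store that wires a reducer to dispatch and subscribe.
function createStore(reducer, initialState) {
  let state = initialState;
  const listeners = new Set();

  return {
    getState: () => state,
    dispatch(action) {
      const next = reducer(state, action);
      if (next !== state) {
        state = next;
        // Copy before iterating so listeners may unsubscribe while notified
        [...listeners].forEach((listener) => listener(state));
      }
      return action;
    },
    subscribe(listener) {
      listeners.add(listener);
      // Return an unsubscribe function, mirroring the Python TodoStore
      return () => listeners.delete(listener);
    }
  };
}

// Hypothetical reducer used only to exercise the store
function counterReducer(state, action) {
  switch (action.type) {
    case 'INCREMENT':
      return { ...state, count: state.count + 1 };
    default:
      return state;
  }
}
```

Because the reducer returns the same object for unknown actions, the reference check in `dispatch` is enough to skip needless notifications.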
Data Access Patterns¶
Repository Pattern with Caching¶
Principle: Abstract data access behind repositories that cache reads for a bounded time (TTL) and invalidate affected entries on writes.
// Repository with built-in caching
class CachedRepository {
  constructor(apiClient, cacheTTL = 300000) { // 5 minutes default
    this.apiClient = apiClient;
    this.cacheTTL = cacheTTL;
    this.cache = new Map();
  }

  async findById(id) {
    const cacheKey = `user:${id}`;
    const cached = this._getFromCache(cacheKey);
    if (cached !== null) {
      console.log(`Cache hit: ${cacheKey}`);
      return cached;
    }
    console.log(`Cache miss: ${cacheKey}`);
    const data = await this.apiClient.get(`/users/${id}`);
    this._setInCache(cacheKey, data);
    return data;
  }

  async findAll(filters = {}) {
    const cacheKey = `users:all:${JSON.stringify(filters)}`;
    const cached = this._getFromCache(cacheKey);
    if (cached !== null) {
      return cached;
    }
    const data = await this.apiClient.get('/users', { params: filters });
    this._setInCache(cacheKey, data, this.cacheTTL / 2); // Shorter TTL for lists
    return data;
  }

  async save(entity) {
    const data = await this.apiClient.post('/users', entity);
    // A new entity only makes cached list results stale;
    // entries for other individual users remain valid
    this._invalidatePattern('users:all:');
    // Cache the new entity
    this._setInCache(`user:${data.id}`, data);
    return data;
  }

  async update(id, updates) {
    const data = await this.apiClient.put(`/users/${id}`, updates);
    // Update cache
    this._setInCache(`user:${id}`, data);
    this._invalidatePattern('users:all:');
    return data;
  }

  async delete(id) {
    await this.apiClient.delete(`/users/${id}`);
    // Invalidate cache
    this._deleteFromCache(`user:${id}`);
    this._invalidatePattern('users:all:');
  }

  _getFromCache(key) {
    const entry = this.cache.get(key);
    if (!entry) return null;
    if (Date.now() > entry.expires) {
      this.cache.delete(key);
      return null;
    }
    return entry.data;
  }

  _setInCache(key, data, ttl = this.cacheTTL) {
    this.cache.set(key, {
      data,
      expires: Date.now() + ttl
    });
  }

  _deleteFromCache(key) {
    this.cache.delete(key);
  }

  _invalidatePattern(pattern) {
    for (const key of this.cache.keys()) {
      if (key.startsWith(pattern)) {
        this.cache.delete(key);
      }
    }
  }

  clearCache() {
    this.cache.clear();
  }
}
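One gap in a TTL cache like the one above: if several callers miss the cache for the same key at the same moment, each issues its own request. A common complement is to share the in-flight promise among them. The sketch below is one way to do that. `RequestCoalescer` and its `run(key, loader)` method are hypothetical names, not part of `CachedRepository`.

```javascript
// Sketch: share one in-flight request among concurrent callers for the same key.
class RequestCoalescer {
  constructor() {
    this.inFlight = new Map(); // key -> pending promise
  }

  run(key, loader) {
    const pending = this.inFlight.get(key);
    if (pending) return pending; // reuse the request that is already running

    const promise = Promise.resolve()
      .then(loader)
      // Forget the entry once settled so a later call triggers a fresh load
      .finally(() => this.inFlight.delete(key));
    this.inFlight.set(key, promise);
    return promise;
  }
}
```

Inside a repository, the cache-miss branch could call something like `coalescer.run(cacheKey, () => apiClient.get(...))` and store the result in the cache afterwards.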
Pagination Pattern¶
Principle: Handle pagination consistently across different data sources.
class PaginatedRepository {
  constructor(apiClient, pageSize = 20) {
    this.apiClient = apiClient;
    this.pageSize = pageSize;
  }

  // Offset-based pagination
  async findPage(page = 1, filters = {}) {
    const offset = (page - 1) * this.pageSize;
    const response = await this.apiClient.get('/users', {
      params: {
        ...filters,
        limit: this.pageSize,
        offset
      }
    });
    const totalPages = Math.ceil(response.total / this.pageSize);
    return {
      data: response.data,
      pagination: {
        page,
        pageSize: this.pageSize,
        totalItems: response.total,
        totalPages,
        hasNext: page < totalPages,
        hasPrev: page > 1
      }
    };
  }

  // Cursor-based pagination (better for large datasets)
  async findPageCursor(cursor = null, filters = {}) {
    const response = await this.apiClient.get('/users', {
      params: {
        ...filters,
        limit: this.pageSize,
        cursor
      }
    });
    return {
      data: response.data,
      pagination: {
        pageSize: this.pageSize,
        nextCursor: response.nextCursor,
        prevCursor: response.prevCursor,
        hasNext: !!response.nextCursor,
        hasPrev: !!response.prevCursor
      }
    };
  }

  // Async iterator for seamless pagination
  async *iterateAll(filters = {}) {
    let page = 1;
    let hasMore = true;
    while (hasMore) {
      const result = await this.findPage(page, filters);
      for (const item of result.data) {
        yield item;
      }
      hasMore = result.pagination.hasNext;
      page++;
    }
  }
}

// Usage (top-level await requires an ES module or an enclosing async function)
const repo = new PaginatedRepository(apiClient);

// Manual pagination
const page1 = await repo.findPage(1, { active: true });
console.log(`Showing ${page1.data.length} of ${page1.pagination.totalItems}`);

// Iterate all pages automatically
for await (const user of repo.iterateAll({ active: true })) {
  console.log(user.name);
}
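The `iterateAll` helper above walks offset-based pages. The same idea applies to cursor-based pagination, which the repository recommends for large datasets. The sketch below assumes a `fetchPage(cursor)` function that resolves to `{ data, nextCursor }`. That shape, `iterateByCursor`, and `createFakeEndpoint` are hypothetical and exist only to keep the example self-contained.

```javascript
// Sketch: iterate every item of a cursor-paginated source.
// `fetchPage(cursor)` is assumed to resolve to { data, nextCursor }.
async function* iterateByCursor(fetchPage) {
  let cursor = null;
  do {
    const page = await fetchPage(cursor);
    for (const item of page.data) {
      yield item;
    }
    // A missing or null cursor means there are no further pages
    cursor = page.nextCursor ?? null;
  } while (cursor !== null);
}

// In-memory stand-in for a paginated endpoint (illustrative only)
function createFakeEndpoint(items, pageSize) {
  return async (cursor) => {
    const start = cursor === null ? 0 : Number(cursor);
    const end = start + pageSize;
    return {
      data: items.slice(start, end),
      nextCursor: end < items.length ? String(end) : null
    };
  };
}
```

Consumers use it the same way as `iterateAll`, with `for await (const item of iterateByCursor(fetchPage))`.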
UI Component Composition¶
Component Communication Patterns¶
// GOOD: Parent controls state, child emits events
function TodoList({ todos, onToggle, onDelete }) {
  return (
    <ul className="todo-list">
      {todos.map(todo => (
        <TodoItem
          key={todo.id}
          todo={todo}
          onToggle={() => onToggle(todo.id)}
          onDelete={() => onDelete(todo.id)}
        />
      ))}
    </ul>
  );
}

function TodoItem({ todo, onToggle, onDelete }) {
  return (
    <li className={todo.completed ? 'completed' : ''}>
      <input
        type="checkbox"
        checked={todo.completed}
        onChange={onToggle}
      />
      <span>{todo.text}</span>
      <button onClick={onDelete}>Delete</button>
    </li>
  );
}

// Parent component manages state
function TodoApp() {
  const [todos, setTodos] = useState([]);

  const handleToggle = (id) => {
    // Functional update avoids acting on a stale `todos` snapshot
    setTodos(prev => prev.map(todo =>
      todo.id === id
        ? { ...todo, completed: !todo.completed }
        : todo
    ));
  };

  const handleDelete = (id) => {
    setTodos(prev => prev.filter(todo => todo.id !== id));
  };

  return (
    <TodoList
      todos={todos}
      onToggle={handleToggle}
      onDelete={handleDelete}
    />
  );
}

// Flexible data fetching component
function DataFetcher({ url, children }) {
  const [data, setData] = useState(null);
  const [loading, setLoading] = useState(true);
  const [error, setError] = useState(null);

  useEffect(() => {
    fetchData();
  }, [url]);

  async function fetchData() {
    try {
      setLoading(true);
      setError(null); // clear any error left over from a previous attempt
      const response = await fetch(url);
      // fetch() only rejects on network failure, so check the status explicitly
      if (!response.ok) {
        throw new Error(`Request failed with status ${response.status}`);
      }
      const json = await response.json();
      setData(json);
    } catch (err) {
      setError(err);
    } finally {
      setLoading(false);
    }
  }

  // Pass state to children via render prop
  return children({ data, loading, error, refetch: fetchData });
}

// Usage - consumer decides how to render
function UserProfile({ userId }) {
  return (
    <DataFetcher url={`/api/users/${userId}`}>
      {({ data, loading, error, refetch }) => {
        if (loading) return <Spinner />;
        if (error) return <Error message={error.message} />;
        if (!data) return null;
        return (
          <div>
            <h1>{data.name}</h1>
            <p>{data.email}</p>
            <button onClick={refetch}>Refresh</button>
          </div>
        );
      }}
    </DataFetcher>
  );
}
Cross-Cutting Patterns¶
Structured Logging¶
class Logger {
  constructor(context = {}) {
    this.context = context;
  }

  log(level, message, metadata = {}) {
    const logEntry = {
      timestamp: new Date().toISOString(),
      level,
      message,
      ...this.context,
      ...metadata
    };
    console.log(JSON.stringify(logEntry));
    // Send to logging service
    if (this.shouldSendToService(level)) {
      this.sendToLoggingService(logEntry);
    }
  }

  info(message, metadata) {
    this.log('INFO', message, metadata);
  }

  error(message, error, metadata) {
    this.log('ERROR', message, {
      ...metadata,
      error: {
        name: error.name,
        message: error.message,
        stack: error.stack
      }
    });
  }

  withContext(additionalContext) {
    return new Logger({ ...this.context, ...additionalContext });
  }

  // Placeholder hooks (assumed here so that log() can run): replace them
  // with the policy and transport your logging service requires
  shouldSendToService(level) {
    return level === 'ERROR';
  }

  sendToLoggingService(logEntry) {
    // e.g. enqueue logEntry for delivery to a log aggregator
  }
}

// Usage
const logger = new Logger({ service: 'user-api', version: '1.0.0' });

// Request-specific logger
app.use((req, res, next) => {
  req.logger = logger.withContext({
    requestId: req.id, // assumes earlier middleware has assigned req.id
    path: req.path,
    method: req.method
  });
  next();
});

// In route handler
app.get('/users/:id', async (req, res) => {
  req.logger.info('Fetching user', { userId: req.params.id });
  try {
    const user = await userService.findById(req.params.id);
    res.json(user);
  } catch (error) {
    req.logger.error('Failed to fetch user', error, { userId: req.params.id });
    res.status(500).json({ error: 'Internal server error' });
  }
});
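The `log` method above defers to `shouldSendToService(level)` to decide which entries leave the process. One possible policy is a severity threshold, sketched below. The level names and priorities are assumptions (the `Logger` above only emits `INFO` and `ERROR`), and `createLevelFilter` is a hypothetical helper.

```javascript
// Sketch: forward only log entries at or above a minimum severity.
// Level names and numeric priorities are illustrative assumptions.
const LEVEL_PRIORITY = new Map([
  ['DEBUG', 10],
  ['INFO', 20],
  ['WARN', 30],
  ['ERROR', 40]
]);

function createLevelFilter(minimumLevel) {
  const threshold = LEVEL_PRIORITY.get(minimumLevel);
  if (threshold === undefined) {
    throw new Error(`Unknown log level: ${minimumLevel}`);
  }
  // Unknown levels rank below every threshold and are never forwarded
  return (level) => (LEVEL_PRIORITY.get(level) ?? -Infinity) >= threshold;
}
```

A logger could build the filter once, for example from configuration, and call it inside `shouldSendToService`.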
Pattern Selection Guide¶
| Scenario | Recommended Pattern | Why |
|---|---|---|
| Async API calls | Async/await with proper error handling | Readable, maintainable |
| Parallel operations | Promise.all / asyncio.gather / CompletableFuture.allOf | Performance |
| Cancelable operations | AbortController / asyncio task cancellation with timeout | Resource management |
| State management | Unidirectional flow (Redux-like) | Predictability |
| Data access | Repository pattern with caching | Abstraction, performance |
| Large datasets | Cursor-based pagination | Scalability |
| Component reuse | Props down, events up | Clear contracts |
| Flexible rendering | Render props / hooks | Composition |
Best Practices Summary¶
DO¶
- Evaluate systematically using documented criteria
- Encapsulate library usage behind abstractions
- Pin versions in production environments
- Monitor for security vulnerabilities
- Document dependency decisions and rationale
- Test dependency updates before production deployment
- Review dependencies regularly (at least quarterly)
- Clean up unused dependencies promptly
- Use proven patterns for common scenarios
- Maintain a central dependency registry
- Automate security scanning and updates
- Plan migrations carefully with proper testing
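As an illustration of the "pin versions" item, the sketch below flags dependency specifiers that are not exact versions. It assumes an npm-style `dependencies` object of package name to version specifier. `findUnpinned` is a hypothetical helper, and a lockfile remains the usual way to pin transitive dependencies.

```javascript
// Sketch: flag dependency specifiers that are not pinned to one exact version.
// Accepts MAJOR.MINOR.PATCH with optional pre-release and build metadata.
const EXACT_VERSION = /^\d+\.\d+\.\d+(-[0-9A-Za-z.-]+)?(\+[0-9A-Za-z.-]+)?$/;

function findUnpinned(dependencies) {
  return Object.entries(dependencies)
    .filter(([, spec]) => !EXACT_VERSION.test(spec))
    .map(([name, spec]) => ({ name, spec }));
}
```

Ranges such as `^4.17.21`, `~2.0.0`, `*`, and tags such as `latest` are all reported, so the check could run in CI against the parsed manifest.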
DON'T¶
- Add dependencies without evaluation
- Call libraries directly from business logic
- Ignore security vulnerability warnings
- Use unpinned versions in production
- Skip changelog review before updates
- Forget to document customizations
- Accumulate unused dependencies
- Monkey patch libraries without documentation
- Rush major updates without planning
- Neglect lifecycle management
- Skip abstraction layers
- Forget rollback strategies
Comprehensive Summary¶
Dependency Selection¶
Key Takeaways:
- Use the evaluation scorecard to systematically assess dependencies
- Consider maintenance, documentation, security, and community health
- Document all decisions with clear rationale
- Follow the "anti-dependency principle" - implement simple functionality internally
- Establish clear approval workflows based on dependency criticality
Critical Metrics:
- Evaluation score ≥ 7.5 for approval
- No critical/high security vulnerabilities
- Active maintenance (updates within 6 months)
- Compatible licensing
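The critical metrics above can be expressed as an automated gate. The sketch below is one way to encode them. The shape of the `evaluation` object (`score`, `vulnerabilities`, `lastReleaseDate`, `licenseCompatible`) is hypothetical and should be adapted to your scorecard, and six months is approximated as 183 days.

```javascript
// Sketch: apply the critical metrics as an approval gate.
// The `evaluation` field names are assumptions made for illustration.
const SIX_MONTHS_MS = 183 * 24 * 60 * 60 * 1000; // approximation of six months

function checkApproval(evaluation, now = new Date()) {
  const failures = [];
  if (evaluation.score < 7.5) {
    failures.push('score below 7.5');
  }
  if (evaluation.vulnerabilities.critical > 0 || evaluation.vulnerabilities.high > 0) {
    failures.push('critical/high vulnerabilities present');
  }
  if (now - new Date(evaluation.lastReleaseDate) > SIX_MONTHS_MS) {
    failures.push('no release within 6 months');
  }
  if (!evaluation.licenseCompatible) {
    failures.push('incompatible license');
  }
  // Approved only when every metric passes; failures explain a rejection
  return { approved: failures.length === 0, failures };
}
```

Returning the list of failures, not just a boolean, gives reviewers the rationale to record alongside the decision.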
Integration Patterns¶
Key Takeaways:
- Always abstract external libraries behind domain interfaces
- Use adapter, facade, or repository patterns appropriately
- Maintain upgrade flexibility through proper encapsulation
- Test integration points at multiple levels
- Handle version updates systematically by type (patch/minor/major)
Core Principles:
- Single point of change for library upgrades
- Business logic independent of implementation details
- Comprehensive test coverage for integrations
- Compatibility layers for breaking changes
Lifecycle Management¶
Key Takeaways:
- Maintain a comprehensive dependency registry
- Automate security vulnerability scanning
- Schedule regular dependency reviews
- Plan migrations with detailed checklists
- Track health metrics continuously
Management Cadence:
- Daily: Security patches (automated)
- Weekly: Patch updates (reviewed)
- Monthly: Minor updates (tested)
- Quarterly: Major updates (planned)
- Quarterly: Full dependency audit
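To route an update to the right cadence, tooling first needs to know whether it is a patch, minor, or major change. The sketch below classifies an update by comparing semantic versions and maps it to the cadence listed above. `classifyUpdate` and `reviewCadence` are hypothetical helpers, pre-release tags are ignored, and security patches (handled daily) are assumed to be flagged separately by a scanner.

```javascript
// Sketch: classify an update by its semantic-version change and map it to
// the review cadence listed above.
const CADENCE = { patch: 'weekly', minor: 'monthly', major: 'quarterly' };

function parseVersion(version) {
  const match = /^(\d+)\.(\d+)\.(\d+)/.exec(version);
  if (!match) throw new Error(`Not a semantic version: ${version}`);
  return match.slice(1).map(Number);
}

function classifyUpdate(current, candidate) {
  const [curMajor, curMinor, curPatch] = parseVersion(current);
  const [newMajor, newMinor, newPatch] = parseVersion(candidate);
  // The most significant component that differs decides the update type
  if (newMajor !== curMajor) return newMajor > curMajor ? 'major' : 'downgrade';
  if (newMinor !== curMinor) return newMinor > curMinor ? 'minor' : 'downgrade';
  if (newPatch !== curPatch) return newPatch > curPatch ? 'patch' : 'downgrade';
  return 'none';
}

function reviewCadence(current, candidate) {
  // Returns null when there is nothing to schedule (no change or a downgrade)
  return CADENCE[classifyUpdate(current, candidate)] ?? null;
}
```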
Usage Patterns¶
Key Takeaways:
- Use async/await for asynchronous operations
- Implement proper cancellation for long-running tasks
- Follow unidirectional data flow for state management
- Abstract data access with repository pattern
- Use caching strategically for performance
- Implement structured logging for observability
Pattern Principles:
- Consistency across the codebase
- Readability and maintainability
- Testability through clear boundaries
- Performance through intelligent patterns
Quick Reference¶
Evaluation Checklist¶
- Complete evaluation scorecard
- Check security vulnerabilities
- Review license compatibility
- Assess maintenance activity
- Document decision rationale
- Get appropriate approval
- Add to dependency registry
Integration Checklist¶
- Create abstraction layer
- Implement adapter/facade
- Add comprehensive tests
- Document usage patterns
- Pin version appropriately
- Set up monitoring
Maintenance Checklist¶
- Assign dependency owner
- Schedule regular reviews
- Enable automated scanning
- Configure update automation
- Plan for deprecation
- Monitor health metrics
Migration Checklist¶
- Document rationale
- Assess impact
- Create migration plan
- Implement with feature flags
- Test thoroughly
- Deploy gradually
- Monitor post-deployment
- Clean up old dependency
Final Recommendations¶
For New Projects¶
- Start with a registry - Document dependencies from day one
- Automate from the start - Set up security scanning in CI/CD
- Define standards early - Establish evaluation and approval processes
- Use abstractions - Don't tightly couple to libraries
- Plan for change - Assume dependencies will need replacement
For Existing Projects¶
- Audit current state - Inventory all dependencies
- Address critical issues - Fix security vulnerabilities immediately
- Establish processes - Implement evaluation and review workflows
- Refactor gradually - Add abstraction layers over time
- Monitor continuously - Track health metrics and act on alerts
For Team Success¶
- Share knowledge - Document decisions and patterns
- Assign ownership - Make dependency management someone's job
- Review regularly - Make it part of team rituals
- Improve continuously - Learn from migrations and incidents
- Stay pragmatic - Not every problem needs a library
Additional Resources¶
Tools and Platforms¶
Security Scanning:
- Snyk - Vulnerability scanning and remediation
- Dependabot - Automated dependency updates
- OWASP Dependency-Check - Software composition analysis
Dependency Management:
- Renovate - Automated dependency updates
- Libraries.io - Dependency monitoring
- Deps.dev - Open source insights
Documentation:
- Architecture Decision Records (ADR) - Document decisions
- Keep a Changelog - Changelog best practices
Further Reading¶
- OWASP Dependency Management Cheat Sheet
- Semantic Versioning 2.0.0
- The Twelve-Factor App - Dependencies
- npm Best Practices Guide
Getting Support¶
Internal Escalation¶
| Issue Type | Contact | SLA |
|---|---|---|
| Security vulnerability | Security Team (led by Caleb) | 24 hours |
| License concern | Legal Unit (Martin) | 1 week |
| Architecture decision | Project Team | 2 days |
| Technical blocker | CTO | 1 day |
Community Resources¶
- Stack Overflow - Technical questions and solutions
- GitHub Discussions - Library-specific questions
- Reddit r/programming - General discussion and advice
- Dev.to - Tutorials and best practices
Last updated: November 2025