Explore advanced features, async patterns, and production-ready best practices for professional PM2 process management.
Leverage sophisticated async patterns for high-performance process management.
import asyncio
from pm2 import PM2Manager
from typing import List, Dict, Any

class ConcurrentPM2Manager:
    def __init__(self, max_concurrent: int = 10):
        self.manager = PM2Manager()
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.session_stats = {
            'operations': 0,
            'successes': 0,
            'failures': 0
        }

    async def batch_start_processes(self, configs: List[Dict[str, Any]]) -> List[Dict]:
        """Start multiple processes concurrently with rate limiting"""
        async def start_single(config):
            async with self.semaphore:
                try:
                    process = await self.manager.start_async(config)
                    self.session_stats['successes'] += 1
                    return {
                        'success': True,
                        'process': process,
                        'config': config,
                        'error': None
                    }
                except Exception as e:
                    self.session_stats['failures'] += 1
                    return {
                        'success': False,
                        'process': None,
                        'config': config,
                        'error': str(e)
                    }
                finally:
                    self.session_stats['operations'] += 1

        # Execute all starts concurrently
        tasks = [start_single(config) for config in configs]
        results = await asyncio.gather(*tasks, return_exceptions=False)
        return results
    async def rolling_restart(self, process_names: List[str],
                              delay_between: float = 2.0) -> Dict[str, bool]:
        """Restart processes one at a time to minimize downtime"""
        results = {}
        for name in process_names:
            try:
                # Get current process info
                process = await self.manager.get_process_async(name)
                old_pid = process.pid

                # Restart process
                await self.manager.restart_async(name)

                # Wait for stabilization
                await asyncio.sleep(delay_between)

                # Verify restart
                new_process = await self.manager.get_process_async(name)
                results[name] = new_process.pid != old_pid and new_process.status == 'online'
            except Exception as e:
                results[name] = False
                print(f"Rolling restart failed for {name}: {e}")
        return results
# Usage example
async def main():
    manager = ConcurrentPM2Manager(max_concurrent=5)

    # Batch start configuration
    apps = [
        {"name": f"worker-{i}", "script": "worker.js", "instances": 1}
        for i in range(10)
    ]
    results = await manager.batch_start_processes(apps)

    # Analyze results
    successful = [r for r in results if r['success']]
    failed = [r for r in results if not r['success']]
    print(f"Started {len(successful)} processes successfully")
    print(f"Failed to start {len(failed)} processes")

    # Rolling restart after 30 seconds
    await asyncio.sleep(30)
    process_names = [r['process'].name for r in successful]
    restart_results = await manager.rolling_restart(process_names)
    print(f"Rolling restart results: {restart_results}")

asyncio.run(main())
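The same semaphore pattern works for read-only checks as well. The helper below is a minimal sketch (the function name is ours, not part of the wrapper) that reuses the `get_process_async` call from the rolling-restart example to poll many processes concurrently and report which ones are online:

import asyncio
from typing import Dict, List

from pm2 import PM2Manager

async def check_processes_online(manager: PM2Manager, names: List[str],
                                 max_concurrent: int = 10) -> Dict[str, bool]:
    """Concurrently check whether each named process reports 'online'."""
    semaphore = asyncio.Semaphore(max_concurrent)

    async def check(name: str) -> bool:
        async with semaphore:
            try:
                process = await manager.get_process_async(name)
                return process.status == 'online'
            except Exception:
                # Treat lookup failures as "not online"
                return False

    statuses = await asyncio.gather(*(check(name) for name in names))
    return dict(zip(names, statuses))

Returning a dictionary makes it easy to feed the offline names straight into a rolling restart.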
import asyncio
from asyncio import Queue
from dataclasses import dataclass
from typing import AsyncGenerator, Callable, Any
from pm2 import PM2Manager, ProcessMetrics

@dataclass
class ProcessTask:
    action: str
    target: str
    params: dict
    priority: int = 0
    retry_count: int = 0
    max_retries: int = 3

class AsyncPM2Pipeline:
    def __init__(self, workers: int = 3):
        self.manager = PM2Manager()
        self.task_queue = Queue()
        self.result_queue = Queue()
        self.workers = workers
        self.running = False

    async def add_task(self, task: ProcessTask):
        """Add task to processing queue"""
        await self.task_queue.put(task)
    async def worker(self, worker_id: int):
        """Process tasks from queue"""
        while self.running:
            try:
                # Get task with timeout so the loop can notice shutdown
                task = await asyncio.wait_for(
                    self.task_queue.get(), timeout=1.0
                )
            except asyncio.TimeoutError:
                # No task available, continue
                continue

            print(f"Worker {worker_id} processing: {task.action} on {task.target}")
            try:
                # Execute task
                result = await self._execute_task(task)

                # Put result in result queue
                await self.result_queue.put({
                    'task': task,
                    'result': result,
                    'worker_id': worker_id
                })
            except Exception as e:
                # Re-queue the task until its retry budget is exhausted
                if task.retry_count < task.max_retries:
                    task.retry_count += 1
                    await self.task_queue.put(task)
                    print(f"Retrying task {task.action} (attempt {task.retry_count})")
                else:
                    print(f"Task failed permanently: {task.action} - {e}")
            finally:
                # Always mark the dequeued task as done so join() can complete
                self.task_queue.task_done()
    async def _execute_task(self, task: ProcessTask) -> Any:
        """Execute individual task based on action type"""
        action_map = {
            'start': self.manager.start_async,
            'stop': self.manager.stop_async,
            'restart': self.manager.restart_async,
            'delete': self.manager.delete_async,
            'reload': self.manager.reload_async
        }

        action_func = action_map.get(task.action)
        if not action_func:
            raise ValueError(f"Unknown action: {task.action}")

        # Execute with parameters
        if task.params:
            return await action_func(task.target, **task.params)
        else:
            return await action_func(task.target)

    async def start_pipeline(self):
        """Start the processing pipeline"""
        self.running = True
        workers = [
            asyncio.create_task(self.worker(i))
            for i in range(self.workers)
        ]

        # Monitor results
        result_monitor = asyncio.create_task(self._monitor_results())
        return workers + [result_monitor]
    async def _monitor_results(self):
        """Monitor and log results"""
        while self.running:
            try:
                result = await asyncio.wait_for(
                    self.result_queue.get(), timeout=1.0
                )
                task = result['task']
                success = result['result'] is not None
                print(f"✅ Task completed: {task.action} on {task.target} "
                      f"by worker {result['worker_id']} - Success: {success}")
            except asyncio.TimeoutError:
                continue
    async def stop_pipeline(self):
        """Stop the pipeline gracefully"""
        # Wait for queued work to drain before telling workers to exit
        await self.task_queue.join()
        self.running = False
# Usage example
async def pipeline_example():
    pipeline = AsyncPM2Pipeline(workers=3)

    # Start pipeline
    tasks = await pipeline.start_pipeline()

    # Add various tasks
    await pipeline.add_task(ProcessTask('start', 'app1.js', {'instances': 2}))
    await pipeline.add_task(ProcessTask('start', 'app2.js', {'instances': 1}))
    await pipeline.add_task(ProcessTask('restart', 'app1', {}))
    await pipeline.add_task(ProcessTask('stop', 'app2', {}))

    # Let pipeline process for 10 seconds
    await asyncio.sleep(10)

    # Stop pipeline
    await pipeline.stop_pipeline()

    # Cancel all tasks
    for task in tasks:
        task.cancel()

asyncio.run(pipeline_example())
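Note that `ProcessTask.priority` is never consulted above: `asyncio.Queue` is strictly FIFO. If ordering matters, one option is to back the pipeline with `asyncio.PriorityQueue`. The wrapper below is a sketch (the class name is ours) in which lower priority numbers are dequeued first and a counter breaks ties so tasks never need to be comparable:

import asyncio
import itertools

class PriorityTaskQueue:
    """Sketch of a priority-aware replacement for the pipeline's task_queue."""

    def __init__(self):
        self._queue = asyncio.PriorityQueue()
        self._counter = itertools.count()

    async def put(self, task):
        # Lower numbers come out first; the counter preserves FIFO order on ties
        await self._queue.put((task.priority, next(self._counter), task))

    async def get(self):
        _, _, task = await self._queue.get()
        return task

    def task_done(self):
        self._queue.task_done()

    async def join(self):
        await self._queue.join()

Since it exposes the same `put`, `get`, `task_done`, and `join` calls the workers already use, assigning it in `__init__` is the only change the pipeline needs.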
Use context managers for automatic resource management and cleanup.
import asyncio
from contextlib import asynccontextmanager, contextmanager
from typing import List, Dict, Optional, AsyncGenerator

from pm2 import PM2Manager, PM2Process
class ManagedPM2Process:
    """Context manager for automatic process lifecycle management"""

    def __init__(self, manager: PM2Manager, config: Dict,
                 auto_delete: bool = True, auto_stop: bool = True):
        self.manager = manager
        self.config = config
        self.auto_delete = auto_delete
        self.auto_stop = auto_stop
        self.process: Optional[PM2Process] = None
        self.created = False

    def __enter__(self):
        """Start process on context entry"""
        try:
            self.process = self.manager.start(self.config)
            self.created = True
            return self.process
        except Exception as e:
            print(f"Failed to start process: {e}")
            raise

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Clean up process on context exit"""
        if self.process and self.created:
            try:
                if self.auto_stop and self.process.status == 'online':
                    self.manager.stop(self.process.name)
                    print(f"Stopped process: {self.process.name}")
                if self.auto_delete:
                    self.manager.delete(self.process.name)
                    print(f"Deleted process: {self.process.name}")
            except Exception as e:
                print(f"Cleanup error for {self.process.name}: {e}")
        # Don't suppress exceptions
        return False
@asynccontextmanager
async def managed_pm2_process_async(manager: PM2Manager, config: Dict,
                                    auto_delete: bool = True) -> AsyncGenerator[PM2Process, None]:
    """Async context manager for process lifecycle"""
    process = None
    try:
        # Start process
        process = await manager.start_async(config)
        print(f"Started managed process: {process.name}")
        yield process
    except Exception as e:
        print(f"Error in managed process: {e}")
        raise
    finally:
        # Cleanup
        if process:
            try:
                if process.status == 'online':
                    await manager.stop_async(process.name)
                    print(f"Stopped managed process: {process.name}")
                if auto_delete:
                    await manager.delete_async(process.name)
                    print(f"Deleted managed process: {process.name}")
            except Exception as e:
                print(f"Cleanup error: {e}")
@contextmanager
def pm2_cluster_context(manager: PM2Manager, base_config: Dict,
                        instances: int = 2):
    """Context manager for managing a cluster of processes"""
    processes = []
    try:
        # Start cluster
        for i in range(instances):
            config = base_config.copy()
            config['name'] = f"{base_config['name']}-{i}"
            config['env'] = config.get('env', {}).copy()
            config['env']['INSTANCE_ID'] = str(i)
            config['env']['PORT'] = str(int(config['env'].get('PORT', 3000)) + i)

            process = manager.start(config)
            processes.append(process)

        print(f"Started cluster with {len(processes)} instances")
        yield processes
    finally:
        # Cleanup all processes
        for process in processes:
            try:
                if process.status == 'online':
                    manager.stop(process.name)
                manager.delete(process.name)
                print(f"Cleaned up: {process.name}")
            except Exception as e:
                print(f"Cleanup error for {process.name}: {e}")
# Usage examples
def context_manager_examples():
    manager = PM2Manager()

    # Example 1: Single managed process
    config = {
        "name": "temp-worker",
        "script": "worker.js",
        "instances": 1
    }

    with ManagedPM2Process(manager, config) as process:
        print(f"Working with process: {process.name} (PID: {process.pid})")

        # Do work with process
        metrics = process.get_metrics()
        print(f"CPU: {metrics.cpu_percent}%, Memory: {metrics.memory_usage}MB")
    # Process will be automatically cleaned up

    # Example 2: Cluster management
    cluster_config = {
        "name": "api-cluster",
        "script": "api.js",
        "env": {"PORT": "3000", "NODE_ENV": "production"}
    }

    with pm2_cluster_context(manager, cluster_config, instances=3) as cluster:
        print(f"Cluster running with {len(cluster)} processes")

        # Monitor cluster
        for process in cluster:
            print(f"{process.name}: {process.status}")
    # All processes will be cleaned up automatically
# Async example
async def async_context_example():
    manager = PM2Manager()

    config = {
        "name": "async-temp",
        "script": "app.js"
    }

    async with managed_pm2_process_async(manager, config) as process:
        print(f"Async managed process: {process.name}")

        # Async work
        metrics = await process.get_metrics_async()
        print(f"Async metrics: CPU {metrics.cpu_percent}%")
    # Automatic cleanup on exit

# Run examples
context_manager_examples()
asyncio.run(async_context_example())
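When several independent processes need to live and die together, `contextlib.ExitStack` can stack multiple `ManagedPM2Process` contexts so that anything already started is unwound if a later start fails. A sketch (the helper name is ours), reusing the class defined above:

from contextlib import ExitStack
from typing import Dict, List

def run_process_group(manager: PM2Manager, configs: List[Dict]):
    """Start a group of managed processes and clean them all up on exit."""
    with ExitStack() as stack:
        # Each enter_context registers the process for cleanup; if one start
        # raises, the stack stops and deletes everything started so far.
        processes = [
            stack.enter_context(ManagedPM2Process(manager, config))
            for config in configs
        ]
        print(f"Group of {len(processes)} processes is up")
        # ... work with the group here ...
    # Leaving the with-block stops and deletes every process in the group

Because cleanup runs in each context's __exit__, this holds even when the failure happens halfway through the loop.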
Efficiently manage multiple processes with batch operations and bulk actions.
from typing import List, Dict, Union, Callable, Any
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass
import time
import logging

from pm2 import PM2Manager
@dataclass
class BatchResult:
    success_count: int
    failure_count: int
    total_count: int
    results: List[Dict[str, Any]]
    execution_time: float

    @property
    def success_rate(self) -> float:
        return (self.success_count / self.total_count) * 100 if self.total_count > 0 else 0
class PM2BatchOperator:
    """High-performance batch operations for PM2 processes"""

    def __init__(self, manager: PM2Manager, max_workers: int = 5,
                 timeout_per_operation: float = 30.0):
        self.manager = manager
        self.max_workers = max_workers
        self.timeout = timeout_per_operation
        self.logger = logging.getLogger(__name__)

    def batch_start(self, configs: List[Dict[str, Any]],
                    parallel: bool = True) -> BatchResult:
        """Start multiple processes in batch"""
        start_time = time.time()

        if parallel:
            return self._parallel_operation(configs, self._start_single, start_time)
        else:
            return self._sequential_operation(configs, self._start_single, start_time)

    def batch_stop(self, identifiers: List[Union[str, int]],
                   parallel: bool = True) -> BatchResult:
        """Stop multiple processes in batch"""
        start_time = time.time()
        configs = [{'identifier': ident} for ident in identifiers]

        if parallel:
            return self._parallel_operation(configs, self._stop_single, start_time)
        else:
            return self._sequential_operation(configs, self._stop_single, start_time)

    def batch_restart(self, identifiers: List[Union[str, int]],
                      stagger_delay: float = 0.5) -> BatchResult:
        """Restart multiple processes with optional staggering"""
        start_time = time.time()
        results = []
        success_count = 0

        for i, identifier in enumerate(identifiers):
            try:
                # Add stagger delay between restarts
                if i > 0 and stagger_delay > 0:
                    time.sleep(stagger_delay)

                result = self.manager.restart(identifier)
                results.append({
                    'identifier': identifier,
                    'success': True,
                    'result': result,
                    'error': None
                })
                success_count += 1
            except Exception as e:
                results.append({
                    'identifier': identifier,
                    'success': False,
                    'result': None,
                    'error': str(e)
                })

        execution_time = time.time() - start_time
        return BatchResult(
            success_count=success_count,
            failure_count=len(identifiers) - success_count,
            total_count=len(identifiers),
            results=results,
            execution_time=execution_time
        )
    def _parallel_operation(self, configs: List[Dict], operation_func: Callable,
                            start_time: float) -> BatchResult:
        """Execute operations in parallel using ThreadPoolExecutor"""
        results = []
        success_count = 0

        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            # Submit all tasks
            future_to_config = {
                executor.submit(operation_func, config): config
                for config in configs
            }

            # Collect results as they complete
            for future in as_completed(future_to_config, timeout=self.timeout * len(configs)):
                config = future_to_config[future]
                try:
                    result = future.result(timeout=self.timeout)
                    results.append({
                        'config': config,
                        'success': True,
                        'result': result,
                        'error': None
                    })
                    success_count += 1
                except Exception as e:
                    results.append({
                        'config': config,
                        'success': False,
                        'result': None,
                        'error': str(e)
                    })

        execution_time = time.time() - start_time
        return BatchResult(
            success_count=success_count,
            failure_count=len(configs) - success_count,
            total_count=len(configs),
            results=results,
            execution_time=execution_time
        )
    def _sequential_operation(self, configs: List[Dict], operation_func: Callable,
                              start_time: float) -> BatchResult:
        """Execute operations sequentially"""
        results = []
        success_count = 0

        for config in configs:
            try:
                result = operation_func(config)
                results.append({
                    'config': config,
                    'success': True,
                    'result': result,
                    'error': None
                })
                success_count += 1
            except Exception as e:
                results.append({
                    'config': config,
                    'success': False,
                    'result': None,
                    'error': str(e)
                })

        execution_time = time.time() - start_time
        return BatchResult(
            success_count=success_count,
            failure_count=len(configs) - success_count,
            total_count=len(configs),
            results=results,
            execution_time=execution_time
        )

    def _start_single(self, config: Dict) -> Any:
        """Start a single process"""
        return self.manager.start(config)

    def _stop_single(self, config: Dict) -> Any:
        """Stop a single process"""
        return self.manager.stop(config['identifier'])
# Usage example
def batch_operations_example():
    manager = PM2Manager()
    batch_operator = PM2BatchOperator(manager, max_workers=3)

    # Batch start multiple applications
    app_configs = [
        {
            "name": f"worker-{i}",
            "script": "worker.js",
            "instances": 1,
            "env": {"WORKER_ID": str(i), "PORT": str(3000 + i)}
        }
        for i in range(10)
    ]

    print("Starting batch operations...")

    # Start all processes in parallel
    start_result = batch_operator.batch_start(app_configs, parallel=True)
    print("Batch Start Results:")
    print(f"  Success: {start_result.success_count}/{start_result.total_count}")
    print(f"  Success Rate: {start_result.success_rate:.1f}%")
    print(f"  Execution Time: {start_result.execution_time:.2f}s")

    # Wait a bit
    time.sleep(5)

    # Get names of successfully started processes
    successful_names = [
        r['result'].name for r in start_result.results
        if r['success'] and r['result']
    ]

    # Batch restart with staggering
    restart_result = batch_operator.batch_restart(successful_names, stagger_delay=1.0)
    print("\nBatch Restart Results:")
    print(f"  Success: {restart_result.success_count}/{restart_result.total_count}")
    print(f"  Execution Time: {restart_result.execution_time:.2f}s")

    # Stop all processes
    stop_result = batch_operator.batch_stop(successful_names, parallel=True)
    print("\nBatch Stop Results:")
    print(f"  Success: {stop_result.success_count}/{stop_result.total_count}")
    print(f"  Execution Time: {stop_result.execution_time:.2f}s")

batch_operations_example()
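`BatchResult.results` keeps the original config for every failed start, so a retry pass is easy to bolt on. A minimal sketch (the helper name and retry policy are ours, not library defaults), reusing the classes defined above:

def retry_failed_starts(operator: PM2BatchOperator, result: BatchResult) -> BatchResult:
    """Re-run batch_start for every config that failed in a previous run."""
    failed_configs = [r['config'] for r in result.results if not r['success']]
    if not failed_configs:
        return result
    return operator.batch_start(failed_configs, parallel=True)

# Example: one retry pass after the initial batch start
# start_result = batch_operator.batch_start(app_configs, parallel=True)
# if start_result.failure_count:
#     retry_result = retry_failed_starts(batch_operator, start_result)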
This library is an independent Python wrapper for the PM2 Process Manager. PM2 is a separate open-source project developed by Keymetrics/Unitech. This Python library is not affiliated with, endorsed by, or officially supported by the PM2 team.
This wrapper library communicates with PM2 through its command-line interface and does not modify or redistribute any PM2 code. Users must install PM2 separately. All PM2 trademarks and copyrights belong to their respective owners.