Advanced Usage

Explore advanced features, async patterns, and production-ready practices for PM2 process management.

Advanced Async Patterns

Leverage sophisticated async patterns for high-performance process management.

Concurrent Process Operations

import asyncio
from pm2 import PM2Manager
from typing import List, Dict, Any

class ConcurrentPM2Manager:
    def __init__(self, max_concurrent: int = 10):
        self.manager = PM2Manager()
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.session_stats = {
            'operations': 0,
            'successes': 0,
            'failures': 0
        }

    async def batch_start_processes(self, configs: List[Dict[str, Any]]) -> List[Dict]:
        """Start multiple processes concurrently with rate limiting"""
        async def start_single(config):
            async with self.semaphore:
                try:
                    process = await self.manager.start_async(config)
                    self.session_stats['successes'] += 1
                    return {
                        'success': True,
                        'process': process,
                        'config': config,
                        'error': None
                    }
                except Exception as e:
                    self.session_stats['failures'] += 1
                    return {
                        'success': False,
                        'process': None,
                        'config': config,
                        'error': str(e)
                    }
                finally:
                    self.session_stats['operations'] += 1

        # Execute all starts concurrently; start_single handles its own
        # exceptions, so gather will not raise here
        tasks = [start_single(config) for config in configs]
        results = await asyncio.gather(*tasks)
        
        return results

    async def rolling_restart(self, process_names: List[str],
                              delay_between: float = 2.0) -> Dict[str, bool]:
        """Restart processes one at a time, verifying each before moving on"""
        results = {}
        
        for name in process_names:
            try:
                # Get current process info
                process = await self.manager.get_process_async(name)
                old_pid = process.pid
                
                # Restart process
                await self.manager.restart_async(name)
                
                # Wait for stabilization
                await asyncio.sleep(delay_between)
                
                # Verify restart
                new_process = await self.manager.get_process_async(name)
                results[name] = new_process.pid != old_pid and new_process.status == 'online'
                
            except Exception as e:
                results[name] = False
                print(f"Rolling restart failed for {name}: {e}")
        
        return results

# Usage example
async def main():
    manager = ConcurrentPM2Manager(max_concurrent=5)
    
    # Batch start configuration
    apps = [
        {"name": f"worker-{i}", "script": "worker.js", "instances": 1}
        for i in range(10)
    ]
    
    results = await manager.batch_start_processes(apps)
    
    # Analyze results
    successful = [r for r in results if r['success']]
    failed = [r for r in results if not r['success']]
    
    print(f"Started {len(successful)} processes successfully")
    print(f"Failed to start {len(failed)} processes")
    
    # Rolling restart after 30 seconds
    await asyncio.sleep(30)
    process_names = [r['process'].name for r in successful]
    restart_results = await manager.rolling_restart(process_names)
    
    print(f"Rolling restart results: {restart_results}")

asyncio.run(main())
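
One caveat in the pattern above: asyncio.gather waits for every start to finish, so a single hung start (a script that never reports back) stalls the whole batch. Below is a minimal sketch of bounding each operation with asyncio.wait_for, assuming the same start_async coroutine as above; the 30-second default is illustrative, not a library value.

import asyncio

async def start_with_timeout(manager, config, timeout: float = 30.0):
    """Bound a single start so one stuck process cannot stall the batch."""
    try:
        # wait_for cancels the underlying start_async call on timeout
        return await asyncio.wait_for(manager.start_async(config), timeout=timeout)
    except asyncio.TimeoutError:
        raise TimeoutError(f"start of {config.get('name')} timed out after {timeout}s")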

Async Processing Pipeline

import asyncio
from asyncio import Queue
from dataclasses import dataclass
from typing import Any
from pm2 import PM2Manager

@dataclass
class ProcessTask:
    action: str
    target: str
    params: dict
    priority: int = 0
    retry_count: int = 0
    max_retries: int = 3

class AsyncPM2Pipeline:
    def __init__(self, workers: int = 3):
        self.manager = PM2Manager()
        self.task_queue = Queue()
        self.result_queue = Queue()
        self.workers = workers
        self.running = False
        
    async def add_task(self, task: ProcessTask):
        """Add task to processing queue"""
        await self.task_queue.put(task)
    
    async def worker(self, worker_id: int):
        """Process tasks from the queue until the pipeline stops"""
        while self.running:
            try:
                # Get a task, waking periodically to re-check self.running
                task = await asyncio.wait_for(
                    self.task_queue.get(), timeout=1.0
                )
            except asyncio.TimeoutError:
                # No task available, loop and check the running flag again
                continue
            
            print(f"Worker {worker_id} processing: {task.action} on {task.target}")
            
            try:
                # Execute task
                result = await self._execute_task(task)
                
                # Put result in result queue
                await self.result_queue.put({
                    'task': task,
                    'result': result,
                    'worker_id': worker_id
                })
            except Exception as e:
                # Re-queue the task until its retry budget is exhausted
                if task.retry_count < task.max_retries:
                    task.retry_count += 1
                    await self.task_queue.put(task)
                    print(f"Retrying task {task.action} (attempt {task.retry_count})")
                else:
                    print(f"Task failed permanently: {task.action} - {e}")
            finally:
                # Always mark the fetched task done so task_queue.join() can return
                self.task_queue.task_done()
    
    async def _execute_task(self, task: ProcessTask) -> Any:
        """Execute individual task based on action type"""
        action_map = {
            'start': self.manager.start_async,
            'stop': self.manager.stop_async,
            'restart': self.manager.restart_async,
            'delete': self.manager.delete_async,
            'reload': self.manager.reload_async
        }
        
        action_func = action_map.get(task.action)
        if not action_func:
            raise ValueError(f"Unknown action: {task.action}")
        
        # Execute with parameters
        if task.params:
            return await action_func(task.target, **task.params)
        else:
            return await action_func(task.target)
    
    async def start_pipeline(self):
        """Start the processing pipeline"""
        self.running = True
        workers = [
            asyncio.create_task(self.worker(i))
            for i in range(self.workers)
        ]
        
        # Monitor results
        result_monitor = asyncio.create_task(self._monitor_results())
        
        return workers + [result_monitor]
    
    async def _monitor_results(self):
        """Monitor and log results"""
        while self.running:
            try:
                result = await asyncio.wait_for(
                    self.result_queue.get(), timeout=1.0
                )
                
                task = result['task']
                success = result['result'] is not None
                
                print(f"✅ Task completed: {task.action} on {task.target} "
                      f"by worker {result['worker_id']} - Success: {success}")
                
            except asyncio.TimeoutError:
                continue
    
    async def stop_pipeline(self):
        """Stop the pipeline gracefully"""
        # Drain the queue first; workers must still be running to empty it
        await self.task_queue.join()
        self.running = False

# Usage example
async def pipeline_example():
    pipeline = AsyncPM2Pipeline(workers=3)
    
    # Start pipeline
    tasks = await pipeline.start_pipeline()
    
    # Add various tasks
    await pipeline.add_task(ProcessTask('start', 'app1.js', {'instances': 2}))
    await pipeline.add_task(ProcessTask('start', 'app2.js', {'instances': 1}))
    await pipeline.add_task(ProcessTask('restart', 'app1', {}))
    await pipeline.add_task(ProcessTask('stop', 'app2', {}))
    
    # Let pipeline process for 10 seconds
    await asyncio.sleep(10)
    
    # Stop pipeline
    await pipeline.stop_pipeline()
    
    # Cancel the worker and monitor tasks, then wait for them to exit
    for task in tasks:
        task.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)

asyncio.run(pipeline_example())
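
ProcessTask defines a priority field that the FIFO Queue above never consults. One way to honor it is to back the pipeline with asyncio.PriorityQueue and have workers unpack the queue entries. Below is a minimal, self-contained sketch of the ordering pattern (not wired into the class above); lower numbers dequeue first.

import asyncio
import itertools

async def priority_demo():
    queue: asyncio.PriorityQueue = asyncio.PriorityQueue()
    seq = itertools.count()  # tie-breaker so equal priorities never compare tasks

    async def add(task: ProcessTask):
        # PriorityQueue pops the smallest tuple first
        await queue.put((task.priority, next(seq), task))

    await add(ProcessTask('restart', 'api', {}, priority=1))
    await add(ProcessTask('stop', 'worker', {}, priority=5))
    await add(ProcessTask('start', 'proxy.js', {}, priority=0))

    while not queue.empty():
        _priority, _, task = await queue.get()  # workers would unpack the same way
        print(f"Would run: {task.action} on {task.target}")

asyncio.run(priority_demo())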

Advanced Context Managers

Use context managers for automatic resource management and cleanup.

Auto-Cleanup Context Manager

import asyncio
from contextlib import asynccontextmanager, contextmanager
from typing import AsyncGenerator, Dict, Optional
from pm2 import PM2Manager, PM2Process

class ManagedPM2Process:
    """Context manager for automatic process lifecycle management"""
    
    def __init__(self, manager: PM2Manager, config: Dict, 
                 auto_delete: bool = True, auto_stop: bool = True):
        self.manager = manager
        self.config = config
        self.auto_delete = auto_delete
        self.auto_stop = auto_stop
        self.process: Optional[PM2Process] = None
        self.created = False
    
    def __enter__(self):
        """Start process on context entry"""
        try:
            self.process = self.manager.start(self.config)
            self.created = True
            return self.process
        except Exception as e:
            print(f"Failed to start process: {e}")
            raise
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        """Clean up process on context exit"""
        if self.process and self.created:
            try:
                if self.auto_stop and self.process.status == 'online':
                    self.manager.stop(self.process.name)
                    print(f"Stopped process: {self.process.name}")
                
                if self.auto_delete:
                    self.manager.delete(self.process.name)
                    print(f"Deleted process: {self.process.name}")
                    
            except Exception as e:
                print(f"Cleanup error for {self.process.name}: {e}")
        
        # Don't suppress exceptions
        return False

@asynccontextmanager
async def managed_pm2_process_async(manager: PM2Manager, config: Dict,
                                    auto_delete: bool = True) -> AsyncGenerator[PM2Process, None]:
    """Async context manager for process lifecycle"""
    process = None
    try:
        # Start process
        process = await manager.start_async(config)
        print(f"Started managed process: {process.name}")
        yield process
        
    except Exception as e:
        print(f"Error in managed process: {e}")
        raise
        
    finally:
        # Cleanup
        if process:
            try:
                if process.status == 'online':
                    await manager.stop_async(process.name)
                    print(f"Stopped managed process: {process.name}")
                
                if auto_delete:
                    await manager.delete_async(process.name)
                    print(f"Deleted managed process: {process.name}")
                    
            except Exception as e:
                print(f"Cleanup error: {e}")

@contextmanager
def pm2_cluster_context(manager: PM2Manager, base_config: Dict, 
                       instances: int = 2):
    """Context manager for managing a cluster of processes"""
    processes = []
    try:
        # Start cluster
        for i in range(instances):
            config = base_config.copy()
            config['name'] = f"{base_config['name']}-{i}"
            config['env'] = config.get('env', {}).copy()
            config['env']['INSTANCE_ID'] = str(i)
            config['env']['PORT'] = str(int(config['env'].get('PORT', 3000)) + i)
            
            process = manager.start(config)
            processes.append(process)
        
        print(f"Started cluster with {len(processes)} instances")
        yield processes
        
    finally:
        # Cleanup all processes
        for process in processes:
            try:
                if process.status == 'online':
                    manager.stop(process.name)
                manager.delete(process.name)
                print(f"Cleaned up: {process.name}")
            except Exception as e:
                print(f"Cleanup error for {process.name}: {e}")

# Usage examples
def context_manager_examples():
    manager = PM2Manager()
    
    # Example 1: Single managed process
    config = {
        "name": "temp-worker",
        "script": "worker.js",
        "instances": 1
    }
    
    with ManagedPM2Process(manager, config) as process:
        print(f"Working with process: {process.name} (PID: {process.pid})")
        
        # Do work with process
        metrics = process.get_metrics()
        print(f"CPU: {metrics.cpu_percent}%, Memory: {metrics.memory_usage}MB")
        
        # Process will be automatically cleaned up
    
    # Example 2: Cluster management
    cluster_config = {
        "name": "api-cluster",
        "script": "api.js",
        "env": {"PORT": "3000", "NODE_ENV": "production"}
    }
    
    with pm2_cluster_context(manager, cluster_config, instances=3) as cluster:
        print(f"Cluster running with {len(cluster)} processes")
        
        # Monitor cluster
        for process in cluster:
            print(f"{process.name}: {process.status}")
        
        # All processes will be cleaned up automatically

# Async example
async def async_context_example():
    manager = PM2Manager()
    
    config = {
        "name": "async-temp",
        "script": "app.js"
    }
    
    async with managed_pm2_process_async(manager, config) as process:
        print(f"Async managed process: {process.name}")
        
        # Async work
        metrics = await process.get_metrics_async()
        print(f"Async metrics: CPU {metrics.cpu_percent}%")
        
        # Automatic cleanup on exit

# Run examples
context_manager_examples()
asyncio.run(async_context_example())
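
Context managers also compose. When the number of managed processes is only known at runtime, contextlib.ExitStack can enter any number of ManagedPM2Process blocks and unwind them all in reverse order, even if a later start fails partway through. A short sketch using the class defined above; the etl-step configs are illustrative.

from contextlib import ExitStack

def exit_stack_example():
    manager = PM2Manager()
    configs = [
        {"name": f"etl-step-{i}", "script": "etl.js", "instances": 1}
        for i in range(3)
    ]

    with ExitStack() as stack:
        # enter_context starts each process and registers it for cleanup;
        # if any start raises, the ones already entered are torn down
        processes = [
            stack.enter_context(ManagedPM2Process(manager, cfg))
            for cfg in configs
        ]
        print(f"Running {len(processes)} managed processes")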

Batch Operations

Efficiently manage multiple processes with batch operations and bulk actions.

Bulk Process Operations

from typing import List, Dict, Union, Callable, Any
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass
import time
import logging

from pm2 import PM2Manager

@dataclass
class BatchResult:
    success_count: int
    failure_count: int
    total_count: int
    results: List[Dict[str, Any]]
    execution_time: float
    
    @property
    def success_rate(self) -> float:
        if self.total_count == 0:
            return 0.0
        return (self.success_count / self.total_count) * 100

class PM2BatchOperator:
    """High-performance batch operations for PM2 processes"""
    
    def __init__(self, manager: PM2Manager, max_workers: int = 5,
                 timeout_per_operation: float = 30.0):
        self.manager = manager
        self.max_workers = max_workers
        self.timeout = timeout_per_operation
        self.logger = logging.getLogger(__name__)
    
    def batch_start(self, configs: List[Dict[str, Any]], 
                   parallel: bool = True) -> BatchResult:
        """Start multiple processes in batch"""
        start_time = time.time()
        
        if parallel:
            return self._parallel_operation(configs, self._start_single, start_time)
        else:
            return self._sequential_operation(configs, self._start_single, start_time)
    
    def batch_stop(self, identifiers: List[Union[str, int]], 
                  parallel: bool = True) -> BatchResult:
        """Stop multiple processes in batch"""
        start_time = time.time()
        configs = [{'identifier': ident} for ident in identifiers]
        
        if parallel:
            return self._parallel_operation(configs, self._stop_single, start_time)
        else:
            return self._sequential_operation(configs, self._stop_single, start_time)
    
    def batch_restart(self, identifiers: List[Union[str, int]], 
                     stagger_delay: float = 0.5) -> BatchResult:
        """Restart multiple processes with optional staggering"""
        start_time = time.time()
        results = []
        success_count = 0
        
        for i, identifier in enumerate(identifiers):
            try:
                # Add stagger delay between restarts
                if i > 0 and stagger_delay > 0:
                    time.sleep(stagger_delay)
                
                result = self.manager.restart(identifier)
                results.append({
                    'identifier': identifier,
                    'success': True,
                    'result': result,
                    'error': None
                })
                success_count += 1
                
            except Exception as e:
                results.append({
                    'identifier': identifier,
                    'success': False,
                    'result': None,
                    'error': str(e)
                })
        
        execution_time = time.time() - start_time
        return BatchResult(
            success_count=success_count,
            failure_count=len(identifiers) - success_count,
            total_count=len(identifiers),
            results=results,
            execution_time=execution_time
        )
    
    def _parallel_operation(self, configs: List[Dict], operation_func: Callable,
                          start_time: float) -> BatchResult:
        """Execute operations in parallel using ThreadPoolExecutor"""
        results = []
        success_count = 0
        
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            # Submit all tasks
            future_to_config = {
                executor.submit(operation_func, config): config 
                for config in configs
            }
            
            # Collect results as they complete
            for future in as_completed(future_to_config, timeout=self.timeout * len(configs)):
                config = future_to_config[future]
                try:
                    result = future.result(timeout=self.timeout)
                    results.append({
                        'config': config,
                        'success': True,
                        'result': result,
                        'error': None
                    })
                    success_count += 1
                    
                except Exception as e:
                    results.append({
                        'config': config,
                        'success': False,
                        'result': None,
                        'error': str(e)
                    })
        
        execution_time = time.time() - start_time
        return BatchResult(
            success_count=success_count,
            failure_count=len(configs) - success_count,
            total_count=len(configs),
            results=results,
            execution_time=execution_time
        )
    
    def _sequential_operation(self, configs: List[Dict], operation_func: Callable,
                            start_time: float) -> BatchResult:
        """Execute operations sequentially"""
        results = []
        success_count = 0
        
        for config in configs:
            try:
                result = operation_func(config)
                results.append({
                    'config': config,
                    'success': True,
                    'result': result,
                    'error': None
                })
                success_count += 1
                
            except Exception as e:
                results.append({
                    'config': config,
                    'success': False,
                    'result': None,
                    'error': str(e)
                })
        
        execution_time = time.time() - start_time
        return BatchResult(
            success_count=success_count,
            failure_count=len(configs) - success_count,
            total_count=len(configs),
            results=results,
            execution_time=execution_time
        )
    
    def _start_single(self, config: Dict) -> Any:
        """Start a single process"""
        return self.manager.start(config)
    
    def _stop_single(self, config: Dict) -> Any:
        """Stop a single process"""
        return self.manager.stop(config['identifier'])

# Usage example
def batch_operations_example():
    manager = PM2Manager()
    batch_operator = PM2BatchOperator(manager, max_workers=3)
    
    # Batch start multiple applications
    app_configs = [
        {
            "name": f"worker-{i}",
            "script": "worker.js",
            "instances": 1,
            "env": {"WORKER_ID": str(i), "PORT": str(3000 + i)}
        }
        for i in range(10)
    ]
    
    print("Starting batch operations...")
    
    # Start all processes in parallel
    start_result = batch_operator.batch_start(app_configs, parallel=True)
    print(f"Batch Start Results:")
    print(f"  Success: {start_result.success_count}/{start_result.total_count}")
    print(f"  Success Rate: {start_result.success_rate:.1f}%")
    print(f"  Execution Time: {start_result.execution_time:.2f}s")
    
    # Wait a bit
    time.sleep(5)
    
    # Get names of successfully started processes
    successful_names = [
        r['result'].name for r in start_result.results 
        if r['success'] and r['result']
    ]
    
    # Batch restart with staggering
    restart_result = batch_operator.batch_restart(successful_names, stagger_delay=1.0)
    print(f"\nBatch Restart Results:")
    print(f"  Success: {restart_result.success_count}/{restart_result.total_count}")
    print(f"  Execution Time: {restart_result.execution_time:.2f}s")
    
    # Stop all processes
    stop_result = batch_operator.batch_stop(successful_names, parallel=True)
    print(f"\nBatch Stop Results:")
    print(f"  Success: {stop_result.success_count}/{stop_result.total_count}")
    print(f"  Execution Time: {stop_result.execution_time:.2f}s")

batch_operations_example()
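
Because BatchResult keeps every per-item record, failures can be reported and retried rather than lost in a summary count. A small follow-up sketch, meant to be called from batch_operations_example() right after batch_start; the sequential second pass is a deliberate choice to go easier on the PM2 daemon.

def retry_failed_starts(batch_operator: PM2BatchOperator,
                        result: BatchResult) -> BatchResult:
    """Report each failed start from a batch, then retry them sequentially."""
    failed_configs = []
    for r in result.results:
        if not r['success']:
            print(f"  FAILED {r['config'].get('name', r['config'])}: {r['error']}")
            failed_configs.append(r['config'])

    # Sequential retry: slower, but avoids hammering the daemon a second time
    return batch_operator.batch_start(failed_configs, parallel=False)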