Start, stop, and manage PM2 processes with simple commands.
from pm2 import PM2Manager
# Create the manager object used for every PM2 call below
manager = PM2Manager()

# Describe the app first, then hand the description to start()
app_options = {"name": "my-app", "script": "app.js", "instances": 1}
process = manager.start(app_options)
print(f"Started process: {process.name} (PID: {process.pid})")

# Stop the app again, addressing it by the name it was started under
manager.stop("my-app")
print("Process stopped")
Use async/await patterns for non-blocking process management.
import asyncio
from pm2 import PM2Manager
async def manage_processes():
    """Start three apps concurrently, print their metrics, then stop them.

    PM2Manager is used as an async context manager. Once the apps have been
    started they are always asked to stop, even if collecting or printing
    metrics fails part-way through.
    """
    async with PM2Manager() as manager:
        # Start multiple processes concurrently
        processes = await asyncio.gather(
            manager.start_async("app1.js"),
            manager.start_async("app2.js"),
            manager.start_async("app3.js"),
        )
        try:
            # Monitor processes
            for process in processes:
                metrics = await process.get_metrics_async()
                # NOTE(review): memory_usage is presumably reported in bytes
                # (confirm against the library); convert it so the value
                # matches the "MB" label.
                memory_mb = metrics.memory_usage / 1024 / 1024
                print(f"{process.name}: CPU {metrics.cpu_percent}%, Memory {memory_mb:.1f}MB")
        finally:
            # Graceful shutdown: runs on success and on failure alike so the
            # started apps are not left running
            await asyncio.gather(
                *(manager.stop_async(p.name) for p in processes)
            )


# Run async function
asyncio.run(manage_processes())
Monitor process metrics and health in real time.
import time
from pm2 import PM2Manager
manager = PM2Manager()

# Launch the process whose metrics will be sampled
process = manager.start("app.js")

# Six samples taken five seconds apart: roughly 30 seconds of monitoring
for _ in range(6):
    # Fetch a fresh metrics snapshot for this sample
    metrics = process.get_metrics()
    report_lines = (
        f"=== Monitoring {process.name} ===",
        f"Status: {process.status}",
        f"CPU: {metrics.cpu_percent:.1f}%",
        f"Memory: {metrics.memory_usage / 1024 / 1024:.1f} MB",
        f"Uptime: {metrics.uptime}s",
        f"Restarts: {metrics.restarts}",
        "=" * 30,
    )
    for line in report_lines:
        print(line)
    time.sleep(5)
Use detailed configuration options for production deployments.
from pm2 import PM2Manager, ProcessConfiguration
# Collect every option in one mapping, then expand it into keyword arguments
settings = {
    "name": "production-app",
    "script": "app.js",
    "instances": 4,  # Cluster mode
    "exec_mode": "cluster",
    "env": {
        "NODE_ENV": "production",
        "PORT": "3000",
        "DATABASE_URL": "postgresql://...",
    },
    "env_production": {
        "NODE_ENV": "production",
        "DEBUG": "false",
    },
    "log_file": "/var/log/app.log",
    "error_file": "/var/log/app-error.log",
    "pid_file": "/var/run/app.pid",
    "max_memory_restart": "500M",
    "node_args": "--max-old-space-size=1024",
    "watch": False,
    "ignore_watch": ["node_modules", ".git"],
}
config = ProcessConfiguration(**settings)

# Hand the finished configuration object to a manager to start the app
manager = PM2Manager()
process = manager.start(config)
print(f"Started {process.name} with {config.instances} instances")
Handle errors gracefully with comprehensive exception handling.
from pm2 import PM2Manager, PM2Exception, ProcessNotFoundError, PM2CommandError
manager = PM2Manager()

# The script named here does not exist, so the start request is meant to fail
missing_script_app = {"name": "test-app", "script": "nonexistent.js"}

try:
    process = manager.start(missing_script_app)
except PM2CommandError as exc:
    # Listed before PM2Exception so the command, exit code and stderr
    # carried by this error type can be reported
    print(f"PM2 command failed: {exc}")
    print(f"Command: {exc.command}")
    print(f"Exit code: {exc.exit_code}")
    print(f"Error output: {exc.stderr}")
except PM2Exception as exc:
    # Fallback for any other PM2Exception
    print(f"PM2 error: {exc}")
# Handle process not found
try:
    process = manager.get_process("nonexistent-app")
except ProcessNotFoundError as e:
    print(f"Process not found: {e}")
    # List available processes so the valid names are visible
    processes = manager.list()
    print("Available processes:")
    for p in processes:
        # The value printed is p.pid, so it is labelled "PID" to match the
        # basic start/stop example
        print(f" - {p.name} (PID: {p.pid})")
Best practices and patterns for production environments.
import logging
from contextlib import asynccontextmanager
from pm2 import PM2Manager, ProcessConfiguration
# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class ProductionPM2Manager:
    """Deploy apps through a PM2Manager and health-check them after start.

    Instance attributes (set in __init__):
        manager: the PM2Manager used for every start/stop/delete call.
        processes: dict mapping an app name to the process object that was
            last deployed successfully under that name.
    """

    def __init__(self):
        self.manager = PM2Manager()
        self.processes = {}

    async def deploy_app(self, app_config):
        """Start an app, health-check it, and record it on success.

        Args:
            app_config: passed unchanged to ``manager.start_async``; it must
                also support ``app_config['name']``.

        Returns:
            True when the new process passed the health check and was
            recorded in ``self.processes``. False when the health check
            failed (the new process is deleted again) or when any exception
            was raised along the way (it is logged, not re-raised).
        """
        try:
            # Start new instance
            new_process = await self.manager.start_async(app_config)

            # Health check; on failure roll back by deleting the new process
            if not await self.health_check(new_process):
                await self.manager.delete_async(new_process.name)
                logger.error(f"Health check failed for {app_config['name']}")
                return False

            # Stop old instance if exists
            # NOTE(review): this class only ever stores processes under
            # app_config['name'], never under the "-old" suffix, so this
            # branch only fires if a caller adds such a key to
            # self.processes itself — confirm the intended naming scheme.
            old_name = f"{app_config['name']}-old"
            if old_name in self.processes:
                await self.manager.stop_async(old_name)

            self.processes[app_config['name']] = new_process
            logger.info(f"Successfully deployed {app_config['name']}")
            return True
        except Exception as e:
            # Deployment boundary: report failure through the return value,
            # but keep the traceback in the log for diagnosis.
            logger.exception(f"Deployment failed: {e}")
            return False

    async def health_check(self, process, timeout=30):
        """Probe a process until it looks healthy or the attempts run out.

        Args:
            process: object exposing ``status`` and an awaitable
                ``get_metrics_async()`` whose result has ``cpu_percent``.
            timeout: maximum number of probes; consecutive probes are
                separated by a one-second pause.

        Returns:
            True as soon as a probe sees status "online" with CPU usage
            below 90; False when every probe fails or ``timeout`` is less
            than 1.
        """
        import asyncio

        for attempt in range(timeout):
            metrics = await process.get_metrics_async()
            if process.status == "online" and metrics.cpu_percent < 90:
                return True
            # Pause only between probes; sleeping after the final probe
            # would just delay the failure result.
            if attempt < timeout - 1:
                await asyncio.sleep(1)
        return False
# Usage
# asyncio.run drives the deploy_app coroutine from synchronous code; this
# example's own import group does not bring asyncio in, so import it here.
import asyncio

manager = ProductionPM2Manager()

# Deploy configuration
config = {
    "name": "api-server",
    "script": "server.js",
    "instances": "max",
    "exec_mode": "cluster",
    "env": {"NODE_ENV": "production"},
}

asyncio.run(manager.deploy_app(config))
This library is an independent Python wrapper for the PM2 Process Manager. PM2 is a separate open-source project developed by Keymetrics/Unitech. This Python library is not affiliated with, endorsed by, or officially supported by the PM2 team.
This wrapper library communicates with PM2 through its command-line interface and does not modify or redistribute any PM2 code. Users must install PM2 separately. All PM2 trademarks and copyrights belong to their respective owners.