"""
PyServe Service Runner

Handles starting, stopping, and managing services.
Integrates with ProcessManager for actual process management.
"""

import asyncio
import os
import signal
import sys
import time
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, List, Optional

from ..config import Config
from ..process_manager import ProcessConfig, ProcessInfo, ProcessManager, ProcessState
from .state import ServiceState, StateManager

# Extension type whose config block declares the managed apps and port range.
_ORCHESTRATION_EXT_TYPE = "process_orchestration"

# Port range used when the configuration does not provide one.
_DEFAULT_PORT_RANGE = (9000, 9999)


@dataclass
class ServiceDefinition:
    """Static description of one service, as read from the configuration."""

    name: str
    path: str
    app_path: str
    app_type: str = "asgi"
    module_path: Optional[str] = None
    workers: int = 1
    # Health-check settings, forwarded unchanged to ProcessConfig.
    health_check_path: str = "/health"
    health_check_interval: float = 10.0
    health_check_timeout: float = 5.0
    health_check_retries: int = 3
    # Restart / shutdown settings, forwarded unchanged to ProcessConfig.
    max_restart_count: int = 5
    restart_delay: float = 1.0
    shutdown_timeout: float = 30.0
    # Parsed from the config but not consumed anywhere in this module.
    strip_path: bool = True
    env: Dict[str, str] = field(default_factory=dict)


class ServiceRunner:
    """Start, stop, restart and scale the services declared in the config.

    Process handling is delegated to a ``ProcessManager``; the resulting
    status of each service is recorded through the ``StateManager``.
    """

    def __init__(self, config: Config, state_manager: StateManager):
        """Store collaborators and parse service definitions from *config*."""
        self.config = config
        self.state_manager = state_manager
        self._process_manager: Optional[ProcessManager] = None
        self._services: Dict[str, ServiceDefinition] = {}
        self._running = False

        self._parse_services()

    # ------------------------------------------------------------------
    # Configuration helpers
    # ------------------------------------------------------------------

    def _parse_services(self) -> None:
        """Populate ``self._services`` from every orchestration extension.

        Services are keyed by name, so a later definition with the same name
        replaces an earlier one.
        """
        for ext in self.config.extensions:
            if ext.type != _ORCHESTRATION_EXT_TYPE:
                continue

            # ``apps: null`` in the config yields None rather than a list.
            for app_config in ext.config.get("apps") or []:
                service = ServiceDefinition(
                    name=app_config.get("name", "unnamed"),
                    path=app_config.get("path", "/"),
                    app_path=app_config.get("app_path", ""),
                    app_type=app_config.get("app_type", "asgi"),
                    module_path=app_config.get("module_path"),
                    workers=app_config.get("workers", 1),
                    health_check_path=app_config.get("health_check_path", "/health"),
                    health_check_interval=app_config.get("health_check_interval", 10.0),
                    health_check_timeout=app_config.get("health_check_timeout", 5.0),
                    health_check_retries=app_config.get("health_check_retries", 3),
                    max_restart_count=app_config.get("max_restart_count", 5),
                    restart_delay=app_config.get("restart_delay", 1.0),
                    shutdown_timeout=app_config.get("shutdown_timeout", 30.0),
                    strip_path=app_config.get("strip_path", True),
                    # Copy so the definition never aliases the config's dict,
                    # and tolerate an explicit ``env: null``.
                    env=dict(app_config.get("env") or {}),
                )
                self._services[service.name] = service

    def _port_range(self) -> tuple:
        """Return the port range of the first orchestration extension."""
        for ext in self.config.extensions:
            if ext.type == _ORCHESTRATION_EXT_TYPE:
                return tuple(ext.config.get("port_range", list(_DEFAULT_PORT_RANGE)))
        return _DEFAULT_PORT_RANGE

    def _new_process_manager(self) -> ProcessManager:
        """Create a ProcessManager using the configured port range."""
        return ProcessManager(
            port_range=self._port_range(),
            health_check_enabled=True,
        )

    @staticmethod
    def _build_process_config(service: ServiceDefinition, workers: int) -> ProcessConfig:
        """Translate a service definition into a ProcessConfig."""
        return ProcessConfig(
            name=service.name,
            app_path=service.app_path,
            app_type=service.app_type,
            workers=workers,
            module_path=service.module_path,
            health_check_enabled=True,
            health_check_path=service.health_check_path,
            health_check_interval=service.health_check_interval,
            health_check_timeout=service.health_check_timeout,
            health_check_retries=service.health_check_retries,
            max_restart_count=service.max_restart_count,
            restart_delay=service.restart_delay,
            shutdown_timeout=service.shutdown_timeout,
            env=service.env,
        )

    def get_services(self) -> Dict[str, ServiceDefinition]:
        """Return a shallow copy of the name -> definition mapping."""
        return self._services.copy()

    def get_service(self, name: str) -> Optional[ServiceDefinition]:
        """Return the definition registered under *name*, or ``None``."""
        return self._services.get(name)

    # ------------------------------------------------------------------
    # State recording helpers
    # ------------------------------------------------------------------

    def _record_running(self, name: str, info: Optional[ProcessInfo], **extra: Any) -> None:
        """Persist *name* as running, using pid/port from *info* if present."""
        self.state_manager.update_service(
            name,
            state="running",
            pid=info.pid if info else None,
            port=info.port if info else 0,
            started_at=time.time(),
            **extra,
        )

    def _record_actual_state(self, name: str) -> None:
        """Persist whatever the process manager reports for *name*.

        Falls back to ``"failed"`` when the process manager has no entry.
        """
        info = self._process_manager.get_process(name) if self._process_manager else None
        if info:
            self.state_manager.update_service(
                name,
                state=info.state.value,
                pid=info.pid,
                port=info.port,
            )
        else:
            self.state_manager.update_service(name, state="failed")

    async def _launch_service(self, name: str, workers: int) -> bool:
        """Register *name* if necessary, start it, and record success.

        The caller must have created ``self._process_manager``. Exceptions
        from the process manager propagate to the caller.
        """
        manager = self._process_manager
        if not manager.get_process(name):
            await manager.register(self._build_process_config(self._services[name], workers))

        success = await manager.start_process(name)
        if success:
            self._record_running(name, manager.get_process(name), workers=workers)
        return success

    # ------------------------------------------------------------------
    # Foreground lifecycle
    # ------------------------------------------------------------------

    async def start(
        self,
        services: Optional[List[str]] = None,
        scale_map: Optional[Dict[str, int]] = None,
        force_recreate: bool = False,
        wait_healthy: bool = False,
        timeout: int = 60,
    ) -> None:
        """Start services and block until stopped or cancelled.

        Args:
            services: Names to start; all configured services when empty.
            scale_map: Per-service worker count overriding the definition.
            force_recreate: Accepted for interface compatibility; not used
                by this method.
            wait_healthy: Wait for the services to reach the running state
                before entering the idle loop.
            timeout: Seconds to wait when *wait_healthy* is set.

        All services are stopped before this coroutine returns, including
        when it is cancelled while services are still being launched.
        """
        from .output import console, print_error, print_info, print_success

        scale_map = scale_map or {}
        target_services = services or list(self._services.keys())

        if not target_services:
            print_info("No services configured. Add services to your config.yaml")
            return

        for name in target_services:
            if name not in self._services:
                print_error(f"Service '{name}' not found in configuration")
                return

        self._process_manager = self._new_process_manager()
        await self._process_manager.start()
        self._running = True

        # The guard covers launch and health-wait too, so a cancellation at
        # any point after the manager started still tears the services down.
        try:
            for name in target_services:
                workers = scale_map.get(name, self._services[name].workers)
                try:
                    if await self._launch_service(name, workers):
                        print_success(f"Started service: {name}")
                    else:
                        self.state_manager.update_service(name, state="failed")
                        print_error(f"Failed to start service: {name}")
                except Exception as e:
                    print_error(f"Error starting {name}: {e}")
                    self.state_manager.update_service(name, state="failed")

            if wait_healthy:
                print_info("Waiting for services to be healthy...")
                await self._wait_healthy(target_services, timeout)

            console.print("\n[bold]Services running. Press Ctrl+C to stop.[/bold]\n")

            while self._running:
                await asyncio.sleep(1)
                await self._sync_state()
        except asyncio.CancelledError:
            pass
        finally:
            await self.stop_all()

    async def _sync_state(self) -> None:
        """Copy process state and health counters into the state manager."""
        manager = self._process_manager
        if not manager:
            return

        for name, info in manager.get_all_processes().items():
            failures = info.health_check_failures
            self.state_manager.update_service(
                name,
                state=info.state.value,
                pid=info.pid,
                port=info.port,
            )

            service_state = self.state_manager.get_service(name)
            if service_state:
                service_state.health.status = "healthy" if failures == 0 else "unhealthy"
                service_state.health.failures = failures

        self.state_manager.save()

    async def _wait_healthy(self, services: List[str], timeout: int) -> None:
        """Poll once per second until every service is RUNNING or *timeout*.

        Only the process state is inspected; health-check counters are not.
        """
        from .output import print_info, print_warning

        def is_running(manager: ProcessManager, name: str) -> bool:
            info = manager.get_process(name)
            return bool(info) and info.state == ProcessState.RUNNING

        # Monotonic clock: immune to wall-clock adjustments during the wait.
        deadline = time.monotonic() + timeout

        while time.monotonic() < deadline:
            manager = self._process_manager
            if manager is None:
                # Nothing can be running without a manager; do not claim health.
                print_warning("Process manager is not running; cannot wait for services")
                return

            if all(is_running(manager, name) for name in services):
                print_info("All services healthy")
                return

            await asyncio.sleep(1)

        print_warning("Timeout waiting for services to become healthy")

    async def stop_all(self, timeout: int = 30) -> None:
        """Stop the process manager and mark every service as stopped.

        *timeout* is accepted for interface compatibility; it is not
        forwarded to the process manager.
        """
        from .output import print_info

        self._running = False

        if self._process_manager:
            print_info("Stopping all services...")
            await self._process_manager.stop()
            self._process_manager = None

        for name in self._services:
            self.state_manager.update_service(
                name,
                state="stopped",
                pid=None,
            )

    def stop(self) -> None:
        """Ask the idle loop in :meth:`start` to exit."""
        self._running = False

    # ------------------------------------------------------------------
    # Single-service operations
    # ------------------------------------------------------------------

    async def start_service(self, name: str, timeout: int = 60) -> bool:
        """Start one configured service, creating the manager if needed.

        Returns ``True`` when the process manager reports a successful
        start. On failure the service is recorded as ``"failed"``.
        *timeout* is accepted for interface compatibility; it is not used.
        """
        from .output import print_error

        service = self._services.get(name)
        if not service:
            print_error(f"Service '{name}' not found")
            return False

        if not self._process_manager:
            self._process_manager = self._new_process_manager()
            await self._process_manager.start()

        try:
            success = await self._launch_service(name, service.workers)
            if not success:
                self.state_manager.update_service(name, state="failed")
            return success
        except Exception as e:
            print_error(f"Error starting {name}: {e}")
            self.state_manager.update_service(name, state="failed")
            return False

    async def stop_service(self, name: str, timeout: int = 30, force: bool = False) -> bool:
        """Stop one service and record it as stopped.

        Without a process manager the service is simply recorded as stopped.
        *timeout* and *force* are accepted for interface compatibility; they
        are not forwarded to the process manager.
        """
        if not self._process_manager:
            self.state_manager.update_service(name, state="stopped", pid=None)
            return True

        try:
            success = await self._process_manager.stop_process(name)
            if success:
                self.state_manager.update_service(
                    name,
                    state="stopped",
                    pid=None,
                )
            return success
        except Exception as e:
            from .output import print_error

            print_error(f"Error stopping {name}: {e}")
            return False

    async def restart_service(self, name: str, timeout: int = 60) -> bool:
        """Restart one service through the process manager.

        Returns ``False`` when no process manager exists or the restart
        fails. After a failed restart the recorded state is refreshed from
        the process manager so it does not stay at ``"restarting"``.
        *timeout* is accepted for interface compatibility; it is not used.
        """
        if not self._process_manager:
            return False

        try:
            self.state_manager.update_service(name, state="restarting")
            success = await self._process_manager.restart_process(name)
            if success:
                self._record_running(name, self._process_manager.get_process(name))
            else:
                self._record_actual_state(name)
            return success
        except Exception as e:
            from .output import print_error

            print_error(f"Error restarting {name}: {e}")
            self._record_actual_state(name)
            return False

    async def scale_service(self, name: str, workers: int, timeout: int = 60, wait: bool = True) -> bool:
        """Change a service's worker count by restarting it.

        Returns ``False`` for an unknown service, otherwise the result of
        :meth:`restart_service`. *wait* is accepted but not used.
        """
        # For now, this requires restart with new worker count
        # In future, could implement hot-reloading
        service = self._services.get(name)
        if not service:
            return False

        # Update service definition
        # NOTE(review): only the local definition changes here; whether
        # restart_process picks up the new count depends on ProcessManager,
        # which is not visible from this module -- confirm.
        service.workers = workers

        # Restart with new configuration
        return await self.restart_service(name, timeout)

    # ------------------------------------------------------------------
    # Background mode
    # ------------------------------------------------------------------

    def start_daemon(
        self,
        services: Optional[List[str]] = None,
        scale_map: Optional[Dict[str, int]] = None,
        force_recreate: bool = False,
    ) -> int:
        """Spawn ``pyserve.cli._daemon`` in a new session and return its PID.

        The config file is looked up as ``config.yaml`` next to the state
        directory. The child's standard streams are all detached.
        """
        import subprocess

        state_dir = self.state_manager.state_dir
        cmd = [
            sys.executable,
            "-m",
            "pyserve.cli._daemon",
            "--config",
            str(state_dir.parent / "config.yaml"),
            "--state-dir",
            str(state_dir),
        ]

        if services:
            cmd.extend(["--services", ",".join(services)])

        if scale_map:
            for name, workers in scale_map.items():
                cmd.extend(["--scale", f"{name}={workers}"])

        if force_recreate:
            cmd.append("--force-recreate")

        process = subprocess.Popen(
            cmd,
            env=os.environ.copy(),
            # Detach stdin as well so the daemon never reads the caller's
            # terminal after the launching command has exited.
            stdin=subprocess.DEVNULL,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            start_new_session=True,
        )

        return process.pid