""" Kin — structured deploy module. Business logic for project deployments: - Runtime-based step templates (docker/node/python/static) - Local and SSH execution - Dependency chain traversal via project_links """ import shlex import sqlite3 import subprocess import time VALID_RUNTIMES = {"docker", "node", "python", "static"} # Base command templates per runtime. # deploy_restart_cmd (if set) is appended as the final step for all runtimes. RUNTIME_STEPS = { "docker": ["git pull", "docker compose up -d --build"], "node": ["git pull", "npm install --production", "pm2 restart all"], "python": ["git pull", "pip install -r requirements.txt"], "static": ["git pull", "nginx -s reload"], } def build_deploy_steps(project: dict) -> list[str]: """Build deploy command list based on runtime and project config. Returns empty list if deploy_runtime is not set or invalid. Appends deploy_restart_cmd as the last step if provided. """ runtime = project.get("deploy_runtime") if not runtime or runtime not in RUNTIME_STEPS: return [] steps = list(RUNTIME_STEPS[runtime]) restart_cmd = project.get("deploy_restart_cmd") if restart_cmd and restart_cmd.strip(): steps.append(restart_cmd.strip()) return steps def _build_ssh_cmd(project: dict, command: str) -> list[str]: """Build SSH subprocess command list to run a shell command on deploy_host.""" deploy_host = project.get("deploy_host") or project.get("ssh_host") ssh_user = project.get("ssh_user") or "root" ssh_key = project.get("ssh_key_path") proxy_jump = project.get("ssh_proxy_jump") deploy_path = project.get("deploy_path") full_cmd = f"cd {shlex.quote(deploy_path)} && {command}" if deploy_path else command cmd = ["ssh"] if ssh_key: cmd += ["-i", ssh_key] if proxy_jump: cmd += ["-J", proxy_jump] cmd += ["-o", "StrictHostKeyChecking=no", "-o", "BatchMode=yes"] cmd += [f"{ssh_user}@{deploy_host}", full_cmd] return cmd def execute_deploy(project: dict, conn: sqlite3.Connection) -> dict: """Execute deploy steps for a project. Returns structured result. Returns: { "success": bool, "steps": list[str], "results": list[{"step", "stdout", "stderr", "exit_code"}], } """ deploy_host = project.get("deploy_host") or project.get("ssh_host") steps = build_deploy_steps(project) if not steps: return { "success": False, "steps": [], "results": [], "error": "No deploy steps: deploy_runtime not set or invalid", } deploy_path = project.get("deploy_path") or project.get("path") or None results = [] overall_success = True for step in steps: try: if deploy_host: cmd = _build_ssh_cmd(project, step) proc = subprocess.run( cmd, capture_output=True, text=True, timeout=120, ) else: # WARNING: shell=True — deploy commands are admin-only, set by project owner proc = subprocess.run( step, shell=True, # WARNING: shell=True — command is admin-only cwd=deploy_path, capture_output=True, text=True, timeout=120, ) except subprocess.TimeoutExpired: results.append({ "step": step, "stdout": "", "stderr": "Timed out after 120 seconds", "exit_code": -1, }) overall_success = False break except Exception as e: results.append({ "step": step, "stdout": "", "stderr": str(e), "exit_code": -1, }) overall_success = False break results.append({ "step": step, "stdout": proc.stdout, "stderr": proc.stderr, "exit_code": proc.returncode, }) if proc.returncode != 0: overall_success = False break # Stop on first failure return { "success": overall_success, "steps": steps, "results": results, } def get_deploy_chain(conn: sqlite3.Connection, project_id: str) -> list[str]: """Get ordered deploy chain: project + all downstream dependents (BFS, cycle-safe). Traverses project_links where type='depends_on': - from_project depends_on to_project - When to_project changes, from_project must also be redeployed. Returns list starting with project_id, followed by dependents in BFS order. """ visited: set[str] = {project_id} chain: list[str] = [project_id] queue: list[str] = [project_id] while queue: current = queue.pop(0) rows = conn.execute( "SELECT from_project FROM project_links" " WHERE to_project = ? AND type = 'depends_on'", (current,), ).fetchall() for row in rows: dep_id = row[0] if dep_id not in visited: visited.add(dep_id) chain.append(dep_id) queue.append(dep_id) return chain def deploy_with_dependents(conn: sqlite3.Connection, project_id: str) -> dict: """Deploy project and all dependents in chain order. Returns: { "success": bool, "steps": list[str], # main project steps "results": list[dict], # main project step results "dependents_deployed": list[str], } """ from core.models import get_project chain = get_deploy_chain(conn, project_id) dependents = [pid for pid in chain if pid != project_id] # Deploy main project first main_project = get_project(conn, project_id) if not main_project: return { "success": False, "steps": [], "results": [], "dependents_deployed": [], "error": f"Project '{project_id}' not found", } main_result = execute_deploy(main_project, conn) overall_success = main_result["success"] dependents_deployed = [] if main_result["success"] and dependents: for dep_id in dependents: dep_project = get_project(conn, dep_id) if dep_project: dep_result = execute_deploy(dep_project, conn) if dep_result["success"]: dependents_deployed.append(dep_id) else: overall_success = False return { "success": overall_success, "steps": main_result.get("steps", []), "results": main_result.get("results", []), "dependents_deployed": dependents_deployed, }