kin/core/telegram.py

103 lines
3.3 KiB
Python
Raw Permalink Normal View History

"""
Kin Telegram escalation notifications.
Sends a message when a PM agent detects a blocked agent.
Bot token is read from the tg_bot key of the [kin] section in /Volumes/secrets/env/projects.env, falling back to the KIN_TG_BOT_TOKEN env var.
Chat ID is read from KIN_TG_CHAT_ID env var.
"""
import configparser
import json
import logging
import os
import urllib.error
import urllib.parse
import urllib.request
from pathlib import Path
# Module-level logger registered under the "kin.telegram" name.
_logger = logging.getLogger("kin.telegram")
# Secrets file parsed with configparser; its [kin] section is read by _load_kin_config().
_SECRETS_PATH = Path("/Volumes/secrets/env/projects.env")
# URL template for the Bot API sendMessage call; {token} is replaced with the bot token.
_TELEGRAM_API = "https://api.telegram.org/bot{token}/sendMessage"
def _load_kin_config() -> dict:
    """Load the [kin] section of the secrets file as a plain dict.

    Returns:
        A dict with the keys found in the [kin] section. An empty dict is
        returned (and a warning logged) when the secrets file does not
        exist, cannot be parsed, or has no [kin] section.
    """
    if not _SECRETS_PATH.exists():
        _logger.warning("secrets not mounted: %s", _SECRETS_PATH)
        return {}
    # Interpolation is disabled so values containing a literal "%" are
    # returned verbatim instead of raising an interpolation error.
    parser = configparser.ConfigParser(interpolation=None)
    try:
        parser.read(str(_SECRETS_PATH))
    except (configparser.Error, UnicodeDecodeError) as exc:
        # Log only the exception type: configparser error messages quote the
        # offending file lines, which could leak secrets into the log.
        _logger.warning(
            "Failed to parse %s (%s)", _SECRETS_PATH, type(exc).__name__
        )
        return {}
    if "kin" not in parser:
        _logger.warning("No [kin] section in projects.env")
        return {}
    return dict(parser["kin"])
# Cap on the free-form reason text. Telegram rejects messages longer than
# 4096 characters, so the reason is cut to leave room for the header lines.
_MAX_REASON_CHARS = 3500


def send_telegram_escalation(
    task_id: str,
    project_name: str,
    agent_role: str,
    reason: str,
    pipeline_step: str | None,
) -> bool:
    """Send a Telegram escalation message for a blocked agent.

    Args:
        task_id: Identifier of the blocked task; shown as inline code.
        project_name: Project name; Markdown-escaped before sending.
        agent_role: Role of the blocked agent; shown as inline code.
        reason: Free-form description of the blocker; may be empty or None.
        pipeline_step: Optional pipeline step appended to the agent role.

    Returns:
        True if the Telegram API answered with HTTP 200, False otherwise
        (missing token or chat id, network/API failure, or any other error).

    Never raises — escalation errors must never block the pipeline.
    """
    try:
        config = _load_kin_config()
    except Exception as exc:
        # Log only the exception type: parser errors can quote file lines.
        _logger.warning(
            "Could not load Telegram config for task %s (%s)",
            task_id,
            type(exc).__name__,
        )
        config = {}

    bot_token = config.get("tg_bot") or os.environ.get("KIN_TG_BOT_TOKEN")
    if not bot_token:
        _logger.warning("Telegram bot token not configured; skipping escalation for %s", task_id)
        return False

    chat_id = os.environ.get("KIN_TG_CHAT_ID")
    if not chat_id:
        _logger.warning("KIN_TG_CHAT_ID not set; skipping Telegram escalation for %s", task_id)
        return False

    try:
        step_info = f" (шаг {pipeline_step})" if pipeline_step else ""
        # Legacy Markdown has no escaping inside a code span, so a backtick
        # in these values would end the span early and make Telegram reject
        # the whole message. Swap backticks for apostrophes instead.
        task_code = str(task_id).replace("`", "'")
        agent_code = f"{agent_role}{step_info}".replace("`", "'")

        reason_text = reason or ""
        if len(reason_text) > _MAX_REASON_CHARS:
            # Truncate before escaping so a cut never lands between a
            # backslash and the character it escapes.
            reason_text = reason_text[:_MAX_REASON_CHARS] + "…"

        text = (
            "🚨 *Эскалация* — агент заблокирован\n\n"
            f"*Проект:* {_escape_md(project_name)}\n"
            f"*Задача:* `{task_code}`\n"
            f"*Агент:* `{agent_code}`\n"
            f"*Причина:*\n{_escape_md(reason_text)}"
        )
        payload = json.dumps({
            "chat_id": chat_id,
            "text": text,
            "parse_mode": "Markdown",
        }).encode("utf-8")
        req = urllib.request.Request(
            _TELEGRAM_API.format(token=bot_token),
            data=payload,
            headers={"Content-Type": "application/json"},
            method="POST",
        )
        with urllib.request.urlopen(req, timeout=10) as resp:
            if resp.status == 200:
                _logger.info("Telegram escalation sent for task %s", task_id)
                return True
            _logger.warning("Telegram API returned status %d for task %s", resp.status, task_id)
            return False
    except urllib.error.URLError as exc:
        # Also covers HTTPError, which urlopen raises for non-2xx responses.
        _logger.warning("Telegram send failed for task %s: %s", task_id, exc)
        return False
    except Exception as exc:
        # Deliberately broad: this function is a best-effort boundary.
        _logger.warning("Unexpected Telegram error for task %s: %s", task_id, exc)
        return False
def _escape_md(text: str) -> str:
"""Escape Markdown special characters for Telegram MarkdownV1."""
# MarkdownV1 is lenient — only escape backtick/asterisk/underscore in free text
for ch in ("*", "_", "`"):
text = text.replace(ch, f"\\{ch}")
return text