Files
PecHub/worker/app/notifications/telegram.py
T
2026-03-27 15:13:14 +01:00

70 lines
2.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Telegram Bot API invio messaggi via sendMessage (worker).
Copia del sender backend: i due container sono separati e non
possono condividere package, quindi il codice e' duplicato.
"""
import httpx
# Base URL of the Telegram Bot API; send_message appends "/bot<token>/sendMessage".
TELEGRAM_API_BASE = "https://api.telegram.org"
# Default request timeout in seconds, used by send_message when the caller gives none.
DEFAULT_TIMEOUT = 10.0
class TelegramError(Exception):
def __init__(self, message: str, http_status: int | None = None, api_code: int | None = None):
super().__init__(message)
self.http_status = http_status
self.api_code = api_code
async def send_message(
    bot_token: str,
    chat_id: str,
    text: str,
    parse_mode: str = "HTML",
    disable_web_page_preview: bool = True,
    timeout: float = DEFAULT_TIMEOUT,
) -> dict:
    """Send a text message to a Telegram channel, group or user.

    Issues a single POST to the Bot API ``sendMessage`` method using a
    short-lived ``httpx.AsyncClient``.

    Args:
        bot_token: Bot token, embedded in the request URL.
        chat_id: Target chat identifier, sent as ``chat_id``.
        text: Message body, sent as ``text``.
        parse_mode: Sent as ``parse_mode``; omitted from the payload when
            empty or otherwise falsy.
        disable_web_page_preview: When true, the payload includes
            ``link_preview_options = {"is_disabled": True}``.
        timeout: Timeout in seconds passed to ``httpx.AsyncClient``.

    Returns:
        The ``result`` object of the API response (``message_id`` etc.),
        or an empty dict when the response has no ``result`` key.

    Raises:
        TelegramError: on timeout, on network failure, on a non-200 HTTP
            status, on a body that is not a JSON object, or on a response
            whose ``ok`` field is falsy. ``http_status`` is set whenever a
            response was received; ``api_code`` is set when the body
            carries an ``error_code``.
    """
    url = f"{TELEGRAM_API_BASE}/bot{bot_token}/sendMessage"
    payload: dict = {"chat_id": chat_id, "text": text}
    if parse_mode:
        payload["parse_mode"] = parse_mode
    if disable_web_page_preview:
        payload["link_preview_options"] = {"is_disabled": True}

    async with httpx.AsyncClient(timeout=timeout) as client:
        try:
            response = await client.post(url, json=payload)
        except httpx.TimeoutException as exc:
            # Must precede RequestError: TimeoutException is one of its subclasses.
            raise TelegramError(f"Timeout Telegram ({timeout}s)") from exc
        except httpx.RequestError as exc:
            raise TelegramError(f"Errore di rete Telegram: {exc}") from exc

    # Decode the body once, tolerating non-JSON content (e.g. a proxy error
    # page) so that callers only ever see TelegramError, never a raw
    # ValueError/AttributeError from the parsing below.
    try:
        data = response.json()
    except ValueError:
        data = None
    if not isinstance(data, dict):
        data = None

    if response.status_code != 200:
        # Error responses normally still carry a JSON body with "error_code";
        # surface it so callers can branch on api_code, not only http_status.
        raise TelegramError(
            f"Telegram API HTTP {response.status_code}: {response.text[:200]}",
            http_status=response.status_code,
            api_code=data.get("error_code") if data is not None else None,
        )

    if data is None:
        raise TelegramError(
            f"Risposta Telegram non JSON: {response.text[:200]}",
            http_status=response.status_code,
        )

    if not data.get("ok"):
        api_code = data.get("error_code")
        description = data.get("description", "Errore sconosciuto")
        raise TelegramError(
            f"Telegram API error {api_code}: {description}",
            http_status=response.status_code,
            api_code=api_code,
        )
    return data.get("result", {})