#!/usr/bin/env python3 """ Test integrazione Telegram REALE – Canale configurato ====================================================== Testa: 1. Lettura canale Telegram dal database (bot_token + chat_id) 2. Chiamata diretta alla funzione send_message (httpx → Bot API) 3. Verifica risposta Telegram (message_id, chat, testo) 4. Test via NotificationService.test_channel (flusso completo) Eseguire DENTRO il container backend: docker exec pecflow-backend-1 python \ /app/tests/integration/test_telegram_real.py Oppure specificando credenziali manuali via env: docker exec -e TELEGRAM_BOT_TOKEN=xxx -e TELEGRAM_CHAT_ID=-100yyy \ pecflow-backend-1 python /app/tests/integration/test_telegram_real.py """ import asyncio import base64 import json import os import sys from datetime import datetime # ─── Variabili d'ambiente ───────────────────────────────────────────────────── os.environ.setdefault("ENCRYPTION_KEY", "6465762d656e6372797074696f6e2d6b65792d6e6f742d666f722d70726f6400") os.environ.setdefault("SECRET_KEY", "dev-secret-key-not-for-production-use-only-for-local-0000000000000") os.environ.setdefault("DATABASE_URL", "postgresql+asyncpg://pecflow:pecflow_dev_password@db:5432/pecflow") os.environ.setdefault("REDIS_URL", "redis://redis:6379/0") os.environ.setdefault("MINIO_ENDPOINT", "minio:9000") # ─── Utilities ──────────────────────────────────────────────────────────────── def _sep(char: str = "─", width: int = 60) -> None: print(char * width) def _banner(bot_token_masked: str, chat_id: str) -> None: _sep("═") print(" PecFlow – Test Telegram Reale") _sep("═") print(f" Timestamp : {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") print(f" Bot Token : {bot_token_masked}") print(f" Chat ID : {chat_id}") _sep("═") print() def _mask_token(token: str) -> str: """Nasconde la parte centrale del token per sicurezza.""" parts = token.split(":") if len(parts) == 2: return f"{parts[0]}:{'*' * 10}...{parts[1][-6:]}" return token[:8] + "..." + token[-6:] def _decrypt_b64(enc: str) -> dict: """Decifra il config_enc (base64 semplice come in notification_service.py).""" raw = base64.b64decode(enc.encode()) return json.loads(raw.decode()) # ─── STEP 1: Lettura canale dal DB ─────────────────────────────────────────── async def load_channel_from_db() -> tuple[str, str, str] | None: """ Carica bot_token e chat_id dal primo canale Telegram nel DB. Restituisce (channel_id, bot_token, chat_id) o None se non trovato. """ _sep() print("STEP 1 – LETTURA CANALE TELEGRAM DAL DATABASE") _sep() try: from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession from sqlalchemy import text db_url = os.environ["DATABASE_URL"] engine = create_async_engine(db_url) async with AsyncSession(engine) as session: result = await session.execute( text( "SELECT id, name, config, config_enc " "FROM notification_channels " "WHERE channel_type = 'telegram' AND is_active = true " "ORDER BY created_at DESC LIMIT 1" ) ) row = result.fetchone() if not row: print(" ⚠️ Nessun canale Telegram attivo trovato nel database.") print(" Crea un canale Telegram tramite l'interfaccia o via API.") return None channel_id = str(row[0]) channel_name = row[1] config = row[2] or {} config_enc = row[3] chat_id = str(config.get("chat_id", "")) if not chat_id: print(f" ❌ Il canale '{channel_name}' non ha chat_id configurato.") return None if not config_enc: print(f" ❌ Il canale '{channel_name}' non ha bot_token configurato (config_enc mancante).") return None secret = _decrypt_b64(config_enc) bot_token = secret.get("bot_token", "") if not bot_token: print(f" ❌ Il canale '{channel_name}' non ha bot_token nel config_enc.") return None print(f" ✅ Canale trovato: '{channel_name}'") print(f" ID : {channel_id}") print(f" Chat ID : {chat_id}") print(f" Bot Token : {_mask_token(bot_token)}") print() return channel_id, bot_token, chat_id except Exception as exc: print(f" ❌ Errore lettura DB: {exc}") import traceback traceback.print_exc() return None # ─── STEP 2: Invio diretto via httpx ───────────────────────────────────────── async def test_direct_send(bot_token: str, chat_id: str) -> bool: """ Testa send_message() direttamente (senza passare da DB/service). """ _sep() print("STEP 2 – INVIO DIRETTO VIA BOT API (httpx)") _sep() from app.notifications.telegram import TelegramError, send_message ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S") text = ( f"🧪 PecFlow – Test integrazione\n\n" f"Test invio diretto via send_message()\n\n" f"🕐 {ts}" ) print(f" Chiamata: send_message(chat_id={chat_id}, parse_mode=HTML)") print() try: result = await send_message( bot_token=bot_token, chat_id=chat_id, text=text, parse_mode="HTML", ) msg_id = result.get("message_id") chat_info = result.get("chat", {}) chat_title = chat_info.get("title") or chat_info.get("username") or chat_info.get("id") date = result.get("date") print(f" ✅ INVIO OK") print(f" message_id : {msg_id}") print(f" chat : {chat_title}") print(f" date : {date}") print() return True except TelegramError as exc: print(f" ❌ TelegramError: {exc}") if exc.http_status: print(f" HTTP status: {exc.http_status}") if exc.api_code: print(f" API code : {exc.api_code}") print() return False except Exception as exc: print(f" ❌ Errore imprevisto: {exc}") import traceback traceback.print_exc() print() return False # ─── STEP 3: Test via send_test_message ────────────────────────────────────── async def test_send_test_message(bot_token: str, chat_id: str) -> bool: """ Testa send_test_message() che invia il messaggio formattato standard. """ _sep() print("STEP 3 – INVIO MESSAGGIO DI TEST FORMATTATO") _sep() from app.notifications.telegram import TelegramError, send_test_message print(" Chiamata: send_test_message(channel_name='Test PecFlow')") print() try: result = await send_test_message( bot_token=bot_token, chat_id=chat_id, channel_name="Test PecFlow", ) msg_id = result.get("message_id") print(f" ✅ INVIO OK – message_id={msg_id}") print() return True except TelegramError as exc: print(f" ❌ TelegramError: {exc}") print() return False # ─── STEP 4: Test via NotificationService ──────────────────────────────────── async def test_via_notification_service(channel_id: str) -> bool: """ Testa il flusso completo: NotificationService.test_channel() che carica dal DB, decifra il token e chiama la Bot API. """ _sep() print("STEP 4 – FLUSSO COMPLETO VIA NotificationService.test_channel()") _sep() import uuid from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession from sqlalchemy import text # Recupera il tenant_id del canale db_url = os.environ["DATABASE_URL"] engine = create_async_engine(db_url) try: async with AsyncSession(engine) as session: r = await session.execute( text("SELECT tenant_id FROM notification_channels WHERE id = :id"), {"id": channel_id}, ) row = r.fetchone() if not row: print(f" ❌ Canale {channel_id} non trovato nel DB.") return False tenant_id = uuid.UUID(str(row[0])) print(f" Tenant ID : {tenant_id}") print(f" Channel ID: {channel_id}") print() from app.services.notification_service import NotificationService async with AsyncSession(engine) as session: service = NotificationService(session) result = await service.test_channel( channel_id=uuid.UUID(channel_id), tenant_id=tenant_id, ) if result.success: print(f" ✅ test_channel OK") print(f" Messaggio: {result.message}") print(f" HTTP status: {result.http_status}") else: print(f" ❌ test_channel FALLITO") print(f" Motivo: {result.message}") print(f" HTTP status: {result.http_status}") print() return result.success except Exception as exc: print(f" ❌ Errore: {exc}") import traceback traceback.print_exc() return False # ─── Main ───────────────────────────────────────────────────────────────────── async def main() -> None: # Controlla se le credenziali sono passate via env (override manuale) manual_bot_token = os.environ.get("TELEGRAM_BOT_TOKEN") manual_chat_id = os.environ.get("TELEGRAM_CHAT_ID") if manual_bot_token and manual_chat_id: channel_id = None bot_token = manual_bot_token chat_id = manual_chat_id print("⚙️ Usando credenziali da variabili d'ambiente TELEGRAM_BOT_TOKEN / TELEGRAM_CHAT_ID") print() else: # Carica dal DB db_result = await load_channel_from_db() if not db_result: print("❌ Impossibile procedere: nessun canale Telegram configurato.") sys.exit(1) channel_id, bot_token, chat_id = db_result _banner(_mask_token(bot_token), chat_id) results: list[tuple[str, bool]] = [] # STEP 2: invio diretto ok = await test_direct_send(bot_token, chat_id) results.append(("Invio diretto (send_message)", ok)) # STEP 3: messaggio formattato ok = await test_send_test_message(bot_token, chat_id) results.append(("Messaggio di test formattato (send_test_message)", ok)) # STEP 4: flusso completo via NotificationService (solo se abbiamo il channel_id dal DB) if channel_id: ok = await test_via_notification_service(channel_id) results.append(("Flusso completo (NotificationService.test_channel)", ok)) # ── Riepilogo ──────────────────────────────────────────────────────────── _sep("═") print(" RIEPILOGO TEST TELEGRAM") _sep("═") all_ok = True for name, success in results: icon = "✅" if success else "❌" print(f" {icon} {name}") if not success: all_ok = False _sep("═") print() if all_ok: print("🎉 Tutti i test Telegram superati con successo!") else: print("⚠️ Alcuni test Telegram sono falliti.") sys.exit(1) if __name__ == "__main__": asyncio.run(main())