Files
2026-03-27 15:13:14 +01:00

75 lines
2.0 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Webhook sender (worker) POST HTTP con firma HMAC-SHA256.
Copia del sender backend: i due container sono separati.
"""
import hashlib
import hmac
import json
import uuid as uuid_mod
import httpx
DEFAULT_TIMEOUT = 10.0
class WebhookError(Exception):
def __init__(self, message: str, http_status: int | None = None):
super().__init__(message)
self.http_status = http_status
async def send_webhook(
url: str,
payload: dict,
event_type: str = "new_message",
webhook_secret: str | None = None,
timeout: float = DEFAULT_TIMEOUT,
) -> dict:
"""
Invia un payload JSON a un webhook URL.
Returns:
dict con http_status, delivery_id
Raises:
WebhookError: in caso di timeout, errore di rete o HTTP >= 400
"""
body = json.dumps(payload, ensure_ascii=False, default=str).encode("utf-8")
delivery_id = str(uuid_mod.uuid4())
headers = {
"Content-Type": "application/json",
"X-PEChub-Event": event_type,
"X-Delivery": delivery_id,
"User-Agent": "PEChub-Webhook/1.0",
}
if webhook_secret:
sig = hmac.new(
webhook_secret.encode("utf-8"),
body,
hashlib.sha256,
).hexdigest()
headers["X-Hub-Signature-256"] = f"sha256={sig}"
async with httpx.AsyncClient(timeout=timeout) as client:
try:
response = await client.post(url, content=body, headers=headers)
except httpx.TimeoutException as exc:
raise WebhookError(f"Timeout webhook dopo {timeout}s") from exc
except httpx.RequestError as exc:
raise WebhookError(f"Errore di rete webhook: {exc}") from exc
if response.status_code >= 400:
raise WebhookError(
f"Webhook HTTP {response.status_code}: {response.text[:200]}",
http_status=response.status_code,
)
return {
"http_status": response.status_code,
"delivery_id": delivery_id,
}