Files
decluttarr/src/utils/rest.py
Benjamin Harder bae0c67d13 Cleanup of main.py, and fixing
TypeError: unsupported operand type(s) for +: 'dict' and 'dict'
2024-03-29 14:47:16 +01:00

73 lines
2.8 KiB
Python

########### Functions to call radarr/sonarr APIs
import logging
import asyncio
import requests
from requests.exceptions import RequestException
import json
from config.config import settingsDict
# GET
async def rest_get(url, api_key=None, params=None, cookies=None):
try:
headers = {'X-Api-Key': api_key} if api_key else None
response = await asyncio.get_event_loop().run_in_executor(None, lambda: requests.get(url, params=params, headers=headers, cookies=cookies, verify=settingsDict['SSL_VERIFICATION']))
response.raise_for_status()
return response.json()
except requests.exceptions.HTTPError as e:
print("HTTP Error:", e)
except RequestException as e:
return response.text
except ValueError as e:
logging.error(f'Error parsing JSON response from {url}: {e}')
return None
# DELETE
async def rest_delete(url, api_key, params=None):
if settingsDict['TEST_RUN']: return
try:
headers = {'X-Api-Key': api_key}
response = await asyncio.get_event_loop().run_in_executor(None, lambda: requests.delete(url, params=params, headers=headers, verify=settingsDict['SSL_VERIFICATION']))
response.raise_for_status()
if response.status_code in [200, 204]:
return None
return response.json()
except RequestException as e:
logging.error(f'Error making API request to {url}: {e}')
return None
except ValueError as e:
logging.error(f'Error parsing JSON response from {url}: {e}')
return None
# POST
async def rest_post(url, data=None, json=None, headers=None, cookies=None):
if settingsDict['TEST_RUN']: return
try:
response = await asyncio.get_event_loop().run_in_executor(None, lambda: requests.post(url, data=data, json=json, headers=headers, cookies=cookies, verify=settingsDict['SSL_VERIFICATION']))
response.raise_for_status()
if response.status_code in (200,201):
return None
return response.json()
except RequestException as e:
logging.error(f'Error making API request to {url}: {e}')
return None
except ValueError as e:
logging.error(f'Error parsing JSON response from {url}: {e}')
return None
# PUT
async def rest_put(url, api_key, data):
if settingsDict['TEST_RUN']: return
try:
headers = {'X-Api-Key': api_key} | {"content-type": "application/json"}
response = await asyncio.get_event_loop().run_in_executor(None, lambda: requests.put(url, data=data, headers=headers, verify=settingsDict['SSL_VERIFICATION']))
response.raise_for_status()
return response.json()
except RequestException as e:
logging.error(f'Error making API request to {url}: {e}')
return None
except ValueError as e:
logging.error(f'Error parsing JSON response from {url}: {e}')
return None