487b28e682
- Implemented a Python script for integrating Plex Media Server with Sonarr to manage TV show seasons based on viewing habits. - Added features for automated season management, exclusion list support, and detailed logging. - Created a requirements.txt file to include necessary packages: plexapi, requests, pytest, pytest-mock, and requests-mock. - Developed unit tests for the API verification function using pytest and requests_mock. - Included a standalone script to unmonitor excluded shows in Sonarr. - Added a test script for manual API verification with mock and real API tests.
158 lines
6.7 KiB
Python
158 lines
6.7 KiB
Python
import pytest
|
|
import requests
|
|
import requests_mock
|
|
from unittest.mock import patch, MagicMock
|
|
import sys
|
|
import os
|
|
|
|
# Add the current directory to the path to import plexsonarr
|
|
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
|
|
|
|
# Import the function we want to test
|
|
# We need to mock the Plex connection since we're only testing the API verification
|
|
with patch('plexapi.server.PlexServer'), patch('plexsonarr.tv_shows'):
|
|
import plexsonarr
|
|
|
|
|
|
class TestVerifySonarrApi:
|
|
"""Test cases for verify_sonarr_api function"""
|
|
|
|
def test_verify_sonarr_api_success(self, capsys):
|
|
"""Test successful API verification"""
|
|
with requests_mock.Mocker() as m:
|
|
# Mock successful response
|
|
expected_url = f"{plexsonarr.SONARR_SERVER_URL}/api/v3/system/status?apikey={plexsonarr.SONARR_API_KEY}"
|
|
m.get(expected_url, json={'status': 'ok'}, status_code=200)
|
|
|
|
# Call the function
|
|
plexsonarr.verify_sonarr_api()
|
|
|
|
# Check that the success message was printed
|
|
captured = capsys.readouterr()
|
|
assert "Sonarr API is alive and the token is correct." in captured.out
|
|
|
|
def test_verify_sonarr_api_http_error(self):
|
|
"""Test API verification with HTTP error (e.g., 401 Unauthorized)"""
|
|
with requests_mock.Mocker() as m:
|
|
expected_url = f"{plexsonarr.SONARR_SERVER_URL}/api/v3/system/status?apikey={plexsonarr.SONARR_API_KEY}"
|
|
m.get(expected_url, status_code=401, text="Unauthorized")
|
|
|
|
# Should raise an exception
|
|
with pytest.raises(requests.exceptions.HTTPError):
|
|
plexsonarr.verify_sonarr_api()
|
|
|
|
def test_verify_sonarr_api_connection_error(self, capsys):
|
|
"""Test API verification with connection error"""
|
|
with requests_mock.Mocker() as m:
|
|
expected_url = f"{plexsonarr.SONARR_SERVER_URL}/api/v3/system/status?apikey={plexsonarr.SONARR_API_KEY}"
|
|
m.get(expected_url, exc=requests.exceptions.ConnectionError("Connection failed"))
|
|
|
|
# Should raise an exception and print error message
|
|
with pytest.raises(requests.exceptions.ConnectionError):
|
|
plexsonarr.verify_sonarr_api()
|
|
|
|
captured = capsys.readouterr()
|
|
assert "Failed to verify Sonarr API:" in captured.out
|
|
|
|
def test_verify_sonarr_api_timeout(self):
|
|
"""Test API verification with timeout"""
|
|
with requests_mock.Mocker() as m:
|
|
expected_url = f"{plexsonarr.SONARR_SERVER_URL}/api/v3/system/status?apikey={plexsonarr.SONARR_API_KEY}"
|
|
m.get(expected_url, exc=requests.exceptions.Timeout("Request timeout"))
|
|
|
|
with pytest.raises(requests.exceptions.Timeout):
|
|
plexsonarr.verify_sonarr_api()
|
|
|
|
def test_verify_sonarr_api_invalid_json(self):
|
|
"""Test API verification with invalid JSON response"""
|
|
with requests_mock.Mocker() as m:
|
|
expected_url = f"{plexsonarr.SONARR_SERVER_URL}/api/v3/system/status?apikey={plexsonarr.SONARR_API_KEY}"
|
|
m.get(expected_url, text="Invalid JSON", status_code=200)
|
|
|
|
# Should still succeed as we only check status code
|
|
plexsonarr.verify_sonarr_api()
|
|
|
|
def test_verify_sonarr_api_404_not_found(self):
|
|
"""Test API verification with 404 Not Found"""
|
|
with requests_mock.Mocker() as m:
|
|
expected_url = f"{plexsonarr.SONARR_SERVER_URL}/api/v3/system/status?apikey={plexsonarr.SONARR_API_KEY}"
|
|
m.get(expected_url, status_code=404, text="Not Found")
|
|
|
|
with pytest.raises(requests.exceptions.HTTPError):
|
|
plexsonarr.verify_sonarr_api()
|
|
|
|
def test_verify_sonarr_api_500_server_error(self):
|
|
"""Test API verification with 500 Server Error"""
|
|
with requests_mock.Mocker() as m:
|
|
expected_url = f"{plexsonarr.SONARR_SERVER_URL}/api/v3/system/status?apikey={plexsonarr.SONARR_API_KEY}"
|
|
m.get(expected_url, status_code=500, text="Internal Server Error")
|
|
|
|
with pytest.raises(requests.exceptions.HTTPError):
|
|
plexsonarr.verify_sonarr_api()
|
|
|
|
|
|
class TestVerifySonarrApiStandalone:
|
|
"""Test the verify_sonarr_api function in isolation without importing the full module"""
|
|
|
|
@patch('requests.get')
|
|
def test_verify_sonarr_api_standalone_success(self, mock_get, capsys):
|
|
"""Test the function logic directly with mocked requests"""
|
|
# Setup
|
|
mock_response = MagicMock()
|
|
mock_response.raise_for_status.return_value = None
|
|
mock_get.return_value = mock_response
|
|
|
|
# Define the function locally for testing
|
|
def verify_sonarr_api():
|
|
SONARR_SERVER_URL = "http://test:8989"
|
|
SONARR_API_KEY = "test_key"
|
|
url = f"{SONARR_SERVER_URL}/api/v3/system/status?apikey={SONARR_API_KEY}"
|
|
try:
|
|
response = requests.get(url)
|
|
response.raise_for_status()
|
|
print("Sonarr API is alive and the token is correct.")
|
|
except requests.exceptions.RequestException as e:
|
|
print(f"Failed to verify Sonarr API: {e}")
|
|
raise
|
|
|
|
# Execute
|
|
verify_sonarr_api()
|
|
|
|
# Verify
|
|
mock_get.assert_called_once_with("http://test:8989/api/v3/system/status?apikey=test_key")
|
|
mock_response.raise_for_status.assert_called_once()
|
|
|
|
captured = capsys.readouterr()
|
|
assert "Sonarr API is alive and the token is correct." in captured.out
|
|
|
|
@patch('requests.get')
|
|
def test_verify_sonarr_api_standalone_failure(self, mock_get, capsys):
|
|
"""Test the function logic with mocked failure"""
|
|
# Setup
|
|
mock_get.side_effect = requests.exceptions.ConnectionError("Connection failed")
|
|
|
|
# Define the function locally for testing
|
|
def verify_sonarr_api():
|
|
SONARR_SERVER_URL = "http://test:8989"
|
|
SONARR_API_KEY = "test_key"
|
|
url = f"{SONARR_SERVER_URL}/api/v3/system/status?apikey={SONARR_API_KEY}"
|
|
try:
|
|
response = requests.get(url)
|
|
response.raise_for_status()
|
|
print("Sonarr API is alive and the token is correct.")
|
|
except requests.exceptions.RequestException as e:
|
|
print(f"Failed to verify Sonarr API: {e}")
|
|
raise
|
|
|
|
# Execute and verify exception
|
|
with pytest.raises(requests.exceptions.ConnectionError):
|
|
verify_sonarr_api()
|
|
|
|
captured = capsys.readouterr()
|
|
assert "Failed to verify Sonarr API:" in captured.out
|
|
|
|
|
|
if __name__ == "__main__":
|
|
# Run specific tests
|
|
pytest.main([__file__, "-v"])
|