TasmotaManager/tests/test_command_retry.py
2025-10-28 00:21:08 +00:00

293 lines
13 KiB
Python

#!/usr/bin/env python3
import json
import logging
import os
import sys
import time
import requests
from unittest.mock import patch, MagicMock
# Add the current directory to the path so we can import TasmotaManager
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
from TasmotaManager import TasmotaDiscovery
# Set up logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger(__name__)
def create_mock_config():
"""Create a minimal configuration for testing"""
return {
"mqtt": {
"Host": "test.mqtt.server",
"Port": 1883,
"User": "testuser",
"Password": "testpass",
"Topic": "%hostname_base%",
"FullTopic": "%prefix%/%topic%/",
"NoRetain": False,
"console": {
"SwitchRetain": "Off",
"ButtonRetain": "Off",
"PowerRetain": "On",
"SetOption1": "0",
"rule1": "on button1#state=10 do power0 toggle endon"
}
}
}
def create_mock_device():
"""Create a mock device for testing"""
return {
"name": "TestDevice-1234",
"ip": "192.168.1.100",
"mac": "aa:bb:cc:dd:ee:ff",
"hostname": "TestDevice-1234"
}
def test_retry_logic():
"""Test the retry logic for console commands"""
logger.info("Starting retry logic test")
# Create a TasmotaDiscovery instance
discovery = TasmotaDiscovery(debug=True)
discovery.config = create_mock_config()
# Create a mock device
device = create_mock_device()
# Create a mock response for successful requests
mock_success = MagicMock()
mock_success.status_code = 200
mock_success.json.return_value = {
"StatusFWR": {"Version": "9.4.0.5"},
"StatusNET": {"Hostname": "TestDevice-1234"},
"MqttHost": {"Host": "test.mqtt.server", "Port": 1883, "User": "testuser"}
}
# Create a mock for requests.get that simulates timeouts for specific commands
original_requests_get = requests.get
def mock_requests_get(url, **kwargs):
# Simulate timeouts for specific commands
if "ButtonRetain" in url:
# Simulate timeout for the first two attempts, then succeed
if mock_requests_get.button_retain_attempts < 2:
mock_requests_get.button_retain_attempts += 1
logger.info(f"Simulating timeout for ButtonRetain (attempt {mock_requests_get.button_retain_attempts})")
raise requests.exceptions.Timeout("Connection timed out")
logger.info("ButtonRetain request succeeding on third attempt")
return mock_success
elif "SetOption1" in url:
# Always timeout for SetOption1
logger.info("Simulating timeout for SetOption1")
raise requests.exceptions.Timeout("Connection timed out")
elif "rule1" in url and "Rule1" not in url:
# Simulate HTTP error for rule1
logger.info("Simulating HTTP error for rule1")
mock_error = MagicMock()
mock_error.status_code = 500
return mock_error
else:
# All other requests succeed
return mock_success
# Initialize the counter
mock_requests_get.button_retain_attempts = 0
# Apply the mock
with patch('requests.get', side_effect=mock_requests_get):
# Create a minimal device_details list with just our test device
all_devices = [device]
# Initialize the command_failures list
discovery.command_failures = []
# Process the device
logger.info("Processing test device")
# Simulate the relevant parts of get_device_details
name = device.get('name', 'Unknown')
ip = device.get('ip')
# Get console parameters
console_params = discovery.config['mqtt']['console']
# Process retain parameters
retain_params = ["ButtonRetain", "SwitchRetain", "PowerRetain"]
for param in retain_params:
if param in console_params:
try:
final_value = console_params[param]
opposite_value = "On" if final_value.lower() == "off" else "Off"
# First command (opposite state) - with retry logic
url = f"http://{ip}/cm?cmnd={param}%20{opposite_value}"
success = False
attempts = 0
max_attempts = 3
last_error = None
while not success and attempts < max_attempts:
attempts += 1
try:
response = requests.get(url, timeout=5)
if response.status_code == 200:
logger.info(f"{name}: Set {param} to {opposite_value}")
success = True
else:
logger.warning(f"{name}: Failed to set {param} to {opposite_value} (attempt {attempts}/{max_attempts})")
last_error = f"HTTP {response.status_code}"
if attempts < max_attempts:
time.sleep(0.1) # Reduced wait time for testing
except requests.exceptions.Timeout as e:
logger.warning(f"{name}: Timeout setting {param} to {opposite_value} (attempt {attempts}/{max_attempts})")
last_error = "Timeout"
if attempts < max_attempts:
time.sleep(0.1) # Reduced wait time for testing
except requests.exceptions.RequestException as e:
logger.warning(f"{name}: Error setting {param} to {opposite_value}: {str(e)} (attempt {attempts}/{max_attempts})")
last_error = str(e)
if attempts < max_attempts:
time.sleep(0.1) # Reduced wait time for testing
if not success:
logger.error(f"{name}: Failed to set {param} to {opposite_value} after {max_attempts} attempts. Last error: {last_error}")
discovery.command_failures.append({
"device": name,
"ip": ip,
"command": f"{param} {opposite_value}",
"error": last_error
})
# Second command (final state) - with retry logic
url = f"http://{ip}/cm?cmnd={param}%20{final_value}"
success = False
attempts = 0
last_error = None
while not success and attempts < max_attempts:
attempts += 1
try:
response = requests.get(url, timeout=5)
if response.status_code == 200:
logger.info(f"{name}: Set {param} to {final_value}")
success = True
else:
logger.warning(f"{name}: Failed to set {param} to {final_value} (attempt {attempts}/{max_attempts})")
last_error = f"HTTP {response.status_code}"
if attempts < max_attempts:
time.sleep(0.1) # Reduced wait time for testing
except requests.exceptions.Timeout as e:
logger.warning(f"{name}: Timeout setting {param} to {final_value} (attempt {attempts}/{max_attempts})")
last_error = "Timeout"
if attempts < max_attempts:
time.sleep(0.1) # Reduced wait time for testing
except requests.exceptions.RequestException as e:
logger.warning(f"{name}: Error setting {param} to {final_value}: {str(e)} (attempt {attempts}/{max_attempts})")
last_error = str(e)
if attempts < max_attempts:
time.sleep(0.1) # Reduced wait time for testing
if not success:
logger.error(f"{name}: Failed to set {param} to {final_value} after {max_attempts} attempts. Last error: {last_error}")
discovery.command_failures.append({
"device": name,
"ip": ip,
"command": f"{param} {final_value}",
"error": last_error
})
except Exception as e:
logger.error(f"{name}: Unexpected error setting {param} commands: {str(e)}")
discovery.command_failures.append({
"device": name,
"ip": ip,
"command": f"{param} (both steps)",
"error": str(e)
})
# Process regular console parameters
for param, value in console_params.items():
if param in retain_params:
continue
# Regular console parameter - with retry logic
url = f"http://{ip}/cm?cmnd={param}%20{value}"
success = False
attempts = 0
max_attempts = 3
last_error = None
while not success and attempts < max_attempts:
attempts += 1
try:
response = requests.get(url, timeout=5)
if response.status_code == 200:
logger.info(f"{name}: Set console parameter {param} to {value}")
success = True
else:
logger.warning(f"{name}: Failed to set console parameter {param} (attempt {attempts}/{max_attempts})")
last_error = f"HTTP {response.status_code}"
if attempts < max_attempts:
time.sleep(0.1) # Reduced wait time for testing
except requests.exceptions.Timeout as e:
logger.warning(f"{name}: Timeout setting console parameter {param} (attempt {attempts}/{max_attempts})")
last_error = "Timeout"
if attempts < max_attempts:
time.sleep(0.1) # Reduced wait time for testing
except requests.exceptions.RequestException as e:
logger.warning(f"{name}: Error setting console parameter {param}: {str(e)} (attempt {attempts}/{max_attempts})")
last_error = str(e)
if attempts < max_attempts:
time.sleep(0.1) # Reduced wait time for testing
if not success:
logger.error(f"{name}: Failed to set console parameter {param} after {max_attempts} attempts. Last error: {last_error}")
discovery.command_failures.append({
"device": name,
"ip": ip,
"command": f"{param} {value}",
"error": last_error
})
# Print summary of command failures
if discovery.command_failures:
failure_count = len(discovery.command_failures)
print("\n" + "="*80)
print(f"COMMAND FAILURES SUMMARY: {failure_count} command(s) failed after 3 retry attempts")
print("="*80)
# Group failures by device for better readability
failures_by_device = {}
for failure in discovery.command_failures:
device_name = failure['device']
if device_name not in failures_by_device:
failures_by_device[device_name] = []
failures_by_device[device_name].append(failure)
# Print failures grouped by device
for device_name, failures in failures_by_device.items():
print(f"\nDevice: {device_name} ({failures[0]['ip']})")
print("-" * 40)
for i, failure in enumerate(failures, 1):
print(f" {i}. Command: {failure['command']}")
print(f" Error: {failure['error']}")
print("\n" + "="*80)
return True
else:
logger.info("No command failures detected")
return False
if __name__ == "__main__":
logger.info("Starting command retry test")
test_result = test_retry_logic()
if test_result:
logger.info("Test completed successfully - detected and reported command failures")
else:
logger.error("Test failed - no command failures detected")