293 lines
13 KiB
Python
293 lines
13 KiB
Python
#!/usr/bin/env python3
|
|
import json
|
|
import logging
|
|
import os
|
|
import sys
|
|
import time
|
|
import requests
|
|
from unittest.mock import patch, MagicMock
|
|
|
|
# Add the current directory to the path so we can import TasmotaManager
|
|
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
|
|
from TasmotaManager import TasmotaDiscovery
|
|
|
|
# Set up logging
|
|
logging.basicConfig(
|
|
level=logging.INFO,
|
|
format='%(asctime)s - %(levelname)s - %(message)s',
|
|
datefmt='%Y-%m-%d %H:%M:%S'
|
|
)
|
|
logger = logging.getLogger(__name__)
|
|
|
|
def create_mock_config():
|
|
"""Create a minimal configuration for testing"""
|
|
return {
|
|
"mqtt": {
|
|
"Host": "test.mqtt.server",
|
|
"Port": 1883,
|
|
"User": "testuser",
|
|
"Password": "testpass",
|
|
"Topic": "%hostname_base%",
|
|
"FullTopic": "%prefix%/%topic%/",
|
|
"NoRetain": False,
|
|
"console": {
|
|
"SwitchRetain": "Off",
|
|
"ButtonRetain": "Off",
|
|
"PowerRetain": "On",
|
|
"SetOption1": "0",
|
|
"rule1": "on button1#state=10 do power0 toggle endon"
|
|
}
|
|
}
|
|
}
|
|
|
|
def create_mock_device():
|
|
"""Create a mock device for testing"""
|
|
return {
|
|
"name": "TestDevice-1234",
|
|
"ip": "192.168.1.100",
|
|
"mac": "aa:bb:cc:dd:ee:ff",
|
|
"hostname": "TestDevice-1234"
|
|
}
|
|
|
|
def test_retry_logic():
|
|
"""Test the retry logic for console commands"""
|
|
logger.info("Starting retry logic test")
|
|
|
|
# Create a TasmotaDiscovery instance
|
|
discovery = TasmotaDiscovery(debug=True)
|
|
discovery.config = create_mock_config()
|
|
|
|
# Create a mock device
|
|
device = create_mock_device()
|
|
|
|
# Create a mock response for successful requests
|
|
mock_success = MagicMock()
|
|
mock_success.status_code = 200
|
|
mock_success.json.return_value = {
|
|
"StatusFWR": {"Version": "9.4.0.5"},
|
|
"StatusNET": {"Hostname": "TestDevice-1234"},
|
|
"MqttHost": {"Host": "test.mqtt.server", "Port": 1883, "User": "testuser"}
|
|
}
|
|
|
|
# Create a mock for requests.get that simulates timeouts for specific commands
|
|
original_requests_get = requests.get
|
|
|
|
def mock_requests_get(url, **kwargs):
|
|
# Simulate timeouts for specific commands
|
|
if "ButtonRetain" in url:
|
|
# Simulate timeout for the first two attempts, then succeed
|
|
if mock_requests_get.button_retain_attempts < 2:
|
|
mock_requests_get.button_retain_attempts += 1
|
|
logger.info(f"Simulating timeout for ButtonRetain (attempt {mock_requests_get.button_retain_attempts})")
|
|
raise requests.exceptions.Timeout("Connection timed out")
|
|
logger.info("ButtonRetain request succeeding on third attempt")
|
|
return mock_success
|
|
elif "SetOption1" in url:
|
|
# Always timeout for SetOption1
|
|
logger.info("Simulating timeout for SetOption1")
|
|
raise requests.exceptions.Timeout("Connection timed out")
|
|
elif "rule1" in url and "Rule1" not in url:
|
|
# Simulate HTTP error for rule1
|
|
logger.info("Simulating HTTP error for rule1")
|
|
mock_error = MagicMock()
|
|
mock_error.status_code = 500
|
|
return mock_error
|
|
else:
|
|
# All other requests succeed
|
|
return mock_success
|
|
|
|
# Initialize the counter
|
|
mock_requests_get.button_retain_attempts = 0
|
|
|
|
# Apply the mock
|
|
with patch('requests.get', side_effect=mock_requests_get):
|
|
# Create a minimal device_details list with just our test device
|
|
all_devices = [device]
|
|
|
|
# Initialize the command_failures list
|
|
discovery.command_failures = []
|
|
|
|
# Process the device
|
|
logger.info("Processing test device")
|
|
|
|
# Simulate the relevant parts of get_device_details
|
|
name = device.get('name', 'Unknown')
|
|
ip = device.get('ip')
|
|
|
|
# Get console parameters
|
|
console_params = discovery.config['mqtt']['console']
|
|
|
|
# Process retain parameters
|
|
retain_params = ["ButtonRetain", "SwitchRetain", "PowerRetain"]
|
|
for param in retain_params:
|
|
if param in console_params:
|
|
try:
|
|
final_value = console_params[param]
|
|
opposite_value = "On" if final_value.lower() == "off" else "Off"
|
|
|
|
# First command (opposite state) - with retry logic
|
|
url = f"http://{ip}/cm?cmnd={param}%20{opposite_value}"
|
|
success = False
|
|
attempts = 0
|
|
max_attempts = 3
|
|
last_error = None
|
|
|
|
while not success and attempts < max_attempts:
|
|
attempts += 1
|
|
try:
|
|
response = requests.get(url, timeout=5)
|
|
if response.status_code == 200:
|
|
logger.info(f"{name}: Set {param} to {opposite_value}")
|
|
success = True
|
|
else:
|
|
logger.warning(f"{name}: Failed to set {param} to {opposite_value} (attempt {attempts}/{max_attempts})")
|
|
last_error = f"HTTP {response.status_code}"
|
|
if attempts < max_attempts:
|
|
time.sleep(0.1) # Reduced wait time for testing
|
|
except requests.exceptions.Timeout as e:
|
|
logger.warning(f"{name}: Timeout setting {param} to {opposite_value} (attempt {attempts}/{max_attempts})")
|
|
last_error = "Timeout"
|
|
if attempts < max_attempts:
|
|
time.sleep(0.1) # Reduced wait time for testing
|
|
except requests.exceptions.RequestException as e:
|
|
logger.warning(f"{name}: Error setting {param} to {opposite_value}: {str(e)} (attempt {attempts}/{max_attempts})")
|
|
last_error = str(e)
|
|
if attempts < max_attempts:
|
|
time.sleep(0.1) # Reduced wait time for testing
|
|
|
|
if not success:
|
|
logger.error(f"{name}: Failed to set {param} to {opposite_value} after {max_attempts} attempts. Last error: {last_error}")
|
|
discovery.command_failures.append({
|
|
"device": name,
|
|
"ip": ip,
|
|
"command": f"{param} {opposite_value}",
|
|
"error": last_error
|
|
})
|
|
|
|
# Second command (final state) - with retry logic
|
|
url = f"http://{ip}/cm?cmnd={param}%20{final_value}"
|
|
success = False
|
|
attempts = 0
|
|
last_error = None
|
|
|
|
while not success and attempts < max_attempts:
|
|
attempts += 1
|
|
try:
|
|
response = requests.get(url, timeout=5)
|
|
if response.status_code == 200:
|
|
logger.info(f"{name}: Set {param} to {final_value}")
|
|
success = True
|
|
else:
|
|
logger.warning(f"{name}: Failed to set {param} to {final_value} (attempt {attempts}/{max_attempts})")
|
|
last_error = f"HTTP {response.status_code}"
|
|
if attempts < max_attempts:
|
|
time.sleep(0.1) # Reduced wait time for testing
|
|
except requests.exceptions.Timeout as e:
|
|
logger.warning(f"{name}: Timeout setting {param} to {final_value} (attempt {attempts}/{max_attempts})")
|
|
last_error = "Timeout"
|
|
if attempts < max_attempts:
|
|
time.sleep(0.1) # Reduced wait time for testing
|
|
except requests.exceptions.RequestException as e:
|
|
logger.warning(f"{name}: Error setting {param} to {final_value}: {str(e)} (attempt {attempts}/{max_attempts})")
|
|
last_error = str(e)
|
|
if attempts < max_attempts:
|
|
time.sleep(0.1) # Reduced wait time for testing
|
|
|
|
if not success:
|
|
logger.error(f"{name}: Failed to set {param} to {final_value} after {max_attempts} attempts. Last error: {last_error}")
|
|
discovery.command_failures.append({
|
|
"device": name,
|
|
"ip": ip,
|
|
"command": f"{param} {final_value}",
|
|
"error": last_error
|
|
})
|
|
except Exception as e:
|
|
logger.error(f"{name}: Unexpected error setting {param} commands: {str(e)}")
|
|
discovery.command_failures.append({
|
|
"device": name,
|
|
"ip": ip,
|
|
"command": f"{param} (both steps)",
|
|
"error": str(e)
|
|
})
|
|
|
|
# Process regular console parameters
|
|
for param, value in console_params.items():
|
|
if param in retain_params:
|
|
continue
|
|
|
|
# Regular console parameter - with retry logic
|
|
url = f"http://{ip}/cm?cmnd={param}%20{value}"
|
|
success = False
|
|
attempts = 0
|
|
max_attempts = 3
|
|
last_error = None
|
|
|
|
while not success and attempts < max_attempts:
|
|
attempts += 1
|
|
try:
|
|
response = requests.get(url, timeout=5)
|
|
if response.status_code == 200:
|
|
logger.info(f"{name}: Set console parameter {param} to {value}")
|
|
success = True
|
|
else:
|
|
logger.warning(f"{name}: Failed to set console parameter {param} (attempt {attempts}/{max_attempts})")
|
|
last_error = f"HTTP {response.status_code}"
|
|
if attempts < max_attempts:
|
|
time.sleep(0.1) # Reduced wait time for testing
|
|
except requests.exceptions.Timeout as e:
|
|
logger.warning(f"{name}: Timeout setting console parameter {param} (attempt {attempts}/{max_attempts})")
|
|
last_error = "Timeout"
|
|
if attempts < max_attempts:
|
|
time.sleep(0.1) # Reduced wait time for testing
|
|
except requests.exceptions.RequestException as e:
|
|
logger.warning(f"{name}: Error setting console parameter {param}: {str(e)} (attempt {attempts}/{max_attempts})")
|
|
last_error = str(e)
|
|
if attempts < max_attempts:
|
|
time.sleep(0.1) # Reduced wait time for testing
|
|
|
|
if not success:
|
|
logger.error(f"{name}: Failed to set console parameter {param} after {max_attempts} attempts. Last error: {last_error}")
|
|
discovery.command_failures.append({
|
|
"device": name,
|
|
"ip": ip,
|
|
"command": f"{param} {value}",
|
|
"error": last_error
|
|
})
|
|
|
|
# Print summary of command failures
|
|
if discovery.command_failures:
|
|
failure_count = len(discovery.command_failures)
|
|
print("\n" + "="*80)
|
|
print(f"COMMAND FAILURES SUMMARY: {failure_count} command(s) failed after 3 retry attempts")
|
|
print("="*80)
|
|
|
|
# Group failures by device for better readability
|
|
failures_by_device = {}
|
|
for failure in discovery.command_failures:
|
|
device_name = failure['device']
|
|
if device_name not in failures_by_device:
|
|
failures_by_device[device_name] = []
|
|
failures_by_device[device_name].append(failure)
|
|
|
|
# Print failures grouped by device
|
|
for device_name, failures in failures_by_device.items():
|
|
print(f"\nDevice: {device_name} ({failures[0]['ip']})")
|
|
print("-" * 40)
|
|
for i, failure in enumerate(failures, 1):
|
|
print(f" {i}. Command: {failure['command']}")
|
|
print(f" Error: {failure['error']}")
|
|
|
|
print("\n" + "="*80)
|
|
|
|
return True
|
|
else:
|
|
logger.info("No command failures detected")
|
|
return False
|
|
|
|
if __name__ == "__main__":
|
|
logger.info("Starting command retry test")
|
|
test_result = test_retry_logic()
|
|
if test_result:
|
|
logger.info("Test completed successfully - detected and reported command failures")
|
|
else:
|
|
logger.error("Test failed - no command failures detected") |