#!/usr/bin/env python3
"""Exercise the console-command retry logic against a mocked ``requests.get``.

The script replays the retain-parameter and console-parameter command
sequence against a fake HTTP layer that injects timeouts and HTTP errors,
then prints a summary of every command that still failed after all retries.
"""
import json
import logging
import os
import sys
import time
import requests
from unittest.mock import patch, MagicMock

# Add the current directory to the path so we can import TasmotaManager
sys.path.append(os.path.dirname(os.path.abspath(__file__)))

from TasmotaManager import TasmotaDiscovery

# Set up logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger(__name__)

# Retry policy shared by every simulated command.
MAX_ATTEMPTS = 3
REQUEST_TIMEOUT = 5  # seconds, passed to requests.get
RETRY_DELAY = 0.1  # seconds between attempts; reduced wait time for testing


def create_mock_config():
    """Create a minimal configuration for testing."""
    return {
        "mqtt": {
            "Host": "test.mqtt.server",
            "Port": 1883,
            "User": "testuser",
            "Password": "testpass",
            "Topic": "%hostname_base%",
            "FullTopic": "%prefix%/%topic%/",
            "NoRetain": False,
            "console": {
                "SwitchRetain": "Off",
                "ButtonRetain": "Off",
                "PowerRetain": "On",
                "SetOption1": "0",
                "rule1": "on button1#state=10 do power0 toggle endon"
            }
        }
    }


def create_mock_device():
    """Create a mock device for testing."""
    return {
        "name": "TestDevice-1234",
        "ip": "192.168.1.100",
        "mac": "aa:bb:cc:dd:ee:ff",
        "hostname": "TestDevice-1234"
    }


def _apply_setting(discovery, name, ip, param, value, console=False):
    """Send one ``param value`` command, retrying up to MAX_ATTEMPTS times.

    A command counts as successful when the response status code is 200.
    If every attempt fails, the failure (device, ip, command, last error) is
    appended to ``discovery.command_failures``.

    Args:
        discovery: Object whose ``command_failures`` list collects failures.
        name: Device name used as the log-message prefix.
        ip: Device address used to build the command URL.
        param: Command name.
        value: Command argument.
        console: When True, use the "console parameter" wording in log
            messages; otherwise use the retain-parameter wording.

    Returns:
        True if the command succeeded, False if all attempts failed.
    """
    url = f"http://{ip}/cm?cmnd={param}%20{value}"
    if console:
        success_text = f"console parameter {param} to {value}"
        failure_text = f"console parameter {param}"
    else:
        success_text = failure_text = f"{param} to {value}"

    last_error = None
    for attempt in range(1, MAX_ATTEMPTS + 1):
        progress = f"(attempt {attempt}/{MAX_ATTEMPTS})"
        try:
            # Looked up on the module at call time so unittest.mock.patch applies.
            response = requests.get(url, timeout=REQUEST_TIMEOUT)
        except requests.exceptions.Timeout:
            logger.warning(f"{name}: Timeout setting {failure_text} {progress}")
            last_error = "Timeout"
        except requests.exceptions.RequestException as exc:
            logger.warning(f"{name}: Error setting {failure_text}: {str(exc)} {progress}")
            last_error = str(exc)
        else:
            if response.status_code == 200:
                logger.info(f"{name}: Set {success_text}")
                return True
            logger.warning(f"{name}: Failed to set {failure_text} {progress}")
            last_error = f"HTTP {response.status_code}"

        # No point sleeping after the final attempt.
        if attempt < MAX_ATTEMPTS:
            time.sleep(RETRY_DELAY)

    logger.error(
        f"{name}: Failed to set {failure_text} after {MAX_ATTEMPTS} attempts. "
        f"Last error: {last_error}"
    )
    discovery.command_failures.append({
        "device": name,
        "ip": ip,
        "command": f"{param} {value}",
        "error": last_error
    })
    return False


def _print_failure_summary(command_failures):
    """Print the recorded command failures grouped by device name."""
    failure_count = len(command_failures)
    print("\n" + "="*80)
    print(f"COMMAND FAILURES SUMMARY: {failure_count} command(s) failed after {MAX_ATTEMPTS} retry attempts")
    print("="*80)

    # Group failures by device for better readability
    failures_by_device = {}
    for failure in command_failures:
        failures_by_device.setdefault(failure['device'], []).append(failure)

    # Print failures grouped by device
    for device_name, failures in failures_by_device.items():
        print(f"\nDevice: {device_name} ({failures[0]['ip']})")
        print("-" * 40)
        for i, failure in enumerate(failures, 1):
            print(f" {i}. Command: {failure['command']}")
            print(f" Error: {failure['error']}")

    print("\n" + "="*80)


def test_retry_logic():
    """Test the retry logic for console commands.

    ``requests.get`` is patched so that:
      * ``ButtonRetain`` times out twice and then succeeds,
      * ``SetOption1`` always times out,
      * ``rule1`` always returns HTTP 500,
      * every other request succeeds.

    Returns:
        True if at least one command failure was recorded (and the summary
        printed), False if no failures were detected.
    """
    logger.info("Starting retry logic test")

    # Create a TasmotaDiscovery instance
    discovery = TasmotaDiscovery(debug=True)
    discovery.config = create_mock_config()

    # Create a mock device
    device = create_mock_device()

    # Create a mock response for successful requests
    mock_success = MagicMock()
    mock_success.status_code = 200
    mock_success.json.return_value = {
        "StatusFWR": {"Version": "9.4.0.5"},
        "StatusNET": {"Hostname": "TestDevice-1234"},
        "MqttHost": {"Host": "test.mqtt.server", "Port": 1883, "User": "testuser"}
    }

    def mock_requests_get(url, **kwargs):
        """Stand-in for requests.get that injects failures for specific commands."""
        if "ButtonRetain" in url:
            # Simulate timeout for the first two attempts, then succeed
            if mock_requests_get.button_retain_attempts < 2:
                mock_requests_get.button_retain_attempts += 1
                logger.info(f"Simulating timeout for ButtonRetain (attempt {mock_requests_get.button_retain_attempts})")
                raise requests.exceptions.Timeout("Connection timed out")
            logger.info("ButtonRetain request succeeding on third attempt")
            return mock_success
        elif "SetOption1" in url:
            # Always timeout for SetOption1
            logger.info("Simulating timeout for SetOption1")
            raise requests.exceptions.Timeout("Connection timed out")
        elif "rule1" in url and "Rule1" not in url:
            # Simulate HTTP error for rule1
            logger.info("Simulating HTTP error for rule1")
            mock_error = MagicMock()
            mock_error.status_code = 500
            return mock_error
        else:
            # All other requests succeed
            return mock_success

    # Initialize the counter (stored on the function so the closure can mutate it)
    mock_requests_get.button_retain_attempts = 0

    # Apply the mock
    with patch('requests.get', side_effect=mock_requests_get):
        # Initialize the command_failures list
        discovery.command_failures = []

        # Process the device
        logger.info("Processing test device")

        # Simulate the relevant parts of get_device_details
        name = device.get('name', 'Unknown')
        ip = device.get('ip')

        # Get console parameters
        console_params = discovery.config['mqtt']['console']

        # Process retain parameters: each is first set to the opposite state
        # and then to the configured final state.
        retain_params = ["ButtonRetain", "SwitchRetain", "PowerRetain"]
        for param in retain_params:
            if param not in console_params:
                continue
            try:
                final_value = console_params[param]
                opposite_value = "On" if final_value.lower() == "off" else "Off"

                # The second command is sent even if the first one failed.
                _apply_setting(discovery, name, ip, param, opposite_value)
                _apply_setting(discovery, name, ip, param, final_value)
            except Exception as e:
                logger.error(f"{name}: Unexpected error setting {param} commands: {str(e)}")
                discovery.command_failures.append({
                    "device": name,
                    "ip": ip,
                    "command": f"{param} (both steps)",
                    "error": str(e)
                })

        # Process regular console parameters
        for param, value in console_params.items():
            if param in retain_params:
                continue
            _apply_setting(discovery, name, ip, param, value, console=True)

    # Print summary of command failures
    if discovery.command_failures:
        _print_failure_summary(discovery.command_failures)
        return True

    logger.info("No command failures detected")
    return False


if __name__ == "__main__":
    logger.info("Starting command retry test")
    test_result = test_retry_logic()
    if test_result:
        logger.info("Test completed successfully - detected and reported command failures")
    else:
        logger.error("Test failed - no command failures detected")
        # Surface the failure to the caller instead of always exiting 0.
        sys.exit(1)