"""Device comparison and diagnostics module."""

import json
import logging
from typing import Dict, List, Tuple, Optional
from concurrent.futures import ThreadPoolExecutor, as_completed

from utils import send_tasmota_command


class DeviceComparison:
    """Compare configuration between two Tasmota devices.

    A comparison is either device-vs-device (``compare_devices``) or
    device-vs-config-file (``compare_device_to_config``).  Both produce the
    same result dictionary, which ``print_comparison_report`` renders.
    """

    # SetOption indexes queried from a device (0-150 inclusive).
    _SETOPTION_INDEXES = range(0, 151)
    # Rule slots queried from a device (1-3 inclusive).
    _RULE_INDEXES = range(1, 4)
    # Extra commands whose raw responses are stored under 'other'.
    _OTHER_COMMANDS = ('ButtonDebounce', 'SwitchDebounce', 'Template', 'Module')
    # (key in the "Status 0" response, key in the device_info dictionary)
    _STATUS_SECTIONS = (
        ('StatusFWR', 'firmware'),
        ('StatusNET', 'network'),
        ('StatusMQT', 'mqtt'),
    )
    # (key in the comparison dictionary, heading printed in the report)
    _REPORT_SECTIONS = (
        ('firmware', 'FIRMWARE DIFFERENCES'),
        ('network', 'NETWORK DIFFERENCES'),
        ('mqtt', 'MQTT DIFFERENCES'),
        ('setoptions', 'SETOPTION DIFFERENCES'),
        ('rules', 'RULE DIFFERENCES'),
        ('other', 'OTHER CONFIGURATION DIFFERENCES'),
    )
    # Width of the separator lines in the printed report.
    _REPORT_WIDTH = 80
    # Longest value shown in a generic difference table before truncation.
    _MAX_VALUE_LEN = 60

    def __init__(self, config: Optional[Dict] = None, logger: Optional[logging.Logger] = None):
        """
        Initialize device comparison.

        Args:
            config: Configuration dictionary (for comparing to config file)
            logger: Optional logger instance; defaults to this module's logger
        """
        self.config = config
        self.logger = logger or logging.getLogger(__name__)

    def get_device_full_status(self, device_ip: str, device_name: str) -> Dict:
        """
        Get complete device status and configuration.

        Issues one command per SetOption, per rule slot and per extra
        setting, so a single call sends well over 150 commands in sequence.

        Args:
            device_ip: Device IP address
            device_name: Device name for logging

        Returns:
            Dictionary with keys 'name', 'ip', 'firmware', 'network', 'mqtt',
            'setoptions', 'rules', 'gpio' and 'other'.  Sections for which the
            device returned nothing stay empty dictionaries.
        """
        self.logger.info(f"Querying full status from {device_name} ({device_ip})")

        device_info = {
            'name': device_name,
            'ip': device_ip,
            'firmware': {},
            'network': {},
            'mqtt': {},
            'setoptions': {},
            'rules': {},
            'gpio': {},
            'other': {}
        }

        # Get Status 0 (all status); send_tasmota_command yields (result, success)
        result, success = send_tasmota_command(device_ip, "Status%200", timeout=10, logger=self.logger)
        if success and result:
            for source_key, section in self._STATUS_SECTIONS:
                if source_key in result:
                    device_info[section] = result[source_key]
            if 'Status' in result:
                device_info['other']['status'] = result['Status']

        # Get all SetOptions (0-150)
        self.logger.debug(f"Querying SetOptions from {device_name}")
        for i in self._SETOPTION_INDEXES:
            key = f"SetOption{i}"
            result, success = send_tasmota_command(device_ip, key, timeout=5, logger=self.logger)
            if success and result and key in result:
                device_info['setoptions'][key] = result[key]

        # Get Rules (1-3)
        # NOTE(review): per the Tasmota command reference, `Rule<x> 4` appears
        # to disable one-shot mode rather than being a pure query - confirm
        # that this side effect on the device is intended.
        self.logger.debug(f"Querying Rules from {device_name}")
        for i in self._RULE_INDEXES:
            key = f"Rule{i}"
            result, success = send_tasmota_command(device_ip, f"Rule{i}%204", timeout=5, logger=self.logger)
            if success and result and key in result:
                device_info['rules'][key] = result[key]

        # Get other important settings (the whole response is stored)
        for cmd in self._OTHER_COMMANDS:
            result, success = send_tasmota_command(device_ip, cmd, timeout=5, logger=self.logger)
            if success and result:
                device_info['other'][cmd] = result

        return device_info

    def _find_console_set_name(self, device_type: str, device_list: Dict) -> Optional[str]:
        """Return the console_set name configured for *device_type*, or None.

        Lookup order: exact key match, otherwise case-insensitive key match,
        and finally (if nothing usable was found) a case-insensitive match
        against the NAME field inside each entry's JSON template.
        """
        # Computed up front: the template fallback below needs it even when
        # the exact-match branch was taken.
        device_type_lower = device_type.lower()
        console_set_name = None

        if device_type in device_list:
            console_set_name = device_list[device_type].get('console_set')
        else:
            for template_name, template_data in device_list.items():
                if template_name.lower() == device_type_lower:
                    console_set_name = template_data.get('console_set')
                    break

        if console_set_name:
            return console_set_name

        # Still not found: try matching against the template NAME field
        for template_data in device_list.values():
            template_str = template_data.get('template', '')
            if not template_str:
                continue
            try:
                template_obj = json.loads(template_str)
            except (json.JSONDecodeError, TypeError):
                # Unparseable or non-string template: skip this entry
                continue
            if not isinstance(template_obj, dict):
                continue
            if str(template_obj.get('NAME', '')).lower() == device_type_lower:
                console_set_name = template_data.get('console_set')
                break

        return console_set_name

    def get_expected_config(self, device_type: str) -> Dict:
        """
        Get expected configuration from config file for a device type.

        Args:
            device_type: Device type/DeviceName

        Returns:
            Dictionary with keys 'name', 'setoptions', 'rules' and
            'console_set'.  'console_set' is None (and the other sections
            empty) when there is no config or no matching console_set.
        """
        expected = {
            'name': 'Expected (from config)',
            'setoptions': {},
            'rules': {},
            'console_set': None
        }

        if not self.config:
            return expected

        # Find console_set for this device type
        device_list = self.config.get('device_list', {})
        console_set_name = self._find_console_set_name(device_type, device_list)

        if not console_set_name:
            self.logger.warning(f"No console_set found for device type: {device_type}")
            return expected

        expected['console_set'] = console_set_name

        # Get console commands for this set
        console_set = self.config.get('console_set', {})
        commands = console_set.get(console_set_name, [])

        # Parse commands into SetOptions and Rules
        for command in commands:
            if not command or not command.strip():
                continue

            # "<name> <rest of line>"; the value is empty when only a name is given
            parts = command.split(None, 1)
            if not parts:
                continue
            param_name = parts[0]
            param_value = parts[1] if len(parts) > 1 else ""

            if param_name.startswith('SetOption'):
                # Store raw value - it is normalized during comparison
                expected['setoptions'][param_name] = param_value
            elif param_name.lower().startswith('rule'):
                # A later command for the same rule name replaces an earlier one
                expected['rules'][param_name] = {
                    'State': 'ON',    # Rules should be enabled
                    'Once': 'OFF',    # Should not be Once
                    'Rules': param_value
                }

        return expected

    @staticmethod
    def _normalize_switch(val):
        """Map 0/1, OFF/ON and FALSE/TRUE spellings onto 'OFF'/'ON'.

        None stays None; any other value is returned as its stripped string
        form so that e.g. 40 and "40 " compare equal.
        """
        if val is None:
            return None
        text = str(val).strip()
        upper = text.upper()
        if upper in ('0', 'OFF', 'FALSE'):
            return 'OFF'
        if upper in ('1', 'ON', 'TRUE'):
            return 'ON'
        return text

    @staticmethod
    def _canonical_rule_key(key: str) -> str:
        """Return *key* in the 'Rule<n>' spelling used for device rule keys."""
        if key.lower().startswith('rule'):
            return 'Rule' + key[4:]
        # Not a rule-style name: fall back to capitalizing the first letter
        return key[:1].upper() + key[1:]

    @staticmethod
    def _rule_matches(actual_rule, expected_rule: Dict) -> bool:
        """Return True when the device rule agrees with the expected rule.

        State, Once and the whitespace-stripped rule text must all match.  A
        missing, empty or non-dictionary device rule never matches.
        """
        if not actual_rule or not isinstance(actual_rule, dict):
            return False
        # Defaults are chosen so that a missing field counts as a mismatch
        if actual_rule.get('State', 'OFF') != expected_rule.get('State', 'ON'):
            return False
        if actual_rule.get('Once', 'ON') != expected_rule.get('Once', 'OFF'):
            return False
        actual_text = str(actual_rule.get('Rules', '')).strip()
        expected_text = str(expected_rule.get('Rules', '')).strip()
        return actual_text == expected_text

    def compare_device_to_config(self, device_ip: str, device_name: str, device_type: str) -> Dict:
        """
        Compare a device's actual configuration to expected config from file.

        Only SetOptions and Rules that appear in the config are compared; the
        'firmware', 'network', 'mqtt' and 'other' sections are always empty.

        Args:
            device_ip: Device IP address
            device_name: Device name
            device_type: Device type/DeviceName

        Returns:
            Dictionary with comparison results, in the same shape as the
            result of ``compare_devices`` (device2 is the config file)
        """
        self.logger.info(f"Comparing {device_name} vs expected config for {device_type}")

        # Get actual device config
        device = self.get_device_full_status(device_ip, device_name)

        # Get expected config
        expected = self.get_expected_config(device_type)

        # Compare only SetOptions that are in config (0/1 vs OFF/ON normalized)
        setoption_diffs = []
        for key, expected_value in expected['setoptions'].items():
            actual_value = device['setoptions'].get(key)
            if self._normalize_switch(actual_value) != self._normalize_switch(expected_value):
                setoption_diffs.append({
                    'key': key,
                    'device1_value': actual_value,
                    'device2_value': expected_value
                })

        # Compare only Rules that are in config
        rule_diffs = []
        for key, expected_rule in expected['rules'].items():
            # Normalize key (rule1 vs Rule1)
            rule_key = self._canonical_rule_key(key)
            actual_rule = device['rules'].get(rule_key, {})
            if not self._rule_matches(actual_rule, expected_rule):
                rule_diffs.append({
                    'key': rule_key,
                    'device1_value': actual_rule if actual_rule else {},
                    'device2_value': expected_rule
                })

        differences = {
            'device1': {'name': device_name, 'ip': device_ip},
            'device2': {'name': f"Expected ({expected['console_set']})", 'ip': 'config file'},
            'firmware': [],
            'network': [],
            'mqtt': [],
            'setoptions': setoption_diffs,
            'rules': rule_diffs,
            'other': []
        }

        return differences

    def compare_devices(self, device1_ip: str, device1_name: str,
                        device2_ip: str, device2_name: str) -> Dict:
        """
        Compare two devices and return differences.

        Args:
            device1_ip: First device IP
            device1_name: First device name
            device2_ip: Second device IP
            device2_name: Second device name

        Returns:
            Dictionary with 'device1'/'device2' descriptors and one list of
            differences per section ('firmware', 'network', 'mqtt',
            'setoptions', 'rules', 'other')
        """
        self.logger.info(f"Comparing {device1_name} vs {device2_name}")

        # Get full status from both devices in parallel
        self.logger.info("Querying devices in parallel...")
        with ThreadPoolExecutor(max_workers=2) as executor:
            future1 = executor.submit(self.get_device_full_status, device1_ip, device1_name)
            future2 = executor.submit(self.get_device_full_status, device2_ip, device2_name)

            # result() blocks until done and re-raises any worker exception
            device1 = future1.result()
            device2 = future2.result()

        # Compare and find differences
        differences = {
            'device1': {'name': device1_name, 'ip': device1_ip},
            'device2': {'name': device2_name, 'ip': device2_ip},
        }
        for section in ('firmware', 'network', 'mqtt', 'setoptions', 'rules', 'other'):
            differences[section] = self._compare_section(device1[section], device2[section])

        return differences

    def _compare_section(self, section1: Dict, section2: Dict) -> List[Dict]:
        """
        Compare two configuration sections.

        Args:
            section1: First device section
            section2: Second device section

        Returns:
            List of differences sorted by key; a key missing on one side is
            reported with None as that side's value
        """
        all_keys = set(section1) | set(section2)
        return [
            {
                'key': key,
                'device1_value': section1.get(key),
                'device2_value': section2.get(key)
            }
            for key in sorted(all_keys)
            if section1.get(key) != section2.get(key)
        ]

    def print_comparison_report(self, comparison: Dict) -> None:
        """
        Print human-readable comparison report.

        Args:
            comparison: Comparison results dictionary, as returned by
                ``compare_devices`` or ``compare_device_to_config``
        """
        heavy_rule = "=" * self._REPORT_WIDTH
        light_rule = "-" * self._REPORT_WIDTH

        print("\n" + heavy_rule)
        print("DEVICE COMPARISON REPORT")
        print(heavy_rule)

        device1 = comparison['device1']
        device2 = comparison['device2']

        print(f"\nDevice 1: {device1['name']} ({device1['ip']})")
        print(f"Device 2: {device2['name']} ({device2['ip']})")

        print()  # Blank line before the first section
        for section, heading in self._REPORT_SECTIONS:
            print(light_rule)
            print(heading)
            diffs = comparison[section]
            if not diffs:
                print("No differences found")
            elif section == 'rules':
                # Rules get a per-field table
                self._print_rule_differences(diffs, device1['name'], device2['name'])
            else:
                self._print_differences(diffs, device1['name'], device2['name'])
            print()  # Blank line after section

        print("\n" + heavy_rule)
        print("END OF REPORT")
        print(heavy_rule + "\n")

    def _truncate_value(self, text: str) -> str:
        """Shorten *text* to at most _MAX_VALUE_LEN characters, ending in '...'."""
        if len(text) > self._MAX_VALUE_LEN:
            return text[:self._MAX_VALUE_LEN - 3] + "..."
        return text

    def _print_differences(self, differences: List[Dict], device1_name: str, device2_name: str) -> None:
        """
        Print list of differences in columnar format.

        Args:
            differences: List of difference dictionaries
            device1_name: First device name
            device2_name: Second device name
        """
        if not differences:
            return

        # Truncate before measuring so one very long value cannot stretch the
        # column far beyond the text that is actually displayed.
        rows = [
            (
                str(diff['key']),
                self._truncate_value(str(diff['device1_value'])),
                self._truncate_value(str(diff['device2_value'])),
            )
            for diff in differences
        ]

        # Column widths, with minimums and room for the device-name headers
        key_width = max(max(len(key) for key, _, _ in rows), 15)
        val1_width = max(max(len(val1) for _, val1, _ in rows), len(device1_name), 20)
        val2_width = max(max(len(val2) for _, _, val2 in rows), len(device2_name), 20)

        # Print header row
        print(f"\n{'Parameter':<{key_width}} {device1_name:<{val1_width}} {device2_name:<{val2_width}}")
        print("-" * key_width + " " + "-" * val1_width + " " + "-" * val2_width)

        # Print each difference
        for key, val1, val2 in rows:
            print(f"{key:<{key_width}} {val1:<{val1_width}} {val2:<{val2_width}}")

    def _print_rule_differences(self, differences: List[Dict], device1_name: str, device2_name: str) -> None:
        """
        Print rule differences in columnar format for easier field comparison.

        Args:
            differences: List of difference dictionaries
            device1_name: First device name
            device2_name: Second device name
        """
        if not differences:
            return

        # Index by rule key (a repeated key keeps its last entry)
        rules_by_number = {str(diff['key']): diff for diff in differences}

        for rule_num in sorted(rules_by_number):
            diff = rules_by_number[rule_num]
            val1 = diff['device1_value']
            val2 = diff['device2_value']

            print(f"\n{rule_num}:")

            # A rule absent on one device is shown as all-'N/A' fields
            if val1 is None and isinstance(val2, dict):
                val1 = {}
            elif val2 is None and isinstance(val1, dict):
                val2 = {}

            if not (isinstance(val1, dict) and isinstance(val2, dict)):
                # No per-field structure available: use the generic table
                self._print_differences([diff], device1_name, device2_name)
                continue

            all_fields = set(val1) | set(val2)

            # Calculate column widths
            field_width = max(max((len(field) for field in all_fields), default=10), 10)
            val1_width = max(len(device1_name), 20)
            val2_width = max(len(device2_name), 20)

            # Print header
            print(f"{'Field':<{field_width}} {device1_name:<{val1_width}} {device2_name:<{val2_width}}")
            print("-" * field_width + " " + "-" * val1_width + " " + "-" * val2_width)

            # Print each field, truncated to its column width
            for field in sorted(all_fields):
                v1 = str(val1.get(field, 'N/A'))
                v2 = str(val2.get(field, 'N/A'))
                if len(v1) > val1_width:
                    v1 = v1[:val1_width - 3] + "..."
                if len(v2) > val2_width:
                    v2 = v2[:val2_width - 3] + "..."
                print(f"{field:<{field_width}} {v1:<{val1_width}} {v2:<{val2_width}}")