- DeviceName comes from template's NAME field (e.g., 'Treatlife SS02') - Config keys use different names (e.g., 'TreatLife_SW_SS02S') - Now falls back to checking template NAME if key doesn't match - Fixes issue where devices with template-based names weren't getting console settings - Both console_settings and device_diff use same matching logic
513 lines
18 KiB
Python
513 lines
18 KiB
Python
"""Device comparison and diagnostics module."""
|
|
|
|
import logging
|
|
from typing import Dict, List, Tuple, Optional
|
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
|
from utils import send_tasmota_command
|
|
|
|
|
|
class DeviceComparison:
|
|
"""Compare configuration between two Tasmota devices."""
|
|
|
|
def __init__(self, config: Optional[Dict] = None, logger: Optional[logging.Logger] = None):
|
|
"""
|
|
Initialize device comparison.
|
|
|
|
Args:
|
|
config: Configuration dictionary (for comparing to config file)
|
|
logger: Optional logger instance
|
|
"""
|
|
self.config = config
|
|
self.logger = logger or logging.getLogger(__name__)
|
|
|
|
def get_device_full_status(self, device_ip: str, device_name: str) -> Dict:
|
|
"""
|
|
Get complete device status and configuration.
|
|
|
|
Args:
|
|
device_ip: Device IP address
|
|
device_name: Device name for logging
|
|
|
|
Returns:
|
|
Dictionary with all device status information
|
|
"""
|
|
self.logger.info(f"Querying full status from {device_name} ({device_ip})")
|
|
|
|
device_info = {
|
|
'name': device_name,
|
|
'ip': device_ip,
|
|
'firmware': {},
|
|
'network': {},
|
|
'mqtt': {},
|
|
'setoptions': {},
|
|
'rules': {},
|
|
'gpio': {},
|
|
'other': {}
|
|
}
|
|
|
|
# Get Status 0 (all status)
|
|
result, success = send_tasmota_command(device_ip, "Status%200", timeout=10, logger=self.logger)
|
|
if success and result:
|
|
# Extract firmware info
|
|
if 'StatusFWR' in result:
|
|
device_info['firmware'] = result['StatusFWR']
|
|
|
|
# Extract network info
|
|
if 'StatusNET' in result:
|
|
device_info['network'] = result['StatusNET']
|
|
|
|
# Extract MQTT info
|
|
if 'StatusMQT' in result:
|
|
device_info['mqtt'] = result['StatusMQT']
|
|
|
|
# Extract basic status
|
|
if 'Status' in result:
|
|
device_info['other']['status'] = result['Status']
|
|
|
|
# Get all SetOptions (0-150)
|
|
self.logger.debug(f"Querying SetOptions from {device_name}")
|
|
for i in range(0, 151):
|
|
result, success = send_tasmota_command(device_ip, f"SetOption{i}", timeout=5, logger=self.logger)
|
|
if success and result:
|
|
key = f"SetOption{i}"
|
|
if key in result:
|
|
device_info['setoptions'][key] = result[key]
|
|
|
|
# Get Rules (1-3)
|
|
self.logger.debug(f"Querying Rules from {device_name}")
|
|
for i in range(1, 4):
|
|
result, success = send_tasmota_command(device_ip, f"Rule{i}%204", timeout=5, logger=self.logger)
|
|
if success and result:
|
|
key = f"Rule{i}"
|
|
if key in result:
|
|
device_info['rules'][key] = result[key]
|
|
|
|
# Get other important settings
|
|
for cmd in ['ButtonDebounce', 'SwitchDebounce', 'Template', 'Module']:
|
|
result, success = send_tasmota_command(device_ip, cmd, timeout=5, logger=self.logger)
|
|
if success and result:
|
|
device_info['other'][cmd] = result
|
|
|
|
return device_info
|
|
|
|
def get_expected_config(self, device_type: str) -> Dict:
|
|
"""
|
|
Get expected configuration from config file for a device type.
|
|
|
|
Args:
|
|
device_type: Device type/DeviceName
|
|
|
|
Returns:
|
|
Dictionary with expected configuration
|
|
"""
|
|
expected = {
|
|
'name': 'Expected (from config)',
|
|
'setoptions': {},
|
|
'rules': {},
|
|
'console_set': None
|
|
}
|
|
|
|
if not self.config:
|
|
return expected
|
|
|
|
# Find console_set for this device type
|
|
device_list = self.config.get('device_list', {})
|
|
console_set_name = None
|
|
|
|
# Try exact match first
|
|
if device_type in device_list:
|
|
console_set_name = device_list[device_type].get('console_set')
|
|
else:
|
|
# Try case-insensitive match on key
|
|
device_type_lower = device_type.lower()
|
|
for template_name, template_data in device_list.items():
|
|
if template_name.lower() == device_type_lower:
|
|
console_set_name = template_data.get('console_set')
|
|
break
|
|
|
|
# If still not found, try matching against template NAME field
|
|
if not console_set_name:
|
|
import json
|
|
for template_name, template_data in device_list.items():
|
|
template_str = template_data.get('template', '')
|
|
if template_str:
|
|
try:
|
|
template_obj = json.loads(template_str)
|
|
template_device_name = template_obj.get('NAME', '')
|
|
if template_device_name.lower() == device_type_lower:
|
|
console_set_name = template_data.get('console_set')
|
|
break
|
|
except json.JSONDecodeError:
|
|
continue
|
|
|
|
if not console_set_name:
|
|
self.logger.warning(f"No console_set found for device type: {device_type}")
|
|
return expected
|
|
|
|
expected['console_set'] = console_set_name
|
|
|
|
# Get console commands for this set
|
|
console_set = self.config.get('console_set', {})
|
|
commands = console_set.get(console_set_name, [])
|
|
|
|
# Parse commands into SetOptions and Rules
|
|
for command in commands:
|
|
if not command or not command.strip():
|
|
continue
|
|
|
|
parts = command.split(None, 1)
|
|
if not parts:
|
|
continue
|
|
|
|
param_name = parts[0]
|
|
param_value = parts[1] if len(parts) > 1 else ""
|
|
|
|
if param_name.startswith('SetOption'):
|
|
# Store raw value - we'll normalize during comparison
|
|
expected['setoptions'][param_name] = param_value
|
|
elif param_name.lower().startswith('rule'):
|
|
# For rules, just store the rule text
|
|
expected['rules'][param_name] = {
|
|
'State': 'ON', # Rules should be enabled
|
|
'Once': 'OFF', # Should not be Once
|
|
'Rules': param_value
|
|
}
|
|
|
|
return expected
|
|
|
|
def compare_device_to_config(self, device_ip: str, device_name: str, device_type: str) -> Dict:
|
|
"""
|
|
Compare a device's actual configuration to expected config from file.
|
|
|
|
Args:
|
|
device_ip: Device IP address
|
|
device_name: Device name
|
|
device_type: Device type/DeviceName
|
|
|
|
Returns:
|
|
Dictionary with comparison results
|
|
"""
|
|
self.logger.info(f"Comparing {device_name} vs expected config for {device_type}")
|
|
|
|
# Get actual device config
|
|
device = self.get_device_full_status(device_ip, device_name)
|
|
|
|
# Get expected config
|
|
expected = self.get_expected_config(device_type)
|
|
|
|
# Compare only SetOptions and Rules that are in config
|
|
setoption_diffs = []
|
|
for key, expected_value in expected['setoptions'].items():
|
|
actual_value = device['setoptions'].get(key)
|
|
|
|
# Normalize values for comparison (0/1 vs OFF/ON)
|
|
def normalize(val):
|
|
if val is None:
|
|
return None
|
|
val_str = str(val).upper()
|
|
if val_str in ['0', 'OFF', 'FALSE']:
|
|
return 'OFF'
|
|
elif val_str in ['1', 'ON', 'TRUE']:
|
|
return 'ON'
|
|
return str(val)
|
|
|
|
if normalize(actual_value) != normalize(expected_value):
|
|
setoption_diffs.append({
|
|
'key': key,
|
|
'device1_value': actual_value,
|
|
'device2_value': expected_value
|
|
})
|
|
|
|
rule_diffs = []
|
|
for key, expected_rule in expected['rules'].items():
|
|
# Normalize key (rule1 vs Rule1)
|
|
rule_key = key[0].upper() + key[1:] # Capitalize first letter
|
|
actual_rule = device['rules'].get(rule_key, {})
|
|
|
|
# Compare rule content
|
|
rules_match = True
|
|
if not actual_rule:
|
|
rules_match = False
|
|
else:
|
|
# Compare State, Once, and Rules content
|
|
expected_state = expected_rule.get('State', 'ON')
|
|
actual_state = actual_rule.get('State', 'OFF')
|
|
if actual_state != expected_state:
|
|
rules_match = False
|
|
|
|
expected_once = expected_rule.get('Once', 'OFF')
|
|
actual_once = actual_rule.get('Once', 'ON')
|
|
if actual_once != expected_once:
|
|
rules_match = False
|
|
|
|
expected_rules = expected_rule.get('Rules', '')
|
|
actual_rules = actual_rule.get('Rules', '')
|
|
if actual_rules.strip() != expected_rules.strip():
|
|
rules_match = False
|
|
|
|
if not rules_match:
|
|
rule_diffs.append({
|
|
'key': rule_key,
|
|
'device1_value': actual_rule if actual_rule else {},
|
|
'device2_value': expected_rule
|
|
})
|
|
|
|
differences = {
|
|
'device1': {'name': device_name, 'ip': device_ip},
|
|
'device2': {'name': f"Expected ({expected['console_set']})", 'ip': 'config file'},
|
|
'firmware': [],
|
|
'network': [],
|
|
'mqtt': [],
|
|
'setoptions': setoption_diffs,
|
|
'rules': rule_diffs,
|
|
'other': []
|
|
}
|
|
|
|
return differences
|
|
|
|
def compare_devices(self, device1_ip: str, device1_name: str,
|
|
device2_ip: str, device2_name: str) -> Dict:
|
|
"""
|
|
Compare two devices and return differences.
|
|
|
|
Args:
|
|
device1_ip: First device IP
|
|
device1_name: First device name
|
|
device2_ip: Second device IP
|
|
device2_name: Second device name
|
|
|
|
Returns:
|
|
Dictionary with comparison results
|
|
"""
|
|
self.logger.info(f"Comparing {device1_name} vs {device2_name}")
|
|
|
|
# Get full status from both devices in parallel
|
|
self.logger.info("Querying devices in parallel...")
|
|
|
|
with ThreadPoolExecutor(max_workers=2) as executor:
|
|
future1 = executor.submit(self.get_device_full_status, device1_ip, device1_name)
|
|
future2 = executor.submit(self.get_device_full_status, device2_ip, device2_name)
|
|
|
|
device1 = future1.result()
|
|
device2 = future2.result()
|
|
|
|
# Compare and find differences
|
|
differences = {
|
|
'device1': {'name': device1_name, 'ip': device1_ip},
|
|
'device2': {'name': device2_name, 'ip': device2_ip},
|
|
'firmware': self._compare_section(device1['firmware'], device2['firmware']),
|
|
'network': self._compare_section(device1['network'], device2['network']),
|
|
'mqtt': self._compare_section(device1['mqtt'], device2['mqtt']),
|
|
'setoptions': self._compare_section(device1['setoptions'], device2['setoptions']),
|
|
'rules': self._compare_section(device1['rules'], device2['rules']),
|
|
'other': self._compare_section(device1['other'], device2['other'])
|
|
}
|
|
|
|
return differences
|
|
|
|
def _compare_section(self, section1: Dict, section2: Dict) -> List[Dict]:
|
|
"""
|
|
Compare two configuration sections.
|
|
|
|
Args:
|
|
section1: First device section
|
|
section2: Second device section
|
|
|
|
Returns:
|
|
List of differences
|
|
"""
|
|
differences = []
|
|
|
|
# Get all keys from both sections
|
|
all_keys = set(section1.keys()) | set(section2.keys())
|
|
|
|
for key in sorted(all_keys):
|
|
val1 = section1.get(key)
|
|
val2 = section2.get(key)
|
|
|
|
if val1 != val2:
|
|
differences.append({
|
|
'key': key,
|
|
'device1_value': val1,
|
|
'device2_value': val2
|
|
})
|
|
|
|
return differences
|
|
|
|
def print_comparison_report(self, comparison: Dict) -> None:
|
|
"""
|
|
Print human-readable comparison report.
|
|
|
|
Args:
|
|
comparison: Comparison results dictionary
|
|
"""
|
|
print("\n" + "=" * 80)
|
|
print("DEVICE COMPARISON REPORT")
|
|
print("=" * 80)
|
|
|
|
device1 = comparison['device1']
|
|
device2 = comparison['device2']
|
|
|
|
print(f"\nDevice 1: {device1['name']} ({device1['ip']})")
|
|
print(f"Device 2: {device2['name']} ({device2['ip']})")
|
|
|
|
# Print firmware info
|
|
print("\n" + "-" * 80)
|
|
print("FIRMWARE DIFFERENCES")
|
|
if comparison['firmware']:
|
|
self._print_differences(comparison['firmware'], device1['name'], device2['name'])
|
|
else:
|
|
print("No differences found")
|
|
print() # Blank line after section
|
|
|
|
# Print network differences
|
|
print("-" * 80)
|
|
print("NETWORK DIFFERENCES")
|
|
if comparison['network']:
|
|
self._print_differences(comparison['network'], device1['name'], device2['name'])
|
|
else:
|
|
print("No differences found")
|
|
print() # Blank line after section
|
|
|
|
# Print MQTT differences
|
|
print("-" * 80)
|
|
print("MQTT DIFFERENCES")
|
|
if comparison['mqtt']:
|
|
self._print_differences(comparison['mqtt'], device1['name'], device2['name'])
|
|
else:
|
|
print("No differences found")
|
|
print() # Blank line after section
|
|
|
|
# Print SetOption differences
|
|
print("-" * 80)
|
|
print("SETOPTION DIFFERENCES")
|
|
if comparison['setoptions']:
|
|
self._print_differences(comparison['setoptions'], device1['name'], device2['name'])
|
|
else:
|
|
print("No differences found")
|
|
print() # Blank line after section
|
|
|
|
# Print Rule differences (special row format)
|
|
print("-" * 80)
|
|
print("RULE DIFFERENCES")
|
|
if comparison['rules']:
|
|
self._print_rule_differences(comparison['rules'], device1['name'], device2['name'])
|
|
else:
|
|
print("No differences found")
|
|
print() # Blank line after section
|
|
|
|
# Print other differences
|
|
print("-" * 80)
|
|
print("OTHER CONFIGURATION DIFFERENCES")
|
|
if comparison['other']:
|
|
self._print_differences(comparison['other'], device1['name'], device2['name'])
|
|
else:
|
|
print("No differences found")
|
|
print() # Blank line after section
|
|
|
|
print("\n" + "=" * 80)
|
|
print("END OF REPORT")
|
|
print("=" * 80 + "\n")
|
|
|
|
def _print_differences(self, differences: List[Dict], device1_name: str, device2_name: str) -> None:
|
|
"""
|
|
Print list of differences in columnar format.
|
|
|
|
Args:
|
|
differences: List of difference dictionaries
|
|
device1_name: First device name
|
|
device2_name: Second device name
|
|
"""
|
|
if not differences:
|
|
return
|
|
|
|
# Calculate column widths
|
|
max_key_len = max(len(str(diff['key'])) for diff in differences)
|
|
max_val1_len = max(len(str(diff['device1_value'])) for diff in differences)
|
|
max_val2_len = max(len(str(diff['device2_value'])) for diff in differences)
|
|
|
|
# Ensure minimum column widths
|
|
key_width = max(max_key_len, 15)
|
|
val1_width = max(max_val1_len, len(device1_name), 20)
|
|
val2_width = max(max_val2_len, len(device2_name), 20)
|
|
|
|
# Truncate device names if too long for column headers
|
|
dev1_header = device1_name[:val1_width]
|
|
dev2_header = device2_name[:val2_width]
|
|
|
|
# Print header row
|
|
print(f"\n{'Parameter':<{key_width}} {dev1_header:<{val1_width}} {dev2_header:<{val2_width}}")
|
|
print("-" * key_width + " " + "-" * val1_width + " " + "-" * val2_width)
|
|
|
|
# Print each difference
|
|
for diff in differences:
|
|
key = str(diff['key'])
|
|
val1 = str(diff['device1_value'])
|
|
val2 = str(diff['device2_value'])
|
|
|
|
# Truncate values if too long
|
|
if len(val1) > 60:
|
|
val1 = val1[:57] + "..."
|
|
if len(val2) > 60:
|
|
val2 = val2[:57] + "..."
|
|
|
|
print(f"{key:<{key_width}} {val1:<{val1_width}} {val2:<{val2_width}}")
|
|
|
|
def _print_rule_differences(self, differences: List[Dict], device1_name: str, device2_name: str) -> None:
|
|
"""
|
|
Print rule differences in columnar format for easier field comparison.
|
|
|
|
Args:
|
|
differences: List of difference dictionaries
|
|
device1_name: First device name
|
|
device2_name: Second device name
|
|
"""
|
|
if not differences:
|
|
return
|
|
|
|
# Group differences by rule number
|
|
rules_by_number = {}
|
|
for diff in differences:
|
|
rule_key = str(diff['key'])
|
|
rules_by_number[rule_key] = diff
|
|
|
|
# Print each rule's details in columnar format
|
|
for rule_num in sorted(rules_by_number.keys()):
|
|
diff = rules_by_number[rule_num]
|
|
val1 = diff['device1_value']
|
|
val2 = diff['device2_value']
|
|
|
|
print(f"\n{rule_num}:")
|
|
|
|
# Extract all fields from both devices
|
|
if isinstance(val1, dict) and isinstance(val2, dict):
|
|
all_fields = set(val1.keys()) | set(val2.keys())
|
|
|
|
# Calculate column widths
|
|
max_field_len = max(len(field) for field in all_fields) if all_fields else 10
|
|
field_width = max(max_field_len, 10)
|
|
|
|
dev1_header = device1_name
|
|
dev2_header = device2_name
|
|
|
|
# Calculate value column widths
|
|
val1_width = max(len(dev1_header), 20)
|
|
val2_width = max(len(dev2_header), 20)
|
|
|
|
# Print header
|
|
print(f"{'Field':<{field_width}} {dev1_header:<{val1_width}} {dev2_header:<{val2_width}}")
|
|
print("-" * field_width + " " + "-" * val1_width + " " + "-" * val2_width)
|
|
|
|
# Print each field
|
|
for field in sorted(all_fields):
|
|
v1 = str(val1.get(field, 'N/A'))
|
|
v2 = str(val2.get(field, 'N/A'))
|
|
|
|
# Truncate if too long
|
|
if len(v1) > val1_width:
|
|
v1 = v1[:val1_width-3] + "..."
|
|
if len(v2) > val2_width:
|
|
v2 = v2[:val2_width-3] + "..."
|
|
|
|
print(f"{field:<{field_width}} {v1:<{val1_width}} {v2:<{val2_width}}")
|