Enhanced readability with table-style columnar output
Changes:
- Converted vertical list format to columnar table format
- First column: Parameter name
- Second column: Device 1 value
- Third column: Device 2 value
- Added header row with device names
- Auto-adjusts column widths based on content
- Truncates very long values (>60 chars) with "..."
- Maintains section separation (Firmware, Network, MQTT, etc.)
Before:
SetOption32:
KitchenMain = 40
KitchenBar = 8
After:
Parameter KitchenMain-3040 KitchenBar-7845
--------------- -------------------- --------------------
SetOption32 40 8
Much easier to scan and compare values side-by-side!
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
284 lines
9.6 KiB
Python
284 lines
9.6 KiB
Python
"""Device comparison and diagnostics module."""
|
|
|
|
import logging
|
|
from typing import Dict, List, Tuple, Optional
|
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
|
from utils import send_tasmota_command
|
|
|
|
|
|
class DeviceComparison:
|
|
"""Compare configuration between two Tasmota devices."""
|
|
|
|
def __init__(self, logger: Optional[logging.Logger] = None):
|
|
"""
|
|
Initialize device comparison.
|
|
|
|
Args:
|
|
logger: Optional logger instance
|
|
"""
|
|
self.logger = logger or logging.getLogger(__name__)
|
|
|
|
def get_device_full_status(self, device_ip: str, device_name: str) -> Dict:
|
|
"""
|
|
Get complete device status and configuration.
|
|
|
|
Args:
|
|
device_ip: Device IP address
|
|
device_name: Device name for logging
|
|
|
|
Returns:
|
|
Dictionary with all device status information
|
|
"""
|
|
self.logger.info(f"Querying full status from {device_name} ({device_ip})")
|
|
|
|
device_info = {
|
|
'name': device_name,
|
|
'ip': device_ip,
|
|
'firmware': {},
|
|
'network': {},
|
|
'mqtt': {},
|
|
'setoptions': {},
|
|
'rules': {},
|
|
'gpio': {},
|
|
'other': {}
|
|
}
|
|
|
|
# Get Status 0 (all status)
|
|
result, success = send_tasmota_command(device_ip, "Status%200", timeout=10, logger=self.logger)
|
|
if success and result:
|
|
# Extract firmware info
|
|
if 'StatusFWR' in result:
|
|
device_info['firmware'] = result['StatusFWR']
|
|
|
|
# Extract network info
|
|
if 'StatusNET' in result:
|
|
device_info['network'] = result['StatusNET']
|
|
|
|
# Extract MQTT info
|
|
if 'StatusMQT' in result:
|
|
device_info['mqtt'] = result['StatusMQT']
|
|
|
|
# Extract basic status
|
|
if 'Status' in result:
|
|
device_info['other']['status'] = result['Status']
|
|
|
|
# Get all SetOptions (0-150)
|
|
self.logger.debug(f"Querying SetOptions from {device_name}")
|
|
for i in range(0, 151):
|
|
result, success = send_tasmota_command(device_ip, f"SetOption{i}", timeout=5, logger=self.logger)
|
|
if success and result:
|
|
key = f"SetOption{i}"
|
|
if key in result:
|
|
device_info['setoptions'][key] = result[key]
|
|
|
|
# Get Rules (1-3)
|
|
self.logger.debug(f"Querying Rules from {device_name}")
|
|
for i in range(1, 4):
|
|
result, success = send_tasmota_command(device_ip, f"Rule{i}%204", timeout=5, logger=self.logger)
|
|
if success and result:
|
|
key = f"Rule{i}"
|
|
if key in result:
|
|
device_info['rules'][key] = result[key]
|
|
|
|
# Get other important settings
|
|
for cmd in ['ButtonDebounce', 'SwitchDebounce', 'Template', 'Module']:
|
|
result, success = send_tasmota_command(device_ip, cmd, timeout=5, logger=self.logger)
|
|
if success and result:
|
|
device_info['other'][cmd] = result
|
|
|
|
return device_info
|
|
|
|
def compare_devices(self, device1_ip: str, device1_name: str,
|
|
device2_ip: str, device2_name: str) -> Dict:
|
|
"""
|
|
Compare two devices and return differences.
|
|
|
|
Args:
|
|
device1_ip: First device IP
|
|
device1_name: First device name
|
|
device2_ip: Second device IP
|
|
device2_name: Second device name
|
|
|
|
Returns:
|
|
Dictionary with comparison results
|
|
"""
|
|
self.logger.info(f"Comparing {device1_name} vs {device2_name}")
|
|
|
|
# Get full status from both devices in parallel
|
|
self.logger.info("Querying devices in parallel...")
|
|
|
|
with ThreadPoolExecutor(max_workers=2) as executor:
|
|
future1 = executor.submit(self.get_device_full_status, device1_ip, device1_name)
|
|
future2 = executor.submit(self.get_device_full_status, device2_ip, device2_name)
|
|
|
|
device1 = future1.result()
|
|
device2 = future2.result()
|
|
|
|
# Compare and find differences
|
|
differences = {
|
|
'device1': {'name': device1_name, 'ip': device1_ip},
|
|
'device2': {'name': device2_name, 'ip': device2_ip},
|
|
'firmware': self._compare_section(device1['firmware'], device2['firmware']),
|
|
'network': self._compare_section(device1['network'], device2['network']),
|
|
'mqtt': self._compare_section(device1['mqtt'], device2['mqtt']),
|
|
'setoptions': self._compare_section(device1['setoptions'], device2['setoptions']),
|
|
'rules': self._compare_section(device1['rules'], device2['rules']),
|
|
'other': self._compare_section(device1['other'], device2['other'])
|
|
}
|
|
|
|
return differences
|
|
|
|
def _compare_section(self, section1: Dict, section2: Dict) -> List[Dict]:
|
|
"""
|
|
Compare two configuration sections.
|
|
|
|
Args:
|
|
section1: First device section
|
|
section2: Second device section
|
|
|
|
Returns:
|
|
List of differences
|
|
"""
|
|
differences = []
|
|
|
|
# Get all keys from both sections
|
|
all_keys = set(section1.keys()) | set(section2.keys())
|
|
|
|
for key in sorted(all_keys):
|
|
val1 = section1.get(key)
|
|
val2 = section2.get(key)
|
|
|
|
if val1 != val2:
|
|
differences.append({
|
|
'key': key,
|
|
'device1_value': val1,
|
|
'device2_value': val2
|
|
})
|
|
|
|
return differences
|
|
|
|
def print_comparison_report(self, comparison: Dict) -> None:
|
|
"""
|
|
Print human-readable comparison report.
|
|
|
|
Args:
|
|
comparison: Comparison results dictionary
|
|
"""
|
|
print("\n" + "=" * 80)
|
|
print("DEVICE COMPARISON REPORT")
|
|
print("=" * 80)
|
|
|
|
device1 = comparison['device1']
|
|
device2 = comparison['device2']
|
|
|
|
print(f"\nDevice 1: {device1['name']} ({device1['ip']})")
|
|
print(f"Device 2: {device2['name']} ({device2['ip']})")
|
|
|
|
# Print firmware info
|
|
print("\n" + "-" * 80)
|
|
print("FIRMWARE DIFFERENCES")
|
|
print("-" * 80)
|
|
|
|
if comparison['firmware']:
|
|
self._print_differences(comparison['firmware'], device1['name'], device2['name'])
|
|
else:
|
|
print("No differences found")
|
|
|
|
# Print network differences
|
|
print("\n" + "-" * 80)
|
|
print("NETWORK DIFFERENCES")
|
|
print("-" * 80)
|
|
|
|
if comparison['network']:
|
|
self._print_differences(comparison['network'], device1['name'], device2['name'])
|
|
else:
|
|
print("No differences found")
|
|
|
|
# Print MQTT differences
|
|
print("\n" + "-" * 80)
|
|
print("MQTT DIFFERENCES")
|
|
print("-" * 80)
|
|
|
|
if comparison['mqtt']:
|
|
self._print_differences(comparison['mqtt'], device1['name'], device2['name'])
|
|
else:
|
|
print("No differences found")
|
|
|
|
# Print SetOption differences
|
|
print("\n" + "-" * 80)
|
|
print("SETOPTION DIFFERENCES")
|
|
print("-" * 80)
|
|
|
|
if comparison['setoptions']:
|
|
self._print_differences(comparison['setoptions'], device1['name'], device2['name'])
|
|
else:
|
|
print("No differences found")
|
|
|
|
# Print Rule differences
|
|
print("\n" + "-" * 80)
|
|
print("RULE DIFFERENCES")
|
|
print("-" * 80)
|
|
|
|
if comparison['rules']:
|
|
self._print_differences(comparison['rules'], device1['name'], device2['name'])
|
|
else:
|
|
print("No differences found")
|
|
|
|
# Print other differences
|
|
print("\n" + "-" * 80)
|
|
print("OTHER CONFIGURATION DIFFERENCES")
|
|
print("-" * 80)
|
|
|
|
if comparison['other']:
|
|
self._print_differences(comparison['other'], device1['name'], device2['name'])
|
|
else:
|
|
print("No differences found")
|
|
|
|
print("\n" + "=" * 80)
|
|
print("END OF REPORT")
|
|
print("=" * 80 + "\n")
|
|
|
|
def _print_differences(self, differences: List[Dict], device1_name: str, device2_name: str) -> None:
|
|
"""
|
|
Print list of differences in columnar format.
|
|
|
|
Args:
|
|
differences: List of difference dictionaries
|
|
device1_name: First device name
|
|
device2_name: Second device name
|
|
"""
|
|
if not differences:
|
|
return
|
|
|
|
# Calculate column widths
|
|
max_key_len = max(len(str(diff['key'])) for diff in differences)
|
|
max_val1_len = max(len(str(diff['device1_value'])) for diff in differences)
|
|
max_val2_len = max(len(str(diff['device2_value'])) for diff in differences)
|
|
|
|
# Ensure minimum column widths
|
|
key_width = max(max_key_len, 15)
|
|
val1_width = max(max_val1_len, len(device1_name), 20)
|
|
val2_width = max(max_val2_len, len(device2_name), 20)
|
|
|
|
# Truncate device names if too long for column headers
|
|
dev1_header = device1_name[:val1_width]
|
|
dev2_header = device2_name[:val2_width]
|
|
|
|
# Print header row
|
|
print(f"\n{'Parameter':<{key_width}} {dev1_header:<{val1_width}} {dev2_header:<{val2_width}}")
|
|
print("-" * key_width + " " + "-" * val1_width + " " + "-" * val2_width)
|
|
|
|
# Print each difference
|
|
for diff in differences:
|
|
key = str(diff['key'])
|
|
val1 = str(diff['device1_value'])
|
|
val2 = str(diff['device2_value'])
|
|
|
|
# Truncate values if too long
|
|
if len(val1) > 60:
|
|
val1 = val1[:57] + "..."
|
|
if len(val2) > 60:
|
|
val2 = val2[:57] + "..."
|
|
|
|
print(f"{key:<{key_width}} {val1:<{val1_width}} {val2:<{val2_width}}")
|