Compare commits

..

No commits in common. "master" and "parallel-device-diff" have entirely different histories.

5 changed files with 93 additions and 431 deletions

View File

@ -308,14 +308,14 @@ def main():
help='Generate UniFi hostname comparison report')
parser.add_argument('--Device', type=str,
help='Process single device by IP or hostname')
parser.add_argument('--diff', nargs='+', metavar='DEVICE',
help='Compare devices: --diff DEVICE (vs config) or --diff DEVICE1 DEVICE2 (vs each other)')
parser.add_argument('--diff', nargs=2, metavar=('DEVICE1', 'DEVICE2'),
help='Compare two devices and show configuration differences')
args = parser.parse_args()
# Setup logging
logger = setup_logging(args.debug)
logger.info("TasmotaManager v2.1 starting")
logger.info("TasmotaManager v2.0 starting")
# Ensure data directory exists
ensure_data_directory()
@ -339,11 +339,7 @@ def main():
# Handle device comparison mode
if args.diff:
if len(args.diff) < 1 or len(args.diff) > 2:
logger.error("--diff requires 1 or 2 device arguments")
return 1
device_comp = DeviceComparison(config, logger)
device_comp = DeviceComparison(logger)
# Get devices list to resolve names/IPs
devices = discovery.get_tasmota_devices()
@ -351,37 +347,20 @@ def main():
# Find device 1
device1 = find_device_by_identifier(devices, args.diff[0], logger)
if not device1:
logger.error(f"Device not found: {args.diff[0]}")
logger.error(f"Device 1 not found: {args.diff[0]}")
return 1
if len(args.diff) == 1:
# Single device: compare to config file
# Get device type from Status
from utils import send_tasmota_command
result, success = send_tasmota_command(device1['ip'], "Status%200", timeout=10, logger=logger)
if not success or not result:
logger.error(f"Failed to get device status for {device1['name']}")
return 1
# Find device 2
device2 = find_device_by_identifier(devices, args.diff[1], logger)
if not device2:
logger.error(f"Device 2 not found: {args.diff[1]}")
return 1
device_type = result.get('Status', {}).get('DeviceName', '')
if not device_type:
logger.error(f"Could not determine device type for {device1['name']}")
return 1
comparison = device_comp.compare_device_to_config(
device1['ip'], device1['name'], device_type
)
else:
# Two devices: compare to each other
device2 = find_device_by_identifier(devices, args.diff[1], logger)
if not device2:
logger.error(f"Device 2 not found: {args.diff[1]}")
return 1
comparison = device_comp.compare_devices(
device1['ip'], device1['name'],
device2['ip'], device2['name']
)
# Compare devices
comparison = device_comp.compare_devices(
device1['ip'], device1['name'],
device2['ip'], device2['name']
)
# Print report
device_comp.print_comparison_report(comparison)

View File

@ -226,9 +226,6 @@ class ConfigurationManager:
self.logger.debug(f"{device_name}: Raw FullTopic from device: '{current_full_topic}'")
# Check if device has URL-encoded spaces at the beginning (broken state)
has_leading_encoded_space = current_full_topic.startswith('%20')
# Normalize: remove any URL-encoded spaces from the beginning of current value
# This handles the case where the device returns '%20%prefix%' instead of '%prefix%'
while current_full_topic.startswith('%20'):
@ -239,10 +236,7 @@ class ConfigurationManager:
self.logger.debug(f"{device_name}: Comparing FullTopic: current='{current_full_topic}' vs expected='{mqtt_full_topic_normalized}'")
# Update if values don't match OR if device had leading encoded space (needs fixing)
if current_full_topic != mqtt_full_topic_normalized or has_leading_encoded_space:
if has_leading_encoded_space:
self.logger.info(f"{device_name}: FullTopic has invalid leading space, will fix")
if current_full_topic != mqtt_full_topic_normalized:
updates_needed.append(('FullTopic', mqtt_full_topic_normalized))
# Handle NoRetain (SetOption62)
@ -261,13 +255,7 @@ class ConfigurationManager:
failed_updates = []
for setting_name, setting_value in updates_needed:
# URL encode the value, especially important for FullTopic which contains % and /
from urllib.parse import quote
# Convert value to string and URL encode it
encoded_value = quote(str(setting_value), safe='')
command = f"{setting_name}%20{encoded_value}"
self.logger.debug(f"{device_name}: Sending command: {command}")
command = f"{setting_name}%20{setting_value}"
result, success = retry_command(
lambda: send_tasmota_command(device_ip, command, timeout=5, logger=self.logger),

View File

@ -41,11 +41,12 @@ class ConsoleSettingsManager:
if not device_ip:
return False, "No IP address"
# Get device type from DeviceName for template matching
device_type = device_details.get('Status', {}).get('DeviceName', '')
# Get hostname base for template matching
hostname = device_details.get('StatusNET', {}).get('Hostname', device_name)
hostname_base = get_hostname_base(hostname)
# Find which console_set to use for this device
console_set_name = self._get_console_set_name(device_type)
console_set_name = self._get_console_set_name(hostname_base)
if not console_set_name:
self.logger.debug(f"{device_name}: No console settings configured")
@ -85,42 +86,22 @@ class ConsoleSettingsManager:
self.logger.info(f"{device_name}: All console settings applied successfully")
return True, "Applied"
def _get_console_set_name(self, device_type: str) -> Optional[str]:
def _get_console_set_name(self, hostname_base: str) -> Optional[str]:
"""
Get the console_set name for a device based on DeviceName.
Get the console_set name for a device based on hostname.
Args:
device_type: Device type/DeviceName from Status
hostname_base: Base hostname of device
Returns:
str: Console set name or None
"""
device_list = self.config.get('device_list', {})
# First try exact match on key
if device_type in device_list:
return device_list[device_type].get('console_set')
# Then try case-insensitive match on key
device_type_lower = device_type.lower()
for template_name, template_data in device_list.items():
if template_name.lower() == device_type_lower:
if hostname_base.lower() in template_name.lower():
return template_data.get('console_set')
# Finally, try matching against the template NAME field
# DeviceName comes from the template's NAME field
import json
for template_name, template_data in device_list.items():
template_str = template_data.get('template', '')
if template_str:
try:
template_obj = json.loads(template_str)
template_device_name = template_obj.get('NAME', '')
if template_device_name.lower() == device_type_lower:
return template_data.get('console_set')
except json.JSONDecodeError:
continue
return None
def _get_console_commands(self, console_set_name: str) -> List[str]:
@ -179,14 +160,8 @@ class ConsoleSettingsManager:
time.sleep(0.5) # Brief delay between commands
# Send the actual command - properly URL encode
from urllib.parse import quote
# Split command into param and value, then URL encode the value part
parts = command.split(None, 1)
if len(parts) == 2:
escaped_command = parts[0] + '%20' + quote(parts[1], safe='')
else:
escaped_command = command.replace(' ', '%20')
# Send the actual command
escaped_command = command.replace(' ', '%20')
result, success = retry_command(
lambda: send_tasmota_command(device_ip, escaped_command, timeout=5, logger=self.logger),
@ -200,33 +175,28 @@ class ConsoleSettingsManager:
self.logger.error(f"{device_name}: Failed to set {param_name} after 3 attempts")
return False
# Verification disabled - it overwhelms devices with extra queries
# Trust that the command was applied if no error was returned
# Verify the command was applied (if possible)
if not self._verify_command(device_ip, device_name, param_name, param_value):
self.logger.warning(f"{device_name}: Verification failed for {param_name}")
# Don't return False here - some commands can't be verified
# Check if this is a rule definition - if so, enable it
if param_name.lower().startswith('rule'):
rule_number = param_name.lower().replace('rule', '')
if rule_number.isdigit():
# Wait for device to process the rule before enabling
time.sleep(0.5)
# Enable the rule with ON command
# Note: Rule{N} 4 doesn't work reliably, use ON instead
enable_command = f"Rule{rule_number}%20ON"
# Use Rule{N} 4 to enable without setting Once flag
# Rule 1 = Enable + Once ON (WRONG - causes single-fire issue)
# Rule 4 = Enable only (CORRECT - allows repeated firing)
enable_command = f"Rule{rule_number}%204"
self.logger.debug(f"{device_name}: Enabling rule{rule_number} (Once=OFF)")
result, success = retry_command(
lambda: send_tasmota_command(device_ip, enable_command, timeout=5, logger=self.logger),
max_attempts=3,
delay=1.0,
logger=self.logger,
device_name=device_name
result, success = send_tasmota_command(
device_ip, enable_command, timeout=5, logger=self.logger
)
if not success:
self.logger.error(f"{device_name}: Failed to enable rule{rule_number} after 3 attempts")
return False
self.logger.warning(f"{device_name}: Failed to enable rule{rule_number}")
time.sleep(0.3) # Brief delay between commands
return True
@ -260,19 +230,9 @@ class ConsoleSettingsManager:
return True # Can't verify, assume success
# Check if value matches
current_value = str(result.get(param_name, ''))
expected = str(expected_value)
current_value = result.get(param_name, '')
# Normalize values for comparison (Tasmota returns ON/OFF for 1/0)
def normalize_value(val: str) -> str:
val_lower = val.lower()
if val_lower in ['on', '1', 'true']:
return '1'
elif val_lower in ['off', '0', 'false']:
return '0'
return val_lower
if normalize_value(current_value) == normalize_value(expected):
if str(current_value) == str(expected_value):
return True
return False

View File

@ -9,15 +9,13 @@ from utils import send_tasmota_command
class DeviceComparison:
"""Compare configuration between two Tasmota devices."""
def __init__(self, config: Optional[Dict] = None, logger: Optional[logging.Logger] = None):
def __init__(self, logger: Optional[logging.Logger] = None):
"""
Initialize device comparison.
Args:
config: Configuration dictionary (for comparing to config file)
logger: Optional logger instance
"""
self.config = config
self.logger = logger or logging.getLogger(__name__)
def get_device_full_status(self, device_ip: str, device_name: str) -> Dict:
@ -90,181 +88,6 @@ class DeviceComparison:
return device_info
def get_expected_config(self, device_type: str) -> Dict:
"""
Get expected configuration from config file for a device type.
Args:
device_type: Device type/DeviceName
Returns:
Dictionary with expected configuration
"""
expected = {
'name': 'Expected (from config)',
'setoptions': {},
'rules': {},
'console_set': None
}
if not self.config:
return expected
# Find console_set for this device type
device_list = self.config.get('device_list', {})
console_set_name = None
# Try exact match first
if device_type in device_list:
console_set_name = device_list[device_type].get('console_set')
else:
# Try case-insensitive match on key
device_type_lower = device_type.lower()
for template_name, template_data in device_list.items():
if template_name.lower() == device_type_lower:
console_set_name = template_data.get('console_set')
break
# If still not found, try matching against template NAME field
if not console_set_name:
import json
for template_name, template_data in device_list.items():
template_str = template_data.get('template', '')
if template_str:
try:
template_obj = json.loads(template_str)
template_device_name = template_obj.get('NAME', '')
if template_device_name.lower() == device_type_lower:
console_set_name = template_data.get('console_set')
break
except json.JSONDecodeError:
continue
if not console_set_name:
self.logger.warning(f"No console_set found for device type: {device_type}")
return expected
expected['console_set'] = console_set_name
# Get console commands for this set
console_set = self.config.get('console_set', {})
commands = console_set.get(console_set_name, [])
# Parse commands into SetOptions and Rules
for command in commands:
if not command or not command.strip():
continue
parts = command.split(None, 1)
if not parts:
continue
param_name = parts[0]
param_value = parts[1] if len(parts) > 1 else ""
if param_name.startswith('SetOption'):
# Store raw value - we'll normalize during comparison
expected['setoptions'][param_name] = param_value
elif param_name.lower().startswith('rule'):
# For rules, just store the rule text
expected['rules'][param_name] = {
'State': 'ON', # Rules should be enabled
'Once': 'OFF', # Should not be Once
'Rules': param_value
}
return expected
def compare_device_to_config(self, device_ip: str, device_name: str, device_type: str) -> Dict:
"""
Compare a device's actual configuration to expected config from file.
Args:
device_ip: Device IP address
device_name: Device name
device_type: Device type/DeviceName
Returns:
Dictionary with comparison results
"""
self.logger.info(f"Comparing {device_name} vs expected config for {device_type}")
# Get actual device config
device = self.get_device_full_status(device_ip, device_name)
# Get expected config
expected = self.get_expected_config(device_type)
# Compare only SetOptions and Rules that are in config
setoption_diffs = []
for key, expected_value in expected['setoptions'].items():
actual_value = device['setoptions'].get(key)
# Normalize values for comparison (0/1 vs OFF/ON)
def normalize(val):
if val is None:
return None
val_str = str(val).upper()
if val_str in ['0', 'OFF', 'FALSE']:
return 'OFF'
elif val_str in ['1', 'ON', 'TRUE']:
return 'ON'
return str(val)
if normalize(actual_value) != normalize(expected_value):
setoption_diffs.append({
'key': key,
'device1_value': actual_value,
'device2_value': expected_value
})
rule_diffs = []
for key, expected_rule in expected['rules'].items():
# Normalize key (rule1 vs Rule1)
rule_key = key[0].upper() + key[1:] # Capitalize first letter
actual_rule = device['rules'].get(rule_key, {})
# Compare rule content
rules_match = True
if not actual_rule:
rules_match = False
else:
# Compare State, Once, and Rules content
expected_state = expected_rule.get('State', 'ON')
actual_state = actual_rule.get('State', 'OFF')
if actual_state != expected_state:
rules_match = False
expected_once = expected_rule.get('Once', 'OFF')
actual_once = actual_rule.get('Once', 'ON')
if actual_once != expected_once:
rules_match = False
expected_rules = expected_rule.get('Rules', '')
actual_rules = actual_rule.get('Rules', '')
if actual_rules.strip() != expected_rules.strip():
rules_match = False
if not rules_match:
rule_diffs.append({
'key': rule_key,
'device1_value': actual_rule if actual_rule else {},
'device2_value': expected_rule
})
differences = {
'device1': {'name': device_name, 'ip': device_ip},
'device2': {'name': f"Expected ({expected['console_set']})", 'ip': 'config file'},
'firmware': [],
'network': [],
'mqtt': [],
'setoptions': setoption_diffs,
'rules': rule_diffs,
'other': []
}
return differences
def compare_devices(self, device1_ip: str, device1_name: str,
device2_ip: str, device2_name: str) -> Dict:
"""
@ -354,56 +177,62 @@ class DeviceComparison:
# Print firmware info
print("\n" + "-" * 80)
print("FIRMWARE DIFFERENCES")
print("-" * 80)
if comparison['firmware']:
self._print_differences(comparison['firmware'], device1['name'], device2['name'])
else:
print("No differences found")
print() # Blank line after section
# Print network differences
print("-" * 80)
print("\n" + "-" * 80)
print("NETWORK DIFFERENCES")
print("-" * 80)
if comparison['network']:
self._print_differences(comparison['network'], device1['name'], device2['name'])
else:
print("No differences found")
print() # Blank line after section
# Print MQTT differences
print("-" * 80)
print("\n" + "-" * 80)
print("MQTT DIFFERENCES")
print("-" * 80)
if comparison['mqtt']:
self._print_differences(comparison['mqtt'], device1['name'], device2['name'])
else:
print("No differences found")
print() # Blank line after section
# Print SetOption differences
print("-" * 80)
print("\n" + "-" * 80)
print("SETOPTION DIFFERENCES")
print("-" * 80)
if comparison['setoptions']:
self._print_differences(comparison['setoptions'], device1['name'], device2['name'])
else:
print("No differences found")
print() # Blank line after section
# Print Rule differences (special row format)
print("-" * 80)
# Print Rule differences
print("\n" + "-" * 80)
print("RULE DIFFERENCES")
print("-" * 80)
if comparison['rules']:
self._print_rule_differences(comparison['rules'], device1['name'], device2['name'])
self._print_differences(comparison['rules'], device1['name'], device2['name'])
else:
print("No differences found")
print() # Blank line after section
# Print other differences
print("-" * 80)
print("\n" + "-" * 80)
print("OTHER CONFIGURATION DIFFERENCES")
print("-" * 80)
if comparison['other']:
self._print_differences(comparison['other'], device1['name'], device2['name'])
else:
print("No differences found")
print() # Blank line after section
print("\n" + "=" * 80)
print("END OF REPORT")
@ -411,102 +240,18 @@ class DeviceComparison:
def _print_differences(self, differences: List[Dict], device1_name: str, device2_name: str) -> None:
"""
Print list of differences in columnar format.
Print list of differences in readable format.
Args:
differences: List of difference dictionaries
device1_name: First device name
device2_name: Second device name
"""
if not differences:
return
# Calculate column widths
max_key_len = max(len(str(diff['key'])) for diff in differences)
max_val1_len = max(len(str(diff['device1_value'])) for diff in differences)
max_val2_len = max(len(str(diff['device2_value'])) for diff in differences)
# Ensure minimum column widths
key_width = max(max_key_len, 15)
val1_width = max(max_val1_len, len(device1_name), 20)
val2_width = max(max_val2_len, len(device2_name), 20)
# Truncate device names if too long for column headers
dev1_header = device1_name[:val1_width]
dev2_header = device2_name[:val2_width]
# Print header row
print(f"\n{'Parameter':<{key_width}} {dev1_header:<{val1_width}} {dev2_header:<{val2_width}}")
print("-" * key_width + " " + "-" * val1_width + " " + "-" * val2_width)
# Print each difference
for diff in differences:
key = str(diff['key'])
val1 = str(diff['device1_value'])
val2 = str(diff['device2_value'])
# Truncate values if too long
if len(val1) > 60:
val1 = val1[:57] + "..."
if len(val2) > 60:
val2 = val2[:57] + "..."
print(f"{key:<{key_width}} {val1:<{val1_width}} {val2:<{val2_width}}")
def _print_rule_differences(self, differences: List[Dict], device1_name: str, device2_name: str) -> None:
"""
Print rule differences in columnar format for easier field comparison.
Args:
differences: List of difference dictionaries
device1_name: First device name
device2_name: Second device name
"""
if not differences:
return
# Group differences by rule number
rules_by_number = {}
for diff in differences:
rule_key = str(diff['key'])
rules_by_number[rule_key] = diff
# Print each rule's details in columnar format
for rule_num in sorted(rules_by_number.keys()):
diff = rules_by_number[rule_num]
key = diff['key']
val1 = diff['device1_value']
val2 = diff['device2_value']
print(f"\n{rule_num}:")
# Extract all fields from both devices
if isinstance(val1, dict) and isinstance(val2, dict):
all_fields = set(val1.keys()) | set(val2.keys())
# Calculate column widths
max_field_len = max(len(field) for field in all_fields) if all_fields else 10
field_width = max(max_field_len, 10)
dev1_header = device1_name
dev2_header = device2_name
# Calculate value column widths
val1_width = max(len(dev1_header), 20)
val2_width = max(len(dev2_header), 20)
# Print header
print(f"{'Field':<{field_width}} {dev1_header:<{val1_width}} {dev2_header:<{val2_width}}")
print("-" * field_width + " " + "-" * val1_width + " " + "-" * val2_width)
# Print each field
for field in sorted(all_fields):
v1 = str(val1.get(field, 'N/A'))
v2 = str(val2.get(field, 'N/A'))
# Truncate if too long
if len(v1) > val1_width:
v1 = v1[:val1_width-3] + "..."
if len(v2) > val2_width:
v2 = v2[:val2_width-3] + "..."
print(f"{field:<{field_width}} {v1:<{val1_width}} {v2:<{val2_width}}")
print(f"\n{key}:")
print(f" {device1_name:20} = {val1}")
print(f" {device2_name:20} = {val2}")

View File

@ -54,19 +54,19 @@
},
"Gosund_WP5_Plug": {
"template": "{\"NAME\":\"Gosund-WP5\",\"GPIO\":[0,0,0,0,17,0,0,0,56,57,21,0,0],\"FLAG\":0,\"BASE\":18}",
"console_set": "Plug"
"console_set": "Traditional"
},
"Gosund_Plug": {
"template": "{\"NAME\":\"Gosund-WP5\",\"GPIO\":[0,0,0,0,32,0,0,0,320,321,224,0,0,0],\"FLAG\":0,\"BASE\":18}",
"console_set": "Plug"
"console_set": "Traditional"
},
"CloudFree_X10S_Plug": {
"template": "{\"NAME\":\"Aoycocr X10S\",\"GPIO\":[56,0,57,0,21,134,0,0,131,17,132,0,0],\"FLAG\":0,\"BASE\":45}",
"console_set": "Plug"
"console_set": "Traditional"
},
"Sonoff_S31_PM_Plug": {
"template": "{\"NAME\":\"Sonoff S31\",\"GPIO\":[17,145,0,146,0,0,0,0,21,56,0,0,0],\"FLAG\":0,\"BASE\":41}",
"console_set": "Plug"
"console_set": "Traditional"
},
"Sonoff TX Ultimate 1": {
"template": "{\"NAME\":\"Sonoff T5-1C-120\",\"GPIO\":[0,0,7808,0,7840,3872,0,0,0,1376,0,7776,0,0,224,3232,0,480,3200,0,0,0,3840,0,0,0,0,0,0,0,0,0,0,0,0,0],\"FLAG\":0,\"BASE\":1}",
@ -85,7 +85,7 @@
"SetOption13 0",
"SetOption19 0",
"SetOption32 8",
"SetOption40 0",
"SetOption40 40",
"SetOption53 1",
"SetOption73 1",
"rule1 on button1#state=10 do power0 toggle endon"
@ -102,19 +102,9 @@
"SetOption13 0",
"SetOption19 0",
"SetOption32 8",
"SetOption40 0",
"SetOption40 40",
"SetOption53 1",
"SetOption73 1"
],
"Plug": [
"SwitchRetain Off",
"ButtonRetain Off",
"PowerRetain On",
"PowerOnState 3",
"SetOption1 0",
"SetOption3 1",
"SetOption73 0",
"rule1 on button1#state=10 do power0 toggle endon"
]
}
}