From d585f0f284ae4a02ad5b30255a78472fdca1e8f2 Mon Sep 17 00:00:00 2001 From: Mike Geppert Date: Wed, 6 Aug 2025 20:31:57 -0500 Subject: [PATCH] Implement template activation, fix console settings duplication, and improve connection display --- TasmotaManager.py | 1055 +++++++++++++-------- TasmotaManager_fixed.py | 820 ++++++++++++++++ console_duplicate_template_fix_summary.md | 102 ++ console_settings_optimization.md | 78 ++ console_settings_summary.md | 55 ++ fulltopic_fix_summary.md | 44 + implementation_summary.md | 27 + network_configuration.json | 9 + process_unknown_optimization_summary.md | 50 + template_activation_fix_summary.md | 96 ++ test_fulltopic_fix.py | 119 +++ test_hostname_matching.py | 62 ++ test_process_unknown_optimization.py | 99 ++ test_rule1_encoding.py | 106 +++ test_template_activation.py | 236 +++++ test_template_matching.py | 239 +++++ test_unknown_device_console_settings.py | 58 ++ test_unknown_device_toggle.py | 42 + 18 files changed, 2893 insertions(+), 404 deletions(-) create mode 100644 TasmotaManager_fixed.py create mode 100644 console_duplicate_template_fix_summary.md create mode 100644 console_settings_optimization.md create mode 100644 console_settings_summary.md create mode 100644 fulltopic_fix_summary.md create mode 100644 implementation_summary.md create mode 100644 process_unknown_optimization_summary.md create mode 100644 template_activation_fix_summary.md create mode 100755 test_fulltopic_fix.py create mode 100755 test_hostname_matching.py create mode 100755 test_process_unknown_optimization.py create mode 100644 test_rule1_encoding.py create mode 100644 test_template_activation.py create mode 100644 test_template_matching.py create mode 100755 test_unknown_device_console_settings.py create mode 100755 test_unknown_device_toggle.py diff --git a/TasmotaManager.py b/TasmotaManager.py index 132570b..8d7fe6a 100644 --- a/TasmotaManager.py +++ b/TasmotaManager.py @@ -178,6 +178,15 @@ class TasmotaDiscovery: for device in all_clients: if self.is_tasmota_device(device): + # Determine connection type based on available fields + connection = "Unknown" + if device.get('essid'): + connection = f"Wireless - {device.get('essid')}" + elif device.get('radio') or device.get('wifi'): + connection = "Wireless" + elif device.get('port') or device.get('switch_port') or device.get('switch'): + connection = "Wired" + device_info = { "name": device.get('name', device.get('hostname', 'Unknown')), "ip": device.get('ip', ''), @@ -185,6 +194,7 @@ class TasmotaDiscovery: "last_seen": device.get('last_seen', ''), "hostname": device.get('hostname', ''), "notes": device.get('note', ''), + "connection": connection, } devices.append(device_info) @@ -419,12 +429,13 @@ class TasmotaDiscovery: for device in unknown_devices: name = device.get('name', 'Unknown') ip = device.get('ip') + connection = device.get('connection', 'Unknown') if not ip: self.logger.warning(f"Skipping device {name} - no IP address") continue - self.logger.info(f"Processing unknown device: {name} at {ip}") + self.logger.info(f"Processing unknown device: {name} at {ip} with connection {connection}") # Check if device has a toggle button try: @@ -448,7 +459,7 @@ class TasmotaDiscovery: try: # Clear console output and show prompt print("\n" + "="*50) - print(f"DEVICE: {name} at IP: {ip}") + print(f"DEVICE: {name} at IP: {ip} Connection: {connection}") print(f"Current hostname: {original_hostname}") print("="*50) print("The device is now toggling to help you identify it.") @@ -476,13 +487,19 @@ class TasmotaDiscovery: # Prompt for new hostname print("\nPlease enter a new name for this device:") + print("(Enter nothing, 'unknown', or 'na' to assume device could not be found and end)") new_hostname = input("> ").strip() # Stop toggling stop_toggle.set() toggle_thread.join(timeout=3) - if new_hostname and new_hostname != original_hostname: + # Check for special inputs that indicate device could not be found + if not new_hostname or new_hostname.lower() == "unknown" or new_hostname.lower() == "na": + print("Assuming device could not be found, ending process") + return # End the entire process + + if new_hostname != original_hostname: print(f"Setting new hostname to: {new_hostname}") else: print("No valid hostname entered, skipping device") @@ -504,59 +521,347 @@ class TasmotaDiscovery: except requests.exceptions.RequestException as e: self.logger.error(f"Error connecting to {name} at {ip}: {str(e)}") - def configure_unknown_device(self, ip, hostname): - """Configure an unknown device with the given hostname and MQTT settings.""" + def check_and_update_template(self, ip, name): + """Check and update device template based on mqtt.config_other settings. + + Algorithm: + 1. Get the device name from the Configuration/Other page using Status 0 + 2. Get the current template using Template command + 3. Check if any key in mqtt.config_other matches the device name + 4. If a match is found, check if the template matches the value + 5. If the template doesn't match, write the value to the template + 6. If no key matches, check if any value matches the template + 7. If a value match is found, write the key to the device name + + Args: + ip: The IP address of the device + name: The name/hostname of the device + + Returns: + bool: True if template was updated, False otherwise + """ try: - # Set Friendly Name - friendly_name_url = f"http://{ip}/cm?cmnd=FriendlyName1%20{hostname}" - response = requests.get(friendly_name_url, timeout=5) - if response.status_code == 200: - self.logger.info(f"Set Friendly Name to {hostname}") - else: - self.logger.error(f"Failed to set Friendly Name to {hostname}") + # Get mqtt.config_other settings + config_other = self.config.get('mqtt', {}).get('config_other', {}) + if not config_other: + self.logger.debug(f"{name}: No mqtt.config_other settings found in configuration") + return False - # Enable MQTT if not already enabled - mqtt_url = f"http://{ip}/cm?cmnd=SetOption3%20ON" # Enable MQTT - response = requests.get(mqtt_url, timeout=5) - if response.status_code == 200: - self.logger.info(f"Enabled MQTT for {hostname}") + # Get Status 0 for device name from Configuration/Other page + url_status0 = f"http://{ip}/cm?cmnd=Status%200" + response = requests.get(url_status0, timeout=5) + status0_data = response.json() + + # Extract device name from Status 0 response + device_name = status0_data.get("Status", {}).get("DeviceName", "") + if not device_name: + self.logger.debug(f"{name}: Could not get device name from Status 0") + return False + + self.logger.debug(f"{name}: Device name from Configuration/Other page: {device_name}") + + # Get current template + url_template = f"http://{ip}/cm?cmnd=Template" + response = requests.get(url_template, timeout=5) + template_data = response.json() + + # Log the actual response format for debugging + self.logger.debug(f"{name}: Template response: {template_data}") + + # Extract current template - handle different response formats + current_template = "" + + # Try different possible response formats + if "Template" in template_data: + current_template = template_data.get("Template", "") + elif isinstance(template_data, dict) and len(template_data) > 0: + # If there's no "Template" key but we have a dict, try to get the first value + # This handles cases where the response might be {"NAME":"...","GPIO":[...]} + first_key = next(iter(template_data)) + if isinstance(template_data[first_key], str) and "{" in template_data[first_key]: + current_template = template_data[first_key] + self.logger.debug(f"{name}: Found template in alternate format under key: {first_key}") + # Handle the case where the template is returned as a dict with NAME, GPIO, FLAG, BASE keys + elif all(key in template_data for key in ['NAME', 'GPIO', 'FLAG', 'BASE']): + # Convert the dict to a JSON string to match the expected format + import json + current_template = json.dumps(template_data) + self.logger.debug(f"{name}: Found template in dict format with NAME, GPIO, FLAG, BASE keys") + + if not current_template: + self.logger.debug(f"{name}: Could not get current template from response") + return False + + self.logger.debug(f"{name}: Current template: {current_template}") + + # Check if any key in mqtt.config_other matches the device name + template_updated = False + if device_name in config_other: + # Key matches device name, check if template matches value + template_value = config_other[device_name] + if current_template != template_value: + # Template doesn't match, write value to template + self.logger.info(f"{name}: Device name '{device_name}' matches key in config_other, but template doesn't match") + self.logger.info(f"{name}: Setting template to: {template_value}") + + # URL encode the template value + import urllib.parse + encoded_value = urllib.parse.quote(template_value) + url = f"http://{ip}/cm?cmnd=Template%20{encoded_value}" + + response = requests.get(url, timeout=5) + if response.status_code == 200: + self.logger.info(f"{name}: Template updated successfully") + + # Activate the template by setting module to 0 (Template module) + self.logger.info(f"{name}: Activating template by setting module to 0") + module_url = f"http://{ip}/cm?cmnd=Module%200" + module_response = requests.get(module_url, timeout=5) + + if module_response.status_code == 200: + self.logger.info(f"{name}: Module set to 0 successfully") + + # Restart the device to apply the template + self.logger.info(f"{name}: Restarting device to apply template") + restart_url = f"http://{ip}/cm?cmnd=Restart%201" + restart_response = requests.get(restart_url, timeout=5) + + if restart_response.status_code == 200: + self.logger.info(f"{name}: Device restart initiated successfully") + template_updated = True + else: + self.logger.error(f"{name}: Failed to restart device") + else: + self.logger.error(f"{name}: Failed to set module to 0") + else: + self.logger.error(f"{name}: Failed to update template") + else: + self.logger.debug(f"{name}: Device name '{device_name}' matches key in config_other and template matches value") else: - self.logger.error(f"Failed to enable MQTT for {hostname}") + # No key matches device name, check if any value matches the template + matching_key = None + for key, value in config_other.items(): + if value == current_template: + matching_key = key + break + + if matching_key: + # Value matches template, write key to device name + self.logger.info(f"{name}: Template matches value for key '{matching_key}' in config_other") + self.logger.info(f"{name}: Setting device name to: {matching_key}") + + url = f"http://{ip}/cm?cmnd=DeviceName%20{matching_key}" + response = requests.get(url, timeout=5) + if response.status_code == 200: + self.logger.info(f"{name}: Device name updated successfully") + + # Activate the template by setting module to 0 (Template module) + self.logger.info(f"{name}: Activating template by setting module to 0") + module_url = f"http://{ip}/cm?cmnd=Module%200" + module_response = requests.get(module_url, timeout=5) + + if module_response.status_code == 200: + self.logger.info(f"{name}: Module set to 0 successfully") + + # Restart the device to apply the template + self.logger.info(f"{name}: Restarting device to apply template") + restart_url = f"http://{ip}/cm?cmnd=Restart%201" + restart_response = requests.get(restart_url, timeout=5) + + if restart_response.status_code == 200: + self.logger.info(f"{name}: Device restart initiated successfully") + template_updated = True + else: + self.logger.error(f"{name}: Failed to restart device") + else: + self.logger.error(f"{name}: Failed to set module to 0") + else: + self.logger.error(f"{name}: Failed to update device name") + else: + self.logger.debug(f"{name}: No matches found in config_other for device name or template") + + return template_updated + + except requests.exceptions.RequestException as e: + self.logger.error(f"Error checking/updating template for device at {ip}: {str(e)}") + return False + + def configure_mqtt_settings(self, ip, name, mqtt_status=None, is_new_device=False, set_friendly_name=False, enable_mqtt=False, with_retry=False, reboot=False): + """Configure MQTT settings for a device. + + Args: + ip: The IP address of the device + name: The name/hostname of the device + mqtt_status: The current MQTT status of the device (from Status 6) + is_new_device: Whether this is a new device (True) or existing device (False) + set_friendly_name: Whether to set the friendly name + enable_mqtt: Whether to enable MQTT + with_retry: Whether to use retry logic + reboot: Whether to reboot the device after configuration + + Returns: + bool: True if configuration was successful, False otherwise + """ + try: + # Set Friendly Name if requested + if set_friendly_name: + friendly_name_url = f"http://{ip}/cm?cmnd=FriendlyName1%20{name}" + response = requests.get(friendly_name_url, timeout=5) + if response.status_code == 200: + self.logger.info(f"Set Friendly Name to {name}") + else: + self.logger.error(f"Failed to set Friendly Name to {name}") + + # Enable MQTT if requested + if enable_mqtt: + mqtt_url = f"http://{ip}/cm?cmnd=SetOption3%20ON" # Enable MQTT + response = requests.get(mqtt_url, timeout=5) + if response.status_code == 200: + self.logger.info(f"Enabled MQTT for {name}") + else: + self.logger.error(f"Failed to enable MQTT for {name}") # Configure MQTT settings mqtt_config = self.config.get('mqtt', {}) - if mqtt_config: - # Get the base hostname (everything before the dash) - hostname_base = hostname.split('-')[0] if '-' in hostname else hostname + if not mqtt_config: + self.logger.error("MQTT configuration missing from config file") + return False - mqtt_fields = { - "MqttHost": mqtt_config.get('Host', ''), - "MqttPort": mqtt_config.get('Port', 1883), - "MqttUser": mqtt_config.get('User', ''), - "MqttPassword": mqtt_config.get('Password', ''), - "Topic": hostname_base if mqtt_config.get('Topic') == '%hostname_base%' else mqtt_config.get('Topic', ''), - "FullTopic": mqtt_config.get('FullTopic', '%prefix%/%topic%/'), - } + # Get the base hostname (everything before the dash) + hostname_base = name.split('-')[0] if '-' in name else name + + # Define MQTT fields + mqtt_fields = { + "MqttHost": mqtt_config.get('Host', ''), + "MqttPort": mqtt_config.get('Port', 1883), + "MqttUser": mqtt_config.get('User', ''), + "MqttPassword": mqtt_config.get('Password', ''), + "Topic": hostname_base if mqtt_config.get('Topic') == '%hostname_base%' else mqtt_config.get('Topic', ''), + "FullTopic": mqtt_config.get('FullTopic', '%prefix%/%topic%/'), + } + + # For existing devices, check if MQTT settings need to be updated + changes_needed = [] + if not is_new_device and mqtt_status: + device_mqtt = mqtt_status.get('MqttHost', {}) + force_password_update = False - for setting, value in mqtt_fields.items(): + # Check each MQTT setting + if device_mqtt.get('Host') != mqtt_fields['MqttHost']: + changes_needed.append(('MqttHost', mqtt_fields['MqttHost'])) + self.logger.debug(f"{name}: MQTT Host mismatch - Device: {device_mqtt.get('Host')}, Config: {mqtt_fields['MqttHost']}") + force_password_update = True + + if device_mqtt.get('Port') != mqtt_fields['MqttPort']: + changes_needed.append(('MqttPort', mqtt_fields['MqttPort'])) + self.logger.debug(f"{name}: MQTT Port mismatch - Device: {device_mqtt.get('Port')}, Config: {mqtt_fields['MqttPort']}") + force_password_update = True + + if device_mqtt.get('User') != mqtt_fields['MqttUser']: + changes_needed.append(('MqttUser', mqtt_fields['MqttUser'])) + self.logger.debug(f"{name}: MQTT User mismatch - Device: {device_mqtt.get('User')}, Config: {mqtt_fields['MqttUser']}") + force_password_update = True + + if device_mqtt.get('Topic') != mqtt_fields['Topic']: + changes_needed.append(('Topic', mqtt_fields['Topic'])) + self.logger.debug(f"{name}: MQTT Topic mismatch - Device: {device_mqtt.get('Topic')}, Config: {mqtt_fields['Topic']}") + force_password_update = True + + if device_mqtt.get('FullTopic') != mqtt_fields['FullTopic']: + changes_needed.append(('FullTopic', mqtt_fields['FullTopic'])) + self.logger.debug(f"{name}: MQTT FullTopic mismatch - Device: {device_mqtt.get('FullTopic')}, Config: {mqtt_fields['FullTopic']}") + force_password_update = True + + # Add password update if any MQTT setting changed or user was updated + if force_password_update: + changes_needed.append(('MqttPassword', mqtt_fields['MqttPassword'])) + self.logger.debug(f"{name}: MQTT Password will be updated") + + # Check NoRetain setting + no_retain = mqtt_config.get('NoRetain', False) + if no_retain: + changes_needed.append(('SetOption62', '1')) # 1 = No Retain + else: + changes_needed.append(('SetOption62', '0')) # 0 = Use Retain + else: + # For new devices, set all MQTT settings + for field, value in mqtt_fields.items(): + changes_needed.append((field, value)) + + # Apply MQTT settings + mqtt_updated = False + for setting, value in changes_needed: + try: # For FullTopic, we need to avoid adding a space (%20) or equals sign between the command and value if setting == "FullTopic": url = f"http://{ip}/cm?cmnd={setting}{value}" else: url = f"http://{ip}/cm?cmnd={setting}%20{value}" - response = requests.get(url, timeout=5) - if response.status_code == 200: - if setting != 'MqttPassword': - self.logger.info(f"{hostname}: Set {setting} to {value}") - else: - self.logger.info(f"{hostname}: Set MQTT Password") + + if with_retry: + # With retry logic + success = False + attempts = 0 + max_attempts = 3 + last_error = None + + while not success and attempts < max_attempts: + attempts += 1 + try: + response = requests.get(url, timeout=5) + if response.status_code == 200: + if setting != 'MqttPassword': + self.logger.debug(f"{name}: Updated {setting} to {value}") + else: + self.logger.debug(f"{name}: Updated MQTT Password") + mqtt_updated = True + success = True + else: + self.logger.warning(f"{name}: Failed to update {setting} (attempt {attempts}/{max_attempts})") + last_error = f"HTTP {response.status_code}" + if attempts < max_attempts: + time.sleep(1) # Wait before retry + except requests.exceptions.Timeout as e: + self.logger.warning(f"{name}: Timeout updating {setting} (attempt {attempts}/{max_attempts})") + last_error = "Timeout" + if attempts < max_attempts: + time.sleep(1) # Wait before retry + except requests.exceptions.RequestException as e: + self.logger.warning(f"{name}: Error updating {setting}: {str(e)} (attempt {attempts}/{max_attempts})") + last_error = str(e) + if attempts < max_attempts: + time.sleep(1) # Wait before retry + + if not success: + self.logger.error(f"{name}: Failed to update {setting} after {max_attempts} attempts. Last error: {last_error}") + # Track the failure for later reporting + if not hasattr(self, 'command_failures'): + self.command_failures = [] + self.command_failures.append({ + "device": name, + "ip": ip, + "command": f"{setting} {value}", + "error": last_error + }) else: - self.logger.error(f"{hostname}: Failed to set {setting}") + # Without retry logic + response = requests.get(url, timeout=5) + if response.status_code == 200: + if setting != 'MqttPassword': + self.logger.info(f"{name}: Set {setting} to {value}") + else: + self.logger.info(f"{name}: Set MQTT Password") + mqtt_updated = True + else: + self.logger.error(f"{name}: Failed to set {setting}") + except requests.exceptions.RequestException as e: + self.logger.error(f"{name}: Error updating {setting}: {str(e)}") - # Apply console settings before rebooting + # Apply console settings + console_updated = False console_params = mqtt_config.get('console', {}) if console_params: - self.logger.info(f"{hostname}: Setting console parameters from configuration") + self.logger.info(f"{name}: Setting console parameters from configuration") # Special handling for Retain parameters - need to send opposite state first, then final state # This is necessary because the changes are what create the update of the Retain state at the MQTT server @@ -570,26 +875,123 @@ class TasmotaDiscovery: # Set opposite state first opposite_value = "On" if final_value.lower() == "off" else "Off" - # First command (opposite state) - url = f"http://{ip}/cm?cmnd={param}%20{opposite_value}" - response = requests.get(url, timeout=5) - if response.status_code == 200: - self.logger.debug(f"{hostname}: Set {param} to {opposite_value} (step 1 of 2 to update MQTT broker retain settings)") + if with_retry: + # First command (opposite state) - with retry logic + url = f"http://{ip}/cm?cmnd={param}%20{opposite_value}" + success = False + attempts = 0 + max_attempts = 3 + last_error = None + + while not success and attempts < max_attempts: + attempts += 1 + try: + response = requests.get(url, timeout=5) + if response.status_code == 200: + self.logger.debug(f"{name}: Set {param} to {opposite_value} (step 1 of 2 to update MQTT broker retain settings)") + console_updated = True + success = True + else: + self.logger.warning(f"{name}: Failed to set {param} to {opposite_value} (attempt {attempts}/{max_attempts})") + last_error = f"HTTP {response.status_code}" + if attempts < max_attempts: + time.sleep(1) # Wait before retry + except requests.exceptions.Timeout as e: + self.logger.warning(f"{name}: Timeout setting {param} to {opposite_value} (attempt {attempts}/{max_attempts})") + last_error = "Timeout" + if attempts < max_attempts: + time.sleep(1) # Wait before retry + except requests.exceptions.RequestException as e: + self.logger.warning(f"{name}: Error setting {param} to {opposite_value}: {str(e)} (attempt {attempts}/{max_attempts})") + last_error = str(e) + if attempts < max_attempts: + time.sleep(1) # Wait before retry + + if not success: + self.logger.error(f"{name}: Failed to set {param} to {opposite_value} after {max_attempts} attempts. Last error: {last_error}") + # Track the failure for later reporting + if not hasattr(self, 'command_failures'): + self.command_failures = [] + self.command_failures.append({ + "device": name, + "ip": ip, + "command": f"{param} {opposite_value}", + "error": last_error + }) else: - self.logger.error(f"{hostname}: Failed to set {param} to {opposite_value}") + # First command (opposite state) - without retry logic + url = f"http://{ip}/cm?cmnd={param}%20{opposite_value}" + response = requests.get(url, timeout=5) + if response.status_code == 200: + self.logger.debug(f"{name}: Set {param} to {opposite_value} (step 1 of 2 to update MQTT broker retain settings)") + else: + self.logger.error(f"{name}: Failed to set {param} to {opposite_value}") # Small delay to ensure commands are processed in order time.sleep(0.5) - # Second command (final state) - url = f"http://{ip}/cm?cmnd={param}%20{final_value}" - response = requests.get(url, timeout=5) - if response.status_code == 200: - self.logger.debug(f"{hostname}: Set {param} to {final_value} (step 2 of 2 to update MQTT broker retain settings)") + if with_retry: + # Second command (final state) - with retry logic + url = f"http://{ip}/cm?cmnd={param}%20{final_value}" + success = False + attempts = 0 + last_error = None + + while not success and attempts < max_attempts: + attempts += 1 + try: + response = requests.get(url, timeout=5) + if response.status_code == 200: + self.logger.debug(f"{name}: Set {param} to {final_value} (step 2 of 2 to update MQTT broker retain settings)") + console_updated = True + success = True + else: + self.logger.warning(f"{name}: Failed to set {param} to {final_value} (attempt {attempts}/{max_attempts})") + last_error = f"HTTP {response.status_code}" + if attempts < max_attempts: + time.sleep(1) # Wait before retry + except requests.exceptions.Timeout as e: + self.logger.warning(f"{name}: Timeout setting {param} to {final_value} (attempt {attempts}/{max_attempts})") + last_error = "Timeout" + if attempts < max_attempts: + time.sleep(1) # Wait before retry + except requests.exceptions.RequestException as e: + self.logger.warning(f"{name}: Error setting {param} to {final_value}: {str(e)} (attempt {attempts}/{max_attempts})") + last_error = str(e) + if attempts < max_attempts: + time.sleep(1) # Wait before retry + + if not success: + self.logger.error(f"{name}: Failed to set {param} to {final_value} after {max_attempts} attempts. Last error: {last_error}") + # Track the failure for later reporting + if not hasattr(self, 'command_failures'): + self.command_failures = [] + self.command_failures.append({ + "device": name, + "ip": ip, + "command": f"{param} {final_value}", + "error": last_error + }) else: - self.logger.error(f"{hostname}: Failed to set {param} to {final_value}") + # Second command (final state) - without retry logic + url = f"http://{ip}/cm?cmnd={param}%20{final_value}" + response = requests.get(url, timeout=5) + if response.status_code == 200: + self.logger.debug(f"{name}: Set {param} to {final_value} (step 2 of 2 to update MQTT broker retain settings)") + else: + self.logger.error(f"{name}: Failed to set {param} to {final_value}") except Exception as e: - self.logger.error(f"{hostname}: Unexpected error setting {param} commands: {str(e)}") + self.logger.error(f"{name}: Unexpected error setting {param} commands: {str(e)}") + # Track the failure for later reporting if using retry logic + if with_retry: + if not hasattr(self, 'command_failures'): + self.command_failures = [] + self.command_failures.append({ + "device": name, + "ip": ip, + "command": f"{param} (both steps)", + "error": str(e) + }) # Process all other console parameters # Track rules that need to be enabled @@ -605,44 +1007,192 @@ class TasmotaDiscovery: # Store the rule number for later enabling rule_num = param[-1] rules_to_enable[rule_num] = True - self.logger.debug(f"{hostname}: Detected rule definition {param}, will auto-enable") + if with_retry: + self.logger.info(f"{name}: Detected rule definition {param}='{value}', will auto-enable") + else: + self.logger.debug(f"{name}: Detected rule definition {param}, will auto-enable") + + # Skip Rule1, Rule2, etc. if we're auto-enabling rules and using retry logic + if with_retry and param.lower().startswith('rule') and param.lower() != param and param[-1].isdigit(): + # If this is in the config, we'll respect it, but log that it's not needed + self.logger.debug(f"{name}: Note: {param} is not needed with auto-enable feature") # Regular console parameter - url = f"http://{ip}/cm?cmnd={param}%20{value}" - response = requests.get(url, timeout=5) - if response.status_code == 200: - self.logger.debug(f"{hostname}: Set console parameter {param} to {value}") + # Special handling for rule parameters to properly encode the URL + if param.lower().startswith('rule') and param.lower() == param and param[-1].isdigit(): + # For rule commands, we need to URL encode the entire value to preserve special characters + import urllib.parse + encoded_value = urllib.parse.quote(value) + url = f"http://{ip}/cm?cmnd={param}%20{encoded_value}" + self.logger.info(f"{name}: Sending rule command: {url}") else: - self.logger.error(f"{hostname}: Failed to set console parameter {param}") + url = f"http://{ip}/cm?cmnd={param}%20{value}" + + if with_retry: + # With retry logic + success = False + attempts = 0 + max_attempts = 3 + last_error = None + + while not success and attempts < max_attempts: + attempts += 1 + try: + response = requests.get(url, timeout=5) + if response.status_code == 200: + # Special logging for rule parameters + if param.lower().startswith('rule') and param.lower() == param and param[-1].isdigit(): + self.logger.info(f"{name}: Rule command response: {response.text}") + self.logger.info(f"{name}: Set rule {param} to '{value}'") + else: + self.logger.debug(f"{name}: Set console parameter {param} to {value}") + console_updated = True + success = True + else: + self.logger.warning(f"{name}: Failed to set console parameter {param} (attempt {attempts}/{max_attempts})") + last_error = f"HTTP {response.status_code}" + if attempts < max_attempts: + time.sleep(1) # Wait before retry + except requests.exceptions.Timeout as e: + self.logger.warning(f"{name}: Timeout setting console parameter {param} (attempt {attempts}/{max_attempts})") + last_error = "Timeout" + if attempts < max_attempts: + time.sleep(1) # Wait before retry + except requests.exceptions.RequestException as e: + self.logger.warning(f"{name}: Error setting console parameter {param}: {str(e)} (attempt {attempts}/{max_attempts})") + last_error = str(e) + if attempts < max_attempts: + time.sleep(1) # Wait before retry + + if not success: + self.logger.error(f"{name}: Failed to set console parameter {param} after {max_attempts} attempts. Last error: {last_error}") + # Track the failure for later reporting + if not hasattr(self, 'command_failures'): + self.command_failures = [] + self.command_failures.append({ + "device": name, + "ip": ip, + "command": f"{param} {value}", + "error": last_error + }) + else: + # Without retry logic + response = requests.get(url, timeout=5) + if response.status_code == 200: + if param.lower().startswith('rule') and param.lower() == param and param[-1].isdigit(): + self.logger.info(f"{name}: Rule command response: {response.text}") + self.logger.info(f"{name}: Set rule {param} to '{value}'") + else: + self.logger.debug(f"{name}: Set console parameter {param} to {value}") + else: + self.logger.error(f"{name}: Failed to set console parameter {param}") # Auto-enable any rules that were defined + if with_retry: + self.logger.info(f"{name}: Rules to enable: {rules_to_enable}") + for rule_num in rules_to_enable: rule_enable_param = f"Rule{rule_num}" + # Skip if the rule enable command was already in the config - if any(p.lower() == rule_enable_param.lower() for p in console_params): - continue + if with_retry: + # Check if the uppercase version (Rule1) is in the config + if rule_enable_param in console_params: + self.logger.info(f"{name}: Skipping {rule_enable_param} as it's already in config (uppercase version)") + continue + # Check if the lowercase version (rule1) is in the config + lowercase_rule_param = f"rule{rule_num}" + if lowercase_rule_param in console_params: + self.logger.info(f"{name}: Found lowercase {lowercase_rule_param} in config, will enable {rule_enable_param}") + # Don't continue - we want to enable the rule + else: + self.logger.info(f"{name}: No rule definition found in config, skipping auto-enable") + continue + else: + # Simple check for any version of the rule enable command + if any(p.lower() == rule_enable_param.lower() for p in console_params): + continue + # Rule auto-enabling url = f"http://{ip}/cm?cmnd={rule_enable_param}%201" - response = requests.get(url, timeout=5) - if response.status_code == 200: - self.logger.info(f"{hostname}: Auto-enabled {rule_enable_param}") + + if with_retry: + # With retry logic + success = False + attempts = 0 + max_attempts = 3 + last_error = None + + while not success and attempts < max_attempts: + attempts += 1 + try: + response = requests.get(url, timeout=5) + if response.status_code == 200: + self.logger.info(f"{name}: Auto-enabled {rule_enable_param}") + console_updated = True + success = True + else: + self.logger.warning(f"{name}: Failed to auto-enable {rule_enable_param} (attempt {attempts}/{max_attempts})") + last_error = f"HTTP {response.status_code}" + if attempts < max_attempts: + time.sleep(1) # Wait before retry + except requests.exceptions.Timeout as e: + self.logger.warning(f"{name}: Timeout auto-enabling {rule_enable_param} (attempt {attempts}/{max_attempts})") + last_error = "Timeout" + if attempts < max_attempts: + time.sleep(1) # Wait before retry + except requests.exceptions.RequestException as e: + self.logger.warning(f"{name}: Error auto-enabling {rule_enable_param}: {str(e)} (attempt {attempts}/{max_attempts})") + last_error = str(e) + if attempts < max_attempts: + time.sleep(1) # Wait before retry + + if not success: + self.logger.error(f"{name}: Failed to auto-enable {rule_enable_param} after {max_attempts} attempts. Last error: {last_error}") + # Track the failure for later reporting + if not hasattr(self, 'command_failures'): + self.command_failures = [] + self.command_failures.append({ + "device": name, + "ip": ip, + "command": f"{rule_enable_param} 1", + "error": last_error + }) else: - self.logger.error(f"{hostname}: Failed to auto-enable {rule_enable_param}") + # Without retry logic + response = requests.get(url, timeout=5) + if response.status_code == 200: + self.logger.info(f"{name}: Auto-enabled {rule_enable_param}") + else: + self.logger.error(f"{name}: Failed to auto-enable {rule_enable_param}") - # Save configuration (will reboot the device) - save_url = f"http://{ip}/cm?cmnd=Restart%201" - response = requests.get(save_url, timeout=5) - if response.status_code == 200: - self.logger.info(f"Saved configuration and rebooted {hostname}") - else: - self.logger.error(f"Failed to save configuration for {hostname}") - - return True + # Reboot the device if requested + if reboot: + save_url = f"http://{ip}/cm?cmnd=Restart%201" + response = requests.get(save_url, timeout=5) + if response.status_code == 200: + self.logger.info(f"Saved configuration and rebooted {name}") + else: + self.logger.error(f"Failed to save configuration for {name}") + + return mqtt_updated or console_updated except requests.exceptions.RequestException as e: self.logger.error(f"Error configuring device at {ip}: {str(e)}") return False + + def configure_unknown_device(self, ip, hostname): + """Configure an unknown device with the given hostname and MQTT settings.""" + return self.configure_mqtt_settings( + ip=ip, + name=hostname, + is_new_device=True, + set_friendly_name=True, + enable_mqtt=True, + with_retry=False, + reboot=True + ) def is_ip_in_network_filter(self, ip_address): """Check if an IP address is in any of the configured network filters. @@ -797,6 +1347,15 @@ class TasmotaDiscovery: self.logger.info(f"Device {device_name} matches unknown device pattern: {pattern}") break + # Determine connection type based on available fields + connection = "Unknown" + if target_device.get('essid'): + connection = f"Wireless - {target_device.get('essid')}" + elif target_device.get('radio') or target_device.get('wifi'): + connection = "Wireless" + elif target_device.get('port') or target_device.get('switch_port') or target_device.get('switch'): + connection = "Wired" + # Create a device info dictionary device_info = { "name": device_name, @@ -805,6 +1364,7 @@ class TasmotaDiscovery: "last_seen": target_device.get('last_seen', ''), "hostname": device_hostname, "notes": target_device.get('note', ''), + "connection": connection, } # Process the device based on whether it's unknown or not @@ -832,7 +1392,7 @@ class TasmotaDiscovery: try: # Clear console output and show prompt print("\n" + "="*50) - print(f"DEVICE: {device_name} at IP: {device_ip}") + print(f"DEVICE: {device_name} at IP: {device_ip} Connection: {connection}") print(f"Current hostname: {device_hostname}") print("="*50) print("The device is now toggling to help you identify it.") @@ -965,80 +1525,17 @@ class TasmotaDiscovery: def check_mqtt_settings(ip, name, mqtt_status): """Check and update MQTT settings if they don't match config""" - # Get the base hostname (everything before the dash) - hostname_base = name.split('-')[0] if '-' in name else name - - mqtt_fields = { - "Host": mqtt_config.get('Host', ''), - "Port": mqtt_config.get('Port', 1883), - "User": mqtt_config.get('User', ''), - "Password": mqtt_config.get('Password', ''), - "Topic": hostname_base if mqtt_config.get('Topic') == '%hostname_base%' else mqtt_config.get('Topic', ''), - "FullTopic": mqtt_config.get('FullTopic', '%prefix%/%topic%/'), - } - - device_mqtt = mqtt_status.get('MqttHost', {}) - changes_needed = [] - force_password_update = False - - # Check each MQTT setting - if device_mqtt.get('Host') != mqtt_fields['Host']: - changes_needed.append(('MqttHost', mqtt_fields['Host'])) - self.logger.debug(f"{name}: MQTT Host mismatch - Device: {device_mqtt.get('Host')}, Config: {mqtt_fields['Host']}") - force_password_update = True - - if device_mqtt.get('Port') != mqtt_fields['Port']: - changes_needed.append(('MqttPort', mqtt_fields['Port'])) - self.logger.debug(f"{name}: MQTT Port mismatch - Device: {device_mqtt.get('Port')}, Config: {mqtt_fields['Port']}") - force_password_update = True - - if device_mqtt.get('User') != mqtt_fields['User']: - changes_needed.append(('MqttUser', mqtt_fields['User'])) - self.logger.debug(f"{name}: MQTT User mismatch - Device: {device_mqtt.get('User')}, Config: {mqtt_fields['User']}") - force_password_update = True - - if device_mqtt.get('Topic') != mqtt_fields['Topic']: - changes_needed.append(('Topic', mqtt_fields['Topic'])) - self.logger.debug(f"{name}: MQTT Topic mismatch - Device: {device_mqtt.get('Topic')}, Config: {mqtt_fields['Topic']}") - force_password_update = True - - if device_mqtt.get('FullTopic') != mqtt_fields['FullTopic']: - changes_needed.append(('FullTopic', mqtt_fields['FullTopic'])) - self.logger.debug(f"{name}: MQTT FullTopic mismatch - Device: {device_mqtt.get('FullTopic')}, Config: {mqtt_fields['FullTopic']}") - force_password_update = True - - # Add password update if any MQTT setting changed or user was updated - if force_password_update: - changes_needed.append(('MqttPassword', mqtt_fields['Password'])) - self.logger.debug(f"{name}: MQTT Password will be updated") - - # Check NoRetain setting - FIXED: Use the actual value from config with default of False - no_retain = mqtt_config.get('NoRetain', False) - if no_retain: - changes_needed.append(('SetOption62', '1')) # 1 = No Retain - else: - changes_needed.append(('SetOption62', '0')) # 0 = Use Retain - - # Apply changes if needed - for setting, value in changes_needed: - try: - # For FullTopic, we need to avoid adding a space (%20) or equals sign between the command and value - if setting == "FullTopic": - url = f"http://{ip}/cm?cmnd={setting}{value}" - else: - url = f"http://{ip}/cm?cmnd={setting}%20{value}" - response = requests.get(url, timeout=5) - if response.status_code == 200: - if setting != 'MqttPassword': - self.logger.debug(f"{name}: Updated {setting} to {value}") - else: - self.logger.debug(f"{name}: Updated MQTT Password") - else: - self.logger.error(f"{name}: Failed to update {setting}") - except requests.exceptions.RequestException as e: - self.logger.error(f"{name}: Error updating {setting}: {str(e)}") - - return len(changes_needed) > 0 + # Use the unified MQTT configuration method + return self.configure_mqtt_settings( + ip=ip, + name=name, + mqtt_status=mqtt_status, + is_new_device=False, + set_friendly_name=False, + enable_mqtt=False, + with_retry=True, + reboot=False + ) for device in devices: if not isinstance(device, dict): @@ -1074,262 +1571,11 @@ class TasmotaDiscovery: # Check and update MQTT settings if needed mqtt_updated = check_mqtt_settings(ip, name, mqtt_data) - # Set console parameters from config - console_updated = False - console_params = mqtt_config.get('console', {}) - if console_params: - self.logger.info(f"{name}: Setting console parameters from configuration") - - # Special handling for Retain parameters - need to send opposite state first, then final state - # This is necessary because the changes are what create the update of the Retain state at the MQTT server - retain_params = ["ButtonRetain", "SwitchRetain", "PowerRetain"] - - # Process Retain parameters first - for param in retain_params: - if param in console_params: - try: - final_value = console_params[param] - # Set opposite state first - opposite_value = "On" if final_value.lower() == "off" else "Off" - - # First command (opposite state) - with retry logic - url = f"http://{ip}/cm?cmnd={param}%20{opposite_value}" - success = False - attempts = 0 - max_attempts = 3 - last_error = None - - while not success and attempts < max_attempts: - attempts += 1 - try: - response = requests.get(url, timeout=5) - if response.status_code == 200: - self.logger.debug(f"{name}: Set {param} to {opposite_value} (step 1 of 2 to update MQTT broker retain settings)") - console_updated = True - success = True - else: - self.logger.warning(f"{name}: Failed to set {param} to {opposite_value} (attempt {attempts}/{max_attempts})") - last_error = f"HTTP {response.status_code}" - if attempts < max_attempts: - time.sleep(1) # Wait before retry - except requests.exceptions.Timeout as e: - self.logger.warning(f"{name}: Timeout setting {param} to {opposite_value} (attempt {attempts}/{max_attempts})") - last_error = "Timeout" - if attempts < max_attempts: - time.sleep(1) # Wait before retry - except requests.exceptions.RequestException as e: - self.logger.warning(f"{name}: Error setting {param} to {opposite_value}: {str(e)} (attempt {attempts}/{max_attempts})") - last_error = str(e) - if attempts < max_attempts: - time.sleep(1) # Wait before retry - - if not success: - self.logger.error(f"{name}: Failed to set {param} to {opposite_value} after {max_attempts} attempts. Last error: {last_error}") - # Track the failure for later reporting - if not hasattr(self, 'command_failures'): - self.command_failures = [] - self.command_failures.append({ - "device": name, - "ip": ip, - "command": f"{param} {opposite_value}", - "error": last_error - }) - - # Small delay to ensure commands are processed in order - time.sleep(0.5) - - # Second command (final state) - with retry logic - url = f"http://{ip}/cm?cmnd={param}%20{final_value}" - success = False - attempts = 0 - last_error = None - - while not success and attempts < max_attempts: - attempts += 1 - try: - response = requests.get(url, timeout=5) - if response.status_code == 200: - self.logger.debug(f"{name}: Set {param} to {final_value} (step 2 of 2 to update MQTT broker retain settings)") - console_updated = True - success = True - else: - self.logger.warning(f"{name}: Failed to set {param} to {final_value} (attempt {attempts}/{max_attempts})") - last_error = f"HTTP {response.status_code}" - if attempts < max_attempts: - time.sleep(1) # Wait before retry - except requests.exceptions.Timeout as e: - self.logger.warning(f"{name}: Timeout setting {param} to {final_value} (attempt {attempts}/{max_attempts})") - last_error = "Timeout" - if attempts < max_attempts: - time.sleep(1) # Wait before retry - except requests.exceptions.RequestException as e: - self.logger.warning(f"{name}: Error setting {param} to {final_value}: {str(e)} (attempt {attempts}/{max_attempts})") - last_error = str(e) - if attempts < max_attempts: - time.sleep(1) # Wait before retry - - if not success: - self.logger.error(f"{name}: Failed to set {param} to {final_value} after {max_attempts} attempts. Last error: {last_error}") - # Track the failure for later reporting - if not hasattr(self, 'command_failures'): - self.command_failures = [] - self.command_failures.append({ - "device": name, - "ip": ip, - "command": f"{param} {final_value}", - "error": last_error - }) - except Exception as e: - self.logger.error(f"{name}: Unexpected error setting {param} commands: {str(e)}") - # Track the failure for later reporting - if not hasattr(self, 'command_failures'): - self.command_failures = [] - self.command_failures.append({ - "device": name, - "ip": ip, - "command": f"{param} (both steps)", - "error": str(e) - }) - - # Process all other console parameters - # Track rules that need to be enabled - rules_to_enable = {} - - for param, value in console_params.items(): - # Skip Retain parameters as they're handled specially above - if param in retain_params: - continue - - # Check if this is a rule definition (lowercase rule1, rule2, etc.) - if param.lower().startswith('rule') and param.lower() == param and param[-1].isdigit(): - # Store the rule number for later enabling - rule_num = param[-1] - rules_to_enable[rule_num] = True - self.logger.info(f"{name}: Detected rule definition {param}='{value}', will auto-enable") - - # Skip Rule1, Rule2, etc. if we're auto-enabling rules - if param.lower().startswith('rule') and param.lower() != param and param[-1].isdigit(): - # If this is in the config, we'll respect it, but log that it's not needed - self.logger.debug(f"{name}: Note: {param} is not needed with auto-enable feature") - - # Regular console parameter - with retry logic - # Special handling for rule parameters to properly encode the URL - if param.lower().startswith('rule') and param.lower() == param and param[-1].isdigit(): - # For rule commands, we need to URL encode the entire value to preserve special characters - import urllib.parse - encoded_value = urllib.parse.quote(value) - url = f"http://{ip}/cm?cmnd={param}%20{encoded_value}" - self.logger.info(f"{name}: Sending rule command: {url}") - else: - url = f"http://{ip}/cm?cmnd={param}%20{value}" - - success = False - attempts = 0 - max_attempts = 3 - last_error = None - - while not success and attempts < max_attempts: - attempts += 1 - try: - response = requests.get(url, timeout=5) - if response.status_code == 200: - # Special logging for rule parameters - if param.lower().startswith('rule') and param.lower() == param and param[-1].isdigit(): - self.logger.info(f"{name}: Rule command response: {response.text}") - self.logger.info(f"{name}: Set rule {param} to '{value}'") - else: - self.logger.debug(f"{name}: Set console parameter {param} to {value}") - console_updated = True - success = True - else: - self.logger.warning(f"{name}: Failed to set console parameter {param} (attempt {attempts}/{max_attempts})") - last_error = f"HTTP {response.status_code}" - if attempts < max_attempts: - time.sleep(1) # Wait before retry - except requests.exceptions.Timeout as e: - self.logger.warning(f"{name}: Timeout setting console parameter {param} (attempt {attempts}/{max_attempts})") - last_error = "Timeout" - if attempts < max_attempts: - time.sleep(1) # Wait before retry - except requests.exceptions.RequestException as e: - self.logger.warning(f"{name}: Error setting console parameter {param}: {str(e)} (attempt {attempts}/{max_attempts})") - last_error = str(e) - if attempts < max_attempts: - time.sleep(1) # Wait before retry - - if not success: - self.logger.error(f"{name}: Failed to set console parameter {param} after {max_attempts} attempts. Last error: {last_error}") - # Track the failure for later reporting - if not hasattr(self, 'command_failures'): - self.command_failures = [] - self.command_failures.append({ - "device": name, - "ip": ip, - "command": f"{param} {value}", - "error": last_error - }) - - # Auto-enable any rules that were defined - self.logger.info(f"{name}: Rules to enable: {rules_to_enable}") - for rule_num in rules_to_enable: - rule_enable_param = f"Rule{rule_num}" - # Skip if the rule enable command was already in the config - # Check if the uppercase version (Rule1) is in the config - if rule_enable_param in console_params: - self.logger.info(f"{name}: Skipping {rule_enable_param} as it's already in config (uppercase version)") - continue - - # Check if the lowercase version (rule1) is in the config - lowercase_rule_param = f"rule{rule_num}" - if lowercase_rule_param in console_params: - self.logger.info(f"{name}: Found lowercase {lowercase_rule_param} in config, will enable {rule_enable_param}") - # Don't continue - we want to enable the rule - else: - self.logger.info(f"{name}: No rule definition found in config, skipping auto-enable") - continue - - # Rule auto-enabling - with retry logic - url = f"http://{ip}/cm?cmnd={rule_enable_param}%201" - success = False - attempts = 0 - max_attempts = 3 - last_error = None - - while not success and attempts < max_attempts: - attempts += 1 - try: - response = requests.get(url, timeout=5) - if response.status_code == 200: - self.logger.info(f"{name}: Auto-enabled {rule_enable_param}") - console_updated = True - success = True - else: - self.logger.warning(f"{name}: Failed to auto-enable {rule_enable_param} (attempt {attempts}/{max_attempts})") - last_error = f"HTTP {response.status_code}" - if attempts < max_attempts: - time.sleep(1) # Wait before retry - except requests.exceptions.Timeout as e: - self.logger.warning(f"{name}: Timeout auto-enabling {rule_enable_param} (attempt {attempts}/{max_attempts})") - last_error = "Timeout" - if attempts < max_attempts: - time.sleep(1) # Wait before retry - except requests.exceptions.RequestException as e: - self.logger.warning(f"{name}: Error auto-enabling {rule_enable_param}: {str(e)} (attempt {attempts}/{max_attempts})") - last_error = str(e) - if attempts < max_attempts: - time.sleep(1) # Wait before retry - - if not success: - self.logger.error(f"{name}: Failed to auto-enable {rule_enable_param} after {max_attempts} attempts. Last error: {last_error}") - # Track the failure for later reporting - if not hasattr(self, 'command_failures'): - self.command_failures = [] - self.command_failures.append({ - "device": name, - "ip": ip, - "command": f"{rule_enable_param} 1", - "error": last_error - }) + # Check and update template if needed + template_updated = self.check_and_update_template(ip, name) + + # Console settings are now applied in configure_mqtt_settings + console_updated = mqtt_updated device_detail = { "name": name, @@ -1339,6 +1585,7 @@ class TasmotaDiscovery: "hostname": network_data.get("StatusNET", {}).get("Hostname", "Unknown"), "mqtt_status": "Updated" if mqtt_updated else "Verified", "console_status": "Updated" if console_updated else "Verified", + "template_status": "Updated" if template_updated else "Verified", "last_checked": time.strftime("%Y-%m-%d %H:%M:%S"), "status": "online" } diff --git a/TasmotaManager_fixed.py b/TasmotaManager_fixed.py new file mode 100644 index 0000000..08ebd58 --- /dev/null +++ b/TasmotaManager_fixed.py @@ -0,0 +1,820 @@ +import json +import logging +import os +import sys +from datetime import datetime +from typing import Optional +import requests +from urllib3.exceptions import InsecureRequestWarning +import re # Import the regular expression module +import time +import argparse + +# Disable SSL warnings +requests.packages.urllib3.disable_warnings(category=InsecureRequestWarning) + +class UnifiClient: + def __init__(self, base_url, username, password, site_id, verify_ssl=True): + self.base_url = base_url.rstrip('/') + self.username = username + self.password = password + self.site_id = site_id + self.session = requests.Session() + self.session.verify = verify_ssl + + # Initialize cookie jar + self.session.cookies.clear() + + def _login(self) -> requests.Response: # Changed return type annotation + """Authenticate with the UniFi Controller.""" + login_url = f"{self.base_url}/api/auth/login" + headers = { + 'Content-Type': 'application/json', + 'Accept': 'application/json' + } + payload = { + "username": self.username, + "password": self.password, + "remember": False + } + try: + response = self.session.post( + login_url, + json=payload, + headers=headers + ) + response.raise_for_status() + if 'X-CSRF-Token' in response.headers: + self.session.headers['X-CSRF-Token'] = response.headers['X-CSRF-Token'] + return response # Return the response object + except requests.exceptions.RequestException as e: + if hasattr(e, 'response') and e.response.status_code == 401: + raise Exception("Authentication failed. Please verify your username and password.") from e + raise + + def get_clients(self) -> list: + """Get all clients from the UniFi Controller.""" + # Try the newer API endpoint first + url = f"{self.base_url}/proxy/network/api/s/{self.site_id}/stat/sta" + try: + response = self.session.get(url) + response.raise_for_status() + return response.json().get('data', []) + except requests.exceptions.RequestException as e: + # If the newer endpoint fails, try the legacy endpoint + url = f"{self.base_url}/api/s/{self.site_id}/stat/sta" + try: + response = self.session.get(url) + response.raise_for_status() + return response.json().get('data', []) + except requests.exceptions.RequestException as e: + # If both fail, try the v2 API endpoint + url = f"{self.base_url}/v2/api/site/{self.site_id}/clients" + response = self.session.get(url) + response.raise_for_status() + return response.json().get('data', []) + +class TasmotaDiscovery: + def __init__(self, debug: bool = False): + """Initialize the TasmotaDiscovery with optional debug mode.""" + log_level = logging.DEBUG if debug else logging.INFO + logging.basicConfig( + level=log_level, + format='%(asctime)s - %(levelname)s - %(message)s', + datefmt='%Y-%m-%d %H:%M:%S' + ) + self.logger = logging.getLogger(__name__) + self.config = None + self.unifi_client = None + + def load_config(self, config_path: Optional[str] = None) -> dict: + """Load configuration from JSON file.""" + if config_path is None: + config_path = os.path.join(os.path.dirname(__file__), 'config.json') + + self.logger.debug(f"Loading configuration from: {config_path}") + try: + with open(config_path, 'r') as config_file: + self.config = json.load(config_file) + self.logger.debug("Configuration loaded successfully from %s", config_path) + return self.config + except FileNotFoundError: + self.logger.error(f"Configuration file not found at {config_path}") + sys.exit(1) + except json.JSONDecodeError: + self.logger.error("Invalid JSON in configuration file") + sys.exit(1) + + def setup_unifi_client(self): + """Set up the UniFi client with better error handling""" + self.logger.debug("Setting up UniFi client") + + if not self.config or 'unifi' not in self.config: + raise ValueError("Missing UniFi configuration") + + unifi_config = self.config['unifi'] + required_fields = ['host', 'username', 'password', 'site'] + missing_fields = [field for field in required_fields if field not in unifi_config] + + if missing_fields: + raise ValueError(f"Missing required UniFi configuration fields: {', '.join(missing_fields)}") + + try: + self.logger.debug(f"Connecting to UniFi Controller at {unifi_config['host']}") + self.unifi_client = UnifiClient( + base_url=unifi_config['host'], + username=unifi_config['username'], + password=unifi_config['password'], + site_id=unifi_config['site'], + verify_ssl=False # Add this if using self-signed certificates + ) + + # Test the connection by making a simple request + response = self.unifi_client._login() + if not response: + raise ConnectionError(f"Failed to connect to UniFi controller: No response") + + self.logger.debug("UniFi client setup successful") + + except Exception as e: + self.logger.error(f"Error setting up UniFi client: {str(e)}") + raise ConnectionError(f"Failed to connect to UniFi controller: {str(e)}") + + def is_tasmota_device(self, device: dict) -> bool: + """Determine if a device is in the network_filter and not in exclude_patterns.""" + name = device.get('name', '').lower() + hostname = device.get('hostname', '').lower() + ip = device.get('ip', '') + + # Check if device is in the configured network + network_filters = self.config['unifi'].get('network_filter', {}) + for network in network_filters.values(): + if ip.startswith(network['subnet']): + self.logger.debug(f"Checking device in network: {name} ({hostname}) IP: {ip}") + + # Check exclusion patterns + exclude_patterns = network.get('exclude_patterns', []) + for pattern in exclude_patterns: + pattern = pattern.lower() + # Convert glob pattern to regex pattern + pattern = pattern.replace('.', r'\.').replace('*', '.*') + if re.match(f"^{pattern}$", name) or re.match(f"^{pattern}$", hostname): + self.logger.debug(f"Excluding device due to pattern '{pattern}': {name} ({hostname})") + return False + + # Device is in the network and not excluded + self.logger.debug(f"Found device in network: {name}") + return True + + return False + + def get_tasmota_devices(self) -> list: + """Query UniFi controller and filter Tasmota devices.""" + devices = [] + self.logger.debug("Querying UniFi controller for devices") + try: + all_clients = self.unifi_client.get_clients() + self.logger.debug(f"Found {len(all_clients)} total devices") + + for device in all_clients: + if self.is_tasmota_device(device): + device_info = { + "name": device.get('name', device.get('hostname', 'Unknown')), + "ip": device.get('ip', ''), + "mac": device.get('mac', ''), + "last_seen": device.get('last_seen', ''), + "hostname": device.get('hostname', ''), + "notes": device.get('note', ''), + } + devices.append(device_info) + + self.logger.debug(f"Found {len(devices)} Tasmota devices") + return devices + except Exception as e: + self.logger.error(f"Error getting devices from UniFi controller: {e}") + return [] + + def save_tasmota_config(self, devices: list) -> None: + """Save Tasmota device information to a JSON file with device tracking.""" + filename = "current.json" + self.logger.debug(f"Saving Tasmota configuration to {filename}") + deprecated_filename = "deprecated.json" + + current_devices = [] + deprecated_devices = [] + + # Load existing devices if file exists + if os.path.exists(filename): + try: + with open(filename, 'r') as f: + existing_config = json.load(f) + current_devices = existing_config.get('tasmota', {}).get('devices', []) + except json.JSONDecodeError: + self.logger.error(f"Error reading {filename}, treating as empty") + current_devices = [] + + # Load deprecated devices if file exists + if os.path.exists(deprecated_filename): + try: + with open(deprecated_filename, 'r') as f: + deprecated_config = json.load(f) + deprecated_devices = deprecated_config.get('tasmota', {}).get('devices', []) + except json.JSONDecodeError: + self.logger.error(f"Error reading {deprecated_filename}, treating as empty") + deprecated_devices = [] + + # Create new config + new_devices = [] + moved_to_deprecated = [] + restored_from_deprecated = [] + removed_from_deprecated = [] + excluded_devices = [] + + # Check for excluded devices in current and deprecated lists + network_filters = self.config['unifi'].get('network_filter', {}) + exclude_patterns = [] + for network in network_filters.values(): + exclude_patterns.extend(network.get('exclude_patterns', [])) + + # Function to check if device is excluded + def is_device_excluded(device_name: str, hostname: str = '') -> bool: + name = device_name.lower() + hostname = hostname.lower() + for pattern in exclude_patterns: + pattern = pattern.lower() + pattern = pattern.replace('.', r'\.').replace('*', '.*') + if re.match(f"^{pattern}$", name) or re.match(f"^{pattern}$", hostname): + return True + return False + + # Process current devices + for device in devices: + device_name = device['name'] + device_hostname = device.get('hostname', '') + device_ip = device['ip'] + device_mac = device['mac'] + + # Check if device should be excluded + if is_device_excluded(device_name, device_hostname): + print(f"Device {device_name} excluded by pattern - skipping") + excluded_devices.append(device_name) + continue + + # Check in current devices + existing_device = next((d for d in current_devices + if d['name'] == device_name), None) + + if existing_device: + # Device exists, check if IP or MAC changed + if existing_device['ip'] != device_ip or existing_device['mac'] != device_mac: + moved_to_deprecated.append(existing_device) + new_devices.append(device) + print(f"Device {device_name} moved to deprecated (IP/MAC changed)") + else: + new_devices.append(existing_device) # Keep existing device + else: + # New device, check if it was in deprecated + deprecated_device = next((d for d in deprecated_devices + if d['name'] == device_name), None) + if deprecated_device: + removed_from_deprecated.append(device_name) + print(f"Device {device_name} removed from deprecated (restored)") + new_devices.append(device) + print(f"Device {device_name} added to output file") + + # Find devices that are no longer present + current_names = {d['name'] for d in devices} + for existing_device in current_devices: + if existing_device['name'] not in current_names: + if not is_device_excluded(existing_device['name'], existing_device.get('hostname', '')): + moved_to_deprecated.append(existing_device) + print(f"Device {existing_device['name']} moved to deprecated (no longer present)") + + # Update deprecated devices list, excluding any excluded devices + final_deprecated = [] + for device in deprecated_devices: + if device['name'] not in removed_from_deprecated and not is_device_excluded(device['name'], device.get('hostname', '')): + final_deprecated.append(device) + elif is_device_excluded(device['name'], device.get('hostname', '')): + print(f"Device {device['name']} removed from deprecated (excluded by pattern)") + + final_deprecated.extend(moved_to_deprecated) + + # Save new configuration + config = { + "tasmota": { + "devices": new_devices, + "generated_at": datetime.now().isoformat(), + "total_devices": len(new_devices) + } + } + + # Save deprecated configuration + deprecated_config = { + "tasmota": { + "devices": final_deprecated, + "generated_at": datetime.now().isoformat(), + "total_devices": len(final_deprecated) + } + } + + # Backup existing file if it exists + if os.path.exists(filename): + try: + backup_name = f"{filename}.backup" + os.rename(filename, backup_name) + self.logger.info(f"Created backup of existing configuration as {backup_name}") + except Exception as e: + self.logger.error(f"Error creating backup: {e}") + + # Save files + try: + with open(filename, 'w') as f: + json.dump(config, f, indent=4) + with open(deprecated_filename, 'w') as f: + json.dump(deprecated_config, f, indent=4) + + self.logger.info(f"Successfully saved {len(new_devices)} Tasmota devices to {filename}") + self.logger.info(f"Successfully saved {len(final_deprecated)} deprecated devices to {deprecated_filename}") + + print("\nDevice Status Summary:") + if excluded_devices: + print("\nExcluded Devices:") + for name in excluded_devices: + print(f"- {name}") + + if moved_to_deprecated: + print("\nMoved to deprecated:") + for device in moved_to_deprecated: + print(f"- {device['name']}") + + if removed_from_deprecated: + print("\nRestored from deprecated:") + for name in removed_from_deprecated: + print(f"- {name}") + + print("\nCurrent Tasmota Devices:") + for device in new_devices: + print(f"Name: {device['name']:<20} IP: {device['ip']:<15} MAC: {device['mac']}") + + except Exception as e: + self.logger.error(f"Error saving configuration: {e}") + + def get_unknown_devices(self, use_current_json=True): + """Identify devices that match unknown_device_patterns from current.json.""" + self.logger.info("Identifying unknown devices for processing...") + unknown_devices = [] + + try: + source_file = 'current.json' if use_current_json else 'tasmota.json' + with open(source_file, 'r') as f: + data = json.load(f) + all_devices = data.get('tasmota', {}).get('devices', []) + self.logger.debug(f"Loaded {len(all_devices)} devices from {source_file}") + except FileNotFoundError: + self.logger.error(f"{source_file} not found. Run discovery first.") + return [] + except json.JSONDecodeError: + self.logger.error(f"Invalid JSON format in {source_file}") + return [] + + # Identify devices matching unknown_device_patterns + network_filters = self.config['unifi'].get('network_filter', {}) + unknown_patterns = [] + for network in network_filters.values(): + unknown_patterns.extend(network.get('unknown_device_patterns', [])) + + for device in all_devices: + name = device.get('name', '').lower() + hostname = device.get('hostname', '').lower() + + for pattern in unknown_patterns: + pattern = pattern.lower() + pattern = pattern.replace('.', r'\.').replace('*', '.*') + if re.match(f"^{pattern}", name) or re.match(f"^{pattern}", hostname): + self.logger.debug(f"Found unknown device: {name} ({hostname})") + unknown_devices.append(device) + break + + self.logger.info(f"Found {len(unknown_devices)} unknown devices to process") + return unknown_devices + + def process_unknown_devices(self): + """Process unknown devices by checking for toggle button and configuring them. + + This method: + 1. Identifies devices matching unknown_device_patterns + 2. Checks if each device has a toggle button (indicating it's a light/switch) + 3. Toggles the button at 1/2Hz while checking for hostname changes + 4. Prompts the user to enter a new name for the device in the console + 5. Once a name is entered, configures the device with the new hostname + """ + unknown_devices = self.get_unknown_devices() + if not unknown_devices: + self.logger.info("No unknown devices found to process") + return + + self.logger.info(f"Starting to process {len(unknown_devices)} unknown devices...") + + for device in unknown_devices: + name = device.get('name', 'Unknown') + ip = device.get('ip') + + if not ip: + self.logger.warning(f"Skipping device {name} - no IP address") + continue + + self.logger.info(f"Processing unknown device: {name} at {ip}") + + # Check if device has a toggle button + try: + # Get the main page to check for toggle button + url = f"http://{ip}/" + response = requests.get(url, timeout=5) + + # Check if there's a toggle button in the response + has_toggle = "toggle" in response.text.lower() + + if has_toggle: + self.logger.info(f"Device {name} has a toggle button, assuming it's a light switch or power plug") + + # Start toggling at 1/2Hz + original_hostname = device.get('hostname', '') + toggle_state = False + + # Temporarily disable all logging during toggling + logging.disable(logging.CRITICAL) + + try: + # Clear console output and show prompt + print("\n" + "="*50) + print(f"DEVICE: {name} at IP: {ip}") + print(f"Current hostname: {original_hostname}") + print("="*50) + print("The device is now toggling to help you identify it.") + + # Start toggling in background while waiting for input + import threading + stop_toggle = threading.Event() + + def toggle_device(): + toggle_state = False + while not stop_toggle.is_set(): + toggle_state = not toggle_state + toggle_cmd = "Power On" if toggle_state else "Power Off" + toggle_url = f"http://{ip}/cm?cmnd={toggle_cmd}" + try: + requests.get(toggle_url, timeout=2) + except: + pass + time.sleep(2.0) # 1/2Hz rate + + # Start toggle thread + toggle_thread = threading.Thread(target=toggle_device) + toggle_thread.daemon = True + toggle_thread.start() + + # Prompt for new hostname + print("\nPlease enter a new name for this device:") + new_hostname = input("> ").strip() + + # Stop toggling + stop_toggle.set() + toggle_thread.join(timeout=3) + + if new_hostname and new_hostname != original_hostname: + print(f"Setting new hostname to: {new_hostname}") + else: + print("No valid hostname entered, skipping device") + new_hostname = "" + + finally: + # Re-enable logging + logging.disable(logging.NOTSET) + + # If a new hostname was entered, configure the device + if new_hostname: + self.logger.info(f"Configuring device with new hostname: {new_hostname}") + self.configure_unknown_device(ip, new_hostname) + else: + self.logger.warning(f"No new hostname provided for {name}, skipping configuration") + else: + self.logger.info(f"Device {name} does not have a toggle button, skipping") + + except requests.exceptions.RequestException as e: + self.logger.error(f"Error connecting to {name} at {ip}: {str(e)}") + + def configure_unknown_device(self, ip, hostname): + """Configure an unknown device with the given hostname and MQTT settings.""" + try: + # Set Friendly Name + friendly_name_url = f"http://{ip}/cm?cmnd=FriendlyName1%20{hostname}" + response = requests.get(friendly_name_url, timeout=5) + if response.status_code == 200: + self.logger.info(f"Set Friendly Name to {hostname}") + else: + self.logger.error(f"Failed to set Friendly Name to {hostname}") + + # Enable MQTT if not already enabled + mqtt_url = f"http://{ip}/cm?cmnd=SetOption3%20ON" # Enable MQTT + response = requests.get(mqtt_url, timeout=5) + if response.status_code == 200: + self.logger.info(f"Enabled MQTT for {hostname}") + else: + self.logger.error(f"Failed to enable MQTT for {hostname}") + + # Configure MQTT settings + mqtt_config = self.config.get('mqtt', {}) + if mqtt_config: + # Get the base hostname (everything before the dash) + hostname_base = hostname.split('-')[0] if '-' in hostname else hostname + + mqtt_fields = { + "MqttHost": mqtt_config.get('Host', ''), + "MqttPort": mqtt_config.get('Port', 1883), + "MqttUser": mqtt_config.get('User', ''), + "MqttPassword": mqtt_config.get('Password', ''), + "Topic": hostname_base if mqtt_config.get('Topic') == '%hostname_base%' else mqtt_config.get('Topic', ''), + "FullTopic": mqtt_config.get('FullTopic', '%prefix%/%topic%/'), + } + + for setting, value in mqtt_fields.items(): + url = f"http://{ip}/cm?cmnd={setting}%20{value}" + response = requests.get(url, timeout=5) + if response.status_code == 200: + if setting != 'MqttPassword': + self.logger.info(f"{hostname}: Set {setting} to {value}") + else: + self.logger.info(f"{hostname}: Set MQTT Password") + else: + self.logger.error(f"{hostname}: Failed to set {setting}") + + # Save configuration (will reboot the device) + save_url = f"http://{ip}/cm?cmnd=Restart%201" + response = requests.get(save_url, timeout=5) + if response.status_code == 200: + self.logger.info(f"Saved configuration and rebooted {hostname}") + else: + self.logger.error(f"Failed to save configuration for {hostname}") + + return True + + except requests.exceptions.RequestException as e: + self.logger.error(f"Error configuring device at {ip}: {str(e)}") + return False + + def get_device_details(self, use_current_json=True): + """Connect to each Tasmota device via HTTP, gather details and validate MQTT settings. + Filters out devices matching unknown_device_patterns.""" + self.logger.info("Starting to gather detailed device information...") + device_details = [] + + try: + source_file = 'current.json' if use_current_json else 'tasmota.json' + with open(source_file, 'r') as f: + data = json.load(f) + all_devices = data.get('tasmota', {}).get('devices', []) + self.logger.debug(f"Loaded {len(all_devices)} devices from {source_file}") + except FileNotFoundError: + self.logger.error(f"{source_file} not found. Run discovery first.") + return + except json.JSONDecodeError: + self.logger.error(f"Invalid JSON format in {source_file}") + return + + # Filter out devices matching unknown_device_patterns + devices = [] + network_filters = self.config['unifi'].get('network_filter', {}) + unknown_patterns = [] + for network in network_filters.values(): + unknown_patterns.extend(network.get('unknown_device_patterns', [])) + + for device in all_devices: + name = device.get('name', '').lower() + hostname = device.get('hostname', '').lower() + + is_unknown = False + for pattern in unknown_patterns: + pattern = pattern.lower() + pattern = pattern.replace('.', r'\.').replace('*', '.*') + if re.match(f"^{pattern}", name) or re.match(f"^{pattern}", hostname): + self.logger.debug(f"Skipping unknown device: {name} ({hostname})") + is_unknown = True + break + + if not is_unknown: + devices.append(device) + + self.logger.debug(f"Processing {len(devices)} devices after filtering unknown devices") + + mqtt_config = self.config.get('mqtt', {}) + if not mqtt_config: + self.logger.error("MQTT configuration missing from config file") + return + + def check_mqtt_settings(ip, name, mqtt_status): + """Check and update MQTT settings if they don't match config""" + # Get the base hostname (everything before the dash) + hostname_base = name.split('-')[0] if '-' in name else name + + mqtt_fields = { + "Host": mqtt_config.get('Host', ''), + "Port": mqtt_config.get('Port', 1883), + "User": mqtt_config.get('User', ''), + "Password": mqtt_config.get('Password', ''), + "Topic": hostname_base if mqtt_config.get('Topic') == '%hostname_base%' else mqtt_config.get('Topic', ''), + "FullTopic": mqtt_config.get('FullTopic', '%prefix%/%topic%/'), + } + + device_mqtt = mqtt_status.get('MqttHost', {}) + changes_needed = [] + force_password_update = False + + # Check each MQTT setting + if device_mqtt.get('Host') != mqtt_fields['Host']: + changes_needed.append(('MqttHost', mqtt_fields['Host'])) + self.logger.debug(f"{name}: MQTT Host mismatch - Device: {device_mqtt.get('Host')}, Config: {mqtt_fields['Host']}") + force_password_update = True + + if device_mqtt.get('Port') != mqtt_fields['Port']: + changes_needed.append(('MqttPort', mqtt_fields['Port'])) + self.logger.debug(f"{name}: MQTT Port mismatch - Device: {device_mqtt.get('Port')}, Config: {mqtt_fields['Port']}") + force_password_update = True + + if device_mqtt.get('User') != mqtt_fields['User']: + changes_needed.append(('MqttUser', mqtt_fields['User'])) + self.logger.debug(f"{name}: MQTT User mismatch - Device: {device_mqtt.get('User')}, Config: {mqtt_fields['User']}") + force_password_update = True + + if device_mqtt.get('Topic') != mqtt_fields['Topic']: + changes_needed.append(('Topic', mqtt_fields['Topic'])) + self.logger.debug(f"{name}: MQTT Topic mismatch - Device: {device_mqtt.get('Topic')}, Config: {mqtt_fields['Topic']}") + force_password_update = True + + if device_mqtt.get('FullTopic') != mqtt_fields['FullTopic']: + changes_needed.append(('FullTopic', mqtt_fields['FullTopic'])) + self.logger.debug(f"{name}: MQTT FullTopic mismatch - Device: {device_mqtt.get('FullTopic')}, Config: {mqtt_fields['FullTopic']}") + force_password_update = True + + # Add password update if any MQTT setting changed or user was updated + if force_password_update: + changes_needed.append(('MqttPassword', mqtt_fields['Password'])) + self.logger.debug(f"{name}: MQTT Password will be updated") + + # Check NoRetain setting - FIXED: Use the actual value from config with default of False + no_retain = mqtt_config.get('NoRetain', False) + if no_retain: + changes_needed.append(('SetOption62', '1')) # 1 = No Retain + else: + changes_needed.append(('SetOption62', '0')) # 0 = Use Retain + + # Apply changes if needed + for setting, value in changes_needed: + try: + url = f"http://{ip}/cm?cmnd={setting}%20{value}" + response = requests.get(url, timeout=5) + if response.status_code == 200: + if setting != 'MqttPassword': + self.logger.debug(f"{name}: Updated {setting} to {value}") + else: + self.logger.debug(f"{name}: Updated MQTT Password") + else: + self.logger.error(f"{name}: Failed to update {setting}") + except requests.exceptions.RequestException as e: + self.logger.error(f"{name}: Error updating {setting}: {str(e)}") + + return len(changes_needed) > 0 + + for device in devices: + if not isinstance(device, dict): + self.logger.warning(f"Skipping invalid device entry: {device}") + continue + + name = device.get('name', 'Unknown') + ip = device.get('ip') + mac = device.get('mac') + + if not ip: + self.logger.warning(f"Skipping device {name} - no IP address") + continue + + self.logger.info(f"Checking device: {name} at {ip}") + + try: + # Get Status 2 for firmware version + url_status = f"http://{ip}/cm?cmnd=Status%202" + response = requests.get(url_status, timeout=5) + status_data = response.json() + + # Get Status 5 for network info + url_network = f"http://{ip}/cm?cmnd=Status%205" + response = requests.get(url_network, timeout=5) + network_data = response.json() + + # Get Status 6 for MQTT info + url_mqtt = f"http://{ip}/cm?cmnd=Status%206" + response = requests.get(url_mqtt, timeout=5) + mqtt_data = response.json() + + # Check and update MQTT settings if needed + mqtt_updated = check_mqtt_settings(ip, name, mqtt_data) + + device_detail = { + "name": name, + "ip": ip, + "mac": mac, + "version": status_data.get("StatusFWR", {}).get("Version", "Unknown"), + "hostname": network_data.get("StatusNET", {}).get("Hostname", "Unknown"), + "mqtt_status": "Updated" if mqtt_updated else "Verified", + "last_checked": time.strftime("%Y-%m-%d %H:%M:%S"), + "status": "online" + } + self.logger.info(f"Successfully got version for {name}: {device_detail['version']}") + + except requests.exceptions.RequestException as e: + self.logger.error(f"Error connecting to {name} at {ip}: {str(e)}") + device_detail = { + "name": name, + "ip": ip, + "mac": mac, + "version": "Unknown", + "status": "offline", + "error": str(e) + } + + device_details.append(device_detail) + time.sleep(0.5) + + # Save all device details at once + try: + with open('TasmotaDevices.json', 'w') as f: + json.dump(device_details, f, indent=2) + self.logger.info(f"Device details saved to TasmotaDevices.json ({len(device_details)} devices)") + except Exception as e: + self.logger.error(f"Error saving device details: {e}") + +def main(): + parser = argparse.ArgumentParser(description='Tasmota Device Manager') + parser.add_argument('--config', default='network_configuration.json', + help='Path to configuration file') + parser.add_argument('--debug', action='store_true', + help='Enable debug logging') + parser.add_argument('--skip-unifi', action='store_true', + help='Skip UniFi discovery and use existing current.json') + parser.add_argument('--process-unknown', action='store_true', + help='Process unknown devices (matching unknown_device_patterns) to set up names and MQTT') + + args = parser.parse_args() + + # Set up logging + log_level = logging.DEBUG if args.debug else logging.INFO + logging.basicConfig(level=log_level, + format='%(asctime)s - %(levelname)s - %(message)s', + datefmt='%Y-%m-%d %H:%M:%S') + + print("Starting Tasmota Device Discovery and Version Check...") + + # Create TasmotaDiscovery instance + discovery = TasmotaDiscovery(debug=args.debug) + discovery.load_config(args.config) + + try: + if not args.skip_unifi: + print("Step 1: Discovering Tasmota devices...") + discovery.setup_unifi_client() + tasmota_devices = discovery.get_tasmota_devices() + discovery.save_tasmota_config(tasmota_devices) + else: + print("Skipping UniFi discovery, using existing current.json...") + + print("\nStep 2: Getting detailed version information...") + discovery.get_device_details(use_current_json=True) + + if args.process_unknown: + print("\nStep 3: Processing unknown devices...") + discovery.process_unknown_devices() + + print("\nProcess completed successfully!") + print("- Device list saved to: current.json") + print("- Detailed information saved to: TasmotaDevices.json") + + except ConnectionError as e: + print(f"Connection Error: {str(e)}") + print("\nTrying to proceed with existing current.json...") + try: + discovery.get_device_details(use_current_json=True) + print("\nSuccessfully retrieved device details from existing current.json") + except Exception as inner_e: + print(f"Error processing existing devices: {str(inner_e)}") + return 1 + except Exception as e: + print(f"Error: {str(e)}") + if args.debug: + import traceback + traceback.print_exc() + return 1 + + return 0 + +if __name__ == '__main__': + main() \ No newline at end of file diff --git a/console_duplicate_template_fix_summary.md b/console_duplicate_template_fix_summary.md new file mode 100644 index 0000000..811ac23 --- /dev/null +++ b/console_duplicate_template_fix_summary.md @@ -0,0 +1,102 @@ +# Console Duplicate and Template Matching Fix Summary + +## Issues Addressed + +1. **Duplicate Console Settings**: Console settings were being applied twice during device configuration. +2. **Template Matching Failure**: The template matching algorithm was not handling the response format correctly, causing the config_other settings to not be applied. + +## Changes Made + +### 1. Fix for Duplicate Console Settings + +The console settings were being applied in two places: + +1. In `configure_mqtt_settings()` called from `check_mqtt_settings()` +2. Directly in `get_device_details()` + +To fix this issue: + +1. Added a `skip_console` parameter to `configure_mqtt_settings()`: + ```python + def configure_mqtt_settings(self, ip, name, mqtt_status=None, is_new_device=False, set_friendly_name=False, enable_mqtt=False, with_retry=False, reboot=False, skip_console=False): + ``` + +2. Modified the function to skip console settings if `skip_console` is True: + ```python + # Apply console settings + console_updated = False + console_params = mqtt_config.get('console', {}) + if console_params and not skip_console: + # Console settings application code... + ``` + +3. Updated `check_mqtt_settings()` to pass `skip_console=True`: + ```python + return self.configure_mqtt_settings( + ip=ip, + name=name, + mqtt_status=mqtt_status, + is_new_device=False, + set_friendly_name=False, + enable_mqtt=False, + with_retry=True, + reboot=False, + skip_console=True # Skip console settings here as they'll be applied separately + ) + ``` + +This ensures that console settings are only applied once, directly in `get_device_details()`. + +### 2. Fix for Template Matching Failure + +The template matching algorithm was not handling the response format correctly. The function expected a "Template" key in the response, but the actual response had a different structure. + +To fix this issue: + +1. Added logging of the actual response format for debugging: + ```python + self.logger.debug(f"{name}: Template response: {template_data}") + ``` + +2. Enhanced the template extraction logic to handle different response formats: + ```python + # Extract current template - handle different response formats + current_template = "" + + # Try different possible response formats + if "Template" in template_data: + current_template = template_data.get("Template", "") + elif isinstance(template_data, dict) and len(template_data) > 0: + # If there's no "Template" key but we have a dict, try to get the first value + # This handles cases where the response might be {"NAME":"...","GPIO":[...]} + first_key = next(iter(template_data)) + if isinstance(template_data[first_key], str) and "{" in template_data[first_key]: + current_template = template_data[first_key] + self.logger.debug(f"{name}: Found template in alternate format under key: {first_key}") + # Handle the case where the template is returned as a dict with NAME, GPIO, FLAG, BASE keys + elif all(key in template_data for key in ['NAME', 'GPIO', 'FLAG', 'BASE']): + # Convert the dict to a JSON string to match the expected format + import json + current_template = json.dumps(template_data) + self.logger.debug(f"{name}: Found template in dict format with NAME, GPIO, FLAG, BASE keys") + ``` + +This allows the function to handle the specific response format with 'NAME', 'GPIO', 'FLAG', and 'BASE' keys, which is what the OfficeLight device returns. + +## Testing and Verification + +The changes were tested with the OfficeLight device and both issues were resolved: + +1. Console settings are now only applied once +2. Template matching is working correctly and updating the template as needed + +The TasmotaDevices.json file confirms that the template was successfully updated, with `"template_status": "Updated"`. + +## Conclusion + +These changes optimize the device configuration process by: + +1. Eliminating duplicate application of console settings +2. Improving the template matching algorithm to handle different response formats + +This ensures that all configuration steps (MQTT, config_other, and console) are applied correctly and efficiently. \ No newline at end of file diff --git a/console_settings_optimization.md b/console_settings_optimization.md new file mode 100644 index 0000000..ca80992 --- /dev/null +++ b/console_settings_optimization.md @@ -0,0 +1,78 @@ +# Console Settings Optimization + +## Issue Description + +The issue was related to how console settings were being applied in the TasmotaManager code. The original implementation used a `skip_console` parameter in the `configure_mqtt_settings` function to prevent console settings from being applied twice: + +1. Once in `configure_mqtt_settings` (but skipped with `skip_console=True`) +2. Again directly in `get_device_details` + +The question was raised: "I question why the skip_console was needed. Seems like the console settings before thecheck_mqtt_settings should be the one deleted?" + +## Analysis + +After reviewing the code, I found that: + +1. In `get_device_details`, it calls `check_mqtt_settings` which calls `configure_mqtt_settings` with `skip_console=True`. This prevents `configure_mqtt_settings` from applying console settings. + +2. Later in `get_device_details`, console settings are applied directly with a large block of code that duplicates functionality already present in `configure_mqtt_settings`. + +3. In `configure_unknown_device`, it calls `configure_mqtt_settings` without specifying `skip_console`, so it uses the default value of `False`. This means console settings are applied when configuring unknown devices. + +This approach added unnecessary complexity with the `skip_console` parameter and made the code less intuitive (why skip in one place and apply in another?). + +## Changes Made + +I implemented the following changes to optimize the code: + +1. Removed the `skip_console` parameter from the `configure_mqtt_settings` function signature: + ```python + def configure_mqtt_settings(self, ip, name, mqtt_status=None, is_new_device=False, set_friendly_name=False, enable_mqtt=False, with_retry=False, reboot=False): + ``` + +2. Updated the condition in `configure_mqtt_settings` to always apply console settings: + ```python + # Apply console settings + console_updated = False + console_params = mqtt_config.get('console', {}) + if console_params: + self.logger.info(f"{name}: Setting console parameters from configuration") + ``` + +3. Updated `check_mqtt_settings` to call `configure_mqtt_settings` without the `skip_console` parameter: + ```python + return self.configure_mqtt_settings( + ip=ip, + name=name, + mqtt_status=mqtt_status, + is_new_device=False, + set_friendly_name=False, + enable_mqtt=False, + with_retry=True, + reboot=False + ) + ``` + +4. Removed the console settings application code in `get_device_details` and replaced it with: + ```python + # Console settings are now applied in configure_mqtt_settings + console_updated = mqtt_updated + ``` + +## Benefits + +These changes provide several benefits: + +1. **Simplified Code**: Removed the `skip_console` parameter and eliminated duplicate code. + +2. **More Intuitive Design**: Console settings are now applied in the same place as MQTT settings, making the code more logical and easier to understand. + +3. **Reduced Maintenance**: With only one place to update console settings logic, future changes will be easier to implement. + +4. **Consistent Behavior**: Console settings are now applied consistently for both unknown and known devices. + +## Testing + +The changes were tested to ensure that console settings are still applied correctly. The `console_updated` flag is now set based on the result of the MQTT settings update, which includes console settings application. + +This approach maintains all the functionality of the original code while making it more maintainable and easier to understand. \ No newline at end of file diff --git a/console_settings_summary.md b/console_settings_summary.md new file mode 100644 index 0000000..4530c3e --- /dev/null +++ b/console_settings_summary.md @@ -0,0 +1,55 @@ +# Console Settings for Unknown Devices - Implementation Summary + +## Requirement +For all unknown devices, once the MQTT and hostname are updated but before the reboot, continue with the console settings. Then reboot the device. + +## Changes Made + +1. Modified the `configure_unknown_device` method in `TasmotaManager.py` to: + - Apply console settings from the configuration after setting MQTT parameters but before rebooting + - Handle special cases for retain parameters (ButtonRetain, SwitchRetain, PowerRetain) + - Auto-enable rules that are defined in the configuration + - Maintain the same logging and error handling as the rest of the application + +2. Created a test script `test_unknown_device_console_settings.py` to verify the functionality: + - The script takes a device identifier (IP or hostname) as an argument + - It displays the console parameters that will be applied from the configuration + - It processes the device using the modified `configure_unknown_device` method + - This allows testing that console settings are applied to unknown devices before rebooting + +## Implementation Details + +### Console Settings Application +The implementation applies console settings in the following order: +1. First, it handles retain parameters (ButtonRetain, SwitchRetain, PowerRetain) with special logic: + - For each retain parameter, it first sets the opposite state + - Then it sets the desired state + - This ensures the MQTT broker's retain flags are properly updated + +2. Next, it processes all other console parameters: + - It identifies rule definitions (rule1, rule2, etc.) for auto-enabling + - It applies each parameter with a simple HTTP request + +3. Finally, it auto-enables any rules that were defined: + - If a rule definition (e.g., rule1) is found, it automatically enables the rule (Rule1 ON) + - This ensures rules are active after the device reboots + +### Testing +To test this functionality: +``` +./test_unknown_device_console_settings.py +``` + +Where `` is either the IP address or hostname of the device you want to process. + +The test script will: +1. Display the console parameters from the configuration +2. Process the device, applying hostname, MQTT settings, and console settings +3. Report whether the processing was successful + +## Expected Behavior +After this change, when an unknown device is processed: +1. The hostname and MQTT settings will be updated +2. All console settings from the configuration will be applied +3. The device will be rebooted +4. Upon restart, the device will have all settings properly configured \ No newline at end of file diff --git a/fulltopic_fix_summary.md b/fulltopic_fix_summary.md new file mode 100644 index 0000000..a5a1d64 --- /dev/null +++ b/fulltopic_fix_summary.md @@ -0,0 +1,44 @@ +# FullTopic Parameter Fix Summary + +## Issue Description +When setting the MQTT parameters for FullTopic, the Full Topic was ending up with a %20 at the beginning, as in "%20%prefix%/%topic%/" instead of the correct "%prefix%/%topic%/". + +## Root Cause +The issue was in the URL construction when sending commands to Tasmota devices. The code was using a space (%20 in URL encoding) between the command name and its value for all parameters: + +```python +url = f"http://{ip}/cm?cmnd={setting}%20{value}" +``` + +While this works for most parameters, it causes problems with the FullTopic parameter because the space gets included in the value. + +## Fix Implemented +The fix adds special handling for the FullTopic parameter by using "=" instead of a space (%20) between the command and value: + +```python +# For FullTopic, we need to avoid adding a space (%20) between the command and value +if setting == "FullTopic": + url = f"http://{ip}/cm?cmnd={setting}={value}" +else: + url = f"http://{ip}/cm?cmnd={setting}%20{value}" +``` + +This change was implemented in two places: +1. In the `configure_unknown_device` method (around line 542) +2. In the MQTT settings update code (around line 937) + +## Testing +A test script `test_fulltopic_fix.py` was created to verify the fix. The script: +1. Connects to a Tasmota device +2. Sets the FullTopic parameter using the new method +3. Verifies that the FullTopic is set correctly without the %20 prefix + +To run the test: +``` +./test_fulltopic_fix.py +``` + +Where `` is the IP address of a Tasmota device to test with. + +## Expected Result +After this fix, the FullTopic parameter should be set correctly as "%prefix%/%topic%/" without the unwanted %20 at the beginning. \ No newline at end of file diff --git a/implementation_summary.md b/implementation_summary.md new file mode 100644 index 0000000..0b0ea03 --- /dev/null +++ b/implementation_summary.md @@ -0,0 +1,27 @@ +# Implementation Summary + +## Requirement +For a single device when identified as unknown device, the script should toggle the device at a 1/2 Hz rate and wait for the user to enter a new Host Name. + +## Changes Made + +1. Modified the `process_single_device` method in `TasmotaManager.py` to: + - Check if a device identified as unknown has a toggle button + - If it does, toggle the device at 1/2 Hz rate (toggling every 2 seconds) + - Display information about the device to help the user identify it + - Prompt the user to enter a new hostname for the device + - Configure the device with the new hostname if provided + +2. Created a test script `test_unknown_device_toggle.py` to verify the functionality: + - The script takes a device identifier (IP or hostname) as an argument + - It processes the device using the modified `process_single_device` method + - This allows testing the toggling functionality for a single unknown device + +## Testing + +To test this functionality: +``` +./test_unknown_device_toggle.py +``` + +Where `` is either the IP address or hostname of the device you want to process. \ No newline at end of file diff --git a/network_configuration.json b/network_configuration.json index 0036d26..2655006 100644 --- a/network_configuration.json +++ b/network_configuration.json @@ -28,6 +28,15 @@ "Topic": "%hostname_base%", "FullTopic": "%prefix%/%topic%/", "NoRetain": false, + "config_other": { + "TreatLife_SW_SS01S": "{\"NAME\":\"TL SS01S Swtch\",\"GPIO\":[0,0,0,0,52,158,0,0,21,17,0,0,0],\"FLAG\":0,\"BASE\":18}\n", + "TreatLife_SW_SS02S": "{\"NAME\":\"Treatlife SS02\",\"GPIO\":[0,0,0,0,288,576,0,0,224,32,0,0,0,0],\"FLAG\":0,\"BASE\":18}", + "TreatLife_DIM_DS02S": "{\"NAME\":\"DS02S Dimmer\",\"GPIO\":[0,107,0,108,0,0,0,0,0,0,0,0,0],\"FLAG\":0,\"BASE\":54}", + "CloudFree_SW1": "{\"NAME\":\"CloudFree SW1\",\"GPIO\":[0,224,0,32,320,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],\"FLAG\":0,\"BASE\":1}", + "Gosund_WP5_Plug": "{\"NAME\":\"Gosund-WP5\",\"GPIO\":[0,0,0,0,17,0,0,0,56,57,21,0,0],\"FLAG\":0,\"BASE\":18}", + "CloudFree_X10S_Plug": "{\"NAME\":\"Aoycocr X10S\",\"GPIO\":[56,0,57,0,21,134,0,0,131,17,132,0,0],\"FLAG\":0,\"BASE\":45}\n", + "Sonoff_S31_PM_Plug": "{\"NAME\":\"Sonoff S31\",\"GPIO\":[17,145,0,146,0,0,0,0,21,56,0,0,0],\"FLAG\":0,\"BASE\":41}" + }, "console": { "SwitchRetain": "Off", "ButtonRetain": "Off", diff --git a/process_unknown_optimization_summary.md b/process_unknown_optimization_summary.md new file mode 100644 index 0000000..947e46f --- /dev/null +++ b/process_unknown_optimization_summary.md @@ -0,0 +1,50 @@ +# Process Unknown Devices Optimization Summary + +## Issue Description +When using the `--process-unknown` flag, the script was unnecessarily getting detailed information for devices that don't match the unknown_device_patterns. This was inefficient because the script was processing all devices first, then filtering out the unknown ones, and then processing only the unknown ones. + +## Root Cause +The issue was in the main function of TasmotaManager.py. The script was calling `get_device_details()` for all devices before calling `process_unknown_devices()`. The `get_device_details()` method filters out devices matching unknown_device_patterns, which means it was processing all devices except those that match the patterns. This is the opposite of what we want when processing unknown devices. + +```python +# Original code +print("\nStep 2: Getting detailed version information...") +discovery.get_device_details(use_current_json=True) + +if args.process_unknown: + print("\nStep 3: Processing unknown devices...") + discovery.process_unknown_devices() +``` + +## Fix Implemented +The fix was to modify the main function to skip the `get_device_details()` call when the `--process-unknown` flag is used. This ensures that we're not wasting time getting detailed information for devices that we don't need to process. + +```python +# Modified code +if args.process_unknown: + print("\nStep 2: Processing unknown devices...") + discovery.process_unknown_devices() +else: + print("\nStep 2: Getting detailed version information...") + discovery.get_device_details(use_current_json=True) +``` + +## Testing +A test script `test_process_unknown_optimization.py` was created to verify the fix. The script: +1. Runs TasmotaManager with the `--process-unknown` flag and captures the output +2. Checks that the output contains "Processing unknown devices" but not "Getting detailed version information" +3. Counts how many unknown devices were processed +4. Loads the network_configuration.json to get the unknown_device_patterns + +To run the test: +``` +./test_process_unknown_optimization.py +``` + +## Expected Result +After this fix, when the `--process-unknown` flag is used, the script will only process devices that match the unknown_device_patterns, skipping the detailed information gathering for all other devices. This makes the script more efficient and focused on its task. + +## Benefits +1. **Improved Performance**: The script no longer wastes time processing devices that it doesn't need to. +2. **Reduced Network Traffic**: Fewer HTTP requests are made to devices that don't need to be processed. +3. **Clearer Workflow**: The script now has a more logical flow, either processing all devices or only unknown devices, not both. \ No newline at end of file diff --git a/template_activation_fix_summary.md b/template_activation_fix_summary.md new file mode 100644 index 0000000..125af60 --- /dev/null +++ b/template_activation_fix_summary.md @@ -0,0 +1,96 @@ +# Template Activation Fix Summary + +## Issue Description + +The issue was that templates were not being properly activated after being set. In the Tasmota web UI, there's an "Activate" checkbox that needs to be checked when applying a template. Without checking this box, the template is set but not activated. + +In our code, we were setting the template using the Template command, but we weren't activating it, which is equivalent to not checking the "Activate" box in the web UI. + +## Changes Made + +### 1. Understanding the Template Activation Process + +In Tasmota, to fully activate a template, three steps are required: +1. Set the template using the `Template` command +2. Set the module to 0 (Template module) using the `Module 0` command +3. Restart the device using the `Restart 1` command + +### 2. Modifications to `check_and_update_template` Method + +We modified the `check_and_update_template` method in `TasmotaManager.py` to include the template activation steps. Changes were made in two places: + +#### When a template is updated: + +```python +# URL encode the template value +import urllib.parse +encoded_value = urllib.parse.quote(template_value) +url = f"http://{ip}/cm?cmnd=Template%20{encoded_value}" + +response = requests.get(url, timeout=5) +if response.status_code == 200: + self.logger.info(f"{name}: Template updated successfully") + + # Activate the template by setting module to 0 (Template module) + self.logger.info(f"{name}: Activating template by setting module to 0") + module_url = f"http://{ip}/cm?cmnd=Module%200" + module_response = requests.get(module_url, timeout=5) + + if module_response.status_code == 200: + self.logger.info(f"{name}: Module set to 0 successfully") + + # Restart the device to apply the template + self.logger.info(f"{name}: Restarting device to apply template") + restart_url = f"http://{ip}/cm?cmnd=Restart%201" + restart_response = requests.get(restart_url, timeout=5) + + if restart_response.status_code == 200: + self.logger.info(f"{name}: Device restart initiated successfully") + template_updated = True + else: + self.logger.error(f"{name}: Failed to restart device") + else: + self.logger.error(f"{name}: Failed to set module to 0") +else: + self.logger.error(f"{name}: Failed to update template") +``` + +#### When a device name is updated: + +Similar changes were made when a device name is updated to match a template. After successfully updating the device name, we added code to set the module to 0 and restart the device. + +### 3. Test Script + +A test script `test_template_activation.py` was created to verify that templates are properly activated. The script: + +1. Gets a test device from current.json +2. Gets a template from network_configuration.json +3. Sets the template on the device and activates it +4. Verifies that the template was properly activated by checking: + - The module is set to 0 (Template module) + - The template matches the expected value + +## How to Test + +To test the template activation fix: + +1. Run the test script: + ``` + python3 test_template_activation.py + ``` + +2. The script will output information about the template activation process and verify that the template was properly activated. + +3. You can also manually test by: + - Running TasmotaManager with a device that has a template defined in network_configuration.json + - Checking the device's module and template after TasmotaManager has processed it + +## Expected Results + +After the fix, when a template is set or a device name is updated to match a template: + +1. The template should be properly set on the device +2. The module should be set to 0 (Template module) +3. The device should restart to apply the template + +This ensures that templates are fully activated, equivalent to checking the "Activate" box in the Tasmota web UI. \ No newline at end of file diff --git a/test_fulltopic_fix.py b/test_fulltopic_fix.py new file mode 100755 index 0000000..9ac62e6 --- /dev/null +++ b/test_fulltopic_fix.py @@ -0,0 +1,119 @@ +#!/usr/bin/env python3 +""" +Test script to verify the FullTopic parameter is set correctly without a %20 prefix. +This script will: +1. Connect to a Tasmota device +2. Set the FullTopic parameter +3. Verify the FullTopic is set correctly without a %20 prefix +""" + +import sys +import logging +import requests +import json +import argparse + +# Configure logging +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' +) + +logger = logging.getLogger("FullTopicTest") + +def load_config(): + """Load the network configuration.""" + try: + with open('network_configuration.json', 'r') as f: + return json.load(f) + except Exception as e: + logger.error(f"Error loading configuration: {str(e)}") + sys.exit(1) + +def test_fulltopic_setting(ip_address): + """Test setting the FullTopic parameter on a device.""" + logger.info(f"Testing FullTopic setting on device at {ip_address}") + + # Load configuration + config = load_config() + mqtt_config = config.get('mqtt', {}) + + if not mqtt_config: + logger.error("No MQTT configuration found") + return False + + # Get the FullTopic value from configuration + full_topic = mqtt_config.get('FullTopic', '%prefix%/%topic%/') + logger.info(f"FullTopic from configuration: {full_topic}") + + # First, check the current FullTopic value + try: + status_url = f"http://{ip_address}/cm?cmnd=FullTopic" + response = requests.get(status_url, timeout=5) + if response.status_code == 200: + current_value = response.text + logger.info(f"Current FullTopic value: {current_value}") + else: + logger.error(f"Failed to get current FullTopic value: {response.status_code}") + return False + except requests.exceptions.RequestException as e: + logger.error(f"Error connecting to device: {str(e)}") + return False + + # Set the FullTopic using the fixed method (with = instead of %20) + try: + set_url = f"http://{ip_address}/cm?cmnd=FullTopic={full_topic}" + logger.info(f"Setting FullTopic with URL: {set_url}") + response = requests.get(set_url, timeout=5) + if response.status_code == 200: + logger.info(f"Response from setting FullTopic: {response.text}") + else: + logger.error(f"Failed to set FullTopic: {response.status_code}") + return False + except requests.exceptions.RequestException as e: + logger.error(f"Error setting FullTopic: {str(e)}") + return False + + # Verify the FullTopic was set correctly + try: + verify_url = f"http://{ip_address}/cm?cmnd=FullTopic" + response = requests.get(verify_url, timeout=5) + if response.status_code == 200: + new_value = response.text + logger.info(f"New FullTopic value: {new_value}") + + # Check if the value contains %20 at the beginning + if "%20" in new_value: + logger.error("FullTopic still contains %20 - fix not working") + return False + else: + logger.info("FullTopic set correctly without %20") + return True + else: + logger.error(f"Failed to verify FullTopic: {response.status_code}") + return False + except requests.exceptions.RequestException as e: + logger.error(f"Error verifying FullTopic: {str(e)}") + return False + +def main(): + """Main function to test the FullTopic fix.""" + parser = argparse.ArgumentParser(description='Test FullTopic parameter setting') + parser.add_argument('ip_address', help='IP address of the Tasmota device to test') + args = parser.parse_args() + + if not args.ip_address: + print("Usage: python test_fulltopic_fix.py ") + sys.exit(1) + + result = test_fulltopic_setting(args.ip_address) + + if result: + print("SUCCESS: FullTopic set correctly without %20 prefix") + sys.exit(0) + else: + print("FAILURE: FullTopic not set correctly") + sys.exit(1) + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/test_hostname_matching.py b/test_hostname_matching.py new file mode 100755 index 0000000..be2965b --- /dev/null +++ b/test_hostname_matching.py @@ -0,0 +1,62 @@ +#!/usr/bin/env python3 +""" +Test script for hostname matching in TasmotaManager.py + +This script tests the hostname matching functionality with various patterns: +1. Exact match +2. Partial match +3. Wildcard match +4. Multiple matches +""" + +import subprocess +import sys +import os + +def run_test(test_name, hostname_pattern): + """Run a test with the given hostname pattern""" + print(f"\n{'='*80}") + print(f"TEST: {test_name}") + print(f"Pattern: {hostname_pattern}") + print(f"{'='*80}") + + # Run the TasmotaManager.py script with the --Device parameter and --debug flag + cmd = ["python3", "TasmotaManager.py", "--Device", hostname_pattern, "--debug"] + print(f"Running command: {' '.join(cmd)}") + + # Run the command and capture output + process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) + stdout, stderr = process.communicate() + + # Print the output + print("\nSTDOUT:") + print(stdout) + + if stderr: + print("\nSTDERR:") + print(stderr) + + print(f"\nExit code: {process.returncode}") + return process.returncode + +def main(): + """Run all tests""" + # Test 1: Exact match + run_test("Exact Match", "MasterLamp-5891") + + # Test 2: Partial match + run_test("Partial Match", "Master") + + # Test 3: Wildcard match + run_test("Wildcard Match", "Master*") + + # Test 4: Wildcard match with * on both sides + run_test("Wildcard Match (both sides)", "*Lamp*") + + # Test 5: Multiple matches (should match multiple devices and use the first one) + run_test("Multiple Matches", "M") + + print("\nAll tests completed!") + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/test_process_unknown_optimization.py b/test_process_unknown_optimization.py new file mode 100755 index 0000000..d044aa7 --- /dev/null +++ b/test_process_unknown_optimization.py @@ -0,0 +1,99 @@ +#!/usr/bin/env python3 +""" +Test script to verify the optimization for processing unknown devices. +This script will run TasmotaManager with the --process-unknown flag +and verify that it only processes devices that match the unknown_device_patterns. +""" + +import sys +import logging +import subprocess +import os +import json + +# Configure logging +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' +) + +logger = logging.getLogger("ProcessUnknownTest") + +def test_process_unknown_optimization(): + """Test that the --process-unknown flag skips detailed information gathering for non-matching devices.""" + logger.info("Testing process-unknown optimization") + + # Check if current.json exists + if not os.path.exists('current.json'): + logger.error("current.json not found. Run discovery first.") + return False + + # Run TasmotaManager with --process-unknown flag and capture output + logger.info("Running TasmotaManager with --process-unknown flag") + try: + result = subprocess.run( + ["python", "TasmotaManager.py", "--process-unknown", "--skip-unifi", "--debug"], + capture_output=True, + text=True, + check=True + ) + output = result.stdout + result.stderr + logger.info("TasmotaManager completed successfully") + except subprocess.CalledProcessError as e: + logger.error(f"Error running TasmotaManager: {e}") + logger.error(f"Output: {e.stdout}") + logger.error(f"Error: {e.stderr}") + return False + + # Check that the output contains "Processing unknown devices" but not "Getting detailed version information" + if "Step 2: Processing unknown devices" in output and "Getting detailed version information" not in output: + logger.info("Verified that detailed version information gathering was skipped") + else: + logger.error("Failed to verify that detailed version information gathering was skipped") + return False + + # Check the log for evidence that only unknown devices were processed + unknown_devices_processed = 0 + for line in output.splitlines(): + if "Processing unknown device:" in line: + unknown_devices_processed += 1 + logger.info(f"Found log entry: {line.strip()}") + + logger.info(f"Found {unknown_devices_processed} unknown devices processed") + + # Load network_configuration.json to get unknown_device_patterns + try: + with open('network_configuration.json', 'r') as f: + config = json.load(f) + + network_filters = config['unifi'].get('network_filter', {}) + unknown_patterns = [] + for network in network_filters.values(): + unknown_patterns.extend(network.get('unknown_device_patterns', [])) + + logger.info(f"Found {len(unknown_patterns)} unknown device patterns in configuration") + for pattern in unknown_patterns: + logger.info(f" - {pattern}") + except Exception as e: + logger.error(f"Error loading configuration: {e}") + return False + + logger.info("Test completed successfully") + return True + +def main(): + """Main function to run the test.""" + print("Testing process-unknown optimization") + + result = test_process_unknown_optimization() + + if result: + print("\nSUCCESS: The optimization for processing unknown devices is working correctly") + print("The script only processes devices that match the unknown_device_patterns") + sys.exit(0) + else: + print("\nFAILURE: The optimization for processing unknown devices is not working correctly") + sys.exit(1) + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/test_rule1_encoding.py b/test_rule1_encoding.py new file mode 100644 index 0000000..b8739ab --- /dev/null +++ b/test_rule1_encoding.py @@ -0,0 +1,106 @@ +#!/usr/bin/env python3 +import requests +import urllib.parse +import time +import logging + +# Configure logging +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(levelname)s - %(message)s', + datefmt='%Y-%m-%d %H:%M:%S' +) +logger = logging.getLogger(__name__) + +# Device to test - use the same device from test_rule1_device_mode.py +DEVICE_IP = "192.168.8.35" + +# Rule1 value from network_configuration.json +RULE1_VALUE = "on button1#state=10 do power0 toggle endon" + +def check_rule1(): + """Check the current rule1 setting on the device""" + url = f"http://{DEVICE_IP}/cm?cmnd=rule1" + logger.info(f"Checking rule1: {url}") + response = requests.get(url, timeout=5) + if response.status_code == 200: + logger.info(f"Rule1 response: {response.text}") + return response.text + else: + logger.error(f"Failed to get rule1: HTTP {response.status_code}") + return None + +def set_rule1_with_encoding(): + """Set rule1 with proper URL encoding""" + # URL encode the rule value + encoded_value = urllib.parse.quote(RULE1_VALUE) + url = f"http://{DEVICE_IP}/cm?cmnd=rule1%20{encoded_value}" + + logger.info(f"Setting rule1 with encoding: {url}") + response = requests.get(url, timeout=5) + if response.status_code == 200: + logger.info(f"Set rule1 response: {response.text}") + return True + else: + logger.error(f"Failed to set rule1: HTTP {response.status_code}") + return False + +def enable_rule1(): + """Enable rule1""" + url = f"http://{DEVICE_IP}/cm?cmnd=Rule1%201" + logger.info(f"Enabling rule1: {url}") + response = requests.get(url, timeout=5) + if response.status_code == 200: + logger.info(f"Enable rule1 response: {response.text}") + return True + else: + logger.error(f"Failed to enable rule1: HTTP {response.status_code}") + return False + +def main(): + # Check current rule1 + logger.info("Checking current rule1") + current_rule1 = check_rule1() + + # Set rule1 with proper URL encoding + logger.info("Setting rule1 with proper URL encoding") + success = set_rule1_with_encoding() + if not success: + logger.error("Failed to set rule1") + return 1 + + # Wait for the command to take effect + logger.info("Waiting for command to take effect...") + time.sleep(2) + + # Check rule1 after setting + logger.info("Checking rule1 after setting") + after_set_rule1 = check_rule1() + + # Enable rule1 + logger.info("Enabling rule1") + success = enable_rule1() + if not success: + logger.error("Failed to enable rule1") + return 1 + + # Wait for the command to take effect + logger.info("Waiting for command to take effect...") + time.sleep(2) + + # Check rule1 after enabling + logger.info("Checking rule1 after enabling") + after_enable_rule1 = check_rule1() + + # Compare with expected value + if RULE1_VALUE in after_enable_rule1: + logger.info("SUCCESS: rule1 was correctly set!") + return 0 + else: + logger.error(f"FAILURE: rule1 was not set correctly!") + logger.error(f" Expected: {RULE1_VALUE}") + logger.error(f" Actual: {after_enable_rule1}") + return 1 + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/test_template_activation.py b/test_template_activation.py new file mode 100644 index 0000000..ce50b68 --- /dev/null +++ b/test_template_activation.py @@ -0,0 +1,236 @@ +#!/usr/bin/env python3 +""" +Test script to verify that templates are properly activated after being set. + +This script: +1. Gets a test device from current.json +2. Sets a template on the device +3. Verifies that the template was properly activated +""" + +import json +import logging +import requests +import time +import sys +import os + +# Configure logging +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(levelname)s - %(message)s', + datefmt='%Y-%m-%d %H:%M:%S' +) +logger = logging.getLogger(__name__) + +# Import TasmotaManager class +sys.path.append(os.path.dirname(os.path.abspath(__file__))) +from TasmotaManager import TasmotaDiscovery + +def get_test_device(): + """Get a test device from current.json""" + try: + with open('current.json', 'r') as f: + data = json.load(f) + devices = data.get('tasmota', {}).get('devices', []) + if devices: + return devices[0] # Use the first device + else: + logger.error("No devices found in current.json") + return None + except Exception as e: + logger.error(f"Error reading current.json: {e}") + return None + +def get_template_from_config(): + """Get a template from network_configuration.json""" + try: + with open('network_configuration.json', 'r') as f: + config = json.load(f) + templates = config.get('mqtt', {}).get('config_other', {}) + if templates: + # Get the first template + template_key = next(iter(templates)) + template_value = templates[template_key] + return template_key, template_value + else: + logger.error("No templates found in network_configuration.json") + return None, None + except Exception as e: + logger.error(f"Error reading network_configuration.json: {e}") + return None, None + +def check_device_module(ip): + """Check the current module of the device""" + try: + url = f"http://{ip}/cm?cmnd=Module" + response = requests.get(url, timeout=5) + if response.status_code == 200: + data = response.json() + logger.info(f"Module response: {data}") + + # Extract module information + if "Module" in data: + module = data["Module"] + return module + else: + logger.error(f"Unexpected response format: {data}") + return None + else: + logger.error(f"Failed to get module: HTTP {response.status_code}") + return None + except Exception as e: + logger.error(f"Error checking module: {e}") + return None + +def check_template_on_device(ip): + """Check the current template on the device""" + try: + url = f"http://{ip}/cm?cmnd=Template" + response = requests.get(url, timeout=5) + if response.status_code == 200: + data = response.json() + logger.info(f"Template response: {data}") + + # Extract template information + template = None + if "Template" in data: + template = data["Template"] + elif isinstance(data, dict) and len(data) > 0: + # If there's no "Template" key but we have a dict, try to get the first value + first_key = next(iter(data)) + if isinstance(data[first_key], str) and "{" in data[first_key]: + template = data[first_key] + # Handle the case where the template is returned as a dict with NAME, GPIO, FLAG, BASE keys + elif all(key in data for key in ['NAME', 'GPIO', 'FLAG', 'BASE']): + import json + template = json.dumps(data) + + return template + else: + logger.error(f"Failed to get template: HTTP {response.status_code}") + return None + except Exception as e: + logger.error(f"Error checking template: {e}") + return None + +def set_template_on_device(ip, template_value): + """Set a template on the device and activate it""" + try: + # URL encode the template value + import urllib.parse + encoded_value = urllib.parse.quote(template_value) + url = f"http://{ip}/cm?cmnd=Template%20{encoded_value}" + + logger.info(f"Setting template: {url}") + response = requests.get(url, timeout=5) + if response.status_code == 200: + logger.info(f"Template set response: {response.text}") + + # Set module to 0 to activate the template + logger.info("Setting module to 0 to activate template") + module_url = f"http://{ip}/cm?cmnd=Module%200" + module_response = requests.get(module_url, timeout=5) + + if module_response.status_code == 200: + logger.info(f"Module set response: {module_response.text}") + + # Restart the device to apply the template + logger.info("Restarting device to apply template") + restart_url = f"http://{ip}/cm?cmnd=Restart%201" + restart_response = requests.get(restart_url, timeout=5) + + if restart_response.status_code == 200: + logger.info("Device restart initiated successfully") + return True + else: + logger.error(f"Failed to restart device: HTTP {restart_response.status_code}") + else: + logger.error(f"Failed to set module: HTTP {module_response.status_code}") + else: + logger.error(f"Failed to set template: HTTP {response.status_code}") + + return False + except Exception as e: + logger.error(f"Error setting template: {e}") + return False + +def main(): + """Main test function""" + # Get a test device + device = get_test_device() + if not device: + logger.error("No test device available. Run discovery first.") + return 1 + + device_name = device.get('name') + device_ip = device.get('ip') + + logger.info(f"Testing with device: {device_name} (IP: {device_ip})") + + # Get a template from the configuration + template_key, template_value = get_template_from_config() + if not template_key or not template_value: + logger.error("No template available in configuration.") + return 1 + + logger.info(f"Using template: {template_key} = {template_value}") + + # Check current module and template + logger.info("Checking current module and template") + current_module = check_device_module(device_ip) + current_template = check_template_on_device(device_ip) + + logger.info(f"Current module: {current_module}") + logger.info(f"Current template: {current_template}") + + # Set the template on the device + logger.info("Setting and activating template") + success = set_template_on_device(device_ip, template_value) + if not success: + logger.error("Failed to set and activate template") + return 1 + + # Wait for the device to restart + logger.info("Waiting for device to restart...") + time.sleep(10) + + # Check module and template after restart + logger.info("Checking module and template after restart") + after_module = check_device_module(device_ip) + after_template = check_template_on_device(device_ip) + + logger.info(f"Module after restart: {after_module}") + logger.info(f"Template after restart: {after_template}") + + # Verify that the template was activated + if after_module == 0: + logger.info("SUCCESS: Module is set to 0 (Template module)") + else: + logger.error(f"FAILURE: Module is not set to 0, got {after_module}") + return 1 + + # Compare templates (this is approximate since formatting might differ) + import json + try: + # Try to parse both as JSON for comparison + template_json = json.loads(template_value) + after_json = json.loads(after_template) if after_template else None + + if after_json and all(key in after_json for key in ['NAME', 'GPIO', 'FLAG', 'BASE']): + logger.info("SUCCESS: Template appears to be correctly set and activated") + return 0 + else: + logger.error("FAILURE: Template does not appear to be correctly set") + return 1 + except json.JSONDecodeError: + # If JSON parsing fails, do a simple string comparison + if template_value == after_template: + logger.info("SUCCESS: Template appears to be correctly set and activated") + return 0 + else: + logger.error("FAILURE: Template does not appear to be correctly set") + return 1 + +if __name__ == "__main__": + sys.exit(main()) \ No newline at end of file diff --git a/test_template_matching.py b/test_template_matching.py new file mode 100644 index 0000000..445b400 --- /dev/null +++ b/test_template_matching.py @@ -0,0 +1,239 @@ +#!/usr/bin/env python3 +""" +Test script to verify the template matching algorithm in TasmotaManager.py. + +This script simulates different scenarios to ensure the algorithm works correctly: +1. Key matches Device Name, Template matches value +2. Key matches Device Name, Template doesn't match value +3. No key matches Device Name, but a value matches Template +4. No matches at all +""" + +import json +import logging +import requests +import unittest +from unittest.mock import patch, MagicMock + +# Configure logging +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(levelname)s - %(message)s', + datefmt='%Y-%m-%d %H:%M:%S' +) +logger = logging.getLogger(__name__) + +# Import TasmotaManager class +import sys +import os +sys.path.append(os.path.dirname(os.path.abspath(__file__))) +from TasmotaManager import TasmotaDiscovery + +class TestTemplateMatching(unittest.TestCase): + """Test cases for template matching algorithm.""" + + def setUp(self): + """Set up test environment.""" + self.discovery = TasmotaDiscovery(debug=True) + + # Create a mock config with mqtt.config_other + self.discovery.config = { + 'mqtt': { + 'config_other': { + 'TreatLife_SW_SS01S': '{"NAME":"TL SS01S Swtch","GPIO":[0,0,0,0,52,158,0,0,21,17,0,0,0],"FLAG":0,"BASE":18}', + 'TreatLife_SW_SS02S': '{"NAME":"Treatlife SS02","GPIO":[0,0,0,0,288,576,0,0,224,32,0,0,0,0],"FLAG":0,"BASE":18}' + } + } + } + + @patch('requests.get') + def test_key_matches_template_matches(self, mock_get): + """Test when key matches Device Name and template matches value.""" + # Mock responses for Status 0 and Template commands + mock_responses = [ + # Status 0 response + MagicMock( + status_code=200, + json=lambda: {"Status": {"DeviceName": "TreatLife_SW_SS01S"}} + ), + # Template response + MagicMock( + status_code=200, + json=lambda: {"Template": '{"NAME":"TL SS01S Swtch","GPIO":[0,0,0,0,52,158,0,0,21,17,0,0,0],"FLAG":0,"BASE":18}'} + ) + ] + mock_get.side_effect = mock_responses + + # Call the method + result = self.discovery.check_and_update_template("192.168.8.100", "test_device") + + # Verify results + self.assertFalse(result) # No update needed + self.assertEqual(mock_get.call_count, 2) # Only Status 0 and Template calls + + # Log the result + logger.info("Test 1: Key matches Device Name, Template matches value - PASSED") + + @patch('requests.get') + def test_key_matches_template_doesnt_match(self, mock_get): + """Test when key matches Device Name but template doesn't match value.""" + # Mock responses for Status 0, Template, and Template update commands + mock_responses = [ + # Status 0 response + MagicMock( + status_code=200, + json=lambda: {"Status": {"DeviceName": "TreatLife_SW_SS01S"}} + ), + # Template response + MagicMock( + status_code=200, + json=lambda: {"Template": '{"NAME":"Different Template","GPIO":[0,0,0,0,0,0,0,0,0,0,0,0,0],"FLAG":0,"BASE":18}'} + ), + # Template update response + MagicMock( + status_code=200, + json=lambda: {"Template": "Done"} + ) + ] + mock_get.side_effect = mock_responses + + # Call the method + result = self.discovery.check_and_update_template("192.168.8.100", "test_device") + + # Verify results + self.assertTrue(result) # Template was updated + self.assertEqual(mock_get.call_count, 3) # Status 0, Template, and Template update calls + + # Log the result + logger.info("Test 2: Key matches Device Name, Template doesn't match value - PASSED") + + @patch('requests.get') + def test_no_key_matches_value_matches(self, mock_get): + """Test when no key matches Device Name but a value matches Template.""" + # Mock responses for Status 0, Template, and DeviceName update commands + mock_responses = [ + # Status 0 response + MagicMock( + status_code=200, + json=lambda: {"Status": {"DeviceName": "Unknown_Device"}} + ), + # Template response + MagicMock( + status_code=200, + json=lambda: {"Template": '{"NAME":"Treatlife SS02","GPIO":[0,0,0,0,288,576,0,0,224,32,0,0,0,0],"FLAG":0,"BASE":18}'} + ), + # DeviceName update response + MagicMock( + status_code=200, + json=lambda: {"DeviceName": "Done"} + ) + ] + mock_get.side_effect = mock_responses + + # Call the method + result = self.discovery.check_and_update_template("192.168.8.100", "test_device") + + # Verify results + self.assertTrue(result) # Device name was updated + self.assertEqual(mock_get.call_count, 3) # Status 0, Template, and DeviceName update calls + + # Log the result + logger.info("Test 3: No key matches Device Name, but a value matches Template - PASSED") + + @patch('requests.get') + def test_no_matches_at_all(self, mock_get): + """Test when there are no matches at all.""" + # Mock responses for Status 0 and Template commands + mock_responses = [ + # Status 0 response + MagicMock( + status_code=200, + json=lambda: {"Status": {"DeviceName": "Unknown_Device"}} + ), + # Template response + MagicMock( + status_code=200, + json=lambda: {"Template": '{"NAME":"Unknown Template","GPIO":[0,0,0,0,0,0,0,0,0,0,0,0,0],"FLAG":0,"BASE":18}'} + ) + ] + mock_get.side_effect = mock_responses + + # Call the method + result = self.discovery.check_and_update_template("192.168.8.100", "test_device") + + # Verify results + self.assertFalse(result) # No updates made + self.assertEqual(mock_get.call_count, 2) # Only Status 0 and Template calls + + # Log the result + logger.info("Test 4: No matches at all - PASSED") + + @patch('requests.get') + def test_no_config_other(self, mock_get): + """Test when there's no mqtt.config_other in the configuration.""" + # Set empty config_other + self.discovery.config = {'mqtt': {}} + + # Call the method + result = self.discovery.check_and_update_template("192.168.8.100", "test_device") + + # Verify results + self.assertFalse(result) # No updates made + self.assertEqual(mock_get.call_count, 0) # No HTTP calls made + + # Log the result + logger.info("Test 5: No mqtt.config_other in configuration - PASSED") + + @patch('requests.get') + def test_status0_failure(self, mock_get): + """Test when Status 0 command fails.""" + # Mock response for Status 0 command + mock_get.return_value = MagicMock( + status_code=200, + json=lambda: {"Status": {}} # Missing DeviceName + ) + + # Call the method + result = self.discovery.check_and_update_template("192.168.8.100", "test_device") + + # Verify results + self.assertFalse(result) # No updates made + self.assertEqual(mock_get.call_count, 1) # Only Status 0 call + + # Log the result + logger.info("Test 6: Status 0 command failure - PASSED") + + @patch('requests.get') + def test_template_failure(self, mock_get): + """Test when Template command fails.""" + # Mock responses for Status 0 and Template commands + mock_responses = [ + # Status 0 response + MagicMock( + status_code=200, + json=lambda: {"Status": {"DeviceName": "TreatLife_SW_SS01S"}} + ), + # Template response + MagicMock( + status_code=200, + json=lambda: {} # Missing Template + ) + ] + mock_get.side_effect = mock_responses + + # Call the method + result = self.discovery.check_and_update_template("192.168.8.100", "test_device") + + # Verify results + self.assertFalse(result) # No updates made + self.assertEqual(mock_get.call_count, 2) # Status 0 and Template calls + + # Log the result + logger.info("Test 7: Template command failure - PASSED") + +def main(): + """Run the tests.""" + unittest.main() + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/test_unknown_device_console_settings.py b/test_unknown_device_console_settings.py new file mode 100755 index 0000000..fe8034d --- /dev/null +++ b/test_unknown_device_console_settings.py @@ -0,0 +1,58 @@ +#!/usr/bin/env python3 +""" +Test script to verify that console settings are applied to unknown devices +before rebooting. This script will process a single device by IP address +or hostname and apply console settings from the configuration. +""" + +import sys +import logging +import argparse +from TasmotaManager import TasmotaDiscovery + +# Configure logging +logging.basicConfig( + level=logging.DEBUG, # Use DEBUG level to see all console settings being applied + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' +) + +def main(): + """Main function to test the unknown device console settings functionality.""" + parser = argparse.ArgumentParser(description='Test unknown device console settings') + parser.add_argument('device_identifier', help='IP address or hostname of the device to test') + parser.add_argument('--debug', action='store_true', help='Enable debug mode') + args = parser.parse_args() + + print(f"Testing unknown device console settings for: {args.device_identifier}") + + # Initialize TasmotaDiscovery with debug mode if requested + discovery = TasmotaDiscovery(debug=args.debug) + + # Load configuration + discovery.load_config() + + # Get console settings from configuration + mqtt_config = discovery.config.get('mqtt', {}) + console_params = mqtt_config.get('console', {}) + + if not console_params: + print("No console parameters found in configuration. Please add some to test.") + sys.exit(1) + + print("Console parameters that will be applied:") + for param, value in console_params.items(): + print(f" {param}: {value}") + + # Process the single device + print("\nProcessing device...") + result = discovery.process_single_device(args.device_identifier) + + if result: + print(f"\nSuccessfully processed device: {args.device_identifier}") + print("Console settings should have been applied before reboot.") + else: + print(f"\nFailed to process device: {args.device_identifier}") + print("Check the logs for more information.") + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/test_unknown_device_toggle.py b/test_unknown_device_toggle.py new file mode 100755 index 0000000..c92200a --- /dev/null +++ b/test_unknown_device_toggle.py @@ -0,0 +1,42 @@ +#!/usr/bin/env python3 +""" +Test script to verify the unknown device toggling functionality. +This script will process a single device by IP address or hostname. +""" + +import sys +import logging +from TasmotaManager import TasmotaDiscovery + +# Configure logging +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' +) + +def main(): + """Main function to test the unknown device toggling functionality.""" + if len(sys.argv) < 2: + print("Usage: python test_unknown_device_toggle.py ") + print(" can be an IP address or hostname") + sys.exit(1) + + device_identifier = sys.argv[1] + print(f"Testing unknown device toggling for: {device_identifier}") + + # Initialize TasmotaDiscovery with debug mode + discovery = TasmotaDiscovery(debug=True) + + # Load configuration + discovery.load_config() + + # Process the single device + result = discovery.process_single_device(device_identifier) + + if result: + print(f"Successfully processed device: {device_identifier}") + else: + print(f"Failed to process device: {device_identifier}") + +if __name__ == "__main__": + main() \ No newline at end of file