import json import logging import os import sys from datetime import datetime from typing import Optional import requests from urllib3.exceptions import InsecureRequestWarning import re # Import the regular expression module import time import argparse # Disable SSL warnings requests.packages.urllib3.disable_warnings(category=InsecureRequestWarning) class UnifiClient: def __init__(self, base_url, username, password, site_id, verify_ssl=True): self.base_url = base_url.rstrip('/') self.username = username self.password = password self.site_id = site_id self.session = requests.Session() self.session.verify = verify_ssl # Initialize cookie jar self.session.cookies.clear() def _login(self) -> requests.Response: # Changed return type annotation """Authenticate with the UniFi Controller.""" login_url = f"{self.base_url}/api/auth/login" headers = { 'Content-Type': 'application/json', 'Accept': 'application/json' } payload = { "username": self.username, "password": self.password, "remember": False } try: response = self.session.post( login_url, json=payload, headers=headers ) response.raise_for_status() if 'X-CSRF-Token' in response.headers: self.session.headers['X-CSRF-Token'] = response.headers['X-CSRF-Token'] return response # Return the response object except requests.exceptions.RequestException as e: if hasattr(e, 'response') and e.response.status_code == 401: raise Exception("Authentication failed. Please verify your username and password.") from e raise def get_clients(self) -> list: """Get all clients from the UniFi Controller.""" # Try the newer API endpoint first url = f"{self.base_url}/proxy/network/api/s/{self.site_id}/stat/sta" try: response = self.session.get(url) response.raise_for_status() return response.json().get('data', []) except requests.exceptions.RequestException as e: # If the newer endpoint fails, try the legacy endpoint url = f"{self.base_url}/api/s/{self.site_id}/stat/sta" try: response = self.session.get(url) response.raise_for_status() return response.json().get('data', []) except requests.exceptions.RequestException as e: # If both fail, try the v2 API endpoint url = f"{self.base_url}/v2/api/site/{self.site_id}/clients" response = self.session.get(url) response.raise_for_status() return response.json().get('data', []) class TasmotaDiscovery: def __init__(self, debug: bool = False): """Initialize the TasmotaDiscovery with optional debug mode.""" log_level = logging.DEBUG if debug else logging.INFO logging.basicConfig( level=log_level, format='%(asctime)s - %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S' ) self.logger = logging.getLogger(__name__) self.config = None self.unifi_client = None def load_config(self, config_path: Optional[str] = None) -> dict: """Load configuration from JSON file.""" if config_path is None: config_path = os.path.join(os.path.dirname(__file__), 'config.json') self.logger.debug(f"Loading configuration from: {config_path}") try: with open(config_path, 'r') as config_file: self.config = json.load(config_file) self.logger.debug("Configuration loaded successfully from %s", config_path) return self.config except FileNotFoundError: self.logger.error(f"Configuration file not found at {config_path}") sys.exit(1) except json.JSONDecodeError: self.logger.error("Invalid JSON in configuration file") sys.exit(1) def setup_unifi_client(self): """Set up the UniFi client with better error handling""" self.logger.debug("Setting up UniFi client") if not self.config or 'unifi' not in self.config: raise ValueError("Missing UniFi configuration") unifi_config = self.config['unifi'] required_fields = ['host', 'username', 'password', 'site'] missing_fields = [field for field in required_fields if field not in unifi_config] if missing_fields: raise ValueError(f"Missing required UniFi configuration fields: {', '.join(missing_fields)}") try: self.logger.debug(f"Connecting to UniFi Controller at {unifi_config['host']}") self.unifi_client = UnifiClient( base_url=unifi_config['host'], username=unifi_config['username'], password=unifi_config['password'], site_id=unifi_config['site'], verify_ssl=False # Add this if using self-signed certificates ) # Test the connection by making a simple request response = self.unifi_client._login() if not response: raise ConnectionError(f"Failed to connect to UniFi controller: No response") self.logger.debug("UniFi client setup successful") except Exception as e: self.logger.error(f"Error setting up UniFi client: {str(e)}") raise ConnectionError(f"Failed to connect to UniFi controller: {str(e)}") def is_tasmota_device(self, device: dict) -> bool: """Determine if a device is in the network_filter and not in exclude_patterns.""" name = device.get('name', '').lower() hostname = device.get('hostname', '').lower() ip = device.get('ip', '') # Check if device is in the configured network network_filters = self.config['unifi'].get('network_filter', {}) for network in network_filters.values(): if ip.startswith(network['subnet']): self.logger.debug(f"Checking device in network: {name} ({hostname}) IP: {ip}") # Check exclusion patterns exclude_patterns = network.get('exclude_patterns', []) for pattern in exclude_patterns: pattern = pattern.lower() # Convert glob pattern to regex pattern pattern = pattern.replace('.', r'\.').replace('*', '.*') if re.match(f"^{pattern}$", name) or re.match(f"^{pattern}$", hostname): self.logger.debug(f"Excluding device due to pattern '{pattern}': {name} ({hostname})") return False # Device is in the network and not excluded self.logger.debug(f"Found device in network: {name}") return True return False def get_tasmota_devices(self) -> list: """Query UniFi controller and filter Tasmota devices.""" devices = [] self.logger.debug("Querying UniFi controller for devices") try: all_clients = self.unifi_client.get_clients() self.logger.debug(f"Found {len(all_clients)} total devices") for device in all_clients: if self.is_tasmota_device(device): device_info = { "name": device.get('name', device.get('hostname', 'Unknown')), "ip": device.get('ip', ''), "mac": device.get('mac', ''), "last_seen": device.get('last_seen', ''), "hostname": device.get('hostname', ''), "notes": device.get('note', ''), } devices.append(device_info) self.logger.debug(f"Found {len(devices)} Tasmota devices") return devices except Exception as e: self.logger.error(f"Error getting devices from UniFi controller: {e}") return [] def save_tasmota_config(self, devices: list) -> None: """Save Tasmota device information to a JSON file with device tracking.""" filename = "current.json" self.logger.debug(f"Saving Tasmota configuration to {filename}") deprecated_filename = "deprecated.json" current_devices = [] deprecated_devices = [] # Load existing devices if file exists if os.path.exists(filename): try: with open(filename, 'r') as f: existing_config = json.load(f) current_devices = existing_config.get('tasmota', {}).get('devices', []) except json.JSONDecodeError: self.logger.error(f"Error reading {filename}, treating as empty") current_devices = [] # Load deprecated devices if file exists if os.path.exists(deprecated_filename): try: with open(deprecated_filename, 'r') as f: deprecated_config = json.load(f) deprecated_devices = deprecated_config.get('tasmota', {}).get('devices', []) except json.JSONDecodeError: self.logger.error(f"Error reading {deprecated_filename}, treating as empty") deprecated_devices = [] # Create new config new_devices = [] moved_to_deprecated = [] restored_from_deprecated = [] removed_from_deprecated = [] excluded_devices = [] # Check for excluded devices in current and deprecated lists network_filters = self.config['unifi'].get('network_filter', {}) exclude_patterns = [] for network in network_filters.values(): exclude_patterns.extend(network.get('exclude_patterns', [])) # Function to check if device is excluded def is_device_excluded(device_name: str, hostname: str = '') -> bool: name = device_name.lower() hostname = hostname.lower() for pattern in exclude_patterns: pattern = pattern.lower() pattern = pattern.replace('.', r'\.').replace('*', '.*') if re.match(f"^{pattern}$", name) or re.match(f"^{pattern}$", hostname): return True return False # Process current devices for device in devices: device_name = device['name'] device_hostname = device.get('hostname', '') device_ip = device['ip'] device_mac = device['mac'] # Check if device should be excluded if is_device_excluded(device_name, device_hostname): print(f"Device {device_name} excluded by pattern - skipping") excluded_devices.append(device_name) continue # Check in current devices existing_device = next((d for d in current_devices if d['name'] == device_name), None) if existing_device: # Device exists, check if IP or MAC changed if existing_device['ip'] != device_ip or existing_device['mac'] != device_mac: moved_to_deprecated.append(existing_device) new_devices.append(device) print(f"Device {device_name} moved to deprecated (IP/MAC changed)") else: new_devices.append(existing_device) # Keep existing device else: # New device, check if it was in deprecated deprecated_device = next((d for d in deprecated_devices if d['name'] == device_name), None) if deprecated_device: removed_from_deprecated.append(device_name) print(f"Device {device_name} removed from deprecated (restored)") new_devices.append(device) print(f"Device {device_name} added to output file") # Find devices that are no longer present current_names = {d['name'] for d in devices} for existing_device in current_devices: if existing_device['name'] not in current_names: if not is_device_excluded(existing_device['name'], existing_device.get('hostname', '')): moved_to_deprecated.append(existing_device) print(f"Device {existing_device['name']} moved to deprecated (no longer present)") # Update deprecated devices list, excluding any excluded devices final_deprecated = [] for device in deprecated_devices: if device['name'] not in removed_from_deprecated and not is_device_excluded(device['name'], device.get('hostname', '')): final_deprecated.append(device) elif is_device_excluded(device['name'], device.get('hostname', '')): print(f"Device {device['name']} removed from deprecated (excluded by pattern)") final_deprecated.extend(moved_to_deprecated) # Save new configuration config = { "tasmota": { "devices": new_devices, "generated_at": datetime.now().isoformat(), "total_devices": len(new_devices) } } # Save deprecated configuration deprecated_config = { "tasmota": { "devices": final_deprecated, "generated_at": datetime.now().isoformat(), "total_devices": len(final_deprecated) } } # Backup existing file if it exists if os.path.exists(filename): try: backup_name = f"{filename}.backup" os.rename(filename, backup_name) self.logger.info(f"Created backup of existing configuration as {backup_name}") except Exception as e: self.logger.error(f"Error creating backup: {e}") # Save files try: with open(filename, 'w') as f: json.dump(config, f, indent=4) with open(deprecated_filename, 'w') as f: json.dump(deprecated_config, f, indent=4) self.logger.info(f"Successfully saved {len(new_devices)} Tasmota devices to {filename}") self.logger.info(f"Successfully saved {len(final_deprecated)} deprecated devices to {deprecated_filename}") print("\nDevice Status Summary:") if excluded_devices: print("\nExcluded Devices:") for name in excluded_devices: print(f"- {name}") if moved_to_deprecated: print("\nMoved to deprecated:") for device in moved_to_deprecated: print(f"- {device['name']}") if removed_from_deprecated: print("\nRestored from deprecated:") for name in removed_from_deprecated: print(f"- {name}") print("\nCurrent Tasmota Devices:") for device in new_devices: print(f"Name: {device['name']:<20} IP: {device['ip']:<15} MAC: {device['mac']}") except Exception as e: self.logger.error(f"Error saving configuration: {e}") def get_unknown_devices(self, use_current_json=True): """Identify devices that match unknown_device_patterns from current.json.""" self.logger.info("Identifying unknown devices for processing...") unknown_devices = [] try: source_file = 'current.json' if use_current_json else 'tasmota.json' with open(source_file, 'r') as f: data = json.load(f) all_devices = data.get('tasmota', {}).get('devices', []) self.logger.debug(f"Loaded {len(all_devices)} devices from {source_file}") except FileNotFoundError: self.logger.error(f"{source_file} not found. Run discovery first.") return [] except json.JSONDecodeError: self.logger.error(f"Invalid JSON format in {source_file}") return [] # Identify devices matching unknown_device_patterns network_filters = self.config['unifi'].get('network_filter', {}) unknown_patterns = [] for network in network_filters.values(): unknown_patterns.extend(network.get('unknown_device_patterns', [])) for device in all_devices: name = device.get('name', '').lower() hostname = device.get('hostname', '').lower() for pattern in unknown_patterns: pattern = pattern.lower() pattern = pattern.replace('.', r'\.').replace('*', '.*') if re.match(f"^{pattern}", name) or re.match(f"^{pattern}", hostname): self.logger.debug(f"Found unknown device: {name} ({hostname})") unknown_devices.append(device) break self.logger.info(f"Found {len(unknown_devices)} unknown devices to process") return unknown_devices def process_unknown_devices(self): """Process unknown devices by checking for toggle button and configuring them. This method: 1. Identifies devices matching unknown_device_patterns 2. Checks if each device has a toggle button (indicating it's a light/switch) 3. Toggles the button at 1/2Hz while checking for hostname changes 4. Prompts the user to enter a new name for the device in the console 5. Once a name is entered, configures the device with the new hostname """ unknown_devices = self.get_unknown_devices() if not unknown_devices: self.logger.info("No unknown devices found to process") return self.logger.info(f"Starting to process {len(unknown_devices)} unknown devices...") for device in unknown_devices: name = device.get('name', 'Unknown') ip = device.get('ip') if not ip: self.logger.warning(f"Skipping device {name} - no IP address") continue self.logger.info(f"Processing unknown device: {name} at {ip}") # Check if device has a toggle button try: # Get the main page to check for toggle button url = f"http://{ip}/" response = requests.get(url, timeout=5) # Check if there's a toggle button in the response has_toggle = "toggle" in response.text.lower() if has_toggle: self.logger.info(f"Device {name} has a toggle button, assuming it's a light switch or power plug") # Start toggling at 1/2Hz original_hostname = device.get('hostname', '') toggle_state = False # Temporarily disable all logging during toggling logging.disable(logging.CRITICAL) try: # Clear console output and show prompt print("\n" + "="*50) print(f"DEVICE: {name} at IP: {ip}") print(f"Current hostname: {original_hostname}") print("="*50) print("The device is now toggling to help you identify it.") # Start toggling in background while waiting for input import threading stop_toggle = threading.Event() def toggle_device(): toggle_state = False while not stop_toggle.is_set(): toggle_state = not toggle_state toggle_cmd = "Power On" if toggle_state else "Power Off" toggle_url = f"http://{ip}/cm?cmnd={toggle_cmd}" try: requests.get(toggle_url, timeout=2) except: pass time.sleep(2.0) # 1/2Hz rate # Start toggle thread toggle_thread = threading.Thread(target=toggle_device) toggle_thread.daemon = True toggle_thread.start() # Prompt for new hostname print("\nPlease enter a new name for this device:") new_hostname = input("> ").strip() # Stop toggling stop_toggle.set() toggle_thread.join(timeout=3) if new_hostname and new_hostname != original_hostname: print(f"Setting new hostname to: {new_hostname}") else: print("No valid hostname entered, skipping device") new_hostname = "" finally: # Re-enable logging logging.disable(logging.NOTSET) # If a new hostname was entered, configure the device if new_hostname: self.logger.info(f"Configuring device with new hostname: {new_hostname}") self.configure_unknown_device(ip, new_hostname) else: self.logger.warning(f"No new hostname provided for {name}, skipping configuration") else: self.logger.info(f"Device {name} does not have a toggle button, skipping") except requests.exceptions.RequestException as e: self.logger.error(f"Error connecting to {name} at {ip}: {str(e)}") def configure_unknown_device(self, ip, hostname): """Configure an unknown device with the given hostname and MQTT settings.""" try: # Set Friendly Name friendly_name_url = f"http://{ip}/cm?cmnd=FriendlyName1%20{hostname}" response = requests.get(friendly_name_url, timeout=5) if response.status_code == 200: self.logger.info(f"Set Friendly Name to {hostname}") else: self.logger.error(f"Failed to set Friendly Name to {hostname}") # Enable MQTT if not already enabled mqtt_url = f"http://{ip}/cm?cmnd=SetOption3%20ON" # Enable MQTT response = requests.get(mqtt_url, timeout=5) if response.status_code == 200: self.logger.info(f"Enabled MQTT for {hostname}") else: self.logger.error(f"Failed to enable MQTT for {hostname}") # Configure MQTT settings mqtt_config = self.config.get('mqtt', {}) if mqtt_config: # Get the base hostname (everything before the dash) hostname_base = hostname.split('-')[0] if '-' in hostname else hostname mqtt_fields = { "MqttHost": mqtt_config.get('Host', ''), "MqttPort": mqtt_config.get('Port', 1883), "MqttUser": mqtt_config.get('User', ''), "MqttPassword": mqtt_config.get('Password', ''), "Topic": hostname_base if mqtt_config.get('Topic') == '%hostname_base%' else mqtt_config.get('Topic', ''), "FullTopic": mqtt_config.get('FullTopic', '%prefix%/%topic%/'), } for setting, value in mqtt_fields.items(): url = f"http://{ip}/cm?cmnd={setting}%20{value}" response = requests.get(url, timeout=5) if response.status_code == 200: if setting != 'MqttPassword': self.logger.info(f"{hostname}: Set {setting} to {value}") else: self.logger.info(f"{hostname}: Set MQTT Password") else: self.logger.error(f"{hostname}: Failed to set {setting}") # Save configuration (will reboot the device) save_url = f"http://{ip}/cm?cmnd=Restart%201" response = requests.get(save_url, timeout=5) if response.status_code == 200: self.logger.info(f"Saved configuration and rebooted {hostname}") else: self.logger.error(f"Failed to save configuration for {hostname}") return True except requests.exceptions.RequestException as e: self.logger.error(f"Error configuring device at {ip}: {str(e)}") return False def get_device_details(self, use_current_json=True): """Connect to each Tasmota device via HTTP, gather details and validate MQTT settings. Filters out devices matching unknown_device_patterns.""" self.logger.info("Starting to gather detailed device information...") device_details = [] try: source_file = 'current.json' if use_current_json else 'tasmota.json' with open(source_file, 'r') as f: data = json.load(f) all_devices = data.get('tasmota', {}).get('devices', []) self.logger.debug(f"Loaded {len(all_devices)} devices from {source_file}") except FileNotFoundError: self.logger.error(f"{source_file} not found. Run discovery first.") return except json.JSONDecodeError: self.logger.error(f"Invalid JSON format in {source_file}") return # Filter out devices matching unknown_device_patterns devices = [] network_filters = self.config['unifi'].get('network_filter', {}) unknown_patterns = [] for network in network_filters.values(): unknown_patterns.extend(network.get('unknown_device_patterns', [])) for device in all_devices: name = device.get('name', '').lower() hostname = device.get('hostname', '').lower() is_unknown = False for pattern in unknown_patterns: pattern = pattern.lower() pattern = pattern.replace('.', r'\.').replace('*', '.*') if re.match(f"^{pattern}", name) or re.match(f"^{pattern}", hostname): self.logger.debug(f"Skipping unknown device: {name} ({hostname})") is_unknown = True break if not is_unknown: devices.append(device) self.logger.debug(f"Processing {len(devices)} devices after filtering unknown devices") mqtt_config = self.config.get('mqtt', {}) if not mqtt_config: self.logger.error("MQTT configuration missing from config file") return def check_mqtt_settings(ip, name, mqtt_status): """Check and update MQTT settings if they don't match config""" # Get the base hostname (everything before the dash) hostname_base = name.split('-')[0] if '-' in name else name mqtt_fields = { "Host": mqtt_config.get('Host', ''), "Port": mqtt_config.get('Port', 1883), "User": mqtt_config.get('User', ''), "Password": mqtt_config.get('Password', ''), "Topic": hostname_base if mqtt_config.get('Topic') == '%hostname_base%' else mqtt_config.get('Topic', ''), "FullTopic": mqtt_config.get('FullTopic', '%prefix%/%topic%/'), } device_mqtt = mqtt_status.get('MqttHost', {}) changes_needed = [] force_password_update = False # Check each MQTT setting if device_mqtt.get('Host') != mqtt_fields['Host']: changes_needed.append(('MqttHost', mqtt_fields['Host'])) self.logger.debug(f"{name}: MQTT Host mismatch - Device: {device_mqtt.get('Host')}, Config: {mqtt_fields['Host']}") force_password_update = True if device_mqtt.get('Port') != mqtt_fields['Port']: changes_needed.append(('MqttPort', mqtt_fields['Port'])) self.logger.debug(f"{name}: MQTT Port mismatch - Device: {device_mqtt.get('Port')}, Config: {mqtt_fields['Port']}") force_password_update = True if device_mqtt.get('User') != mqtt_fields['User']: changes_needed.append(('MqttUser', mqtt_fields['User'])) self.logger.debug(f"{name}: MQTT User mismatch - Device: {device_mqtt.get('User')}, Config: {mqtt_fields['User']}") force_password_update = True if device_mqtt.get('Topic') != mqtt_fields['Topic']: changes_needed.append(('Topic', mqtt_fields['Topic'])) self.logger.debug(f"{name}: MQTT Topic mismatch - Device: {device_mqtt.get('Topic')}, Config: {mqtt_fields['Topic']}") force_password_update = True if device_mqtt.get('FullTopic') != mqtt_fields['FullTopic']: changes_needed.append(('FullTopic', mqtt_fields['FullTopic'])) self.logger.debug(f"{name}: MQTT FullTopic mismatch - Device: {device_mqtt.get('FullTopic')}, Config: {mqtt_fields['FullTopic']}") force_password_update = True # Add password update if any MQTT setting changed or user was updated if force_password_update: changes_needed.append(('MqttPassword', mqtt_fields['Password'])) self.logger.debug(f"{name}: MQTT Password will be updated") # Check NoRetain setting - FIXED: Use the actual value from config with default of False no_retain = mqtt_config.get('NoRetain', False) if no_retain: changes_needed.append(('SetOption62', '1')) # 1 = No Retain else: changes_needed.append(('SetOption62', '0')) # 0 = Use Retain # Apply changes if needed for setting, value in changes_needed: try: url = f"http://{ip}/cm?cmnd={setting}%20{value}" response = requests.get(url, timeout=5) if response.status_code == 200: if setting != 'MqttPassword': self.logger.debug(f"{name}: Updated {setting} to {value}") else: self.logger.debug(f"{name}: Updated MQTT Password") else: self.logger.error(f"{name}: Failed to update {setting}") except requests.exceptions.RequestException as e: self.logger.error(f"{name}: Error updating {setting}: {str(e)}") return len(changes_needed) > 0 for device in devices: if not isinstance(device, dict): self.logger.warning(f"Skipping invalid device entry: {device}") continue name = device.get('name', 'Unknown') ip = device.get('ip') mac = device.get('mac') if not ip: self.logger.warning(f"Skipping device {name} - no IP address") continue self.logger.info(f"Checking device: {name} at {ip}") try: # Get Status 2 for firmware version url_status = f"http://{ip}/cm?cmnd=Status%202" response = requests.get(url_status, timeout=5) status_data = response.json() # Get Status 5 for network info url_network = f"http://{ip}/cm?cmnd=Status%205" response = requests.get(url_network, timeout=5) network_data = response.json() # Get Status 6 for MQTT info url_mqtt = f"http://{ip}/cm?cmnd=Status%206" response = requests.get(url_mqtt, timeout=5) mqtt_data = response.json() # Check and update MQTT settings if needed mqtt_updated = check_mqtt_settings(ip, name, mqtt_data) # Set console parameters from config console_updated = False console_params = mqtt_config.get('console', {}) if console_params: self.logger.info(f"{name}: Setting console parameters from configuration") # Special handling for Retain parameters - need to send opposite state first, then final state # This is necessary because the changes are what create the update of the Retain state at the MQTT server retain_params = ["ButtonRetain", "SwitchRetain", "PowerRetain"] # Process Retain parameters first for param in retain_params: if param in console_params: try: final_value = console_params[param] # Set opposite state first opposite_value = "On" if final_value.lower() == "off" else "Off" # First command (opposite state) url = f"http://{ip}/cm?cmnd={param}%20{opposite_value}" response = requests.get(url, timeout=5) if response.status_code == 200: self.logger.debug(f"{name}: Set {param} to {opposite_value} (step 1 of 2 to update MQTT broker retain settings)") console_updated = True else: self.logger.error(f"{name}: Failed to set {param} to {opposite_value}") # Small delay to ensure commands are processed in order time.sleep(0.5) # Second command (final state) url = f"http://{ip}/cm?cmnd={param}%20{final_value}" response = requests.get(url, timeout=5) if response.status_code == 200: self.logger.debug(f"{name}: Set {param} to {final_value} (step 2 of 2 to update MQTT broker retain settings)") console_updated = True else: self.logger.error(f"{name}: Failed to set {param} to {final_value}") except requests.exceptions.RequestException as e: self.logger.error(f"{name}: Error setting {param} commands: {str(e)}") # Process all other console parameters # Track rules that need to be enabled rules_to_enable = {} for param, value in console_params.items(): # Skip Retain parameters as they're handled specially above if param in retain_params: continue # Check if this is a rule definition (lowercase rule1, rule2, etc.) if param.lower().startswith('rule') and param.lower() == param and param[-1].isdigit(): # Store the rule number for later enabling rule_num = param[-1] rules_to_enable[rule_num] = True self.logger.debug(f"{name}: Detected rule definition {param}, will auto-enable") # Skip Rule1, Rule2, etc. if we're auto-enabling rules if param.lower().startswith('rule') and param.lower() != param and param[-1].isdigit(): # If this is in the config, we'll respect it, but log that it's not needed self.logger.debug(f"{name}: Note: {param} is not needed with auto-enable feature") try: url = f"http://{ip}/cm?cmnd={param}%20{value}" response = requests.get(url, timeout=5) if response.status_code == 200: self.logger.debug(f"{name}: Set console parameter {param} to {value}") console_updated = True else: self.logger.error(f"{name}: Failed to set console parameter {param}") except requests.exceptions.RequestException as e: self.logger.error(f"{name}: Error setting console parameter {param}: {str(e)}") # Auto-enable any rules that were defined for rule_num in rules_to_enable: rule_enable_param = f"Rule{rule_num}" # Skip if the rule enable command was already in the config if any(p.lower() == rule_enable_param.lower() for p in console_params): continue try: url = f"http://{ip}/cm?cmnd={rule_enable_param}%201" response = requests.get(url, timeout=5) if response.status_code == 200: self.logger.info(f"{name}: Auto-enabled {rule_enable_param}") console_updated = True else: self.logger.error(f"{name}: Failed to auto-enable {rule_enable_param}") except requests.exceptions.RequestException as e: self.logger.error(f"{name}: Error auto-enabling {rule_enable_param}: {str(e)}") device_detail = { "name": name, "ip": ip, "mac": mac, "version": status_data.get("StatusFWR", {}).get("Version", "Unknown"), "hostname": network_data.get("StatusNET", {}).get("Hostname", "Unknown"), "mqtt_status": "Updated" if mqtt_updated else "Verified", "console_status": "Updated" if console_updated else "Verified", "last_checked": time.strftime("%Y-%m-%d %H:%M:%S"), "status": "online" } self.logger.info(f"Successfully got version for {name}: {device_detail['version']}") except requests.exceptions.RequestException as e: self.logger.error(f"Error connecting to {name} at {ip}: {str(e)}") device_detail = { "name": name, "ip": ip, "mac": mac, "version": "Unknown", "status": "offline", "error": str(e) } device_details.append(device_detail) time.sleep(0.5) # Save all device details at once try: with open('TasmotaDevices.json', 'w') as f: json.dump(device_details, f, indent=2) self.logger.info(f"Device details saved to TasmotaDevices.json ({len(device_details)} devices)") except Exception as e: self.logger.error(f"Error saving device details: {e}") def main(): parser = argparse.ArgumentParser(description='Tasmota Device Manager') parser.add_argument('--config', default='network_configuration.json', help='Path to configuration file') parser.add_argument('--debug', action='store_true', help='Enable debug logging') parser.add_argument('--skip-unifi', action='store_true', help='Skip UniFi discovery and use existing current.json') parser.add_argument('--process-unknown', action='store_true', help='Process unknown devices (matching unknown_device_patterns) to set up names and MQTT') args = parser.parse_args() # Set up logging log_level = logging.DEBUG if args.debug else logging.INFO logging.basicConfig(level=log_level, format='%(asctime)s - %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S') print("Starting Tasmota Device Discovery and Version Check...") # Create TasmotaDiscovery instance discovery = TasmotaDiscovery(debug=args.debug) discovery.load_config(args.config) try: if not args.skip_unifi: print("Step 1: Discovering Tasmota devices...") discovery.setup_unifi_client() tasmota_devices = discovery.get_tasmota_devices() discovery.save_tasmota_config(tasmota_devices) else: print("Skipping UniFi discovery, using existing current.json...") print("\nStep 2: Getting detailed version information...") discovery.get_device_details(use_current_json=True) if args.process_unknown: print("\nStep 3: Processing unknown devices...") discovery.process_unknown_devices() print("\nProcess completed successfully!") print("- Device list saved to: current.json") print("- Detailed information saved to: TasmotaDevices.json") except ConnectionError as e: print(f"Connection Error: {str(e)}") print("\nTrying to proceed with existing current.json...") try: discovery.get_device_details(use_current_json=True) print("\nSuccessfully retrieved device details from existing current.json") except Exception as inner_e: print(f"Error processing existing devices: {str(inner_e)}") return 1 except Exception as e: print(f"Error: {str(e)}") if args.debug: import traceback traceback.print_exc() return 1 return 0 if __name__ == '__main__': main()