import json import logging import os import sys from datetime import datetime from typing import Optional import requests from urllib3.exceptions import InsecureRequestWarning import re # Import the regular expression module import time import argparse # Disable SSL warnings requests.packages.urllib3.disable_warnings(category=InsecureRequestWarning) class UnifiClient: def __init__(self, base_url, username, password, site_id, verify_ssl=True): self.base_url = base_url.rstrip('/') self.username = username self.password = password self.site_id = site_id self.session = requests.Session() self.session.verify = verify_ssl # Initialize cookie jar self.session.cookies.clear() def _login(self) -> requests.Response: # Changed return type annotation """Authenticate with the UniFi Controller.""" login_url = f"{self.base_url}/api/auth/login" headers = { 'Content-Type': 'application/json', 'Accept': 'application/json' } payload = { "username": self.username, "password": self.password, "remember": False } try: response = self.session.post( login_url, json=payload, headers=headers ) response.raise_for_status() if 'X-CSRF-Token' in response.headers: self.session.headers['X-CSRF-Token'] = response.headers['X-CSRF-Token'] return response # Return the response object except requests.exceptions.RequestException as e: if hasattr(e, 'response') and e.response.status_code == 401: raise Exception("Authentication failed. Please verify your username and password.") from e raise def get_clients(self) -> list: """Get all clients from the UniFi Controller.""" # Try the newer API endpoint first url = f"{self.base_url}/proxy/network/api/s/{self.site_id}/stat/sta" try: response = self.session.get(url) response.raise_for_status() return response.json().get('data', []) except requests.exceptions.RequestException as e: # If the newer endpoint fails, try the legacy endpoint url = f"{self.base_url}/api/s/{self.site_id}/stat/sta" try: response = self.session.get(url) response.raise_for_status() return response.json().get('data', []) except requests.exceptions.RequestException as e: # If both fail, try the v2 API endpoint url = f"{self.base_url}/v2/api/site/{self.site_id}/clients" response = self.session.get(url) response.raise_for_status() return response.json().get('data', []) class TasmotaDiscovery: def __init__(self, debug: bool = False): """Initialize the TasmotaDiscovery with optional debug mode.""" log_level = logging.DEBUG if debug else logging.INFO logging.basicConfig( level=log_level, format='%(asctime)s - %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S' ) self.logger = logging.getLogger(__name__) self.config = None self.unifi_client = None def load_config(self, config_path: Optional[str] = None) -> dict: """Load configuration from JSON file.""" if config_path is None: config_path = os.path.join(os.path.dirname(__file__), 'config.json') self.logger.debug(f"Loading configuration from: {config_path}") try: with open(config_path, 'r') as config_file: self.config = json.load(config_file) self.logger.debug("Configuration loaded successfully from %s", config_path) return self.config except FileNotFoundError: self.logger.error(f"Configuration file not found at {config_path}") sys.exit(1) except json.JSONDecodeError: self.logger.error("Invalid JSON in configuration file") sys.exit(1) def setup_unifi_client(self): """Set up the UniFi client with better error handling""" self.logger.debug("Setting up UniFi client") if not self.config or 'unifi' not in self.config: raise ValueError("Missing UniFi configuration") unifi_config = self.config['unifi'] required_fields = ['host', 'username', 'password', 'site'] missing_fields = [field for field in required_fields if field not in unifi_config] if missing_fields: raise ValueError(f"Missing required UniFi configuration fields: {', '.join(missing_fields)}") try: self.logger.debug(f"Connecting to UniFi Controller at {unifi_config['host']}") self.unifi_client = UnifiClient( base_url=unifi_config['host'], username=unifi_config['username'], password=unifi_config['password'], site_id=unifi_config['site'], verify_ssl=False # Add this if using self-signed certificates ) # Test the connection by making a simple request response = self.unifi_client._login() if not response: raise ConnectionError(f"Failed to connect to UniFi controller: No response") self.logger.debug("UniFi client setup successful") except Exception as e: self.logger.error(f"Error setting up UniFi client: {str(e)}") raise ConnectionError(f"Failed to connect to UniFi controller: {str(e)}") def is_tasmota_device(self, device: dict) -> bool: """Determine if a device is in the network_filter and not in exclude_patterns.""" name = device.get('name', '').lower() hostname = device.get('hostname', '').lower() ip = device.get('ip', '') # Check if device is in the configured network network_filters = self.config['unifi'].get('network_filter', {}) for network in network_filters.values(): if ip.startswith(network['subnet']): self.logger.debug(f"Checking device in network: {name} ({hostname}) IP: {ip}") # Check exclusion patterns exclude_patterns = network.get('exclude_patterns', []) for pattern in exclude_patterns: pattern = pattern.lower() # Convert glob pattern to regex pattern pattern = pattern.replace('.', r'\.').replace('*', '.*') if re.match(f"^{pattern}$", name) or re.match(f"^{pattern}$", hostname): self.logger.debug(f"Excluding device due to pattern '{pattern}': {name} ({hostname})") return False # Device is in the network and not excluded self.logger.debug(f"Found device in network: {name}") return True return False def get_tasmota_devices(self) -> list: """Query UniFi controller and filter Tasmota devices.""" devices = [] self.logger.debug("Querying UniFi controller for devices") try: all_clients = self.unifi_client.get_clients() self.logger.debug(f"Found {len(all_clients)} total devices") for device in all_clients: if self.is_tasmota_device(device): device_info = { "name": device.get('name', device.get('hostname', 'Unknown')), "ip": device.get('ip', ''), "mac": device.get('mac', ''), "last_seen": device.get('last_seen', ''), "hostname": device.get('hostname', ''), "notes": device.get('note', ''), } devices.append(device_info) self.logger.debug(f"Found {len(devices)} Tasmota devices") return devices except Exception as e: self.logger.error(f"Error getting devices from UniFi controller: {e}") return [] def save_tasmota_config(self, devices: list) -> None: """Save Tasmota device information to a JSON file with device tracking.""" filename = "current.json" self.logger.debug(f"Saving Tasmota configuration to {filename}") deprecated_filename = "deprecated.json" current_devices = [] deprecated_devices = [] # Load existing devices if file exists if os.path.exists(filename): try: with open(filename, 'r') as f: existing_config = json.load(f) current_devices = existing_config.get('tasmota', {}).get('devices', []) except json.JSONDecodeError: self.logger.error(f"Error reading {filename}, treating as empty") current_devices = [] # Load deprecated devices if file exists if os.path.exists(deprecated_filename): try: with open(deprecated_filename, 'r') as f: deprecated_config = json.load(f) deprecated_devices = deprecated_config.get('tasmota', {}).get('devices', []) except json.JSONDecodeError: self.logger.error(f"Error reading {deprecated_filename}, treating as empty") deprecated_devices = [] # Create new config new_devices = [] moved_to_deprecated = [] restored_from_deprecated = [] removed_from_deprecated = [] excluded_devices = [] # Check for excluded devices in current and deprecated lists network_filters = self.config['unifi'].get('network_filter', {}) exclude_patterns = [] for network in network_filters.values(): exclude_patterns.extend(network.get('exclude_patterns', [])) # Function to check if device is excluded def is_device_excluded(device_name: str, hostname: str = '') -> bool: name = device_name.lower() hostname = hostname.lower() for pattern in exclude_patterns: pattern = pattern.lower() pattern = pattern.replace('.', r'\.').replace('*', '.*') if re.match(f"^{pattern}$", name) or re.match(f"^{pattern}$", hostname): return True return False # Process current devices for device in devices: device_name = device['name'] device_hostname = device.get('hostname', '') device_ip = device['ip'] device_mac = device['mac'] # Check if device should be excluded if is_device_excluded(device_name, device_hostname): print(f"Device {device_name} excluded by pattern - skipping") excluded_devices.append(device_name) continue # Check in current devices existing_device = next((d for d in current_devices if d['name'] == device_name), None) if existing_device: # Device exists, check if IP or MAC changed if existing_device['ip'] != device_ip or existing_device['mac'] != device_mac: moved_to_deprecated.append(existing_device) new_devices.append(device) print(f"Device {device_name} moved to deprecated (IP/MAC changed)") else: new_devices.append(existing_device) # Keep existing device else: # New device, check if it was in deprecated deprecated_device = next((d for d in deprecated_devices if d['name'] == device_name), None) if deprecated_device: removed_from_deprecated.append(device_name) print(f"Device {device_name} removed from deprecated (restored)") new_devices.append(device) print(f"Device {device_name} added to output file") # Find devices that are no longer present current_names = {d['name'] for d in devices} for existing_device in current_devices: if existing_device['name'] not in current_names: if not is_device_excluded(existing_device['name'], existing_device.get('hostname', '')): moved_to_deprecated.append(existing_device) print(f"Device {existing_device['name']} moved to deprecated (no longer present)") # Update deprecated devices list, excluding any excluded devices final_deprecated = [] for device in deprecated_devices: if device['name'] not in removed_from_deprecated and not is_device_excluded(device['name'], device.get('hostname', '')): final_deprecated.append(device) elif is_device_excluded(device['name'], device.get('hostname', '')): print(f"Device {device['name']} removed from deprecated (excluded by pattern)") final_deprecated.extend(moved_to_deprecated) # Save new configuration config = { "tasmota": { "devices": new_devices, "generated_at": datetime.now().isoformat(), "total_devices": len(new_devices) } } # Save deprecated configuration deprecated_config = { "tasmota": { "devices": final_deprecated, "generated_at": datetime.now().isoformat(), "total_devices": len(final_deprecated) } } # Backup existing file if it exists if os.path.exists(filename): try: backup_name = f"{filename}.backup" os.rename(filename, backup_name) self.logger.info(f"Created backup of existing configuration as {backup_name}") except Exception as e: self.logger.error(f"Error creating backup: {e}") # Save files try: with open(filename, 'w') as f: json.dump(config, f, indent=4) with open(deprecated_filename, 'w') as f: json.dump(deprecated_config, f, indent=4) self.logger.info(f"Successfully saved {len(new_devices)} Tasmota devices to {filename}") self.logger.info(f"Successfully saved {len(final_deprecated)} deprecated devices to {deprecated_filename}") print("\nDevice Status Summary:") if excluded_devices: print("\nExcluded Devices:") for name in excluded_devices: print(f"- {name}") if moved_to_deprecated: print("\nMoved to deprecated:") for device in moved_to_deprecated: print(f"- {device['name']}") if removed_from_deprecated: print("\nRestored from deprecated:") for name in removed_from_deprecated: print(f"- {name}") print("\nCurrent Tasmota Devices:") for device in new_devices: print(f"Name: {device['name']:<20} IP: {device['ip']:<15} MAC: {device['mac']}") except Exception as e: self.logger.error(f"Error saving configuration: {e}") def get_unknown_devices(self, use_current_json=True): """Identify devices that match unknown_device_patterns from current.json.""" self.logger.info("Identifying unknown devices for processing...") unknown_devices = [] try: source_file = 'current.json' if use_current_json else 'tasmota.json' with open(source_file, 'r') as f: data = json.load(f) all_devices = data.get('tasmota', {}).get('devices', []) self.logger.debug(f"Loaded {len(all_devices)} devices from {source_file}") except FileNotFoundError: self.logger.error(f"{source_file} not found. Run discovery first.") return [] except json.JSONDecodeError: self.logger.error(f"Invalid JSON format in {source_file}") return [] # Identify devices matching unknown_device_patterns network_filters = self.config['unifi'].get('network_filter', {}) unknown_patterns = [] for network in network_filters.values(): unknown_patterns.extend(network.get('unknown_device_patterns', [])) for device in all_devices: name = device.get('name', '').lower() hostname = device.get('hostname', '').lower() for pattern in unknown_patterns: pattern = pattern.lower() pattern = pattern.replace('.', r'\.').replace('*', '.*') if re.match(f"^{pattern}", name) or re.match(f"^{pattern}", hostname): self.logger.debug(f"Found unknown device: {name} ({hostname})") unknown_devices.append(device) break self.logger.info(f"Found {len(unknown_devices)} unknown devices to process") return unknown_devices def process_unknown_devices(self): """Process unknown devices by checking for toggle button and configuring them. This method: 1. Identifies devices matching unknown_device_patterns 2. Checks if each device has a toggle button (indicating it's a light/switch) 3. Toggles the button at 1/2Hz while checking for hostname changes 4. Prompts the user to enter a new name for the device in the console 5. Once a name is entered, configures the device with the new hostname """ unknown_devices = self.get_unknown_devices() if not unknown_devices: self.logger.info("No unknown devices found to process") return self.logger.info(f"Starting to process {len(unknown_devices)} unknown devices...") for device in unknown_devices: name = device.get('name', 'Unknown') ip = device.get('ip') if not ip: self.logger.warning(f"Skipping device {name} - no IP address") continue self.logger.info(f"Processing unknown device: {name} at {ip}") # Check if device has a toggle button try: # Get the main page to check for toggle button url = f"http://{ip}/" response = requests.get(url, timeout=5) # Check if there's a toggle button in the response has_toggle = "toggle" in response.text.lower() if has_toggle: self.logger.info(f"Device {name} has a toggle button, assuming it's a light switch or power plug") # Start toggling at 1/2Hz original_hostname = device.get('hostname', '') toggle_state = False # Temporarily disable all logging during toggling logging.disable(logging.CRITICAL) try: # Clear console output and show prompt print("\n" + "="*50) print(f"DEVICE: {name} at IP: {ip}") print(f"Current hostname: {original_hostname}") print("="*50) print("The device is now toggling to help you identify it.") # Start toggling in background while waiting for input import threading stop_toggle = threading.Event() def toggle_device(): toggle_state = False while not stop_toggle.is_set(): toggle_state = not toggle_state toggle_cmd = "Power On" if toggle_state else "Power Off" toggle_url = f"http://{ip}/cm?cmnd={toggle_cmd}" try: requests.get(toggle_url, timeout=2) except: pass time.sleep(2.0) # 1/2Hz rate # Start toggle thread toggle_thread = threading.Thread(target=toggle_device) toggle_thread.daemon = True toggle_thread.start() # Prompt for new hostname print("\nPlease enter a new name for this device:") new_hostname = input("> ").strip() # Stop toggling stop_toggle.set() toggle_thread.join(timeout=3) if new_hostname and new_hostname != original_hostname: print(f"Setting new hostname to: {new_hostname}") else: print("No valid hostname entered, skipping device") new_hostname = "" finally: # Re-enable logging logging.disable(logging.NOTSET) # If a new hostname was entered, configure the device if new_hostname: self.logger.info(f"Configuring device with new hostname: {new_hostname}") self.configure_unknown_device(ip, new_hostname) else: self.logger.warning(f"No new hostname provided for {name}, skipping configuration") else: self.logger.info(f"Device {name} does not have a toggle button, skipping") except requests.exceptions.RequestException as e: self.logger.error(f"Error connecting to {name} at {ip}: {str(e)}") def configure_unknown_device(self, ip, hostname): """Configure an unknown device with the given hostname and MQTT settings.""" try: # Set Friendly Name friendly_name_url = f"http://{ip}/cm?cmnd=FriendlyName1%20{hostname}" response = requests.get(friendly_name_url, timeout=5) if response.status_code == 200: self.logger.info(f"Set Friendly Name to {hostname}") else: self.logger.error(f"Failed to set Friendly Name to {hostname}") # Enable MQTT if not already enabled mqtt_url = f"http://{ip}/cm?cmnd=SetOption3%20ON" # Enable MQTT response = requests.get(mqtt_url, timeout=5) if response.status_code == 200: self.logger.info(f"Enabled MQTT for {hostname}") else: self.logger.error(f"Failed to enable MQTT for {hostname}") # Configure MQTT settings mqtt_config = self.config.get('mqtt', {}) if mqtt_config: # Get the base hostname (everything before the dash) hostname_base = hostname.split('-')[0] if '-' in hostname else hostname mqtt_fields = { "MqttHost": mqtt_config.get('Host', ''), "MqttPort": mqtt_config.get('Port', 1883), "MqttUser": mqtt_config.get('User', ''), "MqttPassword": mqtt_config.get('Password', ''), "Topic": hostname_base if mqtt_config.get('Topic') == '%hostname_base%' else mqtt_config.get('Topic', ''), "FullTopic": mqtt_config.get('FullTopic', '%prefix%/%topic%/'), } for setting, value in mqtt_fields.items(): # For FullTopic, we need to avoid adding a space (%20) between the command and value if setting == "FullTopic": url = f"http://{ip}/cm?cmnd={setting}={value}" else: url = f"http://{ip}/cm?cmnd={setting}%20{value}" response = requests.get(url, timeout=5) if response.status_code == 200: if setting != 'MqttPassword': self.logger.info(f"{hostname}: Set {setting} to {value}") else: self.logger.info(f"{hostname}: Set MQTT Password") else: self.logger.error(f"{hostname}: Failed to set {setting}") # Apply console settings before rebooting console_params = mqtt_config.get('console', {}) if console_params: self.logger.info(f"{hostname}: Setting console parameters from configuration") # Special handling for Retain parameters - need to send opposite state first, then final state # This is necessary because the changes are what create the update of the Retain state at the MQTT server retain_params = ["ButtonRetain", "SwitchRetain", "PowerRetain"] # Process Retain parameters first for param in retain_params: if param in console_params: try: final_value = console_params[param] # Set opposite state first opposite_value = "On" if final_value.lower() == "off" else "Off" # First command (opposite state) url = f"http://{ip}/cm?cmnd={param}%20{opposite_value}" response = requests.get(url, timeout=5) if response.status_code == 200: self.logger.debug(f"{hostname}: Set {param} to {opposite_value} (step 1 of 2 to update MQTT broker retain settings)") else: self.logger.error(f"{hostname}: Failed to set {param} to {opposite_value}") # Small delay to ensure commands are processed in order time.sleep(0.5) # Second command (final state) url = f"http://{ip}/cm?cmnd={param}%20{final_value}" response = requests.get(url, timeout=5) if response.status_code == 200: self.logger.debug(f"{hostname}: Set {param} to {final_value} (step 2 of 2 to update MQTT broker retain settings)") else: self.logger.error(f"{hostname}: Failed to set {param} to {final_value}") except Exception as e: self.logger.error(f"{hostname}: Unexpected error setting {param} commands: {str(e)}") # Process all other console parameters # Track rules that need to be enabled rules_to_enable = {} for param, value in console_params.items(): # Skip Retain parameters as they're handled specially above if param in retain_params: continue # Check if this is a rule definition (lowercase rule1, rule2, etc.) if param.lower().startswith('rule') and param.lower() == param and param[-1].isdigit(): # Store the rule number for later enabling rule_num = param[-1] rules_to_enable[rule_num] = True self.logger.debug(f"{hostname}: Detected rule definition {param}, will auto-enable") # Regular console parameter url = f"http://{ip}/cm?cmnd={param}%20{value}" response = requests.get(url, timeout=5) if response.status_code == 200: self.logger.debug(f"{hostname}: Set console parameter {param} to {value}") else: self.logger.error(f"{hostname}: Failed to set console parameter {param}") # Auto-enable any rules that were defined for rule_num in rules_to_enable: rule_enable_param = f"Rule{rule_num}" # Skip if the rule enable command was already in the config if any(p.lower() == rule_enable_param.lower() for p in console_params): continue # Rule auto-enabling url = f"http://{ip}/cm?cmnd={rule_enable_param}%201" response = requests.get(url, timeout=5) if response.status_code == 200: self.logger.info(f"{hostname}: Auto-enabled {rule_enable_param}") else: self.logger.error(f"{hostname}: Failed to auto-enable {rule_enable_param}") # Save configuration (will reboot the device) save_url = f"http://{ip}/cm?cmnd=Restart%201" response = requests.get(save_url, timeout=5) if response.status_code == 200: self.logger.info(f"Saved configuration and rebooted {hostname}") else: self.logger.error(f"Failed to save configuration for {hostname}") return True except requests.exceptions.RequestException as e: self.logger.error(f"Error configuring device at {ip}: {str(e)}") return False def is_ip_in_network_filter(self, ip_address): """Check if an IP address is in any of the configured network filters. Args: ip_address: The IP address to check Returns: tuple: (is_in_network, target_network, network_name) where: - is_in_network is a boolean indicating if the IP is in a network - target_network is the network configuration dict or None - network_name is the name of the network or None """ network_filters = self.config['unifi'].get('network_filter', {}) for network_name, network in network_filters.items(): if ip_address.startswith(network['subnet']): self.logger.info(f"IP {ip_address} is in network: {network_name}") return True, network, network_name self.logger.error(f"IP {ip_address} is not in any configured network") return False, None, None def process_single_device(self, device_identifier): """Process a single device by hostname or IP address. Args: device_identifier: Either a hostname or IP address Returns: bool: True if device was processed successfully, False otherwise """ self.logger.info(f"Processing single device: {device_identifier}") # Check if device_identifier is an IP address or hostname is_ip = bool(re.match(r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$", device_identifier)) # If it's an IP address, check if it's in the network_filter first if is_ip: in_network, target_network, network_name = self.is_ip_in_network_filter(device_identifier) if not in_network: return False # Setup Unifi client if not already done if not self.unifi_client: try: self.setup_unifi_client() except ConnectionError as e: self.logger.error(f"Failed to connect to UniFi controller: {str(e)}") return False # Get all clients from Unifi try: all_clients = self.unifi_client.get_clients() self.logger.debug(f"Found {len(all_clients)} total devices") except Exception as e: self.logger.error(f"Error getting devices from UniFi controller: {e}") return False # Find the device in Unifi target_device = None if is_ip: # Search by IP self.logger.debug(f"Searching for device with IP: {device_identifier}") target_device = next((device for device in all_clients if device.get('ip') == device_identifier), None) if not target_device: self.logger.error(f"No device found with IP: {device_identifier}") return False else: # Search by hostname - support partial and wildcard matches self.logger.debug(f"Searching for device with hostname: {device_identifier}") # Check if the identifier contains wildcards has_wildcards = '*' in device_identifier # Convert wildcards to regex pattern if present if has_wildcards: pattern = device_identifier.lower().replace('.', r'\.').replace('*', '.*') self.logger.debug(f"Using wildcard pattern: {pattern}") else: # For partial matches, we'll use the identifier as a substring pattern = device_identifier.lower() self.logger.debug(f"Using partial match pattern: {pattern}") # Find all matching devices matching_devices = [] for device in all_clients: hostname = device.get('hostname', '').lower() name = device.get('name', '').lower() if has_wildcards: # For wildcard matches, use regex if (re.search(f"^{pattern}$", hostname) or re.search(f"^{pattern}$", name)): matching_devices.append(device) else: # For partial matches, check if pattern is a substring if pattern in hostname or pattern in name: matching_devices.append(device) # Handle the results if not matching_devices: self.logger.error(f"No devices found matching: {device_identifier}") return False elif len(matching_devices) > 1: # Multiple matches found - log them and use the first one self.logger.warning(f"Multiple devices found matching '{device_identifier}':") for i, device in enumerate(matching_devices, 1): device_name = device.get('name', device.get('hostname', 'Unknown')) device_ip = device.get('ip', '') self.logger.warning(f" {i}. {device_name} (IP: {device_ip})") self.logger.warning(f"Using the first match: {matching_devices[0].get('name', matching_devices[0].get('hostname', 'Unknown'))}") # Use the first (or only) matching device target_device = matching_devices[0] # Get device details device_name = target_device.get('name', target_device.get('hostname', 'Unknown')) device_hostname = target_device.get('hostname', '') device_ip = target_device.get('ip', '') device_mac = target_device.get('mac', '') self.logger.info(f"Found device: {device_name} (IP: {device_ip}, Hostname: {device_hostname})") # If we're processing a hostname (not an IP), check if the device's IP is in the network_filter if not is_ip: in_network, target_network, network_name = self.is_ip_in_network_filter(device_ip) if not in_network: self.logger.error(f"Device {device_name} is not in any configured network") return False # For IP addresses, we already have the target_network from the earlier check # Check if device is excluded exclude_patterns = target_network.get('exclude_patterns', []) for pattern in exclude_patterns: pattern_lower = pattern.lower() pattern_regex = pattern_lower.replace('.', r'\.').replace('*', '.*') if (re.match(f"^{pattern_regex}$", device_name.lower()) or re.match(f"^{pattern_regex}$", device_hostname.lower())): self.logger.error(f"Device {device_name} is excluded by pattern: {pattern}") return False # Check if device is in unknown_device_patterns unknown_patterns = target_network.get('unknown_device_patterns', []) is_unknown = False for pattern in unknown_patterns: pattern_lower = pattern.lower() pattern_regex = pattern_lower.replace('.', r'\.').replace('*', '.*') if (re.match(f"^{pattern_regex}", device_name.lower()) or re.match(f"^{pattern_regex}", device_hostname.lower())): is_unknown = True self.logger.info(f"Device {device_name} matches unknown device pattern: {pattern}") break # Create a device info dictionary device_info = { "name": device_name, "ip": device_ip, "mac": device_mac, "last_seen": target_device.get('last_seen', ''), "hostname": device_hostname, "notes": target_device.get('note', ''), } # Process the device based on whether it's unknown or not if is_unknown: self.logger.info(f"Processing unknown device: {device_name}") # Check if device has a toggle button try: # Get the main page to check for toggle button url = f"http://{device_ip}/" response = requests.get(url, timeout=5) # Check if there's a toggle button in the response has_toggle = "toggle" in response.text.lower() if has_toggle: self.logger.info(f"Device {device_name} has a toggle button, toggling at 1/2Hz rate") # Start toggling at 1/2Hz toggle_state = False # Temporarily disable all logging during toggling logging.disable(logging.CRITICAL) try: # Clear console output and show prompt print("\n" + "="*50) print(f"DEVICE: {device_name} at IP: {device_ip}") print(f"Current hostname: {device_hostname}") print("="*50) print("The device is now toggling to help you identify it.") # Start toggling in background while waiting for input import threading stop_toggle = threading.Event() def toggle_device(): toggle_state = False while not stop_toggle.is_set(): toggle_state = not toggle_state toggle_cmd = "Power On" if toggle_state else "Power Off" toggle_url = f"http://{device_ip}/cm?cmnd={toggle_cmd}" try: requests.get(toggle_url, timeout=2) except: pass time.sleep(2.0) # 1/2Hz rate # Start toggle thread toggle_thread = threading.Thread(target=toggle_device) toggle_thread.daemon = True toggle_thread.start() # Prompt for new hostname print("\nPlease enter a new name for this device:") new_hostname = input("> ").strip() # Stop toggling stop_toggle.set() toggle_thread.join(timeout=3) if new_hostname and new_hostname != device_hostname: print(f"Setting new hostname to: {new_hostname}") # Re-enable logging logging.disable(logging.NOTSET) return self.configure_unknown_device(device_ip, new_hostname) else: print("No valid hostname entered, skipping device") # Re-enable logging logging.disable(logging.NOTSET) return False finally: # Re-enable logging logging.disable(logging.NOTSET) else: self.logger.info(f"Device {device_name} does not have a toggle button") return self.configure_unknown_device(device_ip, device_hostname) except requests.exceptions.RequestException as e: self.logger.error(f"Error connecting to {device_name} at {device_ip}: {str(e)}") return False else: self.logger.info(f"Processing normal device: {device_name}") # Create a temporary list with just this device temp_devices = [device_info] # Save to current.json temporarily current_config = {"tasmota": {"devices": temp_devices}} with open('current.json', 'w') as f: json.dump(current_config, f, indent=2) # Process the device self.get_device_details(use_current_json=True) return True def get_device_details(self, use_current_json=True): """Connect to each Tasmota device via HTTP, gather details and validate MQTT settings. Filters out devices matching unknown_device_patterns. Implements retry logic for console commands with up to 3 attempts and tracks failures.""" self.logger.info("Starting to gather detailed device information...") device_details = [] try: source_file = 'current.json' if use_current_json else 'tasmota.json' with open(source_file, 'r') as f: data = json.load(f) all_devices = data.get('tasmota', {}).get('devices', []) self.logger.debug(f"Loaded {len(all_devices)} devices from {source_file}") except FileNotFoundError: self.logger.error(f"{source_file} not found. Run discovery first.") return except json.JSONDecodeError: self.logger.error(f"Invalid JSON format in {source_file}") return # Filter out devices matching unknown_device_patterns devices = [] network_filters = self.config['unifi'].get('network_filter', {}) unknown_patterns = [] for network in network_filters.values(): unknown_patterns.extend(network.get('unknown_device_patterns', [])) for device in all_devices: name = device.get('name', '').lower() hostname = device.get('hostname', '').lower() is_unknown = False for pattern in unknown_patterns: pattern = pattern.lower() pattern = pattern.replace('.', r'\.').replace('*', '.*') if re.match(f"^{pattern}", name) or re.match(f"^{pattern}", hostname): self.logger.debug(f"Skipping unknown device: {name} ({hostname})") is_unknown = True break if not is_unknown: devices.append(device) self.logger.debug(f"Processing {len(devices)} devices after filtering unknown devices") mqtt_config = self.config.get('mqtt', {}) if not mqtt_config: self.logger.error("MQTT configuration missing from config file") return def check_mqtt_settings(ip, name, mqtt_status): """Check and update MQTT settings if they don't match config""" # Get the base hostname (everything before the dash) hostname_base = name.split('-')[0] if '-' in name else name mqtt_fields = { "Host": mqtt_config.get('Host', ''), "Port": mqtt_config.get('Port', 1883), "User": mqtt_config.get('User', ''), "Password": mqtt_config.get('Password', ''), "Topic": hostname_base if mqtt_config.get('Topic') == '%hostname_base%' else mqtt_config.get('Topic', ''), "FullTopic": mqtt_config.get('FullTopic', '%prefix%/%topic%/'), } device_mqtt = mqtt_status.get('MqttHost', {}) changes_needed = [] force_password_update = False # Check each MQTT setting if device_mqtt.get('Host') != mqtt_fields['Host']: changes_needed.append(('MqttHost', mqtt_fields['Host'])) self.logger.debug(f"{name}: MQTT Host mismatch - Device: {device_mqtt.get('Host')}, Config: {mqtt_fields['Host']}") force_password_update = True if device_mqtt.get('Port') != mqtt_fields['Port']: changes_needed.append(('MqttPort', mqtt_fields['Port'])) self.logger.debug(f"{name}: MQTT Port mismatch - Device: {device_mqtt.get('Port')}, Config: {mqtt_fields['Port']}") force_password_update = True if device_mqtt.get('User') != mqtt_fields['User']: changes_needed.append(('MqttUser', mqtt_fields['User'])) self.logger.debug(f"{name}: MQTT User mismatch - Device: {device_mqtt.get('User')}, Config: {mqtt_fields['User']}") force_password_update = True if device_mqtt.get('Topic') != mqtt_fields['Topic']: changes_needed.append(('Topic', mqtt_fields['Topic'])) self.logger.debug(f"{name}: MQTT Topic mismatch - Device: {device_mqtt.get('Topic')}, Config: {mqtt_fields['Topic']}") force_password_update = True if device_mqtt.get('FullTopic') != mqtt_fields['FullTopic']: changes_needed.append(('FullTopic', mqtt_fields['FullTopic'])) self.logger.debug(f"{name}: MQTT FullTopic mismatch - Device: {device_mqtt.get('FullTopic')}, Config: {mqtt_fields['FullTopic']}") force_password_update = True # Add password update if any MQTT setting changed or user was updated if force_password_update: changes_needed.append(('MqttPassword', mqtt_fields['Password'])) self.logger.debug(f"{name}: MQTT Password will be updated") # Check NoRetain setting - FIXED: Use the actual value from config with default of False no_retain = mqtt_config.get('NoRetain', False) if no_retain: changes_needed.append(('SetOption62', '1')) # 1 = No Retain else: changes_needed.append(('SetOption62', '0')) # 0 = Use Retain # Apply changes if needed for setting, value in changes_needed: try: # For FullTopic, we need to avoid adding a space (%20) between the command and value if setting == "FullTopic": url = f"http://{ip}/cm?cmnd={setting}={value}" else: url = f"http://{ip}/cm?cmnd={setting}%20{value}" response = requests.get(url, timeout=5) if response.status_code == 200: if setting != 'MqttPassword': self.logger.debug(f"{name}: Updated {setting} to {value}") else: self.logger.debug(f"{name}: Updated MQTT Password") else: self.logger.error(f"{name}: Failed to update {setting}") except requests.exceptions.RequestException as e: self.logger.error(f"{name}: Error updating {setting}: {str(e)}") return len(changes_needed) > 0 for device in devices: if not isinstance(device, dict): self.logger.warning(f"Skipping invalid device entry: {device}") continue name = device.get('name', 'Unknown') ip = device.get('ip') mac = device.get('mac') if not ip: self.logger.warning(f"Skipping device {name} - no IP address") continue self.logger.info(f"Checking device: {name} at {ip}") try: # Get Status 2 for firmware version url_status = f"http://{ip}/cm?cmnd=Status%202" response = requests.get(url_status, timeout=5) status_data = response.json() # Get Status 5 for network info url_network = f"http://{ip}/cm?cmnd=Status%205" response = requests.get(url_network, timeout=5) network_data = response.json() # Get Status 6 for MQTT info url_mqtt = f"http://{ip}/cm?cmnd=Status%206" response = requests.get(url_mqtt, timeout=5) mqtt_data = response.json() # Check and update MQTT settings if needed mqtt_updated = check_mqtt_settings(ip, name, mqtt_data) # Set console parameters from config console_updated = False console_params = mqtt_config.get('console', {}) if console_params: self.logger.info(f"{name}: Setting console parameters from configuration") # Special handling for Retain parameters - need to send opposite state first, then final state # This is necessary because the changes are what create the update of the Retain state at the MQTT server retain_params = ["ButtonRetain", "SwitchRetain", "PowerRetain"] # Process Retain parameters first for param in retain_params: if param in console_params: try: final_value = console_params[param] # Set opposite state first opposite_value = "On" if final_value.lower() == "off" else "Off" # First command (opposite state) - with retry logic url = f"http://{ip}/cm?cmnd={param}%20{opposite_value}" success = False attempts = 0 max_attempts = 3 last_error = None while not success and attempts < max_attempts: attempts += 1 try: response = requests.get(url, timeout=5) if response.status_code == 200: self.logger.debug(f"{name}: Set {param} to {opposite_value} (step 1 of 2 to update MQTT broker retain settings)") console_updated = True success = True else: self.logger.warning(f"{name}: Failed to set {param} to {opposite_value} (attempt {attempts}/{max_attempts})") last_error = f"HTTP {response.status_code}" if attempts < max_attempts: time.sleep(1) # Wait before retry except requests.exceptions.Timeout as e: self.logger.warning(f"{name}: Timeout setting {param} to {opposite_value} (attempt {attempts}/{max_attempts})") last_error = "Timeout" if attempts < max_attempts: time.sleep(1) # Wait before retry except requests.exceptions.RequestException as e: self.logger.warning(f"{name}: Error setting {param} to {opposite_value}: {str(e)} (attempt {attempts}/{max_attempts})") last_error = str(e) if attempts < max_attempts: time.sleep(1) # Wait before retry if not success: self.logger.error(f"{name}: Failed to set {param} to {opposite_value} after {max_attempts} attempts. Last error: {last_error}") # Track the failure for later reporting if not hasattr(self, 'command_failures'): self.command_failures = [] self.command_failures.append({ "device": name, "ip": ip, "command": f"{param} {opposite_value}", "error": last_error }) # Small delay to ensure commands are processed in order time.sleep(0.5) # Second command (final state) - with retry logic url = f"http://{ip}/cm?cmnd={param}%20{final_value}" success = False attempts = 0 last_error = None while not success and attempts < max_attempts: attempts += 1 try: response = requests.get(url, timeout=5) if response.status_code == 200: self.logger.debug(f"{name}: Set {param} to {final_value} (step 2 of 2 to update MQTT broker retain settings)") console_updated = True success = True else: self.logger.warning(f"{name}: Failed to set {param} to {final_value} (attempt {attempts}/{max_attempts})") last_error = f"HTTP {response.status_code}" if attempts < max_attempts: time.sleep(1) # Wait before retry except requests.exceptions.Timeout as e: self.logger.warning(f"{name}: Timeout setting {param} to {final_value} (attempt {attempts}/{max_attempts})") last_error = "Timeout" if attempts < max_attempts: time.sleep(1) # Wait before retry except requests.exceptions.RequestException as e: self.logger.warning(f"{name}: Error setting {param} to {final_value}: {str(e)} (attempt {attempts}/{max_attempts})") last_error = str(e) if attempts < max_attempts: time.sleep(1) # Wait before retry if not success: self.logger.error(f"{name}: Failed to set {param} to {final_value} after {max_attempts} attempts. Last error: {last_error}") # Track the failure for later reporting if not hasattr(self, 'command_failures'): self.command_failures = [] self.command_failures.append({ "device": name, "ip": ip, "command": f"{param} {final_value}", "error": last_error }) except Exception as e: self.logger.error(f"{name}: Unexpected error setting {param} commands: {str(e)}") # Track the failure for later reporting if not hasattr(self, 'command_failures'): self.command_failures = [] self.command_failures.append({ "device": name, "ip": ip, "command": f"{param} (both steps)", "error": str(e) }) # Process all other console parameters # Track rules that need to be enabled rules_to_enable = {} for param, value in console_params.items(): # Skip Retain parameters as they're handled specially above if param in retain_params: continue # Check if this is a rule definition (lowercase rule1, rule2, etc.) if param.lower().startswith('rule') and param.lower() == param and param[-1].isdigit(): # Store the rule number for later enabling rule_num = param[-1] rules_to_enable[rule_num] = True self.logger.debug(f"{name}: Detected rule definition {param}, will auto-enable") # Skip Rule1, Rule2, etc. if we're auto-enabling rules if param.lower().startswith('rule') and param.lower() != param and param[-1].isdigit(): # If this is in the config, we'll respect it, but log that it's not needed self.logger.debug(f"{name}: Note: {param} is not needed with auto-enable feature") # Regular console parameter - with retry logic url = f"http://{ip}/cm?cmnd={param}%20{value}" success = False attempts = 0 max_attempts = 3 last_error = None while not success and attempts < max_attempts: attempts += 1 try: response = requests.get(url, timeout=5) if response.status_code == 200: self.logger.debug(f"{name}: Set console parameter {param} to {value}") console_updated = True success = True else: self.logger.warning(f"{name}: Failed to set console parameter {param} (attempt {attempts}/{max_attempts})") last_error = f"HTTP {response.status_code}" if attempts < max_attempts: time.sleep(1) # Wait before retry except requests.exceptions.Timeout as e: self.logger.warning(f"{name}: Timeout setting console parameter {param} (attempt {attempts}/{max_attempts})") last_error = "Timeout" if attempts < max_attempts: time.sleep(1) # Wait before retry except requests.exceptions.RequestException as e: self.logger.warning(f"{name}: Error setting console parameter {param}: {str(e)} (attempt {attempts}/{max_attempts})") last_error = str(e) if attempts < max_attempts: time.sleep(1) # Wait before retry if not success: self.logger.error(f"{name}: Failed to set console parameter {param} after {max_attempts} attempts. Last error: {last_error}") # Track the failure for later reporting if not hasattr(self, 'command_failures'): self.command_failures = [] self.command_failures.append({ "device": name, "ip": ip, "command": f"{param} {value}", "error": last_error }) # Auto-enable any rules that were defined for rule_num in rules_to_enable: rule_enable_param = f"Rule{rule_num}" # Skip if the rule enable command was already in the config if any(p.lower() == rule_enable_param.lower() for p in console_params): continue # Rule auto-enabling - with retry logic url = f"http://{ip}/cm?cmnd={rule_enable_param}%201" success = False attempts = 0 max_attempts = 3 last_error = None while not success and attempts < max_attempts: attempts += 1 try: response = requests.get(url, timeout=5) if response.status_code == 200: self.logger.info(f"{name}: Auto-enabled {rule_enable_param}") console_updated = True success = True else: self.logger.warning(f"{name}: Failed to auto-enable {rule_enable_param} (attempt {attempts}/{max_attempts})") last_error = f"HTTP {response.status_code}" if attempts < max_attempts: time.sleep(1) # Wait before retry except requests.exceptions.Timeout as e: self.logger.warning(f"{name}: Timeout auto-enabling {rule_enable_param} (attempt {attempts}/{max_attempts})") last_error = "Timeout" if attempts < max_attempts: time.sleep(1) # Wait before retry except requests.exceptions.RequestException as e: self.logger.warning(f"{name}: Error auto-enabling {rule_enable_param}: {str(e)} (attempt {attempts}/{max_attempts})") last_error = str(e) if attempts < max_attempts: time.sleep(1) # Wait before retry if not success: self.logger.error(f"{name}: Failed to auto-enable {rule_enable_param} after {max_attempts} attempts. Last error: {last_error}") # Track the failure for later reporting if not hasattr(self, 'command_failures'): self.command_failures = [] self.command_failures.append({ "device": name, "ip": ip, "command": f"{rule_enable_param} 1", "error": last_error }) device_detail = { "name": name, "ip": ip, "mac": mac, "version": status_data.get("StatusFWR", {}).get("Version", "Unknown"), "hostname": network_data.get("StatusNET", {}).get("Hostname", "Unknown"), "mqtt_status": "Updated" if mqtt_updated else "Verified", "console_status": "Updated" if console_updated else "Verified", "last_checked": time.strftime("%Y-%m-%d %H:%M:%S"), "status": "online" } self.logger.info(f"Successfully got version for {name}: {device_detail['version']}") except requests.exceptions.RequestException as e: self.logger.error(f"Error connecting to {name} at {ip}: {str(e)}") device_detail = { "name": name, "ip": ip, "mac": mac, "version": "Unknown", "status": "offline", "error": str(e) } device_details.append(device_detail) time.sleep(0.5) # Save all device details at once try: with open('TasmotaDevices.json', 'w') as f: json.dump(device_details, f, indent=2) self.logger.info(f"Device details saved to TasmotaDevices.json ({len(device_details)} devices)") except Exception as e: self.logger.error(f"Error saving device details: {e}") # Print summary of command failures if any occurred if hasattr(self, 'command_failures') and self.command_failures: failure_count = len(self.command_failures) print("\n" + "="*80) print(f"COMMAND FAILURES SUMMARY: {failure_count} command(s) failed after 3 retry attempts") print("="*80) # Group failures by device for better readability failures_by_device = {} for failure in self.command_failures: device_name = failure['device'] if device_name not in failures_by_device: failures_by_device[device_name] = [] failures_by_device[device_name].append(failure) # Print failures grouped by device for device_name, failures in failures_by_device.items(): print(f"\nDevice: {device_name} ({failures[0]['ip']})") print("-" * 40) for i, failure in enumerate(failures, 1): print(f" {i}. Command: {failure['command']}") print(f" Error: {failure['error']}") print("\n" + "="*80) def main(): parser = argparse.ArgumentParser(description='Tasmota Device Manager') parser.add_argument('--config', default='network_configuration.json', help='Path to configuration file') parser.add_argument('--debug', action='store_true', help='Enable debug logging') parser.add_argument('--skip-unifi', action='store_true', help='Skip UniFi discovery and use existing current.json') parser.add_argument('--process-unknown', action='store_true', help='Process unknown devices (matching unknown_device_patterns) to set up names and MQTT') parser.add_argument('--Device', help='Process a single device by hostname or IP address') args = parser.parse_args() # Set up logging log_level = logging.DEBUG if args.debug else logging.INFO logging.basicConfig(level=log_level, format='%(asctime)s - %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S') print("Starting Tasmota Device Discovery and Version Check...") # Create TasmotaDiscovery instance discovery = TasmotaDiscovery(debug=args.debug) discovery.load_config(args.config) try: # Process a single device if --Device parameter is provided if args.Device: print(f"Processing single device: {args.Device}") # Let process_single_device handle the UniFi client setup as needed success = discovery.process_single_device(args.Device) if success: print(f"\nDevice {args.Device} processed successfully!") print("- Detailed information saved to: TasmotaDevices.json") else: print(f"\nFailed to process device: {args.Device}") return 1 else: # Normal processing flow if not args.skip_unifi: print("Step 1: Discovering Tasmota devices...") discovery.setup_unifi_client() tasmota_devices = discovery.get_tasmota_devices() discovery.save_tasmota_config(tasmota_devices) else: print("Skipping UniFi discovery, using existing current.json...") if args.process_unknown: print("\nStep 2: Processing unknown devices...") discovery.process_unknown_devices() else: print("\nStep 2: Getting detailed version information...") discovery.get_device_details(use_current_json=True) print("\nProcess completed successfully!") print("- Device list saved to: current.json") print("- Detailed information saved to: TasmotaDevices.json") except ConnectionError as e: print(f"Connection Error: {str(e)}") print("\nTrying to proceed with existing current.json...") try: discovery.get_device_details(use_current_json=True) print("\nSuccessfully retrieved device details from existing current.json") except Exception as inner_e: print(f"Error processing existing devices: {str(inner_e)}") return 1 except Exception as e: print(f"Error: {str(e)}") if args.debug: import traceback traceback.print_exc() return 1 return 0 if __name__ == '__main__': main()