"""Discover Tasmota devices through a UniFi controller and verify their MQTT settings.

Workflow (see ``main``):
1. Log in to the UniFi controller and list its clients.
2. Keep the clients that look like Tasmota devices and persist them to
   ``current.json`` (devices that vanished or changed go to ``deprecated.json``).
3. Query each device over HTTP, push the configured MQTT settings where the
   device differs, and write the results to ``TasmotaDevices.json``.
"""
import argparse
import json
import logging
import os
import re  # Import the regular expression module
import sys
import time
from datetime import datetime
from typing import Optional
from urllib.parse import quote

import requests
from urllib3.exceptions import InsecureRequestWarning

# Disable SSL warnings
requests.packages.urllib3.disable_warnings(category=InsecureRequestWarning)


def _find_exclude_match(patterns, name: str, hostname: str) -> Optional[str]:
    """Return the converted regex of the first exclude pattern matching name or hostname.

    Patterns are simple globs: they are lower-cased, ``.`` is escaped and ``*``
    becomes ``.*``; the result is matched against the whole string.
    ``name`` and ``hostname`` are expected to be lower-cased by the caller.

    Returns:
        The converted (un-anchored) regex text of the matching pattern, or
        ``None`` when nothing matches. Compare with ``is not None``: an empty
        pattern converts to an empty string.
    """
    for raw_pattern in patterns:
        converted = raw_pattern.lower().replace('.', r'\.').replace('*', '.*')
        anchored = f"^{converted}$"
        if re.match(anchored, name) or re.match(anchored, hostname):
            return converted
    return None


def _tasmota_command_url(ip: str, command: str) -> str:
    """Build the Tasmota HTTP command URL for ``command``.

    The command text is percent-encoded so that spaces and characters such as
    ``%``, ``&`` or ``#`` inside a value reach the device intact.
    """
    return f"http://{ip}/cm?cmnd={quote(command, safe='')}"


def _tasmota_query(ip: str, command: str) -> dict:
    """Send ``command`` to the device at ``ip`` and return the decoded JSON reply.

    Raises:
        requests.exceptions.RequestException: on connection problems.
        ValueError: if the reply is not valid JSON.
    """
    response = requests.get(_tasmota_command_url(ip, command), timeout=5)
    return response.json()


class UnifiClient:
    """Minimal UniFi Controller client built on a ``requests`` session."""

    def __init__(self, base_url, username, password, site_id, verify_ssl=True, timeout=10):
        """Store connection settings and create a fresh HTTP session.

        Args:
            base_url: Controller base URL; a trailing ``/`` is stripped.
            username: Login name.
            password: Login password.
            site_id: UniFi site identifier used in the API paths.
            verify_ssl: Passed to ``session.verify``.
            timeout: Seconds to wait for each controller request.
        """
        self.base_url = base_url.rstrip('/')
        self.username = username
        self.password = password
        self.site_id = site_id
        self.timeout = timeout
        self.session = requests.Session()
        self.session.verify = verify_ssl
        # Initialize cookie jar
        self.session.cookies.clear()

    def _login(self) -> requests.Response:
        """Authenticate with the UniFi Controller.

        Returns:
            The successful login response.

        Raises:
            Exception: with a credentials hint when the controller answers 401.
            requests.exceptions.RequestException: for any other request failure.
        """
        login_url = f"{self.base_url}/api/auth/login"
        headers = {
            'Content-Type': 'application/json',
            'Accept': 'application/json'
        }
        payload = {
            "username": self.username,
            "password": self.password,
            "remember": False
        }

        try:
            response = self.session.post(
                login_url,
                json=payload,
                headers=headers,
                timeout=self.timeout
            )
            response.raise_for_status()
        except requests.exceptions.RequestException as e:
            # Connection-level errors carry no response object at all.
            failed_response = getattr(e, 'response', None)
            if failed_response is not None and failed_response.status_code == 401:
                raise Exception("Authentication failed. Please verify your username and password.") from e
            raise

        if 'X-CSRF-Token' in response.headers:
            self.session.headers['X-CSRF-Token'] = response.headers['X-CSRF-Token']
        return response

    def get_clients(self) -> list:
        """Get all clients from the UniFi Controller.

        Tries the newer proxy endpoint, then the legacy endpoint, then the v2
        endpoint, returning the first successful result.

        Raises:
            requests.exceptions.RequestException or ValueError: the error from
            the last endpoint when all of them fail.
        """
        endpoints = (
            f"{self.base_url}/proxy/network/api/s/{self.site_id}/stat/sta",
            f"{self.base_url}/api/s/{self.site_id}/stat/sta",
            f"{self.base_url}/v2/api/site/{self.site_id}/clients",
        )

        last_error = None
        for url in endpoints:
            try:
                response = self.session.get(url, timeout=self.timeout)
                response.raise_for_status()
                payload = response.json()
            except (requests.exceptions.RequestException, ValueError) as e:
                last_error = e
                continue
            # Some endpoints may answer with a bare JSON list instead of {"data": [...]}.
            if isinstance(payload, list):
                return payload
            return payload.get('data', [])
        raise last_error


def check_mqtt_settings(ip, name, mqtt_status, mqtt_config=None, logger=None):
    """Check and update MQTT settings if they don't match config.

    Args:
        ip: Device IP address.
        name: Device name; the part before the first ``-`` replaces a
            ``%hostname_base%`` Topic in the configuration.
        mqtt_status: Decoded reply of the device's ``Status 6`` command.
        mqtt_config: The ``mqtt`` section of the configuration (required).
        logger: Logger to report to; defaults to this module's logger.

    Returns:
        True when at least one command was sent to the device.

    Raises:
        ValueError: if ``mqtt_config`` is missing or empty.
    """
    if not mqtt_config:
        raise ValueError("MQTT configuration is required to check device settings")
    log = logger or logging.getLogger(__name__)

    # Get the base hostname (everything before the dash)
    hostname_base = name.split('-', 1)[0]

    topic = mqtt_config.get('Topic', '')
    if topic == '%hostname_base%':
        topic = hostname_base
    password = mqtt_config.get('Password', '')

    # Status 6 nests its values under "StatusMQT" with Mqtt-prefixed keys.
    # NOTE(review): Topic/FullTopic do not appear to be part of that reply;
    # when absent they count as mismatched and are re-sent - confirm against
    # the Tasmota command reference.
    reported = mqtt_status.get('StatusMQT', {})
    checks = (
        # (command, label used in logs, key in the device reply, wanted value)
        ('MqttHost', 'Host', 'MqttHost', mqtt_config.get('Host', '')),
        ('MqttPort', 'Port', 'MqttPort', mqtt_config.get('Port', 1883)),
        ('MqttUser', 'User', 'MqttUser', mqtt_config.get('User', '')),
        ('Topic', 'Topic', 'Topic', topic),
        ('FullTopic', 'FullTopic', 'FullTopic', mqtt_config.get('FullTopic', '%prefix%/%topic%/')),
    )

    changes_needed = []
    for command, label, reply_key, wanted in checks:
        current = reported.get(reply_key)
        # Compare as text so that a port configured as "1883" equals 1883.
        if current is None or str(current) != str(wanted):
            changes_needed.append((command, wanted))
            log.debug(f"{name}: MQTT {label} mismatch - Device: {current}, Config: {wanted}")

    # Add password update if any MQTT setting changed or user was updated
    if changes_needed:
        changes_needed.append(('MqttPassword', password))
        log.debug(f"{name}: MQTT Password will be updated")

    # Check NoRetain setting
    if mqtt_config.get('NoRetain', True):
        changes_needed.append(('SetOption62', '1'))  # 1 = No Retain

    # Apply changes if needed
    for setting, value in changes_needed:
        try:
            response = requests.get(_tasmota_command_url(ip, f"{setting} {value}"), timeout=5)
        except requests.exceptions.RequestException as e:
            log.error(f"{name}: Error updating {setting}: {str(e)}")
            continue
        if response.status_code != 200:
            log.error(f"{name}: Failed to update {setting}")
        elif setting != 'MqttPassword':
            log.debug(f"{name}: Updated {setting} to {value}")
        else:
            # Never write the password itself to the log.
            log.debug(f"{name}: Updated MQTT Password")

    return len(changes_needed) > 0


class TasmotaDiscovery:
    """Finds Tasmota devices via UniFi and keeps their MQTT settings in sync."""

    def __init__(self, debug: bool = False):
        """Initialize the TasmotaDiscovery with optional debug mode."""
        log_level = logging.DEBUG if debug else logging.INFO
        logging.basicConfig(
            level=log_level,
            format='%(asctime)s - %(levelname)s - %(message)s',
            datefmt='%Y-%m-%d %H:%M:%S'
        )
        self.logger = logging.getLogger(__name__)
        self.config = None
        self.unifi_client = None

    def load_config(self, config_path: Optional[str] = None) -> dict:
        """Load configuration from JSON file.

        Args:
            config_path: Path to the file; defaults to ``config.json`` next to
                this module.

        Returns:
            The parsed configuration (also stored on ``self.config``).

        Exits the process with status 1 when the file is missing or is not
        valid JSON.
        """
        if config_path is None:
            config_path = os.path.join(os.path.dirname(__file__), 'config.json')

        self.logger.debug(f"Loading configuration from: {config_path}")
        try:
            with open(config_path, 'r', encoding='utf-8') as config_file:
                self.config = json.load(config_file)
        except FileNotFoundError:
            self.logger.error(f"Configuration file not found at {config_path}")
            sys.exit(1)
        except json.JSONDecodeError:
            self.logger.error("Invalid JSON in configuration file")
            sys.exit(1)
        self.logger.debug("Configuration loaded successfully from %s", config_path)
        return self.config

    def setup_unifi_client(self):
        """Set up the UniFi client with better error handling.

        Raises:
            ValueError: if the ``unifi`` section or one of its required fields
                is missing.
            ConnectionError: if the client cannot be created or the login fails.
        """
        self.logger.debug("Setting up UniFi client")

        if not self.config or 'unifi' not in self.config:
            raise ValueError("Missing UniFi configuration")

        unifi_config = self.config['unifi']
        required_fields = ['host', 'username', 'password', 'site']
        missing_fields = [field for field in required_fields if field not in unifi_config]
        if missing_fields:
            raise ValueError(f"Missing required UniFi configuration fields: {', '.join(missing_fields)}")

        try:
            self.logger.debug(f"Connecting to UniFi Controller at {unifi_config['host']}")
            self.unifi_client = UnifiClient(
                base_url=unifi_config['host'],
                username=unifi_config['username'],
                password=unifi_config['password'],
                site_id=unifi_config['site'],
                # Off by default for self-signed certificates; set
                # "verify_ssl": true in the unifi section to enable checking.
                verify_ssl=unifi_config.get('verify_ssl', False)
            )
            # Test the connection; _login raises on any failure.
            self.unifi_client._login()
        except Exception as e:
            self.logger.error(f"Error setting up UniFi client: {str(e)}")
            raise ConnectionError(f"Failed to connect to UniFi controller: {str(e)}") from e

        self.logger.debug("UniFi client setup successful")

    def is_tasmota_device(self, device: dict) -> bool:
        """Determine if a device is a Tasmota device.

        A device qualifies when its IP starts with the ``subnet`` of a
        configured network filter, it matches none of that network's
        ``exclude_patterns``, and its name or hostname carries one of the
        known Tasmota prefixes/suffixes.
        """
        name = (device.get('name') or '').lower()
        hostname = (device.get('hostname') or '').lower()
        ip = device.get('ip') or ''

        # Check if device is in the configured NoT network
        network_filters = self.config['unifi'].get('network_filter', {})
        for network in network_filters.values():
            subnet = network.get('subnet')
            if subnet is None or not ip.startswith(subnet):
                continue

            self.logger.debug(f"Checking device in NoT network: {name} ({hostname}) IP: {ip}")

            # First check exclusion patterns
            matched = _find_exclude_match(network.get('exclude_patterns', []), name, hostname)
            if matched is not None:
                self.logger.debug(f"Excluding device due to pattern '{matched}': {name} ({hostname})")
                return False

            # If not excluded, check if it's a Tasmota device
            looks_like_tasmota = (
                name.startswith(('tasmota', 'sonoff'))
                or name.endswith('-ts')
                or hostname.startswith(('tasmota', 'sonoff', 'esp-'))
                or hostname.endswith(('-fan', '-lamp', '-light', '-switch'))
            )
            if looks_like_tasmota:
                self.logger.debug(f"Found Tasmota device: {name}")
                return True

        return False

    def get_tasmota_devices(self) -> list:
        """Query UniFi controller and filter Tasmota devices.

        Returns:
            One summary dict per Tasmota device, or an empty list when the
            controller query fails (the error is logged).
        """
        devices = []
        self.logger.debug("Querying UniFi controller for devices")
        try:
            all_clients = self.unifi_client.get_clients()
            self.logger.debug(f"Found {len(all_clients)} total devices")

            for device in all_clients:
                if not self.is_tasmota_device(device):
                    continue
                devices.append({
                    "name": device.get('name', device.get('hostname', 'Unknown')),
                    "ip": device.get('ip', ''),
                    "mac": device.get('mac', ''),
                    "last_seen": device.get('last_seen', ''),
                    "hostname": device.get('hostname', ''),
                    "notes": device.get('note', ''),
                })

            self.logger.debug(f"Found {len(devices)} Tasmota devices")
            return devices
        except Exception as e:
            # NOTE(review): returning [] here makes save_tasmota_config treat
            # every known device as "no longer present" - consider propagating.
            self.logger.error(f"Error getting devices from UniFi controller: {e}")
            return []

    def _load_device_list(self, path: str) -> list:
        """Return the ``tasmota.devices`` list stored in ``path``.

        A missing file or invalid JSON yields an empty list.
        """
        if not os.path.exists(path):
            return []
        try:
            with open(path, 'r', encoding='utf-8') as f:
                return json.load(f).get('tasmota', {}).get('devices', [])
        except json.JSONDecodeError:
            self.logger.error(f"Error reading {path}, treating as empty")
            return []

    @staticmethod
    def _print_summary(excluded_devices, moved_to_deprecated, removed_from_deprecated, new_devices) -> None:
        """Print the human-readable result of a save run."""
        print("\nDevice Status Summary:")
        if excluded_devices:
            print("\nExcluded Devices:")
            for name in excluded_devices:
                print(f"- {name}")

        if moved_to_deprecated:
            print("\nMoved to deprecated:")
            for device in moved_to_deprecated:
                print(f"- {device['name']}")

        if removed_from_deprecated:
            print("\nRestored from deprecated:")
            for name in removed_from_deprecated:
                print(f"- {name}")

        print("\nCurrent Tasmota Devices:")
        for device in new_devices:
            # str() keeps the width format working if a stored value is null.
            print(f"Name: {str(device['name']):<20} IP: {str(device.get('ip', '')):<15} MAC: {device.get('mac', '')}")

    def save_tasmota_config(self, devices: list) -> None:
        """Save Tasmota device information to a JSON file with device tracking.

        ``current.json`` receives the devices passed in (minus excluded ones);
        ``deprecated.json`` collects devices that disappeared or whose IP/MAC
        changed. The previous ``current.json`` is kept as ``current.json.backup``.

        Args:
            devices: Device dicts as produced by ``get_tasmota_devices``.
        """
        filename = "current.json"
        deprecated_filename = "deprecated.json"
        self.logger.debug(f"Saving Tasmota configuration to {filename}")

        current_devices = self._load_device_list(filename)
        deprecated_devices = self._load_device_list(deprecated_filename)

        # First entry wins for duplicate names, like a linear search would.
        current_by_name = {}
        for known in current_devices:
            current_by_name.setdefault(known['name'], known)
        deprecated_names = {d['name'] for d in deprecated_devices}

        new_devices = []
        moved_to_deprecated = []
        removed_from_deprecated = []
        excluded_devices = []

        # Check for excluded devices in current and deprecated lists
        network_filters = self.config['unifi'].get('network_filter', {})
        exclude_patterns = []
        for network in network_filters.values():
            exclude_patterns.extend(network.get('exclude_patterns', []))

        def is_device_excluded(device_name: str, hostname: str = '') -> bool:
            """Tell whether any configured exclude pattern matches the device."""
            return _find_exclude_match(exclude_patterns, device_name.lower(), hostname.lower()) is not None

        # Process current devices
        for device in devices:
            device_name = device['name']

            if is_device_excluded(device_name, device.get('hostname', '')):
                print(f"Device {device_name} excluded by pattern - skipping")
                excluded_devices.append(device_name)
                continue

            existing_device = current_by_name.get(device_name)
            if existing_device is not None:
                # Device exists, check if IP or MAC changed
                if existing_device.get('ip') != device['ip'] or existing_device.get('mac') != device['mac']:
                    moved_to_deprecated.append(existing_device)
                    new_devices.append(device)
                    print(f"Device {device_name} moved to deprecated (IP/MAC changed)")
                else:
                    new_devices.append(existing_device)  # Keep existing device
                continue

            # New device, check if it was in deprecated
            if device_name in deprecated_names:
                removed_from_deprecated.append(device_name)
                print(f"Device {device_name} removed from deprecated (restored)")
            new_devices.append(device)
            print(f"Device {device_name} added to output file")

        # Find devices that are no longer present
        current_names = {d['name'] for d in devices}
        for existing_device in current_devices:
            if existing_device['name'] in current_names:
                continue
            if not is_device_excluded(existing_device['name'], existing_device.get('hostname', '')):
                moved_to_deprecated.append(existing_device)
                print(f"Device {existing_device['name']} moved to deprecated (no longer present)")

        # Update deprecated devices list, excluding any excluded devices
        restored_names = set(removed_from_deprecated)
        final_deprecated = []
        for device in deprecated_devices:
            if is_device_excluded(device['name'], device.get('hostname', '')):
                print(f"Device {device['name']} removed from deprecated (excluded by pattern)")
            elif device['name'] not in restored_names:
                final_deprecated.append(device)
        final_deprecated.extend(moved_to_deprecated)

        config = {
            "tasmota": {
                "devices": new_devices,
                "generated_at": datetime.now().isoformat(),
                "total_devices": len(new_devices)
            }
        }
        deprecated_config = {
            "tasmota": {
                "devices": final_deprecated,
                "generated_at": datetime.now().isoformat(),
                "total_devices": len(final_deprecated)
            }
        }

        # Backup existing file if it exists
        if os.path.exists(filename):
            backup_name = f"{filename}.backup"
            try:
                # os.replace overwrites an older backup on every platform.
                os.replace(filename, backup_name)
                self.logger.info(f"Created backup of existing configuration as {backup_name}")
            except OSError as e:
                self.logger.error(f"Error creating backup: {e}")

        # Save files
        try:
            with open(filename, 'w', encoding='utf-8') as f:
                json.dump(config, f, indent=4)
            with open(deprecated_filename, 'w', encoding='utf-8') as f:
                json.dump(deprecated_config, f, indent=4)
        except Exception as e:
            self.logger.error(f"Error saving configuration: {e}")
            return

        self.logger.info(f"Successfully saved {len(new_devices)} Tasmota devices to {filename}")
        self.logger.info(f"Successfully saved {len(final_deprecated)} deprecated devices to {deprecated_filename}")
        self._print_summary(excluded_devices, moved_to_deprecated, removed_from_deprecated, new_devices)

    def get_device_details(self, use_current_json=True):
        """Connect to each Tasmota device via HTTP, gather details and validate MQTT settings.

        Args:
            use_current_json: Read devices from ``current.json`` when true,
                otherwise from ``tasmota.json``.

        Returns:
            The list of per-device detail dicts that was written to
            ``TasmotaDevices.json``, or ``None`` when the device list or the
            MQTT configuration could not be loaded.
        """
        self.logger.info("Starting to gather detailed device information...")
        device_details = []

        source_file = 'current.json' if use_current_json else 'tasmota.json'
        try:
            with open(source_file, 'r', encoding='utf-8') as f:
                data = json.load(f)
        except FileNotFoundError:
            self.logger.error(f"{source_file} not found. Run discovery first.")
            return None
        except json.JSONDecodeError:
            self.logger.error(f"Invalid JSON format in {source_file}")
            return None
        devices = data.get('tasmota', {}).get('devices', [])
        self.logger.debug(f"Loaded {len(devices)} devices from {source_file}")

        mqtt_config = (self.config or {}).get('mqtt', {})
        if not mqtt_config:
            self.logger.error("MQTT configuration missing from config file")
            return None

        for device in devices:
            if not isinstance(device, dict):
                self.logger.warning(f"Skipping invalid device entry: {device}")
                continue

            name = device.get('name', 'Unknown')
            ip = device.get('ip')
            mac = device.get('mac')

            if not ip:
                self.logger.warning(f"Skipping device {name} - no IP address")
                continue

            self.logger.info(f"Checking device: {name} at {ip}")

            try:
                status_data = _tasmota_query(ip, 'Status 2')   # firmware version
                network_data = _tasmota_query(ip, 'Status 5')  # network info
                mqtt_data = _tasmota_query(ip, 'Status 6')     # MQTT info

                # Check and update MQTT settings if needed
                mqtt_updated = check_mqtt_settings(ip, name, mqtt_data, mqtt_config, self.logger)

                device_detail = {
                    "name": name,
                    "ip": ip,
                    "mac": mac,
                    "version": status_data.get("StatusFWR", {}).get("Version", "Unknown"),
                    "hostname": network_data.get("StatusNET", {}).get("Hostname", "Unknown"),
                    "mqtt_status": "Updated" if mqtt_updated else "Verified",
                    "last_checked": time.strftime("%Y-%m-%d %H:%M:%S"),
                    "status": "online"
                }
                self.logger.info(f"Successfully got version for {name}: {device_detail['version']}")
            except (requests.exceptions.RequestException, ValueError) as e:
                # ValueError covers a reply that is not JSON; one bad device
                # must not abort the whole run.
                self.logger.error(f"Error connecting to {name} at {ip}: {str(e)}")
                device_detail = {
                    "name": name,
                    "ip": ip,
                    "mac": mac,
                    "version": "Unknown",
                    "status": "offline",
                    "error": str(e)
                }

            device_details.append(device_detail)
            time.sleep(0.5)  # brief pause between devices

        # Save all device details at once
        try:
            with open('TasmotaDevices.json', 'w', encoding='utf-8') as f:
                json.dump(device_details, f, indent=2)
            self.logger.info(f"Device details saved to TasmotaDevices.json ({len(device_details)} devices)")
        except Exception as e:
            self.logger.error(f"Error saving device details: {e}")

        return device_details


def main():
    """Run discovery and the device check from the command line.

    Returns:
        0 on success, 1 when an error prevented completion.
    """
    parser = argparse.ArgumentParser(description='Tasmota Device Manager')
    parser.add_argument('--config', default='network_configuration.json',
                        help='Path to configuration file')
    parser.add_argument('--debug', action='store_true',
                        help='Enable debug logging')
    parser.add_argument('--skip-unifi', action='store_true',
                        help='Skip UniFi discovery and use existing current.json')
    args = parser.parse_args()

    # Set up logging
    log_level = logging.DEBUG if args.debug else logging.INFO
    logging.basicConfig(level=log_level,
                        format='%(asctime)s - %(levelname)s - %(message)s',
                        datefmt='%Y-%m-%d %H:%M:%S')

    print("Starting Tasmota Device Discovery and Version Check...")

    # Create TasmotaDiscovery instance
    discovery = TasmotaDiscovery(debug=args.debug)
    discovery.load_config(args.config)

    try:
        if not args.skip_unifi:
            print("Step 1: Discovering Tasmota devices...")
            discovery.setup_unifi_client()
            tasmota_devices = discovery.get_tasmota_devices()
            discovery.save_tasmota_config(tasmota_devices)
        else:
            print("Skipping UniFi discovery, using existing current.json...")

        print("\nStep 2: Getting detailed version information...")
        discovery.get_device_details(use_current_json=True)

        print("\nProcess completed successfully!")
        print("- Device list saved to: current.json")
        print("- Detailed information saved to: TasmotaDevices.json")

    except ConnectionError as e:
        print(f"Connection Error: {str(e)}")
        print("\nTrying to proceed with existing current.json...")
        try:
            discovery.get_device_details(use_current_json=True)
            print("\nSuccessfully retrieved device details from existing current.json")
        except Exception as inner_e:
            print(f"Error processing existing devices: {str(inner_e)}")
            return 1
    except Exception as e:
        print(f"Error: {str(e)}")
        if args.debug:
            import traceback
            traceback.print_exc()
        return 1

    return 0


if __name__ == '__main__':
    # Propagate main()'s status so failures are visible to the calling shell.
    sys.exit(main())