diff --git a/config.yaml b/config.yaml
new file mode 100644
index 0000000..3e42dac
--- /dev/null
+++ b/config.yaml
@@ -0,0 +1,36 @@
+unifi:
+  type: UDMSE # Unifi Dream Machine SE
+  host: https://192.168.6.1
+  username: Tasmota
+  password: TasmotaManager12!@
+  port: 8443 # Default Unifi Controller port
+  site: "default" # Site ID for Unifi Controller
+  API_Key: "nIfTdZAXVUGQgyNqsATluTja-noaNLAk" # API Key for Unifi Controller
+
+  network_filter:
+    NoT_network:
+      name: "NoT"
+      # Updated to support multiple subnets
+      subnets:
+        - "192.168.5" # Main network
+        - "192.168.7" # IoT network
+        - "192.168.8" # Tasmota network
+        - "192.168.9" # Camera network
+      exclude_patterns:
+        - "homeassistant*"
+        - "*sonos*"
+      default_name_patterns:
+        - "tasmota*"
+        - "ESP-*"
+
+tasmota:
+  mqtt_settings:
+    host: "homeassistant.NoT.mgeppert.com"
+    port: 1883
+    user: "mgeppert"
+    password: "mgeppert"
+    topic: "%hostname_base%"
+    full_topic: "%prefix%/%topic%/"
+    no_retain: false
+
+other_settings: # Yet to be defined
\ No newline at end of file
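For reference, a minimal sketch of how this configuration might be loaded and sanity-checked before a discovery run. The key names mirror the file above; the script name and behaviour here are illustrative only and assume PyYAML is installed:

```python
# config_check.py - illustrative only, not part of this PR.
import sys
import yaml

def load_config(path: str = "config.yaml") -> dict:
    """Load the YAML config and verify the keys discover_devices.py relies on."""
    with open(path, "r") as fh:
        config = yaml.safe_load(fh)

    unifi = config.get("unifi", {})
    missing = [key for key in ("host", "API_Key") if not unifi.get(key)]
    if missing:
        sys.exit(f"config.yaml is missing required unifi keys: {', '.join(missing)}")

    # network_filter drives the subnet/pattern filtering; without it,
    # filter_devices() in discover_devices.py returns no devices at all.
    if not unifi.get("network_filter"):
        print("Warning: no network_filter defined; discovery will find nothing to include.")
    return config

if __name__ == "__main__":
    cfg = load_config()
    print(f"Controller: {cfg['unifi']['host']}, site: {cfg['unifi'].get('site', 'default')}")
```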
diff --git a/discover_devices.py b/discover_devices.py
new file mode 100644
index 0000000..baafa98
--- /dev/null
+++ b/discover_devices.py
@@ -0,0 +1,869 @@
+#!/usr/bin/env python3
+"""
+Tasmota Device Discovery Script
+
+This script implements the Device Discovery process for Tasmota devices on a network.
+It connects to a Unifi Controller, retrieves a list of connected devices, and filters for
+potential Tasmota devices based on network_filter information in the config file.
+
+Usage:
+    python discover_devices.py [options]
+
+Options:
+    -h, --help      Show this help message and exit
+    -d, --debug     Enable debug mode with verbose logging
+    -c, --config    Specify a custom config file path (default: config.yaml)
+"""
+
+import argparse
+import yaml
+import logging
+import os
+import sys
+import re
+import json
+from typing import Dict, List, Any, Optional, Tuple
+import requests
+from urllib3.exceptions import InsecureRequestWarning
+
+# Suppress only the single InsecureRequestWarning from urllib3
+requests.packages.urllib3.disable_warnings(category=InsecureRequestWarning)
+
+# Set up logging
+logger = logging.getLogger("tasmota_discovery")
+
+# File paths
+VALID_HOSTNAMES_FILE = "valid_hostnames.json"
+DEFAULT_HOSTNAMES_FILE = "default_hostnames.json"
+DEPRECATED_HOSTNAMES_FILE = "deprecated_hostnames.json"
+UNIDENTIFIED_DEVICES_FILE = "unidentified_devices.json"
+
+class UnifiClient:
+    """Client for interacting with the Unifi Controller API."""
+
+    def __init__(self, config: Dict[str, Any], debug: bool = False):
+        """
+        Initialize the Unifi client with configuration.
+
+        Args:
+            config: Dictionary containing Unifi configuration
+            debug: Whether to enable debug mode
+        """
+        self.host = config.get('host')
+        self.port = config.get('port', 8443)
+        self.type = config.get('type', 'UDMSE')
+        self.site_id = config.get('site', 'default') # Site ID parameter from config
+        self.api_key = config.get('API_Key') # API Key for authentication
+        self.debug = debug
+        self.session = requests.Session()
+        self.base_url = self.host # Using host without port number as per issue requirement
+        self.is_authenticated = False
+        self.available_sites = [] # Will store available sites from the controller
+        self.application_version = "Unknown" # Will store the application version
+
+        # Check for required configuration parameters
+        if not self.host:
+            raise ValueError("Missing required Unifi host parameter")
+
+        # Set up API key authentication
+        if self.api_key:
+            logger.debug("API Key authentication will be used")
+            # Add API key to session headers
+            self.session.headers.update({
+                'X-API-KEY': self.api_key,
+                'Accept': 'application/json'
+            })
+            self.is_authenticated = True # With API key, we're pre-authenticated
+
+            # Retrieve and validate the site ID
+            self.retrieve_available_sites()
+
+            # Get the application version
+            self.get_application_version()
+        else:
+            raise ValueError("Missing required Unifi API_Key parameter")
+
+
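As a quick standalone connectivity check, the same X-API-KEY header scheme used in `UnifiClient.__init__` can be exercised against the controller before running the full script. The endpoint below is the first one `retrieve_available_sites()` tries; the host and key placeholders are illustrative:

```python
# api_key_probe.py - illustrative connectivity check, not part of this PR.
import requests
from urllib3.exceptions import InsecureRequestWarning

requests.packages.urllib3.disable_warnings(category=InsecureRequestWarning)

def probe(host: str, api_key: str) -> None:
    session = requests.Session()
    session.headers.update({'X-API-KEY': api_key, 'Accept': 'application/json'})
    # Endpoint taken from the list UnifiClient.retrieve_available_sites() tries.
    url = f"{host}/proxy/network/integration/v1/sites"
    response = session.get(url, verify=False, timeout=5)
    print(f"{url} -> HTTP {response.status_code}")
    if response.ok:
        print(response.json())

if __name__ == "__main__":
    probe("https://192.168.6.1", "<api-key-from-config.yaml>")
```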
+    def retrieve_available_sites(self) -> bool:
+        """
+        Retrieve available sites from the Unifi Controller and validate the configured site ID.
+
+        Returns:
+            bool: True if site ID is valid, False otherwise
+        """
+        if not self.is_authenticated:
+            logger.error("Not authenticated with Unifi Controller. API key authentication is required.")
+            return False
+
+        try:
+            # Try different API endpoints for retrieving sites
+            endpoints = [
+                "/proxy/network/integration/v1/sites", # Integration API endpoint
+                "/proxy/network/api/self/sites", # Newer API endpoint
+                "/api/self/sites", # Legacy API endpoint
+                "/v2/api/sites" # v2 API endpoint
+            ]
+
+            logger.info("Attempting to retrieve available sites from Unifi Controller")
+
+            for endpoint in endpoints:
+                sites_url = f"{self.base_url}{endpoint}"
+                logger.debug(f"Trying sites API endpoint: {sites_url}")
+
+                try:
+                    response = self.session.get(sites_url, verify=False, timeout=5)
+
+                    if response.status_code == 200:
+                        sites_data = response.json()
+
+                        # Handle different response formats
+                        if isinstance(sites_data, list):
+                            self.available_sites = sites_data
+                        elif isinstance(sites_data, dict) and 'data' in sites_data:
+                            self.available_sites = sites_data.get('data', [])
+                        else:
+                            logger.debug(f"Unexpected response format from sites API endpoint: {sites_url}")
+                            continue
+
+                        # Log the available sites
+                        site_names = [site.get('name', site.get('desc', 'Unknown')) for site in self.available_sites]
+                        site_ids = [site.get('id', site.get('name', 'Unknown')) for site in self.available_sites]
+
+                        logger.info(f"Retrieved {len(self.available_sites)} sites from Unifi Controller")
+                        logger.debug(f"Available sites: {', '.join(site_names)}")
+                        logger.debug(f"Available site IDs: {', '.join(site_ids)}")
+
+                        # Validate the configured site ID
+                        valid_site = False
+                        for site in self.available_sites:
+                            site_id = site.get('id', site.get('name', ''))
+                            if site_id == self.site_id:
+                                valid_site = True
+                                site_name = site.get('name', site.get('desc', 'Unknown'))
+                                logger.info(f"Configured site ID '{self.site_id}' is valid (Site name: {site_name})")
+                                break
+
+                        if not valid_site:
+                            logger.warning(f"Configured site ID '{self.site_id}' not found in available sites")
+                            if self.site_id == 'default' and len(self.available_sites) > 0:
+                                # Try to use the first available site as default
+                                first_site = self.available_sites[0]
+                                self.site_id = first_site.get('id', first_site.get('name', 'default'))
+                                site_name = first_site.get('name', first_site.get('desc', 'Unknown'))
+                                logger.info(f"Using first available site as default: '{self.site_id}' (Site name: {site_name})")
+                                valid_site = True
+
+                        return valid_site
+                    else:
+                        logger.debug(f"Sites API endpoint failed: {sites_url} - Status code: {response.status_code}")
+
+                except Exception as e:
+                    logger.debug(f"Error with sites API endpoint {sites_url}: {str(e)}")
+
+            logger.error("Failed to retrieve available sites from Unifi Controller")
+            return False
+
+        except Exception as e:
+            logger.error(f"Error retrieving available sites from Unifi Controller: {str(e)}")
+            return False
+
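The site-validation and fallback-to-first-site behaviour above can be exercised without a live controller by stubbing the HTTP layer. A hedged sketch using unittest.mock (the fake payload follows the dict-with-'data' branch the method already handles; everything else, including the file being importable as `discover_devices`, is an assumption):

```python
# test_sites_validation.py - illustrative only, not part of this PR.
from unittest.mock import MagicMock, patch

from discover_devices import UnifiClient

def test_falls_back_to_first_site():
    fake_response = MagicMock(status_code=200)
    fake_response.json.return_value = {"data": [{"id": "abc123", "name": "Home"}]}

    with patch("requests.Session.get", return_value=fake_response):
        client = UnifiClient({"host": "https://unifi.local", "API_Key": "dummy", "site": "default"})

    # "default" is not in the controller's site list, so the client should
    # have adopted the first available site instead.
    assert client.site_id == "abc123"
```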
+    def get_application_version(self) -> str:
+        """
+        Retrieve the application version from the Unifi Controller.
+
+        Returns:
+            str: The application version or hardware/model information if version not found
+        """
+        if not self.is_authenticated:
+            logger.error("Not authenticated with Unifi Controller. API key authentication is required.")
+            return "Unknown"
+
+        try:
+            # Try different API endpoints for retrieving application version
+            endpoints = [
+                "/proxy/network/integration/v1/info", # Integration API info endpoint (from issue description)
+                "/proxy/network/api/s/{site_id}/status", # Site-specific status endpoint
+                "/proxy/network/api/status", # Network status endpoint
+                "/api/system", # System info endpoint
+                "/proxy/network/api/system", # Network system endpoint
+                "/v1/api/system/info", # v1 system info endpoint
+                "/proxy/protect/api/system", # Protect system endpoint
+                "/proxy/network/v1/api/system/info", # Network v1 system info
+                "/proxy/network/api/s/{site_id}/stat/sysinfo", # Site-specific system info
+                "/api/s/{site_id}/stat/sysinfo" # Legacy site-specific system info
+            ]
+
+            logger.info("Attempting to retrieve application version from Unifi Controller")
+
+            # Variables to store fallback information if no explicit version is found
+            hardware_info = None
+            model_name = None
+
+            for endpoint in endpoints:
+                # Replace {site_id} placeholder with actual site_id if present
+                current_endpoint = endpoint.replace("{site_id}", self.site_id)
+                version_url = f"{self.base_url}{current_endpoint}"
+                logger.debug(f"Trying version API endpoint: {version_url}")
+
+                try:
+                    response = self.session.get(version_url, verify=False, timeout=5)
+
+                    if response.status_code == 200:
+                        try:
+                            version_data = response.json()
+                            logger.debug(f"Response from {current_endpoint}: {version_data}")
+
+                            # Handle different response formats
+                            # Try common paths where version information might be found
+                            version = None
+
+                            # Check for version in meta.version
+                            if isinstance(version_data, dict) and 'meta' in version_data and 'version' in version_data['meta']:
+                                version = version_data['meta']['version']
+
+                            # Check for version in data.version
+                            elif isinstance(version_data, dict) and 'data' in version_data:
+                                data = version_data['data']
+                                if isinstance(data, list) and len(data) > 0 and 'version' in data[0]:
+                                    version = data[0]['version']
+                                elif isinstance(data, dict) and 'version' in data:
+                                    version = data['version']
+
+                            # Check for version directly in the response
+                            elif isinstance(version_data, dict) and 'version' in version_data:
+                                version = version_data['version']
+
+                            # Check for firmware_version
+                            elif isinstance(version_data, dict) and 'firmware_version' in version_data:
+                                version = version_data['firmware_version']
+
+                            # Check for firmware
+                            elif isinstance(version_data, dict) and 'firmware' in version_data:
+                                version = version_data['firmware']
+
+                            # Check for controller.version
+                            elif isinstance(version_data, dict) and 'controller' in version_data and 'version' in version_data['controller']:
+                                version = version_data['controller']['version']
+
+                            # Check for applicationVersion (from issue description)
+                            elif isinstance(version_data, dict) and 'applicationVersion' in version_data:
+                                version = version_data['applicationVersion']
+                                logger.debug(f"Found applicationVersion in response: {version}")
+
+                            # Store hardware and model information as fallback
+                            if isinstance(version_data, dict):
+                                # Check for hardware information
+                                if 'hardware' in version_data and isinstance(version_data['hardware'], dict) and 'shortname' in version_data['hardware']:
+                                    hardware_info = version_data['hardware']['shortname']
+
+                                # Check for model/name information
+                                if 'name' in version_data:
+                                    model_name = version_data['name']
+
+                            if version:
+                                self.application_version = str(version)
+                                logger.info(f"Retrieved application version from Unifi Controller: {self.application_version}")
+                                return self.application_version
+
+                        except ValueError as json_error:
+                            logger.debug(f"Invalid JSON response from endpoint: {version_url} - Error: {str(json_error)}")
+                            # Try to extract version from raw response if JSON parsing fails
+                            try:
+                                raw_response = response.text
+                                logger.debug(f"Raw response from {current_endpoint}: {raw_response[:200]}...") # Log first 200 chars
+
+                                # Look for version patterns in raw response
+                                import re
+                                version_match = re.search(r'version["\']?\s*:\s*["\']([^"\']+)["\']', raw_response, re.IGNORECASE)
+                                if version_match:
+                                    version = version_match.group(1)
+                                    self.application_version = str(version)
+                                    logger.info(f"Retrieved application version from raw response: {self.application_version}")
+                                    return self.application_version
+                            except Exception as raw_error:
+                                logger.debug(f"Error processing raw response: {str(raw_error)}")
+                    else:
+                        logger.debug(f"Version API endpoint failed: {version_url} - Status code: {response.status_code}")
+
+                except Exception as e:
+                    logger.debug(f"Error with version API endpoint {version_url}: {str(e)}")
+
+            # If we have hardware or model information, use that as a fallback
+            if hardware_info and model_name:
+                fallback_version = f"{hardware_info} - {model_name}"
+                self.application_version = fallback_version
+                logger.info(f"Using hardware and model information as version: {fallback_version}")
+                return fallback_version
+            elif hardware_info:
+                self.application_version = hardware_info
+                logger.info(f"Using hardware information as version: {hardware_info}")
+                return hardware_info
+            elif model_name:
+                self.application_version = model_name
+                logger.info(f"Using model name as version: {model_name}")
+                return model_name
+
+            logger.warning("Failed to retrieve application version from Unifi Controller")
+            return "Unknown"
+
+        except Exception as e:
+            logger.error(f"Error retrieving application version from Unifi Controller: {str(e)}")
+            return "Unknown"
+
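The version lookup above walks several possible payload shapes in an if/elif chain. A compact way to express the same search, shown only as an illustrative refactor and not as part of the diff, is a list of extractor callables tried in order:

```python
# version_extract.py - illustrative refactor of the lookup chain above.
from typing import Any, Dict, Optional

def extract_version(payload: Dict[str, Any]) -> Optional[str]:
    """Return the first version-like field found, mirroring the branches in get_application_version()."""
    candidates = [
        lambda d: d.get("meta", {}).get("version"),
        lambda d: (d.get("data") or [{}])[0].get("version") if isinstance(d.get("data"), list) else None,
        lambda d: d.get("data", {}).get("version") if isinstance(d.get("data"), dict) else None,
        lambda d: d.get("version"),
        lambda d: d.get("firmware_version"),
        lambda d: d.get("firmware"),
        lambda d: d.get("controller", {}).get("version") if isinstance(d.get("controller"), dict) else None,
        lambda d: d.get("applicationVersion"),
    ]
    for candidate in candidates:
        try:
            value = candidate(payload)
        except (AttributeError, IndexError, TypeError):
            continue
        if value:
            return str(value)
    return None

print(extract_version({"applicationVersion": "1.2.3"}))  # -> "1.2.3" (example payload)
```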
+    def get_devices(self) -> List[Dict[str, Any]]:
+        """
+        Retrieve list of all connected devices from the Unifi Controller using the specified
+        site-specific integration API endpoint. Handles pagination to retrieve all devices.
+
+        Returns:
+            List of dictionaries containing device information
+        """
+        if not self.is_authenticated:
+            logger.error("Not authenticated with Unifi Controller. API key authentication is required.")
+            return []
+
+        try:
+            logger.info("Retrieving devices from Unifi Controller using site-specific integration API endpoint")
+
+            # Ensure we have a valid site ID
+            if not self.available_sites:
+                logger.debug("No available sites found, attempting to retrieve sites")
+                if not self.retrieve_available_sites():
+                    logger.error("Failed to retrieve and validate site ID, cannot use site-specific endpoint")
+                    return []
+
+            # Use the specific site-specific integration API endpoint from the issue description
+            # Format: /proxy/network/integration/v1/sites/{site_id}/clients
+            site_integration_url = f"{self.base_url}/proxy/network/integration/v1/sites/{self.site_id}/clients"
+            logger.debug(f"Using site-specific integration API endpoint: {site_integration_url}")
+
+            # Initialize variables for pagination
+            all_devices = []
+            offset = 0
+            limit = 100 # Increase the limit to reduce the number of API calls
+            total_count = None
+
+            # Loop until we've retrieved all devices
+            while True:
+                try:
+                    # Add pagination parameters to the URL
+                    paginated_url = f"{site_integration_url}?offset={offset}&limit={limit}"
+                    logger.debug(f"Retrieving page with offset={offset}, limit={limit}")
+
+                    response = self.session.get(paginated_url, verify=False, timeout=10) # Increased timeout for larger responses
+
+                    if response.status_code == 200:
+                        response_data = response.json()
+
+                        # Based on the test results, the response is a dictionary with keys: offset, limit, count, totalCount, data
+                        # The 'data' field contains the actual client information
+                        if isinstance(response_data, dict):
+                            # Extract pagination information
+                            current_offset = response_data.get('offset', 0)
+                            current_limit = response_data.get('limit', 0)
+                            current_count = response_data.get('count', 0)
+
+                            # Set total_count if not already set
+                            if total_count is None:
+                                total_count = response_data.get('totalCount', 0)
+                                logger.info(f"Total number of devices: {total_count}")
+
+                            if 'data' in response_data:
+                                devices = response_data.get('data', [])
+                                logger.info(f"Retrieved {len(devices)} devices from page {offset//limit + 1} (offset={offset}, limit={limit})")
+
+                                # Log the first device for debugging (if available and first page)
+                                if len(devices) > 0 and offset == 0:
+                                    logger.debug(f"First device sample: {devices[0]}")
+                                    logger.debug(f"Keys in first device: {', '.join(devices[0].keys())}")
+
+                                # Add devices to the all_devices list
+                                all_devices.extend(devices)
+
+                                # Update offset for next page
+                                offset += current_count
+
+                                # Check if we've retrieved all devices
+                                if offset >= total_count or current_count == 0:
+                                    logger.info(f"Retrieved all {len(all_devices)} devices")
+                                    break
+                            else:
+                                logger.error("No 'data' field in response")
+                                break
+                        elif isinstance(response_data, list):
+                            devices = response_data
+                            logger.info(f"Retrieved {len(devices)} devices from Unifi Controller (direct list)")
+
+                            # Log the first device for debugging (if available and first page)
+                            if len(devices) > 0 and offset == 0:
+                                logger.debug(f"First device sample: {devices[0]}")
+
+                            # Add devices to the all_devices list
+                            all_devices.extend(devices)
+
+                            # Since we don't have pagination information, we can't continue
+                            break
+                        else:
+                            logger.error(f"Unexpected response format: {type(response_data)}")
+                            logger.debug(f"Response content: {response_data}")
+                            break
+                    else:
+                        logger.error(f"Site-specific integration API endpoint failed: {response.status_code}")
+                        try:
+                            error_content = response.text
+                            logger.debug(f"Error response content: {error_content}")
+                        except Exception:
+                            pass
+                        break
+                except Exception as e:
+                    logger.error(f"Error with site-specific integration API endpoint: {str(e)}")
+                    break
+
+            # Transform the response to match the expected format for our application
+            logger.info(f"Transforming {len(all_devices)} devices to application format")
+            transformed_devices = []
+            for device in all_devices:
+                transformed_device = {
+                    'hostname': device.get('name', ''),
+                    'ip': device.get('ipAddress', ''),
+                    'mac': device.get('macAddress', ''),
+                    'status': 'connected' if device.get('connectedAt') else 'disconnected',
+                    'id': device.get('id', ''),
+                    'type': device.get('type', ''),
+                    'connected_at': device.get('connectedAt', ''),
+                    'uplink_device_id': device.get('uplinkDeviceId', '')
+                }
+                transformed_devices.append(transformed_device)
+
+            return transformed_devices
+
+        except Exception as e:
+            logger.error(f"Error in get_devices method: {str(e)}")
+            return []
+
+
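For quick manual testing, the offset/limit pagination that get_devices() implements can be exercised in isolation. The endpoint and response keys (data, count, totalCount) are the ones the method above relies on; the host and key placeholders are illustrative:

```python
# paginate_clients.py - illustrative pagination walk-through, not part of this PR.
import requests

def fetch_all_clients(base_url: str, api_key: str, site_id: str, limit: int = 100) -> list:
    session = requests.Session()
    session.headers.update({'X-API-KEY': api_key, 'Accept': 'application/json'})
    clients, offset = [], 0
    while True:
        url = f"{base_url}/proxy/network/integration/v1/sites/{site_id}/clients?offset={offset}&limit={limit}"
        page = session.get(url, verify=False, timeout=10).json()
        batch = page.get('data', [])
        clients.extend(batch)
        offset += page.get('count', len(batch))
        # Stop when the reported totalCount is reached or a page comes back empty.
        if not batch or offset >= page.get('totalCount', 0):
            return clients
```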
+class DeviceDiscovery:
+    """Main class for Tasmota device discovery."""
+
+    def __init__(self, config_path: str, debug: bool = False):
+        """
+        Initialize the device discovery with configuration.
+
+        Args:
+            config_path: Path to the configuration file
+            debug: Whether to enable debug mode
+        """
+        self.config_path = config_path
+        self.debug = debug
+        self.config = self._load_config()
+        self.unifi_client = UnifiClient(self.config.get('unifi', {}), debug)
+
+    def _load_config(self) -> Dict[str, Any]:
+        """
+        Load configuration from YAML file.
+
+        Returns:
+            Dictionary containing configuration
+        """
+        try:
+            with open(self.config_path, 'r') as file:
+                config = yaml.safe_load(file)
+                logger.debug(f"Loaded configuration from {self.config_path}")
+                return config
+        except Exception as e:
+            logger.error(f"Error loading configuration from {self.config_path}: {str(e)}")
+            sys.exit(1)
+
+    def _match_pattern(self, hostname: str, patterns: List[str]) -> bool:
+        """
+        Check if hostname matches any of the given patterns.
+
+        Args:
+            hostname: Hostname to check
+            patterns: List of glob patterns to match against
+
+        Returns:
+            True if hostname matches any pattern, False otherwise
+        """
+        if not hostname or not patterns:
+            return False
+
+        for pattern in patterns:
+            # Convert glob pattern to regex
+            regex_pattern = pattern.replace("*", ".*")
+            if re.match(f"^{regex_pattern}$", hostname, re.IGNORECASE):
+                return True
+        return False
+
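The glob-to-regex conversion in _match_pattern() only translates `*`. A brief sketch of how the configured patterns behave, with the stdlib fnmatch equivalent shown for comparison (illustrative, not part of the diff; the hostnames are made up):

```python
# pattern_demo.py - illustrative check of the exclude/default patterns from config.yaml.
import fnmatch
import re

def match(hostname: str, patterns: list) -> bool:
    # Same approach as DeviceDiscovery._match_pattern: '*' becomes '.*', case-insensitive.
    return any(re.match(f"^{p.replace('*', '.*')}$", hostname, re.IGNORECASE) for p in patterns)

print(match("tasmota-plug-kitchen", ["tasmota*", "ESP-*"]))  # True  -> treated as a default hostname
print(match("kitchen-light", ["tasmota*", "ESP-*"]))         # False -> treated as a valid hostname
print(match("SonosBeam", ["homeassistant*", "*sonos*"]))     # True  -> excluded

# Roughly equivalent stdlib form (fnmatch also understands '?' and character classes):
print(fnmatch.fnmatch("SonosBeam".lower(), "*sonos*"))       # True
```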
+    def filter_devices(self, devices: List[Dict[str, Any]]) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]], List[Dict[str, Any]]]:
+        """
+        Filter devices based on network_filter criteria.
+
+        Args:
+            devices: List of devices from Unifi Controller
+
+        Returns:
+            Tuple of (included_devices, excluded_devices, unidentified_devices)
+        """
+        included_devices = []
+        excluded_devices = []
+        unidentified_devices = []
+
+        # Track devices that have been processed to avoid duplicates
+        processed_devices = set()
+
+        network_filters = self.config.get('unifi', {}).get('network_filter', {})
+
+        for network_name, network_config in network_filters.items():
+            # Support both old (subnet) and new (subnets) configuration formats
+            subnet = network_config.get('subnet')
+            subnets = network_config.get('subnets', [])
+
+            # If subnet is specified but subnets is not, add subnet to subnets for backward compatibility
+            if subnet and not subnets:
+                subnets = [subnet]
+
+            exclude_patterns = network_config.get('exclude_patterns', [])
+
+            logger.debug(f"Filtering devices for network {network_name} with subnets {subnets}")
+
+            for device in devices:
+                # Skip devices that have already been processed
+                device_id = device.get('id')
+                if device_id in processed_devices:
+                    continue
+
+                ip = device.get('ip')
+                hostname = device.get('hostname', '')
+
+                # Check if device is in any of the specified subnets
+                in_subnet = False
+                matching_subnet = None
+
+                if ip:
+                    for subnet_prefix in subnets:
+                        if ip.startswith(subnet_prefix):
+                            in_subnet = True
+                            matching_subnet = subnet_prefix
+                            break
+
+                if not in_subnet:
+                    # Add to unidentified devices if not in any of the specified subnets
+                    logger.debug(f"Unidentified device {hostname} ({ip}) - not in any configured subnet")
+                    unidentified_devices.append(device)
+                    processed_devices.add(device_id)
+                    continue
+
+                # Check if device should be excluded
+                if self._match_pattern(hostname, exclude_patterns):
+                    logger.debug(f"Excluding device {hostname} ({ip}) - matches exclude pattern")
+                    excluded_devices.append(device)
+                else:
+                    logger.debug(f"Including device {hostname} ({ip}) - matched subnet {matching_subnet}")
+                    included_devices.append(device)
+
+                # Mark device as processed
+                processed_devices.add(device_id)
+
+        logger.info(f"Filtered {len(included_devices)} devices (excluded {len(excluded_devices)}, unidentified {len(unidentified_devices)})")
+        return included_devices, excluded_devices, unidentified_devices
+
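One thing to note about the prefix test above: `ip.startswith("192.168.5")` also matches addresses such as 192.168.50.x. A stricter check with the stdlib ipaddress module could look like the sketch below; it is illustrative only and would require the config to carry full CIDR prefixes such as "192.168.5.0/24" instead of bare prefixes:

```python
# subnet_check.py - illustrative stricter alternative to the startswith() prefix test.
import ipaddress

def in_subnets(ip: str, cidrs: list) -> bool:
    """Return True if ip falls inside any of the given CIDR networks."""
    try:
        address = ipaddress.ip_address(ip)
    except ValueError:
        return False
    return any(address in ipaddress.ip_network(cidr, strict=False) for cidr in cidrs)

print(in_subnets("192.168.5.23", ["192.168.5.0/24"]))   # True
print(in_subnets("192.168.50.23", ["192.168.5.0/24"]))  # False (startswith("192.168.5") would say True)
```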
+    def classify_devices(self, devices: List[Dict[str, Any]]) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
+        """
+        Classify devices into valid hostname and default hostname groups.
+
+        Args:
+            devices: List of filtered devices
+
+        Returns:
+            Tuple of (valid_hostname_devices, default_hostname_devices)
+        """
+        valid_hostname_devices = []
+        default_hostname_devices = []
+
+        network_filters = self.config.get('unifi', {}).get('network_filter', {})
+
+        for device in devices:
+            hostname = device.get('hostname', '')
+
+            # Check if hostname matches any default name pattern
+            is_default = False
+            for network_name, network_config in network_filters.items():
+                default_patterns = network_config.get('default_name_patterns', [])
+                if self._match_pattern(hostname, default_patterns):
+                    logger.debug(f"Device {hostname} matches default name pattern")
+                    default_hostname_devices.append(device)
+                    is_default = True
+                    break
+
+            if not is_default:
+                logger.debug(f"Device {hostname} has valid hostname")
+                valid_hostname_devices.append(device)
+
+        logger.info(f"Classified devices: {len(valid_hostname_devices)} valid, {len(default_hostname_devices)} default")
+        return valid_hostname_devices, default_hostname_devices
+
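As a concrete illustration of the split: with the default_name_patterns from config.yaml, a client still carrying its factory name ends up in default_hostnames.json while a renamed one goes to valid_hostnames.json. The device records below are hypothetical; only the keys match what get_devices() emits:

```python
# classification_demo.py - hypothetical inputs, same pattern logic as classify_devices().
import re

default_patterns = ["tasmota*", "ESP-*"]  # from config.yaml

def is_default(hostname: str) -> bool:
    return any(re.match(f"^{p.replace('*', '.*')}$", hostname, re.IGNORECASE) for p in default_patterns)

devices = [
    {"hostname": "tasmota-plug-01", "ip": "192.168.8.21"},  # hypothetical factory name
    {"hostname": "kitchen-light", "ip": "192.168.8.22"},    # hypothetical renamed device
]
default = [d for d in devices if is_default(d["hostname"])]
valid = [d for d in devices if not is_default(d["hostname"])]
print([d["hostname"] for d in default])  # ['tasmota-plug-01'] -> default_hostnames.json
print([d["hostname"] for d in valid])    # ['kitchen-light']   -> valid_hostnames.json
```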
+    def process_existing_files(self, valid_devices: List[Dict[str, Any]]) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
+        """
+        Process existing hostname files and update them based on current devices.
+
+        Args:
+            valid_devices: List of devices with valid hostnames
+
+        Returns:
+            Tuple of (updated_valid_devices, deprecated_devices)
+        """
+        existing_valid_devices = []
+        deprecated_devices = []
+
+        # Load existing valid hostnames if file exists
+        if os.path.exists(VALID_HOSTNAMES_FILE):
+            try:
+                with open(VALID_HOSTNAMES_FILE, 'r') as file:
+                    existing_valid_devices = json.load(file)
+                logger.debug(f"Loaded {len(existing_valid_devices)} existing valid hostnames")
+            except Exception as e:
+                logger.error(f"Error loading existing valid hostnames: {str(e)}")
+                existing_valid_devices = []
+
+        # Delete deprecated file if it exists
+        if os.path.exists(DEPRECATED_HOSTNAMES_FILE):
+            try:
+                os.remove(DEPRECATED_HOSTNAMES_FILE)
+                logger.debug("Deleted existing deprecated hostnames file")
+            except Exception as e:
+                logger.error(f"Error deleting deprecated hostnames file: {str(e)}")
+
+        if not existing_valid_devices:
+            # If no existing valid hostnames, just return the current valid devices
+            return valid_devices, []
+
+        # Create lookup dictionaries for faster processing
+        current_devices_by_hostname = {device.get('hostname'): device for device in valid_devices if device.get('hostname')}
+        existing_devices_by_hostname = {device.get('hostname'): device for device in existing_valid_devices if device.get('hostname')}
+
+        updated_valid_devices = []
+
+        # Process current devices
+        for hostname, device in current_devices_by_hostname.items():
+            if hostname in existing_devices_by_hostname:
+                # Device exists in both current and existing lists
+                existing_device = existing_devices_by_hostname[hostname]
+
+                # Check if any fields have changed
+                changed_fields = []
+                for key, value in device.items():
+                    if key in existing_device and existing_device[key] != value:
+                        changed_fields.append(key)
+
+                if changed_fields:
+                    logger.info(f"Device {hostname} has changed fields: {', '.join(changed_fields)}")
+
+                # Use the current device data
+                updated_valid_devices.append(device)
+            else:
+                # New device not in existing list
+                logger.info(f"New device found: {hostname}")
+                updated_valid_devices.append(device)
+
+        # Find deprecated devices (in existing but not in current)
+        for hostname, device in existing_devices_by_hostname.items():
+            if hostname not in current_devices_by_hostname:
+                logger.info(f"Device {hostname} is no longer available, marking as deprecated")
+                deprecated_devices.append(device)
+
+        return updated_valid_devices, deprecated_devices
+
+    def save_device_files(self, valid_devices: List[Dict[str, Any]], default_devices: List[Dict[str, Any]],
+                          deprecated_devices: List[Dict[str, Any]]) -> None:
+        """
+        Save device information to respective files.
+
+        Args:
+            valid_devices: List of devices with valid hostnames
+            default_devices: List of devices with default hostnames
+            deprecated_devices: List of devices that are deprecated
+        """
+        try:
+            with open(VALID_HOSTNAMES_FILE, 'w') as file:
+                json.dump(valid_devices, file, indent=2)
+            logger.info(f"Saved {len(valid_devices)} valid hostnames to {VALID_HOSTNAMES_FILE}")
+
+            with open(DEFAULT_HOSTNAMES_FILE, 'w') as file:
+                json.dump(default_devices, file, indent=2)
+            logger.info(f"Saved {len(default_devices)} default hostnames to {DEFAULT_HOSTNAMES_FILE}")
+
+            if deprecated_devices:
+                with open(DEPRECATED_HOSTNAMES_FILE, 'w') as file:
+                    json.dump(deprecated_devices, file, indent=2)
+                logger.info(f"Saved {len(deprecated_devices)} deprecated hostnames to {DEPRECATED_HOSTNAMES_FILE}")
+        except Exception as e:
+            logger.error(f"Error saving device files: {str(e)}")
+
+    def save_unidentified_devices(self, unidentified_devices: List[Dict[str, Any]]) -> None:
+        """
+        Save unidentified devices to a file when debug mode is enabled.
+
+        Args:
+            unidentified_devices: List of devices that were not identified during filtering
+        """
+        if not self.debug:
+            return
+
+        if not unidentified_devices:
+            logger.debug("No unidentified devices to save")
+            return
+
+        try:
+            with open(UNIDENTIFIED_DEVICES_FILE, 'w') as file:
+                json.dump(unidentified_devices, file, indent=2)
+            logger.info(f"Saved {len(unidentified_devices)} unidentified devices to {UNIDENTIFIED_DEVICES_FILE}")
+        except Exception as e:
+            logger.error(f"Error saving unidentified devices file: {str(e)}")
+
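The JSON files written above are the hand-off point for later steps. As one possible follow-up, the mqtt_settings block in config.yaml could be pushed to each discovered device through Tasmota's standard HTTP command interface (`/cm?cmnd=...`, with `Backlog` chaining the `MqttHost`/`MqttPort`/`MqttUser`/`MqttPassword` commands). The sketch below is illustrative only, is not part of this PR, and assumes the devices have no WebPassword set:

```python
# push_mqtt_settings.py - illustrative follow-up step, not part of this PR.
import json
import urllib.parse

import requests
import yaml

with open("config.yaml") as fh:
    mqtt = yaml.safe_load(fh)["tasmota"]["mqtt_settings"]

with open("valid_hostnames.json") as fh:
    devices = json.load(fh)

for device in devices:
    # Backlog chains several Tasmota console commands in a single request.
    backlog = (
        f"Backlog MqttHost {mqtt['host']}; MqttPort {mqtt['port']}; "
        f"MqttUser {mqtt['user']}; MqttPassword {mqtt['password']}"
    )
    url = f"http://{device['ip']}/cm?cmnd={urllib.parse.quote(backlog)}"
    print(f"Would configure {device['hostname'] or device['ip']}: {url}")
    # requests.get(url, timeout=5)  # uncomment to actually apply
```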
+    def discover(self) -> None:
+        """
+        Run the device discovery process.
+
+        This method retrieves devices from the Unifi Controller using the site-specific
+        integration API endpoint, filters and classifies them, and then processes them
+        against existing device files.
+        """
+        logger.info("Starting Tasmota device discovery")
+
+        # Get devices from Unifi Controller using the site-specific integration API endpoint
+        devices = self.unifi_client.get_devices()
+
+        if not devices:
+            logger.warning("No devices retrieved from Unifi Controller")
+
+            # Try to use existing valid_hostnames.json file as fallback
+            if os.path.exists(VALID_HOSTNAMES_FILE):
+                try:
+                    with open(VALID_HOSTNAMES_FILE, 'r') as file:
+                        valid_hostname_devices = json.load(file)
+                    logger.info(f"Loaded {len(valid_hostname_devices)} devices from {VALID_HOSTNAMES_FILE} as fallback")
+
+                    # Process existing files and update device lists
+                    updated_valid_devices, deprecated_devices = self.process_existing_files(valid_hostname_devices)
+
+                    # Initialize default_hostname_devices as an empty list
+                    default_hostname_devices = []
+
+                    # Save device information to files
+                    self.save_device_files(updated_valid_devices, default_hostname_devices, deprecated_devices)
+
+                    logger.info("Device discovery completed successfully using fallback method")
+                    return
+                except Exception as e:
+                    logger.error(f"Error loading {VALID_HOSTNAMES_FILE}: {str(e)}")
+                    logger.warning("Cannot proceed with device discovery")
+                    return
+            else:
+                logger.warning("No valid hostnames file found for fallback")
+                logger.warning("Cannot proceed with device discovery")
+                return
+
+        logger.info(f"Retrieved {len(devices)} devices from Unifi Controller")
+
+        # Filter devices based on network_filter criteria
+        included_devices, excluded_devices, unidentified_devices = self.filter_devices(devices)
+
+        # Save unidentified devices to a file when debug mode is enabled
+        self.save_unidentified_devices(unidentified_devices)
+
+        if not included_devices:
+            logger.warning("No devices match the filter criteria")
+            return
+
+        logger.info(f"Filtered {len(included_devices)} devices (excluded {len(excluded_devices)}, unidentified {len(unidentified_devices)})")
+
+        # Classify devices into valid and default hostname groups
+        valid_hostname_devices, default_hostname_devices = self.classify_devices(included_devices)
+
+        logger.info(f"Classified devices: {len(valid_hostname_devices)} valid, {len(default_hostname_devices)} default")
+
+        # Process existing files and update device lists
+        updated_valid_devices, deprecated_devices = self.process_existing_files(valid_hostname_devices)
+
+        # Save device information to files
+        self.save_device_files(updated_valid_devices, default_hostname_devices, deprecated_devices)
+
+        logger.info("Device discovery completed successfully")
+
+
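The setup_logging() helper that follows writes every run to logs/discovery.log and lets the file grow indefinitely. If the script ends up scheduled (for example via cron), a size-capped handler is a drop-in alternative; the sketch below is illustrative, stdlib only, and not part of the diff:

```python
# rotating_logs.py - illustrative variant of the file handler used in setup_logging() below.
import logging
from logging.handlers import RotatingFileHandler

def build_file_handler(path: str = "logs/discovery.log") -> logging.Handler:
    # Keep at most ~1 MB per file and three rotated backups instead of growing forever.
    handler = RotatingFileHandler(path, maxBytes=1_000_000, backupCount=3)
    handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
    return handler
```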
+def setup_logging(debug: bool) -> None:
+    """
+    Set up logging configuration.
+
+    Args:
+        debug: Whether to enable debug mode
+    """
+    log_level = logging.DEBUG if debug else logging.INFO
+    log_format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
+
+    # Create logs directory if it doesn't exist
+    os.makedirs('logs', exist_ok=True)
+
+    # Configure file handler
+    file_handler = logging.FileHandler('logs/discovery.log')
+    file_handler.setLevel(log_level)
+    file_handler.setFormatter(logging.Formatter(log_format))
+
+    # Configure console handler
+    console_handler = logging.StreamHandler()
+    console_handler.setLevel(log_level)
+    console_handler.setFormatter(logging.Formatter(log_format))
+
+    # Configure root logger
+    root_logger = logging.getLogger()
+    root_logger.setLevel(log_level)
+    root_logger.addHandler(file_handler)
+    root_logger.addHandler(console_handler)
+
+    # Configure tasmota_discovery logger
+    logger.setLevel(log_level)
+
+    if debug:
+        logger.debug("Debug mode enabled")
+
+
+def parse_arguments() -> argparse.Namespace:
+    """
+    Parse command-line arguments.
+
+    Returns:
+        Parsed arguments
+    """
+    parser = argparse.ArgumentParser(description='Tasmota Device Discovery')
+    parser.add_argument('-d', '--debug', action='store_true', help='Enable debug mode with verbose logging')
+    parser.add_argument('-c', '--config', default='config.yaml', help='Specify a custom config file path')
+    return parser.parse_args()
+
+
+def main() -> None:
+    """
+    Main entry point for the script.
+    """
+    args = parse_arguments()
+
+    # Set up logging
+    setup_logging(args.debug)
+
+    try:
+        # Initialize device discovery
+        discovery = DeviceDiscovery(args.config, args.debug)
+
+        # Run discovery process
+        discovery.discover()
+
+    except Exception as e:
+        logger.error(f"Error during device discovery: {str(e)}")
+        if args.debug:
+            import traceback
+            logger.debug(traceback.format_exc())
+        sys.exit(1)
+
+
+if __name__ == "__main__":
+    main()
\ No newline at end of file
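For completeness, the discovery can also be driven from another Python process instead of the CLI. A minimal sketch, assuming discover_devices.py is importable from the working directory and config.yaml sits alongside it:

```python
# run_discovery.py - illustrative programmatic entry point, not part of this PR.
from discover_devices import DeviceDiscovery, setup_logging

setup_logging(debug=True)                # same handlers the CLI sets up
discovery = DeviceDiscovery("config.yaml", debug=True)
discovery.discover()                     # writes the valid/default/deprecated hostname JSON files
print(f"Controller version: {discovery.unifi_client.application_version}")
```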