"""Device discovery and filtering logic.""" import logging from typing import List, Dict, Optional, Tuple from utils import match_pattern, send_tasmota_command, get_hostname_base, get_data_file_path, save_json_file from unifi_client import UnifiClient class TasmotaDiscovery: """Handles discovery and filtering of Tasmota devices via UniFi.""" def __init__(self, config: dict, unifi_client: UnifiClient, logger: Optional[logging.Logger] = None): """ Initialize discovery handler. Args: config: Configuration dictionary unifi_client: Authenticated UniFi client logger: Optional logger instance """ self.config = config self.unifi_client = unifi_client self.logger = logger or logging.getLogger(__name__) def is_tasmota_device(self, device: dict) -> bool: """ Check if a device should be considered a Tasmota device. Args: device: Device dictionary from UniFi Returns: bool: True if device matches network filter and is not excluded """ device_ip = device.get('ip', '') device_name = device.get('name', device.get('hostname', '')) if not device_ip: return False # Check if device is in any configured network network_filters = self.config.get('unifi', {}).get('network_filter', {}) for network_name, network_config in network_filters.items(): subnet = network_config.get('subnet', '') # Check if IP is in this subnet if not device_ip.startswith(subnet): continue # Check if device is excluded if self.is_device_excluded(device, network_config): self.logger.debug(f"Device {device_name} ({device_ip}) is excluded") return False # Device is in network and not excluded return True return False def is_device_excluded(self, device: dict, network_config: dict) -> bool: """ Check if a device matches any exclusion patterns. Args: device: Device dictionary network_config: Network configuration with exclude_patterns Returns: bool: True if device should be excluded """ device_name = device.get('name', '') device_hostname = device.get('hostname', '') exclude_patterns = network_config.get('exclude_patterns', []) for pattern in exclude_patterns: if match_pattern(device_name, pattern) or match_pattern(device_hostname, pattern): return True return False def is_hostname_unknown(self, hostname: str, unknown_patterns: List[str]) -> bool: """ Check if a hostname matches unknown device patterns. Args: hostname: Hostname to check unknown_patterns: List of patterns for unknown devices Returns: bool: True if hostname matches any unknown pattern """ if not hostname: return False for pattern in unknown_patterns: if match_pattern(hostname, pattern): return True return False def get_device_hostname(self, ip: str, device_name: str, timeout: int = 5, log_level: str = 'debug') -> Tuple[Optional[str], bool]: """ Get the self-reported hostname from a Tasmota device. Args: ip: Device IP address device_name: Device name for logging timeout: Request timeout log_level: Logging level ('debug', 'info', etc.) Returns: Tuple of (hostname, success) """ if log_level == 'debug': self.logger.debug(f"Getting self-reported hostname for {device_name} at {ip}") result, success = send_tasmota_command(ip, "Status%205", timeout, self.logger) if success and result: hostname = result.get('StatusNET', {}).get('Hostname') if hostname: if log_level == 'debug': self.logger.debug(f"Self-reported hostname: {hostname}") return hostname, True return None, False def get_tasmota_devices(self) -> List[Dict]: """ Query UniFi controller and filter Tasmota devices. Returns: list: List of device info dictionaries """ devices = [] self.logger.debug("Querying UniFi controller for devices") try: all_clients = self.unifi_client.get_clients() self.logger.debug(f"Found {len(all_clients)} total devices") # Get unknown device patterns network_filters = self.config['unifi'].get('network_filter', {}) unknown_patterns = [] for network in network_filters.values(): unknown_patterns.extend(network.get('unknown_device_patterns', [])) for device in all_clients: if self.is_tasmota_device(device): # Determine connection type connection = "Unknown" if device.get('essid'): connection = f"Wireless - {device.get('essid')}" elif device.get('radio') or device.get('wifi'): connection = "Wireless" elif device.get('port') or device.get('switch_port') or device.get('switch'): connection = "Wired" device_name = device.get('name', device.get('hostname', 'Unknown')) device_hostname = device.get('hostname', '') device_ip = device.get('ip', '') # Check for UniFi hostname bug unifi_hostname_bug_detected = False device_reported_hostname = None unifi_name_matches_unknown = ( self.is_hostname_unknown(device_name, unknown_patterns) or self.is_hostname_unknown(device_hostname, unknown_patterns) ) if unifi_name_matches_unknown and device_ip: device_reported_hostname, success = self.get_device_hostname( device_ip, device_name, timeout=5 ) if success: # Check if self-reported hostname matches unknown patterns device_hostname_base = device_reported_hostname.split('-')[0].lower() device_hostname_matches_unknown = self.is_hostname_unknown( device_hostname_base, unknown_patterns ) if not device_hostname_matches_unknown: unifi_hostname_bug_detected = True self.logger.info( f"UniFi OS hostname bug detected for {device_name}: " f"self-reported hostname '{device_reported_hostname}' " f"doesn't match unknown patterns" ) device_info = { "name": device_name, "ip": device_ip, "mac": device.get('mac', ''), "last_seen": device.get('last_seen', ''), "hostname": device_hostname, "notes": device.get('note', ''), "connection": connection, "unifi_hostname_bug_detected": unifi_hostname_bug_detected } devices.append(device_info) self.logger.debug(f"Found {len(devices)} Tasmota devices") return devices except Exception as e: self.logger.error(f"Error getting devices from UniFi controller: {e}") raise def save_tasmota_config(self, devices: List[Dict], previous_data: Optional[Dict] = None): """ Save current devices and track changes. Args: devices: List of current devices previous_data: Previously saved device data """ current_file = get_data_file_path('current.json') deprecated_file = get_data_file_path('deprecated.json') # Save current devices save_json_file(current_file, devices, self.logger) # Track deprecated devices if previous_data: current_ips = {d['ip'] for d in devices} deprecated = [d for d in previous_data if d.get('ip') not in current_ips] if deprecated: self.logger.info(f"Found {len(deprecated)} deprecated devices") save_json_file(deprecated_file, deprecated, self.logger) def get_unknown_devices(self, devices: List[Dict]) -> List[Dict]: """ Filter devices to find those matching unknown patterns. Args: devices: List of all Tasmota devices Returns: list: Devices matching unknown patterns """ network_filters = self.config['unifi'].get('network_filter', {}) unknown_patterns = [] for network in network_filters.values(): unknown_patterns.extend(network.get('unknown_device_patterns', [])) unknown_devices = [] for device in devices: device_name = device.get('name', '') device_hostname = device.get('hostname', '') if (self.is_hostname_unknown(device_name, unknown_patterns) or self.is_hostname_unknown(device_hostname, unknown_patterns)): unknown_devices.append(device) return unknown_devices def is_ip_in_network_filter(self, ip: str) -> bool: """ Check if an IP address is in any configured network filter. Args: ip: IP address to check Returns: bool: True if IP is in a configured network """ network_filters = self.config.get('unifi', {}).get('network_filter', {}) for network_config in network_filters.values(): subnet = network_config.get('subnet', '') if ip.startswith(subnet): return True return False