TasmotaManager/discovery.py
Mike Geppert 9c22168f79 Refactor: Split TasmotaManager into modular structure
- Created modular Python files (main, utils, discovery, configuration, console_settings, unknown_devices, reporting, unifi_client)
- Moved documentation files to docs/
- Moved data files to data/
- Removed old monolithic TasmotaManager.py and TasmotaManager_fixed.py
- Updated .gitignore and pyproject.toml
- All functionality preserved, command-line interface unchanged
Version: 2.0.0
2025-10-29 16:38:03 +00:00

282 lines
11 KiB
Python

"""Device discovery and filtering logic."""
import logging
from typing import List, Dict, Optional, Tuple
from utils import match_pattern, send_tasmota_command, get_hostname_base, get_data_file_path, save_json_file
from unifi_client import UnifiClient
class TasmotaDiscovery:
    """Handles discovery and filtering of Tasmota devices via UniFi.

    The configuration dictionary is read at ``config['unifi']['network_filter']``,
    a mapping of network name to a per-network dictionary with these keys:

    * ``subnet`` - dotted IPv4 prefix (e.g. ``"192.168.8"``) a client IP must
      fall under to be considered.
    * ``exclude_patterns`` - name/hostname patterns that remove a client.
    * ``unknown_device_patterns`` - name/hostname patterns that mark a client
      as a not-yet-configured ("unknown") device.

    Pattern matching itself is delegated to ``utils.match_pattern``.
    """

    # Logger method names accepted by ``get_device_hostname(log_level=...)``.
    _LOG_LEVELS = ('debug', 'info', 'warning', 'error', 'critical')

    def __init__(self, config: dict, unifi_client: "UnifiClient",
                 logger: Optional[logging.Logger] = None):
        """
        Initialize discovery handler.

        Args:
            config: Configuration dictionary
            unifi_client: Authenticated UniFi client; only ``get_clients()``
                is called on it by this class
            logger: Optional logger instance (defaults to this module's logger)
        """
        self.config = config
        self.unifi_client = unifi_client
        self.logger = logger or logging.getLogger(__name__)

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    def _network_filters(self) -> dict:
        """Return the ``unifi.network_filter`` mapping, or ``{}`` if absent."""
        unifi_section = self.config.get('unifi') or {}
        return unifi_section.get('network_filter') or {}

    def _unknown_patterns(self) -> List[str]:
        """Collect ``unknown_device_patterns`` from every configured network."""
        patterns: List[str] = []
        for network in self._network_filters().values():
            patterns.extend(network.get('unknown_device_patterns', []))
        return patterns

    @staticmethod
    def _ip_in_subnet(ip: str, subnet: str) -> bool:
        """
        Return True when ``ip`` lies under the dotted prefix ``subnet``.

        The comparison is anchored on an octet boundary, so the prefix
        ``192.168.1`` matches ``192.168.1.20`` but not ``192.168.10.20``
        (which a bare ``str.startswith`` would accept). A trailing dot on the
        prefix is tolerated. An empty IP or an empty/missing prefix never
        matches, so a network entry without ``subnet`` cannot accidentally
        select every client.
        """
        if not ip or not subnet:
            return False
        prefix = subnet.rstrip('.')
        if not prefix:
            return False
        return ip == prefix or ip.startswith(prefix + '.')

    def _log_at(self, log_level: str, message: str) -> None:
        """Emit ``message`` at ``log_level``; unrecognised levels are silent."""
        level = log_level.lower() if isinstance(log_level, str) else ''
        if level in self._LOG_LEVELS:
            getattr(self.logger, level)(message)

    @staticmethod
    def _describe_connection(device: dict) -> str:
        """Derive a human-readable connection label from UniFi client fields."""
        essid = device.get('essid')
        if essid:
            return f"Wireless - {essid}"
        if device.get('radio') or device.get('wifi'):
            return "Wireless"
        if device.get('port') or device.get('switch_port') or device.get('switch'):
            return "Wired"
        return "Unknown"

    def _detect_unifi_hostname_bug(self, device_name: str, device_hostname: str,
                                   device_ip: str,
                                   unknown_patterns: List[str]) -> bool:
        """
        Decide whether UniFi is showing a stale "unknown" name for a device.

        The device is only queried when the name or hostname reported by
        UniFi matches an unknown-device pattern. The bug is flagged when the
        device then reports a hostname whose base (text before the first
        ``-``, lower-cased) does NOT match any unknown-device pattern.
        """
        if not device_ip:
            return False
        looks_unknown = (
            self.is_hostname_unknown(device_name, unknown_patterns) or
            self.is_hostname_unknown(device_hostname, unknown_patterns)
        )
        if not looks_unknown:
            return False

        reported_hostname, success = self.get_device_hostname(
            device_ip, device_name, timeout=5
        )
        if not success:
            return False

        reported_base = reported_hostname.split('-')[0].lower()
        if self.is_hostname_unknown(reported_base, unknown_patterns):
            # The device really is unconfigured; UniFi's view is accurate.
            return False

        self.logger.info(
            f"UniFi OS hostname bug detected for {device_name}: "
            f"self-reported hostname '{reported_hostname}' "
            f"doesn't match unknown patterns"
        )
        return True

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    def is_tasmota_device(self, device: dict) -> bool:
        """
        Check if a device should be considered a Tasmota device.

        The first configured network whose subnet contains the device IP
        decides the outcome; later networks are not consulted.

        Args:
            device: Device dictionary from UniFi

        Returns:
            bool: True if device matches network filter and is not excluded
        """
        device_ip = device.get('ip', '')
        if not device_ip:
            return False
        device_name = device.get('name', device.get('hostname', ''))

        for network_config in self._network_filters().values():
            if not self._ip_in_subnet(device_ip, network_config.get('subnet', '')):
                continue
            if self.is_device_excluded(device, network_config):
                self.logger.debug(f"Device {device_name} ({device_ip}) is excluded")
                return False
            # Device is in network and not excluded
            return True
        return False

    def is_device_excluded(self, device: dict, network_config: dict) -> bool:
        """
        Check if a device matches any exclusion patterns.

        Args:
            device: Device dictionary
            network_config: Network configuration with exclude_patterns

        Returns:
            bool: True if the device name or hostname matches any pattern
        """
        device_name = device.get('name', '')
        device_hostname = device.get('hostname', '')
        return any(
            match_pattern(device_name, pattern) or match_pattern(device_hostname, pattern)
            for pattern in network_config.get('exclude_patterns', [])
        )

    def is_hostname_unknown(self, hostname: str, unknown_patterns: List[str]) -> bool:
        """
        Check if a hostname matches unknown device patterns.

        Args:
            hostname: Hostname to check (empty/None never matches)
            unknown_patterns: List of patterns for unknown devices

        Returns:
            bool: True if hostname matches any unknown pattern
        """
        if not hostname:
            return False
        return any(match_pattern(hostname, pattern) for pattern in unknown_patterns)

    def get_device_hostname(self, ip: str, device_name: str,
                            timeout: int = 5, log_level: str = 'debug') -> Tuple[Optional[str], bool]:
        """
        Get the self-reported hostname from a Tasmota device.

        Sends the ``Status 5`` command and reads ``StatusNET.Hostname`` from
        the reply.

        Args:
            ip: Device IP address
            device_name: Device name for logging
            timeout: Request timeout
            log_level: Logger level name used for this method's progress
                messages ('debug', 'info', 'warning', 'error', 'critical');
                any other value suppresses them

        Returns:
            Tuple of (hostname, success); ``(None, False)`` when the command
            fails or the reply carries no hostname
        """
        self._log_at(log_level, f"Getting self-reported hostname for {device_name} at {ip}")
        result, success = send_tasmota_command(ip, "Status%205", timeout, self.logger)
        if not (success and isinstance(result, dict)):
            return None, False

        # Tolerate a null/ill-typed StatusNET section instead of raising.
        status_net = result.get('StatusNET')
        hostname = status_net.get('Hostname') if isinstance(status_net, dict) else None
        if not hostname:
            return None, False

        self._log_at(log_level, f"Self-reported hostname: {hostname}")
        return hostname, True

    def get_tasmota_devices(self) -> List[Dict]:
        """
        Query UniFi controller and filter Tasmota devices.

        Returns:
            list: One dictionary per accepted client with the keys ``name``,
            ``ip``, ``mac``, ``last_seen``, ``hostname``, ``notes``,
            ``connection`` and ``unifi_hostname_bug_detected``

        Raises:
            Exception: Any error raised while querying or processing clients
                is logged and re-raised unchanged
        """
        devices = []
        self.logger.debug("Querying UniFi controller for devices")
        try:
            all_clients = self.unifi_client.get_clients()
            self.logger.debug(f"Found {len(all_clients)} total devices")

            unknown_patterns = self._unknown_patterns()

            for device in all_clients:
                if not self.is_tasmota_device(device):
                    continue

                device_name = device.get('name', device.get('hostname', 'Unknown'))
                device_hostname = device.get('hostname', '')
                device_ip = device.get('ip', '')

                devices.append({
                    "name": device_name,
                    "ip": device_ip,
                    "mac": device.get('mac', ''),
                    "last_seen": device.get('last_seen', ''),
                    "hostname": device_hostname,
                    "notes": device.get('note', ''),
                    "connection": self._describe_connection(device),
                    "unifi_hostname_bug_detected": self._detect_unifi_hostname_bug(
                        device_name, device_hostname, device_ip, unknown_patterns
                    ),
                })

            self.logger.debug(f"Found {len(devices)} Tasmota devices")
            return devices
        except Exception as e:
            self.logger.error(f"Error getting devices from UniFi controller: {e}")
            raise

    def save_tasmota_config(self, devices: List[Dict],
                            previous_data: Optional[List[Dict]] = None):
        """
        Save current devices and track changes.

        Writes ``devices`` to ``current.json``. When ``previous_data`` is
        given, every previous entry whose IP is no longer present is written
        to ``deprecated.json`` (the file is only written when at least one
        such entry exists).

        Args:
            devices: List of current devices
            previous_data: Previously saved device list (iterated as a list
                of device dictionaries)
        """
        current_file = get_data_file_path('current.json')
        deprecated_file = get_data_file_path('deprecated.json')

        # Save current devices
        save_json_file(current_file, devices, self.logger)

        # Track deprecated devices
        if not previous_data:
            return
        # .get() so an entry without an IP cannot abort the save with KeyError.
        current_ips = {d['ip'] for d in devices if d.get('ip')}
        deprecated = [d for d in previous_data if d.get('ip') not in current_ips]
        if deprecated:
            self.logger.info(f"Found {len(deprecated)} deprecated devices")
            save_json_file(deprecated_file, deprecated, self.logger)

    def get_unknown_devices(self, devices: List[Dict]) -> List[Dict]:
        """
        Filter devices to find those matching unknown patterns.

        Args:
            devices: List of all Tasmota devices

        Returns:
            list: Devices whose ``name`` or ``hostname`` matches an
            unknown-device pattern from any configured network
        """
        unknown_patterns = self._unknown_patterns()
        return [
            device for device in devices
            if (self.is_hostname_unknown(device.get('name', ''), unknown_patterns) or
                self.is_hostname_unknown(device.get('hostname', ''), unknown_patterns))
        ]

    def is_ip_in_network_filter(self, ip: str) -> bool:
        """
        Check if an IP address is in any configured network filter.

        Args:
            ip: IP address to check

        Returns:
            bool: True if IP is in a configured network
        """
        return any(
            self._ip_in_subnet(ip, network_config.get('subnet', ''))
            for network_config in self._network_filters().values()
        )