- Created modular Python files (main, utils, discovery, configuration, console_settings, unknown_devices, reporting, unifi_client) - Moved documentation files to docs/ - Moved data files to data/ - Removed old monolithic TasmotaManager.py and TasmotaManager_fixed.py - Updated .gitignore and pyproject.toml - All functionality preserved, command-line interface unchanged Version: 2.0.0
282 lines
11 KiB
Python
282 lines
11 KiB
Python
"""Device discovery and filtering logic."""
|
|
|
|
import ipaddress
import logging
from typing import List, Dict, Optional, Tuple

from utils import match_pattern, send_tasmota_command, get_hostname_base, get_data_file_path, save_json_file
from unifi_client import UnifiClient
|
|
|
|
|
|
class TasmotaDiscovery:
    """Handles discovery and filtering of Tasmota devices via UniFi.

    The relevant configuration lives under ``config['unifi']['network_filter']``,
    a mapping of network name to a dict that may contain:

    * ``subnet`` - either a dotted prefix (``"192.168.8"``) or a CIDR block
      (``"192.168.8.0/24"``),
    * ``exclude_patterns`` - name/hostname patterns for devices to skip,
    * ``unknown_device_patterns`` - name/hostname patterns that mark a device
      as "unknown".
    """

    # Logger method names accepted by ``get_device_hostname(log_level=...)``.
    _LOG_LEVELS = ('debug', 'info', 'warning', 'error', 'critical')

    def __init__(self, config: dict, unifi_client: "UnifiClient",
                 logger: Optional[logging.Logger] = None):
        """
        Initialize discovery handler.

        Args:
            config: Configuration dictionary
            unifi_client: Authenticated UniFi client
            logger: Optional logger instance; defaults to this module's logger
        """
        self.config = config
        self.unifi_client = unifi_client
        self.logger = logger or logging.getLogger(__name__)

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    def _network_filters(self) -> dict:
        """Return the ``network_filter`` mapping, or ``{}`` when not configured."""
        unifi_section = self.config.get('unifi') or {}
        return unifi_section.get('network_filter') or {}

    def _unknown_patterns(self) -> List[str]:
        """Collect ``unknown_device_patterns`` from every configured network."""
        patterns: List[str] = []
        for network in self._network_filters().values():
            patterns.extend(network.get('unknown_device_patterns', []))
        return patterns

    @staticmethod
    def _ip_in_subnet(ip: str, subnet: str) -> bool:
        """Return True if ``ip`` belongs to ``subnet``.

        ``subnet`` may be a CIDR block or a dotted prefix. A dotted prefix is
        matched on an octet boundary, so ``"192.168.8"`` matches
        ``"192.168.8.10"`` but not ``"192.168.80.10"``. An empty ``ip`` or
        ``subnet`` never matches.
        """
        if not ip or not subnet:
            return False

        if '/' in subnet:
            try:
                network = ipaddress.ip_network(subnet, strict=False)
                return ipaddress.ip_address(ip) in network
            except ValueError:
                # Malformed address or network: treat as "not in this subnet".
                return False

        prefix = subnet.rstrip('.')
        if not prefix:
            return False
        return ip == prefix or ip.startswith(prefix + '.')

    @staticmethod
    def _connection_type(device: dict) -> str:
        """Describe how a UniFi client is connected, based on the keys it carries."""
        if device.get('essid'):
            return f"Wireless - {device.get('essid')}"
        if device.get('radio') or device.get('wifi'):
            return "Wireless"
        if device.get('port') or device.get('switch_port') or device.get('switch'):
            return "Wired"
        return "Unknown"

    def _log(self, log_level: str, message: str) -> None:
        """Log ``message`` at ``log_level``; unrecognised levels are ignored."""
        level = log_level.lower() if isinstance(log_level, str) else ''
        if level in self._LOG_LEVELS:
            getattr(self.logger, level)(message)

    def _detect_hostname_bug(self, device_name: str, device_hostname: str,
                             device_ip: str, unknown_patterns: List[str]) -> bool:
        """Return True when UniFi reports an "unknown" name but the device does not.

        The device is only queried when its UniFi name or hostname matches an
        unknown pattern. The bug is flagged when the base of the self-reported
        hostname (text before the first ``-``, lower-cased) matches none of the
        unknown patterns.
        """
        unifi_name_matches_unknown = (
            self.is_hostname_unknown(device_name, unknown_patterns) or
            self.is_hostname_unknown(device_hostname, unknown_patterns)
        )
        if not (unifi_name_matches_unknown and device_ip):
            return False

        device_reported_hostname, success = self.get_device_hostname(
            device_ip, device_name, timeout=5
        )
        if not success:
            return False

        device_hostname_base = device_reported_hostname.split('-')[0].lower()
        if self.is_hostname_unknown(device_hostname_base, unknown_patterns):
            return False

        self.logger.info(
            f"UniFi OS hostname bug detected for {device_name}: "
            f"self-reported hostname '{device_reported_hostname}' "
            f"doesn't match unknown patterns"
        )
        return True

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def is_tasmota_device(self, device: dict) -> bool:
        """
        Check if a device should be considered a Tasmota device.

        The first configured network whose subnet contains the device's IP
        decides the outcome: excluded devices yield False, others True.

        Args:
            device: Device dictionary from UniFi

        Returns:
            bool: True if device matches network filter and is not excluded
        """
        device_ip = device.get('ip', '')
        if not device_ip:
            return False

        device_name = device.get('name', device.get('hostname', ''))

        for network_config in self._network_filters().values():
            if not self._ip_in_subnet(device_ip, network_config.get('subnet', '')):
                continue

            if self.is_device_excluded(device, network_config):
                self.logger.debug(f"Device {device_name} ({device_ip}) is excluded")
                return False

            # Device is in network and not excluded
            return True

        return False

    def is_device_excluded(self, device: dict, network_config: dict) -> bool:
        """
        Check if a device matches any exclusion patterns.

        Args:
            device: Device dictionary
            network_config: Network configuration with exclude_patterns

        Returns:
            bool: True if the device's name or hostname matches a pattern
        """
        device_name = device.get('name', '')
        device_hostname = device.get('hostname', '')
        exclude_patterns = network_config.get('exclude_patterns', [])

        return any(
            match_pattern(device_name, pattern) or match_pattern(device_hostname, pattern)
            for pattern in exclude_patterns
        )

    def is_hostname_unknown(self, hostname: str, unknown_patterns: List[str]) -> bool:
        """
        Check if a hostname matches unknown device patterns.

        Args:
            hostname: Hostname to check
            unknown_patterns: List of patterns for unknown devices

        Returns:
            bool: True if hostname matches any unknown pattern; always False
            for an empty hostname
        """
        if not hostname:
            return False

        return any(match_pattern(hostname, pattern) for pattern in unknown_patterns)

    def get_device_hostname(self, ip: str, device_name: str,
                            timeout: int = 5, log_level: str = 'debug') -> Tuple[Optional[str], bool]:
        """
        Get the self-reported hostname from a Tasmota device.

        Sends the ``Status 5`` command and reads ``StatusNET.Hostname`` from
        the response.

        Args:
            ip: Device IP address
            device_name: Device name for logging
            timeout: Request timeout
            log_level: Logger level name used for progress messages
                ('debug', 'info', 'warning', 'error', 'critical'); any other
                value disables these messages

        Returns:
            Tuple of (hostname, success); ``(None, False)`` when the command
            fails or the response carries no hostname
        """
        self._log(log_level, f"Getting self-reported hostname for {device_name} at {ip}")

        result, success = send_tasmota_command(ip, "Status%205", timeout, self.logger)

        if success and result:
            hostname = result.get('StatusNET', {}).get('Hostname')
            if hostname:
                self._log(log_level, f"Self-reported hostname: {hostname}")
                return hostname, True

        return None, False

    def get_tasmota_devices(self) -> List[Dict]:
        """
        Query UniFi controller and filter Tasmota devices.

        Returns:
            list: One info dictionary per matching client, with the keys
            ``name``, ``ip``, ``mac``, ``last_seen``, ``hostname``, ``notes``,
            ``connection`` and ``unifi_hostname_bug_detected``

        Raises:
            Exception: Any error raised while querying or processing clients
            is logged and re-raised unchanged.
        """
        devices = []
        self.logger.debug("Querying UniFi controller for devices")

        try:
            all_clients = self.unifi_client.get_clients()
            self.logger.debug(f"Found {len(all_clients)} total devices")

            unknown_patterns = self._unknown_patterns()

            for device in all_clients:
                if not self.is_tasmota_device(device):
                    continue

                device_name = device.get('name', device.get('hostname', 'Unknown'))
                device_hostname = device.get('hostname', '')
                device_ip = device.get('ip', '')

                devices.append({
                    "name": device_name,
                    "ip": device_ip,
                    "mac": device.get('mac', ''),
                    "last_seen": device.get('last_seen', ''),
                    "hostname": device_hostname,
                    "notes": device.get('note', ''),
                    "connection": self._connection_type(device),
                    "unifi_hostname_bug_detected": self._detect_hostname_bug(
                        device_name, device_hostname, device_ip, unknown_patterns
                    ),
                })

            self.logger.debug(f"Found {len(devices)} Tasmota devices")
            return devices

        except Exception as e:
            self.logger.error(f"Error getting devices from UniFi controller: {e}")
            raise

    def save_tasmota_config(self, devices: List[Dict],
                            previous_data: Optional[List[Dict]] = None) -> None:
        """
        Save current devices and track changes.

        Writes ``devices`` to ``current.json``. When ``previous_data`` is
        given, entries whose IP is no longer present are written to
        ``deprecated.json`` (only if there is at least one).

        Args:
            devices: List of current devices
            previous_data: Previously saved device entries (iterated as a
                sequence of device dictionaries)
        """
        current_file = get_data_file_path('current.json')
        deprecated_file = get_data_file_path('deprecated.json')

        # Save current devices
        save_json_file(current_file, devices, self.logger)

        if not previous_data:
            return

        # Track deprecated devices: known before, absent now (keyed by IP).
        current_ips = {d.get('ip') for d in devices}
        deprecated = [d for d in previous_data if d.get('ip') not in current_ips]

        if deprecated:
            self.logger.info(f"Found {len(deprecated)} deprecated devices")
            save_json_file(deprecated_file, deprecated, self.logger)

    def get_unknown_devices(self, devices: List[Dict]) -> List[Dict]:
        """
        Filter devices to find those matching unknown patterns.

        Args:
            devices: List of all Tasmota devices

        Returns:
            list: Devices whose name or hostname matches an unknown pattern
        """
        unknown_patterns = self._unknown_patterns()

        return [
            device for device in devices
            if (self.is_hostname_unknown(device.get('name', ''), unknown_patterns) or
                self.is_hostname_unknown(device.get('hostname', ''), unknown_patterns))
        ]

    def is_ip_in_network_filter(self, ip: str) -> bool:
        """
        Check if an IP address is in any configured network filter.

        Args:
            ip: IP address to check

        Returns:
            bool: True if IP is in a configured network
        """
        return any(
            self._ip_in_subnet(ip, network_config.get('subnet', ''))
            for network_config in self._network_filters().values()
        )
|