869 lines
41 KiB
Python
869 lines
41 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Tasmota Device Discovery Script
|
|
|
|
This script implements the Device Discovery process for Tasmota devices on a network.
|
|
It connects to a Unifi Switch, retrieves a list of connected devices, and filters for
|
|
potential Tasmota devices based on network_filter information in the config file.
|
|
|
|
Usage:
|
|
python discover_devices.py [options]
|
|
|
|
Options:
|
|
-h, --help Show this help message and exit
|
|
-d, --debug Enable debug mode with verbose logging
|
|
-c, --config Specify a custom config file path (default: config.yaml)
|
|
"""
|
|
|
|
import argparse
|
|
import yaml
|
|
import logging
|
|
import os
|
|
import sys
|
|
import re
|
|
import json
|
|
from typing import Dict, List, Any, Optional, Tuple
|
|
import requests
|
|
from urllib3.exceptions import InsecureRequestWarning
|
|
|
|
# Suppress only the single InsecureRequestWarning from urllib3
|
|
requests.packages.urllib3.disable_warnings(category=InsecureRequestWarning)
|
|
|
|
# Set up logging
|
|
logger = logging.getLogger("tasmota_discovery")
|
|
|
|
# File paths
|
|
VALID_HOSTNAMES_FILE = "valid_hostnames.json"
|
|
DEFAULT_HOSTNAMES_FILE = "default_hostnames.json"
|
|
DEPRECATED_HOSTNAMES_FILE = "deprecated_hostnames.json"
|
|
UNIDENTIFIED_DEVICES_FILE = "unidentified_devices.json"
|
|
|
|
class UnifiClient:
|
|
"""Client for interacting with the Unifi Controller API."""
|
|
|
|
def __init__(self, config: Dict[str, Any], debug: bool = False):
|
|
"""
|
|
Initialize the Unifi client with configuration.
|
|
|
|
Args:
|
|
config: Dictionary containing Unifi configuration
|
|
debug: Whether to enable debug mode
|
|
"""
|
|
self.host = config.get('host')
|
|
self.port = config.get('port', 8443)
|
|
self.type = config.get('type', 'UDMSE')
|
|
self.site_id = config.get('site', 'default') # Site ID parameter from config
|
|
self.api_key = config.get('API_Key') # API Key for authentication
|
|
self.debug = debug
|
|
self.session = requests.Session()
|
|
self.base_url = self.host # Using host without port number as per issue requirement
|
|
self.is_authenticated = False
|
|
self.available_sites = [] # Will store available sites from the controller
|
|
self.application_version = "Unknown" # Will store the application version
|
|
|
|
# Check for required configuration parameters
|
|
if not self.host:
|
|
raise ValueError("Missing required Unifi host parameter")
|
|
|
|
# Set up API key authentication
|
|
if self.api_key:
|
|
logger.debug("API Key authentication will be used")
|
|
# Add API key to session headers
|
|
self.session.headers.update({
|
|
'X-API-KEY': self.api_key,
|
|
'Accept': 'application/json'
|
|
})
|
|
self.is_authenticated = True # With API key, we're pre-authenticated
|
|
|
|
# Retrieve and validate the site ID
|
|
self.retrieve_available_sites()
|
|
|
|
# Get the application version
|
|
self.get_application_version()
|
|
else:
|
|
raise ValueError("Missing required Unifi API_Key parameter")
|
|
|
|
|
|
def retrieve_available_sites(self) -> bool:
|
|
"""
|
|
Retrieve available sites from the Unifi Controller and validate the configured site ID.
|
|
|
|
Returns:
|
|
bool: True if site ID is valid, False otherwise
|
|
"""
|
|
if not self.is_authenticated:
|
|
logger.error("Not authenticated with Unifi Controller. API key authentication is required.")
|
|
return False
|
|
|
|
try:
|
|
# Try different API endpoints for retrieving sites
|
|
endpoints = [
|
|
"/proxy/network/integration/v1/sites", # Integration API endpoint
|
|
"/proxy/network/api/self/sites", # Newer API endpoint
|
|
"/api/self/sites", # Legacy API endpoint
|
|
"/v2/api/sites" # v2 API endpoint
|
|
]
|
|
|
|
logger.info("Attempting to retrieve available sites from Unifi Controller")
|
|
|
|
for endpoint in endpoints:
|
|
sites_url = f"{self.base_url}{endpoint}"
|
|
logger.debug(f"Trying sites API endpoint: {sites_url}")
|
|
|
|
try:
|
|
response = self.session.get(sites_url, verify=False, timeout=5)
|
|
|
|
if response.status_code == 200:
|
|
sites_data = response.json()
|
|
|
|
# Handle different response formats
|
|
if isinstance(sites_data, list):
|
|
self.available_sites = sites_data
|
|
elif isinstance(sites_data, dict) and 'data' in sites_data:
|
|
self.available_sites = sites_data.get('data', [])
|
|
else:
|
|
logger.debug(f"Unexpected response format from sites API endpoint: {sites_url}")
|
|
continue
|
|
|
|
# Log the available sites
|
|
site_names = [site.get('name', site.get('desc', 'Unknown')) for site in self.available_sites]
|
|
site_ids = [site.get('id', site.get('name', 'Unknown')) for site in self.available_sites]
|
|
|
|
logger.info(f"Retrieved {len(self.available_sites)} sites from Unifi Controller")
|
|
logger.debug(f"Available sites: {', '.join(site_names)}")
|
|
logger.debug(f"Available site IDs: {', '.join(site_ids)}")
|
|
|
|
# Validate the configured site ID
|
|
valid_site = False
|
|
for site in self.available_sites:
|
|
site_id = site.get('id', site.get('name', ''))
|
|
if site_id == self.site_id:
|
|
valid_site = True
|
|
site_name = site.get('name', site.get('desc', 'Unknown'))
|
|
logger.info(f"Configured site ID '{self.site_id}' is valid (Site name: {site_name})")
|
|
break
|
|
|
|
if not valid_site:
|
|
logger.warning(f"Configured site ID '{self.site_id}' not found in available sites")
|
|
if self.site_id == 'default' and len(self.available_sites) > 0:
|
|
# Try to use the first available site as default
|
|
first_site = self.available_sites[0]
|
|
self.site_id = first_site.get('id', first_site.get('name', 'default'))
|
|
site_name = first_site.get('name', first_site.get('desc', 'Unknown'))
|
|
logger.info(f"Using first available site as default: '{self.site_id}' (Site name: {site_name})")
|
|
valid_site = True
|
|
|
|
return valid_site
|
|
else:
|
|
logger.debug(f"Sites API endpoint failed: {sites_url} - Status code: {response.status_code}")
|
|
|
|
except Exception as e:
|
|
logger.debug(f"Error with sites API endpoint {sites_url}: {str(e)}")
|
|
|
|
logger.error("Failed to retrieve available sites from Unifi Controller")
|
|
return False
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error retrieving available sites from Unifi Controller: {str(e)}")
|
|
return False
|
|
|
|
def get_application_version(self) -> str:
|
|
"""
|
|
Retrieve the application version from the Unifi Controller.
|
|
|
|
Returns:
|
|
str: The application version or hardware/model information if version not found
|
|
"""
|
|
if not self.is_authenticated:
|
|
logger.error("Not authenticated with Unifi Controller. API key authentication is required.")
|
|
return "Unknown"
|
|
|
|
try:
|
|
# Try different API endpoints for retrieving application version
|
|
endpoints = [
|
|
"/proxy/network/integration/v1/info", # Integration API info endpoint (from issue description)
|
|
"/proxy/network/api/s/{site_id}/status", # Site-specific status endpoint
|
|
"/proxy/network/api/status", # Network status endpoint
|
|
"/api/system", # System info endpoint
|
|
"/proxy/network/api/system", # Network system endpoint
|
|
"/v1/api/system/info", # v1 system info endpoint
|
|
"/proxy/protect/api/system", # Protect system endpoint
|
|
"/proxy/network/v1/api/system/info", # Network v1 system info
|
|
"/proxy/network/api/s/{site_id}/stat/sysinfo", # Site-specific system info
|
|
"/api/s/{site_id}/stat/sysinfo" # Legacy site-specific system info
|
|
]
|
|
|
|
logger.info("Attempting to retrieve application version from Unifi Controller")
|
|
|
|
# Variables to store fallback information if no explicit version is found
|
|
hardware_info = None
|
|
model_name = None
|
|
|
|
for endpoint in endpoints:
|
|
# Replace {site_id} placeholder with actual site_id if present
|
|
current_endpoint = endpoint.replace("{site_id}", self.site_id)
|
|
version_url = f"{self.base_url}{current_endpoint}"
|
|
logger.debug(f"Trying version API endpoint: {version_url}")
|
|
|
|
try:
|
|
response = self.session.get(version_url, verify=False, timeout=5)
|
|
|
|
if response.status_code == 200:
|
|
try:
|
|
version_data = response.json()
|
|
logger.debug(f"Response from {current_endpoint}: {version_data}")
|
|
|
|
# Handle different response formats
|
|
# Try common paths where version information might be found
|
|
version = None
|
|
|
|
# Check for version in meta.version
|
|
if isinstance(version_data, dict) and 'meta' in version_data and 'version' in version_data['meta']:
|
|
version = version_data['meta']['version']
|
|
|
|
# Check for version in data.version
|
|
elif isinstance(version_data, dict) and 'data' in version_data:
|
|
data = version_data['data']
|
|
if isinstance(data, list) and len(data) > 0 and 'version' in data[0]:
|
|
version = data[0]['version']
|
|
elif isinstance(data, dict) and 'version' in data:
|
|
version = data['version']
|
|
|
|
# Check for version directly in the response
|
|
elif isinstance(version_data, dict) and 'version' in version_data:
|
|
version = version_data['version']
|
|
|
|
# Check for firmware_version
|
|
elif isinstance(version_data, dict) and 'firmware_version' in version_data:
|
|
version = version_data['firmware_version']
|
|
|
|
# Check for firmware
|
|
elif isinstance(version_data, dict) and 'firmware' in version_data:
|
|
version = version_data['firmware']
|
|
|
|
# Check for controller.version
|
|
elif isinstance(version_data, dict) and 'controller' in version_data and 'version' in version_data['controller']:
|
|
version = version_data['controller']['version']
|
|
|
|
# Check for applicationVersion (from issue description)
|
|
elif isinstance(version_data, dict) and 'applicationVersion' in version_data:
|
|
version = version_data['applicationVersion']
|
|
logger.debug(f"Found applicationVersion in response: {version}")
|
|
|
|
# Store hardware and model information as fallback
|
|
if isinstance(version_data, dict):
|
|
# Check for hardware information
|
|
if 'hardware' in version_data and isinstance(version_data['hardware'], dict) and 'shortname' in version_data['hardware']:
|
|
hardware_info = version_data['hardware']['shortname']
|
|
|
|
# Check for model/name information
|
|
if 'name' in version_data:
|
|
model_name = version_data['name']
|
|
|
|
if version:
|
|
self.application_version = str(version)
|
|
logger.info(f"Retrieved application version from Unifi Controller: {self.application_version}")
|
|
return self.application_version
|
|
|
|
except ValueError as json_error:
|
|
logger.debug(f"Invalid JSON response from endpoint: {version_url} - Error: {str(json_error)}")
|
|
# Try to extract version from raw response if JSON parsing fails
|
|
try:
|
|
raw_response = response.text
|
|
logger.debug(f"Raw response from {current_endpoint}: {raw_response[:200]}...") # Log first 200 chars
|
|
|
|
# Look for version patterns in raw response
|
|
import re
|
|
version_match = re.search(r'version["\']?\s*:\s*["\']([^"\']+)["\']', raw_response, re.IGNORECASE)
|
|
if version_match:
|
|
version = version_match.group(1)
|
|
self.application_version = str(version)
|
|
logger.info(f"Retrieved application version from raw response: {self.application_version}")
|
|
return self.application_version
|
|
except Exception as raw_error:
|
|
logger.debug(f"Error processing raw response: {str(raw_error)}")
|
|
else:
|
|
logger.debug(f"Version API endpoint failed: {version_url} - Status code: {response.status_code}")
|
|
|
|
except Exception as e:
|
|
logger.debug(f"Error with version API endpoint {version_url}: {str(e)}")
|
|
|
|
# If we have hardware or model information, use that as a fallback
|
|
if hardware_info and model_name:
|
|
fallback_version = f"{hardware_info} - {model_name}"
|
|
self.application_version = fallback_version
|
|
logger.info(f"Using hardware and model information as version: {fallback_version}")
|
|
return fallback_version
|
|
elif hardware_info:
|
|
self.application_version = hardware_info
|
|
logger.info(f"Using hardware information as version: {hardware_info}")
|
|
return hardware_info
|
|
elif model_name:
|
|
self.application_version = model_name
|
|
logger.info(f"Using model name as version: {model_name}")
|
|
return model_name
|
|
|
|
logger.warning("Failed to retrieve application version from Unifi Controller")
|
|
return "Unknown"
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error retrieving application version from Unifi Controller: {str(e)}")
|
|
return "Unknown"
|
|
|
|
def get_devices(self) -> List[Dict[str, Any]]:
|
|
"""
|
|
Retrieve list of all connected devices from the Unifi Controller using the specified
|
|
site-specific integration API endpoint. Handles pagination to retrieve all devices.
|
|
|
|
Returns:
|
|
List of dictionaries containing device information
|
|
"""
|
|
if not self.is_authenticated:
|
|
logger.error("Not authenticated with Unifi Controller. API key authentication is required.")
|
|
return []
|
|
|
|
try:
|
|
logger.info("Retrieving devices from Unifi Controller using site-specific integration API endpoint")
|
|
|
|
# Ensure we have a valid site ID
|
|
if not self.available_sites:
|
|
logger.debug("No available sites found, attempting to retrieve sites")
|
|
if not self.retrieve_available_sites():
|
|
logger.error("Failed to retrieve and validate site ID, cannot use site-specific endpoint")
|
|
return []
|
|
|
|
# Use the specific site-specific integration API endpoint from the issue description
|
|
# Format: /proxy/network/integration/v1/sites/{site_id}/clients
|
|
site_integration_url = f"{self.base_url}/proxy/network/integration/v1/sites/{self.site_id}/clients"
|
|
logger.debug(f"Using site-specific integration API endpoint: {site_integration_url}")
|
|
|
|
# Initialize variables for pagination
|
|
all_devices = []
|
|
offset = 0
|
|
limit = 100 # Increase the limit to reduce the number of API calls
|
|
total_count = None
|
|
|
|
# Loop until we've retrieved all devices
|
|
while True:
|
|
try:
|
|
# Add pagination parameters to the URL
|
|
paginated_url = f"{site_integration_url}?offset={offset}&limit={limit}"
|
|
logger.debug(f"Retrieving page with offset={offset}, limit={limit}")
|
|
|
|
response = self.session.get(paginated_url, verify=False, timeout=10) # Increased timeout for larger responses
|
|
|
|
if response.status_code == 200:
|
|
response_data = response.json()
|
|
|
|
# Based on the test results, the response is a dictionary with keys: offset, limit, count, totalCount, data
|
|
# The 'data' field contains the actual client information
|
|
if isinstance(response_data, dict):
|
|
# Extract pagination information
|
|
current_offset = response_data.get('offset', 0)
|
|
current_limit = response_data.get('limit', 0)
|
|
current_count = response_data.get('count', 0)
|
|
|
|
# Set total_count if not already set
|
|
if total_count is None:
|
|
total_count = response_data.get('totalCount', 0)
|
|
logger.info(f"Total number of devices: {total_count}")
|
|
|
|
if 'data' in response_data:
|
|
devices = response_data.get('data', [])
|
|
logger.info(f"Retrieved {len(devices)} devices from page {offset//limit + 1} (offset={offset}, limit={limit})")
|
|
|
|
# Log the first device for debugging (if available and first page)
|
|
if len(devices) > 0 and offset == 0:
|
|
logger.debug(f"First device sample: {devices[0]}")
|
|
logger.debug(f"Keys in first device: {', '.join(devices[0].keys())}")
|
|
|
|
# Add devices to the all_devices list
|
|
all_devices.extend(devices)
|
|
|
|
# Update offset for next page
|
|
offset += current_count
|
|
|
|
# Check if we've retrieved all devices
|
|
if offset >= total_count or current_count == 0:
|
|
logger.info(f"Retrieved all {len(all_devices)} devices")
|
|
break
|
|
else:
|
|
logger.error("No 'data' field in response")
|
|
break
|
|
elif isinstance(response_data, list):
|
|
devices = response_data
|
|
logger.info(f"Retrieved {len(devices)} devices from Unifi Controller (direct list)")
|
|
|
|
# Log the first device for debugging (if available and first page)
|
|
if len(devices) > 0 and offset == 0:
|
|
logger.debug(f"First device sample: {devices[0]}")
|
|
|
|
# Add devices to the all_devices list
|
|
all_devices.extend(devices)
|
|
|
|
# Since we don't have pagination information, we can't continue
|
|
break
|
|
else:
|
|
logger.error(f"Unexpected response format: {type(response_data)}")
|
|
logger.debug(f"Response content: {response_data}")
|
|
break
|
|
else:
|
|
logger.error(f"Site-specific integration API endpoint failed: {response.status_code}")
|
|
try:
|
|
error_content = response.text
|
|
logger.debug(f"Error response content: {error_content}")
|
|
except Exception:
|
|
pass
|
|
break
|
|
except Exception as e:
|
|
logger.error(f"Error with site-specific integration API endpoint: {str(e)}")
|
|
break
|
|
|
|
# Transform the response to match the expected format for our application
|
|
logger.info(f"Transforming {len(all_devices)} devices to application format")
|
|
transformed_devices = []
|
|
for device in all_devices:
|
|
transformed_device = {
|
|
'hostname': device.get('name', ''),
|
|
'ip': device.get('ipAddress', ''),
|
|
'mac': device.get('macAddress', ''),
|
|
'status': 'connected' if device.get('connectedAt') else 'disconnected',
|
|
'id': device.get('id', ''),
|
|
'type': device.get('type', ''),
|
|
'connected_at': device.get('connectedAt', ''),
|
|
'uplink_device_id': device.get('uplinkDeviceId', '')
|
|
}
|
|
transformed_devices.append(transformed_device)
|
|
|
|
return transformed_devices
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error in get_devices method: {str(e)}")
|
|
return []
|
|
|
|
|
|
class DeviceDiscovery:
|
|
"""Main class for Tasmota device discovery."""
|
|
|
|
def __init__(self, config_path: str, debug: bool = False):
|
|
"""
|
|
Initialize the device discovery with configuration.
|
|
|
|
Args:
|
|
config_path: Path to the configuration file
|
|
debug: Whether to enable debug mode
|
|
"""
|
|
self.config_path = config_path
|
|
self.debug = debug
|
|
self.config = self._load_config()
|
|
self.unifi_client = UnifiClient(self.config.get('unifi', {}), debug)
|
|
|
|
def _load_config(self) -> Dict[str, Any]:
|
|
"""
|
|
Load configuration from YAML file.
|
|
|
|
Returns:
|
|
Dictionary containing configuration
|
|
"""
|
|
try:
|
|
with open(self.config_path, 'r') as file:
|
|
config = yaml.safe_load(file)
|
|
logger.debug(f"Loaded configuration from {self.config_path}")
|
|
return config
|
|
except Exception as e:
|
|
logger.error(f"Error loading configuration from {self.config_path}: {str(e)}")
|
|
sys.exit(1)
|
|
|
|
def _match_pattern(self, hostname: str, patterns: List[str]) -> bool:
|
|
"""
|
|
Check if hostname matches any of the given patterns.
|
|
|
|
Args:
|
|
hostname: Hostname to check
|
|
patterns: List of glob patterns to match against
|
|
|
|
Returns:
|
|
True if hostname matches any pattern, False otherwise
|
|
"""
|
|
if not hostname or not patterns:
|
|
return False
|
|
|
|
for pattern in patterns:
|
|
# Convert glob pattern to regex
|
|
regex_pattern = pattern.replace("*", ".*")
|
|
if re.match(f"^{regex_pattern}$", hostname, re.IGNORECASE):
|
|
return True
|
|
return False
|
|
|
|
def filter_devices(self, devices: List[Dict[str, Any]]) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]], List[Dict[str, Any]]]:
|
|
"""
|
|
Filter devices based on network_filter criteria.
|
|
|
|
Args:
|
|
devices: List of devices from Unifi Controller
|
|
|
|
Returns:
|
|
Tuple of (included_devices, excluded_devices, unidentified_devices)
|
|
"""
|
|
included_devices = []
|
|
excluded_devices = []
|
|
unidentified_devices = []
|
|
|
|
# Track devices that have been processed to avoid duplicates
|
|
processed_devices = set()
|
|
|
|
network_filters = self.config.get('unifi', {}).get('network_filter', {})
|
|
|
|
for network_name, network_config in network_filters.items():
|
|
# Support both old (subnet) and new (subnets) configuration formats
|
|
subnet = network_config.get('subnet')
|
|
subnets = network_config.get('subnets', [])
|
|
|
|
# If subnet is specified but subnets is not, add subnet to subnets for backward compatibility
|
|
if subnet and not subnets:
|
|
subnets = [subnet]
|
|
|
|
exclude_patterns = network_config.get('exclude_patterns', [])
|
|
|
|
logger.debug(f"Filtering devices for network {network_name} with subnets {subnets}")
|
|
|
|
for device in devices:
|
|
# Skip devices that have already been processed
|
|
device_id = device.get('id')
|
|
if device_id in processed_devices:
|
|
continue
|
|
|
|
ip = device.get('ip')
|
|
hostname = device.get('hostname', '')
|
|
|
|
# Check if device is in any of the specified subnets
|
|
in_subnet = False
|
|
matching_subnet = None
|
|
|
|
if ip:
|
|
for subnet_prefix in subnets:
|
|
if ip.startswith(subnet_prefix):
|
|
in_subnet = True
|
|
matching_subnet = subnet_prefix
|
|
break
|
|
|
|
if not in_subnet:
|
|
# Add to unidentified devices if not in any of the specified subnets
|
|
logger.debug(f"Unidentified device {hostname} ({ip}) - not in any configured subnet")
|
|
unidentified_devices.append(device)
|
|
processed_devices.add(device_id)
|
|
continue
|
|
|
|
# Check if device should be excluded
|
|
if self._match_pattern(hostname, exclude_patterns):
|
|
logger.debug(f"Excluding device {hostname} ({ip}) - matches exclude pattern")
|
|
excluded_devices.append(device)
|
|
else:
|
|
logger.debug(f"Including device {hostname} ({ip}) - matched subnet {matching_subnet}")
|
|
included_devices.append(device)
|
|
|
|
# Mark device as processed
|
|
processed_devices.add(device_id)
|
|
|
|
logger.info(f"Filtered {len(included_devices)} devices (excluded {len(excluded_devices)}, unidentified {len(unidentified_devices)})")
|
|
return included_devices, excluded_devices, unidentified_devices
|
|
|
|
def classify_devices(self, devices: List[Dict[str, Any]]) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
|
|
"""
|
|
Classify devices into valid hostname and default hostname groups.
|
|
|
|
Args:
|
|
devices: List of filtered devices
|
|
|
|
Returns:
|
|
Tuple of (valid_hostname_devices, default_hostname_devices)
|
|
"""
|
|
valid_hostname_devices = []
|
|
default_hostname_devices = []
|
|
|
|
network_filters = self.config.get('unifi', {}).get('network_filter', {})
|
|
|
|
for device in devices:
|
|
hostname = device.get('hostname', '')
|
|
|
|
# Check if hostname matches any default name pattern
|
|
is_default = False
|
|
for network_name, network_config in network_filters.items():
|
|
default_patterns = network_config.get('default_name_patterns', [])
|
|
if self._match_pattern(hostname, default_patterns):
|
|
logger.debug(f"Device {hostname} matches default name pattern")
|
|
default_hostname_devices.append(device)
|
|
is_default = True
|
|
break
|
|
|
|
if not is_default:
|
|
logger.debug(f"Device {hostname} has valid hostname")
|
|
valid_hostname_devices.append(device)
|
|
|
|
logger.info(f"Classified devices: {len(valid_hostname_devices)} valid, {len(default_hostname_devices)} default")
|
|
return valid_hostname_devices, default_hostname_devices
|
|
|
|
def process_existing_files(self, valid_devices: List[Dict[str, Any]]) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
|
|
"""
|
|
Process existing hostname files and update them based on current devices.
|
|
|
|
Args:
|
|
valid_devices: List of devices with valid hostnames
|
|
|
|
Returns:
|
|
Tuple of (updated_valid_devices, deprecated_devices)
|
|
"""
|
|
existing_valid_devices = []
|
|
deprecated_devices = []
|
|
|
|
# Load existing valid hostnames if file exists
|
|
if os.path.exists(VALID_HOSTNAMES_FILE):
|
|
try:
|
|
with open(VALID_HOSTNAMES_FILE, 'r') as file:
|
|
existing_valid_devices = json.load(file)
|
|
logger.debug(f"Loaded {len(existing_valid_devices)} existing valid hostnames")
|
|
except Exception as e:
|
|
logger.error(f"Error loading existing valid hostnames: {str(e)}")
|
|
existing_valid_devices = []
|
|
|
|
# Delete deprecated file if it exists
|
|
if os.path.exists(DEPRECATED_HOSTNAMES_FILE):
|
|
try:
|
|
os.remove(DEPRECATED_HOSTNAMES_FILE)
|
|
logger.debug("Deleted existing deprecated hostnames file")
|
|
except Exception as e:
|
|
logger.error(f"Error deleting deprecated hostnames file: {str(e)}")
|
|
|
|
if not existing_valid_devices:
|
|
# If no existing valid hostnames, just return the current valid devices
|
|
return valid_devices, []
|
|
|
|
# Create lookup dictionaries for faster processing
|
|
current_devices_by_hostname = {device.get('hostname'): device for device in valid_devices if device.get('hostname')}
|
|
existing_devices_by_hostname = {device.get('hostname'): device for device in existing_valid_devices if device.get('hostname')}
|
|
|
|
updated_valid_devices = []
|
|
|
|
# Process current devices
|
|
for hostname, device in current_devices_by_hostname.items():
|
|
if hostname in existing_devices_by_hostname:
|
|
# Device exists in both current and existing lists
|
|
existing_device = existing_devices_by_hostname[hostname]
|
|
|
|
# Check if any fields have changed
|
|
changed_fields = []
|
|
for key, value in device.items():
|
|
if key in existing_device and existing_device[key] != value:
|
|
changed_fields.append(key)
|
|
|
|
if changed_fields:
|
|
logger.info(f"Device {hostname} has changed fields: {', '.join(changed_fields)}")
|
|
|
|
# Use the current device data
|
|
updated_valid_devices.append(device)
|
|
else:
|
|
# New device not in existing list
|
|
logger.info(f"New device found: {hostname}")
|
|
updated_valid_devices.append(device)
|
|
|
|
# Find deprecated devices (in existing but not in current)
|
|
for hostname, device in existing_devices_by_hostname.items():
|
|
if hostname not in current_devices_by_hostname:
|
|
logger.info(f"Device {hostname} is no longer available, marking as deprecated")
|
|
deprecated_devices.append(device)
|
|
|
|
return updated_valid_devices, deprecated_devices
|
|
|
|
def save_device_files(self, valid_devices: List[Dict[str, Any]], default_devices: List[Dict[str, Any]],
|
|
deprecated_devices: List[Dict[str, Any]]) -> None:
|
|
"""
|
|
Save device information to respective files.
|
|
|
|
Args:
|
|
valid_devices: List of devices with valid hostnames
|
|
default_devices: List of devices with default hostnames
|
|
deprecated_devices: List of devices that are deprecated
|
|
"""
|
|
try:
|
|
with open(VALID_HOSTNAMES_FILE, 'w') as file:
|
|
json.dump(valid_devices, file, indent=2)
|
|
logger.info(f"Saved {len(valid_devices)} valid hostnames to {VALID_HOSTNAMES_FILE}")
|
|
|
|
with open(DEFAULT_HOSTNAMES_FILE, 'w') as file:
|
|
json.dump(default_devices, file, indent=2)
|
|
logger.info(f"Saved {len(default_devices)} default hostnames to {DEFAULT_HOSTNAMES_FILE}")
|
|
|
|
if deprecated_devices:
|
|
with open(DEPRECATED_HOSTNAMES_FILE, 'w') as file:
|
|
json.dump(deprecated_devices, file, indent=2)
|
|
logger.info(f"Saved {len(deprecated_devices)} deprecated hostnames to {DEPRECATED_HOSTNAMES_FILE}")
|
|
except Exception as e:
|
|
logger.error(f"Error saving device files: {str(e)}")
|
|
|
|
def save_unidentified_devices(self, unidentified_devices: List[Dict[str, Any]]) -> None:
|
|
"""
|
|
Save unidentified devices to a file when debug mode is enabled.
|
|
|
|
Args:
|
|
unidentified_devices: List of devices that were not identified during filtering
|
|
"""
|
|
if not self.debug:
|
|
return
|
|
|
|
if not unidentified_devices:
|
|
logger.debug("No unidentified devices to save")
|
|
return
|
|
|
|
try:
|
|
with open(UNIDENTIFIED_DEVICES_FILE, 'w') as file:
|
|
json.dump(unidentified_devices, file, indent=2)
|
|
logger.info(f"Saved {len(unidentified_devices)} unidentified devices to {UNIDENTIFIED_DEVICES_FILE}")
|
|
except Exception as e:
|
|
logger.error(f"Error saving unidentified devices file: {str(e)}")
|
|
|
|
def discover(self) -> None:
|
|
"""
|
|
Run the device discovery process.
|
|
|
|
This method retrieves devices from the Unifi Controller using the site-specific
|
|
integration API endpoint, filters and classifies them, and then processes them
|
|
against existing device files.
|
|
"""
|
|
logger.info("Starting Tasmota device discovery")
|
|
|
|
# Get devices from Unifi Controller using the site-specific integration API endpoint
|
|
devices = self.unifi_client.get_devices()
|
|
|
|
if not devices:
|
|
logger.warning("No devices retrieved from Unifi Controller")
|
|
|
|
# Try to use existing valid_hostnames.json file as fallback
|
|
if os.path.exists(VALID_HOSTNAMES_FILE):
|
|
try:
|
|
with open(VALID_HOSTNAMES_FILE, 'r') as file:
|
|
valid_hostname_devices = json.load(file)
|
|
logger.info(f"Loaded {len(valid_hostname_devices)} devices from {VALID_HOSTNAMES_FILE} as fallback")
|
|
|
|
# Process existing files and update device lists
|
|
updated_valid_devices, deprecated_devices = self.process_existing_files(valid_hostname_devices)
|
|
|
|
# Initialize default_hostname_devices as an empty list
|
|
default_hostname_devices = []
|
|
|
|
# Save device information to files
|
|
self.save_device_files(updated_valid_devices, default_hostname_devices, deprecated_devices)
|
|
|
|
logger.info("Device discovery completed successfully using fallback method")
|
|
return
|
|
except Exception as e:
|
|
logger.error(f"Error loading {VALID_HOSTNAMES_FILE}: {str(e)}")
|
|
logger.warning("Cannot proceed with device discovery")
|
|
return
|
|
else:
|
|
logger.warning("No valid hostnames file found for fallback")
|
|
logger.warning("Cannot proceed with device discovery")
|
|
return
|
|
|
|
logger.info(f"Retrieved {len(devices)} devices from Unifi Controller")
|
|
|
|
# Filter devices based on network_filter criteria
|
|
included_devices, excluded_devices, unidentified_devices = self.filter_devices(devices)
|
|
|
|
# Save unidentified devices to a file when debug mode is enabled
|
|
self.save_unidentified_devices(unidentified_devices)
|
|
|
|
if not included_devices:
|
|
logger.warning("No devices match the filter criteria")
|
|
return
|
|
|
|
logger.info(f"Filtered {len(included_devices)} devices (excluded {len(excluded_devices)}, unidentified {len(unidentified_devices)})")
|
|
|
|
# Classify devices into valid and default hostname groups
|
|
valid_hostname_devices, default_hostname_devices = self.classify_devices(included_devices)
|
|
|
|
logger.info(f"Classified devices: {len(valid_hostname_devices)} valid, {len(default_hostname_devices)} default")
|
|
|
|
# Process existing files and update device lists
|
|
updated_valid_devices, deprecated_devices = self.process_existing_files(valid_hostname_devices)
|
|
|
|
# Save device information to files
|
|
self.save_device_files(updated_valid_devices, default_hostname_devices, deprecated_devices)
|
|
|
|
logger.info("Device discovery completed successfully")
|
|
|
|
|
|
def setup_logging(debug: bool) -> None:
|
|
"""
|
|
Set up logging configuration.
|
|
|
|
Args:
|
|
debug: Whether to enable debug mode
|
|
"""
|
|
log_level = logging.DEBUG if debug else logging.INFO
|
|
log_format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
|
|
|
|
# Create logs directory if it doesn't exist
|
|
os.makedirs('logs', exist_ok=True)
|
|
|
|
# Configure file handler
|
|
file_handler = logging.FileHandler('logs/discovery.log')
|
|
file_handler.setLevel(log_level)
|
|
file_handler.setFormatter(logging.Formatter(log_format))
|
|
|
|
# Configure console handler
|
|
console_handler = logging.StreamHandler()
|
|
console_handler.setLevel(log_level)
|
|
console_handler.setFormatter(logging.Formatter(log_format))
|
|
|
|
# Configure root logger
|
|
root_logger = logging.getLogger()
|
|
root_logger.setLevel(log_level)
|
|
root_logger.addHandler(file_handler)
|
|
root_logger.addHandler(console_handler)
|
|
|
|
# Configure tasmota_discovery logger
|
|
logger.setLevel(log_level)
|
|
|
|
if debug:
|
|
logger.debug("Debug mode enabled")
|
|
|
|
|
|
def parse_arguments() -> argparse.Namespace:
|
|
"""
|
|
Parse command-line arguments.
|
|
|
|
Returns:
|
|
Parsed arguments
|
|
"""
|
|
parser = argparse.ArgumentParser(description='Tasmota Device Discovery')
|
|
parser.add_argument('-d', '--debug', action='store_true', help='Enable debug mode with verbose logging')
|
|
parser.add_argument('-c', '--config', default='config.yaml', help='Specify a custom config file path')
|
|
return parser.parse_args()
|
|
|
|
|
|
def main() -> None:
|
|
"""
|
|
Main entry point for the script.
|
|
"""
|
|
args = parse_arguments()
|
|
|
|
# Set up logging
|
|
setup_logging(args.debug)
|
|
|
|
try:
|
|
# Initialize device discovery
|
|
discovery = DeviceDiscovery(args.config, args.debug)
|
|
|
|
# Run discovery process
|
|
discovery.discover()
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error during device discovery: {str(e)}")
|
|
if args.debug:
|
|
import traceback
|
|
logger.debug(traceback.format_exc())
|
|
sys.exit(1)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main() |