Add discover_devices.py and config.yaml to repository

This commit is contained in:
Mike Geppert 2025-07-20 12:10:05 -05:00
parent 3342a3620f
commit d4bdf43ad6
2 changed files with 905 additions and 0 deletions

36
config.yaml Normal file
View File

@ -0,0 +1,36 @@
unifi:
type: UDMSE # Unifi Dream Machine SE
host: https://192.168.6.1
username: Tasmota
password: TasmotaManager12!@
port: 8443 # Default Unifi Controller port
site: "default" # Site ID for Unifi Controller
API_Key: "nIfTdZAXVUGQgyNqsATluTja-noaNLAk" # API Key for Unifi Controller
network_filter:
NoT_network:
name: "NoT"
# Updated to support multiple subnets
subnets:
- "192.168.5" # Main network
- "192.168.7" # IoT network
- "192.168.8" # Tasmota network
- "192.168.9" # Camera network
exclude_patterns:
- "homeassistant*"
- "*sonos*"
default_name_patterns:
- "tasmota*"
- "ESP-*"
tasmota:
mqtt_settings:
host: "homeassistant.NoT.mgeppert.com"
port: 1883
user: "mgeppert"
password: "mgeppert"
topic: "%hostname_base%"
full_topic: "%prefix%/%topic%/"
no_retain: false
other_settings: # Yet to be defined

869
discover_devices.py Normal file
View File

@ -0,0 +1,869 @@
#!/usr/bin/env python3
"""
Tasmota Device Discovery Script
This script implements the Device Discovery process for Tasmota devices on a network.
It connects to a Unifi Switch, retrieves a list of connected devices, and filters for
potential Tasmota devices based on network_filter information in the config file.
Usage:
python discover_devices.py [options]
Options:
-h, --help Show this help message and exit
-d, --debug Enable debug mode with verbose logging
-c, --config Specify a custom config file path (default: config.yaml)
"""
import argparse
import yaml
import logging
import os
import sys
import re
import json
from typing import Dict, List, Any, Optional, Tuple
import requests
from urllib3.exceptions import InsecureRequestWarning
# Suppress only the single InsecureRequestWarning from urllib3
requests.packages.urllib3.disable_warnings(category=InsecureRequestWarning)
# Set up logging
logger = logging.getLogger("tasmota_discovery")
# File paths
VALID_HOSTNAMES_FILE = "valid_hostnames.json"
DEFAULT_HOSTNAMES_FILE = "default_hostnames.json"
DEPRECATED_HOSTNAMES_FILE = "deprecated_hostnames.json"
UNIDENTIFIED_DEVICES_FILE = "unidentified_devices.json"
class UnifiClient:
"""Client for interacting with the Unifi Controller API."""
def __init__(self, config: Dict[str, Any], debug: bool = False):
"""
Initialize the Unifi client with configuration.
Args:
config: Dictionary containing Unifi configuration
debug: Whether to enable debug mode
"""
self.host = config.get('host')
self.port = config.get('port', 8443)
self.type = config.get('type', 'UDMSE')
self.site_id = config.get('site', 'default') # Site ID parameter from config
self.api_key = config.get('API_Key') # API Key for authentication
self.debug = debug
self.session = requests.Session()
self.base_url = self.host # Using host without port number as per issue requirement
self.is_authenticated = False
self.available_sites = [] # Will store available sites from the controller
self.application_version = "Unknown" # Will store the application version
# Check for required configuration parameters
if not self.host:
raise ValueError("Missing required Unifi host parameter")
# Set up API key authentication
if self.api_key:
logger.debug("API Key authentication will be used")
# Add API key to session headers
self.session.headers.update({
'X-API-KEY': self.api_key,
'Accept': 'application/json'
})
self.is_authenticated = True # With API key, we're pre-authenticated
# Retrieve and validate the site ID
self.retrieve_available_sites()
# Get the application version
self.get_application_version()
else:
raise ValueError("Missing required Unifi API_Key parameter")
def retrieve_available_sites(self) -> bool:
"""
Retrieve available sites from the Unifi Controller and validate the configured site ID.
Returns:
bool: True if site ID is valid, False otherwise
"""
if not self.is_authenticated:
logger.error("Not authenticated with Unifi Controller. API key authentication is required.")
return False
try:
# Try different API endpoints for retrieving sites
endpoints = [
"/proxy/network/integration/v1/sites", # Integration API endpoint
"/proxy/network/api/self/sites", # Newer API endpoint
"/api/self/sites", # Legacy API endpoint
"/v2/api/sites" # v2 API endpoint
]
logger.info("Attempting to retrieve available sites from Unifi Controller")
for endpoint in endpoints:
sites_url = f"{self.base_url}{endpoint}"
logger.debug(f"Trying sites API endpoint: {sites_url}")
try:
response = self.session.get(sites_url, verify=False, timeout=5)
if response.status_code == 200:
sites_data = response.json()
# Handle different response formats
if isinstance(sites_data, list):
self.available_sites = sites_data
elif isinstance(sites_data, dict) and 'data' in sites_data:
self.available_sites = sites_data.get('data', [])
else:
logger.debug(f"Unexpected response format from sites API endpoint: {sites_url}")
continue
# Log the available sites
site_names = [site.get('name', site.get('desc', 'Unknown')) for site in self.available_sites]
site_ids = [site.get('id', site.get('name', 'Unknown')) for site in self.available_sites]
logger.info(f"Retrieved {len(self.available_sites)} sites from Unifi Controller")
logger.debug(f"Available sites: {', '.join(site_names)}")
logger.debug(f"Available site IDs: {', '.join(site_ids)}")
# Validate the configured site ID
valid_site = False
for site in self.available_sites:
site_id = site.get('id', site.get('name', ''))
if site_id == self.site_id:
valid_site = True
site_name = site.get('name', site.get('desc', 'Unknown'))
logger.info(f"Configured site ID '{self.site_id}' is valid (Site name: {site_name})")
break
if not valid_site:
logger.warning(f"Configured site ID '{self.site_id}' not found in available sites")
if self.site_id == 'default' and len(self.available_sites) > 0:
# Try to use the first available site as default
first_site = self.available_sites[0]
self.site_id = first_site.get('id', first_site.get('name', 'default'))
site_name = first_site.get('name', first_site.get('desc', 'Unknown'))
logger.info(f"Using first available site as default: '{self.site_id}' (Site name: {site_name})")
valid_site = True
return valid_site
else:
logger.debug(f"Sites API endpoint failed: {sites_url} - Status code: {response.status_code}")
except Exception as e:
logger.debug(f"Error with sites API endpoint {sites_url}: {str(e)}")
logger.error("Failed to retrieve available sites from Unifi Controller")
return False
except Exception as e:
logger.error(f"Error retrieving available sites from Unifi Controller: {str(e)}")
return False
def get_application_version(self) -> str:
"""
Retrieve the application version from the Unifi Controller.
Returns:
str: The application version or hardware/model information if version not found
"""
if not self.is_authenticated:
logger.error("Not authenticated with Unifi Controller. API key authentication is required.")
return "Unknown"
try:
# Try different API endpoints for retrieving application version
endpoints = [
"/proxy/network/integration/v1/info", # Integration API info endpoint (from issue description)
"/proxy/network/api/s/{site_id}/status", # Site-specific status endpoint
"/proxy/network/api/status", # Network status endpoint
"/api/system", # System info endpoint
"/proxy/network/api/system", # Network system endpoint
"/v1/api/system/info", # v1 system info endpoint
"/proxy/protect/api/system", # Protect system endpoint
"/proxy/network/v1/api/system/info", # Network v1 system info
"/proxy/network/api/s/{site_id}/stat/sysinfo", # Site-specific system info
"/api/s/{site_id}/stat/sysinfo" # Legacy site-specific system info
]
logger.info("Attempting to retrieve application version from Unifi Controller")
# Variables to store fallback information if no explicit version is found
hardware_info = None
model_name = None
for endpoint in endpoints:
# Replace {site_id} placeholder with actual site_id if present
current_endpoint = endpoint.replace("{site_id}", self.site_id)
version_url = f"{self.base_url}{current_endpoint}"
logger.debug(f"Trying version API endpoint: {version_url}")
try:
response = self.session.get(version_url, verify=False, timeout=5)
if response.status_code == 200:
try:
version_data = response.json()
logger.debug(f"Response from {current_endpoint}: {version_data}")
# Handle different response formats
# Try common paths where version information might be found
version = None
# Check for version in meta.version
if isinstance(version_data, dict) and 'meta' in version_data and 'version' in version_data['meta']:
version = version_data['meta']['version']
# Check for version in data.version
elif isinstance(version_data, dict) and 'data' in version_data:
data = version_data['data']
if isinstance(data, list) and len(data) > 0 and 'version' in data[0]:
version = data[0]['version']
elif isinstance(data, dict) and 'version' in data:
version = data['version']
# Check for version directly in the response
elif isinstance(version_data, dict) and 'version' in version_data:
version = version_data['version']
# Check for firmware_version
elif isinstance(version_data, dict) and 'firmware_version' in version_data:
version = version_data['firmware_version']
# Check for firmware
elif isinstance(version_data, dict) and 'firmware' in version_data:
version = version_data['firmware']
# Check for controller.version
elif isinstance(version_data, dict) and 'controller' in version_data and 'version' in version_data['controller']:
version = version_data['controller']['version']
# Check for applicationVersion (from issue description)
elif isinstance(version_data, dict) and 'applicationVersion' in version_data:
version = version_data['applicationVersion']
logger.debug(f"Found applicationVersion in response: {version}")
# Store hardware and model information as fallback
if isinstance(version_data, dict):
# Check for hardware information
if 'hardware' in version_data and isinstance(version_data['hardware'], dict) and 'shortname' in version_data['hardware']:
hardware_info = version_data['hardware']['shortname']
# Check for model/name information
if 'name' in version_data:
model_name = version_data['name']
if version:
self.application_version = str(version)
logger.info(f"Retrieved application version from Unifi Controller: {self.application_version}")
return self.application_version
except ValueError as json_error:
logger.debug(f"Invalid JSON response from endpoint: {version_url} - Error: {str(json_error)}")
# Try to extract version from raw response if JSON parsing fails
try:
raw_response = response.text
logger.debug(f"Raw response from {current_endpoint}: {raw_response[:200]}...") # Log first 200 chars
# Look for version patterns in raw response
import re
version_match = re.search(r'version["\']?\s*:\s*["\']([^"\']+)["\']', raw_response, re.IGNORECASE)
if version_match:
version = version_match.group(1)
self.application_version = str(version)
logger.info(f"Retrieved application version from raw response: {self.application_version}")
return self.application_version
except Exception as raw_error:
logger.debug(f"Error processing raw response: {str(raw_error)}")
else:
logger.debug(f"Version API endpoint failed: {version_url} - Status code: {response.status_code}")
except Exception as e:
logger.debug(f"Error with version API endpoint {version_url}: {str(e)}")
# If we have hardware or model information, use that as a fallback
if hardware_info and model_name:
fallback_version = f"{hardware_info} - {model_name}"
self.application_version = fallback_version
logger.info(f"Using hardware and model information as version: {fallback_version}")
return fallback_version
elif hardware_info:
self.application_version = hardware_info
logger.info(f"Using hardware information as version: {hardware_info}")
return hardware_info
elif model_name:
self.application_version = model_name
logger.info(f"Using model name as version: {model_name}")
return model_name
logger.warning("Failed to retrieve application version from Unifi Controller")
return "Unknown"
except Exception as e:
logger.error(f"Error retrieving application version from Unifi Controller: {str(e)}")
return "Unknown"
def get_devices(self) -> List[Dict[str, Any]]:
"""
Retrieve list of all connected devices from the Unifi Controller using the specified
site-specific integration API endpoint. Handles pagination to retrieve all devices.
Returns:
List of dictionaries containing device information
"""
if not self.is_authenticated:
logger.error("Not authenticated with Unifi Controller. API key authentication is required.")
return []
try:
logger.info("Retrieving devices from Unifi Controller using site-specific integration API endpoint")
# Ensure we have a valid site ID
if not self.available_sites:
logger.debug("No available sites found, attempting to retrieve sites")
if not self.retrieve_available_sites():
logger.error("Failed to retrieve and validate site ID, cannot use site-specific endpoint")
return []
# Use the specific site-specific integration API endpoint from the issue description
# Format: /proxy/network/integration/v1/sites/{site_id}/clients
site_integration_url = f"{self.base_url}/proxy/network/integration/v1/sites/{self.site_id}/clients"
logger.debug(f"Using site-specific integration API endpoint: {site_integration_url}")
# Initialize variables for pagination
all_devices = []
offset = 0
limit = 100 # Increase the limit to reduce the number of API calls
total_count = None
# Loop until we've retrieved all devices
while True:
try:
# Add pagination parameters to the URL
paginated_url = f"{site_integration_url}?offset={offset}&limit={limit}"
logger.debug(f"Retrieving page with offset={offset}, limit={limit}")
response = self.session.get(paginated_url, verify=False, timeout=10) # Increased timeout for larger responses
if response.status_code == 200:
response_data = response.json()
# Based on the test results, the response is a dictionary with keys: offset, limit, count, totalCount, data
# The 'data' field contains the actual client information
if isinstance(response_data, dict):
# Extract pagination information
current_offset = response_data.get('offset', 0)
current_limit = response_data.get('limit', 0)
current_count = response_data.get('count', 0)
# Set total_count if not already set
if total_count is None:
total_count = response_data.get('totalCount', 0)
logger.info(f"Total number of devices: {total_count}")
if 'data' in response_data:
devices = response_data.get('data', [])
logger.info(f"Retrieved {len(devices)} devices from page {offset//limit + 1} (offset={offset}, limit={limit})")
# Log the first device for debugging (if available and first page)
if len(devices) > 0 and offset == 0:
logger.debug(f"First device sample: {devices[0]}")
logger.debug(f"Keys in first device: {', '.join(devices[0].keys())}")
# Add devices to the all_devices list
all_devices.extend(devices)
# Update offset for next page
offset += current_count
# Check if we've retrieved all devices
if offset >= total_count or current_count == 0:
logger.info(f"Retrieved all {len(all_devices)} devices")
break
else:
logger.error("No 'data' field in response")
break
elif isinstance(response_data, list):
devices = response_data
logger.info(f"Retrieved {len(devices)} devices from Unifi Controller (direct list)")
# Log the first device for debugging (if available and first page)
if len(devices) > 0 and offset == 0:
logger.debug(f"First device sample: {devices[0]}")
# Add devices to the all_devices list
all_devices.extend(devices)
# Since we don't have pagination information, we can't continue
break
else:
logger.error(f"Unexpected response format: {type(response_data)}")
logger.debug(f"Response content: {response_data}")
break
else:
logger.error(f"Site-specific integration API endpoint failed: {response.status_code}")
try:
error_content = response.text
logger.debug(f"Error response content: {error_content}")
except Exception:
pass
break
except Exception as e:
logger.error(f"Error with site-specific integration API endpoint: {str(e)}")
break
# Transform the response to match the expected format for our application
logger.info(f"Transforming {len(all_devices)} devices to application format")
transformed_devices = []
for device in all_devices:
transformed_device = {
'hostname': device.get('name', ''),
'ip': device.get('ipAddress', ''),
'mac': device.get('macAddress', ''),
'status': 'connected' if device.get('connectedAt') else 'disconnected',
'id': device.get('id', ''),
'type': device.get('type', ''),
'connected_at': device.get('connectedAt', ''),
'uplink_device_id': device.get('uplinkDeviceId', '')
}
transformed_devices.append(transformed_device)
return transformed_devices
except Exception as e:
logger.error(f"Error in get_devices method: {str(e)}")
return []
class DeviceDiscovery:
"""Main class for Tasmota device discovery."""
def __init__(self, config_path: str, debug: bool = False):
"""
Initialize the device discovery with configuration.
Args:
config_path: Path to the configuration file
debug: Whether to enable debug mode
"""
self.config_path = config_path
self.debug = debug
self.config = self._load_config()
self.unifi_client = UnifiClient(self.config.get('unifi', {}), debug)
def _load_config(self) -> Dict[str, Any]:
"""
Load configuration from YAML file.
Returns:
Dictionary containing configuration
"""
try:
with open(self.config_path, 'r') as file:
config = yaml.safe_load(file)
logger.debug(f"Loaded configuration from {self.config_path}")
return config
except Exception as e:
logger.error(f"Error loading configuration from {self.config_path}: {str(e)}")
sys.exit(1)
def _match_pattern(self, hostname: str, patterns: List[str]) -> bool:
"""
Check if hostname matches any of the given patterns.
Args:
hostname: Hostname to check
patterns: List of glob patterns to match against
Returns:
True if hostname matches any pattern, False otherwise
"""
if not hostname or not patterns:
return False
for pattern in patterns:
# Convert glob pattern to regex
regex_pattern = pattern.replace("*", ".*")
if re.match(f"^{regex_pattern}$", hostname, re.IGNORECASE):
return True
return False
def filter_devices(self, devices: List[Dict[str, Any]]) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]], List[Dict[str, Any]]]:
"""
Filter devices based on network_filter criteria.
Args:
devices: List of devices from Unifi Controller
Returns:
Tuple of (included_devices, excluded_devices, unidentified_devices)
"""
included_devices = []
excluded_devices = []
unidentified_devices = []
# Track devices that have been processed to avoid duplicates
processed_devices = set()
network_filters = self.config.get('unifi', {}).get('network_filter', {})
for network_name, network_config in network_filters.items():
# Support both old (subnet) and new (subnets) configuration formats
subnet = network_config.get('subnet')
subnets = network_config.get('subnets', [])
# If subnet is specified but subnets is not, add subnet to subnets for backward compatibility
if subnet and not subnets:
subnets = [subnet]
exclude_patterns = network_config.get('exclude_patterns', [])
logger.debug(f"Filtering devices for network {network_name} with subnets {subnets}")
for device in devices:
# Skip devices that have already been processed
device_id = device.get('id')
if device_id in processed_devices:
continue
ip = device.get('ip')
hostname = device.get('hostname', '')
# Check if device is in any of the specified subnets
in_subnet = False
matching_subnet = None
if ip:
for subnet_prefix in subnets:
if ip.startswith(subnet_prefix):
in_subnet = True
matching_subnet = subnet_prefix
break
if not in_subnet:
# Add to unidentified devices if not in any of the specified subnets
logger.debug(f"Unidentified device {hostname} ({ip}) - not in any configured subnet")
unidentified_devices.append(device)
processed_devices.add(device_id)
continue
# Check if device should be excluded
if self._match_pattern(hostname, exclude_patterns):
logger.debug(f"Excluding device {hostname} ({ip}) - matches exclude pattern")
excluded_devices.append(device)
else:
logger.debug(f"Including device {hostname} ({ip}) - matched subnet {matching_subnet}")
included_devices.append(device)
# Mark device as processed
processed_devices.add(device_id)
logger.info(f"Filtered {len(included_devices)} devices (excluded {len(excluded_devices)}, unidentified {len(unidentified_devices)})")
return included_devices, excluded_devices, unidentified_devices
def classify_devices(self, devices: List[Dict[str, Any]]) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
"""
Classify devices into valid hostname and default hostname groups.
Args:
devices: List of filtered devices
Returns:
Tuple of (valid_hostname_devices, default_hostname_devices)
"""
valid_hostname_devices = []
default_hostname_devices = []
network_filters = self.config.get('unifi', {}).get('network_filter', {})
for device in devices:
hostname = device.get('hostname', '')
# Check if hostname matches any default name pattern
is_default = False
for network_name, network_config in network_filters.items():
default_patterns = network_config.get('default_name_patterns', [])
if self._match_pattern(hostname, default_patterns):
logger.debug(f"Device {hostname} matches default name pattern")
default_hostname_devices.append(device)
is_default = True
break
if not is_default:
logger.debug(f"Device {hostname} has valid hostname")
valid_hostname_devices.append(device)
logger.info(f"Classified devices: {len(valid_hostname_devices)} valid, {len(default_hostname_devices)} default")
return valid_hostname_devices, default_hostname_devices
def process_existing_files(self, valid_devices: List[Dict[str, Any]]) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
"""
Process existing hostname files and update them based on current devices.
Args:
valid_devices: List of devices with valid hostnames
Returns:
Tuple of (updated_valid_devices, deprecated_devices)
"""
existing_valid_devices = []
deprecated_devices = []
# Load existing valid hostnames if file exists
if os.path.exists(VALID_HOSTNAMES_FILE):
try:
with open(VALID_HOSTNAMES_FILE, 'r') as file:
existing_valid_devices = json.load(file)
logger.debug(f"Loaded {len(existing_valid_devices)} existing valid hostnames")
except Exception as e:
logger.error(f"Error loading existing valid hostnames: {str(e)}")
existing_valid_devices = []
# Delete deprecated file if it exists
if os.path.exists(DEPRECATED_HOSTNAMES_FILE):
try:
os.remove(DEPRECATED_HOSTNAMES_FILE)
logger.debug("Deleted existing deprecated hostnames file")
except Exception as e:
logger.error(f"Error deleting deprecated hostnames file: {str(e)}")
if not existing_valid_devices:
# If no existing valid hostnames, just return the current valid devices
return valid_devices, []
# Create lookup dictionaries for faster processing
current_devices_by_hostname = {device.get('hostname'): device for device in valid_devices if device.get('hostname')}
existing_devices_by_hostname = {device.get('hostname'): device for device in existing_valid_devices if device.get('hostname')}
updated_valid_devices = []
# Process current devices
for hostname, device in current_devices_by_hostname.items():
if hostname in existing_devices_by_hostname:
# Device exists in both current and existing lists
existing_device = existing_devices_by_hostname[hostname]
# Check if any fields have changed
changed_fields = []
for key, value in device.items():
if key in existing_device and existing_device[key] != value:
changed_fields.append(key)
if changed_fields:
logger.info(f"Device {hostname} has changed fields: {', '.join(changed_fields)}")
# Use the current device data
updated_valid_devices.append(device)
else:
# New device not in existing list
logger.info(f"New device found: {hostname}")
updated_valid_devices.append(device)
# Find deprecated devices (in existing but not in current)
for hostname, device in existing_devices_by_hostname.items():
if hostname not in current_devices_by_hostname:
logger.info(f"Device {hostname} is no longer available, marking as deprecated")
deprecated_devices.append(device)
return updated_valid_devices, deprecated_devices
def save_device_files(self, valid_devices: List[Dict[str, Any]], default_devices: List[Dict[str, Any]],
deprecated_devices: List[Dict[str, Any]]) -> None:
"""
Save device information to respective files.
Args:
valid_devices: List of devices with valid hostnames
default_devices: List of devices with default hostnames
deprecated_devices: List of devices that are deprecated
"""
try:
with open(VALID_HOSTNAMES_FILE, 'w') as file:
json.dump(valid_devices, file, indent=2)
logger.info(f"Saved {len(valid_devices)} valid hostnames to {VALID_HOSTNAMES_FILE}")
with open(DEFAULT_HOSTNAMES_FILE, 'w') as file:
json.dump(default_devices, file, indent=2)
logger.info(f"Saved {len(default_devices)} default hostnames to {DEFAULT_HOSTNAMES_FILE}")
if deprecated_devices:
with open(DEPRECATED_HOSTNAMES_FILE, 'w') as file:
json.dump(deprecated_devices, file, indent=2)
logger.info(f"Saved {len(deprecated_devices)} deprecated hostnames to {DEPRECATED_HOSTNAMES_FILE}")
except Exception as e:
logger.error(f"Error saving device files: {str(e)}")
def save_unidentified_devices(self, unidentified_devices: List[Dict[str, Any]]) -> None:
"""
Save unidentified devices to a file when debug mode is enabled.
Args:
unidentified_devices: List of devices that were not identified during filtering
"""
if not self.debug:
return
if not unidentified_devices:
logger.debug("No unidentified devices to save")
return
try:
with open(UNIDENTIFIED_DEVICES_FILE, 'w') as file:
json.dump(unidentified_devices, file, indent=2)
logger.info(f"Saved {len(unidentified_devices)} unidentified devices to {UNIDENTIFIED_DEVICES_FILE}")
except Exception as e:
logger.error(f"Error saving unidentified devices file: {str(e)}")
def discover(self) -> None:
"""
Run the device discovery process.
This method retrieves devices from the Unifi Controller using the site-specific
integration API endpoint, filters and classifies them, and then processes them
against existing device files.
"""
logger.info("Starting Tasmota device discovery")
# Get devices from Unifi Controller using the site-specific integration API endpoint
devices = self.unifi_client.get_devices()
if not devices:
logger.warning("No devices retrieved from Unifi Controller")
# Try to use existing valid_hostnames.json file as fallback
if os.path.exists(VALID_HOSTNAMES_FILE):
try:
with open(VALID_HOSTNAMES_FILE, 'r') as file:
valid_hostname_devices = json.load(file)
logger.info(f"Loaded {len(valid_hostname_devices)} devices from {VALID_HOSTNAMES_FILE} as fallback")
# Process existing files and update device lists
updated_valid_devices, deprecated_devices = self.process_existing_files(valid_hostname_devices)
# Initialize default_hostname_devices as an empty list
default_hostname_devices = []
# Save device information to files
self.save_device_files(updated_valid_devices, default_hostname_devices, deprecated_devices)
logger.info("Device discovery completed successfully using fallback method")
return
except Exception as e:
logger.error(f"Error loading {VALID_HOSTNAMES_FILE}: {str(e)}")
logger.warning("Cannot proceed with device discovery")
return
else:
logger.warning("No valid hostnames file found for fallback")
logger.warning("Cannot proceed with device discovery")
return
logger.info(f"Retrieved {len(devices)} devices from Unifi Controller")
# Filter devices based on network_filter criteria
included_devices, excluded_devices, unidentified_devices = self.filter_devices(devices)
# Save unidentified devices to a file when debug mode is enabled
self.save_unidentified_devices(unidentified_devices)
if not included_devices:
logger.warning("No devices match the filter criteria")
return
logger.info(f"Filtered {len(included_devices)} devices (excluded {len(excluded_devices)}, unidentified {len(unidentified_devices)})")
# Classify devices into valid and default hostname groups
valid_hostname_devices, default_hostname_devices = self.classify_devices(included_devices)
logger.info(f"Classified devices: {len(valid_hostname_devices)} valid, {len(default_hostname_devices)} default")
# Process existing files and update device lists
updated_valid_devices, deprecated_devices = self.process_existing_files(valid_hostname_devices)
# Save device information to files
self.save_device_files(updated_valid_devices, default_hostname_devices, deprecated_devices)
logger.info("Device discovery completed successfully")
def setup_logging(debug: bool) -> None:
"""
Set up logging configuration.
Args:
debug: Whether to enable debug mode
"""
log_level = logging.DEBUG if debug else logging.INFO
log_format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
# Create logs directory if it doesn't exist
os.makedirs('logs', exist_ok=True)
# Configure file handler
file_handler = logging.FileHandler('logs/discovery.log')
file_handler.setLevel(log_level)
file_handler.setFormatter(logging.Formatter(log_format))
# Configure console handler
console_handler = logging.StreamHandler()
console_handler.setLevel(log_level)
console_handler.setFormatter(logging.Formatter(log_format))
# Configure root logger
root_logger = logging.getLogger()
root_logger.setLevel(log_level)
root_logger.addHandler(file_handler)
root_logger.addHandler(console_handler)
# Configure tasmota_discovery logger
logger.setLevel(log_level)
if debug:
logger.debug("Debug mode enabled")
def parse_arguments() -> argparse.Namespace:
"""
Parse command-line arguments.
Returns:
Parsed arguments
"""
parser = argparse.ArgumentParser(description='Tasmota Device Discovery')
parser.add_argument('-d', '--debug', action='store_true', help='Enable debug mode with verbose logging')
parser.add_argument('-c', '--config', default='config.yaml', help='Specify a custom config file path')
return parser.parse_args()
def main() -> None:
"""
Main entry point for the script.
"""
args = parse_arguments()
# Set up logging
setup_logging(args.debug)
try:
# Initialize device discovery
discovery = DeviceDiscovery(args.config, args.debug)
# Run discovery process
discovery.discover()
except Exception as e:
logger.error(f"Error during device discovery: {str(e)}")
if args.debug:
import traceback
logger.debug(traceback.format_exc())
sys.exit(1)
if __name__ == "__main__":
main()