2852 lines
146 KiB
Python
Executable File
2852 lines
146 KiB
Python
Executable File
import json
|
|
import logging
|
|
import os
|
|
import sys
|
|
from datetime import datetime
|
|
from typing import Optional
|
|
import requests
|
|
from urllib3.exceptions import InsecureRequestWarning
|
|
import re # Import the regular expression module
|
|
import time
|
|
import argparse
|
|
|
|
# Disable SSL warnings
|
|
# Silence urllib3's InsecureRequestWarning for the whole process. The UniFi
# session in this file can be created with certificate verification disabled
# (verify_ssl=False), which would otherwise emit this warning on every request.
requests.packages.urllib3.disable_warnings(category=InsecureRequestWarning)
|
|
|
|
class UnifiClient:
    """Thin session-based HTTP client for a UniFi Controller.

    A single ``requests.Session`` is kept on the instance so that the cookies
    and the CSRF token obtained by ``_login`` are reused by the later
    ``get_clients`` / ``get_devices`` calls.
    """

    def __init__(self, base_url, username, password, site_id, verify_ssl=True):
        """Store the connection settings and create the HTTP session.

        Args:
            base_url: Controller base URL; trailing slashes are stripped.
            username: Login user name.
            password: Login password.
            site_id: Site identifier interpolated into the API paths.
            verify_ssl: Value assigned to ``Session.verify`` (default: True).
        """
        self.base_url = base_url.rstrip('/')
        self.username = username
        self.password = password
        self.site_id = site_id
        self.session = requests.Session()
        self.session.verify = verify_ssl

        # Initialize cookie jar
        self.session.cookies.clear()

    def _login(self) -> requests.Response:
        """Authenticate with the UniFi Controller.

        Posts the credentials to ``/api/auth/login`` and, when the controller
        answers with an ``X-CSRF-Token`` header, stores that token on the
        session so it is sent with subsequent requests.

        Returns:
            requests.Response: The successful login response.

        Raises:
            Exception: With an explanatory message when the controller answers
                HTTP 401 (chained from the original HTTP error).
            requests.exceptions.RequestException: For every other request
                failure, re-raised unchanged.
        """
        login_url = f"{self.base_url}/api/auth/login"
        headers = {
            'Content-Type': 'application/json',
            'Accept': 'application/json',
        }
        payload = {
            "username": self.username,
            "password": self.password,
            "remember": False,
        }
        try:
            response = self.session.post(login_url, json=payload, headers=headers)
            response.raise_for_status()
        except requests.exceptions.RequestException as e:
            # e.response is None for transport-level failures (refused
            # connection, DNS error, timeout), so it has to be checked before
            # .status_code is read; otherwise an AttributeError would replace
            # the real error.
            failed_response = getattr(e, 'response', None)
            if failed_response is not None and failed_response.status_code == 401:
                raise Exception("Authentication failed. Please verify your username and password.") from e
            raise

        if 'X-CSRF-Token' in response.headers:
            self.session.headers['X-CSRF-Token'] = response.headers['X-CSRF-Token']
        return response

    def _fetch_data(self, url: str) -> list:
        """GET ``url`` and return the ``data`` member of its JSON body ([] if absent).

        Raises:
            requests.exceptions.RequestException: On transport errors or a
                non-success HTTP status.
        """
        response = self.session.get(url)
        response.raise_for_status()
        return response.json().get('data', [])

    def get_clients(self) -> list:
        """Get all clients from the UniFi Controller.

        Endpoints are tried in this order: the ``/proxy/network`` path, the
        legacy ``/api/s`` path, then the ``/v2/api`` path. A failure of the
        first two is swallowed; a failure of the last one propagates.

        Returns:
            list: The ``data`` list of the first endpoint that answers.

        Raises:
            requests.exceptions.RequestException: If all three endpoints fail.
        """
        fallbacks = (
            f"{self.base_url}/proxy/network/api/s/{self.site_id}/stat/sta",
            f"{self.base_url}/api/s/{self.site_id}/stat/sta",
        )
        for url in fallbacks:
            try:
                return self._fetch_data(url)
            except requests.exceptions.RequestException:
                continue
        # Last resort: errors from this endpoint are reported to the caller.
        return self._fetch_data(f"{self.base_url}/v2/api/site/{self.site_id}/clients")

    def get_devices(self) -> list:
        """Get UniFi network devices (e.g., Access Points) from the Controller.

        Tries the ``/proxy/network`` endpoint first and the legacy ``/api/s``
        endpoint second.

        Returns:
            list: The ``data`` list of the first endpoint that answers, or an
            empty list when both requests fail.
        """
        candidates = (
            f"{self.base_url}/proxy/network/api/s/{self.site_id}/stat/device",
            f"{self.base_url}/api/s/{self.site_id}/stat/device",
        )
        for url in candidates:
            try:
                return self._fetch_data(url)
            except requests.exceptions.RequestException:
                continue
        return []
|
|
|
|
class TasmotaDiscovery:
|
|
def __init__(self, debug: bool = False):
|
|
"""Initialize the TasmotaDiscovery with optional debug mode."""
|
|
log_level = logging.DEBUG if debug else logging.INFO
|
|
log_format = '%(asctime)s - %(levelname)s - %(filename)s:%(lineno)d - %(message)s' if debug else '%(asctime)s - %(levelname)s - %(message)s'
|
|
logging.basicConfig(
|
|
level=log_level,
|
|
format=log_format,
|
|
datefmt='%Y-%m-%d %H:%M:%S'
|
|
)
|
|
self.logger = logging.getLogger(__name__)
|
|
# Redirect info logs to debug so all 'info' statements behave as debug-level
|
|
try:
|
|
self.logger.info = self.logger.debug
|
|
except Exception:
|
|
pass
|
|
self.config = None
|
|
self.unifi_client = None
|
|
|
|
def load_config(self, config_path: Optional[str] = None) -> dict:
|
|
"""Load configuration from JSON file."""
|
|
if config_path is None:
|
|
config_path = os.path.join(os.path.dirname(__file__), 'config.json')
|
|
|
|
self.logger.debug(f"Loading configuration from: {config_path}")
|
|
try:
|
|
with open(config_path, 'r') as config_file:
|
|
self.config = json.load(config_file)
|
|
self.logger.debug("Configuration loaded successfully from %s", config_path)
|
|
return self.config
|
|
except FileNotFoundError:
|
|
self.logger.error(f"Configuration file not found at {config_path}")
|
|
sys.exit(1)
|
|
except json.JSONDecodeError:
|
|
self.logger.error("Invalid JSON in configuration file")
|
|
sys.exit(1)
|
|
|
|
def setup_unifi_client(self):
|
|
"""Set up the UniFi client with better error handling"""
|
|
self.logger.debug("Setting up UniFi client")
|
|
|
|
if not self.config or 'unifi' not in self.config:
|
|
raise ValueError("Missing UniFi configuration")
|
|
|
|
unifi_config = self.config['unifi']
|
|
required_fields = ['host', 'username', 'password', 'site']
|
|
missing_fields = [field for field in required_fields if field not in unifi_config]
|
|
|
|
if missing_fields:
|
|
raise ValueError(f"Missing required UniFi configuration fields: {', '.join(missing_fields)}")
|
|
|
|
try:
|
|
self.logger.debug(f"Connecting to UniFi Controller at {unifi_config['host']}")
|
|
self.unifi_client = UnifiClient(
|
|
base_url=unifi_config['host'],
|
|
username=unifi_config['username'],
|
|
password=unifi_config['password'],
|
|
site_id=unifi_config['site'],
|
|
verify_ssl=False # Add this if using self-signed certificates
|
|
)
|
|
|
|
# Test the connection by making a simple request
|
|
response = self.unifi_client._login()
|
|
if not response:
|
|
raise ConnectionError(f"Failed to connect to UniFi controller: No response")
|
|
|
|
self.logger.debug("UniFi client setup successful")
|
|
|
|
except Exception as e:
|
|
self.logger.error(f"Error setting up UniFi client: {str(e)}")
|
|
raise ConnectionError(f"Failed to connect to UniFi controller: {str(e)}")
|
|
|
|
def is_tasmota_device(self, device: dict) -> bool:
|
|
"""Determine if a device is in the network_filter and not in exclude_patterns.
|
|
|
|
The function checks:
|
|
1. If the device's IP address starts with any subnet in network_filter
|
|
2. If the device's name or hostname does NOT match any exclude_patterns
|
|
|
|
Both checks use case-insensitive matching, and glob patterns (with *) in
|
|
exclude_patterns are converted to regex patterns for matching.
|
|
"""
|
|
name = device.get('name', '').lower()
|
|
hostname = device.get('hostname', '').lower()
|
|
ip = device.get('ip', '')
|
|
|
|
# Check if device is in the configured network
|
|
network_filters = self.config['unifi'].get('network_filter', {})
|
|
for network in network_filters.values():
|
|
if ip.startswith(network['subnet']):
|
|
self.logger.debug(f"Checking device in network: {name} ({hostname}) IP: {ip}")
|
|
|
|
# Check if device should be excluded based on exclude_patterns
|
|
exclude_patterns = network.get('exclude_patterns', [])
|
|
if self.is_device_excluded(name, hostname, exclude_patterns, log_level='debug'):
|
|
return False
|
|
|
|
# Device is in the network and not excluded
|
|
self.logger.debug(f"Found device in network: {name}")
|
|
return True
|
|
|
|
return False
|
|
|
|
def _match_pattern(self, text_or_texts, pattern: str, use_complex_matching: bool = False, match_entire_string: bool = False, log_level: str = 'debug') -> bool:
|
|
"""Common function to match a string or multiple strings against a pattern.
|
|
|
|
This function handles the regex pattern matching logic for both is_hostname_unknown
|
|
and is_device_excluded functions. It supports both simple prefix matching and more
|
|
complex matching for patterns that should match anywhere in the string.
|
|
|
|
Args:
|
|
text_or_texts: The string or list of strings to match against the pattern.
|
|
If a list is provided, the function returns True if any string matches.
|
|
pattern: The pattern to match against
|
|
use_complex_matching: Whether to use the more complex matching logic for patterns
|
|
starting with ^.* (default: False)
|
|
match_entire_string: Whether to match the entire string by adding $ at the end
|
|
of the regex pattern (default: False)
|
|
log_level: The logging level to use (default: 'debug')
|
|
|
|
Returns:
|
|
bool: True if any of the provided texts match the pattern, False otherwise
|
|
"""
|
|
# Convert pattern to lowercase for case-insensitive matching
|
|
pattern_lower = pattern.lower()
|
|
|
|
# Set up logging based on the specified level
|
|
log_func = getattr(self.logger, log_level)
|
|
|
|
# Handle single string or list of strings
|
|
if isinstance(text_or_texts, str):
|
|
texts = [text_or_texts] if text_or_texts else []
|
|
else:
|
|
texts = [t for t in text_or_texts if t]
|
|
|
|
# If no valid texts to match, return False
|
|
if not texts:
|
|
return False
|
|
|
|
# Special case for patterns like ^.*something.* which should match anywhere in the string
|
|
if use_complex_matching and pattern_lower.startswith('^.*'):
|
|
# Extract the part after ^.* to use with re.search
|
|
search_part = pattern_lower[3:] # Remove the ^.* prefix
|
|
|
|
# For patterns that should match anywhere, we need to handle wildcards differently
|
|
# We want "sonos.*" to match "sonos", "mysonos", "sonosdevice", etc.
|
|
# We also want "sonos" to match exactly, without requiring characters after it
|
|
|
|
# First, handle the case where the search part ends with .*
|
|
if search_part.endswith('.*'):
|
|
# Remove the .* at the end and make it optional
|
|
base_part = search_part[:-2] # Remove the .* at the end
|
|
search_regex = base_part
|
|
else:
|
|
# For other patterns, just use the search part as is
|
|
search_regex = search_part
|
|
|
|
# Replace any remaining * with .* but not escape the dots
|
|
search_regex = search_regex.replace('*', '.*')
|
|
|
|
# Check each text
|
|
for text in texts:
|
|
# Use re.search to find the pattern anywhere in the string
|
|
if re.search(search_regex, text):
|
|
return True
|
|
else:
|
|
# Standard pattern matching (prefix matching)
|
|
# Convert glob pattern to regex pattern
|
|
pattern_regex = pattern_lower.replace('.', r'\.').replace('*', '.*')
|
|
|
|
# Check if pattern already starts with ^
|
|
if pattern_regex.startswith('^'):
|
|
regex_pattern = pattern_regex
|
|
else:
|
|
regex_pattern = f"^{pattern_regex}"
|
|
|
|
# Add $ at the end if matching entire string
|
|
if match_entire_string:
|
|
regex_pattern = f"{regex_pattern}$"
|
|
|
|
# Check each text
|
|
for text in texts:
|
|
# Use re.match to check if the string starts with the pattern
|
|
if re.match(regex_pattern, text):
|
|
return True
|
|
|
|
return False
|
|
|
|
def get_device_hostname(self, ip: str, device_name: str = None, timeout: int = 5, log_level: str = 'debug') -> tuple:
    """Ask a Tasmota device for the hostname it reports about itself.

    Sends ``GET http://<ip>/cm?cmnd=Status%205`` and reads
    ``StatusNET.Hostname`` from the JSON answer. Every outcome is logged
    through the logger method named by log_level. Request errors, non-200
    answers and unparseable bodies are reported through the return value
    instead of being raised.

    Args:
        ip: The IP address of the device.
        device_name: Optional label used in log messages; the IP is used when
            it is missing or empty.
        timeout: Timeout of the HTTP request in seconds (default: 5).
        log_level: Name of the logger method to log with (default: 'debug').

    Returns:
        tuple: ``(hostname, success)``. ``success`` is True only when a
        non-empty hostname was found in the response.

    Example:
        hostname, success = manager.get_device_hostname("192.168.1.100", "Living Room Light")
        if success:
            print(f"Device hostname: {hostname}")
    """
    emit = getattr(self.logger, log_level)
    label = device_name or ip
    hostname, success = "", False

    try:
        emit(f"Retrieving hostname for {label} at {ip}")
        response = requests.get(f"http://{ip}/cm?cmnd=Status%205", timeout=timeout)

        if response.status_code != 200:
            emit(f"Failed to get hostname for {label}: HTTP {response.status_code}")
        else:
            try:
                hostname = response.json().get('StatusNET', {}).get('Hostname', '')
                if hostname:
                    emit(f"Successfully retrieved hostname for {label}: {hostname}")
                    success = True
                else:
                    emit(f"No hostname found in response for {label}")
            except ValueError:
                # Raised by response.json() when the body is not valid JSON.
                emit(f"Failed to parse JSON response from {label}")
    except requests.exceptions.RequestException as e:
        emit(f"Error retrieving hostname for {label}: {str(e)}")

    return hostname, success
|
|
|
|
def is_hostname_unknown(self, hostname: str, patterns: list = None, from_unifi_os: bool = False, ip: str = None) -> bool:
|
|
"""Check if a hostname matches any pattern in unknown_device_patterns.
|
|
|
|
This function provides a centralized way to check if a hostname matches any of the
|
|
unknown_device_patterns defined in the configuration. It uses case-insensitive
|
|
matching and supports glob patterns (with *) in the patterns list.
|
|
|
|
Args:
|
|
hostname: The hostname to check against unknown_device_patterns
|
|
patterns: Optional list of patterns to check against. If not provided,
|
|
patterns will be loaded from the configuration.
|
|
from_unifi_os: Whether the hostname is from Unifi OS (handles Unifi Hostname bug)
|
|
ip: IP address of the device. If provided, hostname validation is skipped.
|
|
|
|
Returns:
|
|
bool: True if the hostname matches any pattern, False otherwise
|
|
|
|
Examples:
|
|
# Check if a hostname matches any unknown_device_patterns in the config
|
|
if manager.is_hostname_unknown("tasmota_device123"):
|
|
print("This is an unknown device")
|
|
|
|
# Check against a specific list of patterns
|
|
custom_patterns = ["esp-*", "tasmota_*"]
|
|
if manager.is_hostname_unknown("esp-abcd", custom_patterns):
|
|
print("This matches a custom pattern")
|
|
|
|
# Check with Unifi Hostname bug handling
|
|
if manager.is_hostname_unknown("tasmota_device123", from_unifi_os=True):
|
|
print("This is an unknown device from Unifi OS")
|
|
|
|
# Skip hostname validation when IP is provided
|
|
if manager.is_hostname_unknown("", ip="192.168.1.100"):
|
|
print("This is an unknown device with IP")
|
|
"""
|
|
# If IP is provided and from_unifi_os is False, we can skip hostname validation
|
|
if ip and not from_unifi_os:
|
|
self.logger.debug(f"IP provided ({ip}) and from_unifi_os is False, skipping hostname validation")
|
|
return True
|
|
|
|
# If no patterns provided, get them from the configuration
|
|
if patterns is None:
|
|
patterns = []
|
|
network_filters = self.config['unifi'].get('network_filter', {})
|
|
for network in network_filters.values():
|
|
patterns.extend(network.get('unknown_device_patterns', []))
|
|
|
|
# Convert hostname to lowercase for case-insensitive matching
|
|
hostname_lower = hostname.lower()
|
|
|
|
# Handle Unifi Hostname bug if hostname is from Unifi OS
|
|
if from_unifi_os and ip:
|
|
self.logger.debug(f"Handling hostname '{hostname}' from Unifi OS (bug handling enabled)")
|
|
|
|
# Get the device's self-reported hostname using the common function
|
|
try:
|
|
device_reported_hostname, success = self.get_device_hostname(ip, hostname, timeout=5, log_level='debug')
|
|
except Exception as e:
|
|
self.logger.debug(f"Failed to retrieve self-reported hostname for {hostname} at {ip}: {e}")
|
|
device_reported_hostname, success = "", False
|
|
|
|
if success:
|
|
# Check if the self-reported hostname also matches unknown patterns
|
|
device_hostname_matches_unknown = False
|
|
# Use the base of the self-reported hostname up to the first '-' for bug detection
|
|
device_hostname_base = device_reported_hostname.split('-')[0].lower()
|
|
for pattern in patterns:
|
|
if self._match_pattern(device_hostname_base, pattern, match_entire_string=False):
|
|
device_hostname_matches_unknown = True
|
|
self.logger.debug(f"Device's self-reported hostname base '{device_hostname_base}' (from '{device_reported_hostname}') matches unknown pattern: {pattern}")
|
|
break
|
|
|
|
# If UniFi name matches unknown patterns but device's self-reported name doesn't,
|
|
# this indicates the UniFi OS hostname bug
|
|
if not device_hostname_matches_unknown:
|
|
# First check if the UniFi-reported hostname matches unknown patterns
|
|
unifi_hostname_matches_unknown = False
|
|
for pattern in patterns:
|
|
if self._match_pattern(hostname_lower, pattern, match_entire_string=False):
|
|
unifi_hostname_matches_unknown = True
|
|
break
|
|
|
|
if unifi_hostname_matches_unknown:
|
|
self.logger.info(f"UniFi OS hostname bug detected for {hostname}: self-reported hostname '{device_reported_hostname}' doesn't match unknown patterns")
|
|
return False # Not an unknown device despite what UniFi reports
|
|
elif from_unifi_os:
|
|
self.logger.debug(f"Cannot check device's self-reported hostname for {hostname}: No IP address provided")
|
|
|
|
# Check if hostname matches any pattern
|
|
for pattern in patterns:
|
|
if self._match_pattern(hostname_lower, pattern, match_entire_string=False):
|
|
self.logger.debug(f"Hostname '{hostname}' matches unknown device pattern: {pattern}")
|
|
return True
|
|
|
|
return False
|
|
|
|
def is_device_excluded(self, device_name: str, hostname: str = '', patterns: list = None, log_level: str = 'debug') -> bool:
|
|
"""Check if a device name or hostname matches any pattern in exclude_patterns.
|
|
|
|
This function provides a centralized way to check if a device should be excluded
|
|
based on its name or hostname matching any of the exclude_patterns defined in the
|
|
configuration. It uses case-insensitive matching and supports glob patterns (with *)
|
|
in the patterns list.
|
|
|
|
Args:
|
|
device_name: The device name to check against exclude_patterns
|
|
hostname: The device hostname to check against exclude_patterns (optional)
|
|
patterns: Optional list of patterns to check against. If not provided,
|
|
patterns will be loaded from the configuration.
|
|
log_level: The logging level to use ('debug', 'info', 'warning', 'error'). Default is 'debug'.
|
|
|
|
Returns:
|
|
bool: True if the device should be excluded (name or hostname matches any pattern),
|
|
False otherwise
|
|
|
|
Examples:
|
|
# Check if a device should be excluded based on patterns in the config
|
|
if manager.is_device_excluded("homeassistant", "homeassistant.local"):
|
|
print("This device should be excluded")
|
|
|
|
# Check against a specific list of patterns
|
|
custom_patterns = ["^homeassistant*", "^.*sonos.*"]
|
|
if manager.is_device_excluded("sonos-speaker", "sonos.local", custom_patterns, log_level='info'):
|
|
print("This device matches a custom exclude pattern")
|
|
"""
|
|
# If no patterns provided, get them from the configuration
|
|
if patterns is None:
|
|
patterns = []
|
|
network_filters = self.config['unifi'].get('network_filter', {})
|
|
for network in network_filters.values():
|
|
patterns.extend(network.get('exclude_patterns', []))
|
|
|
|
# Convert device_name and hostname to lowercase for case-insensitive matching
|
|
name = device_name.lower() if device_name else ''
|
|
hostname_lower = hostname.lower() if hostname else ''
|
|
|
|
# Set up logging based on the specified level
|
|
log_func = getattr(self.logger, log_level)
|
|
|
|
# Create a list of texts to check (device name and hostname)
|
|
texts = [name, hostname_lower]
|
|
|
|
# Check if device name or hostname matches any pattern
|
|
for pattern in patterns:
|
|
pattern_lower = pattern.lower()
|
|
|
|
# Special case for patterns like ^.*something.* which should match anywhere in the string
|
|
if pattern_lower.startswith('^.*'):
|
|
# Use complex matching for patterns starting with ^.*
|
|
if self._match_pattern(texts, pattern, use_complex_matching=True, log_level=log_level):
|
|
log_func(f"Excluding device due to pattern '{pattern}': {device_name} ({hostname})")
|
|
return True
|
|
continue
|
|
|
|
# For normal patterns, match the entire string
|
|
if self._match_pattern(texts, pattern, match_entire_string=True, log_level=log_level):
|
|
log_func(f"Excluding device due to pattern '{pattern}': {device_name} ({hostname})")
|
|
return True
|
|
|
|
return False
|
|
|
|
def get_tasmota_devices(self) -> list:
|
|
"""Query UniFi controller and filter Tasmota devices.
|
|
|
|
This function:
|
|
1. Retrieves all clients from the UniFi controller
|
|
2. Filters them using is_tasmota_device() to find devices in the network_filter
|
|
that are not in exclude_patterns
|
|
3. Determines each device's connection type (Wireless/Wired)
|
|
4. Checks for the UniFi OS hostname bug whenever a device matches unknown_device_patterns
|
|
5. Creates a standardized device_info dictionary for each device
|
|
|
|
Returns:
|
|
list: A list of dictionaries containing device information with fields:
|
|
- name: Device name or hostname
|
|
- ip: IP address
|
|
- mac: MAC address
|
|
- last_seen: When the device was last seen by UniFi
|
|
- hostname: Device hostname
|
|
- notes: Any notes from UniFi
|
|
- connection: Connection type (Wireless/Wired)
|
|
- unifi_hostname_bug_detected: Flag indicating if the UniFi OS hostname bug was detected
|
|
|
|
If an error occurs during the process, an empty list is returned.
|
|
"""
|
|
devices = []
|
|
self.logger.debug("Querying UniFi controller for devices")
|
|
try:
|
|
all_clients = self.unifi_client.get_clients()
|
|
self.logger.debug(f"Found {len(all_clients)} total devices")
|
|
|
|
# Get unknown device patterns for hostname bug detection
|
|
network_filters = self.config['unifi'].get('network_filter', {})
|
|
unknown_patterns = []
|
|
for network in network_filters.values():
|
|
unknown_patterns.extend(network.get('unknown_device_patterns', []))
|
|
|
|
for device in all_clients:
|
|
if self.is_tasmota_device(device):
|
|
# Determine connection type based on available fields
|
|
connection = "Unknown"
|
|
if device.get('essid'):
|
|
connection = f"Wireless - {device.get('essid')}"
|
|
elif device.get('radio') or device.get('wifi'):
|
|
connection = "Wireless"
|
|
elif device.get('port') or device.get('switch_port') or device.get('switch'):
|
|
connection = "Wired"
|
|
|
|
# Get device details
|
|
device_name = device.get('name', device.get('hostname', 'Unknown'))
|
|
device_hostname = device.get('hostname', '')
|
|
device_ip = device.get('ip', '')
|
|
|
|
# Initialize the UniFi hostname bug flag
|
|
unifi_hostname_bug_detected = False
|
|
device_reported_hostname = None
|
|
|
|
# Check if device name or hostname matches unknown patterns
|
|
unifi_name_matches_unknown = (
|
|
self.is_hostname_unknown(device_name, unknown_patterns) or
|
|
self.is_hostname_unknown(device_hostname, unknown_patterns)
|
|
)
|
|
if unifi_name_matches_unknown:
|
|
self.logger.debug(f"Device {device_name} matches unknown device pattern")
|
|
|
|
# If the name matches unknown patterns, check the device's self-reported hostname
|
|
# before declaring it unknown
|
|
#
|
|
# This addresses a UniFi OS bug where it doesn't keep track of updated hostnames.
|
|
# When a hostname is updated and the connection reset, UniFi may not pick up the new name.
|
|
if unifi_name_matches_unknown and device_ip:
|
|
self.logger.debug(f"Checking device's self-reported hostname for {device_name}")
|
|
|
|
# Get the device's self-reported hostname using the common function
|
|
device_reported_hostname, success = self.get_device_hostname(device_ip, device_name, timeout=5, log_level='debug')
|
|
|
|
if success:
|
|
# Check if the self-reported hostname also matches unknown patterns
|
|
device_hostname_matches_unknown = False
|
|
# Use the base of the self-reported hostname up to the first '-' for bug detection
|
|
device_hostname_base = device_reported_hostname.split('-')[0].lower()
|
|
for pattern in unknown_patterns:
|
|
if self._match_pattern(device_hostname_base, pattern, match_entire_string=False):
|
|
device_hostname_matches_unknown = True
|
|
self.logger.debug(f"Device's self-reported hostname base '{device_hostname_base}' (from '{device_reported_hostname}') matches unknown pattern: {pattern}")
|
|
break
|
|
|
|
# If UniFi name matches unknown patterns but device's self-reported name doesn't,
|
|
# this indicates the UniFi OS hostname bug
|
|
if not device_hostname_matches_unknown:
|
|
unifi_hostname_bug_detected = True
|
|
self.logger.info(f"UniFi OS hostname bug detected for {device_name}: self-reported hostname '{device_reported_hostname}' doesn't match unknown patterns")
|
|
|
|
# If bug detected and we have the device-reported hostname, log it but keep UniFi fields for listing
|
|
if unifi_hostname_bug_detected and device_reported_hostname:
|
|
self.logger.info(f"UniFi bug detected: will prefer self-reported hostname '{device_reported_hostname}' during processing; keeping UniFi listing name '{device_name}'")
|
|
|
|
device_info = {
|
|
"name": device_name,
|
|
"ip": device_ip,
|
|
"mac": device.get('mac', ''),
|
|
"last_seen": device.get('last_seen', ''),
|
|
"hostname": device_hostname,
|
|
"notes": device.get('note', ''),
|
|
"connection": connection,
|
|
"unifi_hostname_bug_detected": unifi_hostname_bug_detected
|
|
}
|
|
devices.append(device_info)
|
|
|
|
self.logger.debug(f"Found {len(devices)} Tasmota devices")
|
|
return devices
|
|
except Exception as e:
|
|
self.logger.error(f"Error getting devices from UniFi controller: {e}")
|
|
return []
|
|
|
|
def save_tasmota_config(self, devices: list) -> None:
|
|
"""Save Tasmota device information to a JSON file with device tracking."""
|
|
filename = "current.json"
|
|
self.logger.debug(f"Saving Tasmota configuration to {filename}")
|
|
deprecated_filename = "deprecated.json"
|
|
|
|
current_devices = []
|
|
deprecated_devices = []
|
|
|
|
# Load existing devices if file exists
|
|
if os.path.exists(filename):
|
|
try:
|
|
with open(filename, 'r') as f:
|
|
existing_config = json.load(f)
|
|
current_devices = existing_config.get('tasmota', {}).get('devices', [])
|
|
except json.JSONDecodeError:
|
|
self.logger.error(f"Error reading {filename}, treating as empty")
|
|
current_devices = []
|
|
|
|
# Load deprecated devices if file exists
|
|
if os.path.exists(deprecated_filename):
|
|
try:
|
|
with open(deprecated_filename, 'r') as f:
|
|
deprecated_config = json.load(f)
|
|
deprecated_devices = deprecated_config.get('tasmota', {}).get('devices', [])
|
|
except json.JSONDecodeError:
|
|
self.logger.error(f"Error reading {deprecated_filename}, treating as empty")
|
|
deprecated_devices = []
|
|
|
|
# Create new config
|
|
new_devices = []
|
|
moved_to_deprecated = []
|
|
restored_from_deprecated = []
|
|
removed_from_deprecated = []
|
|
excluded_devices = []
|
|
|
|
# Get exclude_patterns for checking excluded devices
|
|
network_filters = self.config['unifi'].get('network_filter', {})
|
|
exclude_patterns = []
|
|
for network in network_filters.values():
|
|
exclude_patterns.extend(network.get('exclude_patterns', []))
|
|
|
|
# Process current devices
|
|
for device in devices:
|
|
device_name = device['name']
|
|
device_hostname = device.get('hostname', '')
|
|
device_ip = device['ip']
|
|
device_mac = device['mac']
|
|
|
|
# Check if device should be excluded
|
|
if self.is_device_excluded(device_name, device_hostname, exclude_patterns, log_level='info'):
|
|
print(f"Device {device_name} excluded by pattern - skipping")
|
|
excluded_devices.append(device_name)
|
|
continue
|
|
|
|
# Check in current devices
|
|
existing_device = next((d for d in current_devices
|
|
if d['name'] == device_name), None)
|
|
|
|
if existing_device:
|
|
# Device exists, check if IP or MAC changed
|
|
if existing_device['ip'] != device_ip or existing_device['mac'] != device_mac:
|
|
moved_to_deprecated.append(existing_device)
|
|
new_devices.append(device)
|
|
print(f"Device {device_name} moved to deprecated (IP/MAC changed)")
|
|
else:
|
|
new_devices.append(existing_device) # Keep existing device
|
|
else:
|
|
# New device, check if it was in deprecated
|
|
deprecated_device = next((d for d in deprecated_devices
|
|
if d['name'] == device_name), None)
|
|
if deprecated_device:
|
|
removed_from_deprecated.append(device_name)
|
|
print(f"Device {device_name} removed from deprecated (restored)")
|
|
new_devices.append(device)
|
|
print(f"Device {device_name} added to output file")
|
|
|
|
# Find devices that are no longer present
|
|
current_names = {d['name'] for d in devices}
|
|
for existing_device in current_devices:
|
|
if existing_device['name'] not in current_names:
|
|
if not self.is_device_excluded(existing_device['name'], existing_device.get('hostname', ''), exclude_patterns, log_level='info'):
|
|
moved_to_deprecated.append(existing_device)
|
|
print(f"Device {existing_device['name']} moved to deprecated (no longer present)")
|
|
|
|
# Update deprecated devices list, excluding any excluded devices
|
|
final_deprecated = []
|
|
for device in deprecated_devices:
|
|
if device['name'] not in removed_from_deprecated and not self.is_device_excluded(device['name'], device.get('hostname', ''), exclude_patterns, log_level='info'):
|
|
final_deprecated.append(device)
|
|
elif self.is_device_excluded(device['name'], device.get('hostname', ''), exclude_patterns, log_level='info'):
|
|
print(f"Device {device['name']} removed from deprecated (excluded by pattern)")
|
|
|
|
final_deprecated.extend(moved_to_deprecated)
|
|
|
|
# Save new configuration
|
|
config = {
|
|
"tasmota": {
|
|
"devices": new_devices,
|
|
"generated_at": datetime.now().isoformat(),
|
|
"total_devices": len(new_devices)
|
|
}
|
|
}
|
|
|
|
# Save deprecated configuration
|
|
deprecated_config = {
|
|
"tasmota": {
|
|
"devices": final_deprecated,
|
|
"generated_at": datetime.now().isoformat(),
|
|
"total_devices": len(final_deprecated)
|
|
}
|
|
}
|
|
|
|
# Backup existing file if it exists
|
|
if os.path.exists(filename):
|
|
try:
|
|
backup_name = f"{filename}.backup"
|
|
os.rename(filename, backup_name)
|
|
self.logger.info(f"Created backup of existing configuration as {backup_name}")
|
|
except Exception as e:
|
|
self.logger.error(f"Error creating backup: {e}")
|
|
|
|
# Save files
|
|
try:
|
|
with open(filename, 'w') as f:
|
|
json.dump(config, f, indent=4)
|
|
with open(deprecated_filename, 'w') as f:
|
|
json.dump(deprecated_config, f, indent=4)
|
|
|
|
self.logger.info(f"Successfully saved {len(new_devices)} Tasmota devices to {filename}")
|
|
self.logger.info(f"Successfully saved {len(final_deprecated)} deprecated devices to {deprecated_filename}")
|
|
|
|
print("\nDevice Status Summary:")
|
|
if excluded_devices:
|
|
print("\nExcluded Devices:")
|
|
for name in excluded_devices:
|
|
print(f"- {name}")
|
|
|
|
if moved_to_deprecated:
|
|
print("\nMoved to deprecated:")
|
|
for device in moved_to_deprecated:
|
|
print(f"- {device['name']}")
|
|
|
|
if removed_from_deprecated:
|
|
print("\nRestored from deprecated:")
|
|
for name in removed_from_deprecated:
|
|
print(f"- {name}")
|
|
|
|
print("\nCurrent Tasmota Devices:")
|
|
for device in new_devices:
|
|
print(f"Name: {device['name']:<20} IP: {device['ip']:<15} MAC: {device['mac']}")
|
|
|
|
except Exception as e:
|
|
self.logger.error(f"Error saving configuration: {e}")
|
|
|
|
def get_unknown_devices(self, use_current_json=True):
    """Return saved devices whose name or hostname matches an unknown-device pattern.

    Devices are read from ``current.json`` (or ``tasmota.json`` when
    ``use_current_json`` is false) under ``tasmota -> devices``. Patterns are
    gathered from the ``unknown_device_patterns`` list of every entry in
    ``self.config['unifi']['network_filter']``.

    Pattern syntax: matching is case-insensitive and anchored at the start of
    the string; ``*`` matches any run of characters and ``.`` is literal.
    A pattern that does not translate to a valid regular expression is
    logged and ignored instead of aborting the scan.

    Args:
        use_current_json: Read ``current.json`` when true, else ``tasmota.json``.

    Returns:
        list: The matching device dicts in file order. Empty when the file is
        missing or is not valid JSON.
    """
    self.logger.info("Identifying unknown devices for processing...")

    source_file = 'current.json' if use_current_json else 'tasmota.json'
    try:
        with open(source_file, 'r') as f:
            data = json.load(f)
    except FileNotFoundError:
        self.logger.error(f"{source_file} not found. Run discovery first.")
        return []
    except json.JSONDecodeError:
        self.logger.error(f"Invalid JSON format in {source_file}")
        return []

    all_devices = data.get('tasmota', {}).get('devices', [])
    self.logger.debug(f"Loaded {len(all_devices)} devices from {source_file}")

    # Translate and compile every pattern once, rather than once per device.
    network_filters = self.config['unifi'].get('network_filter', {})
    compiled_patterns = []
    for network in network_filters.values():
        for raw_pattern in network.get('unknown_device_patterns', []):
            translated = raw_pattern.lower().replace('.', r'\.').replace('*', '.*')
            if not translated.startswith('^'):
                translated = f"^{translated}"
            try:
                compiled_patterns.append(re.compile(translated))
            except re.error as exc:
                self.logger.warning(
                    f"Ignoring invalid unknown_device_pattern {raw_pattern!r}: {exc}"
                )

    unknown_devices = []
    for device in all_devices:
        # The key may be present with a null value, so `.get(key, '')` is not enough.
        name = str(device.get('name') or '').lower()
        hostname = str(device.get('hostname') or '').lower()

        if any(rx.match(name) or rx.match(hostname) for rx in compiled_patterns):
            self.logger.debug(f"Found unknown device: {name} ({hostname})")
            unknown_devices.append(device)

    self.logger.info(f"Found {len(unknown_devices)} unknown devices to process")
    return unknown_devices
|
|
|
|
def process_unknown_devices(self):
    """Interactively identify and rename devices that match unknown-device patterns.

    For each device returned by ``get_unknown_devices()``:
    1. Skip it when it has no IP address.
    2. Fetch the device's main page and look for the word "toggle"
       (taken as a sign that it is a light switch or power plug).
    3. Switch the device's power state every 2 seconds in a background
       thread so the operator can spot it physically.
    4. Prompt on the console for a new name. Logging is disabled while the
       prompt is shown and always re-enabled afterwards.
    5. Configure the device with the new hostname via
       ``configure_unknown_device``.

    Entering nothing, ``unknown`` or ``na`` ends the whole run. Entering the
    device's current hostname skips only that device.

    Returns:
        None
    """
    # Local import: kept inside the method, as in the original code.
    import threading

    def toggle_device(target_ip, stop_event):
        """Flip the power state of ``target_ip`` every 2 s until ``stop_event`` is set."""
        powered = False
        while not stop_event.is_set():
            powered = not powered
            toggle_cmd = "Power On" if powered else "Power Off"
            toggle_url = f"http://{target_ip}/cm?cmnd={toggle_cmd}"
            try:
                requests.get(toggle_url, timeout=2)
            except requests.exceptions.RequestException:
                # Best effort: a missed toggle only makes the blinking uneven.
                pass
            # wait() instead of sleep(): returns as soon as the stop is requested.
            stop_event.wait(2.0)

    unknown_devices = self.get_unknown_devices()
    if not unknown_devices:
        self.logger.info("No unknown devices found to process")
        return

    self.logger.info(f"Starting to process {len(unknown_devices)} unknown devices...")

    for device in unknown_devices:
        name = device.get('name', 'Unknown')
        ip = device.get('ip')
        connection = device.get('connection', 'Unknown')

        if not ip:
            self.logger.warning(f"Skipping device {name} - no IP address")
            continue

        self.logger.info(f"Processing unknown device: {name} at {ip} with connection {connection}")

        try:
            # Get the main page to check for a toggle button
            response = requests.get(f"http://{ip}/", timeout=5)
            if "toggle" not in response.text.lower():
                self.logger.info(f"Device {name} does not have a toggle button, skipping")
                continue

            self.logger.info(f"Device {name} has a toggle button, assuming it's a light switch or power plug")
            original_hostname = device.get('hostname', '')

            # Temporarily disable all logging so log lines do not interleave
            # with the interactive prompt.
            logging.disable(logging.CRITICAL)
            try:
                print("\n" + "="*50)
                print(f"DEVICE: {name} at IP: {ip} Connection: {connection}")
                print(f"Current hostname: {original_hostname}")
                print("="*50)
                print("The device is now toggling to help you identify it.")

                # The IP and the stop event are passed as arguments so that a
                # worker which outlives join() can never pick up the values
                # belonging to the next loop iteration.
                stop_toggle = threading.Event()
                toggle_thread = threading.Thread(
                    target=toggle_device, args=(ip, stop_toggle), daemon=True
                )
                toggle_thread.start()
                try:
                    print("\nPlease enter a new name for this device:")
                    print("(Enter nothing, 'unknown', or 'na' to assume device could not be found and end)")
                    new_hostname = input("> ").strip()
                finally:
                    # Stop toggling even when input() is interrupted (EOF / Ctrl-C).
                    stop_toggle.set()
                    toggle_thread.join(timeout=3)

                # Special inputs that indicate the device could not be found
                if not new_hostname or new_hostname.lower() in ("unknown", "na"):
                    print("Assuming device could not be found, ending process")
                    return  # End the entire process

                if new_hostname != original_hostname:
                    print(f"Setting new hostname to: {new_hostname}")
                else:
                    print("No valid hostname entered, skipping device")
                    new_hostname = ""
            finally:
                # Re-enable logging
                logging.disable(logging.NOTSET)

            if new_hostname:
                self.logger.info(f"Configuring device with new hostname: {new_hostname}")
                self.configure_unknown_device(ip, new_hostname)
            else:
                self.logger.warning(f"No new hostname provided for {name}, skipping configuration")

        except requests.exceptions.RequestException as e:
            self.logger.error(f"Error connecting to {name} at {ip}: {str(e)}")
|
|
|
|
def check_and_update_template(self, ip, name):
    """Reconcile a device's template and device name with the configured mapping.

    The mapping is ``device name -> template string``. It is built from
    ``self.config['device_list']`` (entries that are dicts with a ``template``
    key) when that is a dict, otherwise taken from the legacy
    ``self.config['config_other']``.

    Algorithm:
    1. Get the device name from the device using ``Status 0``
    2. Get the current template using the ``Template`` command
    3. Check if any key in the mapping matches the device name
    4. If a match is found, check if the template matches the value
    5. If the template doesn't match, write the value to the template,
       set ``Module 0``, restart the device and verify the result
    6. If no key matches, check if any value matches the template
    7. If a value match is found, write the key to the device name
       (followed by the same ``Module 0`` / restart / verify sequence)

    Templates are compared by their parsed JSON content, so differences in
    whitespace or key order do not count as a mismatch. Writes are attempted
    up to three times; a write that never verifies is appended to
    ``self.command_failures``.

    Args:
        ip: The IP address of the device
        name: The name/hostname of the device (used for log messages)

    Returns:
        bool: True if the template or device name was written and verified,
        False otherwise
    """
    from urllib.parse import quote

    max_attempts = 3

    def fetch_json(url, what, response_label, retry_label):
        """GET ``url`` (10 s timeout, one 20 s retry after a timeout); return JSON or None."""
        try:
            self.logger.debug(f"{name}: Getting {what} with increased timeout (10 seconds)")
            payload = requests.get(url, timeout=10).json()
            # Log the actual response format for debugging
            self.logger.debug(f"{name}: {response_label}: {payload}")
            return payload
        except requests.exceptions.Timeout:
            self.logger.error(f"{name}: Timeout getting {what} (10 seconds) - device may be busy")
        except (requests.exceptions.RequestException, ValueError) as e:
            # ValueError covers a non-JSON body on requests versions whose
            # JSON error is not a RequestException.
            self.logger.error(f"{name}: Error getting {what}: {str(e)}")
            return None

        # Try one more time with an even longer timeout
        try:
            self.logger.debug(f"{name}: {retry_label} with 20 second timeout")
            payload = requests.get(url, timeout=20).json()
            self.logger.debug(f"{name}: {response_label} on retry: {payload}")
            return payload
        except requests.exceptions.Timeout:
            self.logger.error(f"{name}: Timeout getting {what} even with 20 second timeout")
        except (requests.exceptions.RequestException, ValueError) as e:
            self.logger.error(f"{name}: Error getting {what} on retry: {str(e)}")
        return None

    def parse_template(payload, log_format=False):
        """Extract the template from a ``Template`` response; "" when none is found."""
        if not isinstance(payload, dict) or not payload:
            return ""
        if "Template" in payload:
            return payload.get("Template", "")
        # No "Template" key: the template may be a JSON string stored under
        # the first key of the response.
        first_key = next(iter(payload))
        first_value = payload[first_key]
        if isinstance(first_value, str) and "{" in first_value:
            if log_format:
                self.logger.debug(f"{name}: Found template in alternate format under key: {first_key}")
            return first_value
        # The response itself is the template: {"NAME":..., "GPIO":[...], "FLAG":..., "BASE":...}
        if all(key in payload for key in ('NAME', 'GPIO', 'FLAG', 'BASE')):
            if log_format:
                self.logger.debug(f"{name}: Found template in dict format with NAME, GPIO, FLAG, BASE keys")
            # Uses the module-level json import. A function-local import here
            # would make `json` a local name and break the verification path.
            return json.dumps(payload)
        return ""

    def same_template(left, right):
        """True when both templates are equal as text or as parsed JSON objects."""
        if left == right:
            return True
        decoded = []
        for candidate in (left, right):
            if isinstance(candidate, str):
                try:
                    candidate = json.loads(candidate)
                except ValueError:
                    return False
            decoded.append(candidate)
        return isinstance(decoded[0], dict) and decoded[0] == decoded[1]

    def record_failure(command, error):
        """Remember a command that could not be applied, for later reporting."""
        if not hasattr(self, 'command_failures'):
            self.command_failures = []
        self.command_failures.append({
            "device": name,
            "ip": ip,
            "command": command,
            "error": error
        })

    def apply_and_verify(command_url, lower, cap, is_applied):
        """Send a command, set Module 0, restart, then poll ``is_applied``.

        ``lower``/``cap`` are the lower-case and capitalised labels used in
        log messages. Returns ``(succeeded, last_error)``.
        """
        module_url = f"http://{ip}/cm?cmnd=Module%200"
        restart_url = f"http://{ip}/cm?cmnd=Restart%201"
        last_error = None

        def run_attempt(attempt):
            nonlocal last_error
            self.logger.debug(f"{name}: Setting {lower} (attempt {attempt}/{max_attempts})")
            response = requests.get(command_url, timeout=10)
            if response.status_code != 200:
                last_error = f"HTTP {response.status_code}"
                self.logger.warning(f"{name}: Failed to update {lower}: {last_error}")
                return False
            self.logger.info(f"{name}: {cap} command accepted")

            # Activate the template by setting module to 0 (Template module)
            try:
                module_response = requests.get(module_url, timeout=10)
            except requests.exceptions.RequestException as e:
                last_error = str(e)
                self.logger.warning(f"{name}: Error setting module to 0: {last_error}")
                return False
            if module_response.status_code != 200:
                last_error = f"HTTP {module_response.status_code}"
                self.logger.warning(f"{name}: Failed to set module to 0: {last_error}")
                return False
            self.logger.info(f"{name}: Module set to 0 successfully")

            # Restart the device to apply the change. A failed restart request
            # is not fatal: verification below decides the outcome.
            try:
                restart_response = requests.get(restart_url, timeout=10)
            except requests.exceptions.Timeout:
                # Restart may time out due to reboot; log and proceed to verification
                last_error = "Timeout"
                self.logger.info(f"{name}: Restart timed out (device rebooting); proceeding to verification")
            except requests.exceptions.RequestException as e:
                last_error = str(e)
                self.logger.warning(f"{name}: Error restarting device: {last_error}")
            else:
                if restart_response.status_code != 200:
                    last_error = f"HTTP {restart_response.status_code}"
                    self.logger.warning(f"{name}: Failed to restart device: {last_error}")
                else:
                    self.logger.info(f"{name}: Device restart initiated successfully")

            # Post-update verification: poll the device for the new value
            for vtry in range(1, 4):
                try:
                    # Wait a bit to let the device come back
                    time.sleep(2 if vtry == 1 else 3)
                    if is_applied():
                        self.logger.info(f"{name}: {cap} verification succeeded on attempt {vtry}")
                        return True
                except Exception as ve:
                    # Deliberately broad: the device is rebooting, so connection
                    # errors and malformed replies are expected while polling.
                    self.logger.debug(f"{name}: {cap} verification attempt {vtry} failed: {ve}")

            last_error = last_error or "Verification failed"
            self.logger.warning(f"{name}: {cap} verification failed (attempt {attempt}/{max_attempts})")
            return False

        for attempt in range(1, max_attempts + 1):
            try:
                if run_attempt(attempt):
                    return True, last_error
            except requests.exceptions.Timeout:
                last_error = "Timeout"
                self.logger.warning(f"{name}: Timeout updating {lower} (attempt {attempt}/{max_attempts})")
            except requests.exceptions.RequestException as e:
                last_error = str(e)
                self.logger.warning(f"{name}: Error updating {lower}: {last_error} (attempt {attempt}/{max_attempts})")

            if attempt < max_attempts:
                time.sleep(1)

        return False, last_error

    try:
        # Get device_list (new) or legacy config_other settings
        device_list = self.config.get('device_list')
        if isinstance(device_list, dict):
            # Map device_list entries to name -> template string
            config_other = {
                key: entry.get('template', '')
                for key, entry in device_list.items()
                if isinstance(entry, dict) and 'template' in entry
            }
        else:
            config_other = self.config.get('config_other', {})
        if not config_other:
            self.logger.debug(f"{name}: No device_list/config_other settings found in configuration")
            return False

        # Get Status 0 for the device name
        status0_data = fetch_json(
            f"http://{ip}/cm?cmnd=Status%200", "Status 0", "Status 0 response", "Retrying Status 0"
        )
        if status0_data is None:
            return False

        status_section = status0_data.get("Status", {}) if isinstance(status0_data, dict) else None
        device_name = status_section.get("DeviceName", "") if isinstance(status_section, dict) else ""
        if not device_name:
            self.logger.debug(f"{name}: Could not get device name from Status 0")
            return False
        self.logger.debug(f"{name}: Device name from Configuration/Other page: {device_name}")

        # Get the current template
        template_data = fetch_json(
            f"http://{ip}/cm?cmnd=Template", "template", "Template response", "Retrying"
        )
        if template_data is None:
            return False

        current_template = parse_template(template_data, log_format=True)
        if not current_template:
            self.logger.debug(f"{name}: Could not get current template from response")
            return False
        self.logger.debug(f"{name}: Current template: {current_template}")

        if device_name in config_other:
            # Key matches device name: make the device's template match the value
            template_value = config_other[device_name]
            if not template_value or not template_value.strip():
                self.logger.info(f"{name}: Device name '{device_name}' matches key in config_other, but value is blank or empty")
                print(f"\nDevice {name} at {ip} must be set manually in Configuration/Module to: {device_name}")
                print(f"The config_other entry has a blank value for key: {device_name}")
                return False

            if same_template(current_template, template_value):
                self.logger.debug(f"{name}: Device name '{device_name}' matches key in config_other and template matches value")
                return False

            self.logger.info(f"{name}: Device name '{device_name}' matches key in config_other, but template doesn't match")
            self.logger.info(f"{name}: Setting template to: {template_value}")

            def template_is_applied():
                reply = requests.get(f"http://{ip}/cm?cmnd=Template", timeout=10).json()
                return same_template(parse_template(reply), template_value)

            template_updated, last_error = apply_and_verify(
                f"http://{ip}/cm?cmnd=Template%20{quote(template_value)}",
                "template", "Template", template_is_applied,
            )
            if not template_updated:
                # Track the failure for later reporting
                record_failure("Template <config_other>", last_error)
            return template_updated

        # No key matches the device name: look for a value equal to the template
        matching_key = next(
            (key for key, value in config_other.items() if same_template(value, current_template)),
            None,
        )
        if matching_key:
            self.logger.info(f"{name}: Template matches value for key '{matching_key}' in config_other")
            self.logger.info(f"{name}: Setting device name to: {matching_key}")

            def device_name_is_applied():
                reply = requests.get(f"http://{ip}/cm?cmnd=Status%200", timeout=10).json()
                return reply.get("Status", {}).get("DeviceName", "") == matching_key

            # The name is percent-encoded so characters such as '&' or '#'
            # cannot truncate or split the command in the query string.
            template_updated, last_error = apply_and_verify(
                f"http://{ip}/cm?cmnd=DeviceName%20{quote(str(matching_key))}",
                "device name", "Device name", device_name_is_applied,
            )
            if not template_updated:
                # Track the failure for later reporting
                record_failure(f"DeviceName {matching_key}", last_error)
            return template_updated

        # No matches found, print detailed information about what's on the device
        self.logger.info(f"{name}: No matches found in config_other for either Device Name or Template")
        self.logger.info(f"{name}: Current Device Name on device: '{device_name}'")
        self.logger.info(f"{name}: Current Template on device: '{current_template}'")
        print(f"\nNo template match found for device {name} at {ip}")
        print(f" Device Name on device: '{device_name}'")
        print(f" Template on device: '{current_template}'")
        print("Please add an appropriate entry to device_list (preferred) or legacy config_other in your configuration file.")
        return False

    except requests.exceptions.RequestException as e:
        self.logger.error(f"Error checking/updating template for device at {ip}: {str(e)}")
        return False
|
|
|
|
def configure_mqtt_settings(self, ip, name, mqtt_status=None, is_new_device=False, set_friendly_name=False, enable_mqtt=False, with_retry=False, reboot=False):
    """Configure MQTT settings for a device.

    Optionally sets ``FriendlyName1`` and enables MQTT (``SetOption3``), then
    pushes the values from ``self.config['mqtt']`` to the device, applies the
    console settings via ``apply_console_settings`` and optionally restarts
    the device.

    For an existing device with a non-empty ``mqtt_status`` only the settings
    that differ are sent; any difference also re-sends the password, and
    ``SetOption62`` is always sent according to the ``NoRetain`` config flag.
    Otherwise every MQTT field is sent.

    All command values are percent-encoded before being placed in the URL.

    Args:
        ip: The IP address of the device
        name: The name/hostname of the device
        mqtt_status: The current MQTT status of the device (from Status 6)
        is_new_device: Whether this is a new device (True) or existing device (False)
        set_friendly_name: Whether to set the friendly name
        enable_mqtt: Whether to enable MQTT
        with_retry: Whether to use retry logic (up to 3 attempts per setting;
            settings that still fail are appended to ``self.command_failures``)
        reboot: Whether to reboot the device after configuration

    Returns:
        bool: True if at least one MQTT or console setting was updated,
        False otherwise (including when the MQTT configuration is missing
        or a request outside the per-setting handling fails)
    """
    from urllib.parse import quote

    def command_url(setting, value):
        """Build the command URL for ``setting`` with a percent-encoded value."""
        encoded = quote(str(value), safe='')
        if setting == "FullTopic":
            # FullTopic is sent without a separator between command and value,
            # as in the original implementation. Encoding the value keeps the
            # literal '%' of tokens such as %prefix% intact.
            return f"http://{ip}/cm?cmnd={setting}{encoded}"
        return f"http://{ip}/cm?cmnd={setting}%20{encoded}"

    def send_with_retry(setting, value):
        """Send one setting, retrying up to three times; return True on success."""
        max_attempts = 3
        last_error = None
        url = command_url(setting, value)

        for attempts in range(1, max_attempts + 1):
            try:
                response = requests.get(url, timeout=5)
                if response.status_code == 200:
                    if setting != 'MqttPassword':
                        self.logger.debug(f"{name}: Updated {setting} to {value}")
                    else:
                        self.logger.debug(f"{name}: Updated MQTT Password")
                    return True
                self.logger.warning(f"{name}: Failed to update {setting} (attempt {attempts}/{max_attempts})")
                last_error = f"HTTP {response.status_code}"
            except requests.exceptions.Timeout:
                self.logger.warning(f"{name}: Timeout updating {setting} (attempt {attempts}/{max_attempts})")
                last_error = "Timeout"
            except requests.exceptions.RequestException as e:
                self.logger.warning(f"{name}: Error updating {setting}: {str(e)} (attempt {attempts}/{max_attempts})")
                last_error = str(e)

            if attempts < max_attempts:
                time.sleep(1)  # Wait before retry

        self.logger.error(f"{name}: Failed to update {setting} after {max_attempts} attempts. Last error: {last_error}")
        # Track the failure for later reporting. The password is never
        # recorded in clear text, matching what the log messages do.
        if not hasattr(self, 'command_failures'):
            self.command_failures = []
        shown_value = '<redacted>' if setting == 'MqttPassword' else value
        self.command_failures.append({
            "device": name,
            "ip": ip,
            "command": f"{setting} {shown_value}",
            "error": last_error
        })
        return False

    try:
        # Set Friendly Name if requested
        if set_friendly_name:
            friendly_name_url = command_url("FriendlyName1", name)
            response = requests.get(friendly_name_url, timeout=5)
            if response.status_code == 200:
                self.logger.info(f"Set Friendly Name to {name}")
            else:
                self.logger.error(f"Failed to set Friendly Name to {name}")

        # Enable MQTT if requested
        if enable_mqtt:
            mqtt_url = f"http://{ip}/cm?cmnd=SetOption3%20ON"  # Enable MQTT
            response = requests.get(mqtt_url, timeout=5)
            if response.status_code == 200:
                self.logger.info(f"Enabled MQTT for {name}")
            else:
                self.logger.error(f"Failed to enable MQTT for {name}")

        # Configure MQTT settings
        mqtt_config = self.config.get('mqtt', {})
        if not mqtt_config:
            self.logger.error("MQTT configuration missing from config file")
            return False

        # Get the base hostname (everything before the first dash)
        hostname_base = name.split('-', 1)[0]

        configured_topic = mqtt_config.get('Topic', '')
        mqtt_fields = {
            "MqttHost": mqtt_config.get('Host', ''),
            "MqttPort": mqtt_config.get('Port', 1883),
            "MqttUser": mqtt_config.get('User', ''),
            "MqttPassword": mqtt_config.get('Password', ''),
            # The placeholder %hostname_base% is replaced by the device's base hostname
            "Topic": hostname_base if configured_topic == '%hostname_base%' else configured_topic,
            "FullTopic": mqtt_config.get('FullTopic', '%prefix%/%topic%/'),
        }

        changes_needed = []
        if not is_new_device and mqtt_status:
            # For existing devices, only send what differs.
            # NOTE(review): this expects mqtt_status['MqttHost'] to be a dict
            # with Host/Port/User/Topic/FullTopic keys - confirm against the caller.
            device_mqtt = mqtt_status.get('MqttHost', {})
            force_password_update = False

            for device_key, setting in (
                ('Host', 'MqttHost'),
                ('Port', 'MqttPort'),
                ('User', 'MqttUser'),
                ('Topic', 'Topic'),
                ('FullTopic', 'FullTopic'),
            ):
                if device_mqtt.get(device_key) != mqtt_fields[setting]:
                    changes_needed.append((setting, mqtt_fields[setting]))
                    self.logger.debug(f"{name}: MQTT {device_key} mismatch - Device: {device_mqtt.get(device_key)}, Config: {mqtt_fields[setting]}")
                    force_password_update = True

            # The password cannot be compared, so re-send it whenever any
            # other MQTT setting changed.
            if force_password_update:
                changes_needed.append(('MqttPassword', mqtt_fields['MqttPassword']))
                self.logger.debug(f"{name}: MQTT Password will be updated")

            # NoRetain setting: 1 = No Retain, 0 = Use Retain
            no_retain = mqtt_config.get('NoRetain', False)
            changes_needed.append(('SetOption62', '1' if no_retain else '0'))
        else:
            # For new devices, set all MQTT settings
            changes_needed.extend(mqtt_fields.items())

        # Apply MQTT settings
        mqtt_updated = False
        for setting, value in changes_needed:
            try:
                if with_retry:
                    if send_with_retry(setting, value):
                        mqtt_updated = True
                else:
                    # Without retry logic
                    response = requests.get(command_url(setting, value), timeout=5)
                    if response.status_code == 200:
                        if setting != 'MqttPassword':
                            self.logger.info(f"{name}: Set {setting} to {value}")
                        else:
                            self.logger.info(f"{name}: Set MQTT Password")
                        mqtt_updated = True
                    else:
                        self.logger.error(f"{name}: Failed to set {setting}")
            except requests.exceptions.RequestException as e:
                self.logger.error(f"{name}: Error updating {setting}: {str(e)}")

        # Apply console settings
        console_updated = self.apply_console_settings(ip, name, with_retry)

        # Reboot the device if requested
        if reboot:
            save_url = f"http://{ip}/cm?cmnd=Restart%201"
            response = requests.get(save_url, timeout=5)
            if response.status_code == 200:
                self.logger.info(f"Saved configuration and rebooted {name}")
            else:
                self.logger.error(f"Failed to save configuration for {name}")

        return mqtt_updated or console_updated

    except requests.exceptions.RequestException as e:
        self.logger.error(f"Error configuring device at {ip}: {str(e)}")
        return False
|
|
|
|
def apply_console_settings(self, ip, name, with_retry=False):
    """Apply the console commands configured for a Tasmota device.

    Commands are collected from the configuration, compared with what the
    device currently reports, and only sent when they differ. Three passes
    are made, in this order:

    1. ``ButtonRetain`` / ``SwitchRetain`` / ``PowerRetain`` are sent twice,
       opposite state first and desired state second, because it is the
       change of state that updates the retain flag at the MQTT server.
    2. Every other configured command is sent once.
    3. ``RuleN 1`` is sent for each lowercase ``ruleN`` definition found in
       the configuration, unless a mixed/upper-case ``RuleN`` command was
       configured explicitly.

    Configuration sources (all optional):

    * ``console_set`` at the config root, then ``mqtt.console_set``. Each is
      either a list of ``"Command argument..."`` strings or a dict of named
      lists; for a dict the list named by
      ``device_list[<DeviceName>]['console_set']`` is used, falling back to
      ``'Default'``. A later entry replaces an earlier one with the same
      command name.
    * legacy ``console`` and ``mqtt.console`` dicts, which only fill in
      commands that are not already present.

    Args:
        ip: Address of the device's HTTP command endpoint.
        name: Device name used as the prefix of every log message.
        with_retry: When True each command is tried up to three times,
            checked with ``_verify_console_param_value`` where applicable,
            and recorded in ``self.command_failures`` if it never succeeds.
            When False each command is sent once and not verified.

    Returns:
        bool: True when at least one command was confirmed in retry mode.
        NOTE(review): without retry the result is always False, exactly as
        before this rewrite; confirm whether callers rely on that.

    Raises:
        requests.exceptions.RequestException: only when ``with_retry`` is
            False and a non-retain command cannot be delivered (retain
            commands log the error instead).
    """
    from urllib.parse import quote

    retain_params = ("ButtonRetain", "SwitchRetain", "PowerRetain")
    max_attempts = 3

    def is_rule_definition(command):
        # Lowercase "ruleN" carries rule text.  An exact match keeps
        # commands such as "ruletimer1" from being mistaken for a rule.
        return re.fullmatch(r"rule\d", command) is not None

    def is_rule_switch(command):
        # "RuleN" written with any uppercase letter is the enable switch.
        return (command != command.lower()
                and re.fullmatch(r"rule\d", command.lower()) is not None)

    def canon(value):
        # Comparable form of a setting: trimmed, lowercased, ON/OFF folded
        # into 1/0.  None becomes "", but a numeric 0 stays "0".
        if isinstance(value, dict):
            value = value.get("State", value)
        text = "" if value is None else str(value).strip().lower()
        return {"on": "1", "off": "0"}.get(text, text)

    def norm_rule(text):
        # Rule text is compared ignoring case and runs of whitespace.
        flat = ("" if text is None else str(text)).strip().lower()
        return re.sub(r"\s+", " ", flat)

    def command_url(command, argument):
        # Percent-encode the argument so '#', '&', '%', ';' and spaces
        # reach the device intact.
        return f"http://{ip}/cm?cmnd={command}%20{quote(str(argument))}"

    def record_failure(command, error):
        # Failures are accumulated on the instance for later reporting.
        if not hasattr(self, 'command_failures'):
            self.command_failures = []
        self.command_failures.append({
            "device": name,
            "ip": ip,
            "command": command,
            "error": error,
        })

    def send(url, command, action, gerund, on_success, verify=None):
        """Send one command; return True only when confirmed in retry mode.

        ``action``/``gerund`` are the phrases used in log messages
        ("set X to Y" / "setting X to Y").  ``verify`` is an optional
        ``(param, expected)`` pair checked after an HTTP 200.
        """
        if not with_retry:
            # Single unverified attempt; request errors propagate.
            response = requests.get(url, timeout=5)
            if response.status_code == 200:
                on_success(response)
            else:
                self.logger.error(f"{name}: Failed to {action}")
            return False

        last_error = None
        for attempt in range(1, max_attempts + 1):
            progress = f"(attempt {attempt}/{max_attempts})"
            try:
                response = requests.get(url, timeout=5)
                if response.status_code == 200:
                    on_success(response)
                    if verify is None or self._verify_console_param_value(ip, name, *verify):
                        return True
                    # Remember why this attempt did not count so the final
                    # error message is not "Last error: None".
                    last_error = "Verification failed"
                    self.logger.warning(f"{name}: Verification failed for {verify[0]} after update; retrying {progress}")
                else:
                    last_error = f"HTTP {response.status_code}"
                    self.logger.warning(f"{name}: Failed to {action} {progress}")
            except requests.exceptions.Timeout:
                last_error = "Timeout"
                self.logger.warning(f"{name}: Timeout {gerund} {progress}")
            except requests.exceptions.RequestException as e:
                last_error = str(e)
                self.logger.warning(f"{name}: Error {gerund}: {str(e)} {progress}")
            if attempt < max_attempts:
                time.sleep(1)  # Wait before retry

        self.logger.error(f"{name}: Failed to {action} after {max_attempts} attempts. Last error: {last_error}")
        record_failure(command, last_error)
        return False

    # ------------------------------------------------------------------
    # Collect the commands to apply
    # ------------------------------------------------------------------
    console_params = {}

    def add_from_console_set(entries):
        # Parse "Command argument..." strings; the last entry for a
        # command wins.  Non-strings and blank entries are ignored.
        if isinstance(entries, str):
            entries = [entries]
        if not isinstance(entries, (list, tuple)):
            return
        for entry in entries:
            if not isinstance(entry, str):
                continue
            command, _, argument = entry.strip().partition(' ')
            if command:
                console_params[command] = argument

    def add_named_or_list(console_set, set_name):
        if isinstance(console_set, dict):
            add_from_console_set(console_set.get(set_name) or console_set.get('Default') or [])
        elif isinstance(console_set, list):
            add_from_console_set(console_set)

    # Pick the named console set for this device (best effort): the key
    # into device_list is the DeviceName the device reports in Status 0.
    selected_set_name = 'Default'
    device_list = self.config.get('device_list', {})
    if isinstance(device_list, dict) and device_list:
        try:
            status = requests.get(f"http://{ip}/cm?cmnd=Status%200", timeout=5).json()
            device_name = status.get('Status', {}).get('DeviceName')
        except Exception:
            # Any failure here simply means the 'Default' set is used.
            device_name = None
        entry = device_list.get(device_name) if isinstance(device_name, str) and device_name else None
        if isinstance(entry, dict) and isinstance(entry.get('console_set'), str):
            selected_set_name = entry['console_set']

    mqtt_config = self.config.get('mqtt', {})
    if not isinstance(mqtt_config, dict):
        mqtt_config = {}

    add_named_or_list(self.config.get('console_set'), selected_set_name)
    add_named_or_list(mqtt_config.get('console_set'), selected_set_name)

    # Backward compatibility: legacy dicts never override console_set.
    for legacy in (self.config.get('console', {}), mqtt_config.get('console', {})):
        if isinstance(legacy, dict):
            for key, legacy_value in legacy.items():
                console_params.setdefault(key, legacy_value)

    if not console_params:
        return False

    self.logger.info(f"{name}: Setting console parameters from configuration")
    console_updated = False

    # ------------------------------------------------------------------
    # Pass 1: retain flags (opposite state first, then the final state)
    # ------------------------------------------------------------------
    for param in retain_params:
        if param not in console_params:
            continue
        try:
            final_value = console_params[param]

            # Pre-check current value; skip if already at desired state
            current_val, ok = self._get_console_param_value(ip, name, param)
            if ok and canon(current_val) == canon(final_value):
                self.logger.debug(f"{name}: {param} already {final_value}, skipping retain toggle")
                continue

            # "off" and "0" both mean off, so the opposite really differs.
            opposite_value = "On" if canon(final_value) == "0" else "Off"

            if send(
                command_url(param, opposite_value),
                command=f"{param} {opposite_value}",
                action=f"set {param} to {opposite_value}",
                gerund=f"setting {param} to {opposite_value}",
                on_success=lambda _response: self.logger.debug(
                    f"{name}: Set {param} to {opposite_value} (step 1 of 2 to update MQTT broker retain settings)"),
            ):
                console_updated = True

            # Small delay to ensure commands are processed in order
            time.sleep(0.5)

            if send(
                command_url(param, final_value),
                command=f"{param} {final_value}",
                action=f"set {param} to {final_value}",
                gerund=f"setting {param} to {final_value}",
                on_success=lambda _response: self.logger.debug(
                    f"{name}: Set {param} to {final_value} (step 2 of 2 to update MQTT broker retain settings)"),
                verify=(param, final_value),
            ):
                console_updated = True
        except Exception as e:
            self.logger.error(f"{name}: Unexpected error setting {param} commands: {str(e)}")
            if with_retry:
                record_failure(f"{param} (both steps)", str(e))

    # ------------------------------------------------------------------
    # Pass 2: all other commands
    # ------------------------------------------------------------------
    rules_to_enable = {}

    for param, value in console_params.items():
        if param in retain_params:
            continue

        rule_definition = is_rule_definition(param)
        if rule_definition:
            # Remember the rule number so the rule can be enabled later.
            rules_to_enable[param[-1]] = True
            if with_retry:
                self.logger.info(f"{name}: Detected rule definition {param}='{value}', will auto-enable")
            else:
                self.logger.debug(f"{name}: Detected rule definition {param}, will auto-enable")

        if with_retry and is_rule_switch(param):
            # Still sent as configured; just note that it is redundant.
            self.logger.debug(f"{name}: Note: {param} is not needed with auto-enable feature")

        # Skip the command when the device already reports the wanted value.
        try:
            current_val, ok = self._get_console_param_value(ip, name, param)
        except Exception as e:
            # If the pre-check fails, fall back to sending the command.
            self.logger.debug(f"{name}: Pre-check for {param} failed: {e}")
            ok = False
        if ok:
            if rule_definition:
                if norm_rule(current_val) == norm_rule(value):
                    self.logger.debug(f"{name}: {param} rule already matches definition, skipping")
                    continue
            elif canon(current_val) == canon(value):
                self.logger.debug(f"{name}: {param} already set to {value}, skipping")
                continue

        url = command_url(param, value)
        if rule_definition:
            self.logger.info(f"{name}: Sending rule command: {url}")

            def on_success(response, command=param, argument=value):
                self.logger.info(f"{name}: Rule command response: {response.text}")
                self.logger.info(f"{name}: Set rule {command} to '{argument}'")
        else:
            def on_success(response, command=param, argument=value):
                self.logger.debug(f"{name}: Set console parameter {command} to {argument}")

        if send(
            url,
            command=f"{param} {value}",
            action=f"set console parameter {param}",
            gerund=f"setting console parameter {param}",
            on_success=on_success,
            verify=(param, value),
        ):
            console_updated = True

    # ------------------------------------------------------------------
    # Pass 3: auto-enable every rule that was defined
    # ------------------------------------------------------------------
    if with_retry:
        self.logger.info(f"{name}: Rules to enable: {rules_to_enable}")

    for rule_num in rules_to_enable:
        rule_enable_param = f"Rule{rule_num}"

        # An explicit enable command is any "RuleN" spelling other than the
        # all-lowercase definition key.  (Comparing case-insensitively
        # against every key would always match the definition itself and
        # suppress auto-enable.)
        explicitly_configured = any(
            key != key.lower() and key.lower() == rule_enable_param.lower()
            for key in console_params
        )
        if explicitly_configured:
            if with_retry:
                self.logger.info(f"{name}: Skipping {rule_enable_param} as it's already in config (uppercase version)")
            continue
        if with_retry:
            self.logger.info(f"{name}: Will enable {rule_enable_param} for rule definition found in config")

        # Pre-check if rule already enabled
        try:
            current_val, ok = self._get_console_param_value(ip, name, rule_enable_param)
        except Exception as e:
            self.logger.debug(f"{name}: Pre-check for {rule_enable_param} failed: {e}")
            ok = False
        if ok and canon(current_val) in ("1", "enabled", "active"):
            self.logger.debug(f"{name}: {rule_enable_param} already enabled, skipping")
            continue

        if send(
            command_url(rule_enable_param, "1"),
            command=f"{rule_enable_param} 1",
            action=f"auto-enable {rule_enable_param}",
            gerund=f"auto-enabling {rule_enable_param}",
            on_success=lambda _response: self.logger.info(f"{name}: Auto-enabled {rule_enable_param}"),
            verify=(rule_enable_param, "1"),
        ):
            console_updated = True

    return console_updated
|
|
|
|
def _verify_console_param_value(self, ip, name, param, expected):
    """Check after an update that the device reports the expected value.

    The current value is read with ``_get_console_param_value`` and compared
    according to the kind of parameter:

    - lowercase ``ruleN``: rule text, ignoring case and runs of whitespace;
    - ``RuleN`` with any uppercase letter: enable state, with
      on/enabled/active treated as ``1`` and off/disabled/inactive as ``0``.
      When ``expected`` is an off value the rule must be off; for any other
      ``expected`` an enabled rule is accepted;
    - anything else: plain comparison with ``on``/``off`` and ``1``/``0``
      treated as equal.

    Args:
        ip: Device address, passed through to the getter.
        name: Device name, passed through to the getter for logging.
        param: Console command name that was just set.
        expected: Value that was sent.

    Returns:
        bool: True when the reported value matches; False when it does not,
        when the value cannot be read, or when any error occurs.
    """
    def as_text(value):
        # None means "no value"; a numeric 0 must stay "0" so that settings
        # whose value is zero can be verified.
        return '' if value is None else str(value).strip().lower()

    def as_flag(value):
        text = as_text(value)
        if text in ('on', 'enabled', 'active'):
            return '1'
        if text in ('off', 'disabled', 'inactive'):
            return '0'
        return text

    def as_rule_text(value):
        return re.sub(r"\s+", " ", as_text(value))

    try:
        cur, ok = self._get_console_param_value(ip, name, param)
        if not ok:
            return False

        # Exact "rule<digit>" match so that e.g. RuleTimer1 is treated as
        # an ordinary parameter.
        is_rule = re.fullmatch(r"rule\d", param.lower()) is not None

        # Rule definition verify
        if is_rule and param.islower():
            if isinstance(cur, dict):
                cur = cur.get('Rules', cur)
            return as_rule_text(cur) == as_rule_text(expected)

        # Rule enable verify (RuleN)
        if is_rule:
            if isinstance(cur, dict):
                # Nested response form: the enable state is under "State".
                cur = cur.get('State', cur)
            val = as_flag(cur)
            exp = as_flag(expected)
            if exp == '0':
                # A rule that is still ON must not pass as "disabled".
                return val == '0'
            return val == exp or val == '1'

        # Generic verify
        val = as_text(cur)
        exp = as_text(expected)
        on_off = {'on': '1', 'off': '0'}
        return on_off.get(val, val) == on_off.get(exp, exp)
    except Exception:
        # Verification is advisory; any failure counts as "not verified".
        return False
|
|
|
|
def _get_console_param_value(self, ip, name, param):
    """Query the device for the current value of a console parameter.

    The value is read by sending the bare command (no argument) to the
    device's ``/cm`` HTTP endpoint and picking the relevant field out of
    the JSON reply.

    Special handling for rules (``rule`` followed by exactly one digit):

    - lowercase ``ruleN`` returns the rule definition text;
    - ``RuleN`` with any uppercase letter returns the enable state.

    Both use the same bare ``RuleN`` query. Two reply layouts are
    understood: the nested one, ``{"Rule1": {"State": ..., "Rules": ...}}``,
    and the flat one, ``{"Rule1": "ON", ..., "Rules": "..."}``.

    Args:
        ip: Device address.
        name: Device name used in the debug log message on failure.
        param: Console command name to query.

    Returns:
        tuple: ``(value, True)`` when the device answered (``value`` is the
        raw response text if the reply is not JSON), ``(None, False)`` when
        the HTTP request failed.
    """
    def lookup(mapping, wanted):
        # Case-insensitive key lookup; returns (found, value).
        for key, value in mapping.items():
            if str(key).lower() == wanted:
                return True, value
        return False, None

    try:
        # Rules handling.  The exact match keeps commands that merely start
        # with "rule" (e.g. RuleTimer1) on the generic path below.
        rule_match = re.fullmatch(r"rule(\d)", param.lower())
        if rule_match:
            rule_key = f"Rule{rule_match.group(1)}"
            wants_definition = param.islower()

            # Query without an argument.  Numeric arguments to RuleN are
            # settings, not reads: per the Tasmota command reference
            # "RuleN 5" enables one-shot mode, so it must not be used to
            # fetch the rule text.
            url = f"http://{ip}/cm?cmnd={rule_key}"
            response = requests.get(url, timeout=5)
            try:
                data = response.json()
            except ValueError:
                # Not JSON: hand back the raw text.
                return response.text or '', True
            if not isinstance(data, dict):
                return str(data), True

            found, entry = lookup(data, rule_key.lower())
            if found and isinstance(entry, dict):
                # Nested layout: {"Rule1": {"State": ..., "Rules": ...}}
                has_field, value = lookup(entry, 'rules' if wants_definition else 'state')
                if has_field:
                    return value, True
            elif found and not wants_definition:
                # Flat layout: {"Rule1": "ON", ...}
                return entry, True

            if wants_definition:
                # Flat layout keeps the definition in a top-level "Rules".
                has_rules, value = lookup(data, 'rules')
                if has_rules:
                    return value, True
                accepted = (str,)
            else:
                accepted = (str, int)

            # Fallback: first plausible scalar, else the whole reply as text.
            for value in data.values():
                if isinstance(value, accepted):
                    return value, True
            return str(data), True

        # Generic parameter query - send the command name without a value
        url = f"http://{ip}/cm?cmnd={param}"
        response = requests.get(url, timeout=5)
        try:
            data = response.json()
        except ValueError:
            # Fallback to raw text
            return response.text, True

        if isinstance(data, dict) and data:
            # Prefer an exact key match (case-insensitive)
            found, value = lookup(data, param.lower())
            if found:
                return value, True
            # Fallback: first value
            return next(iter(data.values())), True

        # If not a non-empty dict, return as string
        return str(data), True
    except requests.exceptions.RequestException as e:
        self.logger.debug(f"{name}: Failed to query current value for {param}: {e}")
        return None, False
|
|
|
|
def apply_config_other(self, ip, name):
    """Apply the config_other (template) settings to a device.

    Delegates to ``check_and_update_template`` and returns its result
    unchanged.
    """
    template_result = self.check_and_update_template(ip, name)
    return template_result
|
|
|
|
def configure_unknown_device(self, ip, hostname):
    """Set up a not-yet-named device under ``hostname``.

    Calls ``configure_mqtt_settings`` with the options used for new
    devices (set the friendly name, enable MQTT, no retries, reboot
    afterwards) and returns whatever that call returns.
    """
    new_device_options = {
        "is_new_device": True,
        "set_friendly_name": True,
        "enable_mqtt": True,
        "with_retry": False,
        "reboot": True,
    }
    return self.configure_mqtt_settings(ip=ip, name=hostname, **new_device_options)
|
|
|
|
def is_ip_in_network_filter(self, ip_address):
    """Check if an IP address is in any of the configured network filters.

    The filters are read from ``self.config['unifi']['network_filter']``, a
    mapping of network name to a dict with a ``subnet`` entry. ``subnet``
    may be either:

    - a dotted prefix such as ``"192.168.1"``, matched on whole octets so
      that it covers ``192.168.1.x`` but not ``192.168.10.x``; or
    - CIDR notation such as ``"192.168.1.0/24"``.

    An empty ``subnet`` string matches every address, as a plain prefix
    comparison would. Entries without a string ``subnet`` are ignored.

    Args:
        ip_address: The IP address to check

    Returns:
        tuple: (is_in_network, target_network, network_name) where:
            - is_in_network is a boolean indicating if the IP is in a network
            - target_network is the network configuration dict or None
            - network_name is the name of the network or None
    """
    import ipaddress

    candidate = '' if ip_address is None else str(ip_address)

    def in_subnet(subnet):
        if '/' in subnet:
            try:
                return ipaddress.ip_address(candidate) in ipaddress.ip_network(subnet, strict=False)
            except ValueError:
                # Unparseable address or subnet: treat as "not in network".
                return False
        prefix = subnet.rstrip('.')
        if not prefix:
            return True
        # Require an octet boundary after the prefix.
        return candidate == prefix or candidate.startswith(prefix + '.')

    network_filters = self.config['unifi'].get('network_filter', {})

    for network_name, network in network_filters.items():
        subnet = network.get('subnet') if isinstance(network, dict) else None
        if isinstance(subnet, str) and in_subnet(subnet):
            self.logger.info(f"IP {ip_address} is in network: {network_name}")
            return True, network, network_name

    self.logger.error(f"IP {ip_address} is not in any configured network")
    return False, None, None
|
|
|
|
def process_single_device(self, device_identifier):
|
|
"""Process a single device by hostname or IP address.
|
|
|
|
Args:
|
|
device_identifier: Either a hostname or IP address
|
|
|
|
Returns:
|
|
bool: True if device was processed successfully, False otherwise
|
|
"""
|
|
self.logger.info(f"Processing single device: {device_identifier}")
|
|
|
|
# Check if device_identifier is an IP address or hostname
|
|
is_ip = bool(re.match(r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$", device_identifier))
|
|
|
|
# If it's an IP address, check if it's in the network_filter first
|
|
if is_ip:
|
|
in_network, target_network, network_name = self.is_ip_in_network_filter(device_identifier)
|
|
if not in_network:
|
|
return False
|
|
|
|
# Setup Unifi client if not already done
|
|
if not self.unifi_client:
|
|
try:
|
|
self.setup_unifi_client()
|
|
except ConnectionError as e:
|
|
self.logger.error(f"Failed to connect to UniFi controller: {str(e)}")
|
|
return False
|
|
|
|
# Get all clients from Unifi
|
|
try:
|
|
all_clients = self.unifi_client.get_clients()
|
|
self.logger.debug(f"Found {len(all_clients)} total devices")
|
|
except Exception as e:
|
|
self.logger.error(f"Error getting devices from UniFi controller: {e}")
|
|
return False
|
|
|
|
# Find the device in Unifi
|
|
target_device = None
|
|
if is_ip:
|
|
# Search by IP
|
|
self.logger.debug(f"Searching for device with IP: {device_identifier}")
|
|
target_device = next((device for device in all_clients if device.get('ip') == device_identifier), None)
|
|
if not target_device:
|
|
self.logger.error(f"No device found with IP: {device_identifier}")
|
|
return False
|
|
else:
|
|
# Search by hostname - support partial and wildcard matches
|
|
self.logger.debug(f"Searching for device with hostname: {device_identifier}")
|
|
|
|
# Check if the identifier contains wildcards
|
|
has_wildcards = '*' in device_identifier
|
|
|
|
# Convert wildcards to regex pattern if present
|
|
if has_wildcards:
|
|
pattern = device_identifier.lower().replace('.', r'\.').replace('*', '.*')
|
|
self.logger.debug(f"Using wildcard pattern: {pattern}")
|
|
else:
|
|
# For partial matches, we'll use the identifier as a substring
|
|
pattern = device_identifier.lower()
|
|
self.logger.debug(f"Using partial match pattern: {pattern}")
|
|
|
|
# Find all matching devices
|
|
matching_devices = []
|
|
for device in all_clients:
|
|
hostname = device.get('hostname', '').lower()
|
|
name = device.get('name', '').lower()
|
|
|
|
if has_wildcards:
|
|
# For wildcard matches, use regex
|
|
if (re.search(f"^{pattern}$", hostname) or re.search(f"^{pattern}$", name)):
|
|
matching_devices.append(device)
|
|
else:
|
|
# For partial matches, check if pattern is a substring
|
|
if pattern in hostname or pattern in name:
|
|
matching_devices.append(device)
|
|
|
|
# Handle the results
|
|
if not matching_devices:
|
|
self.logger.error(f"No devices found matching: {device_identifier}")
|
|
return False
|
|
elif len(matching_devices) > 1:
|
|
# Multiple matches found - log them and use the first one
|
|
self.logger.warning(f"Multiple devices found matching '{device_identifier}':")
|
|
for i, device in enumerate(matching_devices, 1):
|
|
device_name = device.get('name', device.get('hostname', 'Unknown'))
|
|
device_ip = device.get('ip', '')
|
|
self.logger.warning(f" {i}. {device_name} (IP: {device_ip})")
|
|
self.logger.warning(f"Using the first match: {matching_devices[0].get('name', matching_devices[0].get('hostname', 'Unknown'))}")
|
|
|
|
# Use the first (or only) matching device
|
|
target_device = matching_devices[0]
|
|
|
|
# Get device details
|
|
device_name = target_device.get('name', target_device.get('hostname', 'Unknown'))
|
|
device_hostname = target_device.get('hostname', '')
|
|
device_ip = target_device.get('ip', '')
|
|
device_mac = target_device.get('mac', '')
|
|
|
|
self.logger.info(f"Found device: {device_name} (IP: {device_ip}, Hostname: {device_hostname})")
|
|
|
|
# If we're processing a hostname (not an IP), check if the device's IP is in the network_filter
|
|
if not is_ip:
|
|
in_network, target_network, network_name = self.is_ip_in_network_filter(device_ip)
|
|
if not in_network:
|
|
self.logger.error(f"Device {device_name} is not in any configured network")
|
|
return False
|
|
# For IP addresses, we already have the target_network from the earlier check
|
|
|
|
# Check if device is excluded
|
|
exclude_patterns = target_network.get('exclude_patterns', [])
|
|
if self.is_device_excluded(device_name, device_hostname, exclude_patterns, log_level='error'):
|
|
return False
|
|
|
|
# Check if device is in unknown_device_patterns
|
|
unknown_patterns = target_network.get('unknown_device_patterns', [])
|
|
is_unknown = False
|
|
device_reported_hostname = None
|
|
|
|
# Initialize variables for hostname bug detection
|
|
unifi_name_matches_unknown = False
|
|
device_hostname_matches_unknown = False
|
|
for pattern in unknown_patterns:
|
|
pattern_lower = pattern.lower()
|
|
pattern_regex = pattern_lower.replace('.', r'\.').replace('*', '.*')
|
|
# Check if pattern already starts with ^
|
|
if pattern_regex.startswith('^'):
|
|
regex_pattern = pattern_regex
|
|
else:
|
|
regex_pattern = f"^{pattern_regex}"
|
|
if (re.match(regex_pattern, device_name.lower()) or
|
|
re.match(regex_pattern, device_hostname.lower())):
|
|
unifi_name_matches_unknown = True
|
|
self.logger.info(f"Device {device_name} matches unknown device pattern: {pattern}")
|
|
break
|
|
|
|
# If the name matches unknown patterns, check the device's self-reported hostname
|
|
# before declaring it unknown
|
|
#
|
|
# This addresses a UniFi OS bug where it doesn't keep track of updated hostnames.
|
|
# When a hostname is updated and the connection reset, UniFi may not pick up the new name.
|
|
# By checking the device's self-reported hostname, we can determine if the device
|
|
# actually has a real hostname that UniFi is not showing correctly.
|
|
if unifi_name_matches_unknown:
|
|
self.logger.info(f"Checking device's self-reported hostname before declaring unknown")
|
|
|
|
# Get the device's self-reported hostname using the common function
|
|
device_reported_hostname, success = self.get_device_hostname(device_ip, device_name, timeout=5, log_level='info')
|
|
|
|
if success and device_reported_hostname:
|
|
# Check if the self-reported hostname also matches unknown patterns
|
|
device_hostname_matches_unknown = False
|
|
# Use the base of the self-reported hostname up to the first '-' for bug detection
|
|
device_hostname_base = device_reported_hostname.split('-')[0].lower()
|
|
for pattern in unknown_patterns:
|
|
if self._match_pattern(device_hostname_base, pattern, match_entire_string=False):
|
|
device_hostname_matches_unknown = True
|
|
self.logger.info(f"Device's self-reported hostname base '{device_hostname_base}' (from '{device_reported_hostname}') matches unknown pattern: {pattern}")
|
|
break
|
|
|
|
# Only declare as unknown if both UniFi-reported and self-reported hostnames match unknown patterns
|
|
if device_hostname_matches_unknown:
|
|
is_unknown = True
|
|
self.logger.info("Device declared as unknown: both UniFi-reported and self-reported hostnames match unknown patterns")
|
|
else:
|
|
is_unknown = False
|
|
self.logger.info("Device NOT declared as unknown: self-reported hostname doesn't match unknown patterns (possible UniFi OS bug)")
|
|
else:
|
|
# No self-reported hostname found or error occurred, fall back to UniFi-reported name
|
|
is_unknown = unifi_name_matches_unknown
|
|
self.logger.info("Failed to get device's self-reported hostname, using UniFi-reported name")
|
|
else:
|
|
# Not in Device mode or name doesn't match unknown patterns, use the standard check
|
|
is_unknown = unifi_name_matches_unknown
|
|
|
|
# Determine connection type based on available fields
|
|
connection = "Unknown"
|
|
if target_device.get('essid'):
|
|
connection = f"Wireless - {target_device.get('essid')}"
|
|
elif target_device.get('radio') or target_device.get('wifi'):
|
|
connection = "Wireless"
|
|
elif target_device.get('port') or target_device.get('switch_port') or target_device.get('switch'):
|
|
connection = "Wired"
|
|
|
|
# Print the IP and Connection after verifying the device in UniFi
|
|
print(f"Verified in UniFi: IP: {device_ip} | Connection: {connection}")
|
|
|
|
# Create a device info dictionary
|
|
# Add unifi_hostname_bug_detected flag to indicate when the UniFi OS hostname bug is detected
|
|
# (when UniFi name matches unknown patterns but device's self-reported name doesn't)
|
|
unifi_hostname_bug_detected = (unifi_name_matches_unknown and not is_unknown and
|
|
not device_hostname_matches_unknown)
|
|
|
|
# If bug detected and we have the device-reported hostname, prefer it going forward
|
|
if unifi_hostname_bug_detected and device_reported_hostname:
|
|
self.logger.info(f"Using device self-reported hostname '{device_reported_hostname}' instead of UniFi '{device_name}' for IP {device_ip}")
|
|
device_name = device_reported_hostname
|
|
device_hostname = device_reported_hostname
|
|
|
|
device_info = {
|
|
"name": device_name,
|
|
"ip": device_ip,
|
|
"mac": device_mac,
|
|
"last_seen": target_device.get('last_seen', ''),
|
|
"hostname": device_hostname,
|
|
"notes": target_device.get('note', ''),
|
|
"connection": connection,
|
|
"unifi_hostname_bug_detected": unifi_hostname_bug_detected
|
|
}
|
|
|
|
# Process the device based on whether it's unknown or not
|
|
if is_unknown:
|
|
self.logger.info(f"Processing unknown device: {device_name}")
|
|
|
|
# Check if device has a toggle button
|
|
try:
|
|
# Get the main page to check for toggle button
|
|
url = f"http://{device_ip}/"
|
|
response = requests.get(url, timeout=5)
|
|
|
|
# Check if there's a toggle button in the response
|
|
has_toggle = "toggle" in response.text.lower()
|
|
|
|
if has_toggle:
|
|
self.logger.info(f"Device {device_name} has a toggle button, toggling at 1/2Hz rate")
|
|
|
|
# Start toggling at 1/2Hz
|
|
toggle_state = False
|
|
|
|
# Temporarily disable all logging during toggling
|
|
logging.disable(logging.CRITICAL)
|
|
|
|
try:
|
|
# Clear console output and show prompt
|
|
print("\n" + "="*50)
|
|
print(f"DEVICE: {device_name} at IP: {device_ip} Connection: {connection}")
|
|
print(f"Current hostname: {device_hostname}")
|
|
print("="*50)
|
|
print("The device is now toggling to help you identify it.")
|
|
|
|
# Start toggling in background while waiting for input
|
|
import threading
|
|
stop_toggle = threading.Event()
|
|
|
|
def toggle_device():
|
|
toggle_state = False
|
|
while not stop_toggle.is_set():
|
|
toggle_state = not toggle_state
|
|
toggle_cmd = "Power On" if toggle_state else "Power Off"
|
|
toggle_url = f"http://{device_ip}/cm?cmnd={toggle_cmd}"
|
|
try:
|
|
requests.get(toggle_url, timeout=2)
|
|
except:
|
|
pass
|
|
time.sleep(2.0) # 1/2Hz rate
|
|
|
|
# Start toggle thread
|
|
toggle_thread = threading.Thread(target=toggle_device)
|
|
toggle_thread.daemon = True
|
|
toggle_thread.start()
|
|
|
|
# Prompt for new hostname
|
|
print("\nPlease enter a new name for this device:")
|
|
new_hostname = input("> ").strip()
|
|
|
|
# Stop toggling
|
|
stop_toggle.set()
|
|
toggle_thread.join(timeout=3)
|
|
|
|
if new_hostname and new_hostname != device_hostname:
|
|
print(f"Setting new hostname to: {new_hostname}")
|
|
# Re-enable logging
|
|
logging.disable(logging.NOTSET)
|
|
return self.configure_unknown_device(device_ip, new_hostname)
|
|
else:
|
|
print("No valid hostname entered, skipping device")
|
|
# Re-enable logging
|
|
logging.disable(logging.NOTSET)
|
|
return False
|
|
|
|
finally:
|
|
# Re-enable logging
|
|
logging.disable(logging.NOTSET)
|
|
else:
|
|
self.logger.info(f"Device {device_name} does not have a toggle button")
|
|
return self.configure_unknown_device(device_ip, device_hostname)
|
|
|
|
except requests.exceptions.RequestException as e:
|
|
self.logger.error(f"Error connecting to {device_name} at {device_ip}: {str(e)}")
|
|
return False
|
|
else:
|
|
self.logger.info(f"Processing normal device: {device_name}")
|
|
# Create a temporary list with just this device
|
|
temp_devices = [device_info]
|
|
|
|
# Save to current.json temporarily
|
|
current_config = {"tasmota": {"devices": temp_devices}}
|
|
with open('current.json', 'w') as f:
|
|
json.dump(current_config, f, indent=2)
|
|
|
|
# Process the device - skip unknown device filtering in Device mode
|
|
self.get_device_details(use_current_json=True, skip_unknown_filter=True)
|
|
return True
|
|
|
|
def get_device_details(self, use_current_json=True, skip_unknown_filter=False):
    """Query each saved Tasmota device over HTTP and record its details.

    Loads the device list from ``current.json`` (or ``tasmota.json``),
    optionally drops devices whose name or hostname matches one of the
    configured ``unknown_device_patterns``, then for every remaining device:
    fetches ``Status 2`` (firmware version) and ``Status 6`` (MQTT status),
    asks ``get_device_hostname`` for the self-reported hostname, and passes
    the device to ``configure_mqtt_settings`` and ``apply_config_other``.
    The collected results are written to ``TasmotaDevices.json`` and any
    entries found in ``self.command_failures`` are printed as a summary.

    Retrying of individual commands is delegated to
    ``configure_mqtt_settings`` (called with ``with_retry=True``).

    Args:
        use_current_json: Read ``current.json`` when True, ``tasmota.json``
            otherwise.
        skip_unknown_filter: When True, process every device in the file
            without applying the unknown-device patterns (used by --Device mode).

    Returns:
        None. Returns early (after logging an error) when the source file is
        missing or not valid JSON, or when the MQTT configuration is absent.
    """
    self.logger.info("Starting to gather detailed device information...")
    device_details = []

    source_file = 'current.json' if use_current_json else 'tasmota.json'
    try:
        with open(source_file, 'r') as f:
            data = json.load(f)
    except FileNotFoundError:
        self.logger.error(f"{source_file} not found. Run discovery first.")
        return
    except json.JSONDecodeError:
        self.logger.error(f"Invalid JSON format in {source_file}")
        return

    all_devices = data.get('tasmota', {}).get('devices', [])
    self.logger.debug(f"Loaded {len(all_devices)} devices from {source_file}")

    # Determine which devices to process
    if skip_unknown_filter:
        # When using --Device parameter, don't filter out unknown devices
        devices = all_devices
        self.logger.debug("Skipping unknown device filtering (Device mode)")
    else:
        # Normal mode: filter out devices matching unknown_device_patterns.
        # Translate and compile every pattern once instead of once per device.
        network_filters = self.config.get('unifi', {}).get('network_filter', {})
        unknown_regexes = []
        for network in network_filters.values():
            for pattern in network.get('unknown_device_patterns', []):
                # Glob-like syntax: '.' is literal, '*' matches anything,
                # and the pattern is anchored at the start of the name.
                translated = pattern.lower().replace('.', r'\.').replace('*', '.*')
                if not translated.startswith('^'):
                    translated = f"^{translated}"
                unknown_regexes.append(re.compile(translated))

        devices = []
        for device in all_devices:
            if not isinstance(device, dict):
                # Leave malformed entries for the processing loop below,
                # which logs a warning and skips them.
                devices.append(device)
                continue

            # 'name'/'hostname' may be missing or null in the saved JSON.
            name = str(device.get('name') or '').lower()
            hostname = str(device.get('hostname') or '').lower()

            if any(rx.match(name) or rx.match(hostname) for rx in unknown_regexes):
                self.logger.debug(f"Skipping unknown device: {name} ({hostname})")
                continue
            devices.append(device)

        self.logger.debug(f"Processing {len(devices)} devices after filtering unknown devices")

    mqtt_config = self.config.get('mqtt', {})
    if not mqtt_config:
        self.logger.error("MQTT configuration missing from config file")
        return

    def check_mqtt_settings(ip, name, mqtt_status):
        """Check and update MQTT settings if they don't match config"""
        # Use the unified MQTT configuration method
        return self.configure_mqtt_settings(
            ip=ip,
            name=name,
            mqtt_status=mqtt_status,
            is_new_device=False,
            set_friendly_name=False,
            enable_mqtt=False,
            with_retry=True,
            reboot=False
        )

    for device in devices:
        if not isinstance(device, dict):
            self.logger.warning(f"Skipping invalid device entry: {device}")
            continue

        name = device.get('name', 'Unknown')
        ip = device.get('ip')
        mac = device.get('mac')

        if not ip:
            self.logger.warning(f"Skipping device {name} - no IP address")
            continue

        self.logger.info(f"Checking device: {name} at {ip}")

        try:
            # Get Status 2 for firmware version
            url_status = f"http://{ip}/cm?cmnd=Status%202"
            response = requests.get(url_status, timeout=5)
            status_data = response.json()

            # Get the self-reported hostname using the common function
            hostname, hostname_success = self.get_device_hostname(ip, name, timeout=5, log_level='info')

            # Get Status 6 for MQTT info
            url_mqtt = f"http://{ip}/cm?cmnd=Status%206"
            response = requests.get(url_mqtt, timeout=5)
            mqtt_data = response.json()

            # Decide the effective name to use for operations/logs
            op_name = name
            if device.get('unifi_hostname_bug_detected') and hostname_success and hostname:
                op_name = hostname
                self.logger.info(f"Using device self-reported hostname '{op_name}' for operations instead of UniFi name '{name}' (IP: {ip})")

            # Check and update MQTT settings if needed
            mqtt_updated = check_mqtt_settings(ip, op_name, mqtt_data)

            # Check and update template (config_other) if needed
            template_updated = self.apply_config_other(ip, op_name)

            # Console settings are now applied in configure_mqtt_settings
            console_updated = mqtt_updated

            device_detail = {
                "name": name,
                "ip": ip,
                "mac": mac,
                "version": status_data.get("StatusFWR", {}).get("Version", "Unknown"),
                "hostname": hostname if hostname_success else "Unknown",
                "mqtt_status": "Updated" if mqtt_updated else "Verified",
                "console_status": "Updated" if console_updated else "Verified",
                "template_status": "Updated" if template_updated else "Verified",
                "last_checked": time.strftime("%Y-%m-%d %H:%M:%S"),
                "status": "online"
            }
            self.logger.info(f"Successfully got version for {name}: {device_detail['version']}")

        except (requests.exceptions.RequestException, ValueError) as e:
            # ValueError covers a non-JSON reply from response.json() on
            # requests versions where that error is not a RequestException.
            self.logger.error(f"Error connecting to {name} at {ip}: {str(e)}")
            device_detail = {
                "name": name,
                "ip": ip,
                "mac": mac,
                "version": "Unknown",
                "status": "offline",
                "error": str(e)
            }

        device_details.append(device_detail)
        # Brief pause between devices.
        time.sleep(0.5)

    # Save all device details at once
    try:
        with open('TasmotaDevices.json', 'w') as f:
            json.dump(device_details, f, indent=2)
        self.logger.info(f"Device details saved to TasmotaDevices.json ({len(device_details)} devices)")
    except Exception as e:
        self.logger.error(f"Error saving device details: {e}")

    # Print summary of command failures if any occurred
    if hasattr(self, 'command_failures') and self.command_failures:
        failure_count = len(self.command_failures)
        print("\n" + "="*80)
        print(f"COMMAND FAILURES SUMMARY: {failure_count} command(s) failed after 3 retry attempts")
        print("="*80)

        # Group failures by device for better readability
        failures_by_device = {}
        for failure in self.command_failures:
            failures_by_device.setdefault(failure['device'], []).append(failure)

        # Print failures grouped by device
        for device_name, failures in failures_by_device.items():
            print(f"\nDevice: {device_name} ({failures[0]['ip']})")
            print("-" * 40)
            for i, failure in enumerate(failures, 1):
                print(f"  {i}. Command: {failure['command']}")
                print(f"     Error: {failure['error']}")

        print("\n" + "="*80)
|
|
|
|
|
|
def generate_unifi_hostname_report(self, timeout: int = 5, save_path: str = 'TasmotaHostnameReport.json', print_report: bool = True):
    """Generate a report comparing UniFi-reported hostnames with Tasmota device hostnames.

    Fetches all clients from UniFi, keeps those accepted by
    ``is_tasmota_device``, asks each one for its self-reported hostname via
    ``get_device_hostname`` and compares it (case-insensitively) with the
    UniFi hostname, falling back to the UniFi name when no hostname is set.
    The result is saved as JSON and optionally printed as two tables.

    Args:
        timeout: Timeout passed to ``get_device_hostname`` for each device.
        save_path: Path of the JSON report file to write.
        print_report: When True, also print a human-readable summary.

    Returns:
        A list with one dict per considered device (keys: ``ip``, ``mac``,
        ``unifi_name``, ``unifi_hostname``, ``device_hostname``, ``match``,
        ``ssid``, ``ap``). An empty list is returned when the UniFi client
        cannot be set up or the client list cannot be retrieved.
    """
    if not self.unifi_client:
        # Attempt to set up UniFi client (requires self.config already loaded)
        try:
            self.setup_unifi_client()
        except Exception as e:
            self.logger.error(f"Cannot set up UniFi client: {e}")
            return []

    try:
        all_clients = self.unifi_client.get_clients()
    except Exception as e:
        self.logger.error(f"Failed to retrieve clients from UniFi: {e}")
        return []

    # Build AP name maps to resolve human-friendly AP names
    ap_mac_to_name = {}
    bssid_to_name = {}
    try:
        for ap in self.unifi_client.get_devices() or []:
            ap_name = ap.get('name') or ap.get('device_name') or ap.get('hostname') or ''
            if not ap_name:
                # Nothing useful to map to without a name.
                continue
            mac = (ap.get('mac') or '').lower()
            if mac:
                ap_mac_to_name[mac] = ap_name
            # Map BSSIDs from vap_table and radio_table to the AP name
            for table_key in ('vap_table', 'radio_table'):
                table = ap.get(table_key) or []
                if isinstance(table, dict):
                    table = [table]
                for item in table:
                    bssid = (item.get('bssid') or '').lower()
                    if bssid:
                        bssid_to_name[bssid] = ap_name
    except Exception as e:
        # AP names are cosmetic; continue with whatever was mapped so far.
        self.logger.debug(f"Unable to fetch UniFi devices for AP mapping: {e}")

    report_entries = []
    mismatches = []
    total_considered = 0

    for device in all_clients:
        try:
            if not self.is_tasmota_device(device):
                continue
            total_considered += 1

            ip = device.get('ip', '')
            mac = device.get('mac', '')
            unifi_name = device.get('name') or device.get('hostname') or ''
            unifi_hostname = device.get('hostname', '') or ''

            # Resolve Connected AP name using UniFi device map
            ssid = device.get('essid') or device.get('ssid') or ''
            raw_ap_name = device.get('ap_name') or device.get('ap') or ''
            bssid = (device.get('bssid') or '').lower()
            ap_mac = (device.get('ap_mac') or '').lower()
            # Prefer mapping by ap_mac, then by bssid, then raw_ap_name, otherwise Unknown
            if ap_mac and ap_mac in ap_mac_to_name:
                resolved_ap = ap_mac_to_name[ap_mac]
            elif bssid and bssid in bssid_to_name:
                resolved_ap = bssid_to_name[bssid]
            elif raw_ap_name:
                resolved_ap = raw_ap_name
            else:
                resolved_ap = 'Unknown'

            device_hostname = ''
            success = False
            if ip:
                try:
                    device_hostname, success = self.get_device_hostname(ip, unifi_name, timeout=timeout, log_level='info')
                except Exception as e:
                    self.logger.debug(f"Error retrieving device hostname for {unifi_name} at {ip}: {e}")
            # Normalise a missing hostname so the comparison below cannot
            # raise and silently drop the device from the report.
            device_hostname = device_hostname or ''

            # Compare against UniFi's hostname if present; otherwise, fall back to name
            unifi_compare = (unifi_hostname or unifi_name or '').strip()
            match = bool(success) and device_hostname.strip().lower() == unifi_compare.lower()

            entry = {
                'ip': ip,
                'mac': mac,
                'unifi_name': unifi_name,
                'unifi_hostname': unifi_hostname,
                'device_hostname': device_hostname,
                'match': match,
                'ssid': ssid,
                'ap': resolved_ap
            }
            report_entries.append(entry)
            if success and not match:
                mismatches.append(entry)
        except Exception as e:
            self.logger.debug(f"Skipping device due to error: {e}")
            continue

    # Save JSON report
    try:
        summary = {
            'generated_at': datetime.now().isoformat(),
            'total_tasmota_devices': total_considered,
            'mismatch_count': len(mismatches),
            'devices': report_entries,
            'mismatches': mismatches
        }
        with open(save_path, 'w') as f:
            json.dump(summary, f, indent=2)
        self.logger.info(f"Hostname report saved to {save_path}")
    except Exception as e:
        self.logger.error(f"Failed to save hostname report: {e}")

    if print_report:
        sep = " | "  # 3-char separator

        def pad(text, width):
            """Return text truncated or left-justified to exactly width characters."""
            s = str(text) if text is not None else ""
            return s[:width] if len(s) > width else s.ljust(width)

        def row(c1, c2, c3, c4):
            """Format one four-column table row."""
            return f"{pad(c1,20)}{sep}{pad(c2,20)}{sep}{pad(c3,15)}{sep}{pad(c4,20)}"

        def conn_value(d):
            """Return the AP label for an entry, normalised to an 'AP - ' prefix."""
            ap = (d.get('ap') or '').strip()
            if not ap or ap.lower() == 'unknown':
                return 'Unknown'
            m = re.match(r'^\s*ap\s*-\s*(.*)$', ap, flags=re.IGNORECASE)
            if m:
                remainder = m.group(1).strip()
                return f"AP - {remainder}" if remainder else 'AP -'
            return f"AP - {ap}"

        header_sep = ("-"*20) + " + " + ("-"*20) + " + " + ("-"*15) + " + " + ("-"*20)

        def print_section(title, entries):
            """Print one titled table of report entries."""
            print(f"\n{title}")
            print(row("Hostname", "Device Hostname", "IP", "Connected"))
            print(header_sep)
            for d in entries:
                uni = d.get('unifi_hostname') or d.get('unifi_name') or ''
                dev = d.get('device_hostname') or 'Unknown'
                print(row(uni, dev, d.get('ip') or '', conn_value(d)))

        matched = [d for d in report_entries if d.get('match')]
        unmatched = [d for d in report_entries if not d.get('match')]

        print("\n" + "="*80)
        print("Tasmota UniFi Hostname Report")
        print("="*80)
        print(f"Total Tasmota devices considered: {total_considered}")
        # Same figure as 'mismatch_count' in the saved JSON: only devices
        # that answered with a different hostname.
        print(f"Hostname mismatches: {len(mismatches)}")
        print(f"Devices with unknown hostname: {len(unmatched) - len(mismatches)}")

        print_section("Matched devices:", matched)
        print_section("Unknown/Mismatched devices:", unmatched)
        print("="*80 + "\n")

    return report_entries
|
|
|
|
def main():
    """Command-line entry point for the Tasmota device manager.

    Parses the command-line options, configures logging, loads the
    configuration into a ``TasmotaDiscovery`` instance and then runs one of:
    the UniFi hostname report, single-device processing (``--Device``), or
    the normal discovery flow followed by either unknown-device processing
    or detail gathering.

    Returns:
        0 on success, 1 when single-device processing fails or an error is
        reported.
    """
    cli = argparse.ArgumentParser(description='Tasmota Device Manager')
    cli.add_argument('--config', default='network_configuration.json',
                     help='Path to configuration file')
    cli.add_argument('--debug', action='store_true',
                     help='Enable debug logging')
    cli.add_argument('--skip-unifi', action='store_true',
                     help='Skip UniFi discovery and use existing current.json')
    cli.add_argument('--process-unknown', action='store_true',
                     help='Process unknown devices (matching unknown_device_patterns) to set up names and MQTT')
    cli.add_argument('--unifi-hostname-report', action='store_true',
                     help='Generate a report comparing UniFi and Tasmota device hostnames')
    cli.add_argument('--Device',
                     help='Process a single device by hostname or IP address')
    opts = cli.parse_args()

    # Debug runs log at DEBUG level and include the source location of each record.
    if opts.debug:
        level = logging.DEBUG
        fmt = '%(asctime)s - %(levelname)s - %(filename)s:%(lineno)d - %(message)s'
    else:
        level = logging.INFO
        fmt = '%(asctime)s - %(levelname)s - %(message)s'
    logging.basicConfig(level=level, format=fmt, datefmt='%Y-%m-%d %H:%M:%S')

    print("Starting Tasmota Device Discovery and Version Check...")

    discovery = TasmotaDiscovery(debug=opts.debug)
    discovery.load_config(opts.config)

    try:
        # Report mode: produce the hostname comparison and stop.
        if opts.unifi_hostname_report:
            print("Generating UniFi/Tasmota hostname report...")
            discovery.generate_unifi_hostname_report()
            return 0

        # Single-device mode; process_single_device sets up the UniFi client as needed.
        if opts.Device:
            print(f"Processing single device: {opts.Device}")
            if not discovery.process_single_device(opts.Device):
                print(f"\nFailed to process device: {opts.Device}")
                return 1
            print(f"\nDevice {opts.Device} processed successfully!")
            print("- Detailed information saved to: TasmotaDevices.json")
            return 0

        # Normal processing flow
        if opts.skip_unifi:
            print("Skipping UniFi discovery, using existing current.json...")
        else:
            print("Step 1: Discovering Tasmota devices...")
            discovery.setup_unifi_client()
            found_devices = discovery.get_tasmota_devices()
            discovery.save_tasmota_config(found_devices)

        if opts.process_unknown:
            print("\nStep 2: Processing unknown devices...")
            discovery.process_unknown_devices()
        else:
            print("\nStep 2: Getting detailed version information...")
            discovery.get_device_details(use_current_json=True)

        print("\nProcess completed successfully!")
        print("- Device list saved to: current.json")
        print("- Detailed information saved to: TasmotaDevices.json")

    except ConnectionError as conn_err:
        # Fall back to whatever device list a previous run left in current.json.
        print(f"Connection Error: {str(conn_err)}")
        print("\nTrying to proceed with existing current.json...")
        try:
            discovery.get_device_details(use_current_json=True)
            print("\nSuccessfully retrieved device details from existing current.json")
        except Exception as fallback_err:
            print(f"Error processing existing devices: {str(fallback_err)}")
            return 1
    except Exception as err:
        print(f"Error: {str(err)}")
        if opts.debug:
            import traceback
            traceback.print_exc()
        return 1

    return 0
|
|
|
|
if __name__ == '__main__':
    # main() returns 0 on success and 1 on failure; pass that on as the
    # process exit status so shells and schedulers can detect failed runs.
    sys.exit(main())