TasmotaManager/TasmotaManager.py
2025-08-17 16:47:03 -05:00

2779 lines
142 KiB
Python
Executable File

import json
import logging
import os
import sys
from datetime import datetime
from typing import Optional
import requests
from urllib3.exceptions import InsecureRequestWarning
import re # Import the regular expression module
import time
import argparse
# Disable SSL warnings
requests.packages.urllib3.disable_warnings(category=InsecureRequestWarning)
class UnifiClient:
def __init__(self, base_url, username, password, site_id, verify_ssl=True):
self.base_url = base_url.rstrip('/')
self.username = username
self.password = password
self.site_id = site_id
self.session = requests.Session()
self.session.verify = verify_ssl
# Initialize cookie jar
self.session.cookies.clear()
def _login(self) -> requests.Response: # Changed return type annotation
"""Authenticate with the UniFi Controller."""
login_url = f"{self.base_url}/api/auth/login"
headers = {
'Content-Type': 'application/json',
'Accept': 'application/json'
}
payload = {
"username": self.username,
"password": self.password,
"remember": False
}
try:
response = self.session.post(
login_url,
json=payload,
headers=headers
)
response.raise_for_status()
if 'X-CSRF-Token' in response.headers:
self.session.headers['X-CSRF-Token'] = response.headers['X-CSRF-Token']
return response # Return the response object
except requests.exceptions.RequestException as e:
if hasattr(e, 'response') and e.response.status_code == 401:
raise Exception("Authentication failed. Please verify your username and password.") from e
raise
def get_clients(self) -> list:
"""Get all clients from the UniFi Controller."""
# Try the newer API endpoint first
url = f"{self.base_url}/proxy/network/api/s/{self.site_id}/stat/sta"
try:
response = self.session.get(url)
response.raise_for_status()
return response.json().get('data', [])
except requests.exceptions.RequestException as e:
# If the newer endpoint fails, try the legacy endpoint
url = f"{self.base_url}/api/s/{self.site_id}/stat/sta"
try:
response = self.session.get(url)
response.raise_for_status()
return response.json().get('data', [])
except requests.exceptions.RequestException as e:
# If both fail, try the v2 API endpoint
url = f"{self.base_url}/v2/api/site/{self.site_id}/clients"
response = self.session.get(url)
response.raise_for_status()
return response.json().get('data', [])
def get_devices(self) -> list:
"""Get UniFi network devices (e.g., Access Points) from the Controller."""
# Try the newer API endpoint first
url = f"{self.base_url}/proxy/network/api/s/{self.site_id}/stat/device"
try:
response = self.session.get(url)
response.raise_for_status()
return response.json().get('data', [])
except requests.exceptions.RequestException:
# Try legacy endpoint
url = f"{self.base_url}/api/s/{self.site_id}/stat/device"
try:
response = self.session.get(url)
response.raise_for_status()
return response.json().get('data', [])
except requests.exceptions.RequestException:
return []
class TasmotaDiscovery:
def __init__(self, debug: bool = False):
"""Initialize the TasmotaDiscovery with optional debug mode."""
log_level = logging.DEBUG if debug else logging.INFO
log_format = '%(asctime)s - %(levelname)s - %(filename)s:%(lineno)d - %(message)s' if debug else '%(asctime)s - %(levelname)s - %(message)s'
logging.basicConfig(
level=log_level,
format=log_format,
datefmt='%Y-%m-%d %H:%M:%S'
)
self.logger = logging.getLogger(__name__)
# Redirect info logs to debug so all 'info' statements behave as debug-level
try:
self.logger.info = self.logger.debug
except Exception:
pass
self.config = None
self.unifi_client = None
def load_config(self, config_path: Optional[str] = None) -> dict:
"""Load configuration from JSON file."""
if config_path is None:
config_path = os.path.join(os.path.dirname(__file__), 'config.json')
self.logger.debug(f"Loading configuration from: {config_path}")
try:
with open(config_path, 'r') as config_file:
self.config = json.load(config_file)
self.logger.debug("Configuration loaded successfully from %s", config_path)
return self.config
except FileNotFoundError:
self.logger.error(f"Configuration file not found at {config_path}")
sys.exit(1)
except json.JSONDecodeError:
self.logger.error("Invalid JSON in configuration file")
sys.exit(1)
def setup_unifi_client(self):
"""Set up the UniFi client with better error handling"""
self.logger.debug("Setting up UniFi client")
if not self.config or 'unifi' not in self.config:
raise ValueError("Missing UniFi configuration")
unifi_config = self.config['unifi']
required_fields = ['host', 'username', 'password', 'site']
missing_fields = [field for field in required_fields if field not in unifi_config]
if missing_fields:
raise ValueError(f"Missing required UniFi configuration fields: {', '.join(missing_fields)}")
try:
self.logger.debug(f"Connecting to UniFi Controller at {unifi_config['host']}")
self.unifi_client = UnifiClient(
base_url=unifi_config['host'],
username=unifi_config['username'],
password=unifi_config['password'],
site_id=unifi_config['site'],
verify_ssl=False # Add this if using self-signed certificates
)
# Test the connection by making a simple request
response = self.unifi_client._login()
if not response:
raise ConnectionError(f"Failed to connect to UniFi controller: No response")
self.logger.debug("UniFi client setup successful")
except Exception as e:
self.logger.error(f"Error setting up UniFi client: {str(e)}")
raise ConnectionError(f"Failed to connect to UniFi controller: {str(e)}")
def is_tasmota_device(self, device: dict) -> bool:
"""Determine if a device is in the network_filter and not in exclude_patterns.
The function checks:
1. If the device's IP address starts with any subnet in network_filter
2. If the device's name or hostname does NOT match any exclude_patterns
Both checks use case-insensitive matching, and glob patterns (with *) in
exclude_patterns are converted to regex patterns for matching.
"""
name = device.get('name', '').lower()
hostname = device.get('hostname', '').lower()
ip = device.get('ip', '')
# Check if device is in the configured network
network_filters = self.config['unifi'].get('network_filter', {})
for network in network_filters.values():
if ip.startswith(network['subnet']):
self.logger.debug(f"Checking device in network: {name} ({hostname}) IP: {ip}")
# Check if device should be excluded based on exclude_patterns
exclude_patterns = network.get('exclude_patterns', [])
if self.is_device_excluded(name, hostname, exclude_patterns, log_level='debug'):
return False
# Device is in the network and not excluded
self.logger.debug(f"Found device in network: {name}")
return True
return False
def _match_pattern(self, text_or_texts, pattern: str, use_complex_matching: bool = False, match_entire_string: bool = False, log_level: str = 'debug') -> bool:
"""Common function to match a string or multiple strings against a pattern.
This function handles the regex pattern matching logic for both is_hostname_unknown
and is_device_excluded functions. It supports both simple prefix matching and more
complex matching for patterns that should match anywhere in the string.
Args:
text_or_texts: The string or list of strings to match against the pattern.
If a list is provided, the function returns True if any string matches.
pattern: The pattern to match against
use_complex_matching: Whether to use the more complex matching logic for patterns
starting with ^.* (default: False)
match_entire_string: Whether to match the entire string by adding $ at the end
of the regex pattern (default: False)
log_level: The logging level to use (default: 'debug')
Returns:
bool: True if any of the provided texts match the pattern, False otherwise
"""
# Convert pattern to lowercase for case-insensitive matching
pattern_lower = pattern.lower()
# Set up logging based on the specified level
log_func = getattr(self.logger, log_level)
# Handle single string or list of strings
if isinstance(text_or_texts, str):
texts = [text_or_texts] if text_or_texts else []
else:
texts = [t for t in text_or_texts if t]
# If no valid texts to match, return False
if not texts:
return False
# Special case for patterns like ^.*something.* which should match anywhere in the string
if use_complex_matching and pattern_lower.startswith('^.*'):
# Extract the part after ^.* to use with re.search
search_part = pattern_lower[3:] # Remove the ^.* prefix
# For patterns that should match anywhere, we need to handle wildcards differently
# We want "sonos.*" to match "sonos", "mysonos", "sonosdevice", etc.
# We also want "sonos" to match exactly, without requiring characters after it
# First, handle the case where the search part ends with .*
if search_part.endswith('.*'):
# Remove the .* at the end and make it optional
base_part = search_part[:-2] # Remove the .* at the end
search_regex = base_part
else:
# For other patterns, just use the search part as is
search_regex = search_part
# Replace any remaining * with .* but not escape the dots
search_regex = search_regex.replace('*', '.*')
# Check each text
for text in texts:
# Use re.search to find the pattern anywhere in the string
if re.search(search_regex, text):
return True
else:
# Standard pattern matching (prefix matching)
# Convert glob pattern to regex pattern
pattern_regex = pattern_lower.replace('.', r'\.').replace('*', '.*')
# Check if pattern already starts with ^
if pattern_regex.startswith('^'):
regex_pattern = pattern_regex
else:
regex_pattern = f"^{pattern_regex}"
# Add $ at the end if matching entire string
if match_entire_string:
regex_pattern = f"{regex_pattern}$"
# Check each text
for text in texts:
# Use re.match to check if the string starts with the pattern
if re.match(regex_pattern, text):
return True
return False
def get_device_hostname(self, ip: str, device_name: str = None, timeout: int = 5, log_level: str = 'debug') -> tuple:
"""Retrieve the hostname from a Tasmota device.
This function makes an HTTP request to a Tasmota device to retrieve its self-reported
hostname using the Status 5 command. It handles error conditions and provides
consistent logging.
Args:
ip: The IP address of the device
device_name: Optional name of the device for logging purposes
timeout: Timeout for the HTTP request in seconds (default: 5)
log_level: The logging level to use ('debug', 'info', 'warning', 'error'). Default is 'debug'.
Returns:
tuple: (hostname, success)
- hostname: The device's self-reported hostname, or empty string if not found
- success: Boolean indicating whether the hostname was successfully retrieved
Examples:
# Basic usage
hostname, success = manager.get_device_hostname("192.168.1.100")
if success:
print(f"Device hostname: {hostname}")
# With device name for better logging
hostname, success = manager.get_device_hostname("192.168.1.100", "Living Room Light")
# With custom timeout and log level
hostname, success = manager.get_device_hostname("192.168.1.100", timeout=10, log_level='info')
"""
# Set up logging based on the specified level
log_func = getattr(self.logger, log_level)
# Use device_name in logs if provided, otherwise use IP
device_id = device_name if device_name else ip
hostname = ""
success = False
try:
# Log attempt to retrieve hostname
log_func(f"Retrieving hostname for {device_id} at {ip}")
# Make HTTP request to the device
url = f"http://{ip}/cm?cmnd=Status%205"
response = requests.get(url, timeout=timeout)
# Check if response is successful
if response.status_code == 200:
try:
# Parse JSON response
status_data = response.json()
# Extract hostname from response
hostname = status_data.get('StatusNET', {}).get('Hostname', '')
if hostname:
log_func(f"Successfully retrieved hostname for {device_id}: {hostname}")
success = True
else:
log_func(f"No hostname found in response for {device_id}")
except ValueError:
log_func(f"Failed to parse JSON response from {device_id}")
else:
log_func(f"Failed to get hostname for {device_id}: HTTP {response.status_code}")
except requests.exceptions.RequestException as e:
log_func(f"Error retrieving hostname for {device_id}: {str(e)}")
return hostname, success
def is_hostname_unknown(self, hostname: str, patterns: list = None, from_unifi_os: bool = False, ip: str = None) -> bool:
"""Check if a hostname matches any pattern in unknown_device_patterns.
This function provides a centralized way to check if a hostname matches any of the
unknown_device_patterns defined in the configuration. It uses case-insensitive
matching and supports glob patterns (with *) in the patterns list.
Args:
hostname: The hostname to check against unknown_device_patterns
patterns: Optional list of patterns to check against. If not provided,
patterns will be loaded from the configuration.
from_unifi_os: Whether the hostname is from Unifi OS (handles Unifi Hostname bug)
ip: IP address of the device. If provided, hostname validation is skipped.
Returns:
bool: True if the hostname matches any pattern, False otherwise
Examples:
# Check if a hostname matches any unknown_device_patterns in the config
if manager.is_hostname_unknown("tasmota_device123"):
print("This is an unknown device")
# Check against a specific list of patterns
custom_patterns = ["esp-*", "tasmota_*"]
if manager.is_hostname_unknown("esp-abcd", custom_patterns):
print("This matches a custom pattern")
# Check with Unifi Hostname bug handling
if manager.is_hostname_unknown("tasmota_device123", from_unifi_os=True):
print("This is an unknown device from Unifi OS")
# Skip hostname validation when IP is provided
if manager.is_hostname_unknown("", ip="192.168.1.100"):
print("This is an unknown device with IP")
"""
# If IP is provided and from_unifi_os is False, we can skip hostname validation
if ip and not from_unifi_os:
self.logger.debug(f"IP provided ({ip}) and from_unifi_os is False, skipping hostname validation")
return True
# If no patterns provided, get them from the configuration
if patterns is None:
patterns = []
network_filters = self.config['unifi'].get('network_filter', {})
for network in network_filters.values():
patterns.extend(network.get('unknown_device_patterns', []))
# Convert hostname to lowercase for case-insensitive matching
hostname_lower = hostname.lower()
# Handle Unifi Hostname bug if hostname is from Unifi OS
if from_unifi_os and ip:
self.logger.debug(f"Handling hostname '{hostname}' from Unifi OS (bug handling enabled)")
# Get the device's self-reported hostname using the common function
try:
device_reported_hostname, success = self.get_device_hostname(ip, hostname, timeout=5, log_level='debug')
except Exception as e:
self.logger.debug(f"Failed to retrieve self-reported hostname for {hostname} at {ip}: {e}")
device_reported_hostname, success = "", False
if success:
# Check if the self-reported hostname also matches unknown patterns
device_hostname_matches_unknown = False
# Use the base of the self-reported hostname up to the first '-' for bug detection
device_hostname_base = device_reported_hostname.split('-')[0].lower()
for pattern in patterns:
if self._match_pattern(device_hostname_base, pattern, match_entire_string=False):
device_hostname_matches_unknown = True
self.logger.debug(f"Device's self-reported hostname base '{device_hostname_base}' (from '{device_reported_hostname}') matches unknown pattern: {pattern}")
break
# If UniFi name matches unknown patterns but device's self-reported name doesn't,
# this indicates the UniFi OS hostname bug
if not device_hostname_matches_unknown:
# First check if the UniFi-reported hostname matches unknown patterns
unifi_hostname_matches_unknown = False
for pattern in patterns:
if self._match_pattern(hostname_lower, pattern, match_entire_string=False):
unifi_hostname_matches_unknown = True
break
if unifi_hostname_matches_unknown:
self.logger.info(f"UniFi OS hostname bug detected for {hostname}: self-reported hostname '{device_reported_hostname}' doesn't match unknown patterns")
return False # Not an unknown device despite what UniFi reports
elif from_unifi_os:
self.logger.debug(f"Cannot check device's self-reported hostname for {hostname}: No IP address provided")
# Check if hostname matches any pattern
for pattern in patterns:
if self._match_pattern(hostname_lower, pattern, match_entire_string=False):
self.logger.debug(f"Hostname '{hostname}' matches unknown device pattern: {pattern}")
return True
return False
def is_device_excluded(self, device_name: str, hostname: str = '', patterns: list = None, log_level: str = 'debug') -> bool:
"""Check if a device name or hostname matches any pattern in exclude_patterns.
This function provides a centralized way to check if a device should be excluded
based on its name or hostname matching any of the exclude_patterns defined in the
configuration. It uses case-insensitive matching and supports glob patterns (with *)
in the patterns list.
Args:
device_name: The device name to check against exclude_patterns
hostname: The device hostname to check against exclude_patterns (optional)
patterns: Optional list of patterns to check against. If not provided,
patterns will be loaded from the configuration.
log_level: The logging level to use ('debug', 'info', 'warning', 'error'). Default is 'debug'.
Returns:
bool: True if the device should be excluded (name or hostname matches any pattern),
False otherwise
Examples:
# Check if a device should be excluded based on patterns in the config
if manager.is_device_excluded("homeassistant", "homeassistant.local"):
print("This device should be excluded")
# Check against a specific list of patterns
custom_patterns = ["^homeassistant*", "^.*sonos.*"]
if manager.is_device_excluded("sonos-speaker", "sonos.local", custom_patterns, log_level='info'):
print("This device matches a custom exclude pattern")
"""
# If no patterns provided, get them from the configuration
if patterns is None:
patterns = []
network_filters = self.config['unifi'].get('network_filter', {})
for network in network_filters.values():
patterns.extend(network.get('exclude_patterns', []))
# Convert device_name and hostname to lowercase for case-insensitive matching
name = device_name.lower() if device_name else ''
hostname_lower = hostname.lower() if hostname else ''
# Set up logging based on the specified level
log_func = getattr(self.logger, log_level)
# Create a list of texts to check (device name and hostname)
texts = [name, hostname_lower]
# Check if device name or hostname matches any pattern
for pattern in patterns:
pattern_lower = pattern.lower()
# Special case for patterns like ^.*something.* which should match anywhere in the string
if pattern_lower.startswith('^.*'):
# Use complex matching for patterns starting with ^.*
if self._match_pattern(texts, pattern, use_complex_matching=True, log_level=log_level):
log_func(f"Excluding device due to pattern '{pattern}': {device_name} ({hostname})")
return True
continue
# For normal patterns, match the entire string
if self._match_pattern(texts, pattern, match_entire_string=True, log_level=log_level):
log_func(f"Excluding device due to pattern '{pattern}': {device_name} ({hostname})")
return True
return False
def get_tasmota_devices(self) -> list:
"""Query UniFi controller and filter Tasmota devices.
This function:
1. Retrieves all clients from the UniFi controller
2. Filters them using is_tasmota_device() to find devices in the network_filter
that are not in exclude_patterns
3. Determines each device's connection type (Wireless/Wired)
4. Checks for the UniFi OS hostname bug whenever a device matches unknown_device_patterns
5. Creates a standardized device_info dictionary for each device
Returns:
list: A list of dictionaries containing device information with fields:
- name: Device name or hostname
- ip: IP address
- mac: MAC address
- last_seen: When the device was last seen by UniFi
- hostname: Device hostname
- notes: Any notes from UniFi
- connection: Connection type (Wireless/Wired)
- unifi_hostname_bug_detected: Flag indicating if the UniFi OS hostname bug was detected
If an error occurs during the process, an empty list is returned.
"""
devices = []
self.logger.debug("Querying UniFi controller for devices")
try:
all_clients = self.unifi_client.get_clients()
self.logger.debug(f"Found {len(all_clients)} total devices")
# Get unknown device patterns for hostname bug detection
network_filters = self.config['unifi'].get('network_filter', {})
unknown_patterns = []
for network in network_filters.values():
unknown_patterns.extend(network.get('unknown_device_patterns', []))
for device in all_clients:
if self.is_tasmota_device(device):
# Determine connection type based on available fields
connection = "Unknown"
if device.get('essid'):
connection = f"Wireless - {device.get('essid')}"
elif device.get('radio') or device.get('wifi'):
connection = "Wireless"
elif device.get('port') or device.get('switch_port') or device.get('switch'):
connection = "Wired"
# Get device details
device_name = device.get('name', device.get('hostname', 'Unknown'))
device_hostname = device.get('hostname', '')
device_ip = device.get('ip', '')
# Initialize the UniFi hostname bug flag
unifi_hostname_bug_detected = False
device_reported_hostname = None
# Check if device name or hostname matches unknown patterns
unifi_name_matches_unknown = (
self.is_hostname_unknown(device_name, unknown_patterns) or
self.is_hostname_unknown(device_hostname, unknown_patterns)
)
if unifi_name_matches_unknown:
self.logger.debug(f"Device {device_name} matches unknown device pattern")
# If the name matches unknown patterns, check the device's self-reported hostname
# before declaring it unknown
#
# This addresses a UniFi OS bug where it doesn't keep track of updated hostnames.
# When a hostname is updated and the connection reset, UniFi may not pick up the new name.
if unifi_name_matches_unknown and device_ip:
self.logger.debug(f"Checking device's self-reported hostname for {device_name}")
# Get the device's self-reported hostname using the common function
device_reported_hostname, success = self.get_device_hostname(device_ip, device_name, timeout=5, log_level='debug')
if success:
# Check if the self-reported hostname also matches unknown patterns
device_hostname_matches_unknown = False
# Use the base of the self-reported hostname up to the first '-' for bug detection
device_hostname_base = device_reported_hostname.split('-')[0].lower()
for pattern in unknown_patterns:
if self._match_pattern(device_hostname_base, pattern, match_entire_string=False):
device_hostname_matches_unknown = True
self.logger.debug(f"Device's self-reported hostname base '{device_hostname_base}' (from '{device_reported_hostname}') matches unknown pattern: {pattern}")
break
# If UniFi name matches unknown patterns but device's self-reported name doesn't,
# this indicates the UniFi OS hostname bug
if not device_hostname_matches_unknown:
unifi_hostname_bug_detected = True
self.logger.info(f"UniFi OS hostname bug detected for {device_name}: self-reported hostname '{device_reported_hostname}' doesn't match unknown patterns")
# If bug detected and we have the device-reported hostname, log it but keep UniFi fields for listing
if unifi_hostname_bug_detected and device_reported_hostname:
self.logger.info(f"UniFi bug detected: will prefer self-reported hostname '{device_reported_hostname}' during processing; keeping UniFi listing name '{device_name}'")
device_info = {
"name": device_name,
"ip": device_ip,
"mac": device.get('mac', ''),
"last_seen": device.get('last_seen', ''),
"hostname": device_hostname,
"notes": device.get('note', ''),
"connection": connection,
"unifi_hostname_bug_detected": unifi_hostname_bug_detected
}
devices.append(device_info)
self.logger.debug(f"Found {len(devices)} Tasmota devices")
return devices
except Exception as e:
self.logger.error(f"Error getting devices from UniFi controller: {e}")
return []
def save_tasmota_config(self, devices: list) -> None:
"""Save Tasmota device information to a JSON file with device tracking."""
filename = "current.json"
self.logger.debug(f"Saving Tasmota configuration to {filename}")
deprecated_filename = "deprecated.json"
current_devices = []
deprecated_devices = []
# Load existing devices if file exists
if os.path.exists(filename):
try:
with open(filename, 'r') as f:
existing_config = json.load(f)
current_devices = existing_config.get('tasmota', {}).get('devices', [])
except json.JSONDecodeError:
self.logger.error(f"Error reading {filename}, treating as empty")
current_devices = []
# Load deprecated devices if file exists
if os.path.exists(deprecated_filename):
try:
with open(deprecated_filename, 'r') as f:
deprecated_config = json.load(f)
deprecated_devices = deprecated_config.get('tasmota', {}).get('devices', [])
except json.JSONDecodeError:
self.logger.error(f"Error reading {deprecated_filename}, treating as empty")
deprecated_devices = []
# Create new config
new_devices = []
moved_to_deprecated = []
restored_from_deprecated = []
removed_from_deprecated = []
excluded_devices = []
# Get exclude_patterns for checking excluded devices
network_filters = self.config['unifi'].get('network_filter', {})
exclude_patterns = []
for network in network_filters.values():
exclude_patterns.extend(network.get('exclude_patterns', []))
# Process current devices
for device in devices:
device_name = device['name']
device_hostname = device.get('hostname', '')
device_ip = device['ip']
device_mac = device['mac']
# Check if device should be excluded
if self.is_device_excluded(device_name, device_hostname, exclude_patterns, log_level='info'):
print(f"Device {device_name} excluded by pattern - skipping")
excluded_devices.append(device_name)
continue
# Check in current devices
existing_device = next((d for d in current_devices
if d['name'] == device_name), None)
if existing_device:
# Device exists, check if IP or MAC changed
if existing_device['ip'] != device_ip or existing_device['mac'] != device_mac:
moved_to_deprecated.append(existing_device)
new_devices.append(device)
print(f"Device {device_name} moved to deprecated (IP/MAC changed)")
else:
new_devices.append(existing_device) # Keep existing device
else:
# New device, check if it was in deprecated
deprecated_device = next((d for d in deprecated_devices
if d['name'] == device_name), None)
if deprecated_device:
removed_from_deprecated.append(device_name)
print(f"Device {device_name} removed from deprecated (restored)")
new_devices.append(device)
print(f"Device {device_name} added to output file")
# Find devices that are no longer present
current_names = {d['name'] for d in devices}
for existing_device in current_devices:
if existing_device['name'] not in current_names:
if not self.is_device_excluded(existing_device['name'], existing_device.get('hostname', ''), exclude_patterns, log_level='info'):
moved_to_deprecated.append(existing_device)
print(f"Device {existing_device['name']} moved to deprecated (no longer present)")
# Update deprecated devices list, excluding any excluded devices
final_deprecated = []
for device in deprecated_devices:
if device['name'] not in removed_from_deprecated and not self.is_device_excluded(device['name'], device.get('hostname', ''), exclude_patterns, log_level='info'):
final_deprecated.append(device)
elif self.is_device_excluded(device['name'], device.get('hostname', ''), exclude_patterns, log_level='info'):
print(f"Device {device['name']} removed from deprecated (excluded by pattern)")
final_deprecated.extend(moved_to_deprecated)
# Save new configuration
config = {
"tasmota": {
"devices": new_devices,
"generated_at": datetime.now().isoformat(),
"total_devices": len(new_devices)
}
}
# Save deprecated configuration
deprecated_config = {
"tasmota": {
"devices": final_deprecated,
"generated_at": datetime.now().isoformat(),
"total_devices": len(final_deprecated)
}
}
# Backup existing file if it exists
if os.path.exists(filename):
try:
backup_name = f"{filename}.backup"
os.rename(filename, backup_name)
self.logger.info(f"Created backup of existing configuration as {backup_name}")
except Exception as e:
self.logger.error(f"Error creating backup: {e}")
# Save files
try:
with open(filename, 'w') as f:
json.dump(config, f, indent=4)
with open(deprecated_filename, 'w') as f:
json.dump(deprecated_config, f, indent=4)
self.logger.info(f"Successfully saved {len(new_devices)} Tasmota devices to {filename}")
self.logger.info(f"Successfully saved {len(final_deprecated)} deprecated devices to {deprecated_filename}")
print("\nDevice Status Summary:")
if excluded_devices:
print("\nExcluded Devices:")
for name in excluded_devices:
print(f"- {name}")
if moved_to_deprecated:
print("\nMoved to deprecated:")
for device in moved_to_deprecated:
print(f"- {device['name']}")
if removed_from_deprecated:
print("\nRestored from deprecated:")
for name in removed_from_deprecated:
print(f"- {name}")
print("\nCurrent Tasmota Devices:")
for device in new_devices:
print(f"Name: {device['name']:<20} IP: {device['ip']:<15} MAC: {device['mac']}")
except Exception as e:
self.logger.error(f"Error saving configuration: {e}")
def get_unknown_devices(self, use_current_json=True):
"""Identify devices that match unknown_device_patterns from current.json."""
self.logger.info("Identifying unknown devices for processing...")
unknown_devices = []
try:
source_file = 'current.json' if use_current_json else 'tasmota.json'
with open(source_file, 'r') as f:
data = json.load(f)
all_devices = data.get('tasmota', {}).get('devices', [])
self.logger.debug(f"Loaded {len(all_devices)} devices from {source_file}")
except FileNotFoundError:
self.logger.error(f"{source_file} not found. Run discovery first.")
return []
except json.JSONDecodeError:
self.logger.error(f"Invalid JSON format in {source_file}")
return []
# Identify devices matching unknown_device_patterns
network_filters = self.config['unifi'].get('network_filter', {})
unknown_patterns = []
for network in network_filters.values():
unknown_patterns.extend(network.get('unknown_device_patterns', []))
for device in all_devices:
name = device.get('name', '').lower()
hostname = device.get('hostname', '').lower()
for pattern in unknown_patterns:
pattern = pattern.lower()
pattern = pattern.replace('.', r'\.').replace('*', '.*')
# Check if pattern already starts with ^
if pattern.startswith('^'):
regex_pattern = pattern
else:
regex_pattern = f"^{pattern}"
if re.match(regex_pattern, name) or re.match(regex_pattern, hostname):
self.logger.debug(f"Found unknown device: {name} ({hostname})")
unknown_devices.append(device)
break
self.logger.info(f"Found {len(unknown_devices)} unknown devices to process")
return unknown_devices
def process_unknown_devices(self):
"""Process unknown devices by checking for toggle button and configuring them.
This method:
1. Identifies devices matching unknown_device_patterns
2. Checks if each device has a toggle button (indicating it's a light/switch)
3. Toggles the button at 1/2Hz while checking for hostname changes
4. Prompts the user to enter a new name for the device in the console
5. Once a name is entered, configures the device with the new hostname
"""
unknown_devices = self.get_unknown_devices()
if not unknown_devices:
self.logger.info("No unknown devices found to process")
return
self.logger.info(f"Starting to process {len(unknown_devices)} unknown devices...")
for device in unknown_devices:
name = device.get('name', 'Unknown')
ip = device.get('ip')
connection = device.get('connection', 'Unknown')
if not ip:
self.logger.warning(f"Skipping device {name} - no IP address")
continue
self.logger.info(f"Processing unknown device: {name} at {ip} with connection {connection}")
# Check if device has a toggle button
try:
# Get the main page to check for toggle button
url = f"http://{ip}/"
response = requests.get(url, timeout=5)
# Check if there's a toggle button in the response
has_toggle = "toggle" in response.text.lower()
if has_toggle:
self.logger.info(f"Device {name} has a toggle button, assuming it's a light switch or power plug")
# Start toggling at 1/2Hz
original_hostname = device.get('hostname', '')
toggle_state = False
# Temporarily disable all logging during toggling
logging.disable(logging.CRITICAL)
try:
# Clear console output and show prompt
print("\n" + "="*50)
print(f"DEVICE: {name} at IP: {ip} Connection: {connection}")
print(f"Current hostname: {original_hostname}")
print("="*50)
print("The device is now toggling to help you identify it.")
# Start toggling in background while waiting for input
import threading
stop_toggle = threading.Event()
def toggle_device():
toggle_state = False
while not stop_toggle.is_set():
toggle_state = not toggle_state
toggle_cmd = "Power On" if toggle_state else "Power Off"
toggle_url = f"http://{ip}/cm?cmnd={toggle_cmd}"
try:
requests.get(toggle_url, timeout=2)
except:
pass
time.sleep(2.0) # 1/2Hz rate
# Start toggle thread
toggle_thread = threading.Thread(target=toggle_device)
toggle_thread.daemon = True
toggle_thread.start()
# Prompt for new hostname
print("\nPlease enter a new name for this device:")
print("(Enter nothing, 'unknown', or 'na' to assume device could not be found and end)")
new_hostname = input("> ").strip()
# Stop toggling
stop_toggle.set()
toggle_thread.join(timeout=3)
# Check for special inputs that indicate device could not be found
if not new_hostname or new_hostname.lower() == "unknown" or new_hostname.lower() == "na":
print("Assuming device could not be found, ending process")
return # End the entire process
if new_hostname != original_hostname:
print(f"Setting new hostname to: {new_hostname}")
else:
print("No valid hostname entered, skipping device")
new_hostname = ""
finally:
# Re-enable logging
logging.disable(logging.NOTSET)
# If a new hostname was entered, configure the device
if new_hostname:
self.logger.info(f"Configuring device with new hostname: {new_hostname}")
self.configure_unknown_device(ip, new_hostname)
else:
self.logger.warning(f"No new hostname provided for {name}, skipping configuration")
else:
self.logger.info(f"Device {name} does not have a toggle button, skipping")
except requests.exceptions.RequestException as e:
self.logger.error(f"Error connecting to {name} at {ip}: {str(e)}")
def check_and_update_template(self, ip, name):
"""Check and update device template based on config_other settings.
Algorithm:
1. Get the device name from the Configuration/Other page using Status 0
2. Get the current template using Template command
3. Check if any key in config_other matches the device name
4. If a match is found, check if the template matches the value
5. If the template doesn't match, write the value to the template
6. If no key matches, check if any value matches the template
7. If a value match is found, write the key to the device name
Args:
ip: The IP address of the device
name: The name/hostname of the device
Returns:
bool: True if template was updated, False otherwise
"""
try:
# Get config_other settings
config_other = self.config.get('config_other', {})
if not config_other:
self.logger.debug(f"{name}: No config_other settings found in configuration")
return False
# Get Status 0 for device name from Configuration/Other page with increased timeout
url_status0 = f"http://{ip}/cm?cmnd=Status%200"
try:
self.logger.debug(f"{name}: Getting Status 0 with increased timeout (10 seconds)")
response = requests.get(url_status0, timeout=10)
status0_data = response.json()
# Log the actual response format for debugging
self.logger.debug(f"{name}: Status 0 response: {status0_data}")
except requests.exceptions.Timeout:
self.logger.error(f"{name}: Timeout getting Status 0 (10 seconds) - device may be busy")
# Try one more time with even longer timeout
try:
self.logger.debug(f"{name}: Retrying Status 0 with 20 second timeout")
response = requests.get(url_status0, timeout=20)
status0_data = response.json()
self.logger.debug(f"{name}: Status 0 response on retry: {status0_data}")
except requests.exceptions.Timeout:
self.logger.error(f"{name}: Timeout getting Status 0 even with 20 second timeout")
return False
except requests.exceptions.RequestException as e:
self.logger.error(f"{name}: Error getting Status 0 on retry: {str(e)}")
return False
except requests.exceptions.RequestException as e:
self.logger.error(f"{name}: Error getting Status 0: {str(e)}")
return False
# Extract device name from Status 0 response
device_name = status0_data.get("Status", {}).get("DeviceName", "")
if not device_name:
self.logger.debug(f"{name}: Could not get device name from Status 0")
return False
self.logger.debug(f"{name}: Device name from Configuration/Other page: {device_name}")
# Get current template with increased timeout
url_template = f"http://{ip}/cm?cmnd=Template"
try:
self.logger.debug(f"{name}: Getting template with increased timeout (10 seconds)")
response = requests.get(url_template, timeout=10)
template_data = response.json()
# Log the actual response format for debugging
self.logger.debug(f"{name}: Template response: {template_data}")
except requests.exceptions.Timeout:
self.logger.error(f"{name}: Timeout getting template (10 seconds) - device may be busy")
# Try one more time with even longer timeout
try:
self.logger.debug(f"{name}: Retrying with 20 second timeout")
response = requests.get(url_template, timeout=20)
template_data = response.json()
self.logger.debug(f"{name}: Template response on retry: {template_data}")
except requests.exceptions.Timeout:
self.logger.error(f"{name}: Timeout getting template even with 20 second timeout")
return False
except requests.exceptions.RequestException as e:
self.logger.error(f"{name}: Error getting template on retry: {str(e)}")
return False
except requests.exceptions.RequestException as e:
self.logger.error(f"{name}: Error getting template: {str(e)}")
return False
# Extract current template - handle different response formats
current_template = ""
# Try different possible response formats
if "Template" in template_data:
current_template = template_data.get("Template", "")
elif isinstance(template_data, dict) and len(template_data) > 0:
# If there's no "Template" key but we have a dict, try to get the first value
# This handles cases where the response might be {"NAME":"...","GPIO":[...]}
first_key = next(iter(template_data))
if isinstance(template_data[first_key], str) and "{" in template_data[first_key]:
current_template = template_data[first_key]
self.logger.debug(f"{name}: Found template in alternate format under key: {first_key}")
# Handle the case where the template is returned as a dict with NAME, GPIO, FLAG, BASE keys
elif all(key in template_data for key in ['NAME', 'GPIO', 'FLAG', 'BASE']):
# Convert the dict to a JSON string to match the expected format
import json
current_template = json.dumps(template_data)
self.logger.debug(f"{name}: Found template in dict format with NAME, GPIO, FLAG, BASE keys")
if not current_template:
self.logger.debug(f"{name}: Could not get current template from response")
return False
self.logger.debug(f"{name}: Current template: {current_template}")
# Check if any key in config_other matches the device name
template_updated = False
if device_name in config_other:
# Key matches device name, check if value is blank or empty
template_value = config_other[device_name]
if not template_value or template_value.strip() == "":
# Value is blank or empty, print message and skip template check
self.logger.info(f"{name}: Device name '{device_name}' matches key in config_other, but value is blank or empty")
print(f"\nDevice {name} at {ip} must be set manually in Configuration/Module to: {device_name}")
print(f"The config_other entry has a blank value for key: {device_name}")
return False
elif current_template != template_value:
# Template doesn't match, write value to template with retry and post-verification
self.logger.info(f"{name}: Device name '{device_name}' matches key in config_other, but template doesn't match")
self.logger.info(f"{name}: Setting template to: {template_value}")
import urllib.parse
encoded_value = urllib.parse.quote(template_value)
max_attempts = 3
last_error = None
for attempt in range(1, max_attempts + 1):
try:
url = f"http://{ip}/cm?cmnd=Template%20{encoded_value}"
self.logger.debug(f"{name}: Setting template (attempt {attempt}/{max_attempts})")
response = requests.get(url, timeout=10)
if response.status_code != 200:
last_error = f"HTTP {response.status_code}"
self.logger.warning(f"{name}: Failed to update template: {last_error}")
if attempt < max_attempts:
time.sleep(1)
continue
self.logger.info(f"{name}: Template command accepted")
# Activate the template by setting module to 0 (Template module)
module_url = f"http://{ip}/cm?cmnd=Module%200"
try:
module_response = requests.get(module_url, timeout=10)
if module_response.status_code != 200:
last_error = f"HTTP {module_response.status_code}"
self.logger.warning(f"{name}: Failed to set module to 0: {last_error}")
if attempt < max_attempts:
time.sleep(1)
continue
self.logger.info(f"{name}: Module set to 0 successfully")
except requests.exceptions.RequestException as e:
last_error = str(e)
self.logger.warning(f"{name}: Error setting module to 0: {last_error}")
if attempt < max_attempts:
time.sleep(1)
continue
# Restart the device to apply the template
restart_url = f"http://{ip}/cm?cmnd=Restart%201"
try:
restart_response = requests.get(restart_url, timeout=10)
if restart_response.status_code != 200:
last_error = f"HTTP {restart_response.status_code}"
self.logger.warning(f"{name}: Failed to restart device: {last_error}")
else:
self.logger.info(f"{name}: Device restart initiated successfully")
except requests.exceptions.Timeout:
# Restart may time out due to reboot; log and proceed to verification
last_error = "Timeout"
self.logger.info(f"{name}: Restart timed out (device rebooting); proceeding to verification")
except requests.exceptions.RequestException as e:
last_error = str(e)
self.logger.warning(f"{name}: Error restarting device: {last_error}")
# Post-update verification: poll the device for the new template
verified = False
for vtry in range(1, 4):
try:
# Wait a bit to let device come back
time.sleep(2 if vtry == 1 else 3)
vt_resp = requests.get(f"http://{ip}/cm?cmnd=Template", timeout=10)
vt_data = vt_resp.json()
# Extract template similarly to initial parse
new_template = ""
if "Template" in vt_data:
new_template = vt_data.get("Template", "")
elif isinstance(vt_data, dict) and len(vt_data) > 0:
first_key = next(iter(vt_data))
if isinstance(vt_data[first_key], str) and "{" in vt_data[first_key]:
new_template = vt_data[first_key]
elif all(key in vt_data for key in ['NAME', 'GPIO', 'FLAG', 'BASE']):
new_template = json.dumps(vt_data)
if new_template == template_value:
self.logger.info(f"{name}: Template verification succeeded on attempt {vtry}")
template_updated = True
verified = True
break
except Exception as ve:
self.logger.debug(f"{name}: Template verification attempt {vtry} failed: {ve}")
if verified:
break
else:
last_error = last_error or "Verification failed"
self.logger.warning(f"{name}: Template verification failed (attempt {attempt}/{max_attempts})")
except requests.exceptions.Timeout:
last_error = "Timeout"
self.logger.warning(f"{name}: Timeout updating template (attempt {attempt}/{max_attempts})")
except requests.exceptions.RequestException as e:
last_error = str(e)
self.logger.warning(f"{name}: Error updating template: {last_error} (attempt {attempt}/{max_attempts})")
if attempt < max_attempts:
time.sleep(1)
if not template_updated:
# Track the failure for later reporting
if not hasattr(self, 'command_failures'):
self.command_failures = []
self.command_failures.append({
"device": name,
"ip": ip,
"command": "Template <config_other>",
"error": last_error
})
else:
self.logger.debug(f"{name}: Device name '{device_name}' matches key in config_other and template matches value")
else:
# No key matches device name, check if any value matches the template
matching_key = None
for key, value in config_other.items():
if value == current_template:
matching_key = key
break
if matching_key:
# Value matches template, write key to device name
self.logger.info(f"{name}: Template matches value for key '{matching_key}' in config_other")
self.logger.info(f"{name}: Setting device name to: {matching_key}")
max_attempts = 3
last_error = None
for attempt in range(1, max_attempts + 1):
try:
url = f"http://{ip}/cm?cmnd=DeviceName%20{matching_key}"
self.logger.debug(f"{name}: Setting device name (attempt {attempt}/{max_attempts})")
response = requests.get(url, timeout=10)
if response.status_code != 200:
last_error = f"HTTP {response.status_code}"
self.logger.warning(f"{name}: Failed to update device name: {last_error}")
if attempt < max_attempts:
time.sleep(1)
continue
self.logger.info(f"{name}: Device name command accepted")
# Activate the template by setting module to 0 (Template module)
module_url = f"http://{ip}/cm?cmnd=Module%200"
try:
module_response = requests.get(module_url, timeout=10)
if module_response.status_code != 200:
last_error = f"HTTP {module_response.status_code}"
self.logger.warning(f"{name}: Failed to set module to 0: {last_error}")
if attempt < max_attempts:
time.sleep(1)
continue
self.logger.info(f"{name}: Module set to 0 successfully")
except requests.exceptions.RequestException as e:
last_error = str(e)
self.logger.warning(f"{name}: Error setting module to 0: {last_error}")
if attempt < max_attempts:
time.sleep(1)
continue
# Restart the device to apply the template
restart_url = f"http://{ip}/cm?cmnd=Restart%201"
try:
restart_response = requests.get(restart_url, timeout=10)
if restart_response.status_code != 200:
last_error = f"HTTP {restart_response.status_code}"
self.logger.warning(f"{name}: Failed to restart device: {last_error}")
else:
self.logger.info(f"{name}: Device restart initiated successfully")
except requests.exceptions.Timeout:
last_error = "Timeout"
self.logger.info(f"{name}: Restart timed out (device rebooting); proceeding to verification")
except requests.exceptions.RequestException as e:
last_error = str(e)
self.logger.warning(f"{name}: Error restarting device: {last_error}")
# Post-update verification: poll Status 0 for the new device name
verified = False
for vtry in range(1, 4):
try:
time.sleep(2 if vtry == 1 else 3)
v_resp = requests.get(f"http://{ip}/cm?cmnd=Status%200", timeout=10)
v_data = v_resp.json()
new_name = v_data.get("Status", {}).get("DeviceName", "")
if new_name == matching_key:
self.logger.info(f"{name}: Device name verification succeeded on attempt {vtry}")
template_updated = True
verified = True
break
except Exception as ve:
self.logger.debug(f"{name}: Device name verification attempt {vtry} failed: {ve}")
if verified:
break
else:
last_error = last_error or "Verification failed"
self.logger.warning(f"{name}: Device name verification failed (attempt {attempt}/{max_attempts})")
except requests.exceptions.Timeout:
last_error = "Timeout"
self.logger.warning(f"{name}: Timeout updating device name (attempt {attempt}/{max_attempts})")
except requests.exceptions.RequestException as e:
last_error = str(e)
self.logger.warning(f"{name}: Error updating device name: {last_error} (attempt {attempt}/{max_attempts})")
if attempt < max_attempts:
time.sleep(1)
if not template_updated:
# Track the failure for later reporting
if not hasattr(self, 'command_failures'):
self.command_failures = []
self.command_failures.append({
"device": name,
"ip": ip,
"command": f"DeviceName {matching_key}",
"error": last_error
})
else:
# No matches found, print detailed information about what's on the device
self.logger.info(f"{name}: No matches found in config_other for either Device Name or Template")
self.logger.info(f"{name}: Current Device Name on device: '{device_name}'")
self.logger.info(f"{name}: Current Template on device: '{current_template}'")
print(f"\nNo template match found for device {name} at {ip}")
print(f" Device Name on device: '{device_name}'")
print(f" Template on device: '{current_template}'")
print("Please add an appropriate entry to config_other in your configuration file.")
return template_updated
except requests.exceptions.RequestException as e:
self.logger.error(f"Error checking/updating template for device at {ip}: {str(e)}")
return False
def configure_mqtt_settings(self, ip, name, mqtt_status=None, is_new_device=False, set_friendly_name=False, enable_mqtt=False, with_retry=False, reboot=False):
"""Configure MQTT settings for a device.
Args:
ip: The IP address of the device
name: The name/hostname of the device
mqtt_status: The current MQTT status of the device (from Status 6)
is_new_device: Whether this is a new device (True) or existing device (False)
set_friendly_name: Whether to set the friendly name
enable_mqtt: Whether to enable MQTT
with_retry: Whether to use retry logic
reboot: Whether to reboot the device after configuration
Returns:
bool: True if configuration was successful, False otherwise
"""
try:
# Set Friendly Name if requested
if set_friendly_name:
friendly_name_url = f"http://{ip}/cm?cmnd=FriendlyName1%20{name}"
response = requests.get(friendly_name_url, timeout=5)
if response.status_code == 200:
self.logger.info(f"Set Friendly Name to {name}")
else:
self.logger.error(f"Failed to set Friendly Name to {name}")
# Enable MQTT if requested
if enable_mqtt:
mqtt_url = f"http://{ip}/cm?cmnd=SetOption3%20ON" # Enable MQTT
response = requests.get(mqtt_url, timeout=5)
if response.status_code == 200:
self.logger.info(f"Enabled MQTT for {name}")
else:
self.logger.error(f"Failed to enable MQTT for {name}")
# Configure MQTT settings
mqtt_config = self.config.get('mqtt', {})
if not mqtt_config:
self.logger.error("MQTT configuration missing from config file")
return False
# Get the base hostname (everything before the dash)
hostname_base = name.split('-')[0] if '-' in name else name
# Define MQTT fields
mqtt_fields = {
"MqttHost": mqtt_config.get('Host', ''),
"MqttPort": mqtt_config.get('Port', 1883),
"MqttUser": mqtt_config.get('User', ''),
"MqttPassword": mqtt_config.get('Password', ''),
"Topic": hostname_base if mqtt_config.get('Topic') == '%hostname_base%' else mqtt_config.get('Topic', ''),
"FullTopic": mqtt_config.get('FullTopic', '%prefix%/%topic%/'),
}
# For existing devices, check if MQTT settings need to be updated
changes_needed = []
if not is_new_device and mqtt_status:
device_mqtt = mqtt_status.get('MqttHost', {})
force_password_update = False
# Check each MQTT setting
if device_mqtt.get('Host') != mqtt_fields['MqttHost']:
changes_needed.append(('MqttHost', mqtt_fields['MqttHost']))
self.logger.debug(f"{name}: MQTT Host mismatch - Device: {device_mqtt.get('Host')}, Config: {mqtt_fields['MqttHost']}")
force_password_update = True
if device_mqtt.get('Port') != mqtt_fields['MqttPort']:
changes_needed.append(('MqttPort', mqtt_fields['MqttPort']))
self.logger.debug(f"{name}: MQTT Port mismatch - Device: {device_mqtt.get('Port')}, Config: {mqtt_fields['MqttPort']}")
force_password_update = True
if device_mqtt.get('User') != mqtt_fields['MqttUser']:
changes_needed.append(('MqttUser', mqtt_fields['MqttUser']))
self.logger.debug(f"{name}: MQTT User mismatch - Device: {device_mqtt.get('User')}, Config: {mqtt_fields['MqttUser']}")
force_password_update = True
if device_mqtt.get('Topic') != mqtt_fields['Topic']:
changes_needed.append(('Topic', mqtt_fields['Topic']))
self.logger.debug(f"{name}: MQTT Topic mismatch - Device: {device_mqtt.get('Topic')}, Config: {mqtt_fields['Topic']}")
force_password_update = True
if device_mqtt.get('FullTopic') != mqtt_fields['FullTopic']:
changes_needed.append(('FullTopic', mqtt_fields['FullTopic']))
self.logger.debug(f"{name}: MQTT FullTopic mismatch - Device: {device_mqtt.get('FullTopic')}, Config: {mqtt_fields['FullTopic']}")
force_password_update = True
# Add password update if any MQTT setting changed or user was updated
if force_password_update:
changes_needed.append(('MqttPassword', mqtt_fields['MqttPassword']))
self.logger.debug(f"{name}: MQTT Password will be updated")
# Check NoRetain setting
no_retain = mqtt_config.get('NoRetain', False)
if no_retain:
changes_needed.append(('SetOption62', '1')) # 1 = No Retain
else:
changes_needed.append(('SetOption62', '0')) # 0 = Use Retain
else:
# For new devices, set all MQTT settings
for field, value in mqtt_fields.items():
changes_needed.append((field, value))
# Apply MQTT settings
mqtt_updated = False
for setting, value in changes_needed:
try:
# For FullTopic, we need to avoid adding a space (%20) or equals sign between the command and value
if setting == "FullTopic":
url = f"http://{ip}/cm?cmnd={setting}{value}"
else:
url = f"http://{ip}/cm?cmnd={setting}%20{value}"
if with_retry:
# With retry logic
success = False
attempts = 0
max_attempts = 3
last_error = None
while not success and attempts < max_attempts:
attempts += 1
try:
response = requests.get(url, timeout=5)
if response.status_code == 200:
if setting != 'MqttPassword':
self.logger.debug(f"{name}: Updated {setting} to {value}")
else:
self.logger.debug(f"{name}: Updated MQTT Password")
mqtt_updated = True
success = True
else:
self.logger.warning(f"{name}: Failed to update {setting} (attempt {attempts}/{max_attempts})")
last_error = f"HTTP {response.status_code}"
if attempts < max_attempts:
time.sleep(1) # Wait before retry
except requests.exceptions.Timeout as e:
self.logger.warning(f"{name}: Timeout updating {setting} (attempt {attempts}/{max_attempts})")
last_error = "Timeout"
if attempts < max_attempts:
time.sleep(1) # Wait before retry
except requests.exceptions.RequestException as e:
self.logger.warning(f"{name}: Error updating {setting}: {str(e)} (attempt {attempts}/{max_attempts})")
last_error = str(e)
if attempts < max_attempts:
time.sleep(1) # Wait before retry
if not success:
self.logger.error(f"{name}: Failed to update {setting} after {max_attempts} attempts. Last error: {last_error}")
# Track the failure for later reporting
if not hasattr(self, 'command_failures'):
self.command_failures = []
self.command_failures.append({
"device": name,
"ip": ip,
"command": f"{setting} {value}",
"error": last_error
})
else:
# Without retry logic
response = requests.get(url, timeout=5)
if response.status_code == 200:
if setting != 'MqttPassword':
self.logger.info(f"{name}: Set {setting} to {value}")
else:
self.logger.info(f"{name}: Set MQTT Password")
mqtt_updated = True
else:
self.logger.error(f"{name}: Failed to set {setting}")
except requests.exceptions.RequestException as e:
self.logger.error(f"{name}: Error updating {setting}: {str(e)}")
# Apply console settings
console_updated = self.apply_console_settings(ip, name, with_retry)
# Reboot the device if requested
if reboot:
save_url = f"http://{ip}/cm?cmnd=Restart%201"
response = requests.get(save_url, timeout=5)
if response.status_code == 200:
self.logger.info(f"Saved configuration and rebooted {name}")
else:
self.logger.error(f"Failed to save configuration for {name}")
return mqtt_updated or console_updated
except requests.exceptions.RequestException as e:
self.logger.error(f"Error configuring device at {ip}: {str(e)}")
return False
def apply_console_settings(self, ip, name, with_retry=False):
"""Apply console parameters from configuration to the device.
Returns True if any setting was updated, False otherwise.
"""
console_updated = False
console_params = self.config.get('console', {})
if not console_params:
return False
self.logger.info(f"{name}: Setting console parameters from configuration")
# Special handling for Retain parameters - need to send opposite state first, then final state
# This is necessary because the changes are what create the update of the Retain state at the MQTT server
retain_params = ["ButtonRetain", "SwitchRetain", "PowerRetain"]
# Process Retain parameters first
for param in retain_params:
if param in console_params:
try:
final_value = console_params[param]
# Pre-check current value; skip if already at desired state
current_val, ok = self._get_console_param_value(ip, name, param)
if ok:
desired_cmp = str(final_value).strip().lower()
current_cmp = str(current_val or "").strip().lower()
# Map 1/0 to on/off for retain-like responses
if current_cmp in ("1", "0") and desired_cmp in ("on", "off"):
current_cmp = "on" if current_cmp == "1" else "off"
if current_cmp == desired_cmp:
self.logger.debug(f"{name}: {param} already {final_value}, skipping retain toggle")
continue
# Set opposite state first
opposite_value = "On" if final_value.lower() == "off" else "Off"
if with_retry:
# First command (opposite state) - with retry logic
url = f"http://{ip}/cm?cmnd={param}%20{opposite_value}"
success = False
attempts = 0
max_attempts = 3
last_error = None
while not success and attempts < max_attempts:
attempts += 1
try:
response = requests.get(url, timeout=5)
if response.status_code == 200:
self.logger.debug(f"{name}: Set {param} to {opposite_value} (step 1 of 2 to update MQTT broker retain settings)")
console_updated = True
success = True
else:
self.logger.warning(f"{name}: Failed to set {param} to {opposite_value} (attempt {attempts}/{max_attempts})")
last_error = f"HTTP {response.status_code}"
if attempts < max_attempts:
time.sleep(1) # Wait before retry
except requests.exceptions.Timeout as e:
self.logger.warning(f"{name}: Timeout setting {param} to {opposite_value} (attempt {attempts}/{max_attempts})")
last_error = "Timeout"
if attempts < max_attempts:
time.sleep(1) # Wait before retry
except requests.exceptions.RequestException as e:
self.logger.warning(f"{name}: Error setting {param} to {opposite_value}: {str(e)} (attempt {attempts}/{max_attempts})")
last_error = str(e)
if attempts < max_attempts:
time.sleep(1) # Wait before retry
if not success:
self.logger.error(f"{name}: Failed to set {param} to {opposite_value} after {max_attempts} attempts. Last error: {last_error}")
# Track the failure for later reporting
if not hasattr(self, 'command_failures'):
self.command_failures = []
self.command_failures.append({
"device": name,
"ip": ip,
"command": f"{param} {opposite_value}",
"error": last_error
})
else:
# First command (opposite state) - without retry logic
url = f"http://{ip}/cm?cmnd={param}%20{opposite_value}"
response = requests.get(url, timeout=5)
if response.status_code == 200:
self.logger.debug(f"{name}: Set {param} to {opposite_value} (step 1 of 2 to update MQTT broker retain settings)")
else:
self.logger.error(f"{name}: Failed to set {param} to {opposite_value}")
# Small delay to ensure commands are processed in order
time.sleep(0.5)
if with_retry:
# Second command (final state) - with retry logic
url = f"http://{ip}/cm?cmnd={param}%20{final_value}"
success = False
attempts = 0
max_attempts = 3
last_error = None
while not success and attempts < max_attempts:
attempts += 1
try:
response = requests.get(url, timeout=5)
if response.status_code == 200:
self.logger.debug(f"{name}: Set {param} to {final_value} (step 2 of 2 to update MQTT broker retain settings)")
# Verify the change took effect
verified = self._verify_console_param_value(ip, name, param, final_value)
if verified:
console_updated = True
success = True
else:
self.logger.warning(f"{name}: Verification failed for {param} after update; retrying (attempt {attempts}/{max_attempts})")
if attempts < max_attempts:
time.sleep(1)
else:
self.logger.warning(f"{name}: Failed to set {param} to {final_value} (attempt {attempts}/{max_attempts})")
last_error = f"HTTP {response.status_code}"
if attempts < max_attempts:
time.sleep(1) # Wait before retry
except requests.exceptions.Timeout as e:
self.logger.warning(f"{name}: Timeout setting {param} to {final_value} (attempt {attempts}/{max_attempts})")
last_error = "Timeout"
if attempts < max_attempts:
time.sleep(1) # Wait before retry
except requests.exceptions.RequestException as e:
self.logger.warning(f"{name}: Error setting {param} to {final_value}: {str(e)} (attempt {attempts}/{max_attempts})")
last_error = str(e)
if attempts < max_attempts:
time.sleep(1) # Wait before retry
if not success:
self.logger.error(f"{name}: Failed to set {param} to {final_value} after {max_attempts} attempts. Last error: {last_error}")
# Track the failure for later reporting
if not hasattr(self, 'command_failures'):
self.command_failures = []
self.command_failures.append({
"device": name,
"ip": ip,
"command": f"{param} {final_value}",
"error": last_error
})
else:
# Second command (final state) - without retry logic
url = f"http://{ip}/cm?cmnd={param}%20{final_value}"
response = requests.get(url, timeout=5)
if response.status_code == 200:
self.logger.debug(f"{name}: Set {param} to {final_value} (step 2 of 2 to update MQTT broker retain settings)")
else:
self.logger.error(f"{name}: Failed to set {param} to {final_value}")
except Exception as e:
self.logger.error(f"{name}: Unexpected error setting {param} commands: {str(e)}")
# Track the failure for later reporting if using retry logic
if with_retry:
if not hasattr(self, 'command_failures'):
self.command_failures = []
self.command_failures.append({
"device": name,
"ip": ip,
"command": f"{param} (both steps)",
"error": str(e)
})
# Process all other console parameters
# Track rules that need to be enabled
rules_to_enable = {}
for param, value in console_params.items():
# Skip Retain parameters as they're handled specially above
if param in retain_params:
continue
# Check if this is a rule definition (lowercase rule1, rule2, etc.)
if param.lower().startswith('rule') and param.lower() == param and param[-1].isdigit():
# Store the rule number for later enabling
rule_num = param[-1]
rules_to_enable[rule_num] = True
if with_retry:
self.logger.info(f"{name}: Detected rule definition {param}='{value}', will auto-enable")
else:
self.logger.debug(f"{name}: Detected rule definition {param}, will auto-enable")
# Skip Rule1, Rule2, etc. if we're auto-enabling rules and using retry logic
if with_retry and param.lower().startswith('rule') and param.lower() != param and param[-1].isdigit():
# If this is in the config, we'll respect it, but log that it's not needed
self.logger.debug(f"{name}: Note: {param} is not needed with auto-enable feature")
# Determine if update is needed before sending
should_send = True
try:
current_val, ok = self._get_console_param_value(ip, name, param)
if ok:
if param.lower().startswith('rule') and param.lower() == param and param[-1].isdigit():
# Compare rule definitions (whitespace-insensitive, case-insensitive)
import re as _re
def _norm_rule(s):
s = str(s or "")
s = s.strip().lower()
s = _re.sub(r"\s+", " ", s)
return s
if _norm_rule(current_val) == _norm_rule(value):
self.logger.debug(f"{name}: {param} rule already matches definition, skipping")
should_send = False
else:
# Generic comparison
desired_cmp = str(value).strip().lower()
current_cmp = str(current_val or "").strip().lower()
# Normalize common ON/OFF vs 1/0 forms
if current_cmp in ("on", "off") and desired_cmp in ("1", "0"):
current_cmp = "1" if current_cmp == "on" else "0"
if current_cmp in ("1", "0") and desired_cmp in ("on", "off"):
current_cmp = "on" if current_cmp == "1" else "off"
if current_cmp == desired_cmp:
self.logger.debug(f"{name}: {param} already set to {value}, skipping")
should_send = False
except Exception:
# If pre-check fails, fall back to sending command
pass
if not should_send:
continue
# Regular console parameter
# Special handling for rule parameters to properly encode the URL
if param.lower().startswith('rule') and param.lower() == param and param[-1].isdigit():
# For rule commands, we need to URL encode the entire value to preserve special characters
import urllib.parse
encoded_value = urllib.parse.quote(value)
url = f"http://{ip}/cm?cmnd={param}%20{encoded_value}"
self.logger.info(f"{name}: Sending rule command: {url}")
else:
url = f"http://{ip}/cm?cmnd={param}%20{value}"
if with_retry:
# With retry logic
success = False
attempts = 0
max_attempts = 3
last_error = None
while not success and attempts < max_attempts:
attempts += 1
try:
response = requests.get(url, timeout=5)
if response.status_code == 200:
# Special logging for rule parameters
if param.lower().startswith('rule') and param.lower() == param and param[-1].isdigit():
self.logger.info(f"{name}: Rule command response: {response.text}")
self.logger.info(f"{name}: Set rule {param} to '{value}'")
else:
self.logger.debug(f"{name}: Set console parameter {param} to {value}")
# Verify the change took effect before marking success
if self._verify_console_param_value(ip, name, param, value):
console_updated = True
success = True
else:
self.logger.warning(f"{name}: Verification failed for {param} after update; retrying (attempt {attempts}/{max_attempts})")
if attempts < max_attempts:
time.sleep(1)
else:
self.logger.warning(f"{name}: Failed to set console parameter {param} (attempt {attempts}/{max_attempts})")
last_error = f"HTTP {response.status_code}"
if attempts < max_attempts:
time.sleep(1) # Wait before retry
except requests.exceptions.Timeout as e:
self.logger.warning(f"{name}: Timeout setting console parameter {param} (attempt {attempts}/{max_attempts})")
last_error = "Timeout"
if attempts < max_attempts:
time.sleep(1) # Wait before retry
except requests.exceptions.RequestException as e:
self.logger.warning(f"{name}: Error setting console parameter {param}: {str(e)} (attempt {attempts}/{max_attempts})")
last_error = str(e)
if attempts < max_attempts:
time.sleep(1) # Wait before retry
if not success:
self.logger.error(f"{name}: Failed to set console parameter {param} after {max_attempts} attempts. Last error: {last_error}")
# Track the failure for later reporting
if not hasattr(self, 'command_failures'):
self.command_failures = []
self.command_failures.append({
"device": name,
"ip": ip,
"command": f"{param} {value}",
"error": last_error
})
else:
# Without retry logic
response = requests.get(url, timeout=5)
if response.status_code == 200:
if param.lower().startswith('rule') and param.lower() == param and param[-1].isdigit():
self.logger.info(f"{name}: Rule command response: {response.text}")
self.logger.info(f"{name}: Set rule {param} to '{value}'")
else:
self.logger.debug(f"{name}: Set console parameter {param} to {value}")
else:
self.logger.error(f"{name}: Failed to set console parameter {param}")
# Auto-enable any rules that were defined
if with_retry:
self.logger.info(f"{name}: Rules to enable: {rules_to_enable}")
for rule_num in rules_to_enable:
rule_enable_param = f"Rule{rule_num}"
# Skip if the rule enable command was already in the config
if with_retry:
# Check if the uppercase version (Rule1) is in the config
if rule_enable_param in console_params:
self.logger.info(f"{name}: Skipping {rule_enable_param} as it's already in config (uppercase version)")
continue
# If we're here, it means we found a rule definition earlier and added it to rules_to_enable
# No need to check again if it's in console_params
self.logger.info(f"{name}: Will enable {rule_enable_param} for rule definition found in config")
else:
# Simple check for any version of the rule enable command
if any(p.lower() == rule_enable_param.lower() for p in console_params):
continue
# Rule auto-enabling
# Pre-check if rule already enabled
try:
current_val, ok = self._get_console_param_value(ip, name, rule_enable_param)
if ok:
state = str(current_val or "").strip().lower()
if state in ("on", "1", "enabled", "active"):
self.logger.debug(f"{name}: {rule_enable_param} already enabled, skipping")
continue
except Exception:
pass
url = f"http://{ip}/cm?cmnd={rule_enable_param}%201"
if with_retry:
# With retry logic
success = False
attempts = 0
max_attempts = 3
last_error = None
while not success and attempts < max_attempts:
attempts += 1
try:
response = requests.get(url, timeout=5)
if response.status_code == 200:
self.logger.info(f"{name}: Auto-enabled {rule_enable_param}")
if self._verify_console_param_value(ip, name, rule_enable_param, "1"):
console_updated = True
success = True
else:
self.logger.warning(f"{name}: Verification failed for {rule_enable_param} after update; retrying (attempt {attempts}/{max_attempts})")
if attempts < max_attempts:
time.sleep(1)
else:
self.logger.warning(f"{name}: Failed to auto-enable {rule_enable_param} (attempt {attempts}/{max_attempts})")
last_error = f"HTTP {response.status_code}"
if attempts < max_attempts:
time.sleep(1) # Wait before retry
except requests.exceptions.Timeout as e:
self.logger.warning(f"{name}: Timeout auto-enabling {rule_enable_param} (attempt {attempts}/{max_attempts})")
last_error = "Timeout"
if attempts < max_attempts:
time.sleep(1) # Wait before retry
except requests.exceptions.RequestException as e:
self.logger.warning(f"{name}: Error auto-enabling {rule_enable_param}: {str(e)} (attempt {attempts}/{max_attempts})")
last_error = str(e)
if attempts < max_attempts:
time.sleep(1) # Wait before retry
if not success:
self.logger.error(f"{name}: Failed to auto-enable {rule_enable_param} after {max_attempts} attempts. Last error: {last_error}")
# Track the failure for later reporting
if not hasattr(self, 'command_failures'):
self.command_failures = []
self.command_failures.append({
"device": name,
"ip": ip,
"command": f"{rule_enable_param} 1",
"error": last_error
})
else:
# Without retry logic
response = requests.get(url, timeout=5)
if response.status_code == 200:
self.logger.info(f"{name}: Auto-enabled {rule_enable_param}")
else:
self.logger.error(f"{name}: Failed to auto-enable {rule_enable_param}")
return console_updated
def _verify_console_param_value(self, ip, name, param, expected):
"""Post-update verification; returns True if device value equals expected.
Normalizes values:
- ruleN (lowercase) compare rules text (case/whitespace-insensitive)
- RuleN (uppercase) consider ON/1/enabled as True when expected is truthy ('1','on')
- Generic: normalize 'on'/'off' vs '1'/'0'
"""
try:
cur, ok = self._get_console_param_value(ip, name, param)
if not ok:
return False
# Rule definition verify
if param.lower().startswith('rule') and param.islower() and param[-1].isdigit():
import re as _re
def _norm(s):
s = str(s or '')
s = s.strip().lower()
s = _re.sub(r"\s+", " ", s)
return s
return _norm(cur) == _norm(expected)
# Rule enable verify (RuleN)
if param.lower().startswith('rule') and not param.islower() and param[-1].isdigit():
val = str(cur or '').strip().lower()
if val in ('on','enabled','active'):
val = '1'
if val in ('off','disabled','inactive'):
val = '0'
exp = str(expected or '').strip().lower()
if exp in ('on','enabled','active'):
exp = '1'
if exp in ('off','disabled','inactive'):
exp = '0'
return val == exp or val == '1'
# Generic verify
val = str(cur or '').strip().lower()
exp = str(expected or '').strip().lower()
if val in ('on','off') and exp in ('1','0'):
val = '1' if val=='on' else '0'
if val in ('1','0') and exp in ('on','off'):
val = 'on' if val=='1' else 'off'
return val == exp
except Exception:
return False
def _get_console_param_value(self, ip, name, param):
"""Query the device for the current value of a console parameter.
Returns (value, True) on success, (None, False) on failure.
Special handling:
- Lowercase ruleN: returns the current rule definition text using RuleN 5
- Uppercase RuleN: returns the current enable state (ON/OFF or 1/0)
"""
try:
# Rules handling
if param.lower().startswith('rule') and param[-1].isdigit():
rule_num = param[-1]
# If lowercase 'ruleN', fetch rule definition using 'RuleN 5'
if param.islower():
url = f"http://{ip}/cm?cmnd=Rule{rule_num}%205"
response = requests.get(url, timeout=5)
# Try to parse JSON first
try:
data = response.json()
# Common keys: 'Rules' may contain the definition
if isinstance(data, dict):
for k, v in data.items():
if str(k).lower() == 'rules':
return v, True
# Fallback: first string value
for v in data.values():
if isinstance(v, str):
return v, True
return str(data), True
except Exception:
# Fallback to text parsing
text = response.text or ''
return text, True
else:
# Uppercase 'RuleN' - fetch enable state
url = f"http://{ip}/cm?cmnd=Rule{rule_num}"
response = requests.get(url, timeout=5)
try:
data = response.json()
# Expect something like {"Rule1":"ON"}
if isinstance(data, dict):
# Try exact key first
key = f"Rule{rule_num}"
if key in data:
return data[key], True
# Fallback: any ON/OFF value
for v in data.values():
if isinstance(v, (str, int)):
return v, True
return str(data), True
except Exception:
text = response.text or ''
return text, True
# Generic parameter query - send the command name without a value
url = f"http://{ip}/cm?cmnd={param}"
response = requests.get(url, timeout=5)
try:
data = response.json()
if isinstance(data, dict) and data:
# Prefer an exact key match (case-insensitive)
param_lower = param.lower()
for k, v in data.items():
if str(k).lower() == param_lower:
return v, True
# Fallback: first value
first_val = next(iter(data.values()))
return first_val, True
# If not a dict, return as string
return str(data), True
except Exception:
# Fallback to raw text
return response.text, True
except requests.exceptions.RequestException as e:
self.logger.debug(f"{name}: Failed to query current value for {param}: {e}")
return None, False
def apply_config_other(self, ip, name):
"""Wrapper for applying config_other (template) settings."""
return self.check_and_update_template(ip, name)
def configure_unknown_device(self, ip, hostname):
"""Configure an unknown device with the given hostname and MQTT settings."""
return self.configure_mqtt_settings(
ip=ip,
name=hostname,
is_new_device=True,
set_friendly_name=True,
enable_mqtt=True,
with_retry=False,
reboot=True
)
def is_ip_in_network_filter(self, ip_address):
"""Check if an IP address is in any of the configured network filters.
Args:
ip_address: The IP address to check
Returns:
tuple: (is_in_network, target_network, network_name) where:
- is_in_network is a boolean indicating if the IP is in a network
- target_network is the network configuration dict or None
- network_name is the name of the network or None
"""
network_filters = self.config['unifi'].get('network_filter', {})
for network_name, network in network_filters.items():
if ip_address.startswith(network['subnet']):
self.logger.info(f"IP {ip_address} is in network: {network_name}")
return True, network, network_name
self.logger.error(f"IP {ip_address} is not in any configured network")
return False, None, None
def process_single_device(self, device_identifier):
"""Process a single device by hostname or IP address.
Args:
device_identifier: Either a hostname or IP address
Returns:
bool: True if device was processed successfully, False otherwise
"""
self.logger.info(f"Processing single device: {device_identifier}")
# Check if device_identifier is an IP address or hostname
is_ip = bool(re.match(r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$", device_identifier))
# If it's an IP address, check if it's in the network_filter first
if is_ip:
in_network, target_network, network_name = self.is_ip_in_network_filter(device_identifier)
if not in_network:
return False
# Setup Unifi client if not already done
if not self.unifi_client:
try:
self.setup_unifi_client()
except ConnectionError as e:
self.logger.error(f"Failed to connect to UniFi controller: {str(e)}")
return False
# Get all clients from Unifi
try:
all_clients = self.unifi_client.get_clients()
self.logger.debug(f"Found {len(all_clients)} total devices")
except Exception as e:
self.logger.error(f"Error getting devices from UniFi controller: {e}")
return False
# Find the device in Unifi
target_device = None
if is_ip:
# Search by IP
self.logger.debug(f"Searching for device with IP: {device_identifier}")
target_device = next((device for device in all_clients if device.get('ip') == device_identifier), None)
if not target_device:
self.logger.error(f"No device found with IP: {device_identifier}")
return False
else:
# Search by hostname - support partial and wildcard matches
self.logger.debug(f"Searching for device with hostname: {device_identifier}")
# Check if the identifier contains wildcards
has_wildcards = '*' in device_identifier
# Convert wildcards to regex pattern if present
if has_wildcards:
pattern = device_identifier.lower().replace('.', r'\.').replace('*', '.*')
self.logger.debug(f"Using wildcard pattern: {pattern}")
else:
# For partial matches, we'll use the identifier as a substring
pattern = device_identifier.lower()
self.logger.debug(f"Using partial match pattern: {pattern}")
# Find all matching devices
matching_devices = []
for device in all_clients:
hostname = device.get('hostname', '').lower()
name = device.get('name', '').lower()
if has_wildcards:
# For wildcard matches, use regex
if (re.search(f"^{pattern}$", hostname) or re.search(f"^{pattern}$", name)):
matching_devices.append(device)
else:
# For partial matches, check if pattern is a substring
if pattern in hostname or pattern in name:
matching_devices.append(device)
# Handle the results
if not matching_devices:
self.logger.error(f"No devices found matching: {device_identifier}")
return False
elif len(matching_devices) > 1:
# Multiple matches found - log them and use the first one
self.logger.warning(f"Multiple devices found matching '{device_identifier}':")
for i, device in enumerate(matching_devices, 1):
device_name = device.get('name', device.get('hostname', 'Unknown'))
device_ip = device.get('ip', '')
self.logger.warning(f" {i}. {device_name} (IP: {device_ip})")
self.logger.warning(f"Using the first match: {matching_devices[0].get('name', matching_devices[0].get('hostname', 'Unknown'))}")
# Use the first (or only) matching device
target_device = matching_devices[0]
# Get device details
device_name = target_device.get('name', target_device.get('hostname', 'Unknown'))
device_hostname = target_device.get('hostname', '')
device_ip = target_device.get('ip', '')
device_mac = target_device.get('mac', '')
self.logger.info(f"Found device: {device_name} (IP: {device_ip}, Hostname: {device_hostname})")
# If we're processing a hostname (not an IP), check if the device's IP is in the network_filter
if not is_ip:
in_network, target_network, network_name = self.is_ip_in_network_filter(device_ip)
if not in_network:
self.logger.error(f"Device {device_name} is not in any configured network")
return False
# For IP addresses, we already have the target_network from the earlier check
# Check if device is excluded
exclude_patterns = target_network.get('exclude_patterns', [])
if self.is_device_excluded(device_name, device_hostname, exclude_patterns, log_level='error'):
return False
# Check if device is in unknown_device_patterns
unknown_patterns = target_network.get('unknown_device_patterns', [])
is_unknown = False
device_reported_hostname = None
# Initialize variables for hostname bug detection
unifi_name_matches_unknown = False
device_hostname_matches_unknown = False
for pattern in unknown_patterns:
pattern_lower = pattern.lower()
pattern_regex = pattern_lower.replace('.', r'\.').replace('*', '.*')
# Check if pattern already starts with ^
if pattern_regex.startswith('^'):
regex_pattern = pattern_regex
else:
regex_pattern = f"^{pattern_regex}"
if (re.match(regex_pattern, device_name.lower()) or
re.match(regex_pattern, device_hostname.lower())):
unifi_name_matches_unknown = True
self.logger.info(f"Device {device_name} matches unknown device pattern: {pattern}")
break
# If the name matches unknown patterns, check the device's self-reported hostname
# before declaring it unknown
#
# This addresses a UniFi OS bug where it doesn't keep track of updated hostnames.
# When a hostname is updated and the connection reset, UniFi may not pick up the new name.
# By checking the device's self-reported hostname, we can determine if the device
# actually has a real hostname that UniFi is not showing correctly.
if unifi_name_matches_unknown:
self.logger.info(f"Checking device's self-reported hostname before declaring unknown")
# Get the device's self-reported hostname using the common function
device_reported_hostname, success = self.get_device_hostname(device_ip, device_name, timeout=5, log_level='info')
if success and device_reported_hostname:
# Check if the self-reported hostname also matches unknown patterns
device_hostname_matches_unknown = False
# Use the base of the self-reported hostname up to the first '-' for bug detection
device_hostname_base = device_reported_hostname.split('-')[0].lower()
for pattern in unknown_patterns:
if self._match_pattern(device_hostname_base, pattern, match_entire_string=False):
device_hostname_matches_unknown = True
self.logger.info(f"Device's self-reported hostname base '{device_hostname_base}' (from '{device_reported_hostname}') matches unknown pattern: {pattern}")
break
# Only declare as unknown if both UniFi-reported and self-reported hostnames match unknown patterns
if device_hostname_matches_unknown:
is_unknown = True
self.logger.info("Device declared as unknown: both UniFi-reported and self-reported hostnames match unknown patterns")
else:
is_unknown = False
self.logger.info("Device NOT declared as unknown: self-reported hostname doesn't match unknown patterns (possible UniFi OS bug)")
else:
# No self-reported hostname found or error occurred, fall back to UniFi-reported name
is_unknown = unifi_name_matches_unknown
self.logger.info("Failed to get device's self-reported hostname, using UniFi-reported name")
else:
# Not in Device mode or name doesn't match unknown patterns, use the standard check
is_unknown = unifi_name_matches_unknown
# Determine connection type based on available fields
connection = "Unknown"
if target_device.get('essid'):
connection = f"Wireless - {target_device.get('essid')}"
elif target_device.get('radio') or target_device.get('wifi'):
connection = "Wireless"
elif target_device.get('port') or target_device.get('switch_port') or target_device.get('switch'):
connection = "Wired"
# Print the IP and Connection after verifying the device in UniFi
print(f"Verified in UniFi: IP: {device_ip} | Connection: {connection}")
# Create a device info dictionary
# Add unifi_hostname_bug_detected flag to indicate when the UniFi OS hostname bug is detected
# (when UniFi name matches unknown patterns but device's self-reported name doesn't)
unifi_hostname_bug_detected = (unifi_name_matches_unknown and not is_unknown and
not device_hostname_matches_unknown)
# If bug detected and we have the device-reported hostname, prefer it going forward
if unifi_hostname_bug_detected and device_reported_hostname:
self.logger.info(f"Using device self-reported hostname '{device_reported_hostname}' instead of UniFi '{device_name}' for IP {device_ip}")
device_name = device_reported_hostname
device_hostname = device_reported_hostname
device_info = {
"name": device_name,
"ip": device_ip,
"mac": device_mac,
"last_seen": target_device.get('last_seen', ''),
"hostname": device_hostname,
"notes": target_device.get('note', ''),
"connection": connection,
"unifi_hostname_bug_detected": unifi_hostname_bug_detected
}
# Process the device based on whether it's unknown or not
if is_unknown:
self.logger.info(f"Processing unknown device: {device_name}")
# Check if device has a toggle button
try:
# Get the main page to check for toggle button
url = f"http://{device_ip}/"
response = requests.get(url, timeout=5)
# Check if there's a toggle button in the response
has_toggle = "toggle" in response.text.lower()
if has_toggle:
self.logger.info(f"Device {device_name} has a toggle button, toggling at 1/2Hz rate")
# Start toggling at 1/2Hz
toggle_state = False
# Temporarily disable all logging during toggling
logging.disable(logging.CRITICAL)
try:
# Clear console output and show prompt
print("\n" + "="*50)
print(f"DEVICE: {device_name} at IP: {device_ip} Connection: {connection}")
print(f"Current hostname: {device_hostname}")
print("="*50)
print("The device is now toggling to help you identify it.")
# Start toggling in background while waiting for input
import threading
stop_toggle = threading.Event()
def toggle_device():
toggle_state = False
while not stop_toggle.is_set():
toggle_state = not toggle_state
toggle_cmd = "Power On" if toggle_state else "Power Off"
toggle_url = f"http://{device_ip}/cm?cmnd={toggle_cmd}"
try:
requests.get(toggle_url, timeout=2)
except:
pass
time.sleep(2.0) # 1/2Hz rate
# Start toggle thread
toggle_thread = threading.Thread(target=toggle_device)
toggle_thread.daemon = True
toggle_thread.start()
# Prompt for new hostname
print("\nPlease enter a new name for this device:")
new_hostname = input("> ").strip()
# Stop toggling
stop_toggle.set()
toggle_thread.join(timeout=3)
if new_hostname and new_hostname != device_hostname:
print(f"Setting new hostname to: {new_hostname}")
# Re-enable logging
logging.disable(logging.NOTSET)
return self.configure_unknown_device(device_ip, new_hostname)
else:
print("No valid hostname entered, skipping device")
# Re-enable logging
logging.disable(logging.NOTSET)
return False
finally:
# Re-enable logging
logging.disable(logging.NOTSET)
else:
self.logger.info(f"Device {device_name} does not have a toggle button")
return self.configure_unknown_device(device_ip, device_hostname)
except requests.exceptions.RequestException as e:
self.logger.error(f"Error connecting to {device_name} at {device_ip}: {str(e)}")
return False
else:
self.logger.info(f"Processing normal device: {device_name}")
# Create a temporary list with just this device
temp_devices = [device_info]
# Save to current.json temporarily
current_config = {"tasmota": {"devices": temp_devices}}
with open('current.json', 'w') as f:
json.dump(current_config, f, indent=2)
# Process the device - skip unknown device filtering in Device mode
self.get_device_details(use_current_json=True, skip_unknown_filter=True)
return True
def get_device_details(self, use_current_json=True, skip_unknown_filter=False):
"""Connect to each Tasmota device via HTTP, gather details and validate MQTT settings.
Filters out devices matching unknown_device_patterns unless skip_unknown_filter is True.
Implements retry logic for console commands with up to 3 attempts and tracks failures.
Args:
use_current_json: Whether to use current.json instead of tasmota.json
skip_unknown_filter: If True, don't filter out unknown devices (used by --Device mode)
"""
self.logger.info("Starting to gather detailed device information...")
device_details = []
try:
source_file = 'current.json' if use_current_json else 'tasmota.json'
with open(source_file, 'r') as f:
data = json.load(f)
all_devices = data.get('tasmota', {}).get('devices', [])
self.logger.debug(f"Loaded {len(all_devices)} devices from {source_file}")
except FileNotFoundError:
self.logger.error(f"{source_file} not found. Run discovery first.")
return
except json.JSONDecodeError:
self.logger.error(f"Invalid JSON format in {source_file}")
return
# Determine which devices to process
if skip_unknown_filter:
# When using --Device parameter, don't filter out unknown devices
devices = all_devices
self.logger.debug("Skipping unknown device filtering (Device mode)")
else:
# Normal mode: Filter out devices matching unknown_device_patterns
devices = []
network_filters = self.config['unifi'].get('network_filter', {})
unknown_patterns = []
for network in network_filters.values():
unknown_patterns.extend(network.get('unknown_device_patterns', []))
for device in all_devices:
name = device.get('name', '').lower()
hostname = device.get('hostname', '').lower()
is_unknown = False
for pattern in unknown_patterns:
pattern = pattern.lower()
pattern = pattern.replace('.', r'\.').replace('*', '.*')
# Check if pattern already starts with ^
if pattern.startswith('^'):
regex_pattern = pattern
else:
regex_pattern = f"^{pattern}"
if re.match(regex_pattern, name) or re.match(regex_pattern, hostname):
self.logger.debug(f"Skipping unknown device: {name} ({hostname})")
is_unknown = True
break
if not is_unknown:
devices.append(device)
self.logger.debug(f"Processing {len(devices)} devices after filtering unknown devices")
mqtt_config = self.config.get('mqtt', {})
if not mqtt_config:
self.logger.error("MQTT configuration missing from config file")
return
def check_mqtt_settings(ip, name, mqtt_status):
"""Check and update MQTT settings if they don't match config"""
# Use the unified MQTT configuration method
return self.configure_mqtt_settings(
ip=ip,
name=name,
mqtt_status=mqtt_status,
is_new_device=False,
set_friendly_name=False,
enable_mqtt=False,
with_retry=True,
reboot=False
)
for device in devices:
if not isinstance(device, dict):
self.logger.warning(f"Skipping invalid device entry: {device}")
continue
name = device.get('name', 'Unknown')
ip = device.get('ip')
mac = device.get('mac')
if not ip:
self.logger.warning(f"Skipping device {name} - no IP address")
continue
self.logger.info(f"Checking device: {name} at {ip}")
try:
# Get Status 2 for firmware version
url_status = f"http://{ip}/cm?cmnd=Status%202"
response = requests.get(url_status, timeout=5)
status_data = response.json()
# Get Status 5 for network info using the common function
hostname, hostname_success = self.get_device_hostname(ip, name, timeout=5, log_level='info')
# Create a network_data structure for backward compatibility
network_data = {"StatusNET": {"Hostname": hostname if hostname_success else "Unknown"}}
# Get Status 6 for MQTT info
url_mqtt = f"http://{ip}/cm?cmnd=Status%206"
response = requests.get(url_mqtt, timeout=5)
mqtt_data = response.json()
# Decide the effective name to use for operations/logs
op_name = name
if device.get('unifi_hostname_bug_detected') and hostname_success and hostname:
op_name = hostname
self.logger.info(f"Using device self-reported hostname '{op_name}' for operations instead of UniFi name '{name}' (IP: {ip})")
# Check and update MQTT settings if needed
mqtt_updated = check_mqtt_settings(ip, op_name, mqtt_data)
# Check and update template (config_other) if needed
template_updated = self.apply_config_other(ip, op_name)
# Console settings are now applied in configure_mqtt_settings
console_updated = mqtt_updated
device_detail = {
"name": name,
"ip": ip,
"mac": mac,
"version": status_data.get("StatusFWR", {}).get("Version", "Unknown"),
"hostname": network_data.get("StatusNET", {}).get("Hostname", "Unknown"),
"mqtt_status": "Updated" if mqtt_updated else "Verified",
"console_status": "Updated" if console_updated else "Verified",
"template_status": "Updated" if template_updated else "Verified",
"last_checked": time.strftime("%Y-%m-%d %H:%M:%S"),
"status": "online"
}
self.logger.info(f"Successfully got version for {name}: {device_detail['version']}")
except requests.exceptions.RequestException as e:
self.logger.error(f"Error connecting to {name} at {ip}: {str(e)}")
device_detail = {
"name": name,
"ip": ip,
"mac": mac,
"version": "Unknown",
"status": "offline",
"error": str(e)
}
device_details.append(device_detail)
time.sleep(0.5)
# Save all device details at once
try:
with open('TasmotaDevices.json', 'w') as f:
json.dump(device_details, f, indent=2)
self.logger.info(f"Device details saved to TasmotaDevices.json ({len(device_details)} devices)")
except Exception as e:
self.logger.error(f"Error saving device details: {e}")
# Print summary of command failures if any occurred
if hasattr(self, 'command_failures') and self.command_failures:
failure_count = len(self.command_failures)
print("\n" + "="*80)
print(f"COMMAND FAILURES SUMMARY: {failure_count} command(s) failed after 3 retry attempts")
print("="*80)
# Group failures by device for better readability
failures_by_device = {}
for failure in self.command_failures:
device_name = failure['device']
if device_name not in failures_by_device:
failures_by_device[device_name] = []
failures_by_device[device_name].append(failure)
# Print failures grouped by device
for device_name, failures in failures_by_device.items():
print(f"\nDevice: {device_name} ({failures[0]['ip']})")
print("-" * 40)
for i, failure in enumerate(failures, 1):
print(f" {i}. Command: {failure['command']}")
print(f" Error: {failure['error']}")
print("\n" + "="*80)
def generate_unifi_hostname_report(self, timeout: int = 5, save_path: str = 'TasmotaHostnameReport.json', print_report: bool = True):
"""Generate a report comparing UniFi-reported hostnames with Tasmota device hostnames.
Filters devices using network_filter (via is_tasmota_device). For each device, queries
the device to retrieve its self-reported hostname and compares it to the UniFi-reported
hostname or name. Saves a JSON report and optionally prints a human-readable summary.
"""
if not self.unifi_client:
# Attempt to set up UniFi client (requires self.config already loaded)
try:
self.setup_unifi_client()
except Exception as e:
self.logger.error(f"Cannot set up UniFi client: {e}")
return []
try:
all_clients = self.unifi_client.get_clients()
except Exception as e:
self.logger.error(f"Failed to retrieve clients from UniFi: {e}")
return []
# Build AP name maps to resolve human-friendly AP names
ap_mac_to_name = {}
bssid_to_name = {}
try:
ap_devices = self.unifi_client.get_devices()
for ap in ap_devices or []:
ap_name = ap.get('name') or ap.get('device_name') or ap.get('hostname') or ''
mac = (ap.get('mac') or '').lower()
if mac and ap_name:
ap_mac_to_name[mac] = ap_name
# Map BSSIDs from vap_table and radio_table to the AP name
vap_table = ap.get('vap_table') or []
if isinstance(vap_table, dict):
vap_table = [vap_table]
for vap in vap_table:
b = (vap.get('bssid') or '').lower()
if b and ap_name:
bssid_to_name[b] = ap_name
radio_table = ap.get('radio_table') or []
if isinstance(radio_table, dict):
radio_table = [radio_table]
for r in radio_table:
b = (r.get('bssid') or '').lower()
if b and ap_name:
bssid_to_name[b] = ap_name
except Exception as e:
self.logger.debug(f"Unable to fetch UniFi devices for AP mapping: {e}")
report_entries = []
mismatches = []
total_considered = 0
for device in all_clients:
try:
if not self.is_tasmota_device(device):
continue
total_considered += 1
ip = device.get('ip', '')
mac = device.get('mac', '')
unifi_name = device.get('name') or device.get('hostname') or ''
unifi_hostname = device.get('hostname', '') or ''
# Resolve Connected AP name using UniFi device map; SSID is not needed for output
ssid = device.get('essid') or device.get('ssid') or ''
raw_ap_name = device.get('ap_name') or device.get('ap') or ''
bssid = (device.get('bssid') or '').lower()
ap_mac = (device.get('ap_mac') or '').lower()
resolved_ap = ''
# Prefer mapping by ap_mac, then by bssid, then raw_ap_name, otherwise Unknown
if ap_mac and ap_mac in ap_mac_to_name:
resolved_ap = ap_mac_to_name.get(ap_mac, '')
elif bssid and bssid in bssid_to_name:
resolved_ap = bssid_to_name.get(bssid, '')
elif raw_ap_name:
resolved_ap = raw_ap_name
else:
resolved_ap = 'Unknown'
device_hostname = ''
success = False
if ip:
try:
device_hostname, success = self.get_device_hostname(ip, unifi_name, timeout=timeout, log_level='info')
except Exception as e:
self.logger.debug(f"Error retrieving device hostname for {unifi_name} at {ip}: {e}")
# Compare against UniFi's hostname if present; otherwise, fall back to name
unifi_compare = (unifi_hostname or unifi_name or '').strip()
match = bool(success) and device_hostname.strip().lower() == unifi_compare.lower()
entry = {
'ip': ip,
'mac': mac,
'unifi_name': unifi_name,
'unifi_hostname': unifi_hostname,
'device_hostname': device_hostname,
'match': match,
'ssid': ssid,
'ap': resolved_ap
}
report_entries.append(entry)
if success and not match:
mismatches.append(entry)
except Exception as e:
self.logger.debug(f"Skipping device due to error: {e}")
continue
# Save JSON report
try:
summary = {
'generated_at': datetime.now().isoformat(),
'total_tasmota_devices': total_considered,
'mismatch_count': len(mismatches),
'devices': report_entries,
'mismatches': mismatches
}
with open(save_path, 'w') as f:
json.dump(summary, f, indent=2)
self.logger.info(f"Hostname report saved to {save_path}")
except Exception as e:
self.logger.error(f"Failed to save hostname report: {e}")
if print_report:
sep = " | " # 3-char separator
def pad(text, width):
s = str(text) if text is not None else ""
if len(s) > width:
return s[:width]
return s.ljust(width)
def row(c1, c2, c3, c4):
return f"{pad(c1,20)}{sep}{pad(c2,20)}{sep}{pad(c3,15)}{sep}{pad(c4,20)}"
header_sep = ("-"*20) + " + " + ("-"*20) + " + " + ("-"*15) + " + " + ("-"*20)
print("\n" + "="*80)
print("Tasmota UniFi Hostname Report")
print("="*80)
print(f"Total Tasmota devices considered: {total_considered}")
print(f"Hostname mismatches: {len([d for d in report_entries if not d.get('match')])}")
matched = [d for d in report_entries if d.get('match')]
unknown = [d for d in report_entries if not d.get('match')]
def conn_value(d):
ap = (d.get('ap') or '').strip()
if not ap or ap.lower() == 'unknown':
return 'Unknown'
import re as _re
m = _re.match(r'^\s*ap\s*-\s*(.*)$', ap, flags=_re.IGNORECASE)
if m:
remainder = m.group(1).strip()
return f"AP - {remainder}" if remainder else 'AP -'
return f"AP - {ap}"
# Matched section
print("\nMatched devices:")
print(row("Hostname", "Device Hostname", "IP", "Conncted"))
print(header_sep)
for d in matched:
uni = d.get('unifi_hostname') or d.get('unifi_name') or ''
dev = d.get('device_hostname') or 'Unknown'
ip = d.get('ip') or ''
conn = conn_value(d)
print(row(uni, dev, ip, conn))
# Unknown/Mismatched section
print("\nUnknown/Mismatched devices:")
print(row("Hostname", "Device Hostname", "IP", "Conncted"))
print(header_sep)
for d in unknown:
uni = d.get('unifi_hostname') or d.get('unifi_name') or ''
dev = d.get('device_hostname') or 'Unknown'
ip = d.get('ip') or ''
conn = conn_value(d)
print(row(uni, dev, ip, conn))
print("="*80 + "\n")
return report_entries
def main():
parser = argparse.ArgumentParser(description='Tasmota Device Manager')
parser.add_argument('--config', default='network_configuration.json',
help='Path to configuration file')
parser.add_argument('--debug', action='store_true',
help='Enable debug logging')
parser.add_argument('--skip-unifi', action='store_true',
help='Skip UniFi discovery and use existing current.json')
parser.add_argument('--process-unknown', action='store_true',
help='Process unknown devices (matching unknown_device_patterns) to set up names and MQTT')
parser.add_argument('--unifi-hostname-report', action='store_true',
help='Generate a report comparing UniFi and Tasmota device hostnames')
parser.add_argument('--Device',
help='Process a single device by hostname or IP address')
args = parser.parse_args()
# Set up logging
log_level = logging.DEBUG if args.debug else logging.INFO
log_format = '%(asctime)s - %(levelname)s - %(filename)s:%(lineno)d - %(message)s' if args.debug else '%(asctime)s - %(levelname)s - %(message)s'
logging.basicConfig(level=log_level,
format=log_format,
datefmt='%Y-%m-%d %H:%M:%S')
print("Starting Tasmota Device Discovery and Version Check...")
# Create TasmotaDiscovery instance
discovery = TasmotaDiscovery(debug=args.debug)
discovery.load_config(args.config)
try:
# Generate UniFi/Tasmota hostname report if requested
if args.unifi_hostname_report:
print("Generating UniFi/Tasmota hostname report...")
discovery.generate_unifi_hostname_report()
return 0
# Process a single device if --Device parameter is provided
if args.Device:
print(f"Processing single device: {args.Device}")
# Let process_single_device handle the UniFi client setup as needed
success = discovery.process_single_device(args.Device)
if success:
print(f"\nDevice {args.Device} processed successfully!")
print("- Detailed information saved to: TasmotaDevices.json")
else:
print(f"\nFailed to process device: {args.Device}")
return 1
else:
# Normal processing flow
if not args.skip_unifi:
print("Step 1: Discovering Tasmota devices...")
discovery.setup_unifi_client()
tasmota_devices = discovery.get_tasmota_devices()
discovery.save_tasmota_config(tasmota_devices)
else:
print("Skipping UniFi discovery, using existing current.json...")
if args.process_unknown:
print("\nStep 2: Processing unknown devices...")
discovery.process_unknown_devices()
else:
print("\nStep 2: Getting detailed version information...")
discovery.get_device_details(use_current_json=True)
print("\nProcess completed successfully!")
print("- Device list saved to: current.json")
print("- Detailed information saved to: TasmotaDevices.json")
except ConnectionError as e:
print(f"Connection Error: {str(e)}")
print("\nTrying to proceed with existing current.json...")
try:
discovery.get_device_details(use_current_json=True)
print("\nSuccessfully retrieved device details from existing current.json")
except Exception as inner_e:
print(f"Error processing existing devices: {str(inner_e)}")
return 1
except Exception as e:
print(f"Error: {str(e)}")
if args.debug:
import traceback
traceback.print_exc()
return 1
return 0
if __name__ == '__main__':
main()