1. Restructured configuration: moved config_other and console to the top level. 2. Added a common _match_pattern function for regex pattern matching. 3. Implemented the UniFi hostname bug fix in is_hostname_unknown. 4. Created a common get_device_hostname function to eliminate code duplication. 5. Added comprehensive test scripts for all new functionality. 6. Added detailed documentation for all changes.
2235 lines
118 KiB
Python
2235 lines
118 KiB
Python
import json
|
|
import logging
|
|
import os
|
|
import sys
|
|
from datetime import datetime
|
|
from typing import Optional
|
|
import requests
|
|
from urllib3.exceptions import InsecureRequestWarning
|
|
import re # Import the regular expression module
|
|
import time
|
|
import argparse
|
|
|
|
# Disable SSL warnings
|
|
requests.packages.urllib3.disable_warnings(category=InsecureRequestWarning)
|
|
|
|
class UnifiClient:
    """Small HTTP client for a UniFi Controller.

    Wraps a ``requests.Session`` that carries the login cookies (and the
    CSRF token, when the controller sends one) across calls.
    """

    def __init__(self, base_url, username, password, site_id, verify_ssl=True):
        """Store connection settings and prepare a fresh session.

        Args:
            base_url: Controller URL; a trailing slash is stripped.
            username: Login user name.
            password: Login password.
            site_id: Site identifier used when building API paths.
            verify_ssl: Passed to ``Session.verify`` (default: True).
        """
        self.base_url = base_url.rstrip('/')
        self.username = username
        self.password = password
        self.site_id = site_id
        self.session = requests.Session()
        self.session.verify = verify_ssl

        # Initialize cookie jar
        self.session.cookies.clear()

    def _login(self) -> requests.Response:
        """Authenticate with the UniFi Controller.

        Returns:
            The successful login response.

        Raises:
            Exception: If the controller answers with HTTP 401.
            requests.exceptions.RequestException: For any other request failure.
        """
        login_url = f"{self.base_url}/api/auth/login"
        headers = {
            'Content-Type': 'application/json',
            'Accept': 'application/json'
        }
        payload = {
            "username": self.username,
            "password": self.password,
            "remember": False
        }
        try:
            response = self.session.post(
                login_url,
                json=payload,
                headers=headers
            )
            response.raise_for_status()
        except requests.exceptions.RequestException as e:
            # Transport-level failures (DNS, refused connection, timeout) carry
            # response=None. Compare against None explicitly: a Response for an
            # error status is falsy, so a plain truthiness test would skip 401s.
            failed_response = getattr(e, 'response', None)
            if failed_response is not None and failed_response.status_code == 401:
                raise Exception("Authentication failed. Please verify your username and password.") from e
            raise

        # Reuse the CSRF token on later requests made through this session.
        if 'X-CSRF-Token' in response.headers:
            self.session.headers['X-CSRF-Token'] = response.headers['X-CSRF-Token']
        return response

    def get_clients(self) -> list:
        """Get all clients from the UniFi Controller.

        Tries the newer proxied endpoint, then the legacy endpoint, and
        finally the v2 endpoint. Failures of the first two are swallowed;
        a failure of the last one propagates to the caller.

        Returns:
            The list stored under the ``data`` key of the first successful
            response (empty list if the key is absent).

        Raises:
            requests.exceptions.RequestException: If all three endpoints fail.
        """
        fallback_urls = (
            f"{self.base_url}/proxy/network/api/s/{self.site_id}/stat/sta",
            f"{self.base_url}/api/s/{self.site_id}/stat/sta",
        )
        for url in fallback_urls:
            try:
                response = self.session.get(url)
                response.raise_for_status()
                return response.json().get('data', [])
            except requests.exceptions.RequestException:
                # Fall through to the next candidate endpoint.
                continue

        # Last resort: errors from this request are not caught.
        # NOTE(review): assumes the v2 endpoint also wraps results in a
        # 'data' key like the other two - confirm against the controller API.
        url = f"{self.base_url}/v2/api/site/{self.site_id}/clients"
        response = self.session.get(url)
        response.raise_for_status()
        return response.json().get('data', [])
|
|
|
|
class TasmotaDiscovery:
|
|
def __init__(self, debug: bool = False):
|
|
"""Initialize the TasmotaDiscovery with optional debug mode."""
|
|
log_level = logging.DEBUG if debug else logging.INFO
|
|
log_format = '%(asctime)s - %(levelname)s - %(filename)s:%(lineno)d - %(message)s' if debug else '%(asctime)s - %(levelname)s - %(message)s'
|
|
logging.basicConfig(
|
|
level=log_level,
|
|
format=log_format,
|
|
datefmt='%Y-%m-%d %H:%M:%S'
|
|
)
|
|
self.logger = logging.getLogger(__name__)
|
|
self.config = None
|
|
self.unifi_client = None
|
|
|
|
def load_config(self, config_path: Optional[str] = None) -> dict:
|
|
"""Load configuration from JSON file."""
|
|
if config_path is None:
|
|
config_path = os.path.join(os.path.dirname(__file__), 'config.json')
|
|
|
|
self.logger.debug(f"Loading configuration from: {config_path}")
|
|
try:
|
|
with open(config_path, 'r') as config_file:
|
|
self.config = json.load(config_file)
|
|
self.logger.debug("Configuration loaded successfully from %s", config_path)
|
|
return self.config
|
|
except FileNotFoundError:
|
|
self.logger.error(f"Configuration file not found at {config_path}")
|
|
sys.exit(1)
|
|
except json.JSONDecodeError:
|
|
self.logger.error("Invalid JSON in configuration file")
|
|
sys.exit(1)
|
|
|
|
def setup_unifi_client(self):
|
|
"""Set up the UniFi client with better error handling"""
|
|
self.logger.debug("Setting up UniFi client")
|
|
|
|
if not self.config or 'unifi' not in self.config:
|
|
raise ValueError("Missing UniFi configuration")
|
|
|
|
unifi_config = self.config['unifi']
|
|
required_fields = ['host', 'username', 'password', 'site']
|
|
missing_fields = [field for field in required_fields if field not in unifi_config]
|
|
|
|
if missing_fields:
|
|
raise ValueError(f"Missing required UniFi configuration fields: {', '.join(missing_fields)}")
|
|
|
|
try:
|
|
self.logger.debug(f"Connecting to UniFi Controller at {unifi_config['host']}")
|
|
self.unifi_client = UnifiClient(
|
|
base_url=unifi_config['host'],
|
|
username=unifi_config['username'],
|
|
password=unifi_config['password'],
|
|
site_id=unifi_config['site'],
|
|
verify_ssl=False # Add this if using self-signed certificates
|
|
)
|
|
|
|
# Test the connection by making a simple request
|
|
response = self.unifi_client._login()
|
|
if not response:
|
|
raise ConnectionError(f"Failed to connect to UniFi controller: No response")
|
|
|
|
self.logger.debug("UniFi client setup successful")
|
|
|
|
except Exception as e:
|
|
self.logger.error(f"Error setting up UniFi client: {str(e)}")
|
|
raise ConnectionError(f"Failed to connect to UniFi controller: {str(e)}")
|
|
|
|
def is_tasmota_device(self, device: dict) -> bool:
|
|
"""Determine if a device is in the network_filter and not in exclude_patterns.
|
|
|
|
The function checks:
|
|
1. If the device's IP address starts with any subnet in network_filter
|
|
2. If the device's name or hostname does NOT match any exclude_patterns
|
|
|
|
Both checks use case-insensitive matching, and glob patterns (with *) in
|
|
exclude_patterns are converted to regex patterns for matching.
|
|
"""
|
|
name = device.get('name', '').lower()
|
|
hostname = device.get('hostname', '').lower()
|
|
ip = device.get('ip', '')
|
|
|
|
# Check if device is in the configured network
|
|
network_filters = self.config['unifi'].get('network_filter', {})
|
|
for network in network_filters.values():
|
|
if ip.startswith(network['subnet']):
|
|
self.logger.debug(f"Checking device in network: {name} ({hostname}) IP: {ip}")
|
|
|
|
# Check if device should be excluded based on exclude_patterns
|
|
exclude_patterns = network.get('exclude_patterns', [])
|
|
if self.is_device_excluded(name, hostname, exclude_patterns, log_level='debug'):
|
|
return False
|
|
|
|
# Device is in the network and not excluded
|
|
self.logger.debug(f"Found device in network: {name}")
|
|
return True
|
|
|
|
return False
|
|
|
|
def _match_pattern(self, text_or_texts, pattern: str, use_complex_matching: bool = False, match_entire_string: bool = False, log_level: str = 'debug') -> bool:
|
|
"""Common function to match a string or multiple strings against a pattern.
|
|
|
|
This function handles the regex pattern matching logic for both is_hostname_unknown
|
|
and is_device_excluded functions. It supports both simple prefix matching and more
|
|
complex matching for patterns that should match anywhere in the string.
|
|
|
|
Args:
|
|
text_or_texts: The string or list of strings to match against the pattern.
|
|
If a list is provided, the function returns True if any string matches.
|
|
pattern: The pattern to match against
|
|
use_complex_matching: Whether to use the more complex matching logic for patterns
|
|
starting with ^.* (default: False)
|
|
match_entire_string: Whether to match the entire string by adding $ at the end
|
|
of the regex pattern (default: False)
|
|
log_level: The logging level to use (default: 'debug')
|
|
|
|
Returns:
|
|
bool: True if any of the provided texts match the pattern, False otherwise
|
|
"""
|
|
# Convert pattern to lowercase for case-insensitive matching
|
|
pattern_lower = pattern.lower()
|
|
|
|
# Set up logging based on the specified level
|
|
log_func = getattr(self.logger, log_level)
|
|
|
|
# Handle single string or list of strings
|
|
if isinstance(text_or_texts, str):
|
|
texts = [text_or_texts] if text_or_texts else []
|
|
else:
|
|
texts = [t for t in text_or_texts if t]
|
|
|
|
# If no valid texts to match, return False
|
|
if not texts:
|
|
return False
|
|
|
|
# Special case for patterns like ^.*something.* which should match anywhere in the string
|
|
if use_complex_matching and pattern_lower.startswith('^.*'):
|
|
# Extract the part after ^.* to use with re.search
|
|
search_part = pattern_lower[3:] # Remove the ^.* prefix
|
|
|
|
# For patterns that should match anywhere, we need to handle wildcards differently
|
|
# We want "sonos.*" to match "sonos", "mysonos", "sonosdevice", etc.
|
|
# We also want "sonos" to match exactly, without requiring characters after it
|
|
|
|
# First, handle the case where the search part ends with .*
|
|
if search_part.endswith('.*'):
|
|
# Remove the .* at the end and make it optional
|
|
base_part = search_part[:-2] # Remove the .* at the end
|
|
search_regex = base_part
|
|
else:
|
|
# For other patterns, just use the search part as is
|
|
search_regex = search_part
|
|
|
|
# Replace any remaining * with .* but not escape the dots
|
|
search_regex = search_regex.replace('*', '.*')
|
|
|
|
# Check each text
|
|
for text in texts:
|
|
# Use re.search to find the pattern anywhere in the string
|
|
if re.search(search_regex, text):
|
|
return True
|
|
else:
|
|
# Standard pattern matching (prefix matching)
|
|
# Convert glob pattern to regex pattern
|
|
pattern_regex = pattern_lower.replace('.', r'\.').replace('*', '.*')
|
|
|
|
# Check if pattern already starts with ^
|
|
if pattern_regex.startswith('^'):
|
|
regex_pattern = pattern_regex
|
|
else:
|
|
regex_pattern = f"^{pattern_regex}"
|
|
|
|
# Add $ at the end if matching entire string
|
|
if match_entire_string:
|
|
regex_pattern = f"{regex_pattern}$"
|
|
|
|
# Check each text
|
|
for text in texts:
|
|
# Use re.match to check if the string starts with the pattern
|
|
if re.match(regex_pattern, text):
|
|
return True
|
|
|
|
return False
|
|
|
|
def get_device_hostname(self, ip: str, device_name: str = None, timeout: int = 5, log_level: str = 'debug') -> tuple:
    """Ask a Tasmota device for the hostname it reports about itself.

    Sends ``Status 5`` to the device's HTTP command endpoint and reads
    ``StatusNET.Hostname`` from the JSON reply. Request errors, non-200
    replies and unparsable JSON are logged at ``log_level`` and reported
    as a failure instead of raising.

    Args:
        ip: IP address of the device.
        device_name: Optional label used in log messages instead of the IP.
        timeout: HTTP timeout in seconds (default: 5).
        log_level: Name of the logger method used for all messages
            (default: 'debug').

    Returns:
        tuple: ``(hostname, success)``. ``success`` is True only when a
        non-empty hostname was found; otherwise ``hostname`` is empty/falsy.
    """
    log_func = getattr(self.logger, log_level)
    device_id = device_name or ip

    try:
        log_func(f"Retrieving hostname for {device_id} at {ip}")
        response = requests.get(f"http://{ip}/cm?cmnd=Status%205", timeout=timeout)

        if response.status_code != 200:
            log_func(f"Failed to get hostname for {device_id}: HTTP {response.status_code}")
            return "", False

        try:
            status_data = response.json()
            hostname = status_data.get('StatusNET', {}).get('Hostname', '')
        except ValueError:
            log_func(f"Failed to parse JSON response from {device_id}")
            return "", False

        if hostname:
            log_func(f"Successfully retrieved hostname for {device_id}: {hostname}")
            return hostname, True

        log_func(f"No hostname found in response for {device_id}")
        return hostname, False
    except requests.exceptions.RequestException as e:
        log_func(f"Error retrieving hostname for {device_id}: {str(e)}")
        return "", False
|
|
|
|
def is_hostname_unknown(self, hostname: str, patterns: list = None, from_unifi_os: bool = False, ip: str = None) -> bool:
|
|
"""Check if a hostname matches any pattern in unknown_device_patterns.
|
|
|
|
This function provides a centralized way to check if a hostname matches any of the
|
|
unknown_device_patterns defined in the configuration. It uses case-insensitive
|
|
matching and supports glob patterns (with *) in the patterns list.
|
|
|
|
Args:
|
|
hostname: The hostname to check against unknown_device_patterns
|
|
patterns: Optional list of patterns to check against. If not provided,
|
|
patterns will be loaded from the configuration.
|
|
from_unifi_os: Whether the hostname is from Unifi OS (handles Unifi Hostname bug)
|
|
ip: IP address of the device. If provided, hostname validation is skipped.
|
|
|
|
Returns:
|
|
bool: True if the hostname matches any pattern, False otherwise
|
|
|
|
Examples:
|
|
# Check if a hostname matches any unknown_device_patterns in the config
|
|
if manager.is_hostname_unknown("tasmota_device123"):
|
|
print("This is an unknown device")
|
|
|
|
# Check against a specific list of patterns
|
|
custom_patterns = ["esp-*", "tasmota_*"]
|
|
if manager.is_hostname_unknown("esp-abcd", custom_patterns):
|
|
print("This matches a custom pattern")
|
|
|
|
# Check with Unifi Hostname bug handling
|
|
if manager.is_hostname_unknown("tasmota_device123", from_unifi_os=True):
|
|
print("This is an unknown device from Unifi OS")
|
|
|
|
# Skip hostname validation when IP is provided
|
|
if manager.is_hostname_unknown("", ip="192.168.1.100"):
|
|
print("This is an unknown device with IP")
|
|
"""
|
|
# If IP is provided and from_unifi_os is False, we can skip hostname validation
|
|
if ip and not from_unifi_os:
|
|
self.logger.debug(f"IP provided ({ip}) and from_unifi_os is False, skipping hostname validation")
|
|
return True
|
|
|
|
# If no patterns provided, get them from the configuration
|
|
if patterns is None:
|
|
patterns = []
|
|
network_filters = self.config['unifi'].get('network_filter', {})
|
|
for network in network_filters.values():
|
|
patterns.extend(network.get('unknown_device_patterns', []))
|
|
|
|
# Convert hostname to lowercase for case-insensitive matching
|
|
hostname_lower = hostname.lower()
|
|
|
|
# Handle Unifi Hostname bug if hostname is from Unifi OS
|
|
if from_unifi_os and ip:
|
|
self.logger.debug(f"Handling hostname '{hostname}' from Unifi OS (bug handling enabled)")
|
|
|
|
# Get the device's self-reported hostname using the common function
|
|
device_reported_hostname, success = self.get_device_hostname(ip, hostname, timeout=5, log_level='debug')
|
|
|
|
if success:
|
|
# Check if the self-reported hostname also matches unknown patterns
|
|
device_hostname_matches_unknown = False
|
|
for pattern in patterns:
|
|
if self._match_pattern(device_reported_hostname.lower(), pattern, match_entire_string=False):
|
|
device_hostname_matches_unknown = True
|
|
self.logger.debug(f"Device's self-reported hostname '{device_reported_hostname}' matches unknown pattern: {pattern}")
|
|
break
|
|
|
|
# If UniFi name matches unknown patterns but device's self-reported name doesn't,
|
|
# this indicates the UniFi OS hostname bug
|
|
if not device_hostname_matches_unknown:
|
|
# First check if the UniFi-reported hostname matches unknown patterns
|
|
unifi_hostname_matches_unknown = False
|
|
for pattern in patterns:
|
|
if self._match_pattern(hostname_lower, pattern, match_entire_string=False):
|
|
unifi_hostname_matches_unknown = True
|
|
break
|
|
|
|
if unifi_hostname_matches_unknown:
|
|
self.logger.info(f"UniFi OS hostname bug detected for {hostname}: self-reported hostname '{device_reported_hostname}' doesn't match unknown patterns")
|
|
return False # Not an unknown device despite what UniFi reports
|
|
elif from_unifi_os:
|
|
self.logger.debug(f"Cannot check device's self-reported hostname for {hostname}: No IP address provided")
|
|
|
|
# Check if hostname matches any pattern
|
|
for pattern in patterns:
|
|
if self._match_pattern(hostname_lower, pattern, match_entire_string=False):
|
|
self.logger.debug(f"Hostname '{hostname}' matches unknown device pattern: {pattern}")
|
|
return True
|
|
|
|
return False
|
|
|
|
def is_device_excluded(self, device_name: str, hostname: str = '', patterns: list = None, log_level: str = 'debug') -> bool:
|
|
"""Check if a device name or hostname matches any pattern in exclude_patterns.
|
|
|
|
This function provides a centralized way to check if a device should be excluded
|
|
based on its name or hostname matching any of the exclude_patterns defined in the
|
|
configuration. It uses case-insensitive matching and supports glob patterns (with *)
|
|
in the patterns list.
|
|
|
|
Args:
|
|
device_name: The device name to check against exclude_patterns
|
|
hostname: The device hostname to check against exclude_patterns (optional)
|
|
patterns: Optional list of patterns to check against. If not provided,
|
|
patterns will be loaded from the configuration.
|
|
log_level: The logging level to use ('debug', 'info', 'warning', 'error'). Default is 'debug'.
|
|
|
|
Returns:
|
|
bool: True if the device should be excluded (name or hostname matches any pattern),
|
|
False otherwise
|
|
|
|
Examples:
|
|
# Check if a device should be excluded based on patterns in the config
|
|
if manager.is_device_excluded("homeassistant", "homeassistant.local"):
|
|
print("This device should be excluded")
|
|
|
|
# Check against a specific list of patterns
|
|
custom_patterns = ["^homeassistant*", "^.*sonos.*"]
|
|
if manager.is_device_excluded("sonos-speaker", "sonos.local", custom_patterns, log_level='info'):
|
|
print("This device matches a custom exclude pattern")
|
|
"""
|
|
# If no patterns provided, get them from the configuration
|
|
if patterns is None:
|
|
patterns = []
|
|
network_filters = self.config['unifi'].get('network_filter', {})
|
|
for network in network_filters.values():
|
|
patterns.extend(network.get('exclude_patterns', []))
|
|
|
|
# Convert device_name and hostname to lowercase for case-insensitive matching
|
|
name = device_name.lower() if device_name else ''
|
|
hostname_lower = hostname.lower() if hostname else ''
|
|
|
|
# Set up logging based on the specified level
|
|
log_func = getattr(self.logger, log_level)
|
|
|
|
# Create a list of texts to check (device name and hostname)
|
|
texts = [name, hostname_lower]
|
|
|
|
# Check if device name or hostname matches any pattern
|
|
for pattern in patterns:
|
|
pattern_lower = pattern.lower()
|
|
|
|
# Special case for patterns like ^.*something.* which should match anywhere in the string
|
|
if pattern_lower.startswith('^.*'):
|
|
# Use complex matching for patterns starting with ^.*
|
|
if self._match_pattern(texts, pattern, use_complex_matching=True, log_level=log_level):
|
|
log_func(f"Excluding device due to pattern '{pattern}': {device_name} ({hostname})")
|
|
return True
|
|
continue
|
|
|
|
# For normal patterns, match the entire string
|
|
if self._match_pattern(texts, pattern, match_entire_string=True, log_level=log_level):
|
|
log_func(f"Excluding device due to pattern '{pattern}': {device_name} ({hostname})")
|
|
return True
|
|
|
|
return False
|
|
|
|
def get_tasmota_devices(self) -> list:
|
|
"""Query UniFi controller and filter Tasmota devices.
|
|
|
|
This function:
|
|
1. Retrieves all clients from the UniFi controller
|
|
2. Filters them using is_tasmota_device() to find devices in the network_filter
|
|
that are not in exclude_patterns
|
|
3. Determines each device's connection type (Wireless/Wired)
|
|
4. Checks for the UniFi OS hostname bug whenever a device matches unknown_device_patterns
|
|
5. Creates a standardized device_info dictionary for each device
|
|
|
|
Returns:
|
|
list: A list of dictionaries containing device information with fields:
|
|
- name: Device name or hostname
|
|
- ip: IP address
|
|
- mac: MAC address
|
|
- last_seen: When the device was last seen by UniFi
|
|
- hostname: Device hostname
|
|
- notes: Any notes from UniFi
|
|
- connection: Connection type (Wireless/Wired)
|
|
- unifi_hostname_bug_detected: Flag indicating if the UniFi OS hostname bug was detected
|
|
|
|
If an error occurs during the process, an empty list is returned.
|
|
"""
|
|
devices = []
|
|
self.logger.debug("Querying UniFi controller for devices")
|
|
try:
|
|
all_clients = self.unifi_client.get_clients()
|
|
self.logger.debug(f"Found {len(all_clients)} total devices")
|
|
|
|
# Get unknown device patterns for hostname bug detection
|
|
network_filters = self.config['unifi'].get('network_filter', {})
|
|
unknown_patterns = []
|
|
for network in network_filters.values():
|
|
unknown_patterns.extend(network.get('unknown_device_patterns', []))
|
|
|
|
for device in all_clients:
|
|
if self.is_tasmota_device(device):
|
|
# Determine connection type based on available fields
|
|
connection = "Unknown"
|
|
if device.get('essid'):
|
|
connection = f"Wireless - {device.get('essid')}"
|
|
elif device.get('radio') or device.get('wifi'):
|
|
connection = "Wireless"
|
|
elif device.get('port') or device.get('switch_port') or device.get('switch'):
|
|
connection = "Wired"
|
|
|
|
# Get device details
|
|
device_name = device.get('name', device.get('hostname', 'Unknown'))
|
|
device_hostname = device.get('hostname', '')
|
|
device_ip = device.get('ip', '')
|
|
|
|
# Initialize the UniFi hostname bug flag
|
|
unifi_hostname_bug_detected = False
|
|
|
|
# Check if device name or hostname matches unknown patterns
|
|
unifi_name_matches_unknown = (
|
|
self.is_hostname_unknown(device_name, unknown_patterns) or
|
|
self.is_hostname_unknown(device_hostname, unknown_patterns)
|
|
)
|
|
if unifi_name_matches_unknown:
|
|
self.logger.debug(f"Device {device_name} matches unknown device pattern")
|
|
|
|
# If the name matches unknown patterns, check the device's self-reported hostname
|
|
# before declaring it unknown
|
|
#
|
|
# This addresses a UniFi OS bug where it doesn't keep track of updated hostnames.
|
|
# When a hostname is updated and the connection reset, UniFi may not pick up the new name.
|
|
if unifi_name_matches_unknown and device_ip:
|
|
self.logger.debug(f"Checking device's self-reported hostname for {device_name}")
|
|
|
|
# Get the device's self-reported hostname using the common function
|
|
device_reported_hostname, success = self.get_device_hostname(device_ip, device_name, timeout=5, log_level='debug')
|
|
|
|
if success:
|
|
# Check if the self-reported hostname also matches unknown patterns
|
|
device_hostname_matches_unknown = False
|
|
for pattern in unknown_patterns:
|
|
if self._match_pattern(device_reported_hostname.lower(), pattern, match_entire_string=False):
|
|
device_hostname_matches_unknown = True
|
|
self.logger.debug(f"Device's self-reported hostname '{device_reported_hostname}' matches unknown pattern: {pattern}")
|
|
break
|
|
|
|
# If UniFi name matches unknown patterns but device's self-reported name doesn't,
|
|
# this indicates the UniFi OS hostname bug
|
|
if not device_hostname_matches_unknown:
|
|
unifi_hostname_bug_detected = True
|
|
self.logger.info(f"UniFi OS hostname bug detected for {device_name}: self-reported hostname '{device_reported_hostname}' doesn't match unknown patterns")
|
|
|
|
device_info = {
|
|
"name": device_name,
|
|
"ip": device_ip,
|
|
"mac": device.get('mac', ''),
|
|
"last_seen": device.get('last_seen', ''),
|
|
"hostname": device_hostname,
|
|
"notes": device.get('note', ''),
|
|
"connection": connection,
|
|
"unifi_hostname_bug_detected": unifi_hostname_bug_detected
|
|
}
|
|
devices.append(device_info)
|
|
|
|
self.logger.debug(f"Found {len(devices)} Tasmota devices")
|
|
return devices
|
|
except Exception as e:
|
|
self.logger.error(f"Error getting devices from UniFi controller: {e}")
|
|
return []
|
|
|
|
def save_tasmota_config(self, devices: list) -> None:
|
|
"""Save Tasmota device information to a JSON file with device tracking."""
|
|
filename = "current.json"
|
|
self.logger.debug(f"Saving Tasmota configuration to {filename}")
|
|
deprecated_filename = "deprecated.json"
|
|
|
|
current_devices = []
|
|
deprecated_devices = []
|
|
|
|
# Load existing devices if file exists
|
|
if os.path.exists(filename):
|
|
try:
|
|
with open(filename, 'r') as f:
|
|
existing_config = json.load(f)
|
|
current_devices = existing_config.get('tasmota', {}).get('devices', [])
|
|
except json.JSONDecodeError:
|
|
self.logger.error(f"Error reading {filename}, treating as empty")
|
|
current_devices = []
|
|
|
|
# Load deprecated devices if file exists
|
|
if os.path.exists(deprecated_filename):
|
|
try:
|
|
with open(deprecated_filename, 'r') as f:
|
|
deprecated_config = json.load(f)
|
|
deprecated_devices = deprecated_config.get('tasmota', {}).get('devices', [])
|
|
except json.JSONDecodeError:
|
|
self.logger.error(f"Error reading {deprecated_filename}, treating as empty")
|
|
deprecated_devices = []
|
|
|
|
# Create new config
|
|
new_devices = []
|
|
moved_to_deprecated = []
|
|
restored_from_deprecated = []
|
|
removed_from_deprecated = []
|
|
excluded_devices = []
|
|
|
|
# Get exclude_patterns for checking excluded devices
|
|
network_filters = self.config['unifi'].get('network_filter', {})
|
|
exclude_patterns = []
|
|
for network in network_filters.values():
|
|
exclude_patterns.extend(network.get('exclude_patterns', []))
|
|
|
|
# Process current devices
|
|
for device in devices:
|
|
device_name = device['name']
|
|
device_hostname = device.get('hostname', '')
|
|
device_ip = device['ip']
|
|
device_mac = device['mac']
|
|
|
|
# Check if device should be excluded
|
|
if self.is_device_excluded(device_name, device_hostname, exclude_patterns, log_level='info'):
|
|
print(f"Device {device_name} excluded by pattern - skipping")
|
|
excluded_devices.append(device_name)
|
|
continue
|
|
|
|
# Check in current devices
|
|
existing_device = next((d for d in current_devices
|
|
if d['name'] == device_name), None)
|
|
|
|
if existing_device:
|
|
# Device exists, check if IP or MAC changed
|
|
if existing_device['ip'] != device_ip or existing_device['mac'] != device_mac:
|
|
moved_to_deprecated.append(existing_device)
|
|
new_devices.append(device)
|
|
print(f"Device {device_name} moved to deprecated (IP/MAC changed)")
|
|
else:
|
|
new_devices.append(existing_device) # Keep existing device
|
|
else:
|
|
# New device, check if it was in deprecated
|
|
deprecated_device = next((d for d in deprecated_devices
|
|
if d['name'] == device_name), None)
|
|
if deprecated_device:
|
|
removed_from_deprecated.append(device_name)
|
|
print(f"Device {device_name} removed from deprecated (restored)")
|
|
new_devices.append(device)
|
|
print(f"Device {device_name} added to output file")
|
|
|
|
# Find devices that are no longer present
|
|
current_names = {d['name'] for d in devices}
|
|
for existing_device in current_devices:
|
|
if existing_device['name'] not in current_names:
|
|
if not self.is_device_excluded(existing_device['name'], existing_device.get('hostname', ''), exclude_patterns, log_level='info'):
|
|
moved_to_deprecated.append(existing_device)
|
|
print(f"Device {existing_device['name']} moved to deprecated (no longer present)")
|
|
|
|
# Update deprecated devices list, excluding any excluded devices
|
|
final_deprecated = []
|
|
for device in deprecated_devices:
|
|
if device['name'] not in removed_from_deprecated and not self.is_device_excluded(device['name'], device.get('hostname', ''), exclude_patterns, log_level='info'):
|
|
final_deprecated.append(device)
|
|
elif self.is_device_excluded(device['name'], device.get('hostname', ''), exclude_patterns, log_level='info'):
|
|
print(f"Device {device['name']} removed from deprecated (excluded by pattern)")
|
|
|
|
final_deprecated.extend(moved_to_deprecated)
|
|
|
|
# Save new configuration
|
|
config = {
|
|
"tasmota": {
|
|
"devices": new_devices,
|
|
"generated_at": datetime.now().isoformat(),
|
|
"total_devices": len(new_devices)
|
|
}
|
|
}
|
|
|
|
# Save deprecated configuration
|
|
deprecated_config = {
|
|
"tasmota": {
|
|
"devices": final_deprecated,
|
|
"generated_at": datetime.now().isoformat(),
|
|
"total_devices": len(final_deprecated)
|
|
}
|
|
}
|
|
|
|
# Backup existing file if it exists
|
|
if os.path.exists(filename):
|
|
try:
|
|
backup_name = f"{filename}.backup"
|
|
os.rename(filename, backup_name)
|
|
self.logger.info(f"Created backup of existing configuration as {backup_name}")
|
|
except Exception as e:
|
|
self.logger.error(f"Error creating backup: {e}")
|
|
|
|
# Save files
|
|
try:
|
|
with open(filename, 'w') as f:
|
|
json.dump(config, f, indent=4)
|
|
with open(deprecated_filename, 'w') as f:
|
|
json.dump(deprecated_config, f, indent=4)
|
|
|
|
self.logger.info(f"Successfully saved {len(new_devices)} Tasmota devices to {filename}")
|
|
self.logger.info(f"Successfully saved {len(final_deprecated)} deprecated devices to {deprecated_filename}")
|
|
|
|
print("\nDevice Status Summary:")
|
|
if excluded_devices:
|
|
print("\nExcluded Devices:")
|
|
for name in excluded_devices:
|
|
print(f"- {name}")
|
|
|
|
if moved_to_deprecated:
|
|
print("\nMoved to deprecated:")
|
|
for device in moved_to_deprecated:
|
|
print(f"- {device['name']}")
|
|
|
|
if removed_from_deprecated:
|
|
print("\nRestored from deprecated:")
|
|
for name in removed_from_deprecated:
|
|
print(f"- {name}")
|
|
|
|
print("\nCurrent Tasmota Devices:")
|
|
for device in new_devices:
|
|
print(f"Name: {device['name']:<20} IP: {device['ip']:<15} MAC: {device['mac']}")
|
|
|
|
except Exception as e:
|
|
self.logger.error(f"Error saving configuration: {e}")
|
|
|
|
def get_unknown_devices(self, use_current_json=True):
|
|
"""Identify devices that match unknown_device_patterns from current.json."""
|
|
self.logger.info("Identifying unknown devices for processing...")
|
|
unknown_devices = []
|
|
|
|
try:
|
|
source_file = 'current.json' if use_current_json else 'tasmota.json'
|
|
with open(source_file, 'r') as f:
|
|
data = json.load(f)
|
|
all_devices = data.get('tasmota', {}).get('devices', [])
|
|
self.logger.debug(f"Loaded {len(all_devices)} devices from {source_file}")
|
|
except FileNotFoundError:
|
|
self.logger.error(f"{source_file} not found. Run discovery first.")
|
|
return []
|
|
except json.JSONDecodeError:
|
|
self.logger.error(f"Invalid JSON format in {source_file}")
|
|
return []
|
|
|
|
# Identify devices matching unknown_device_patterns
|
|
network_filters = self.config['unifi'].get('network_filter', {})
|
|
unknown_patterns = []
|
|
for network in network_filters.values():
|
|
unknown_patterns.extend(network.get('unknown_device_patterns', []))
|
|
|
|
for device in all_devices:
|
|
name = device.get('name', '').lower()
|
|
hostname = device.get('hostname', '').lower()
|
|
|
|
for pattern in unknown_patterns:
|
|
pattern = pattern.lower()
|
|
pattern = pattern.replace('.', r'\.').replace('*', '.*')
|
|
# Check if pattern already starts with ^
|
|
if pattern.startswith('^'):
|
|
regex_pattern = pattern
|
|
else:
|
|
regex_pattern = f"^{pattern}"
|
|
if re.match(regex_pattern, name) or re.match(regex_pattern, hostname):
|
|
self.logger.debug(f"Found unknown device: {name} ({hostname})")
|
|
unknown_devices.append(device)
|
|
break
|
|
|
|
self.logger.info(f"Found {len(unknown_devices)} unknown devices to process")
|
|
return unknown_devices
|
|
|
|
def process_unknown_devices(self):
    """Interactively identify and rename devices that match the unknown-device patterns.

    For each device returned by ``get_unknown_devices()``:

    1. Skip it when it has no IP address.
    2. Fetch its main web page and skip it when the page does not contain
       the text "toggle" (case-insensitive).
    3. Switch its power on and off every two seconds in a background thread
       so the user can spot it physically.
    4. Prompt on the console for a new name. An empty answer, "unknown" or
       "na" ends the whole process; an answer equal to the current hostname
       skips the device.
    5. Otherwise call ``configure_unknown_device(ip, new_hostname)``.

    Logging is globally disabled while the prompt is shown so log output does
    not interleave with it. Connection errors for a device are logged and the
    loop moves on to the next device.
    """
    # Local import: threading is only needed by this interactive routine.
    import threading

    unknown_devices = self.get_unknown_devices()
    if not unknown_devices:
        self.logger.info("No unknown devices found to process")
        return

    self.logger.info(f"Starting to process {len(unknown_devices)} unknown devices...")

    for device in unknown_devices:
        name = device.get('name', 'Unknown')
        ip = device.get('ip')
        connection = device.get('connection', 'Unknown')

        if not ip:
            self.logger.warning(f"Skipping device {name} - no IP address")
            continue

        self.logger.info(f"Processing unknown device: {name} at {ip} with connection {connection}")

        try:
            # Get the main page to check for a toggle button
            response = requests.get(f"http://{ip}/", timeout=5)
            if "toggle" not in response.text.lower():
                self.logger.info(f"Device {name} does not have a toggle button, skipping")
                continue

            self.logger.info(f"Device {name} has a toggle button, assuming it's a light switch or power plug")

            original_hostname = device.get('hostname', '')
            stop_toggle = threading.Event()

            # Defaults bind this iteration's values so the worker never sees
            # a later device's IP or stop event.
            def toggle_device(target_ip=ip, stop=stop_toggle):
                power_on = False
                while not stop.is_set():
                    power_on = not power_on
                    toggle_cmd = "Power On" if power_on else "Power Off"
                    try:
                        requests.get(f"http://{target_ip}/cm?cmnd={toggle_cmd}", timeout=2)
                    except requests.exceptions.RequestException:
                        # Best effort: a missed toggle is harmless, keep going.
                        pass
                    # wait() returns as soon as the event is set, so stopping
                    # does not have to sit out the full two-second pause.
                    stop.wait(2.0)

            toggle_thread = threading.Thread(target=toggle_device, daemon=True)

            # Temporarily disable all logging while the prompt is on screen
            logging.disable(logging.CRITICAL)
            try:
                print("\n" + "="*50)
                print(f"DEVICE: {name} at IP: {ip} Connection: {connection}")
                print(f"Current hostname: {original_hostname}")
                print("="*50)
                print("The device is now toggling to help you identify it.")

                toggle_thread.start()

                print("\nPlease enter a new name for this device:")
                print("(Enter nothing, 'unknown', or 'na' to assume device could not be found and end)")
                new_hostname = input("> ").strip()
            finally:
                # Always stop the toggling and restore logging, even when
                # input() is interrupted (EOF / Ctrl-C).
                stop_toggle.set()
                if toggle_thread.is_alive():
                    toggle_thread.join(timeout=3)
                logging.disable(logging.NOTSET)

            # Special inputs that indicate the device could not be found
            if not new_hostname or new_hostname.lower() in ("unknown", "na"):
                print("Assuming device could not be found, ending process")
                return  # End the entire process

            if new_hostname == original_hostname:
                print("No valid hostname entered, skipping device")
                self.logger.warning(f"No new hostname provided for {name}, skipping configuration")
                continue

            print(f"Setting new hostname to: {new_hostname}")
            self.logger.info(f"Configuring device with new hostname: {new_hostname}")
            self.configure_unknown_device(ip, new_hostname)

        except requests.exceptions.RequestException as e:
            self.logger.error(f"Error connecting to {name} at {ip}: {str(e)}")
|
|
|
|
def check_and_update_template(self, ip, name):
    """Reconcile a device's name and template with the ``config_other`` mapping.

    ``config_other`` maps a device name (key) to a template string (value).

    Algorithm:
    1. Read the device name from the ``Status 0`` response
       (``Status`` -> ``DeviceName``).
    2. Read the current template with the ``Template`` command.
    3. If the device name is a key in ``config_other``:
       - a blank value means the module must be set manually; a notice is
         printed and nothing is changed;
       - if the device template differs from the value, the value is written
         to the device, module 0 is selected and the device is restarted.
    4. Otherwise, if some value equals the device template, that value's key
       is written as the device name, module 0 is selected and the device is
       restarted.
    5. If nothing matches, the device's current name and template are printed
       so an entry can be added to the configuration.

    Templates are compared as parsed JSON when both sides parse, so pure
    formatting differences (whitespace, key order) do not count as a
    mismatch; otherwise plain equality is used.

    Args:
        ip: The IP address of the device
        name: The name/hostname of the device (used in log messages)

    Returns:
        bool: True if a change was written and the restart was issued (a
        restart that times out is assumed to have worked), False otherwise.
    """
    # Local import, as elsewhere in this file.
    import urllib.parse

    log = self.logger

    def fetch_json(url, what, label, retry_message):
        """GET url and decode JSON, retrying once with a longer timeout; None on failure."""
        try:
            log.debug(f"{name}: Getting {what} with increased timeout (10 seconds)")
            data = requests.get(url, timeout=10).json()
            # Log the actual response format for debugging
            log.debug(f"{name}: {label} response: {data}")
            return data
        except requests.exceptions.Timeout:
            log.error(f"{name}: Timeout getting {what} (10 seconds) - device may be busy")
        except (requests.exceptions.RequestException, ValueError) as e:
            # ValueError covers a non-JSON body on requests versions whose
            # JSON decode error is not a RequestException.
            log.error(f"{name}: Error getting {what}: {str(e)}")
            return None

        # Only reached after a timeout: try one more time with a longer timeout.
        try:
            log.debug(f"{name}: {retry_message}")
            data = requests.get(url, timeout=20).json()
            log.debug(f"{name}: {label} response on retry: {data}")
            return data
        except requests.exceptions.Timeout:
            log.error(f"{name}: Timeout getting {what} even with 20 second timeout")
        except (requests.exceptions.RequestException, ValueError) as e:
            log.error(f"{name}: Error getting {what} on retry: {str(e)}")
        return None

    def parsed(value):
        """Return value decoded from JSON when it is a JSON string, else None for unparsable text."""
        if isinstance(value, str):
            try:
                return json.loads(value)
            except ValueError:
                return None
        return value

    def same_template(left, right):
        """True when two templates are equal as text or as parsed JSON."""
        if left == right:
            return True
        left_obj = parsed(left)
        return left_obj is not None and left_obj == parsed(right)

    def apply_setting(url, what, label):
        """Send one configuration command; True when the device answered HTTP 200."""
        try:
            log.debug(f"{name}: Setting {what} with 10 second timeout")
            response = requests.get(url, timeout=10)
        except requests.exceptions.Timeout:
            log.error(f"{name}: Timeout updating {what} (10 seconds)")
            return False
        except requests.exceptions.RequestException as e:
            log.error(f"{name}: Error updating {what}: {str(e)}")
            return False
        if response.status_code != 200:
            log.error(f"{name}: Failed to update {what}: HTTP {response.status_code}")
            return False
        log.info(f"{name}: {label} updated successfully")
        return True

    def activate_and_restart():
        """Select module 0 (Template module) and restart; True when the restart was issued."""
        log.info(f"{name}: Activating template by setting module to 0")
        try:
            module_response = requests.get(f"http://{ip}/cm?cmnd=Module%200", timeout=10)
        except requests.exceptions.Timeout:
            log.error(f"{name}: Timeout setting module to 0 (10 seconds)")
            return False
        except requests.exceptions.RequestException as e:
            log.error(f"{name}: Error setting module to 0: {str(e)}")
            return False
        if module_response.status_code != 200:
            log.error(f"{name}: Failed to set module to 0: HTTP {module_response.status_code}")
            return False
        log.info(f"{name}: Module set to 0 successfully")

        log.info(f"{name}: Restarting device to apply template")
        try:
            restart_response = requests.get(f"http://{ip}/cm?cmnd=Restart%201", timeout=10)
        except requests.exceptions.Timeout:
            log.error(f"{name}: Timeout restarting device (10 seconds)")
            # Even though restart timed out, it might have worked
            log.info(f"{name}: Assuming restart was successful despite timeout")
            return True
        except requests.exceptions.RequestException as e:
            log.error(f"{name}: Error restarting device: {str(e)}")
            return False
        if restart_response.status_code != 200:
            log.error(f"{name}: Failed to restart device: HTTP {restart_response.status_code}")
            return False
        log.info(f"{name}: Device restart initiated successfully")
        return True

    # Get config_other settings
    config_other = self.config.get('config_other', {})
    if not config_other:
        log.debug(f"{name}: No config_other settings found in configuration")
        return False

    # Device name as shown on the Configuration/Other page
    status0_data = fetch_json(
        f"http://{ip}/cm?cmnd=Status%200",
        "Status 0",
        "Status 0",
        "Retrying Status 0 with 20 second timeout",
    )
    if status0_data is None:
        return False

    status_section = status0_data.get("Status", {}) if isinstance(status0_data, dict) else {}
    device_name = status_section.get("DeviceName", "") if isinstance(status_section, dict) else ""
    if not device_name:
        log.debug(f"{name}: Could not get device name from Status 0")
        return False

    log.debug(f"{name}: Device name from Configuration/Other page: {device_name}")

    # Current template
    template_data = fetch_json(
        f"http://{ip}/cm?cmnd=Template",
        "template",
        "Template",
        "Retrying with 20 second timeout",
    )
    if template_data is None:
        return False

    # Extract current template - handle different response formats
    current_template = ""
    if isinstance(template_data, dict) and template_data:
        if "Template" in template_data:
            current_template = template_data.get("Template", "")
        else:
            first_key = next(iter(template_data))
            first_value = template_data[first_key]
            if isinstance(first_value, str) and "{" in first_value:
                # Template delivered as a JSON string under some other key
                current_template = first_value
                log.debug(f"{name}: Found template in alternate format under key: {first_key}")
            elif all(key in template_data for key in ('NAME', 'GPIO', 'FLAG', 'BASE')):
                # The response itself is the template object, e.g.
                # {"NAME":"...","GPIO":[...],"FLAG":0,"BASE":18}; serialise it
                # so it can be compared with the configured string.
                current_template = json.dumps(template_data)
                log.debug(f"{name}: Found template in dict format with NAME, GPIO, FLAG, BASE keys")

    if not current_template:
        log.debug(f"{name}: Could not get current template from response")
        return False

    log.debug(f"{name}: Current template: {current_template}")

    if device_name in config_other:
        template_value = config_other[device_name]
        if not template_value or template_value.strip() == "":
            # Value is blank or empty, print message and skip template check
            log.info(f"{name}: Device name '{device_name}' matches key in config_other, but value is blank or empty")
            print(f"\nDevice {name} at {ip} must be set manually in Configuration/Module to: {device_name}")
            print(f"The config_other entry has a blank value for key: {device_name}")
            return False

        if same_template(current_template, template_value):
            log.debug(f"{name}: Device name '{device_name}' matches key in config_other and template matches value")
            return False

        # Template doesn't match, write value to template
        log.info(f"{name}: Device name '{device_name}' matches key in config_other, but template doesn't match")
        log.info(f"{name}: Setting template to: {template_value}")
        encoded_value = urllib.parse.quote(template_value)
        if not apply_setting(f"http://{ip}/cm?cmnd=Template%20{encoded_value}", "template", "Template"):
            return False
        return activate_and_restart()

    # No key matches device name, check if any value matches the template
    matching_key = next(
        (key for key, value in config_other.items() if same_template(value, current_template)),
        None,
    )
    if not matching_key:
        # No matches found, print detailed information about what's on the device
        log.info(f"{name}: No matches found in config_other for either Device Name or Template")
        log.info(f"{name}: Current Device Name on device: '{device_name}'")
        log.info(f"{name}: Current Template on device: '{current_template}'")
        print(f"\nNo template match found for device {name} at {ip}")
        print(f" Device Name on device: '{device_name}'")
        print(f" Template on device: '{current_template}'")
        print("Please add an appropriate entry to config_other in your configuration file.")
        return False

    # Value matches template, write key to device name
    log.info(f"{name}: Template matches value for key '{matching_key}' in config_other")
    log.info(f"{name}: Setting device name to: {matching_key}")
    # Percent-encode the name so characters such as '&' or '#' cannot
    # truncate or alter the command in the query string.
    encoded_name = urllib.parse.quote(str(matching_key))
    if not apply_setting(f"http://{ip}/cm?cmnd=DeviceName%20{encoded_name}", "device name", "Device name"):
        return False
    return activate_and_restart()
|
|
|
|
def configure_mqtt_settings(self, ip, name, mqtt_status=None, is_new_device=False, set_friendly_name=False, enable_mqtt=False, with_retry=False, reboot=False):
|
|
"""Configure MQTT settings for a device.
|
|
|
|
Args:
|
|
ip: The IP address of the device
|
|
name: The name/hostname of the device
|
|
mqtt_status: The current MQTT status of the device (from Status 6)
|
|
is_new_device: Whether this is a new device (True) or existing device (False)
|
|
set_friendly_name: Whether to set the friendly name
|
|
enable_mqtt: Whether to enable MQTT
|
|
with_retry: Whether to use retry logic
|
|
reboot: Whether to reboot the device after configuration
|
|
|
|
Returns:
|
|
bool: True if configuration was successful, False otherwise
|
|
"""
|
|
try:
|
|
# Set Friendly Name if requested
|
|
if set_friendly_name:
|
|
friendly_name_url = f"http://{ip}/cm?cmnd=FriendlyName1%20{name}"
|
|
response = requests.get(friendly_name_url, timeout=5)
|
|
if response.status_code == 200:
|
|
self.logger.info(f"Set Friendly Name to {name}")
|
|
else:
|
|
self.logger.error(f"Failed to set Friendly Name to {name}")
|
|
|
|
# Enable MQTT if requested
|
|
if enable_mqtt:
|
|
mqtt_url = f"http://{ip}/cm?cmnd=SetOption3%20ON" # Enable MQTT
|
|
response = requests.get(mqtt_url, timeout=5)
|
|
if response.status_code == 200:
|
|
self.logger.info(f"Enabled MQTT for {name}")
|
|
else:
|
|
self.logger.error(f"Failed to enable MQTT for {name}")
|
|
|
|
# Configure MQTT settings
|
|
mqtt_config = self.config.get('mqtt', {})
|
|
if not mqtt_config:
|
|
self.logger.error("MQTT configuration missing from config file")
|
|
return False
|
|
|
|
# Get the base hostname (everything before the dash)
|
|
hostname_base = name.split('-')[0] if '-' in name else name
|
|
|
|
# Define MQTT fields
|
|
mqtt_fields = {
|
|
"MqttHost": mqtt_config.get('Host', ''),
|
|
"MqttPort": mqtt_config.get('Port', 1883),
|
|
"MqttUser": mqtt_config.get('User', ''),
|
|
"MqttPassword": mqtt_config.get('Password', ''),
|
|
"Topic": hostname_base if mqtt_config.get('Topic') == '%hostname_base%' else mqtt_config.get('Topic', ''),
|
|
"FullTopic": mqtt_config.get('FullTopic', '%prefix%/%topic%/'),
|
|
}
|
|
|
|
# For existing devices, check if MQTT settings need to be updated
|
|
changes_needed = []
|
|
if not is_new_device and mqtt_status:
|
|
device_mqtt = mqtt_status.get('MqttHost', {})
|
|
force_password_update = False
|
|
|
|
# Check each MQTT setting
|
|
if device_mqtt.get('Host') != mqtt_fields['MqttHost']:
|
|
changes_needed.append(('MqttHost', mqtt_fields['MqttHost']))
|
|
self.logger.debug(f"{name}: MQTT Host mismatch - Device: {device_mqtt.get('Host')}, Config: {mqtt_fields['MqttHost']}")
|
|
force_password_update = True
|
|
|
|
if device_mqtt.get('Port') != mqtt_fields['MqttPort']:
|
|
changes_needed.append(('MqttPort', mqtt_fields['MqttPort']))
|
|
self.logger.debug(f"{name}: MQTT Port mismatch - Device: {device_mqtt.get('Port')}, Config: {mqtt_fields['MqttPort']}")
|
|
force_password_update = True
|
|
|
|
if device_mqtt.get('User') != mqtt_fields['MqttUser']:
|
|
changes_needed.append(('MqttUser', mqtt_fields['MqttUser']))
|
|
self.logger.debug(f"{name}: MQTT User mismatch - Device: {device_mqtt.get('User')}, Config: {mqtt_fields['MqttUser']}")
|
|
force_password_update = True
|
|
|
|
if device_mqtt.get('Topic') != mqtt_fields['Topic']:
|
|
changes_needed.append(('Topic', mqtt_fields['Topic']))
|
|
self.logger.debug(f"{name}: MQTT Topic mismatch - Device: {device_mqtt.get('Topic')}, Config: {mqtt_fields['Topic']}")
|
|
force_password_update = True
|
|
|
|
if device_mqtt.get('FullTopic') != mqtt_fields['FullTopic']:
|
|
changes_needed.append(('FullTopic', mqtt_fields['FullTopic']))
|
|
self.logger.debug(f"{name}: MQTT FullTopic mismatch - Device: {device_mqtt.get('FullTopic')}, Config: {mqtt_fields['FullTopic']}")
|
|
force_password_update = True
|
|
|
|
# Add password update if any MQTT setting changed or user was updated
|
|
if force_password_update:
|
|
changes_needed.append(('MqttPassword', mqtt_fields['MqttPassword']))
|
|
self.logger.debug(f"{name}: MQTT Password will be updated")
|
|
|
|
# Check NoRetain setting
|
|
no_retain = mqtt_config.get('NoRetain', False)
|
|
if no_retain:
|
|
changes_needed.append(('SetOption62', '1')) # 1 = No Retain
|
|
else:
|
|
changes_needed.append(('SetOption62', '0')) # 0 = Use Retain
|
|
else:
|
|
# For new devices, set all MQTT settings
|
|
for field, value in mqtt_fields.items():
|
|
changes_needed.append((field, value))
|
|
|
|
# Apply MQTT settings
|
|
mqtt_updated = False
|
|
for setting, value in changes_needed:
|
|
try:
|
|
# For FullTopic, we need to avoid adding a space (%20) or equals sign between the command and value
|
|
if setting == "FullTopic":
|
|
url = f"http://{ip}/cm?cmnd={setting}{value}"
|
|
else:
|
|
url = f"http://{ip}/cm?cmnd={setting}%20{value}"
|
|
|
|
if with_retry:
|
|
# With retry logic
|
|
success = False
|
|
attempts = 0
|
|
max_attempts = 3
|
|
last_error = None
|
|
|
|
while not success and attempts < max_attempts:
|
|
attempts += 1
|
|
try:
|
|
response = requests.get(url, timeout=5)
|
|
if response.status_code == 200:
|
|
if setting != 'MqttPassword':
|
|
self.logger.debug(f"{name}: Updated {setting} to {value}")
|
|
else:
|
|
self.logger.debug(f"{name}: Updated MQTT Password")
|
|
mqtt_updated = True
|
|
success = True
|
|
else:
|
|
self.logger.warning(f"{name}: Failed to update {setting} (attempt {attempts}/{max_attempts})")
|
|
last_error = f"HTTP {response.status_code}"
|
|
if attempts < max_attempts:
|
|
time.sleep(1) # Wait before retry
|
|
except requests.exceptions.Timeout as e:
|
|
self.logger.warning(f"{name}: Timeout updating {setting} (attempt {attempts}/{max_attempts})")
|
|
last_error = "Timeout"
|
|
if attempts < max_attempts:
|
|
time.sleep(1) # Wait before retry
|
|
except requests.exceptions.RequestException as e:
|
|
self.logger.warning(f"{name}: Error updating {setting}: {str(e)} (attempt {attempts}/{max_attempts})")
|
|
last_error = str(e)
|
|
if attempts < max_attempts:
|
|
time.sleep(1) # Wait before retry
|
|
|
|
if not success:
|
|
self.logger.error(f"{name}: Failed to update {setting} after {max_attempts} attempts. Last error: {last_error}")
|
|
# Track the failure for later reporting
|
|
if not hasattr(self, 'command_failures'):
|
|
self.command_failures = []
|
|
self.command_failures.append({
|
|
"device": name,
|
|
"ip": ip,
|
|
"command": f"{setting} {value}",
|
|
"error": last_error
|
|
})
|
|
else:
|
|
# Without retry logic
|
|
response = requests.get(url, timeout=5)
|
|
if response.status_code == 200:
|
|
if setting != 'MqttPassword':
|
|
self.logger.info(f"{name}: Set {setting} to {value}")
|
|
else:
|
|
self.logger.info(f"{name}: Set MQTT Password")
|
|
mqtt_updated = True
|
|
else:
|
|
self.logger.error(f"{name}: Failed to set {setting}")
|
|
except requests.exceptions.RequestException as e:
|
|
self.logger.error(f"{name}: Error updating {setting}: {str(e)}")
|
|
|
|
# Apply console settings
|
|
console_updated = False
|
|
console_params = self.config.get('console', {})
|
|
if console_params:
|
|
self.logger.info(f"{name}: Setting console parameters from configuration")
|
|
|
|
# Special handling for Retain parameters - need to send opposite state first, then final state
|
|
# This is necessary because the changes are what create the update of the Retain state at the MQTT server
|
|
retain_params = ["ButtonRetain", "SwitchRetain", "PowerRetain"]
|
|
|
|
# Process Retain parameters first
|
|
for param in retain_params:
|
|
if param in console_params:
|
|
try:
|
|
final_value = console_params[param]
|
|
# Set opposite state first
|
|
opposite_value = "On" if final_value.lower() == "off" else "Off"
|
|
|
|
if with_retry:
|
|
# First command (opposite state) - with retry logic
|
|
url = f"http://{ip}/cm?cmnd={param}%20{opposite_value}"
|
|
success = False
|
|
attempts = 0
|
|
max_attempts = 3
|
|
last_error = None
|
|
|
|
while not success and attempts < max_attempts:
|
|
attempts += 1
|
|
try:
|
|
response = requests.get(url, timeout=5)
|
|
if response.status_code == 200:
|
|
self.logger.debug(f"{name}: Set {param} to {opposite_value} (step 1 of 2 to update MQTT broker retain settings)")
|
|
console_updated = True
|
|
success = True
|
|
else:
|
|
self.logger.warning(f"{name}: Failed to set {param} to {opposite_value} (attempt {attempts}/{max_attempts})")
|
|
last_error = f"HTTP {response.status_code}"
|
|
if attempts < max_attempts:
|
|
time.sleep(1) # Wait before retry
|
|
except requests.exceptions.Timeout as e:
|
|
self.logger.warning(f"{name}: Timeout setting {param} to {opposite_value} (attempt {attempts}/{max_attempts})")
|
|
last_error = "Timeout"
|
|
if attempts < max_attempts:
|
|
time.sleep(1) # Wait before retry
|
|
except requests.exceptions.RequestException as e:
|
|
self.logger.warning(f"{name}: Error setting {param} to {opposite_value}: {str(e)} (attempt {attempts}/{max_attempts})")
|
|
last_error = str(e)
|
|
if attempts < max_attempts:
|
|
time.sleep(1) # Wait before retry
|
|
|
|
if not success:
|
|
self.logger.error(f"{name}: Failed to set {param} to {opposite_value} after {max_attempts} attempts. Last error: {last_error}")
|
|
# Track the failure for later reporting
|
|
if not hasattr(self, 'command_failures'):
|
|
self.command_failures = []
|
|
self.command_failures.append({
|
|
"device": name,
|
|
"ip": ip,
|
|
"command": f"{param} {opposite_value}",
|
|
"error": last_error
|
|
})
|
|
else:
|
|
# First command (opposite state) - without retry logic
|
|
url = f"http://{ip}/cm?cmnd={param}%20{opposite_value}"
|
|
response = requests.get(url, timeout=5)
|
|
if response.status_code == 200:
|
|
self.logger.debug(f"{name}: Set {param} to {opposite_value} (step 1 of 2 to update MQTT broker retain settings)")
|
|
else:
|
|
self.logger.error(f"{name}: Failed to set {param} to {opposite_value}")
|
|
|
|
# Small delay to ensure commands are processed in order
|
|
time.sleep(0.5)
|
|
|
|
if with_retry:
|
|
# Second command (final state) - with retry logic
|
|
url = f"http://{ip}/cm?cmnd={param}%20{final_value}"
|
|
success = False
|
|
attempts = 0
|
|
last_error = None
|
|
|
|
while not success and attempts < max_attempts:
|
|
attempts += 1
|
|
try:
|
|
response = requests.get(url, timeout=5)
|
|
if response.status_code == 200:
|
|
self.logger.debug(f"{name}: Set {param} to {final_value} (step 2 of 2 to update MQTT broker retain settings)")
|
|
console_updated = True
|
|
success = True
|
|
else:
|
|
self.logger.warning(f"{name}: Failed to set {param} to {final_value} (attempt {attempts}/{max_attempts})")
|
|
last_error = f"HTTP {response.status_code}"
|
|
if attempts < max_attempts:
|
|
time.sleep(1) # Wait before retry
|
|
except requests.exceptions.Timeout as e:
|
|
self.logger.warning(f"{name}: Timeout setting {param} to {final_value} (attempt {attempts}/{max_attempts})")
|
|
last_error = "Timeout"
|
|
if attempts < max_attempts:
|
|
time.sleep(1) # Wait before retry
|
|
except requests.exceptions.RequestException as e:
|
|
self.logger.warning(f"{name}: Error setting {param} to {final_value}: {str(e)} (attempt {attempts}/{max_attempts})")
|
|
last_error = str(e)
|
|
if attempts < max_attempts:
|
|
time.sleep(1) # Wait before retry
|
|
|
|
if not success:
|
|
self.logger.error(f"{name}: Failed to set {param} to {final_value} after {max_attempts} attempts. Last error: {last_error}")
|
|
# Track the failure for later reporting
|
|
if not hasattr(self, 'command_failures'):
|
|
self.command_failures = []
|
|
self.command_failures.append({
|
|
"device": name,
|
|
"ip": ip,
|
|
"command": f"{param} {final_value}",
|
|
"error": last_error
|
|
})
|
|
else:
|
|
# Second command (final state) - without retry logic
|
|
url = f"http://{ip}/cm?cmnd={param}%20{final_value}"
|
|
response = requests.get(url, timeout=5)
|
|
if response.status_code == 200:
|
|
self.logger.debug(f"{name}: Set {param} to {final_value} (step 2 of 2 to update MQTT broker retain settings)")
|
|
else:
|
|
self.logger.error(f"{name}: Failed to set {param} to {final_value}")
|
|
except Exception as e:
|
|
self.logger.error(f"{name}: Unexpected error setting {param} commands: {str(e)}")
|
|
# Track the failure for later reporting if using retry logic
|
|
if with_retry:
|
|
if not hasattr(self, 'command_failures'):
|
|
self.command_failures = []
|
|
self.command_failures.append({
|
|
"device": name,
|
|
"ip": ip,
|
|
"command": f"{param} (both steps)",
|
|
"error": str(e)
|
|
})
|
|
|
|
# Process all other console parameters
|
|
# Track rules that need to be enabled
|
|
rules_to_enable = {}
|
|
|
|
for param, value in console_params.items():
|
|
# Skip Retain parameters as they're handled specially above
|
|
if param in retain_params:
|
|
continue
|
|
|
|
# Check if this is a rule definition (lowercase rule1, rule2, etc.)
|
|
if param.lower().startswith('rule') and param.lower() == param and param[-1].isdigit():
|
|
# Store the rule number for later enabling
|
|
rule_num = param[-1]
|
|
rules_to_enable[rule_num] = True
|
|
if with_retry:
|
|
self.logger.info(f"{name}: Detected rule definition {param}='{value}', will auto-enable")
|
|
else:
|
|
self.logger.debug(f"{name}: Detected rule definition {param}, will auto-enable")
|
|
|
|
# Skip Rule1, Rule2, etc. if we're auto-enabling rules and using retry logic
|
|
if with_retry and param.lower().startswith('rule') and param.lower() != param and param[-1].isdigit():
|
|
# If this is in the config, we'll respect it, but log that it's not needed
|
|
self.logger.debug(f"{name}: Note: {param} is not needed with auto-enable feature")
|
|
|
|
# Regular console parameter
|
|
# Special handling for rule parameters to properly encode the URL
|
|
if param.lower().startswith('rule') and param.lower() == param and param[-1].isdigit():
|
|
# For rule commands, we need to URL encode the entire value to preserve special characters
|
|
import urllib.parse
|
|
encoded_value = urllib.parse.quote(value)
|
|
url = f"http://{ip}/cm?cmnd={param}%20{encoded_value}"
|
|
self.logger.info(f"{name}: Sending rule command: {url}")
|
|
else:
|
|
url = f"http://{ip}/cm?cmnd={param}%20{value}"
|
|
|
|
if with_retry:
|
|
# With retry logic
|
|
success = False
|
|
attempts = 0
|
|
max_attempts = 3
|
|
last_error = None
|
|
|
|
while not success and attempts < max_attempts:
|
|
attempts += 1
|
|
try:
|
|
response = requests.get(url, timeout=5)
|
|
if response.status_code == 200:
|
|
# Special logging for rule parameters
|
|
if param.lower().startswith('rule') and param.lower() == param and param[-1].isdigit():
|
|
self.logger.info(f"{name}: Rule command response: {response.text}")
|
|
self.logger.info(f"{name}: Set rule {param} to '{value}'")
|
|
else:
|
|
self.logger.debug(f"{name}: Set console parameter {param} to {value}")
|
|
console_updated = True
|
|
success = True
|
|
else:
|
|
self.logger.warning(f"{name}: Failed to set console parameter {param} (attempt {attempts}/{max_attempts})")
|
|
last_error = f"HTTP {response.status_code}"
|
|
if attempts < max_attempts:
|
|
time.sleep(1) # Wait before retry
|
|
except requests.exceptions.Timeout as e:
|
|
self.logger.warning(f"{name}: Timeout setting console parameter {param} (attempt {attempts}/{max_attempts})")
|
|
last_error = "Timeout"
|
|
if attempts < max_attempts:
|
|
time.sleep(1) # Wait before retry
|
|
except requests.exceptions.RequestException as e:
|
|
self.logger.warning(f"{name}: Error setting console parameter {param}: {str(e)} (attempt {attempts}/{max_attempts})")
|
|
last_error = str(e)
|
|
if attempts < max_attempts:
|
|
time.sleep(1) # Wait before retry
|
|
|
|
if not success:
|
|
self.logger.error(f"{name}: Failed to set console parameter {param} after {max_attempts} attempts. Last error: {last_error}")
|
|
# Track the failure for later reporting
|
|
if not hasattr(self, 'command_failures'):
|
|
self.command_failures = []
|
|
self.command_failures.append({
|
|
"device": name,
|
|
"ip": ip,
|
|
"command": f"{param} {value}",
|
|
"error": last_error
|
|
})
|
|
else:
|
|
# Without retry logic
|
|
response = requests.get(url, timeout=5)
|
|
if response.status_code == 200:
|
|
if param.lower().startswith('rule') and param.lower() == param and param[-1].isdigit():
|
|
self.logger.info(f"{name}: Rule command response: {response.text}")
|
|
self.logger.info(f"{name}: Set rule {param} to '{value}'")
|
|
else:
|
|
self.logger.debug(f"{name}: Set console parameter {param} to {value}")
|
|
else:
|
|
self.logger.error(f"{name}: Failed to set console parameter {param}")
|
|
|
|
# Auto-enable any rules that were defined
|
|
if with_retry:
|
|
self.logger.info(f"{name}: Rules to enable: {rules_to_enable}")
|
|
|
|
for rule_num in rules_to_enable:
|
|
rule_enable_param = f"Rule{rule_num}"
|
|
|
|
# Skip if the rule enable command was already in the config
|
|
if with_retry:
|
|
# Check if the uppercase version (Rule1) is in the config
|
|
if rule_enable_param in console_params:
|
|
self.logger.info(f"{name}: Skipping {rule_enable_param} as it's already in config (uppercase version)")
|
|
continue
|
|
|
|
# If we're here, it means we found a rule definition earlier and added it to rules_to_enable
|
|
# No need to check again if it's in console_params
|
|
self.logger.info(f"{name}: Will enable {rule_enable_param} for rule definition found in config")
|
|
else:
|
|
# Simple check for any version of the rule enable command
|
|
if any(p.lower() == rule_enable_param.lower() for p in console_params):
|
|
continue
|
|
|
|
# Rule auto-enabling
|
|
url = f"http://{ip}/cm?cmnd={rule_enable_param}%201"
|
|
|
|
if with_retry:
|
|
# With retry logic
|
|
success = False
|
|
attempts = 0
|
|
max_attempts = 3
|
|
last_error = None
|
|
|
|
while not success and attempts < max_attempts:
|
|
attempts += 1
|
|
try:
|
|
response = requests.get(url, timeout=5)
|
|
if response.status_code == 200:
|
|
self.logger.info(f"{name}: Auto-enabled {rule_enable_param}")
|
|
console_updated = True
|
|
success = True
|
|
else:
|
|
self.logger.warning(f"{name}: Failed to auto-enable {rule_enable_param} (attempt {attempts}/{max_attempts})")
|
|
last_error = f"HTTP {response.status_code}"
|
|
if attempts < max_attempts:
|
|
time.sleep(1) # Wait before retry
|
|
except requests.exceptions.Timeout as e:
|
|
self.logger.warning(f"{name}: Timeout auto-enabling {rule_enable_param} (attempt {attempts}/{max_attempts})")
|
|
last_error = "Timeout"
|
|
if attempts < max_attempts:
|
|
time.sleep(1) # Wait before retry
|
|
except requests.exceptions.RequestException as e:
|
|
self.logger.warning(f"{name}: Error auto-enabling {rule_enable_param}: {str(e)} (attempt {attempts}/{max_attempts})")
|
|
last_error = str(e)
|
|
if attempts < max_attempts:
|
|
time.sleep(1) # Wait before retry
|
|
|
|
if not success:
|
|
self.logger.error(f"{name}: Failed to auto-enable {rule_enable_param} after {max_attempts} attempts. Last error: {last_error}")
|
|
# Track the failure for later reporting
|
|
if not hasattr(self, 'command_failures'):
|
|
self.command_failures = []
|
|
self.command_failures.append({
|
|
"device": name,
|
|
"ip": ip,
|
|
"command": f"{rule_enable_param} 1",
|
|
"error": last_error
|
|
})
|
|
else:
|
|
# Without retry logic
|
|
response = requests.get(url, timeout=5)
|
|
if response.status_code == 200:
|
|
self.logger.info(f"{name}: Auto-enabled {rule_enable_param}")
|
|
else:
|
|
self.logger.error(f"{name}: Failed to auto-enable {rule_enable_param}")
|
|
|
|
# Reboot the device if requested
|
|
if reboot:
|
|
save_url = f"http://{ip}/cm?cmnd=Restart%201"
|
|
response = requests.get(save_url, timeout=5)
|
|
if response.status_code == 200:
|
|
self.logger.info(f"Saved configuration and rebooted {name}")
|
|
else:
|
|
self.logger.error(f"Failed to save configuration for {name}")
|
|
|
|
return mqtt_updated or console_updated
|
|
|
|
except requests.exceptions.RequestException as e:
|
|
self.logger.error(f"Error configuring device at {ip}: {str(e)}")
|
|
return False
|
|
|
|
def configure_unknown_device(self, ip, hostname):
    """Set up a not-yet-named device under ``hostname``.

    Thin wrapper around ``configure_mqtt_settings`` that selects the
    "new device" behaviour: the friendly name is set, MQTT is enabled,
    commands are sent without the retry logic, and a reboot is requested
    once configuration is done.

    Args:
        ip: Address of the device to configure.
        hostname: Name to give the device.

    Returns:
        Whatever ``configure_mqtt_settings`` returns for this device.
    """
    new_device_options = {
        "is_new_device": True,
        "set_friendly_name": True,
        "enable_mqtt": True,
        "with_retry": False,
        "reboot": True,
    }
    return self.configure_mqtt_settings(ip=ip, name=hostname, **new_device_options)
|
|
|
|
def is_ip_in_network_filter(self, ip_address):
    """Check if an IP address is in any of the configured network filters.

    Each entry of ``config['unifi']['network_filter']`` must carry a
    ``subnet`` string. Two notations are accepted:

    * a dotted prefix such as ``"192.168.8"`` -- matched on an octet
      boundary, so ``"192.168.8"`` matches ``192.168.8.20`` but not
      ``192.168.80.20`` (a plain ``startswith`` test would accept both);
    * CIDR notation such as ``"192.168.8.0/24"``.

    The first matching network (in configuration order) wins.

    Args:
        ip_address: The IP address to check

    Returns:
        tuple: (is_in_network, target_network, network_name) where:
            - is_in_network is a boolean indicating if the IP is in a network
            - target_network is the network configuration dict or None
            - network_name is the name of the network or None
    """
    import ipaddress

    network_filters = self.config['unifi'].get('network_filter', {})

    for network_name, network in network_filters.items():
        subnet = network['subnet']

        if '/' in subnet:
            # CIDR notation: let the stdlib do the arithmetic. Anything that
            # does not parse (empty IP, malformed subnet) simply cannot match.
            try:
                matched = (ipaddress.ip_address(ip_address)
                           in ipaddress.ip_network(subnet, strict=False))
            except ValueError:
                matched = False
        else:
            prefix = subnet.rstrip('.')
            # An empty prefix keeps its historical "match everything" meaning;
            # otherwise require the prefix to end exactly on an octet boundary.
            matched = (not prefix
                       or ip_address == prefix
                       or ip_address.startswith(prefix + '.'))

        if matched:
            self.logger.info(f"IP {ip_address} is in network: {network_name}")
            return True, network, network_name

    self.logger.error(f"IP {ip_address} is not in any configured network")
    return False, None, None
|
|
|
|
def process_single_device(self, device_identifier):
    """Process a single device by hostname or IP address.

    The device is looked up among the clients reported by the UniFi
    controller, checked against the configured network filters and exclude
    patterns, and then handled in one of two ways:

    * unknown device (its name matches ``unknown_device_patterns``): it is
      configured through ``configure_unknown_device``; if its web page
      mentions a toggle, the device is switched on/off every two seconds
      while the operator is prompted for a new name;
    * any other device: it is written to ``current.json`` and run through
      ``get_device_details``.

    A hostname identifier is matched case-insensitively against both the
    UniFi ``hostname`` and ``name`` fields, either as a substring or, when
    it contains ``*``, as a wildcard pattern covering the whole value. All
    characters other than ``*`` are matched literally. If several clients
    match, the first one is used and the candidates are logged.

    Args:
        device_identifier: Either a hostname or IP address

    Returns:
        bool: True if device was processed successfully, False otherwise
        (for unknown devices, the result of ``configure_unknown_device``).
    """
    self.logger.info(f"Processing single device: {device_identifier}")

    def display_name(device):
        # Prefer the UniFi alias, then the hostname; tolerate missing/None values.
        return device.get('name') or device.get('hostname') or 'Unknown'

    # A dotted quad is treated as an IP address, anything else as a name.
    is_ip = bool(re.match(r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$", device_identifier))

    # For an IP we can reject out-of-scope addresses before contacting UniFi.
    if is_ip:
        in_network, target_network, _ = self.is_ip_in_network_filter(device_identifier)
        if not in_network:
            return False

    # Setup Unifi client if not already done
    if not self.unifi_client:
        try:
            self.setup_unifi_client()
        except ConnectionError as e:
            self.logger.error(f"Failed to connect to UniFi controller: {str(e)}")
            return False

    # Get all clients from Unifi
    try:
        all_clients = self.unifi_client.get_clients()
        self.logger.debug(f"Found {len(all_clients)} total devices")
    except Exception as e:
        self.logger.error(f"Error getting devices from UniFi controller: {e}")
        return False

    # ---- Locate the device -------------------------------------------------
    if is_ip:
        self.logger.debug(f"Searching for device with IP: {device_identifier}")
        target_device = next(
            (device for device in all_clients if device.get('ip') == device_identifier),
            None,
        )
        if not target_device:
            self.logger.error(f"No device found with IP: {device_identifier}")
            return False
    else:
        self.logger.debug(f"Searching for device with hostname: {device_identifier}")

        has_wildcards = '*' in device_identifier
        if has_wildcards:
            # Escape everything, then turn the (escaped) '*' back into '.*',
            # so characters like '(' or '+' in a name are matched literally
            # instead of being interpreted as regex syntax.
            pattern = re.escape(device_identifier.lower()).replace(r'\*', '.*')
            wildcard_re = re.compile(pattern)
            self.logger.debug(f"Using wildcard pattern: {pattern}")
        else:
            # For partial matches, the identifier is used as a substring
            pattern = device_identifier.lower()
            self.logger.debug(f"Using partial match pattern: {pattern}")

        matching_devices = []
        for device in all_clients:
            hostname = (device.get('hostname') or '').lower()
            name = (device.get('name') or '').lower()

            if has_wildcards:
                is_match = bool(wildcard_re.fullmatch(hostname) or wildcard_re.fullmatch(name))
            else:
                is_match = pattern in hostname or pattern in name

            if is_match:
                matching_devices.append(device)

        if not matching_devices:
            self.logger.error(f"No devices found matching: {device_identifier}")
            return False

        if len(matching_devices) > 1:
            # Multiple matches found - log them and use the first one
            self.logger.warning(f"Multiple devices found matching '{device_identifier}':")
            for i, device in enumerate(matching_devices, 1):
                self.logger.warning(f"  {i}. {display_name(device)} (IP: {device.get('ip', '')})")
            self.logger.warning(f"Using the first match: {display_name(matching_devices[0])}")

        target_device = matching_devices[0]

    # ---- Device details ----------------------------------------------------
    device_name = display_name(target_device)
    device_hostname = target_device.get('hostname') or ''
    device_ip = target_device.get('ip') or ''
    device_mac = target_device.get('mac', '')

    self.logger.info(f"Found device: {device_name} (IP: {device_ip}, Hostname: {device_hostname})")

    # For a hostname lookup the network check can only happen now that the
    # IP is known; for an IP lookup target_network was set by the early check.
    if not is_ip:
        in_network, target_network, _ = self.is_ip_in_network_filter(device_ip)
        if not in_network:
            self.logger.error(f"Device {device_name} is not in any configured network")
            return False

    # Check if device is excluded
    exclude_patterns = target_network.get('exclude_patterns', [])
    if self.is_device_excluded(device_name, device_hostname, exclude_patterns, log_level='error'):
        return False

    # ---- Unknown-device detection -----------------------------------------
    unknown_patterns = target_network.get('unknown_device_patterns', [])
    name_lower = device_name.lower()
    hostname_lower = device_hostname.lower()

    unifi_name_matches_unknown = False
    device_hostname_matches_unknown = False
    for pattern in unknown_patterns:
        pattern_regex = pattern.lower().replace('.', r'\.').replace('*', '.*')
        # Anchor at the start unless the configured pattern already does.
        regex_pattern = pattern_regex if pattern_regex.startswith('^') else f"^{pattern_regex}"
        if re.match(regex_pattern, name_lower) or re.match(regex_pattern, hostname_lower):
            unifi_name_matches_unknown = True
            self.logger.info(f"Device {device_name} matches unknown device pattern: {pattern}")
            break

    # If the UniFi name matches unknown patterns, check the device's
    # self-reported hostname before declaring it unknown.
    #
    # This addresses a UniFi OS bug where it doesn't keep track of updated
    # hostnames: after a hostname change and connection reset, UniFi may keep
    # showing the old name. The device's own answer tells us whether it
    # actually has a real hostname that UniFi is not showing correctly.
    is_unknown = unifi_name_matches_unknown
    if unifi_name_matches_unknown:
        self.logger.info("Checking device's self-reported hostname before declaring unknown")

        device_reported_hostname, success = self.get_device_hostname(
            device_ip, device_name, timeout=5, log_level='info')

        if success and device_reported_hostname:
            for pattern in unknown_patterns:
                if self._match_pattern(device_reported_hostname.lower(), pattern, match_entire_string=False):
                    device_hostname_matches_unknown = True
                    self.logger.info(f"Device's self-reported hostname '{device_reported_hostname}' matches unknown pattern: {pattern}")
                    break

            # Only unknown if both the UniFi-reported and the self-reported
            # names match unknown patterns.
            is_unknown = device_hostname_matches_unknown
            if is_unknown:
                self.logger.info("Device declared as unknown: both UniFi-reported and self-reported hostnames match unknown patterns")
            else:
                self.logger.info("Device NOT declared as unknown: self-reported hostname doesn't match unknown patterns (possible UniFi OS bug)")
        else:
            # No self-reported hostname available: trust the UniFi-reported name.
            self.logger.info("Failed to get device's self-reported hostname, using UniFi-reported name")

    # ---- Connection type ---------------------------------------------------
    connection = "Unknown"
    if target_device.get('essid'):
        connection = f"Wireless - {target_device.get('essid')}"
    elif target_device.get('radio') or target_device.get('wifi'):
        connection = "Wireless"
    elif target_device.get('port') or target_device.get('switch_port') or target_device.get('switch'):
        connection = "Wired"

    # Flag the UniFi OS hostname bug: the UniFi name looks "unknown" but the
    # device's self-reported name does not.
    unifi_hostname_bug_detected = (unifi_name_matches_unknown and not is_unknown and
                                   not device_hostname_matches_unknown)

    device_info = {
        "name": device_name,
        "ip": device_ip,
        "mac": device_mac,
        "last_seen": target_device.get('last_seen', ''),
        "hostname": device_hostname,
        "notes": target_device.get('note', ''),
        "connection": connection,
        "unifi_hostname_bug_detected": unifi_hostname_bug_detected
    }

    # ---- Known device ------------------------------------------------------
    if not is_unknown:
        self.logger.info(f"Processing normal device: {device_name}")

        # NOTE(review): this overwrites current.json with a single-device
        # list and nothing here restores the previous content.
        current_config = {"tasmota": {"devices": [device_info]}}
        with open('current.json', 'w') as f:
            json.dump(current_config, f, indent=2)

        # Skip unknown device filtering in Device mode
        self.get_device_details(use_current_json=True, skip_unknown_filter=True)
        return True

    # ---- Unknown device ----------------------------------------------------
    self.logger.info(f"Processing unknown device: {device_name}")

    try:
        # The device's main page is searched for the word "toggle" to decide
        # whether it can be switched for identification.
        response = requests.get(f"http://{device_ip}/", timeout=5)
        has_toggle = "toggle" in response.text.lower()

        if not has_toggle:
            self.logger.info(f"Device {device_name} does not have a toggle button")
            return self.configure_unknown_device(device_ip, device_hostname)

        self.logger.info(f"Device {device_name} has a toggle button, toggling at 1/2Hz rate")

        import threading
        stop_toggle = threading.Event()

        def toggle_device():
            toggle_state = False
            while not stop_toggle.is_set():
                toggle_state = not toggle_state
                toggle_cmd = "Power On" if toggle_state else "Power Off"
                toggle_url = f"http://{device_ip}/cm?cmnd={toggle_cmd}"
                try:
                    requests.get(toggle_url, timeout=2)
                except requests.exceptions.RequestException:
                    # Best effort: a missed toggle must not end the blinking.
                    pass
                # 1/2Hz rate; waiting on the event lets a stop request end
                # the pause immediately instead of sleeping it out.
                stop_toggle.wait(2.0)

        toggle_thread = threading.Thread(target=toggle_device, daemon=True)

        # Temporarily disable all logging so it does not interleave with the prompt
        logging.disable(logging.CRITICAL)
        try:
            print("\n" + "="*50)
            print(f"DEVICE: {device_name} at IP: {device_ip} Connection: {connection}")
            print(f"Current hostname: {device_hostname}")
            print("="*50)
            print("The device is now toggling to help you identify it.")

            # Toggle in the background while waiting for input
            toggle_thread.start()

            print("\nPlease enter a new name for this device:")
            new_hostname = input("> ").strip()
        finally:
            # Always stop the toggling and restore logging, including when
            # input() is interrupted (Ctrl-C / EOF).
            stop_toggle.set()
            if toggle_thread.is_alive():
                toggle_thread.join(timeout=3)
            logging.disable(logging.NOTSET)

        if new_hostname and new_hostname != device_hostname:
            print(f"Setting new hostname to: {new_hostname}")
            return self.configure_unknown_device(device_ip, new_hostname)

        print("No valid hostname entered, skipping device")
        return False

    except requests.exceptions.RequestException as e:
        self.logger.error(f"Error connecting to {device_name} at {device_ip}: {str(e)}")
        return False
|
|
|
|
def get_device_details(self, use_current_json=True, skip_unknown_filter=False):
    """Connect to each Tasmota device via HTTP, gather details and validate MQTT settings.

    For every device listed in the source JSON file this queries
    ``Status 2`` (firmware) and ``Status 6`` (MQTT) over HTTP, asks
    ``get_device_hostname`` for the hostname, runs ``configure_mqtt_settings``
    (with retry enabled) and ``check_and_update_template``, and records the
    outcome. All records are written to ``TasmotaDevices.json`` and a summary
    of any entries in ``self.command_failures`` is printed at the end.

    Devices matching ``unknown_device_patterns`` of any configured network
    are filtered out unless ``skip_unknown_filter`` is True. A device that
    cannot be reached, or that answers with a non-JSON body, is recorded
    with status ``offline`` and processing continues with the next one.

    Args:
        use_current_json: Whether to use current.json instead of tasmota.json
        skip_unknown_filter: If True, don't filter out unknown devices (used by --Device mode)

    Returns:
        None. Returns early (after logging an error) if the source file is
        missing or invalid, or if the MQTT configuration is absent.
    """
    self.logger.info("Starting to gather detailed device information...")
    device_details = []

    source_file = 'current.json' if use_current_json else 'tasmota.json'
    try:
        with open(source_file, 'r') as f:
            data = json.load(f)
        all_devices = data.get('tasmota', {}).get('devices', [])
        self.logger.debug(f"Loaded {len(all_devices)} devices from {source_file}")
    except FileNotFoundError:
        self.logger.error(f"{source_file} not found. Run discovery first.")
        return
    except json.JSONDecodeError:
        self.logger.error(f"Invalid JSON format in {source_file}")
        return

    # Determine which devices to process
    if skip_unknown_filter:
        # When using --Device parameter, don't filter out unknown devices
        devices = all_devices
        self.logger.debug("Skipping unknown device filtering (Device mode)")
    else:
        # Normal mode: drop devices matching any network's unknown_device_patterns.
        # The glob-like patterns are converted to anchored regexes once, up front.
        network_filters = self.config['unifi'].get('network_filter', {})
        unknown_regexes = []
        for network in network_filters.values():
            for pattern in network.get('unknown_device_patterns', []):
                pattern = pattern.lower().replace('.', r'\.').replace('*', '.*')
                unknown_regexes.append(pattern if pattern.startswith('^') else f"^{pattern}")

        devices = []
        for device in all_devices:
            if not isinstance(device, dict):
                # Malformed entry: let the per-device loop below report and
                # skip it instead of crashing here on a missing .get().
                devices.append(device)
                continue

            name = (device.get('name') or '').lower()
            hostname = (device.get('hostname') or '').lower()

            if any(re.match(rx, name) or re.match(rx, hostname) for rx in unknown_regexes):
                self.logger.debug(f"Skipping unknown device: {name} ({hostname})")
                continue
            devices.append(device)

        self.logger.debug(f"Processing {len(devices)} devices after filtering unknown devices")

    mqtt_config = self.config.get('mqtt', {})
    if not mqtt_config:
        self.logger.error("MQTT configuration missing from config file")
        return

    for device in devices:
        if not isinstance(device, dict):
            self.logger.warning(f"Skipping invalid device entry: {device}")
            continue

        name = device.get('name', 'Unknown')
        ip = device.get('ip')
        mac = device.get('mac')

        if not ip:
            self.logger.warning(f"Skipping device {name} - no IP address")
            continue

        self.logger.info(f"Checking device: {name} at {ip}")

        try:
            # Status 2: firmware version
            response = requests.get(f"http://{ip}/cm?cmnd=Status%202", timeout=5)
            status_data = response.json()

            # Hostname via the common helper; "Unknown" when it could not be read
            hostname, hostname_success = self.get_device_hostname(ip, name, timeout=5, log_level='info')
            reported_hostname = hostname if hostname_success else "Unknown"

            # Status 6: MQTT info
            response = requests.get(f"http://{ip}/cm?cmnd=Status%206", timeout=5)
            mqtt_data = response.json()

            # Check and update MQTT settings if they don't match config
            # (existing device: keep friendly name, no reboot, retry enabled)
            mqtt_updated = self.configure_mqtt_settings(
                ip=ip,
                name=name,
                mqtt_status=mqtt_data,
                is_new_device=False,
                set_friendly_name=False,
                enable_mqtt=False,
                with_retry=True,
                reboot=False
            )

            # Check and update template if needed
            template_updated = self.check_and_update_template(ip, name)

            # Console settings are applied inside configure_mqtt_settings,
            # so its result covers the console status as well.
            console_updated = mqtt_updated

            device_detail = {
                "name": name,
                "ip": ip,
                "mac": mac,
                "version": status_data.get("StatusFWR", {}).get("Version", "Unknown"),
                "hostname": reported_hostname,
                "mqtt_status": "Updated" if mqtt_updated else "Verified",
                "console_status": "Updated" if console_updated else "Verified",
                "template_status": "Updated" if template_updated else "Verified",
                "last_checked": time.strftime("%Y-%m-%d %H:%M:%S"),
                "status": "online"
            }
            self.logger.info(f"Successfully got version for {name}: {device_detail['version']}")

        except (requests.exceptions.RequestException, ValueError) as e:
            # ValueError covers a non-JSON response body on requests versions
            # whose JSON decode error is not a RequestException; one broken
            # device must not abort the whole run.
            self.logger.error(f"Error connecting to {name} at {ip}: {str(e)}")
            device_detail = {
                "name": name,
                "ip": ip,
                "mac": mac,
                "version": "Unknown",
                "status": "offline",
                "error": str(e)
            }

        device_details.append(device_detail)
        time.sleep(0.5)  # brief pause between devices

    # Save all device details at once
    try:
        with open('TasmotaDevices.json', 'w') as f:
            json.dump(device_details, f, indent=2)
        self.logger.info(f"Device details saved to TasmotaDevices.json ({len(device_details)} devices)")
    except Exception as e:
        self.logger.error(f"Error saving device details: {e}")

    # Print summary of command failures if any occurred
    if hasattr(self, 'command_failures') and self.command_failures:
        failure_count = len(self.command_failures)
        print("\n" + "="*80)
        print(f"COMMAND FAILURES SUMMARY: {failure_count} command(s) failed after 3 retry attempts")
        print("="*80)

        # Group failures by device for better readability
        failures_by_device = {}
        for failure in self.command_failures:
            failures_by_device.setdefault(failure['device'], []).append(failure)

        for device_name, failures in failures_by_device.items():
            print(f"\nDevice: {device_name} ({failures[0]['ip']})")
            print("-" * 40)
            for i, failure in enumerate(failures, 1):
                print(f"  {i}. Command: {failure['command']}")
                print(f"     Error: {failure['error']}")

        print("\n" + "="*80)
|
|
|
|
def main():
    """Command-line entry point for Tasmota discovery and configuration.

    Parses the command line, configures logging, loads the configuration
    into a ``TasmotaDiscovery`` instance and then runs either the
    single-device flow (``--Device``) or the normal flow (optional UniFi
    discovery followed by unknown-device processing or detail gathering).

    On a ``ConnectionError`` the details are gathered from the existing
    ``current.json`` instead.

    Returns:
        int: 0 on success, 1 when processing failed.
    """
    argument_specs = (
        (('--config',), {'default': 'network_configuration.json',
                         'help': 'Path to configuration file'}),
        (('--debug',), {'action': 'store_true',
                        'help': 'Enable debug logging'}),
        (('--skip-unifi',), {'action': 'store_true',
                             'help': 'Skip UniFi discovery and use existing current.json'}),
        (('--process-unknown',), {'action': 'store_true',
                                  'help': 'Process unknown devices (matching unknown_device_patterns) to set up names and MQTT'}),
        (('--Device',), {'help': 'Process a single device by hostname or IP address'}),
    )
    cli = argparse.ArgumentParser(description='Tasmota Device Manager')
    for flags, options in argument_specs:
        cli.add_argument(*flags, **options)
    args = cli.parse_args()

    # Debug runs log more verbosely and include the source location.
    if args.debug:
        level = logging.DEBUG
        fmt = '%(asctime)s - %(levelname)s - %(filename)s:%(lineno)d - %(message)s'
    else:
        level = logging.INFO
        fmt = '%(asctime)s - %(levelname)s - %(message)s'
    logging.basicConfig(level=level, format=fmt, datefmt='%Y-%m-%d %H:%M:%S')

    print("Starting Tasmota Device Discovery and Version Check...")

    discovery = TasmotaDiscovery(debug=args.debug)
    discovery.load_config(args.config)

    try:
        if args.Device:
            # Single-device mode; process_single_device sets up the UniFi
            # client itself when it needs one.
            print(f"Processing single device: {args.Device}")
            if not discovery.process_single_device(args.Device):
                print(f"\nFailed to process device: {args.Device}")
                return 1
            print(f"\nDevice {args.Device} processed successfully!")
            print("- Detailed information saved to: TasmotaDevices.json")
        else:
            # Normal processing flow
            if args.skip_unifi:
                print("Skipping UniFi discovery, using existing current.json...")
            else:
                print("Step 1: Discovering Tasmota devices...")
                discovery.setup_unifi_client()
                found_devices = discovery.get_tasmota_devices()
                discovery.save_tasmota_config(found_devices)

            if args.process_unknown:
                print("\nStep 2: Processing unknown devices...")
                discovery.process_unknown_devices()
            else:
                print("\nStep 2: Getting detailed version information...")
                discovery.get_device_details(use_current_json=True)

            print("\nProcess completed successfully!")
            print("- Device list saved to: current.json")
            print("- Detailed information saved to: TasmotaDevices.json")

    except ConnectionError as conn_err:
        # Controller unreachable: fall back to the last saved device list.
        print(f"Connection Error: {str(conn_err)}")
        print("\nTrying to proceed with existing current.json...")
        try:
            discovery.get_device_details(use_current_json=True)
            print("\nSuccessfully retrieved device details from existing current.json")
        except Exception as fallback_err:
            print(f"Error processing existing devices: {str(fallback_err)}")
            return 1
    except Exception as err:
        print(f"Error: {str(err)}")
        if args.debug:
            import traceback
            traceback.print_exc()
        return 1

    return 0
|
|
|
|
if __name__ == '__main__':
    # main() returns 0 on success and 1 on failure; hand that to the shell
    # so callers and scripts can detect a failed run.
    sys.exit(main())