TasmotaManager/TasmotaManager.py

1727 lines
88 KiB
Python

import json
import logging
import os
import sys
from datetime import datetime
from typing import Optional
import requests
from urllib3.exceptions import InsecureRequestWarning
import re # Import the regular expression module
import time
import argparse
# Disable SSL warnings
requests.packages.urllib3.disable_warnings(category=InsecureRequestWarning)
class UnifiClient:
def __init__(self, base_url, username, password, site_id, verify_ssl=True):
self.base_url = base_url.rstrip('/')
self.username = username
self.password = password
self.site_id = site_id
self.session = requests.Session()
self.session.verify = verify_ssl
# Initialize cookie jar
self.session.cookies.clear()
def _login(self) -> requests.Response: # Changed return type annotation
"""Authenticate with the UniFi Controller."""
login_url = f"{self.base_url}/api/auth/login"
headers = {
'Content-Type': 'application/json',
'Accept': 'application/json'
}
payload = {
"username": self.username,
"password": self.password,
"remember": False
}
try:
response = self.session.post(
login_url,
json=payload,
headers=headers
)
response.raise_for_status()
if 'X-CSRF-Token' in response.headers:
self.session.headers['X-CSRF-Token'] = response.headers['X-CSRF-Token']
return response # Return the response object
except requests.exceptions.RequestException as e:
if hasattr(e, 'response') and e.response.status_code == 401:
raise Exception("Authentication failed. Please verify your username and password.") from e
raise
def get_clients(self) -> list:
"""Get all clients from the UniFi Controller."""
# Try the newer API endpoint first
url = f"{self.base_url}/proxy/network/api/s/{self.site_id}/stat/sta"
try:
response = self.session.get(url)
response.raise_for_status()
return response.json().get('data', [])
except requests.exceptions.RequestException as e:
# If the newer endpoint fails, try the legacy endpoint
url = f"{self.base_url}/api/s/{self.site_id}/stat/sta"
try:
response = self.session.get(url)
response.raise_for_status()
return response.json().get('data', [])
except requests.exceptions.RequestException as e:
# If both fail, try the v2 API endpoint
url = f"{self.base_url}/v2/api/site/{self.site_id}/clients"
response = self.session.get(url)
response.raise_for_status()
return response.json().get('data', [])
class TasmotaDiscovery:
def __init__(self, debug: bool = False):
"""Initialize the TasmotaDiscovery with optional debug mode."""
log_level = logging.DEBUG if debug else logging.INFO
logging.basicConfig(
level=log_level,
format='%(asctime)s - %(levelname)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
self.logger = logging.getLogger(__name__)
self.config = None
self.unifi_client = None
def load_config(self, config_path: Optional[str] = None) -> dict:
"""Load configuration from JSON file."""
if config_path is None:
config_path = os.path.join(os.path.dirname(__file__), 'config.json')
self.logger.debug(f"Loading configuration from: {config_path}")
try:
with open(config_path, 'r') as config_file:
self.config = json.load(config_file)
self.logger.debug("Configuration loaded successfully from %s", config_path)
return self.config
except FileNotFoundError:
self.logger.error(f"Configuration file not found at {config_path}")
sys.exit(1)
except json.JSONDecodeError:
self.logger.error("Invalid JSON in configuration file")
sys.exit(1)
def setup_unifi_client(self):
"""Set up the UniFi client with better error handling"""
self.logger.debug("Setting up UniFi client")
if not self.config or 'unifi' not in self.config:
raise ValueError("Missing UniFi configuration")
unifi_config = self.config['unifi']
required_fields = ['host', 'username', 'password', 'site']
missing_fields = [field for field in required_fields if field not in unifi_config]
if missing_fields:
raise ValueError(f"Missing required UniFi configuration fields: {', '.join(missing_fields)}")
try:
self.logger.debug(f"Connecting to UniFi Controller at {unifi_config['host']}")
self.unifi_client = UnifiClient(
base_url=unifi_config['host'],
username=unifi_config['username'],
password=unifi_config['password'],
site_id=unifi_config['site'],
verify_ssl=False # Add this if using self-signed certificates
)
# Test the connection by making a simple request
response = self.unifi_client._login()
if not response:
raise ConnectionError(f"Failed to connect to UniFi controller: No response")
self.logger.debug("UniFi client setup successful")
except Exception as e:
self.logger.error(f"Error setting up UniFi client: {str(e)}")
raise ConnectionError(f"Failed to connect to UniFi controller: {str(e)}")
def is_tasmota_device(self, device: dict) -> bool:
"""Determine if a device is in the network_filter and not in exclude_patterns."""
name = device.get('name', '').lower()
hostname = device.get('hostname', '').lower()
ip = device.get('ip', '')
# Check if device is in the configured network
network_filters = self.config['unifi'].get('network_filter', {})
for network in network_filters.values():
if ip.startswith(network['subnet']):
self.logger.debug(f"Checking device in network: {name} ({hostname}) IP: {ip}")
# Check exclusion patterns
exclude_patterns = network.get('exclude_patterns', [])
for pattern in exclude_patterns:
pattern = pattern.lower()
# Convert glob pattern to regex pattern
pattern = pattern.replace('.', r'\.').replace('*', '.*')
if re.match(f"^{pattern}$", name) or re.match(f"^{pattern}$", hostname):
self.logger.debug(f"Excluding device due to pattern '{pattern}': {name} ({hostname})")
return False
# Device is in the network and not excluded
self.logger.debug(f"Found device in network: {name}")
return True
return False
def get_tasmota_devices(self) -> list:
"""Query UniFi controller and filter Tasmota devices."""
devices = []
self.logger.debug("Querying UniFi controller for devices")
try:
all_clients = self.unifi_client.get_clients()
self.logger.debug(f"Found {len(all_clients)} total devices")
for device in all_clients:
if self.is_tasmota_device(device):
# Determine connection type based on available fields
connection = "Unknown"
if device.get('essid'):
connection = f"Wireless - {device.get('essid')}"
elif device.get('radio') or device.get('wifi'):
connection = "Wireless"
elif device.get('port') or device.get('switch_port') or device.get('switch'):
connection = "Wired"
device_info = {
"name": device.get('name', device.get('hostname', 'Unknown')),
"ip": device.get('ip', ''),
"mac": device.get('mac', ''),
"last_seen": device.get('last_seen', ''),
"hostname": device.get('hostname', ''),
"notes": device.get('note', ''),
"connection": connection,
}
devices.append(device_info)
self.logger.debug(f"Found {len(devices)} Tasmota devices")
return devices
except Exception as e:
self.logger.error(f"Error getting devices from UniFi controller: {e}")
return []
def save_tasmota_config(self, devices: list) -> None:
"""Save Tasmota device information to a JSON file with device tracking."""
filename = "current.json"
self.logger.debug(f"Saving Tasmota configuration to {filename}")
deprecated_filename = "deprecated.json"
current_devices = []
deprecated_devices = []
# Load existing devices if file exists
if os.path.exists(filename):
try:
with open(filename, 'r') as f:
existing_config = json.load(f)
current_devices = existing_config.get('tasmota', {}).get('devices', [])
except json.JSONDecodeError:
self.logger.error(f"Error reading {filename}, treating as empty")
current_devices = []
# Load deprecated devices if file exists
if os.path.exists(deprecated_filename):
try:
with open(deprecated_filename, 'r') as f:
deprecated_config = json.load(f)
deprecated_devices = deprecated_config.get('tasmota', {}).get('devices', [])
except json.JSONDecodeError:
self.logger.error(f"Error reading {deprecated_filename}, treating as empty")
deprecated_devices = []
# Create new config
new_devices = []
moved_to_deprecated = []
restored_from_deprecated = []
removed_from_deprecated = []
excluded_devices = []
# Check for excluded devices in current and deprecated lists
network_filters = self.config['unifi'].get('network_filter', {})
exclude_patterns = []
for network in network_filters.values():
exclude_patterns.extend(network.get('exclude_patterns', []))
# Function to check if device is excluded
def is_device_excluded(device_name: str, hostname: str = '') -> bool:
name = device_name.lower()
hostname = hostname.lower()
for pattern in exclude_patterns:
pattern = pattern.lower()
pattern = pattern.replace('.', r'\.').replace('*', '.*')
if re.match(f"^{pattern}$", name) or re.match(f"^{pattern}$", hostname):
return True
return False
# Process current devices
for device in devices:
device_name = device['name']
device_hostname = device.get('hostname', '')
device_ip = device['ip']
device_mac = device['mac']
# Check if device should be excluded
if is_device_excluded(device_name, device_hostname):
print(f"Device {device_name} excluded by pattern - skipping")
excluded_devices.append(device_name)
continue
# Check in current devices
existing_device = next((d for d in current_devices
if d['name'] == device_name), None)
if existing_device:
# Device exists, check if IP or MAC changed
if existing_device['ip'] != device_ip or existing_device['mac'] != device_mac:
moved_to_deprecated.append(existing_device)
new_devices.append(device)
print(f"Device {device_name} moved to deprecated (IP/MAC changed)")
else:
new_devices.append(existing_device) # Keep existing device
else:
# New device, check if it was in deprecated
deprecated_device = next((d for d in deprecated_devices
if d['name'] == device_name), None)
if deprecated_device:
removed_from_deprecated.append(device_name)
print(f"Device {device_name} removed from deprecated (restored)")
new_devices.append(device)
print(f"Device {device_name} added to output file")
# Find devices that are no longer present
current_names = {d['name'] for d in devices}
for existing_device in current_devices:
if existing_device['name'] not in current_names:
if not is_device_excluded(existing_device['name'], existing_device.get('hostname', '')):
moved_to_deprecated.append(existing_device)
print(f"Device {existing_device['name']} moved to deprecated (no longer present)")
# Update deprecated devices list, excluding any excluded devices
final_deprecated = []
for device in deprecated_devices:
if device['name'] not in removed_from_deprecated and not is_device_excluded(device['name'], device.get('hostname', '')):
final_deprecated.append(device)
elif is_device_excluded(device['name'], device.get('hostname', '')):
print(f"Device {device['name']} removed from deprecated (excluded by pattern)")
final_deprecated.extend(moved_to_deprecated)
# Save new configuration
config = {
"tasmota": {
"devices": new_devices,
"generated_at": datetime.now().isoformat(),
"total_devices": len(new_devices)
}
}
# Save deprecated configuration
deprecated_config = {
"tasmota": {
"devices": final_deprecated,
"generated_at": datetime.now().isoformat(),
"total_devices": len(final_deprecated)
}
}
# Backup existing file if it exists
if os.path.exists(filename):
try:
backup_name = f"{filename}.backup"
os.rename(filename, backup_name)
self.logger.info(f"Created backup of existing configuration as {backup_name}")
except Exception as e:
self.logger.error(f"Error creating backup: {e}")
# Save files
try:
with open(filename, 'w') as f:
json.dump(config, f, indent=4)
with open(deprecated_filename, 'w') as f:
json.dump(deprecated_config, f, indent=4)
self.logger.info(f"Successfully saved {len(new_devices)} Tasmota devices to {filename}")
self.logger.info(f"Successfully saved {len(final_deprecated)} deprecated devices to {deprecated_filename}")
print("\nDevice Status Summary:")
if excluded_devices:
print("\nExcluded Devices:")
for name in excluded_devices:
print(f"- {name}")
if moved_to_deprecated:
print("\nMoved to deprecated:")
for device in moved_to_deprecated:
print(f"- {device['name']}")
if removed_from_deprecated:
print("\nRestored from deprecated:")
for name in removed_from_deprecated:
print(f"- {name}")
print("\nCurrent Tasmota Devices:")
for device in new_devices:
print(f"Name: {device['name']:<20} IP: {device['ip']:<15} MAC: {device['mac']}")
except Exception as e:
self.logger.error(f"Error saving configuration: {e}")
def get_unknown_devices(self, use_current_json=True):
"""Identify devices that match unknown_device_patterns from current.json."""
self.logger.info("Identifying unknown devices for processing...")
unknown_devices = []
try:
source_file = 'current.json' if use_current_json else 'tasmota.json'
with open(source_file, 'r') as f:
data = json.load(f)
all_devices = data.get('tasmota', {}).get('devices', [])
self.logger.debug(f"Loaded {len(all_devices)} devices from {source_file}")
except FileNotFoundError:
self.logger.error(f"{source_file} not found. Run discovery first.")
return []
except json.JSONDecodeError:
self.logger.error(f"Invalid JSON format in {source_file}")
return []
# Identify devices matching unknown_device_patterns
network_filters = self.config['unifi'].get('network_filter', {})
unknown_patterns = []
for network in network_filters.values():
unknown_patterns.extend(network.get('unknown_device_patterns', []))
for device in all_devices:
name = device.get('name', '').lower()
hostname = device.get('hostname', '').lower()
for pattern in unknown_patterns:
pattern = pattern.lower()
pattern = pattern.replace('.', r'\.').replace('*', '.*')
if re.match(f"^{pattern}", name) or re.match(f"^{pattern}", hostname):
self.logger.debug(f"Found unknown device: {name} ({hostname})")
unknown_devices.append(device)
break
self.logger.info(f"Found {len(unknown_devices)} unknown devices to process")
return unknown_devices
def process_unknown_devices(self):
"""Process unknown devices by checking for toggle button and configuring them.
This method:
1. Identifies devices matching unknown_device_patterns
2. Checks if each device has a toggle button (indicating it's a light/switch)
3. Toggles the button at 1/2Hz while checking for hostname changes
4. Prompts the user to enter a new name for the device in the console
5. Once a name is entered, configures the device with the new hostname
"""
unknown_devices = self.get_unknown_devices()
if not unknown_devices:
self.logger.info("No unknown devices found to process")
return
self.logger.info(f"Starting to process {len(unknown_devices)} unknown devices...")
for device in unknown_devices:
name = device.get('name', 'Unknown')
ip = device.get('ip')
connection = device.get('connection', 'Unknown')
if not ip:
self.logger.warning(f"Skipping device {name} - no IP address")
continue
self.logger.info(f"Processing unknown device: {name} at {ip} with connection {connection}")
# Check if device has a toggle button
try:
# Get the main page to check for toggle button
url = f"http://{ip}/"
response = requests.get(url, timeout=5)
# Check if there's a toggle button in the response
has_toggle = "toggle" in response.text.lower()
if has_toggle:
self.logger.info(f"Device {name} has a toggle button, assuming it's a light switch or power plug")
# Start toggling at 1/2Hz
original_hostname = device.get('hostname', '')
toggle_state = False
# Temporarily disable all logging during toggling
logging.disable(logging.CRITICAL)
try:
# Clear console output and show prompt
print("\n" + "="*50)
print(f"DEVICE: {name} at IP: {ip} Connection: {connection}")
print(f"Current hostname: {original_hostname}")
print("="*50)
print("The device is now toggling to help you identify it.")
# Start toggling in background while waiting for input
import threading
stop_toggle = threading.Event()
def toggle_device():
toggle_state = False
while not stop_toggle.is_set():
toggle_state = not toggle_state
toggle_cmd = "Power On" if toggle_state else "Power Off"
toggle_url = f"http://{ip}/cm?cmnd={toggle_cmd}"
try:
requests.get(toggle_url, timeout=2)
except:
pass
time.sleep(2.0) # 1/2Hz rate
# Start toggle thread
toggle_thread = threading.Thread(target=toggle_device)
toggle_thread.daemon = True
toggle_thread.start()
# Prompt for new hostname
print("\nPlease enter a new name for this device:")
print("(Enter nothing, 'unknown', or 'na' to assume device could not be found and end)")
new_hostname = input("> ").strip()
# Stop toggling
stop_toggle.set()
toggle_thread.join(timeout=3)
# Check for special inputs that indicate device could not be found
if not new_hostname or new_hostname.lower() == "unknown" or new_hostname.lower() == "na":
print("Assuming device could not be found, ending process")
return # End the entire process
if new_hostname != original_hostname:
print(f"Setting new hostname to: {new_hostname}")
else:
print("No valid hostname entered, skipping device")
new_hostname = ""
finally:
# Re-enable logging
logging.disable(logging.NOTSET)
# If a new hostname was entered, configure the device
if new_hostname:
self.logger.info(f"Configuring device with new hostname: {new_hostname}")
self.configure_unknown_device(ip, new_hostname)
else:
self.logger.warning(f"No new hostname provided for {name}, skipping configuration")
else:
self.logger.info(f"Device {name} does not have a toggle button, skipping")
except requests.exceptions.RequestException as e:
self.logger.error(f"Error connecting to {name} at {ip}: {str(e)}")
def check_and_update_template(self, ip, name):
"""Check and update device template based on mqtt.config_other settings.
Algorithm:
1. Get the device name from the Configuration/Other page using Status 0
2. Get the current template using Template command
3. Check if any key in mqtt.config_other matches the device name
4. If a match is found, check if the template matches the value
5. If the template doesn't match, write the value to the template
6. If no key matches, check if any value matches the template
7. If a value match is found, write the key to the device name
Args:
ip: The IP address of the device
name: The name/hostname of the device
Returns:
bool: True if template was updated, False otherwise
"""
try:
# Get mqtt.config_other settings
config_other = self.config.get('mqtt', {}).get('config_other', {})
if not config_other:
self.logger.debug(f"{name}: No mqtt.config_other settings found in configuration")
return False
# Get Status 0 for device name from Configuration/Other page
url_status0 = f"http://{ip}/cm?cmnd=Status%200"
response = requests.get(url_status0, timeout=5)
status0_data = response.json()
# Extract device name from Status 0 response
device_name = status0_data.get("Status", {}).get("DeviceName", "")
if not device_name:
self.logger.debug(f"{name}: Could not get device name from Status 0")
return False
self.logger.debug(f"{name}: Device name from Configuration/Other page: {device_name}")
# Get current template
url_template = f"http://{ip}/cm?cmnd=Template"
response = requests.get(url_template, timeout=5)
template_data = response.json()
# Log the actual response format for debugging
self.logger.debug(f"{name}: Template response: {template_data}")
# Extract current template - handle different response formats
current_template = ""
# Try different possible response formats
if "Template" in template_data:
current_template = template_data.get("Template", "")
elif isinstance(template_data, dict) and len(template_data) > 0:
# If there's no "Template" key but we have a dict, try to get the first value
# This handles cases where the response might be {"NAME":"...","GPIO":[...]}
first_key = next(iter(template_data))
if isinstance(template_data[first_key], str) and "{" in template_data[first_key]:
current_template = template_data[first_key]
self.logger.debug(f"{name}: Found template in alternate format under key: {first_key}")
# Handle the case where the template is returned as a dict with NAME, GPIO, FLAG, BASE keys
elif all(key in template_data for key in ['NAME', 'GPIO', 'FLAG', 'BASE']):
# Convert the dict to a JSON string to match the expected format
import json
current_template = json.dumps(template_data)
self.logger.debug(f"{name}: Found template in dict format with NAME, GPIO, FLAG, BASE keys")
if not current_template:
self.logger.debug(f"{name}: Could not get current template from response")
return False
self.logger.debug(f"{name}: Current template: {current_template}")
# Check if any key in mqtt.config_other matches the device name
template_updated = False
if device_name in config_other:
# Key matches device name, check if template matches value
template_value = config_other[device_name]
if current_template != template_value:
# Template doesn't match, write value to template
self.logger.info(f"{name}: Device name '{device_name}' matches key in config_other, but template doesn't match")
self.logger.info(f"{name}: Setting template to: {template_value}")
# URL encode the template value
import urllib.parse
encoded_value = urllib.parse.quote(template_value)
url = f"http://{ip}/cm?cmnd=Template%20{encoded_value}"
response = requests.get(url, timeout=5)
if response.status_code == 200:
self.logger.info(f"{name}: Template updated successfully")
# Activate the template by setting module to 0 (Template module)
self.logger.info(f"{name}: Activating template by setting module to 0")
module_url = f"http://{ip}/cm?cmnd=Module%200"
module_response = requests.get(module_url, timeout=5)
if module_response.status_code == 200:
self.logger.info(f"{name}: Module set to 0 successfully")
# Restart the device to apply the template
self.logger.info(f"{name}: Restarting device to apply template")
restart_url = f"http://{ip}/cm?cmnd=Restart%201"
restart_response = requests.get(restart_url, timeout=5)
if restart_response.status_code == 200:
self.logger.info(f"{name}: Device restart initiated successfully")
template_updated = True
else:
self.logger.error(f"{name}: Failed to restart device")
else:
self.logger.error(f"{name}: Failed to set module to 0")
else:
self.logger.error(f"{name}: Failed to update template")
else:
self.logger.debug(f"{name}: Device name '{device_name}' matches key in config_other and template matches value")
else:
# No key matches device name, check if any value matches the template
matching_key = None
for key, value in config_other.items():
if value == current_template:
matching_key = key
break
if matching_key:
# Value matches template, write key to device name
self.logger.info(f"{name}: Template matches value for key '{matching_key}' in config_other")
self.logger.info(f"{name}: Setting device name to: {matching_key}")
url = f"http://{ip}/cm?cmnd=DeviceName%20{matching_key}"
response = requests.get(url, timeout=5)
if response.status_code == 200:
self.logger.info(f"{name}: Device name updated successfully")
# Activate the template by setting module to 0 (Template module)
self.logger.info(f"{name}: Activating template by setting module to 0")
module_url = f"http://{ip}/cm?cmnd=Module%200"
module_response = requests.get(module_url, timeout=5)
if module_response.status_code == 200:
self.logger.info(f"{name}: Module set to 0 successfully")
# Restart the device to apply the template
self.logger.info(f"{name}: Restarting device to apply template")
restart_url = f"http://{ip}/cm?cmnd=Restart%201"
restart_response = requests.get(restart_url, timeout=5)
if restart_response.status_code == 200:
self.logger.info(f"{name}: Device restart initiated successfully")
template_updated = True
else:
self.logger.error(f"{name}: Failed to restart device")
else:
self.logger.error(f"{name}: Failed to set module to 0")
else:
self.logger.error(f"{name}: Failed to update device name")
else:
# No matches found, print detailed information about what's on the device
self.logger.info(f"{name}: No matches found in config_other for either Device Name or Template")
self.logger.info(f"{name}: Current Device Name on device: '{device_name}'")
self.logger.info(f"{name}: Current Template on device: '{current_template}'")
print(f"\nNo template match found for device {name} at {ip}")
print(f" Device Name on device: '{device_name}'")
print(f" Template on device: '{current_template}'")
print("Please add an appropriate entry to mqtt.config_other in your configuration file.")
return template_updated
except requests.exceptions.RequestException as e:
self.logger.error(f"Error checking/updating template for device at {ip}: {str(e)}")
return False
def configure_mqtt_settings(self, ip, name, mqtt_status=None, is_new_device=False, set_friendly_name=False, enable_mqtt=False, with_retry=False, reboot=False):
"""Configure MQTT settings for a device.
Args:
ip: The IP address of the device
name: The name/hostname of the device
mqtt_status: The current MQTT status of the device (from Status 6)
is_new_device: Whether this is a new device (True) or existing device (False)
set_friendly_name: Whether to set the friendly name
enable_mqtt: Whether to enable MQTT
with_retry: Whether to use retry logic
reboot: Whether to reboot the device after configuration
Returns:
bool: True if configuration was successful, False otherwise
"""
try:
# Set Friendly Name if requested
if set_friendly_name:
friendly_name_url = f"http://{ip}/cm?cmnd=FriendlyName1%20{name}"
response = requests.get(friendly_name_url, timeout=5)
if response.status_code == 200:
self.logger.info(f"Set Friendly Name to {name}")
else:
self.logger.error(f"Failed to set Friendly Name to {name}")
# Enable MQTT if requested
if enable_mqtt:
mqtt_url = f"http://{ip}/cm?cmnd=SetOption3%20ON" # Enable MQTT
response = requests.get(mqtt_url, timeout=5)
if response.status_code == 200:
self.logger.info(f"Enabled MQTT for {name}")
else:
self.logger.error(f"Failed to enable MQTT for {name}")
# Configure MQTT settings
mqtt_config = self.config.get('mqtt', {})
if not mqtt_config:
self.logger.error("MQTT configuration missing from config file")
return False
# Get the base hostname (everything before the dash)
hostname_base = name.split('-')[0] if '-' in name else name
# Define MQTT fields
mqtt_fields = {
"MqttHost": mqtt_config.get('Host', ''),
"MqttPort": mqtt_config.get('Port', 1883),
"MqttUser": mqtt_config.get('User', ''),
"MqttPassword": mqtt_config.get('Password', ''),
"Topic": hostname_base if mqtt_config.get('Topic') == '%hostname_base%' else mqtt_config.get('Topic', ''),
"FullTopic": mqtt_config.get('FullTopic', '%prefix%/%topic%/'),
}
# For existing devices, check if MQTT settings need to be updated
changes_needed = []
if not is_new_device and mqtt_status:
device_mqtt = mqtt_status.get('MqttHost', {})
force_password_update = False
# Check each MQTT setting
if device_mqtt.get('Host') != mqtt_fields['MqttHost']:
changes_needed.append(('MqttHost', mqtt_fields['MqttHost']))
self.logger.debug(f"{name}: MQTT Host mismatch - Device: {device_mqtt.get('Host')}, Config: {mqtt_fields['MqttHost']}")
force_password_update = True
if device_mqtt.get('Port') != mqtt_fields['MqttPort']:
changes_needed.append(('MqttPort', mqtt_fields['MqttPort']))
self.logger.debug(f"{name}: MQTT Port mismatch - Device: {device_mqtt.get('Port')}, Config: {mqtt_fields['MqttPort']}")
force_password_update = True
if device_mqtt.get('User') != mqtt_fields['MqttUser']:
changes_needed.append(('MqttUser', mqtt_fields['MqttUser']))
self.logger.debug(f"{name}: MQTT User mismatch - Device: {device_mqtt.get('User')}, Config: {mqtt_fields['MqttUser']}")
force_password_update = True
if device_mqtt.get('Topic') != mqtt_fields['Topic']:
changes_needed.append(('Topic', mqtt_fields['Topic']))
self.logger.debug(f"{name}: MQTT Topic mismatch - Device: {device_mqtt.get('Topic')}, Config: {mqtt_fields['Topic']}")
force_password_update = True
if device_mqtt.get('FullTopic') != mqtt_fields['FullTopic']:
changes_needed.append(('FullTopic', mqtt_fields['FullTopic']))
self.logger.debug(f"{name}: MQTT FullTopic mismatch - Device: {device_mqtt.get('FullTopic')}, Config: {mqtt_fields['FullTopic']}")
force_password_update = True
# Add password update if any MQTT setting changed or user was updated
if force_password_update:
changes_needed.append(('MqttPassword', mqtt_fields['MqttPassword']))
self.logger.debug(f"{name}: MQTT Password will be updated")
# Check NoRetain setting
no_retain = mqtt_config.get('NoRetain', False)
if no_retain:
changes_needed.append(('SetOption62', '1')) # 1 = No Retain
else:
changes_needed.append(('SetOption62', '0')) # 0 = Use Retain
else:
# For new devices, set all MQTT settings
for field, value in mqtt_fields.items():
changes_needed.append((field, value))
# Apply MQTT settings
mqtt_updated = False
for setting, value in changes_needed:
try:
# For FullTopic, we need to avoid adding a space (%20) or equals sign between the command and value
if setting == "FullTopic":
url = f"http://{ip}/cm?cmnd={setting}{value}"
else:
url = f"http://{ip}/cm?cmnd={setting}%20{value}"
if with_retry:
# With retry logic
success = False
attempts = 0
max_attempts = 3
last_error = None
while not success and attempts < max_attempts:
attempts += 1
try:
response = requests.get(url, timeout=5)
if response.status_code == 200:
if setting != 'MqttPassword':
self.logger.debug(f"{name}: Updated {setting} to {value}")
else:
self.logger.debug(f"{name}: Updated MQTT Password")
mqtt_updated = True
success = True
else:
self.logger.warning(f"{name}: Failed to update {setting} (attempt {attempts}/{max_attempts})")
last_error = f"HTTP {response.status_code}"
if attempts < max_attempts:
time.sleep(1) # Wait before retry
except requests.exceptions.Timeout as e:
self.logger.warning(f"{name}: Timeout updating {setting} (attempt {attempts}/{max_attempts})")
last_error = "Timeout"
if attempts < max_attempts:
time.sleep(1) # Wait before retry
except requests.exceptions.RequestException as e:
self.logger.warning(f"{name}: Error updating {setting}: {str(e)} (attempt {attempts}/{max_attempts})")
last_error = str(e)
if attempts < max_attempts:
time.sleep(1) # Wait before retry
if not success:
self.logger.error(f"{name}: Failed to update {setting} after {max_attempts} attempts. Last error: {last_error}")
# Track the failure for later reporting
if not hasattr(self, 'command_failures'):
self.command_failures = []
self.command_failures.append({
"device": name,
"ip": ip,
"command": f"{setting} {value}",
"error": last_error
})
else:
# Without retry logic
response = requests.get(url, timeout=5)
if response.status_code == 200:
if setting != 'MqttPassword':
self.logger.info(f"{name}: Set {setting} to {value}")
else:
self.logger.info(f"{name}: Set MQTT Password")
mqtt_updated = True
else:
self.logger.error(f"{name}: Failed to set {setting}")
except requests.exceptions.RequestException as e:
self.logger.error(f"{name}: Error updating {setting}: {str(e)}")
# Apply console settings
console_updated = False
console_params = mqtt_config.get('console', {})
if console_params:
self.logger.info(f"{name}: Setting console parameters from configuration")
# Special handling for Retain parameters - need to send opposite state first, then final state
# This is necessary because the changes are what create the update of the Retain state at the MQTT server
retain_params = ["ButtonRetain", "SwitchRetain", "PowerRetain"]
# Process Retain parameters first
for param in retain_params:
if param in console_params:
try:
final_value = console_params[param]
# Set opposite state first
opposite_value = "On" if final_value.lower() == "off" else "Off"
if with_retry:
# First command (opposite state) - with retry logic
url = f"http://{ip}/cm?cmnd={param}%20{opposite_value}"
success = False
attempts = 0
max_attempts = 3
last_error = None
while not success and attempts < max_attempts:
attempts += 1
try:
response = requests.get(url, timeout=5)
if response.status_code == 200:
self.logger.debug(f"{name}: Set {param} to {opposite_value} (step 1 of 2 to update MQTT broker retain settings)")
console_updated = True
success = True
else:
self.logger.warning(f"{name}: Failed to set {param} to {opposite_value} (attempt {attempts}/{max_attempts})")
last_error = f"HTTP {response.status_code}"
if attempts < max_attempts:
time.sleep(1) # Wait before retry
except requests.exceptions.Timeout as e:
self.logger.warning(f"{name}: Timeout setting {param} to {opposite_value} (attempt {attempts}/{max_attempts})")
last_error = "Timeout"
if attempts < max_attempts:
time.sleep(1) # Wait before retry
except requests.exceptions.RequestException as e:
self.logger.warning(f"{name}: Error setting {param} to {opposite_value}: {str(e)} (attempt {attempts}/{max_attempts})")
last_error = str(e)
if attempts < max_attempts:
time.sleep(1) # Wait before retry
if not success:
self.logger.error(f"{name}: Failed to set {param} to {opposite_value} after {max_attempts} attempts. Last error: {last_error}")
# Track the failure for later reporting
if not hasattr(self, 'command_failures'):
self.command_failures = []
self.command_failures.append({
"device": name,
"ip": ip,
"command": f"{param} {opposite_value}",
"error": last_error
})
else:
# First command (opposite state) - without retry logic
url = f"http://{ip}/cm?cmnd={param}%20{opposite_value}"
response = requests.get(url, timeout=5)
if response.status_code == 200:
self.logger.debug(f"{name}: Set {param} to {opposite_value} (step 1 of 2 to update MQTT broker retain settings)")
else:
self.logger.error(f"{name}: Failed to set {param} to {opposite_value}")
# Small delay to ensure commands are processed in order
time.sleep(0.5)
if with_retry:
# Second command (final state) - with retry logic
url = f"http://{ip}/cm?cmnd={param}%20{final_value}"
success = False
attempts = 0
last_error = None
while not success and attempts < max_attempts:
attempts += 1
try:
response = requests.get(url, timeout=5)
if response.status_code == 200:
self.logger.debug(f"{name}: Set {param} to {final_value} (step 2 of 2 to update MQTT broker retain settings)")
console_updated = True
success = True
else:
self.logger.warning(f"{name}: Failed to set {param} to {final_value} (attempt {attempts}/{max_attempts})")
last_error = f"HTTP {response.status_code}"
if attempts < max_attempts:
time.sleep(1) # Wait before retry
except requests.exceptions.Timeout as e:
self.logger.warning(f"{name}: Timeout setting {param} to {final_value} (attempt {attempts}/{max_attempts})")
last_error = "Timeout"
if attempts < max_attempts:
time.sleep(1) # Wait before retry
except requests.exceptions.RequestException as e:
self.logger.warning(f"{name}: Error setting {param} to {final_value}: {str(e)} (attempt {attempts}/{max_attempts})")
last_error = str(e)
if attempts < max_attempts:
time.sleep(1) # Wait before retry
if not success:
self.logger.error(f"{name}: Failed to set {param} to {final_value} after {max_attempts} attempts. Last error: {last_error}")
# Track the failure for later reporting
if not hasattr(self, 'command_failures'):
self.command_failures = []
self.command_failures.append({
"device": name,
"ip": ip,
"command": f"{param} {final_value}",
"error": last_error
})
else:
# Second command (final state) - without retry logic
url = f"http://{ip}/cm?cmnd={param}%20{final_value}"
response = requests.get(url, timeout=5)
if response.status_code == 200:
self.logger.debug(f"{name}: Set {param} to {final_value} (step 2 of 2 to update MQTT broker retain settings)")
else:
self.logger.error(f"{name}: Failed to set {param} to {final_value}")
except Exception as e:
self.logger.error(f"{name}: Unexpected error setting {param} commands: {str(e)}")
# Track the failure for later reporting if using retry logic
if with_retry:
if not hasattr(self, 'command_failures'):
self.command_failures = []
self.command_failures.append({
"device": name,
"ip": ip,
"command": f"{param} (both steps)",
"error": str(e)
})
# Process all other console parameters
# Track rules that need to be enabled
rules_to_enable = {}
for param, value in console_params.items():
# Skip Retain parameters as they're handled specially above
if param in retain_params:
continue
# Check if this is a rule definition (lowercase rule1, rule2, etc.)
if param.lower().startswith('rule') and param.lower() == param and param[-1].isdigit():
# Store the rule number for later enabling
rule_num = param[-1]
rules_to_enable[rule_num] = True
if with_retry:
self.logger.info(f"{name}: Detected rule definition {param}='{value}', will auto-enable")
else:
self.logger.debug(f"{name}: Detected rule definition {param}, will auto-enable")
# Skip Rule1, Rule2, etc. if we're auto-enabling rules and using retry logic
if with_retry and param.lower().startswith('rule') and param.lower() != param and param[-1].isdigit():
# If this is in the config, we'll respect it, but log that it's not needed
self.logger.debug(f"{name}: Note: {param} is not needed with auto-enable feature")
# Regular console parameter
# Special handling for rule parameters to properly encode the URL
if param.lower().startswith('rule') and param.lower() == param and param[-1].isdigit():
# For rule commands, we need to URL encode the entire value to preserve special characters
import urllib.parse
encoded_value = urllib.parse.quote(value)
url = f"http://{ip}/cm?cmnd={param}%20{encoded_value}"
self.logger.info(f"{name}: Sending rule command: {url}")
else:
url = f"http://{ip}/cm?cmnd={param}%20{value}"
if with_retry:
# With retry logic
success = False
attempts = 0
max_attempts = 3
last_error = None
while not success and attempts < max_attempts:
attempts += 1
try:
response = requests.get(url, timeout=5)
if response.status_code == 200:
# Special logging for rule parameters
if param.lower().startswith('rule') and param.lower() == param and param[-1].isdigit():
self.logger.info(f"{name}: Rule command response: {response.text}")
self.logger.info(f"{name}: Set rule {param} to '{value}'")
else:
self.logger.debug(f"{name}: Set console parameter {param} to {value}")
console_updated = True
success = True
else:
self.logger.warning(f"{name}: Failed to set console parameter {param} (attempt {attempts}/{max_attempts})")
last_error = f"HTTP {response.status_code}"
if attempts < max_attempts:
time.sleep(1) # Wait before retry
except requests.exceptions.Timeout as e:
self.logger.warning(f"{name}: Timeout setting console parameter {param} (attempt {attempts}/{max_attempts})")
last_error = "Timeout"
if attempts < max_attempts:
time.sleep(1) # Wait before retry
except requests.exceptions.RequestException as e:
self.logger.warning(f"{name}: Error setting console parameter {param}: {str(e)} (attempt {attempts}/{max_attempts})")
last_error = str(e)
if attempts < max_attempts:
time.sleep(1) # Wait before retry
if not success:
self.logger.error(f"{name}: Failed to set console parameter {param} after {max_attempts} attempts. Last error: {last_error}")
# Track the failure for later reporting
if not hasattr(self, 'command_failures'):
self.command_failures = []
self.command_failures.append({
"device": name,
"ip": ip,
"command": f"{param} {value}",
"error": last_error
})
else:
# Without retry logic
response = requests.get(url, timeout=5)
if response.status_code == 200:
if param.lower().startswith('rule') and param.lower() == param and param[-1].isdigit():
self.logger.info(f"{name}: Rule command response: {response.text}")
self.logger.info(f"{name}: Set rule {param} to '{value}'")
else:
self.logger.debug(f"{name}: Set console parameter {param} to {value}")
else:
self.logger.error(f"{name}: Failed to set console parameter {param}")
# Auto-enable any rules that were defined
if with_retry:
self.logger.info(f"{name}: Rules to enable: {rules_to_enable}")
for rule_num in rules_to_enable:
rule_enable_param = f"Rule{rule_num}"
# Skip if the rule enable command was already in the config
if with_retry:
# Check if the uppercase version (Rule1) is in the config
if rule_enable_param in console_params:
self.logger.info(f"{name}: Skipping {rule_enable_param} as it's already in config (uppercase version)")
continue
# Check if the lowercase version (rule1) is in the config
lowercase_rule_param = f"rule{rule_num}"
if lowercase_rule_param in console_params:
self.logger.info(f"{name}: Found lowercase {lowercase_rule_param} in config, will enable {rule_enable_param}")
# Don't continue - we want to enable the rule
else:
self.logger.info(f"{name}: No rule definition found in config, skipping auto-enable")
continue
else:
# Simple check for any version of the rule enable command
if any(p.lower() == rule_enable_param.lower() for p in console_params):
continue
# Rule auto-enabling
url = f"http://{ip}/cm?cmnd={rule_enable_param}%201"
if with_retry:
# With retry logic
success = False
attempts = 0
max_attempts = 3
last_error = None
while not success and attempts < max_attempts:
attempts += 1
try:
response = requests.get(url, timeout=5)
if response.status_code == 200:
self.logger.info(f"{name}: Auto-enabled {rule_enable_param}")
console_updated = True
success = True
else:
self.logger.warning(f"{name}: Failed to auto-enable {rule_enable_param} (attempt {attempts}/{max_attempts})")
last_error = f"HTTP {response.status_code}"
if attempts < max_attempts:
time.sleep(1) # Wait before retry
except requests.exceptions.Timeout as e:
self.logger.warning(f"{name}: Timeout auto-enabling {rule_enable_param} (attempt {attempts}/{max_attempts})")
last_error = "Timeout"
if attempts < max_attempts:
time.sleep(1) # Wait before retry
except requests.exceptions.RequestException as e:
self.logger.warning(f"{name}: Error auto-enabling {rule_enable_param}: {str(e)} (attempt {attempts}/{max_attempts})")
last_error = str(e)
if attempts < max_attempts:
time.sleep(1) # Wait before retry
if not success:
self.logger.error(f"{name}: Failed to auto-enable {rule_enable_param} after {max_attempts} attempts. Last error: {last_error}")
# Track the failure for later reporting
if not hasattr(self, 'command_failures'):
self.command_failures = []
self.command_failures.append({
"device": name,
"ip": ip,
"command": f"{rule_enable_param} 1",
"error": last_error
})
else:
# Without retry logic
response = requests.get(url, timeout=5)
if response.status_code == 200:
self.logger.info(f"{name}: Auto-enabled {rule_enable_param}")
else:
self.logger.error(f"{name}: Failed to auto-enable {rule_enable_param}")
# Reboot the device if requested
if reboot:
save_url = f"http://{ip}/cm?cmnd=Restart%201"
response = requests.get(save_url, timeout=5)
if response.status_code == 200:
self.logger.info(f"Saved configuration and rebooted {name}")
else:
self.logger.error(f"Failed to save configuration for {name}")
return mqtt_updated or console_updated
except requests.exceptions.RequestException as e:
self.logger.error(f"Error configuring device at {ip}: {str(e)}")
return False
def configure_unknown_device(self, ip, hostname):
"""Configure an unknown device with the given hostname and MQTT settings."""
return self.configure_mqtt_settings(
ip=ip,
name=hostname,
is_new_device=True,
set_friendly_name=True,
enable_mqtt=True,
with_retry=False,
reboot=True
)
def is_ip_in_network_filter(self, ip_address):
"""Check if an IP address is in any of the configured network filters.
Args:
ip_address: The IP address to check
Returns:
tuple: (is_in_network, target_network, network_name) where:
- is_in_network is a boolean indicating if the IP is in a network
- target_network is the network configuration dict or None
- network_name is the name of the network or None
"""
network_filters = self.config['unifi'].get('network_filter', {})
for network_name, network in network_filters.items():
if ip_address.startswith(network['subnet']):
self.logger.info(f"IP {ip_address} is in network: {network_name}")
return True, network, network_name
self.logger.error(f"IP {ip_address} is not in any configured network")
return False, None, None
def process_single_device(self, device_identifier):
"""Process a single device by hostname or IP address.
Args:
device_identifier: Either a hostname or IP address
Returns:
bool: True if device was processed successfully, False otherwise
"""
self.logger.info(f"Processing single device: {device_identifier}")
# Check if device_identifier is an IP address or hostname
is_ip = bool(re.match(r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$", device_identifier))
# If it's an IP address, check if it's in the network_filter first
if is_ip:
in_network, target_network, network_name = self.is_ip_in_network_filter(device_identifier)
if not in_network:
return False
# Setup Unifi client if not already done
if not self.unifi_client:
try:
self.setup_unifi_client()
except ConnectionError as e:
self.logger.error(f"Failed to connect to UniFi controller: {str(e)}")
return False
# Get all clients from Unifi
try:
all_clients = self.unifi_client.get_clients()
self.logger.debug(f"Found {len(all_clients)} total devices")
except Exception as e:
self.logger.error(f"Error getting devices from UniFi controller: {e}")
return False
# Find the device in Unifi
target_device = None
if is_ip:
# Search by IP
self.logger.debug(f"Searching for device with IP: {device_identifier}")
target_device = next((device for device in all_clients if device.get('ip') == device_identifier), None)
if not target_device:
self.logger.error(f"No device found with IP: {device_identifier}")
return False
else:
# Search by hostname - support partial and wildcard matches
self.logger.debug(f"Searching for device with hostname: {device_identifier}")
# Check if the identifier contains wildcards
has_wildcards = '*' in device_identifier
# Convert wildcards to regex pattern if present
if has_wildcards:
pattern = device_identifier.lower().replace('.', r'\.').replace('*', '.*')
self.logger.debug(f"Using wildcard pattern: {pattern}")
else:
# For partial matches, we'll use the identifier as a substring
pattern = device_identifier.lower()
self.logger.debug(f"Using partial match pattern: {pattern}")
# Find all matching devices
matching_devices = []
for device in all_clients:
hostname = device.get('hostname', '').lower()
name = device.get('name', '').lower()
if has_wildcards:
# For wildcard matches, use regex
if (re.search(f"^{pattern}$", hostname) or re.search(f"^{pattern}$", name)):
matching_devices.append(device)
else:
# For partial matches, check if pattern is a substring
if pattern in hostname or pattern in name:
matching_devices.append(device)
# Handle the results
if not matching_devices:
self.logger.error(f"No devices found matching: {device_identifier}")
return False
elif len(matching_devices) > 1:
# Multiple matches found - log them and use the first one
self.logger.warning(f"Multiple devices found matching '{device_identifier}':")
for i, device in enumerate(matching_devices, 1):
device_name = device.get('name', device.get('hostname', 'Unknown'))
device_ip = device.get('ip', '')
self.logger.warning(f" {i}. {device_name} (IP: {device_ip})")
self.logger.warning(f"Using the first match: {matching_devices[0].get('name', matching_devices[0].get('hostname', 'Unknown'))}")
# Use the first (or only) matching device
target_device = matching_devices[0]
# Get device details
device_name = target_device.get('name', target_device.get('hostname', 'Unknown'))
device_hostname = target_device.get('hostname', '')
device_ip = target_device.get('ip', '')
device_mac = target_device.get('mac', '')
self.logger.info(f"Found device: {device_name} (IP: {device_ip}, Hostname: {device_hostname})")
# If we're processing a hostname (not an IP), check if the device's IP is in the network_filter
if not is_ip:
in_network, target_network, network_name = self.is_ip_in_network_filter(device_ip)
if not in_network:
self.logger.error(f"Device {device_name} is not in any configured network")
return False
# For IP addresses, we already have the target_network from the earlier check
# Check if device is excluded
exclude_patterns = target_network.get('exclude_patterns', [])
for pattern in exclude_patterns:
pattern_lower = pattern.lower()
pattern_regex = pattern_lower.replace('.', r'\.').replace('*', '.*')
if (re.match(f"^{pattern_regex}$", device_name.lower()) or
re.match(f"^{pattern_regex}$", device_hostname.lower())):
self.logger.error(f"Device {device_name} is excluded by pattern: {pattern}")
return False
# Check if device is in unknown_device_patterns
unknown_patterns = target_network.get('unknown_device_patterns', [])
is_unknown = False
for pattern in unknown_patterns:
pattern_lower = pattern.lower()
pattern_regex = pattern_lower.replace('.', r'\.').replace('*', '.*')
if (re.match(f"^{pattern_regex}", device_name.lower()) or
re.match(f"^{pattern_regex}", device_hostname.lower())):
is_unknown = True
self.logger.info(f"Device {device_name} matches unknown device pattern: {pattern}")
break
# Determine connection type based on available fields
connection = "Unknown"
if target_device.get('essid'):
connection = f"Wireless - {target_device.get('essid')}"
elif target_device.get('radio') or target_device.get('wifi'):
connection = "Wireless"
elif target_device.get('port') or target_device.get('switch_port') or target_device.get('switch'):
connection = "Wired"
# Create a device info dictionary
device_info = {
"name": device_name,
"ip": device_ip,
"mac": device_mac,
"last_seen": target_device.get('last_seen', ''),
"hostname": device_hostname,
"notes": target_device.get('note', ''),
"connection": connection,
}
# Process the device based on whether it's unknown or not
if is_unknown:
self.logger.info(f"Processing unknown device: {device_name}")
# Check if device has a toggle button
try:
# Get the main page to check for toggle button
url = f"http://{device_ip}/"
response = requests.get(url, timeout=5)
# Check if there's a toggle button in the response
has_toggle = "toggle" in response.text.lower()
if has_toggle:
self.logger.info(f"Device {device_name} has a toggle button, toggling at 1/2Hz rate")
# Start toggling at 1/2Hz
toggle_state = False
# Temporarily disable all logging during toggling
logging.disable(logging.CRITICAL)
try:
# Clear console output and show prompt
print("\n" + "="*50)
print(f"DEVICE: {device_name} at IP: {device_ip} Connection: {connection}")
print(f"Current hostname: {device_hostname}")
print("="*50)
print("The device is now toggling to help you identify it.")
# Start toggling in background while waiting for input
import threading
stop_toggle = threading.Event()
def toggle_device():
toggle_state = False
while not stop_toggle.is_set():
toggle_state = not toggle_state
toggle_cmd = "Power On" if toggle_state else "Power Off"
toggle_url = f"http://{device_ip}/cm?cmnd={toggle_cmd}"
try:
requests.get(toggle_url, timeout=2)
except:
pass
time.sleep(2.0) # 1/2Hz rate
# Start toggle thread
toggle_thread = threading.Thread(target=toggle_device)
toggle_thread.daemon = True
toggle_thread.start()
# Prompt for new hostname
print("\nPlease enter a new name for this device:")
new_hostname = input("> ").strip()
# Stop toggling
stop_toggle.set()
toggle_thread.join(timeout=3)
if new_hostname and new_hostname != device_hostname:
print(f"Setting new hostname to: {new_hostname}")
# Re-enable logging
logging.disable(logging.NOTSET)
return self.configure_unknown_device(device_ip, new_hostname)
else:
print("No valid hostname entered, skipping device")
# Re-enable logging
logging.disable(logging.NOTSET)
return False
finally:
# Re-enable logging
logging.disable(logging.NOTSET)
else:
self.logger.info(f"Device {device_name} does not have a toggle button")
return self.configure_unknown_device(device_ip, device_hostname)
except requests.exceptions.RequestException as e:
self.logger.error(f"Error connecting to {device_name} at {device_ip}: {str(e)}")
return False
else:
self.logger.info(f"Processing normal device: {device_name}")
# Create a temporary list with just this device
temp_devices = [device_info]
# Save to current.json temporarily
current_config = {"tasmota": {"devices": temp_devices}}
with open('current.json', 'w') as f:
json.dump(current_config, f, indent=2)
# Process the device - skip unknown device filtering in Device mode
self.get_device_details(use_current_json=True, skip_unknown_filter=True)
return True
def get_device_details(self, use_current_json=True, skip_unknown_filter=False):
"""Connect to each Tasmota device via HTTP, gather details and validate MQTT settings.
Filters out devices matching unknown_device_patterns unless skip_unknown_filter is True.
Implements retry logic for console commands with up to 3 attempts and tracks failures.
Args:
use_current_json: Whether to use current.json instead of tasmota.json
skip_unknown_filter: If True, don't filter out unknown devices (used by --Device mode)
"""
self.logger.info("Starting to gather detailed device information...")
device_details = []
try:
source_file = 'current.json' if use_current_json else 'tasmota.json'
with open(source_file, 'r') as f:
data = json.load(f)
all_devices = data.get('tasmota', {}).get('devices', [])
self.logger.debug(f"Loaded {len(all_devices)} devices from {source_file}")
except FileNotFoundError:
self.logger.error(f"{source_file} not found. Run discovery first.")
return
except json.JSONDecodeError:
self.logger.error(f"Invalid JSON format in {source_file}")
return
# Determine which devices to process
if skip_unknown_filter:
# When using --Device parameter, don't filter out unknown devices
devices = all_devices
self.logger.debug("Skipping unknown device filtering (Device mode)")
else:
# Normal mode: Filter out devices matching unknown_device_patterns
devices = []
network_filters = self.config['unifi'].get('network_filter', {})
unknown_patterns = []
for network in network_filters.values():
unknown_patterns.extend(network.get('unknown_device_patterns', []))
for device in all_devices:
name = device.get('name', '').lower()
hostname = device.get('hostname', '').lower()
is_unknown = False
for pattern in unknown_patterns:
pattern = pattern.lower()
pattern = pattern.replace('.', r'\.').replace('*', '.*')
if re.match(f"^{pattern}", name) or re.match(f"^{pattern}", hostname):
self.logger.debug(f"Skipping unknown device: {name} ({hostname})")
is_unknown = True
break
if not is_unknown:
devices.append(device)
self.logger.debug(f"Processing {len(devices)} devices after filtering unknown devices")
mqtt_config = self.config.get('mqtt', {})
if not mqtt_config:
self.logger.error("MQTT configuration missing from config file")
return
def check_mqtt_settings(ip, name, mqtt_status):
"""Check and update MQTT settings if they don't match config"""
# Use the unified MQTT configuration method
return self.configure_mqtt_settings(
ip=ip,
name=name,
mqtt_status=mqtt_status,
is_new_device=False,
set_friendly_name=False,
enable_mqtt=False,
with_retry=True,
reboot=False
)
for device in devices:
if not isinstance(device, dict):
self.logger.warning(f"Skipping invalid device entry: {device}")
continue
name = device.get('name', 'Unknown')
ip = device.get('ip')
mac = device.get('mac')
if not ip:
self.logger.warning(f"Skipping device {name} - no IP address")
continue
self.logger.info(f"Checking device: {name} at {ip}")
try:
# Get Status 2 for firmware version
url_status = f"http://{ip}/cm?cmnd=Status%202"
response = requests.get(url_status, timeout=5)
status_data = response.json()
# Get Status 5 for network info
url_network = f"http://{ip}/cm?cmnd=Status%205"
response = requests.get(url_network, timeout=5)
network_data = response.json()
# Get Status 6 for MQTT info
url_mqtt = f"http://{ip}/cm?cmnd=Status%206"
response = requests.get(url_mqtt, timeout=5)
mqtt_data = response.json()
# Check and update MQTT settings if needed
mqtt_updated = check_mqtt_settings(ip, name, mqtt_data)
# Check and update template if needed
template_updated = self.check_and_update_template(ip, name)
# Console settings are now applied in configure_mqtt_settings
console_updated = mqtt_updated
device_detail = {
"name": name,
"ip": ip,
"mac": mac,
"version": status_data.get("StatusFWR", {}).get("Version", "Unknown"),
"hostname": network_data.get("StatusNET", {}).get("Hostname", "Unknown"),
"mqtt_status": "Updated" if mqtt_updated else "Verified",
"console_status": "Updated" if console_updated else "Verified",
"template_status": "Updated" if template_updated else "Verified",
"last_checked": time.strftime("%Y-%m-%d %H:%M:%S"),
"status": "online"
}
self.logger.info(f"Successfully got version for {name}: {device_detail['version']}")
except requests.exceptions.RequestException as e:
self.logger.error(f"Error connecting to {name} at {ip}: {str(e)}")
device_detail = {
"name": name,
"ip": ip,
"mac": mac,
"version": "Unknown",
"status": "offline",
"error": str(e)
}
device_details.append(device_detail)
time.sleep(0.5)
# Save all device details at once
try:
with open('TasmotaDevices.json', 'w') as f:
json.dump(device_details, f, indent=2)
self.logger.info(f"Device details saved to TasmotaDevices.json ({len(device_details)} devices)")
except Exception as e:
self.logger.error(f"Error saving device details: {e}")
# Print summary of command failures if any occurred
if hasattr(self, 'command_failures') and self.command_failures:
failure_count = len(self.command_failures)
print("\n" + "="*80)
print(f"COMMAND FAILURES SUMMARY: {failure_count} command(s) failed after 3 retry attempts")
print("="*80)
# Group failures by device for better readability
failures_by_device = {}
for failure in self.command_failures:
device_name = failure['device']
if device_name not in failures_by_device:
failures_by_device[device_name] = []
failures_by_device[device_name].append(failure)
# Print failures grouped by device
for device_name, failures in failures_by_device.items():
print(f"\nDevice: {device_name} ({failures[0]['ip']})")
print("-" * 40)
for i, failure in enumerate(failures, 1):
print(f" {i}. Command: {failure['command']}")
print(f" Error: {failure['error']}")
print("\n" + "="*80)
def main():
parser = argparse.ArgumentParser(description='Tasmota Device Manager')
parser.add_argument('--config', default='network_configuration.json',
help='Path to configuration file')
parser.add_argument('--debug', action='store_true',
help='Enable debug logging')
parser.add_argument('--skip-unifi', action='store_true',
help='Skip UniFi discovery and use existing current.json')
parser.add_argument('--process-unknown', action='store_true',
help='Process unknown devices (matching unknown_device_patterns) to set up names and MQTT')
parser.add_argument('--Device',
help='Process a single device by hostname or IP address')
args = parser.parse_args()
# Set up logging
log_level = logging.DEBUG if args.debug else logging.INFO
logging.basicConfig(level=log_level,
format='%(asctime)s - %(levelname)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S')
print("Starting Tasmota Device Discovery and Version Check...")
# Create TasmotaDiscovery instance
discovery = TasmotaDiscovery(debug=args.debug)
discovery.load_config(args.config)
try:
# Process a single device if --Device parameter is provided
if args.Device:
print(f"Processing single device: {args.Device}")
# Let process_single_device handle the UniFi client setup as needed
success = discovery.process_single_device(args.Device)
if success:
print(f"\nDevice {args.Device} processed successfully!")
print("- Detailed information saved to: TasmotaDevices.json")
else:
print(f"\nFailed to process device: {args.Device}")
return 1
else:
# Normal processing flow
if not args.skip_unifi:
print("Step 1: Discovering Tasmota devices...")
discovery.setup_unifi_client()
tasmota_devices = discovery.get_tasmota_devices()
discovery.save_tasmota_config(tasmota_devices)
else:
print("Skipping UniFi discovery, using existing current.json...")
if args.process_unknown:
print("\nStep 2: Processing unknown devices...")
discovery.process_unknown_devices()
else:
print("\nStep 2: Getting detailed version information...")
discovery.get_device_details(use_current_json=True)
print("\nProcess completed successfully!")
print("- Device list saved to: current.json")
print("- Detailed information saved to: TasmotaDevices.json")
except ConnectionError as e:
print(f"Connection Error: {str(e)}")
print("\nTrying to proceed with existing current.json...")
try:
discovery.get_device_details(use_current_json=True)
print("\nSuccessfully retrieved device details from existing current.json")
except Exception as inner_e:
print(f"Error processing existing devices: {str(inner_e)}")
return 1
except Exception as e:
print(f"Error: {str(e)}")
if args.debug:
import traceback
traceback.print_exc()
return 1
return 0
if __name__ == '__main__':
main()