TasmotaManager/TasmotaManager.py
Mike Geppert daa015b4c2 Add automatic rule enabling feature to simplify configuration
This commit implements automatic enabling of Tasmota rules when they are defined in the configuration.
Key changes:
- Modified TasmotaManager.py to detect rule definitions and automatically send enable commands
- Updated network_configuration.json to remove redundant Rule1 entry
- Updated documentation in README.md and CONSOLE_COMMANDS.md to explain the new feature
- Added test script to verify the automatic rule enabling functionality

This change simplifies the configuration by allowing users to define rules without needing to
explicitly enable them with a separate command.
2025-08-05 02:21:56 -05:00

902 lines
42 KiB
Python

import json
import logging
import os
import sys
from datetime import datetime
from typing import Optional
import requests
from urllib3.exceptions import InsecureRequestWarning
import re # Import the regular expression module
import time
import argparse
# Disable SSL warnings
requests.packages.urllib3.disable_warnings(category=InsecureRequestWarning)
class UnifiClient:
    """Small client for the UniFi Controller HTTP API.

    All requests go through a single ``requests.Session`` so that the cookies
    and the CSRF token obtained by ``_login`` are reused by later calls.
    """

    def __init__(self, base_url, username, password, site_id, verify_ssl=True):
        """Store the connection settings and create the HTTP session.

        Args:
            base_url: Controller base URL; trailing slashes are stripped.
            username: Account name sent to the login endpoint.
            password: Password sent to the login endpoint.
            site_id: Site identifier interpolated into the client-list URLs.
            verify_ssl: Value assigned to ``session.verify``.
        """
        self.base_url = base_url.rstrip('/')
        self.username = username
        self.password = password
        self.site_id = site_id
        self.session = requests.Session()
        self.session.verify = verify_ssl
        # Initialize cookie jar
        self.session.cookies.clear()

    def _login(self) -> "requests.Response":
        """Authenticate with the UniFi Controller.

        On success the ``X-CSRF-Token`` response header, when present, is
        copied onto the session so later requests send it as well.

        Returns:
            The response of the login request.

        Raises:
            Exception: The controller answered with HTTP 401.
            requests.exceptions.RequestException: Any other request failure
                (connection error, timeout, other HTTP error status).
        """
        login_url = f"{self.base_url}/api/auth/login"
        headers = {
            'Content-Type': 'application/json',
            'Accept': 'application/json'
        }
        payload = {
            "username": self.username,
            "password": self.password,
            "remember": False
        }
        try:
            response = self.session.post(
                login_url,
                json=payload,
                headers=headers
            )
            response.raise_for_status()
        except requests.exceptions.RequestException as e:
            # ``e.response`` is None when no HTTP response was received at all
            # (connection refused, DNS failure, timeout), so it must be checked
            # before its status code is read.
            failed_response = getattr(e, 'response', None)
            if failed_response is not None and failed_response.status_code == 401:
                raise Exception("Authentication failed. Please verify your username and password.") from e
            raise
        if 'X-CSRF-Token' in response.headers:
            self.session.headers['X-CSRF-Token'] = response.headers['X-CSRF-Token']
        return response

    def _fetch_clients(self, url) -> list:
        """GET ``url`` and return the ``data`` list of its JSON body."""
        response = self.session.get(url)
        response.raise_for_status()
        return response.json().get('data', [])

    def get_clients(self) -> list:
        """Get all clients from the UniFi Controller.

        Three endpoints are tried in order: the newer proxied API, the legacy
        API and finally the v2 API. A request failure on one of the first two
        moves on to the next endpoint; a failure on the last one propagates.

        Returns:
            The ``data`` list of the first endpoint that answers successfully
            (an empty list when the body has no ``data`` key).

        Raises:
            requests.exceptions.RequestException: All three endpoints failed.
        """
        fallback_urls = (
            f"{self.base_url}/proxy/network/api/s/{self.site_id}/stat/sta",
            f"{self.base_url}/api/s/{self.site_id}/stat/sta",
        )
        for url in fallback_urls:
            try:
                return self._fetch_clients(url)
            except requests.exceptions.RequestException:
                continue
        # Last resort: errors from this request are not swallowed.
        return self._fetch_clients(f"{self.base_url}/v2/api/site/{self.site_id}/clients")
class TasmotaDiscovery:
    """Discovers devices through a UniFi controller and manages their Tasmota settings."""

    def __init__(self, debug: bool = False):
        """Configure logging and start with no configuration and no UniFi client.

        Args:
            debug: When true the root logger is configured at DEBUG level,
                otherwise at INFO level.
        """
        if debug:
            log_level = logging.DEBUG
        else:
            log_level = logging.INFO
        logging.basicConfig(
            level=log_level,
            format='%(asctime)s - %(levelname)s - %(message)s',
            datefmt='%Y-%m-%d %H:%M:%S'
        )
        self.logger = logging.getLogger(__name__)
        # Both are filled in later by load_config() / setup_unifi_client().
        self.config = None
        self.unifi_client = None
def load_config(self, config_path: Optional[str] = None) -> dict:
    """Read the JSON configuration file and keep it on ``self.config``.

    Args:
        config_path: File to read. Defaults to ``config.json`` in the
            directory of this module.

    Returns:
        The parsed configuration.

    The process exits with status 1 when the file does not exist or does
    not contain valid JSON.
    """
    path = config_path
    if path is None:
        module_dir = os.path.dirname(__file__)
        path = os.path.join(module_dir, 'config.json')
    self.logger.debug(f"Loading configuration from: {path}")
    try:
        with open(path, 'r') as handle:
            self.config = json.load(handle)
    except FileNotFoundError:
        self.logger.error(f"Configuration file not found at {path}")
        sys.exit(1)
    except json.JSONDecodeError:
        self.logger.error("Invalid JSON in configuration file")
        sys.exit(1)
    self.logger.debug("Configuration loaded successfully from %s", path)
    return self.config
def setup_unifi_client(self):
    """Create the UniFi client from the loaded configuration and log in.

    Raises:
        ValueError: No configuration is loaded, it has no ``unifi`` section,
            or that section lacks one of host/username/password/site.
        ConnectionError: Creating the client or logging in failed.
    """
    self.logger.debug("Setting up UniFi client")
    if not (self.config and 'unifi' in self.config):
        raise ValueError("Missing UniFi configuration")

    settings = self.config['unifi']
    absent = [key for key in ('host', 'username', 'password', 'site') if key not in settings]
    if absent:
        raise ValueError(f"Missing required UniFi configuration fields: {', '.join(absent)}")

    try:
        self.logger.debug(f"Connecting to UniFi Controller at {settings['host']}")
        # Certificate verification is switched off (self-signed controller certs).
        self.unifi_client = UnifiClient(
            base_url=settings['host'],
            username=settings['username'],
            password=settings['password'],
            site_id=settings['site'],
            verify_ssl=False
        )
        # Logging in doubles as the connection test.
        login_response = self.unifi_client._login()
        if not login_response:
            raise ConnectionError(f"Failed to connect to UniFi controller: No response")
        self.logger.debug("UniFi client setup successful")
    except Exception as e:
        self.logger.error(f"Error setting up UniFi client: {str(e)}")
        raise ConnectionError(f"Failed to connect to UniFi controller: {str(e)}")
def is_tasmota_device(self, device: dict) -> bool:
    """Tell whether a client belongs to a configured network and is not excluded.

    The first ``network_filter`` entry whose ``subnet`` is a prefix of the
    client's IP decides the result: the client is rejected when its name or
    hostname fully matches one of that entry's ``exclude_patterns`` (glob
    style, case-insensitive) and accepted otherwise. Clients outside every
    configured subnet are rejected.
    """
    name = device.get('name', '').lower()
    hostname = device.get('hostname', '').lower()
    ip = device.get('ip', '')

    for network in self.config['unifi'].get('network_filter', {}).values():
        if not ip.startswith(network['subnet']):
            continue
        self.logger.debug(f"Checking device in network: {name} ({hostname}) IP: {ip}")

        for raw_pattern in network.get('exclude_patterns', []):
            # Glob to regex: literal dots, '*' matches any run of characters.
            pattern = raw_pattern.lower().replace('.', r'\.').replace('*', '.*')
            anchored = f"^{pattern}$"
            if re.match(anchored, name) or re.match(anchored, hostname):
                self.logger.debug(f"Excluding device due to pattern '{pattern}': {name} ({hostname})")
                return False

        self.logger.debug(f"Found device in network: {name}")
        return True

    return False
def get_tasmota_devices(self) -> list:
    """Fetch all clients from the UniFi controller and keep the accepted ones.

    Returns:
        One dict per client accepted by ``is_tasmota_device`` with the keys
        name, ip, mac, last_seen, hostname and notes. An empty list is
        returned (and the error logged) when anything goes wrong.
    """
    self.logger.debug("Querying UniFi controller for devices")
    try:
        clients = self.unifi_client.get_clients()
        self.logger.debug(f"Found {len(clients)} total devices")
        matched = [
            {
                # Fall back to the hostname when the client has no alias.
                "name": client.get('name', client.get('hostname', 'Unknown')),
                "ip": client.get('ip', ''),
                "mac": client.get('mac', ''),
                "last_seen": client.get('last_seen', ''),
                "hostname": client.get('hostname', ''),
                "notes": client.get('note', ''),
            }
            for client in clients
            if self.is_tasmota_device(client)
        ]
        self.logger.debug(f"Found {len(matched)} Tasmota devices")
        return matched
    except Exception as e:
        self.logger.error(f"Error getting devices from UniFi controller: {e}")
        return []
def save_tasmota_config(self, devices: list) -> None:
    """Save Tasmota device information to a JSON file with device tracking.

    ``current.json`` receives the devices passed in (minus excluded ones) and
    ``deprecated.json`` receives devices that disappeared or whose IP/MAC
    changed since the previous run. Both files are read and written in the
    current working directory. The previous ``current.json`` is kept as
    ``current.json.backup``. A summary is printed to the console.

    Args:
        devices: Device dicts as produced by ``get_tasmota_devices``; each
            must have ``name``, ``ip`` and ``mac`` keys.

    Errors while creating the backup or writing the files are logged, not raised.
    """
    filename = "current.json"
    self.logger.debug(f"Saving Tasmota configuration to {filename}")
    deprecated_filename = "deprecated.json"
    current_devices = []
    deprecated_devices = []

    # Load existing devices if file exists
    if os.path.exists(filename):
        try:
            with open(filename, 'r') as f:
                existing_config = json.load(f)
            current_devices = existing_config.get('tasmota', {}).get('devices', [])
        except json.JSONDecodeError:
            self.logger.error(f"Error reading {filename}, treating as empty")
            current_devices = []

    # Load deprecated devices if file exists
    if os.path.exists(deprecated_filename):
        try:
            with open(deprecated_filename, 'r') as f:
                deprecated_config = json.load(f)
            deprecated_devices = deprecated_config.get('tasmota', {}).get('devices', [])
        except json.JSONDecodeError:
            self.logger.error(f"Error reading {deprecated_filename}, treating as empty")
            deprecated_devices = []

    new_devices = []
    moved_to_deprecated = []
    removed_from_deprecated = []
    excluded_devices = []

    # Compile the exclusion patterns of all networks once instead of
    # rebuilding them for every device that is checked.
    network_filters = self.config['unifi'].get('network_filter', {})
    exclude_regexes = []
    for network in network_filters.values():
        for pattern in network.get('exclude_patterns', []):
            # Glob to regex: literal dots, '*' matches any run of characters.
            regex = pattern.lower().replace('.', r'\.').replace('*', '.*')
            exclude_regexes.append(re.compile(f"^{regex}$"))

    def is_device_excluded(device_name: str, hostname: str = '') -> bool:
        """Return True when the name or hostname matches an exclusion pattern."""
        name = device_name.lower()
        hostname = hostname.lower()
        return any(rx.match(name) or rx.match(hostname) for rx in exclude_regexes)

    # Process current devices
    for device in devices:
        device_name = device['name']
        device_hostname = device.get('hostname', '')
        device_ip = device['ip']
        device_mac = device['mac']

        # Check if device should be excluded
        if is_device_excluded(device_name, device_hostname):
            print(f"Device {device_name} excluded by pattern - skipping")
            excluded_devices.append(device_name)
            continue

        # Check in current devices
        existing_device = next((d for d in current_devices
                                if d['name'] == device_name), None)
        if existing_device:
            # Device exists, check if IP or MAC changed
            if existing_device['ip'] != device_ip or existing_device['mac'] != device_mac:
                # The old record is archived, the fresh one becomes current.
                moved_to_deprecated.append(existing_device)
                new_devices.append(device)
                print(f"Device {device_name} moved to deprecated (IP/MAC changed)")
            else:
                new_devices.append(existing_device)  # Keep existing device
        else:
            # New device, check if it was in deprecated
            deprecated_device = next((d for d in deprecated_devices
                                      if d['name'] == device_name), None)
            if deprecated_device:
                removed_from_deprecated.append(device_name)
                print(f"Device {device_name} removed from deprecated (restored)")
            new_devices.append(device)
            print(f"Device {device_name} added to output file")

    # Find devices that are no longer present
    current_names = {d['name'] for d in devices}
    for existing_device in current_devices:
        if existing_device['name'] not in current_names:
            if not is_device_excluded(existing_device['name'], existing_device.get('hostname', '')):
                moved_to_deprecated.append(existing_device)
                print(f"Device {existing_device['name']} moved to deprecated (no longer present)")

    # Update deprecated devices list, excluding any excluded devices
    final_deprecated = []
    for device in deprecated_devices:
        excluded = is_device_excluded(device['name'], device.get('hostname', ''))
        if device['name'] not in removed_from_deprecated and not excluded:
            final_deprecated.append(device)
        elif excluded:
            print(f"Device {device['name']} removed from deprecated (excluded by pattern)")
    final_deprecated.extend(moved_to_deprecated)

    generated_at = datetime.now().isoformat()
    # Save new configuration
    config = {
        "tasmota": {
            "devices": new_devices,
            "generated_at": generated_at,
            "total_devices": len(new_devices)
        }
    }
    # Save deprecated configuration
    deprecated_config = {
        "tasmota": {
            "devices": final_deprecated,
            "generated_at": generated_at,
            "total_devices": len(final_deprecated)
        }
    }

    # Backup existing file if it exists
    if os.path.exists(filename):
        try:
            backup_name = f"{filename}.backup"
            # os.replace overwrites an older backup on every platform;
            # os.rename fails on Windows when the target already exists.
            os.replace(filename, backup_name)
            self.logger.info(f"Created backup of existing configuration as {backup_name}")
        except Exception as e:
            self.logger.error(f"Error creating backup: {e}")

    # Save files
    try:
        with open(filename, 'w') as f:
            json.dump(config, f, indent=4)
        with open(deprecated_filename, 'w') as f:
            json.dump(deprecated_config, f, indent=4)
        self.logger.info(f"Successfully saved {len(new_devices)} Tasmota devices to {filename}")
        self.logger.info(f"Successfully saved {len(final_deprecated)} deprecated devices to {deprecated_filename}")

        print("\nDevice Status Summary:")
        if excluded_devices:
            print("\nExcluded Devices:")
            for name in excluded_devices:
                print(f"- {name}")
        if moved_to_deprecated:
            print("\nMoved to deprecated:")
            for device in moved_to_deprecated:
                print(f"- {device['name']}")
        if removed_from_deprecated:
            print("\nRestored from deprecated:")
            for name in removed_from_deprecated:
                print(f"- {name}")
        print("\nCurrent Tasmota Devices:")
        for device in new_devices:
            print(f"Name: {device['name']:<20} IP: {device['ip']:<15} MAC: {device['mac']}")
    except Exception as e:
        self.logger.error(f"Error saving configuration: {e}")
def get_unknown_devices(self, use_current_json=True):
    """Return the saved devices whose name or hostname starts with an unknown-device pattern.

    Args:
        use_current_json: Read ``current.json`` when true, ``tasmota.json``
            otherwise.

    Returns:
        The matching device dicts in file order; an empty list when the
        file is missing or is not valid JSON.
    """
    self.logger.info("Identifying unknown devices for processing...")
    try:
        source_file = 'tasmota.json' if not use_current_json else 'current.json'
        with open(source_file, 'r') as handle:
            payload = json.load(handle)
        all_devices = payload.get('tasmota', {}).get('devices', [])
        self.logger.debug(f"Loaded {len(all_devices)} devices from {source_file}")
    except FileNotFoundError:
        self.logger.error(f"{source_file} not found. Run discovery first.")
        return []
    except json.JSONDecodeError:
        self.logger.error(f"Invalid JSON format in {source_file}")
        return []

    # Collect the patterns of every network as prefix regexes (anchored at
    # the start only, so a pattern matches anything that begins with it).
    prefixes = []
    for network in self.config['unifi'].get('network_filter', {}).values():
        for raw in network.get('unknown_device_patterns', []):
            prefixes.append("^" + raw.lower().replace('.', r'\.').replace('*', '.*'))

    matches = []
    for entry in all_devices:
        name = entry.get('name', '').lower()
        hostname = entry.get('hostname', '').lower()
        if any(re.match(prefix, name) or re.match(prefix, hostname) for prefix in prefixes):
            self.logger.debug(f"Found unknown device: {name} ({hostname})")
            matches.append(entry)

    self.logger.info(f"Found {len(matches)} unknown devices to process")
    return matches
def process_unknown_devices(self):
    """Interactively rename and configure devices that match the unknown-device patterns.

    For every device returned by ``get_unknown_devices``:

    1. Fetch the device's main web page and look for the word "toggle"
       (taken as a sign that it is a light switch or power plug).
    2. Switch the device on and off every two seconds in a background
       thread so the user can identify it physically.
    3. Prompt on the console for a new name.
    4. If a non-empty name different from the current hostname was entered,
       pass it to ``configure_unknown_device``.

    Devices without an IP address or without a toggle button are skipped.
    Connection errors are logged and processing continues with the next
    device.
    """
    import threading

    unknown_devices = self.get_unknown_devices()
    if not unknown_devices:
        self.logger.info("No unknown devices found to process")
        return
    self.logger.info(f"Starting to process {len(unknown_devices)} unknown devices...")

    for device in unknown_devices:
        name = device.get('name', 'Unknown')
        ip = device.get('ip')
        if not ip:
            self.logger.warning(f"Skipping device {name} - no IP address")
            continue
        self.logger.info(f"Processing unknown device: {name} at {ip}")

        try:
            # Get the main page to check for toggle button
            response = requests.get(f"http://{ip}/", timeout=5)
            if "toggle" not in response.text.lower():
                self.logger.info(f"Device {name} does not have a toggle button, skipping")
                continue
            self.logger.info(f"Device {name} has a toggle button, assuming it's a light switch or power plug")

            original_hostname = device.get('hostname', '')
            stop_toggle = threading.Event()

            def toggle_device(device_ip=ip, stop=stop_toggle):
                """Alternate Power On / Power Off every two seconds until stopped."""
                power_on = False
                while not stop.is_set():
                    power_on = not power_on
                    state = "On" if power_on else "Off"
                    try:
                        requests.get(f"http://{device_ip}/cm?cmnd=Power%20{state}", timeout=2)
                    except requests.exceptions.RequestException:
                        # Best effort: a missed toggle only affects the blinking.
                        pass
                    # Waiting on the event (instead of sleeping) lets the
                    # thread end as soon as the stop flag is set.
                    stop.wait(2.0)  # 1/2Hz rate

            toggle_thread = threading.Thread(target=toggle_device, daemon=True)

            # Temporarily disable all logging so it does not clutter the prompt
            logging.disable(logging.CRITICAL)
            try:
                print("\n" + "="*50)
                print(f"DEVICE: {name} at IP: {ip}")
                print(f"Current hostname: {original_hostname}")
                print("="*50)
                print("The device is now toggling to help you identify it.")
                toggle_thread.start()
                print("\nPlease enter a new name for this device:")
                new_hostname = input("> ").strip()
            finally:
                # Always stop the toggling and restore logging, even when the
                # prompt is interrupted (Ctrl-C / end of input).
                stop_toggle.set()
                if toggle_thread.is_alive():
                    toggle_thread.join(timeout=3)
                logging.disable(logging.NOTSET)

            if new_hostname and new_hostname != original_hostname:
                print(f"Setting new hostname to: {new_hostname}")
                self.logger.info(f"Configuring device with new hostname: {new_hostname}")
                self.configure_unknown_device(ip, new_hostname)
            else:
                print("No valid hostname entered, skipping device")
                self.logger.warning(f"No new hostname provided for {name}, skipping configuration")
        except requests.exceptions.RequestException as e:
            self.logger.error(f"Error connecting to {name} at {ip}: {str(e)}")
def configure_unknown_device(self, ip, hostname):
    """Configure an unknown device with the given hostname and MQTT settings.

    Sends, in order: ``FriendlyName1 <hostname>``, ``SetOption3 ON``, the
    MQTT settings from the ``mqtt`` section of the configuration (when that
    section is present) and finally ``Restart 1``.

    Every command is percent-encoded before it is placed in the URL, so
    values containing spaces or characters such as ``%``, ``#``, ``&`` and
    ``/`` (for example a FullTopic of ``%prefix%/%topic%/``) reach the device
    unchanged. Configuration values are therefore expected as plain text,
    not pre-encoded.

    Args:
        ip: Address of the device.
        hostname: New name; also used to derive the MQTT topic when the
            configured ``Topic`` is ``%hostname_base%``.

    Returns:
        True when all requests could be sent (individual commands answered
        with a non-200 status are logged but do not change the result),
        False when a request raised a ``requests`` exception.
    """
    from urllib.parse import quote

    def send(command, value):
        """Send one command to the device; return True on HTTP 200."""
        encoded = quote(f"{command} {value}", safe='')
        response = requests.get(f"http://{ip}/cm?cmnd={encoded}", timeout=5)
        return response.status_code == 200

    try:
        # Set Friendly Name
        if send("FriendlyName1", hostname):
            self.logger.info(f"Set Friendly Name to {hostname}")
        else:
            self.logger.error(f"Failed to set Friendly Name to {hostname}")

        # Enable MQTT if not already enabled
        if send("SetOption3", "ON"):
            self.logger.info(f"Enabled MQTT for {hostname}")
        else:
            self.logger.error(f"Failed to enable MQTT for {hostname}")

        # Configure MQTT settings
        mqtt_config = self.config.get('mqtt', {})
        if mqtt_config:
            # Get the base hostname (everything before the first dash)
            hostname_base = hostname.split('-')[0]
            mqtt_fields = {
                "MqttHost": mqtt_config.get('Host', ''),
                "MqttPort": mqtt_config.get('Port', 1883),
                "MqttUser": mqtt_config.get('User', ''),
                "MqttPassword": mqtt_config.get('Password', ''),
                "Topic": hostname_base if mqtt_config.get('Topic') == '%hostname_base%' else mqtt_config.get('Topic', ''),
                "FullTopic": mqtt_config.get('FullTopic', '%prefix%/%topic%/'),
            }
            for setting, value in mqtt_fields.items():
                if not send(setting, value):
                    self.logger.error(f"{hostname}: Failed to set {setting}")
                elif setting == 'MqttPassword':
                    # Never write the password itself to the log.
                    self.logger.info(f"{hostname}: Set MQTT Password")
                else:
                    self.logger.info(f"{hostname}: Set {setting} to {value}")

        # Save configuration (will reboot the device)
        if send("Restart", 1):
            self.logger.info(f"Saved configuration and rebooted {hostname}")
        else:
            self.logger.error(f"Failed to save configuration for {hostname}")
        return True
    except requests.exceptions.RequestException as e:
        self.logger.error(f"Error configuring device at {ip}: {str(e)}")
        return False
def get_device_details(self, use_current_json=True):
    """Connect to each Tasmota device via HTTP, gather details and apply settings.

    Devices are read from ``current.json`` (or ``tasmota.json``); entries
    that are not dicts and devices matching an ``unknown_device_patterns``
    prefix are left out. For every remaining device with an IP address the
    method queries ``Status 2``, ``Status 5`` and ``Status 6``, pushes the
    MQTT settings and the ``console`` parameters from the ``mqtt``
    configuration section, and automatically enables every rule that is
    defined under a lowercase ``ruleN`` key unless an explicit enable entry
    (such as ``Rule1``) is configured. The collected details are written to
    ``TasmotaDevices.json`` in the current working directory.

    All commands are percent-encoded before they are placed in the URL, so
    configuration values (rules containing ``#`` or ``=``, a FullTopic of
    ``%prefix%/%topic%/``, passwords with special characters) are expected
    as plain text, not pre-encoded.

    Args:
        use_current_json: Read ``current.json`` when true, ``tasmota.json``
            otherwise.

    Returns:
        None. The method returns early (after logging an error) when the
        source file is missing or invalid or when the configuration has no
        ``mqtt`` section.
    """
    from urllib.parse import quote

    self.logger.info("Starting to gather detailed device information...")
    device_details = []
    try:
        source_file = 'current.json' if use_current_json else 'tasmota.json'
        with open(source_file, 'r') as f:
            data = json.load(f)
        all_devices = data.get('tasmota', {}).get('devices', [])
        self.logger.debug(f"Loaded {len(all_devices)} devices from {source_file}")
    except FileNotFoundError:
        self.logger.error(f"{source_file} not found. Run discovery first.")
        return
    except json.JSONDecodeError:
        self.logger.error(f"Invalid JSON format in {source_file}")
        return

    # Filter out devices matching unknown_device_patterns. The patterns are
    # prefix matches (anchored at the start only) and are compiled once.
    network_filters = self.config['unifi'].get('network_filter', {})
    unknown_regexes = []
    for network in network_filters.values():
        for pattern in network.get('unknown_device_patterns', []):
            regex = pattern.lower().replace('.', r'\.').replace('*', '.*')
            unknown_regexes.append(re.compile(f"^{regex}"))

    devices = []
    for device in all_devices:
        # Reject malformed entries before any dict method is called on them.
        if not isinstance(device, dict):
            self.logger.warning(f"Skipping invalid device entry: {device}")
            continue
        name = device.get('name', '').lower()
        hostname = device.get('hostname', '').lower()
        if any(rx.match(name) or rx.match(hostname) for rx in unknown_regexes):
            self.logger.debug(f"Skipping unknown device: {name} ({hostname})")
            continue
        devices.append(device)
    self.logger.debug(f"Processing {len(devices)} devices after filtering unknown devices")

    mqtt_config = self.config.get('mqtt', {})
    if not mqtt_config:
        self.logger.error("MQTT configuration missing from config file")
        return

    def command_url(device_ip, command, value):
        """Build the request URL for ``<command> <value>`` with the command percent-encoded."""
        return f"http://{device_ip}/cm?cmnd={quote(f'{command} {value}', safe='')}"

    def check_mqtt_settings(ip, name, mqtt_status):
        """Check and update MQTT settings if they don't match config.

        Returns True when at least one command was queued. Because the
        SetOption62 command is always queued, this is currently always True.
        """
        # Get the base hostname (everything before the first dash)
        hostname_base = name.split('-')[0]
        mqtt_fields = {
            "Host": mqtt_config.get('Host', ''),
            "Port": mqtt_config.get('Port', 1883),
            "User": mqtt_config.get('User', ''),
            "Password": mqtt_config.get('Password', ''),
            "Topic": hostname_base if mqtt_config.get('Topic') == '%hostname_base%' else mqtt_config.get('Topic', ''),
            "FullTopic": mqtt_config.get('FullTopic', '%prefix%/%topic%/'),
        }
        # NOTE(review): this expects the Status 6 reply to carry a top-level
        # 'MqttHost' object with Host/Port/User/Topic/FullTopic keys. If the
        # firmware nests these values differently, every field is reported
        # as a mismatch and re-sent on each run - confirm against a device.
        device_mqtt = mqtt_status.get('MqttHost', {})
        changes_needed = []

        # (configuration key, Tasmota command) pairs, checked in this order.
        checked_fields = (
            ('Host', 'MqttHost'),
            ('Port', 'MqttPort'),
            ('User', 'MqttUser'),
            ('Topic', 'Topic'),
            ('FullTopic', 'FullTopic'),
        )
        for key, command in checked_fields:
            if device_mqtt.get(key) != mqtt_fields[key]:
                changes_needed.append((command, mqtt_fields[key]))
                self.logger.debug(f"{name}: MQTT {key} mismatch - Device: {device_mqtt.get(key)}, Config: {mqtt_fields[key]}")

        # Re-send the password whenever any other MQTT setting changed.
        if changes_needed:
            changes_needed.append(('MqttPassword', mqtt_fields['Password']))
            self.logger.debug(f"{name}: MQTT Password will be updated")

        # NoRetain from the configuration (default False) is always applied.
        no_retain = mqtt_config.get('NoRetain', False)
        changes_needed.append(('SetOption62', '1' if no_retain else '0'))

        # Apply changes if needed
        for setting, value in changes_needed:
            try:
                response = requests.get(command_url(ip, setting, value), timeout=5)
                if response.status_code != 200:
                    self.logger.error(f"{name}: Failed to update {setting}")
                elif setting == 'MqttPassword':
                    # Never write the password itself to the log.
                    self.logger.debug(f"{name}: Updated MQTT Password")
                else:
                    self.logger.debug(f"{name}: Updated {setting} to {value}")
            except requests.exceptions.RequestException as e:
                self.logger.error(f"{name}: Error updating {setting}: {str(e)}")
        return len(changes_needed) > 0

    def apply_console_settings(ip, name, console_params):
        """Send the console parameters; return True when any command succeeded."""
        updated = False
        self.logger.info(f"{name}: Setting console parameters from configuration")

        # Special handling for ButtonRetain - need to send "On" first, then
        # "Off" to clear MQTT broker retain settings
        try:
            for step, state in enumerate(("On", "Off"), start=1):
                response = requests.get(command_url(ip, "ButtonRetain", state), timeout=5)
                if response.status_code == 200:
                    self.logger.debug(f"{name}: Set ButtonRetain to {state} (step {step} of 2 to clear MQTT broker retain settings)")
                    updated = True
                else:
                    self.logger.error(f"{name}: Failed to set ButtonRetain to {state}")
                if step == 1:
                    # Small delay to ensure commands are processed in order
                    time.sleep(0.5)
        except requests.exceptions.RequestException as e:
            self.logger.error(f"{name}: Error setting ButtonRetain commands: {str(e)}")

        # Process all other console parameters
        rules_to_enable = {}  # rule number -> True, in configuration order
        for param, value in console_params.items():
            # Skip ButtonRetain as it's handled specially above
            if param == "ButtonRetain":
                continue
            looks_like_rule = param.lower().startswith('rule') and param[-1].isdigit()
            if looks_like_rule and param.lower() == param:
                # Lowercase rule1, rule2, ... is a rule definition.
                rules_to_enable[param[-1]] = True
                self.logger.debug(f"{name}: Detected rule definition {param}, will auto-enable")
            elif looks_like_rule:
                # Rule1, Rule2, ... is an explicit enable entry: respected,
                # but not needed with the auto-enable feature.
                self.logger.debug(f"{name}: Note: {param} is not needed with auto-enable feature")
            try:
                response = requests.get(command_url(ip, param, value), timeout=5)
                if response.status_code == 200:
                    self.logger.debug(f"{name}: Set console parameter {param} to {value}")
                    updated = True
                else:
                    self.logger.error(f"{name}: Failed to set console parameter {param}")
            except requests.exceptions.RequestException as e:
                self.logger.error(f"{name}: Error setting console parameter {param}: {str(e)}")

        # Auto-enable any rules that were defined
        for rule_num in rules_to_enable:
            rule_enable_param = f"Rule{rule_num}"
            # Skip only when an explicit enable entry is configured. The
            # lowercase definition key itself must not count as one,
            # otherwise no rule would ever be auto-enabled.
            explicitly_configured = any(
                p != p.lower() and p.lower() == rule_enable_param.lower()
                for p in console_params
            )
            if explicitly_configured:
                continue
            try:
                response = requests.get(command_url(ip, rule_enable_param, 1), timeout=5)
                if response.status_code == 200:
                    self.logger.info(f"{name}: Auto-enabled {rule_enable_param}")
                    updated = True
                else:
                    self.logger.error(f"{name}: Failed to auto-enable {rule_enable_param}")
            except requests.exceptions.RequestException as e:
                self.logger.error(f"{name}: Error auto-enabling {rule_enable_param}: {str(e)}")
        return updated

    for device in devices:
        name = device.get('name', 'Unknown')
        ip = device.get('ip')
        mac = device.get('mac')
        if not ip:
            self.logger.warning(f"Skipping device {name} - no IP address")
            continue
        self.logger.info(f"Checking device: {name} at {ip}")
        try:
            # Status 2: firmware version, Status 5: network info, Status 6: MQTT info
            status_data = requests.get(command_url(ip, "Status", 2), timeout=5).json()
            network_data = requests.get(command_url(ip, "Status", 5), timeout=5).json()
            mqtt_data = requests.get(command_url(ip, "Status", 6), timeout=5).json()

            # Check and update MQTT settings if needed
            mqtt_updated = check_mqtt_settings(ip, name, mqtt_data)

            # Set console parameters from config
            console_updated = False
            console_params = mqtt_config.get('console', {})
            if console_params:
                console_updated = apply_console_settings(ip, name, console_params)

            device_detail = {
                "name": name,
                "ip": ip,
                "mac": mac,
                "version": status_data.get("StatusFWR", {}).get("Version", "Unknown"),
                "hostname": network_data.get("StatusNET", {}).get("Hostname", "Unknown"),
                "mqtt_status": "Updated" if mqtt_updated else "Verified",
                "console_status": "Updated" if console_updated else "Verified",
                "last_checked": time.strftime("%Y-%m-%d %H:%M:%S"),
                "status": "online"
            }
            self.logger.info(f"Successfully got version for {name}: {device_detail['version']}")
        except (requests.exceptions.RequestException, ValueError) as e:
            # ValueError covers a reply that is not valid JSON, so one
            # misbehaving device cannot abort the whole run.
            self.logger.error(f"Error connecting to {name} at {ip}: {str(e)}")
            device_detail = {
                "name": name,
                "ip": ip,
                "mac": mac,
                "version": "Unknown",
                "status": "offline",
                "error": str(e)
            }
        device_details.append(device_detail)
        time.sleep(0.5)

    # Save all device details at once
    try:
        with open('TasmotaDevices.json', 'w') as f:
            json.dump(device_details, f, indent=2)
        self.logger.info(f"Device details saved to TasmotaDevices.json ({len(device_details)} devices)")
    except Exception as e:
        self.logger.error(f"Error saving device details: {e}")
def main():
    """Command-line entry point: discover devices, then check and configure them.

    Steps: (1) unless ``--skip-unifi`` is given, query the UniFi controller
    and save the device lists; (2) gather details from every device;
    (3) with ``--process-unknown``, interactively process unknown devices.
    When the controller cannot be reached, step 2 is attempted with the
    existing ``current.json``.

    Returns:
        0 on success (including a successful fallback after a connection
        error), 1 when an error occurred.
    """
    parser = argparse.ArgumentParser(description='Tasmota Device Manager')
    parser.add_argument('--config', default='network_configuration.json',
                        help='Path to configuration file')
    switches = (
        ('--debug', 'Enable debug logging'),
        ('--skip-unifi', 'Skip UniFi discovery and use existing current.json'),
        ('--process-unknown',
         'Process unknown devices (matching unknown_device_patterns) to set up names and MQTT'),
    )
    for flag, help_text in switches:
        parser.add_argument(flag, action='store_true', help=help_text)
    args = parser.parse_args()

    # Set up logging
    logging.basicConfig(level=logging.DEBUG if args.debug else logging.INFO,
                        format='%(asctime)s - %(levelname)s - %(message)s',
                        datefmt='%Y-%m-%d %H:%M:%S')
    print("Starting Tasmota Device Discovery and Version Check...")

    discovery = TasmotaDiscovery(debug=args.debug)
    discovery.load_config(args.config)

    try:
        if args.skip_unifi:
            print("Skipping UniFi discovery, using existing current.json...")
        else:
            print("Step 1: Discovering Tasmota devices...")
            discovery.setup_unifi_client()
            found = discovery.get_tasmota_devices()
            discovery.save_tasmota_config(found)

        print("\nStep 2: Getting detailed version information...")
        discovery.get_device_details(use_current_json=True)

        if args.process_unknown:
            print("\nStep 3: Processing unknown devices...")
            discovery.process_unknown_devices()

        print("\nProcess completed successfully!")
        print("- Device list saved to: current.json")
        print("- Detailed information saved to: TasmotaDevices.json")
    except ConnectionError as e:
        # Controller unreachable: fall back to the device list of a previous run.
        print(f"Connection Error: {str(e)}")
        print("\nTrying to proceed with existing current.json...")
        try:
            discovery.get_device_details(use_current_json=True)
            print("\nSuccessfully retrieved device details from existing current.json")
        except Exception as inner_e:
            print(f"Error processing existing devices: {str(inner_e)}")
            return 1
    except Exception as e:
        print(f"Error: {str(e)}")
        if args.debug:
            import traceback
            traceback.print_exc()
        return 1
    return 0
if __name__ == '__main__':
    # Hand main()'s status (0 on success, 1 on failure) to the shell as the
    # process exit code instead of discarding it.
    sys.exit(main())