663 lines
30 KiB
Python
663 lines
30 KiB
Python
import json
|
|
import logging
|
|
import os
|
|
import sys
|
|
from datetime import datetime
|
|
from typing import Optional
|
|
import requests
|
|
from urllib3.exceptions import InsecureRequestWarning
|
|
import re # Import the regular expression module
|
|
import telnetlib
|
|
import time
|
|
import argparse
|
|
|
|
# Disable SSL warnings
|
|
requests.packages.urllib3.disable_warnings(category=InsecureRequestWarning)
|
|
|
|
class UnifiClient:
|
|
def __init__(self, base_url, username, password, site_id, verify_ssl=True):
|
|
self.base_url = base_url.rstrip('/')
|
|
self.username = username
|
|
self.password = password
|
|
self.site_id = site_id
|
|
self.session = requests.Session()
|
|
self.session.verify = verify_ssl
|
|
|
|
# Initialize cookie jar
|
|
self.session.cookies.clear()
|
|
|
|
def _login(self) -> requests.Response: # Changed return type annotation
|
|
"""Authenticate with the UniFi Controller."""
|
|
login_url = f"{self.base_url}/api/auth/login"
|
|
headers = {
|
|
'Content-Type': 'application/json',
|
|
'Accept': 'application/json'
|
|
}
|
|
payload = {
|
|
"username": self.username,
|
|
"password": self.password,
|
|
"remember": False
|
|
}
|
|
try:
|
|
response = self.session.post(
|
|
login_url,
|
|
json=payload,
|
|
headers=headers
|
|
)
|
|
response.raise_for_status()
|
|
if 'X-CSRF-Token' in response.headers:
|
|
self.session.headers['X-CSRF-Token'] = response.headers['X-CSRF-Token']
|
|
return response # Return the response object
|
|
except requests.exceptions.RequestException as e:
|
|
if hasattr(e, 'response') and e.response.status_code == 401:
|
|
raise Exception("Authentication failed. Please verify your username and password.") from e
|
|
raise
|
|
|
|
def get_clients(self) -> list:
|
|
"""Get all clients from the UniFi Controller."""
|
|
# Try the newer API endpoint first
|
|
url = f"{self.base_url}/proxy/network/api/s/{self.site_id}/stat/sta"
|
|
try:
|
|
response = self.session.get(url)
|
|
response.raise_for_status()
|
|
return response.json().get('data', [])
|
|
except requests.exceptions.RequestException as e:
|
|
# If the newer endpoint fails, try the legacy endpoint
|
|
url = f"{self.base_url}/api/s/{self.site_id}/stat/sta"
|
|
try:
|
|
response = self.session.get(url)
|
|
response.raise_for_status()
|
|
return response.json().get('data', [])
|
|
except requests.exceptions.RequestException as e:
|
|
# If both fail, try the v2 API endpoint
|
|
url = f"{self.base_url}/v2/api/site/{self.site_id}/clients"
|
|
response = self.session.get(url)
|
|
response.raise_for_status()
|
|
return response.json().get('data', [])
|
|
|
|
class TasmotaDiscovery:
|
|
def __init__(self, debug: bool = False):
|
|
"""Initialize the TasmotaDiscovery with optional debug mode."""
|
|
log_level = logging.DEBUG if debug else logging.INFO
|
|
logging.basicConfig(
|
|
level=log_level,
|
|
format='%(asctime)s - %(levelname)s - %(message)s',
|
|
datefmt='%Y-%m-%d %H:%M:%S'
|
|
)
|
|
self.logger = logging.getLogger(__name__)
|
|
self.config = None
|
|
self.unifi_client = None
|
|
|
|
def load_config(self, config_path: Optional[str] = None) -> dict:
|
|
"""Load configuration from JSON file."""
|
|
if config_path is None:
|
|
config_path = os.path.join(os.path.dirname(__file__), 'config.json')
|
|
|
|
self.logger.debug(f"Loading configuration from: {config_path}")
|
|
try:
|
|
with open(config_path, 'r') as config_file:
|
|
self.config = json.load(config_file)
|
|
self.logger.debug("Configuration loaded successfully from %s", config_path)
|
|
return self.config
|
|
except FileNotFoundError:
|
|
self.logger.error(f"Configuration file not found at {config_path}")
|
|
sys.exit(1)
|
|
except json.JSONDecodeError:
|
|
self.logger.error("Invalid JSON in configuration file")
|
|
sys.exit(1)
|
|
|
|
def setup_unifi_client(self):
|
|
"""Set up the UniFi client with better error handling"""
|
|
self.logger.debug("Setting up UniFi client")
|
|
|
|
if not self.config or 'unifi' not in self.config:
|
|
raise ValueError("Missing UniFi configuration")
|
|
|
|
unifi_config = self.config['unifi']
|
|
required_fields = ['host', 'username', 'password', 'site']
|
|
missing_fields = [field for field in required_fields if field not in unifi_config]
|
|
|
|
if missing_fields:
|
|
raise ValueError(f"Missing required UniFi configuration fields: {', '.join(missing_fields)}")
|
|
|
|
try:
|
|
self.logger.debug(f"Connecting to UniFi Controller at {unifi_config['host']}")
|
|
self.unifi_client = UnifiClient(
|
|
base_url=unifi_config['host'],
|
|
username=unifi_config['username'],
|
|
password=unifi_config['password'],
|
|
site_id=unifi_config['site'],
|
|
verify_ssl=False # Add this if using self-signed certificates
|
|
)
|
|
|
|
# Test the connection by making a simple request
|
|
response = self.unifi_client._login()
|
|
if not response:
|
|
raise ConnectionError(f"Failed to connect to UniFi controller: No response")
|
|
|
|
self.logger.debug("UniFi client setup successful")
|
|
|
|
except Exception as e:
|
|
self.logger.error(f"Error setting up UniFi client: {str(e)}")
|
|
raise ConnectionError(f"Failed to connect to UniFi controller: {str(e)}")
|
|
|
|
def is_tasmota_device(self, device: dict) -> bool:
|
|
"""Determine if a device is a Tasmota device."""
|
|
name = device.get('name', '').lower()
|
|
hostname = device.get('hostname', '').lower()
|
|
ip = device.get('ip', '')
|
|
|
|
# Check if device is in the configured NoT network
|
|
network_filters = self.config['unifi'].get('network_filter', {})
|
|
for network in network_filters.values():
|
|
if ip.startswith(network['subnet']):
|
|
self.logger.debug(f"Checking device in NoT network: {name} ({hostname}) IP: {ip}")
|
|
|
|
# First check exclusion patterns
|
|
exclude_patterns = network.get('exclude_patterns', [])
|
|
for pattern in exclude_patterns:
|
|
pattern = pattern.lower()
|
|
# Convert glob pattern to regex pattern
|
|
pattern = pattern.replace('.', r'\.').replace('*', '.*')
|
|
if re.match(f"^{pattern}$", name) or re.match(f"^{pattern}$", hostname):
|
|
self.logger.debug(f"Excluding device due to pattern '{pattern}': {name} ({hostname})")
|
|
return False
|
|
|
|
# If not excluded, check if it's a Tasmota device
|
|
matches = any([
|
|
name.startswith('tasmota'),
|
|
name.startswith('sonoff'),
|
|
name.endswith('-ts'),
|
|
hostname.startswith('tasmota'),
|
|
hostname.startswith('sonoff'),
|
|
hostname.startswith('esp-'),
|
|
any(hostname.endswith(suffix) for suffix in ['-fan', '-lamp', '-light', '-switch'])
|
|
])
|
|
if matches:
|
|
self.logger.debug(f"Found Tasmota device: {name}")
|
|
return True # Consider all non-excluded devices in NoT network as potential Tasmota devices
|
|
|
|
return False
|
|
|
|
def get_tasmota_devices(self) -> list:
|
|
"""Query UniFi controller and filter Tasmota devices."""
|
|
devices = []
|
|
self.logger.debug("Querying UniFi controller for devices")
|
|
try:
|
|
all_clients = self.unifi_client.get_clients()
|
|
self.logger.debug(f"Found {len(all_clients)} total devices")
|
|
|
|
for device in all_clients:
|
|
if self.is_tasmota_device(device):
|
|
device_info = {
|
|
"name": device.get('name', device.get('hostname', 'Unknown')),
|
|
"ip": device.get('ip', ''),
|
|
"mac": device.get('mac', ''),
|
|
"last_seen": device.get('last_seen', ''),
|
|
"hostname": device.get('hostname', ''),
|
|
"notes": device.get('note', ''),
|
|
}
|
|
devices.append(device_info)
|
|
|
|
self.logger.debug(f"Found {len(devices)} Tasmota devices")
|
|
return devices
|
|
except Exception as e:
|
|
self.logger.error(f"Error getting devices from UniFi controller: {e}")
|
|
return []
|
|
|
|
def save_tasmota_config(self, devices: list) -> None:
|
|
"""Save Tasmota device information to a JSON file with device tracking."""
|
|
filename = "current.json"
|
|
self.logger.debug(f"Saving Tasmota configuration to {filename}")
|
|
deprecated_filename = "deprecated.json"
|
|
|
|
current_devices = []
|
|
deprecated_devices = []
|
|
|
|
# Load existing devices if file exists
|
|
if os.path.exists(filename):
|
|
try:
|
|
with open(filename, 'r') as f:
|
|
existing_config = json.load(f)
|
|
current_devices = existing_config.get('tasmota', {}).get('devices', [])
|
|
except json.JSONDecodeError:
|
|
self.logger.error(f"Error reading {filename}, treating as empty")
|
|
current_devices = []
|
|
|
|
# Load deprecated devices if file exists
|
|
if os.path.exists(deprecated_filename):
|
|
try:
|
|
with open(deprecated_filename, 'r') as f:
|
|
deprecated_config = json.load(f)
|
|
deprecated_devices = deprecated_config.get('tasmota', {}).get('devices', [])
|
|
except json.JSONDecodeError:
|
|
self.logger.error(f"Error reading {deprecated_filename}, treating as empty")
|
|
deprecated_devices = []
|
|
|
|
# Create new config
|
|
new_devices = []
|
|
moved_to_deprecated = []
|
|
restored_from_deprecated = []
|
|
removed_from_deprecated = []
|
|
excluded_devices = []
|
|
|
|
# Check for excluded devices in current and deprecated lists
|
|
network_filters = self.config['unifi'].get('network_filter', {})
|
|
exclude_patterns = []
|
|
for network in network_filters.values():
|
|
exclude_patterns.extend(network.get('exclude_patterns', []))
|
|
|
|
# Function to check if device is excluded
|
|
def is_device_excluded(device_name: str, hostname: str = '') -> bool:
|
|
name = device_name.lower()
|
|
hostname = hostname.lower()
|
|
for pattern in exclude_patterns:
|
|
pattern = pattern.lower()
|
|
pattern = pattern.replace('.', r'\.').replace('*', '.*')
|
|
if re.match(f"^{pattern}$", name) or re.match(f"^{pattern}$", hostname):
|
|
return True
|
|
return False
|
|
|
|
# Process current devices
|
|
for device in devices:
|
|
device_name = device['name']
|
|
device_hostname = device.get('hostname', '')
|
|
device_ip = device['ip']
|
|
device_mac = device['mac']
|
|
|
|
# Check if device should be excluded
|
|
if is_device_excluded(device_name, device_hostname):
|
|
print(f"Device {device_name} excluded by pattern - skipping")
|
|
excluded_devices.append(device_name)
|
|
continue
|
|
|
|
# Check in current devices
|
|
existing_device = next((d for d in current_devices
|
|
if d['name'] == device_name), None)
|
|
|
|
if existing_device:
|
|
# Device exists, check if IP or MAC changed
|
|
if existing_device['ip'] != device_ip or existing_device['mac'] != device_mac:
|
|
moved_to_deprecated.append(existing_device)
|
|
new_devices.append(device)
|
|
print(f"Device {device_name} moved to deprecated (IP/MAC changed)")
|
|
else:
|
|
new_devices.append(existing_device) # Keep existing device
|
|
else:
|
|
# New device, check if it was in deprecated
|
|
deprecated_device = next((d for d in deprecated_devices
|
|
if d['name'] == device_name), None)
|
|
if deprecated_device:
|
|
removed_from_deprecated.append(device_name)
|
|
print(f"Device {device_name} removed from deprecated (restored)")
|
|
new_devices.append(device)
|
|
print(f"Device {device_name} added to output file")
|
|
|
|
# Find devices that are no longer present
|
|
current_names = {d['name'] for d in devices}
|
|
for existing_device in current_devices:
|
|
if existing_device['name'] not in current_names:
|
|
if not is_device_excluded(existing_device['name'], existing_device.get('hostname', '')):
|
|
moved_to_deprecated.append(existing_device)
|
|
print(f"Device {existing_device['name']} moved to deprecated (no longer present)")
|
|
|
|
# Update deprecated devices list, excluding any excluded devices
|
|
final_deprecated = []
|
|
for device in deprecated_devices:
|
|
if device['name'] not in removed_from_deprecated and not is_device_excluded(device['name'], device.get('hostname', '')):
|
|
final_deprecated.append(device)
|
|
elif is_device_excluded(device['name'], device.get('hostname', '')):
|
|
print(f"Device {device['name']} removed from deprecated (excluded by pattern)")
|
|
|
|
final_deprecated.extend(moved_to_deprecated)
|
|
|
|
# Save new configuration
|
|
config = {
|
|
"tasmota": {
|
|
"devices": new_devices,
|
|
"generated_at": datetime.now().isoformat(),
|
|
"total_devices": len(new_devices)
|
|
}
|
|
}
|
|
|
|
# Save deprecated configuration
|
|
deprecated_config = {
|
|
"tasmota": {
|
|
"devices": final_deprecated,
|
|
"generated_at": datetime.now().isoformat(),
|
|
"total_devices": len(final_deprecated)
|
|
}
|
|
}
|
|
|
|
# Backup existing file if it exists
|
|
if os.path.exists(filename):
|
|
try:
|
|
backup_name = f"{filename}.backup"
|
|
os.rename(filename, backup_name)
|
|
self.logger.info(f"Created backup of existing configuration as {backup_name}")
|
|
except Exception as e:
|
|
self.logger.error(f"Error creating backup: {e}")
|
|
|
|
# Save files
|
|
try:
|
|
with open(filename, 'w') as f:
|
|
json.dump(config, f, indent=4)
|
|
with open(deprecated_filename, 'w') as f:
|
|
json.dump(deprecated_config, f, indent=4)
|
|
|
|
self.logger.info(f"Successfully saved {len(new_devices)} Tasmota devices to {filename}")
|
|
self.logger.info(f"Successfully saved {len(final_deprecated)} deprecated devices to {deprecated_filename}")
|
|
|
|
print("\nDevice Status Summary:")
|
|
if excluded_devices:
|
|
print("\nExcluded Devices:")
|
|
for name in excluded_devices:
|
|
print(f"- {name}")
|
|
|
|
if moved_to_deprecated:
|
|
print("\nMoved to deprecated:")
|
|
for device in moved_to_deprecated:
|
|
print(f"- {device['name']}")
|
|
|
|
if removed_from_deprecated:
|
|
print("\nRestored from deprecated:")
|
|
for name in removed_from_deprecated:
|
|
print(f"- {name}")
|
|
|
|
print("\nCurrent Tasmota Devices:")
|
|
for device in new_devices:
|
|
print(f"Name: {device['name']:<20} IP: {device['ip']:<15} MAC: {device['mac']}")
|
|
|
|
except Exception as e:
|
|
self.logger.error(f"Error saving configuration: {e}")
|
|
|
|
def get_device_details(self, use_current_json=True):
|
|
"""Connect to each Tasmota device via HTTP, gather details and validate MQTT settings"""
|
|
self.logger.info("Starting to gather detailed device information...")
|
|
device_details = []
|
|
|
|
try:
|
|
source_file = 'current.json' if use_current_json else 'tasmota.json'
|
|
with open(source_file, 'r') as f:
|
|
data = json.load(f)
|
|
devices = data.get('tasmota', {}).get('devices', [])
|
|
self.logger.debug(f"Loaded {len(devices)} devices from {source_file}")
|
|
except FileNotFoundError:
|
|
self.logger.error(f"{source_file} not found. Run discovery first.")
|
|
return
|
|
except json.JSONDecodeError:
|
|
self.logger.error(f"Invalid JSON format in {source_file}")
|
|
return
|
|
|
|
mqtt_config = self.config.get('mqtt', {})
|
|
if not mqtt_config:
|
|
self.logger.error("MQTT configuration missing from config file")
|
|
return
|
|
|
|
def check_mqtt_settings(ip, name, mqtt_status):
|
|
"""Check and update MQTT settings if they don't match config"""
|
|
# Get the base hostname (everything before the dash)
|
|
hostname_base = name.split('-')[0] if '-' in name else name
|
|
|
|
mqtt_fields = {
|
|
"Host": mqtt_config.get('Host', ''),
|
|
"Port": mqtt_config.get('Port', 1883),
|
|
"User": mqtt_config.get('User', ''),
|
|
"Password": mqtt_config.get('Password', ''),
|
|
"Topic": hostname_base if mqtt_config.get('Topic') == '%hostname_base%' else mqtt_config.get('Topic', ''),
|
|
"FullTopic": mqtt_config.get('FullTopic', '%prefix%/%topic%/'),
|
|
}
|
|
|
|
device_mqtt = mqtt_status.get('MqttHost', {})
|
|
changes_needed = []
|
|
force_password_update = False
|
|
|
|
# Check each MQTT setting
|
|
if device_mqtt.get('Host') != mqtt_fields['Host']:
|
|
changes_needed.append(('MqttHost', mqtt_fields['Host']))
|
|
self.logger.debug(f"{name}: MQTT Host mismatch - Device: {device_mqtt.get('Host')}, Config: {mqtt_fields['Host']}")
|
|
force_password_update = True
|
|
|
|
if device_mqtt.get('Port') != mqtt_fields['Port']:
|
|
changes_needed.append(('MqttPort', mqtt_fields['Port']))
|
|
self.logger.debug(f"{name}: MQTT Port mismatch - Device: {device_mqtt.get('Port')}, Config: {mqtt_fields['Port']}")
|
|
force_password_update = True
|
|
|
|
if device_mqtt.get('User') != mqtt_fields['User']:
|
|
changes_needed.append(('MqttUser', mqtt_fields['User']))
|
|
self.logger.debug(f"{name}: MQTT User mismatch - Device: {device_mqtt.get('User')}, Config: {mqtt_fields['User']}")
|
|
force_password_update = True
|
|
|
|
if device_mqtt.get('Topic') != mqtt_fields['Topic']:
|
|
changes_needed.append(('Topic', mqtt_fields['Topic']))
|
|
self.logger.debug(f"{name}: MQTT Topic mismatch - Device: {device_mqtt.get('Topic')}, Config: {mqtt_fields['Topic']}")
|
|
force_password_update = True
|
|
|
|
if device_mqtt.get('FullTopic') != mqtt_fields['FullTopic']:
|
|
changes_needed.append(('FullTopic', mqtt_fields['FullTopic']))
|
|
self.logger.debug(f"{name}: MQTT FullTopic mismatch - Device: {device_mqtt.get('FullTopic')}, Config: {mqtt_fields['FullTopic']}")
|
|
force_password_update = True
|
|
|
|
# Add password update if any MQTT setting changed or user was updated
|
|
if force_password_update:
|
|
changes_needed.append(('MqttPassword', mqtt_fields['Password']))
|
|
self.logger.debug(f"{name}: MQTT Password will be updated")
|
|
|
|
# Check NoRetain setting
|
|
if mqtt_config.get('NoRetain', True):
|
|
changes_needed.append(('SetOption62', '1')) # 1 = No Retain
|
|
|
|
# Apply changes if needed
|
|
for setting, value in changes_needed:
|
|
try:
|
|
url = f"http://{ip}/cm?cmnd={setting}%20{value}"
|
|
response = requests.get(url, timeout=5)
|
|
if response.status_code == 200:
|
|
if setting != 'MqttPassword':
|
|
self.logger.debug(f"{name}: Updated {setting} to {value}")
|
|
else:
|
|
self.logger.debug(f"{name}: Updated MQTT Password")
|
|
else:
|
|
self.logger.error(f"{name}: Failed to update {setting}")
|
|
except requests.exceptions.RequestException as e:
|
|
self.logger.error(f"{name}: Error updating {setting}: {str(e)}")
|
|
|
|
return len(changes_needed) > 0
|
|
|
|
for device in devices:
|
|
if not isinstance(device, dict):
|
|
self.logger.warning(f"Skipping invalid device entry: {device}")
|
|
continue
|
|
|
|
name = device.get('name', 'Unknown')
|
|
ip = device.get('ip')
|
|
mac = device.get('mac')
|
|
|
|
if not ip:
|
|
self.logger.warning(f"Skipping device {name} - no IP address")
|
|
continue
|
|
|
|
self.logger.info(f"Checking device: {name} at {ip}")
|
|
|
|
try:
|
|
# Get Status 2 for firmware version
|
|
url_status = f"http://{ip}/cm?cmnd=Status%202"
|
|
response = requests.get(url_status, timeout=5)
|
|
status_data = response.json()
|
|
|
|
# Get Status 5 for network info
|
|
url_network = f"http://{ip}/cm?cmnd=Status%205"
|
|
response = requests.get(url_network, timeout=5)
|
|
network_data = response.json()
|
|
|
|
# Get Status 6 for MQTT info
|
|
url_mqtt = f"http://{ip}/cm?cmnd=Status%206"
|
|
response = requests.get(url_mqtt, timeout=5)
|
|
mqtt_data = response.json()
|
|
|
|
# Check and update MQTT settings if needed
|
|
mqtt_updated = check_mqtt_settings(ip, name, mqtt_data)
|
|
|
|
device_detail = {
|
|
"name": name,
|
|
"ip": ip,
|
|
"mac": mac,
|
|
"version": status_data.get("StatusFWR", {}).get("Version", "Unknown"),
|
|
"hostname": network_data.get("StatusNET", {}).get("Hostname", "Unknown"),
|
|
"mqtt_status": "Updated" if mqtt_updated else "Verified",
|
|
"last_checked": time.strftime("%Y-%m-%d %H:%M:%S"),
|
|
"status": "online"
|
|
}
|
|
self.logger.info(f"Successfully got version for {name}: {device_detail['version']}")
|
|
|
|
except requests.exceptions.RequestException as e:
|
|
self.logger.error(f"Error connecting to {name} at {ip}: {str(e)}")
|
|
device_detail = {
|
|
"name": name,
|
|
"ip": ip,
|
|
"mac": mac,
|
|
"version": "Unknown",
|
|
"status": "offline",
|
|
"error": str(e)
|
|
}
|
|
|
|
device_details.append(device_detail)
|
|
time.sleep(0.5)
|
|
|
|
# Save all device details at once
|
|
try:
|
|
with open('TasmotaDevices.json', 'w') as f:
|
|
json.dump(device_details, f, indent=2)
|
|
self.logger.info(f"Device details saved to TasmotaDevices.json ({len(device_details)} devices)")
|
|
except Exception as e:
|
|
self.logger.error(f"Error saving device details: {e}")
|
|
|
|
def check_mqtt_settings(ip, name, mqtt_status):
|
|
"""Check and update MQTT settings if they don't match config"""
|
|
# Get the base hostname (everything before the dash)
|
|
hostname_base = name.split('-')[0] if '-' in name else name
|
|
|
|
mqtt_fields = {
|
|
"Host": mqtt_config.get('Host', ''),
|
|
"Port": mqtt_config.get('Port', 1883),
|
|
"User": mqtt_config.get('User', ''),
|
|
"Password": mqtt_config.get('Password', ''),
|
|
"Topic": hostname_base if mqtt_config.get('Topic') == '%hostname_base%' else mqtt_config.get('Topic', ''),
|
|
"FullTopic": mqtt_config.get('FullTopic', '%prefix%/%topic%/'),
|
|
}
|
|
|
|
device_mqtt = mqtt_status.get('MqttHost', {})
|
|
changes_needed = []
|
|
force_password_update = False
|
|
|
|
# Check each MQTT setting
|
|
if device_mqtt.get('Host') != mqtt_fields['Host']:
|
|
changes_needed.append(('MqttHost', mqtt_fields['Host']))
|
|
self.logger.debug(f"{name}: MQTT Host mismatch - Device: {device_mqtt.get('Host')}, Config: {mqtt_fields['Host']}")
|
|
force_password_update = True
|
|
|
|
if device_mqtt.get('Port') != mqtt_fields['Port']:
|
|
changes_needed.append(('MqttPort', mqtt_fields['Port']))
|
|
self.logger.debug(f"{name}: MQTT Port mismatch - Device: {device_mqtt.get('Port')}, Config: {mqtt_fields['Port']}")
|
|
force_password_update = True
|
|
|
|
if device_mqtt.get('User') != mqtt_fields['User']:
|
|
changes_needed.append(('MqttUser', mqtt_fields['User']))
|
|
self.logger.debug(f"{name}: MQTT User mismatch - Device: {device_mqtt.get('User')}, Config: {mqtt_fields['User']}")
|
|
force_password_update = True
|
|
|
|
if device_mqtt.get('Topic') != mqtt_fields['Topic']:
|
|
changes_needed.append(('Topic', mqtt_fields['Topic']))
|
|
self.logger.debug(f"{name}: MQTT Topic mismatch - Device: {device_mqtt.get('Topic')}, Config: {mqtt_fields['Topic']}")
|
|
force_password_update = True
|
|
|
|
if device_mqtt.get('FullTopic') != mqtt_fields['FullTopic']:
|
|
changes_needed.append(('FullTopic', mqtt_fields['FullTopic']))
|
|
self.logger.debug(f"{name}: MQTT FullTopic mismatch - Device: {device_mqtt.get('FullTopic')}, Config: {mqtt_fields['FullTopic']}")
|
|
force_password_update = True
|
|
|
|
# Add password update if any MQTT setting changed or user was updated
|
|
if force_password_update:
|
|
changes_needed.append(('MqttPassword', mqtt_fields['Password']))
|
|
self.logger.debug(f"{name}: MQTT Password will be updated")
|
|
|
|
# Check NoRetain setting
|
|
if mqtt_config.get('NoRetain', True):
|
|
changes_needed.append(('SetOption62', '1')) # 1 = No Retain
|
|
|
|
# Apply changes if needed
|
|
for setting, value in changes_needed:
|
|
try:
|
|
url = f"http://{ip}/cm?cmnd={setting}%20{value}"
|
|
response = requests.get(url, timeout=5)
|
|
if response.status_code == 200:
|
|
if setting != 'MqttPassword':
|
|
self.logger.debug(f"{name}: Updated {setting} to {value}")
|
|
else:
|
|
self.logger.debug(f"{name}: Updated MQTT Password")
|
|
else:
|
|
self.logger.error(f"{name}: Failed to update {setting}")
|
|
except requests.exceptions.RequestException as e:
|
|
self.logger.error(f"{name}: Error updating {setting}: {str(e)}")
|
|
|
|
return len(changes_needed) > 0
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser(description='Tasmota Device Manager')
|
|
parser.add_argument('--config', default='network_configuration.json',
|
|
help='Path to configuration file')
|
|
parser.add_argument('--debug', action='store_true',
|
|
help='Enable debug logging')
|
|
parser.add_argument('--skip-unifi', action='store_true',
|
|
help='Skip UniFi discovery and use existing current.json')
|
|
|
|
args = parser.parse_args()
|
|
|
|
# Set up logging
|
|
log_level = logging.DEBUG if args.debug else logging.INFO
|
|
logging.basicConfig(level=log_level,
|
|
format='%(asctime)s - %(levelname)s - %(message)s',
|
|
datefmt='%Y-%m-%d %H:%M:%S')
|
|
|
|
print("Starting Tasmota Device Discovery and Version Check...")
|
|
|
|
# Create TasmotaDiscovery instance
|
|
discovery = TasmotaDiscovery(debug=args.debug)
|
|
discovery.load_config(args.config)
|
|
|
|
try:
|
|
if not args.skip_unifi:
|
|
print("Step 1: Discovering Tasmota devices...")
|
|
discovery.setup_unifi_client()
|
|
tasmota_devices = discovery.get_tasmota_devices()
|
|
discovery.save_tasmota_config(tasmota_devices)
|
|
else:
|
|
print("Skipping UniFi discovery, using existing current.json...")
|
|
|
|
print("\nStep 2: Getting detailed version information...")
|
|
discovery.get_device_details(use_current_json=True)
|
|
|
|
print("\nProcess completed successfully!")
|
|
print("- Device list saved to: current.json")
|
|
print("- Detailed information saved to: TasmotaDevices.json")
|
|
|
|
except ConnectionError as e:
|
|
print(f"Connection Error: {str(e)}")
|
|
print("\nTrying to proceed with existing current.json...")
|
|
try:
|
|
discovery.get_device_details(use_current_json=True)
|
|
print("\nSuccessfully retrieved device details from existing current.json")
|
|
except Exception as inner_e:
|
|
print(f"Error processing existing devices: {str(inner_e)}")
|
|
return 1
|
|
except Exception as e:
|
|
print(f"Error: {str(e)}")
|
|
if args.debug:
|
|
import traceback
|
|
traceback.print_exc()
|
|
return 1
|
|
|
|
return 0
|
|
|
|
if __name__ == '__main__':
|
|
main() |