Compare commits
No commits in common. "master" and "AuthenticationError" have entirely different histories.
master
...
Authentica
@ -4,7 +4,6 @@ import argparse
|
|||||||
import logging
|
import logging
|
||||||
import sys
|
import sys
|
||||||
from typing import Optional
|
from typing import Optional
|
||||||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
|
||||||
|
|
||||||
from utils import load_json_file, ensure_data_directory, get_data_file_path, is_valid_ip, match_pattern
|
from utils import load_json_file, ensure_data_directory, get_data_file_path, is_valid_ip, match_pattern
|
||||||
from unifi_client import UnifiClient, AuthenticationError
|
from unifi_client import UnifiClient, AuthenticationError
|
||||||
@ -13,7 +12,6 @@ from configuration import ConfigurationManager
|
|||||||
from console_settings import ConsoleSettingsManager
|
from console_settings import ConsoleSettingsManager
|
||||||
from unknown_devices import UnknownDeviceProcessor
|
from unknown_devices import UnknownDeviceProcessor
|
||||||
from reporting import ReportGenerator
|
from reporting import ReportGenerator
|
||||||
from device_diff import DeviceComparison
|
|
||||||
|
|
||||||
|
|
||||||
def setup_logging(debug: bool = False) -> logging.Logger:
|
def setup_logging(debug: bool = False) -> logging.Logger:
|
||||||
@ -96,69 +94,53 @@ def setup_unifi_client(config: dict, logger: logging.Logger) -> Optional[UnifiCl
|
|||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
||||||
def process_single_device(device: dict, config_manager: ConfigurationManager,
|
def process_devices(devices: list, config_manager: ConfigurationManager,
|
||||||
console_manager: ConsoleSettingsManager,
|
console_manager: ConsoleSettingsManager, logger: logging.Logger):
|
||||||
logger: logging.Logger,
|
|
||||||
report_gen: 'ReportGenerator') -> tuple:
|
|
||||||
"""
|
"""
|
||||||
Process a single device for configuration.
|
Process all devices for configuration.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
device: Device to process
|
devices: List of devices to process
|
||||||
config_manager: Configuration manager instance
|
config_manager: Configuration manager instance
|
||||||
console_manager: Console settings manager instance
|
console_manager: Console settings manager instance
|
||||||
logger: Logger instance
|
logger: Logger instance
|
||||||
report_gen: Report generator for error collection
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
Tuple of (device_info, success, messages)
|
|
||||||
"""
|
"""
|
||||||
|
device_details_list = []
|
||||||
|
stats = {'processed': 0, 'mqtt_updated': 0, 'console_updated': 0, 'failed': 0}
|
||||||
|
|
||||||
|
for device in devices:
|
||||||
device_name = device.get('name', 'Unknown')
|
device_name = device.get('name', 'Unknown')
|
||||||
device_ip = device.get('ip', '')
|
device_ip = device.get('ip', '')
|
||||||
messages = []
|
|
||||||
|
logger.info(f"\nProcessing: {device_name} ({device_ip})")
|
||||||
|
|
||||||
try:
|
try:
|
||||||
logger.debug(f"Processing: {device_name} ({device_ip})")
|
|
||||||
|
|
||||||
# Get device details
|
# Get device details
|
||||||
device_details = config_manager.get_device_details(device_ip, device_name)
|
device_details = config_manager.get_device_details(device_ip, device_name)
|
||||||
|
|
||||||
if not device_details:
|
if not device_details:
|
||||||
messages.append(f" {device_name}: Could not get device details, skipping")
|
logger.warning(f"{device_name}: Could not get device details, skipping")
|
||||||
report_gen.add_error(device_name, f"Connection failed to {device_ip}")
|
stats['failed'] += 1
|
||||||
return None, False, messages
|
continue
|
||||||
|
|
||||||
# Check and update template
|
# Check and update template
|
||||||
template_success = config_manager.check_and_update_template(device, device_details)
|
template_success = config_manager.check_and_update_template(device, device_details)
|
||||||
|
|
||||||
if template_success:
|
|
||||||
logger.debug(f" {device_name}: Template checked/updated")
|
|
||||||
# Refresh device details after template update
|
# Refresh device details after template update
|
||||||
|
if template_success:
|
||||||
device_details = config_manager.get_device_details(device_ip, device_name)
|
device_details = config_manager.get_device_details(device_ip, device_name)
|
||||||
|
|
||||||
# Configure MQTT
|
# Configure MQTT
|
||||||
mqtt_success, mqtt_status = config_manager.configure_mqtt_settings(device, device_details)
|
mqtt_success, mqtt_status = config_manager.configure_mqtt_settings(device, device_details)
|
||||||
|
|
||||||
if mqtt_success:
|
if mqtt_success and mqtt_status == "Updated":
|
||||||
if mqtt_status == "Updated":
|
stats['mqtt_updated'] += 1
|
||||||
logger.debug(f" {device_name}: MQTT updated")
|
|
||||||
else:
|
|
||||||
logger.debug(f" {device_name}: MQTT already configured")
|
|
||||||
else:
|
|
||||||
messages.append(f" {device_name}: MQTT configuration failed - {mqtt_status}")
|
|
||||||
report_gen.add_error(device_name, f"MQTT configuration failed: {mqtt_status}")
|
|
||||||
|
|
||||||
# Apply console settings
|
# Apply console settings
|
||||||
console_success, console_status = console_manager.apply_console_settings(device, device_details)
|
console_success, console_status = console_manager.apply_console_settings(device, device_details)
|
||||||
|
|
||||||
if console_success:
|
if console_success and console_status == "Applied":
|
||||||
if console_status == "Applied":
|
stats['console_updated'] += 1
|
||||||
logger.debug(f" {device_name}: Console settings applied")
|
|
||||||
elif console_status != "No console settings" and console_status != "Empty console set":
|
|
||||||
logger.debug(f" {device_name}: Console settings - {console_status}")
|
|
||||||
else:
|
|
||||||
messages.append(f" {device_name}: Console settings failed - {console_status}")
|
|
||||||
report_gen.add_warning(device_name, f"Console settings failed: {console_status}")
|
|
||||||
|
|
||||||
# Save device details
|
# Save device details
|
||||||
device_info = {
|
device_info = {
|
||||||
@ -167,83 +149,17 @@ def process_single_device(device: dict, config_manager: ConfigurationManager,
|
|||||||
'console_status': console_status,
|
'console_status': console_status,
|
||||||
'firmware': device_details.get('StatusFWR', {}).get('Version', 'Unknown')
|
'firmware': device_details.get('StatusFWR', {}).get('Version', 'Unknown')
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.debug(f" {device_name}: ✓ Processing completed successfully")
|
|
||||||
return device_info, True, messages
|
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
messages.append(f" {device_name}: ✗ Error during processing: {e}")
|
|
||||||
report_gen.add_error(device_name, f"Unexpected error: {str(e)}")
|
|
||||||
return None, False, messages
|
|
||||||
|
|
||||||
|
|
||||||
def process_devices(devices: list, config_manager: ConfigurationManager,
|
|
||||||
console_manager: ConsoleSettingsManager, logger: logging.Logger,
|
|
||||||
report_gen: 'ReportGenerator', max_workers: int = 10):
|
|
||||||
"""
|
|
||||||
Process all devices for configuration in parallel.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
devices: List of devices to process
|
|
||||||
config_manager: Configuration manager instance
|
|
||||||
console_manager: Console settings manager instance
|
|
||||||
logger: Logger instance
|
|
||||||
report_gen: Report generator for error collection
|
|
||||||
max_workers: Maximum number of parallel workers (default: 10)
|
|
||||||
"""
|
|
||||||
device_details_list = []
|
|
||||||
stats = {'processed': 0, 'mqtt_updated': 0, 'console_updated': 0, 'failed': 0}
|
|
||||||
all_messages = []
|
|
||||||
|
|
||||||
logger.info(f"\nProcessing {len(devices)} devices in parallel (max {max_workers} workers)...")
|
|
||||||
|
|
||||||
# Process devices in parallel using ThreadPoolExecutor
|
|
||||||
with ThreadPoolExecutor(max_workers=max_workers) as executor:
|
|
||||||
# Submit all device processing tasks
|
|
||||||
future_to_device = {
|
|
||||||
executor.submit(process_single_device, device, config_manager,
|
|
||||||
console_manager, logger, report_gen): device
|
|
||||||
for device in devices
|
|
||||||
}
|
|
||||||
|
|
||||||
# Collect results as they complete
|
|
||||||
for future in as_completed(future_to_device):
|
|
||||||
device = future_to_device[future]
|
|
||||||
device_name = device.get('name', 'Unknown')
|
|
||||||
|
|
||||||
try:
|
|
||||||
device_info, success, messages = future.result()
|
|
||||||
|
|
||||||
# Store messages for reporting (only errors/warnings)
|
|
||||||
all_messages.extend(messages)
|
|
||||||
|
|
||||||
if success and device_info:
|
|
||||||
device_details_list.append(device_info)
|
device_details_list.append(device_info)
|
||||||
|
|
||||||
stats['processed'] += 1
|
stats['processed'] += 1
|
||||||
|
|
||||||
# Track MQTT and console updates
|
|
||||||
if device_info.get('mqtt_status') == 'Updated':
|
|
||||||
stats['mqtt_updated'] += 1
|
|
||||||
if device_info.get('console_status') == 'Applied':
|
|
||||||
stats['console_updated'] += 1
|
|
||||||
else:
|
|
||||||
stats['failed'] += 1
|
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
all_messages.append(f"{device_name}: Unexpected error: {e}")
|
logger.error(f"{device_name}: Error during processing: {e}")
|
||||||
report_gen.add_error(device_name, f"Processing exception: {str(e)}")
|
|
||||||
stats['failed'] += 1
|
stats['failed'] += 1
|
||||||
|
|
||||||
# Print only error/warning messages (messages list now only contains issues)
|
|
||||||
if all_messages:
|
|
||||||
logger.info("\n" + "=" * 60)
|
|
||||||
logger.info("PROCESSING ISSUES")
|
|
||||||
logger.info("=" * 60)
|
|
||||||
for message in all_messages:
|
|
||||||
logger.info(message)
|
|
||||||
|
|
||||||
return device_details_list, stats
|
return device_details_list, stats
|
||||||
|
|
||||||
|
|
||||||
def find_device_by_identifier(devices: list, identifier: str, logger: logging.Logger) -> Optional[dict]:
|
def find_device_by_identifier(devices: list, identifier: str, logger: logging.Logger) -> Optional[dict]:
|
||||||
"""
|
"""
|
||||||
Find a device by IP address or hostname.
|
Find a device by IP address or hostname.
|
||||||
@ -308,14 +224,12 @@ def main():
|
|||||||
help='Generate UniFi hostname comparison report')
|
help='Generate UniFi hostname comparison report')
|
||||||
parser.add_argument('--Device', type=str,
|
parser.add_argument('--Device', type=str,
|
||||||
help='Process single device by IP or hostname')
|
help='Process single device by IP or hostname')
|
||||||
parser.add_argument('--diff', nargs='+', metavar='DEVICE',
|
|
||||||
help='Compare devices: --diff DEVICE (vs config) or --diff DEVICE1 DEVICE2 (vs each other)')
|
|
||||||
|
|
||||||
args = parser.parse_args()
|
args = parser.parse_args()
|
||||||
|
|
||||||
# Setup logging
|
# Setup logging
|
||||||
logger = setup_logging(args.debug)
|
logger = setup_logging(args.debug)
|
||||||
logger.info("TasmotaManager v2.1 starting")
|
logger.info("TasmotaManager v2.0 starting")
|
||||||
|
|
||||||
# Ensure data directory exists
|
# Ensure data directory exists
|
||||||
ensure_data_directory()
|
ensure_data_directory()
|
||||||
@ -337,56 +251,6 @@ def main():
|
|||||||
unknown_processor = UnknownDeviceProcessor(config, config_manager, logger)
|
unknown_processor = UnknownDeviceProcessor(config, config_manager, logger)
|
||||||
report_gen = ReportGenerator(config, discovery, logger)
|
report_gen = ReportGenerator(config, discovery, logger)
|
||||||
|
|
||||||
# Handle device comparison mode
|
|
||||||
if args.diff:
|
|
||||||
if len(args.diff) < 1 or len(args.diff) > 2:
|
|
||||||
logger.error("--diff requires 1 or 2 device arguments")
|
|
||||||
return 1
|
|
||||||
|
|
||||||
device_comp = DeviceComparison(config, logger)
|
|
||||||
|
|
||||||
# Get devices list to resolve names/IPs
|
|
||||||
devices = discovery.get_tasmota_devices()
|
|
||||||
|
|
||||||
# Find device 1
|
|
||||||
device1 = find_device_by_identifier(devices, args.diff[0], logger)
|
|
||||||
if not device1:
|
|
||||||
logger.error(f"Device not found: {args.diff[0]}")
|
|
||||||
return 1
|
|
||||||
|
|
||||||
if len(args.diff) == 1:
|
|
||||||
# Single device: compare to config file
|
|
||||||
# Get device type from Status
|
|
||||||
from utils import send_tasmota_command
|
|
||||||
result, success = send_tasmota_command(device1['ip'], "Status%200", timeout=10, logger=logger)
|
|
||||||
if not success or not result:
|
|
||||||
logger.error(f"Failed to get device status for {device1['name']}")
|
|
||||||
return 1
|
|
||||||
|
|
||||||
device_type = result.get('Status', {}).get('DeviceName', '')
|
|
||||||
if not device_type:
|
|
||||||
logger.error(f"Could not determine device type for {device1['name']}")
|
|
||||||
return 1
|
|
||||||
|
|
||||||
comparison = device_comp.compare_device_to_config(
|
|
||||||
device1['ip'], device1['name'], device_type
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
# Two devices: compare to each other
|
|
||||||
device2 = find_device_by_identifier(devices, args.diff[1], logger)
|
|
||||||
if not device2:
|
|
||||||
logger.error(f"Device 2 not found: {args.diff[1]}")
|
|
||||||
return 1
|
|
||||||
|
|
||||||
comparison = device_comp.compare_devices(
|
|
||||||
device1['ip'], device1['name'],
|
|
||||||
device2['ip'], device2['name']
|
|
||||||
)
|
|
||||||
|
|
||||||
# Print report
|
|
||||||
device_comp.print_comparison_report(comparison)
|
|
||||||
return 0
|
|
||||||
|
|
||||||
# Handle hostname report mode
|
# Handle hostname report mode
|
||||||
if args.unifi_hostname_report:
|
if args.unifi_hostname_report:
|
||||||
report_gen.generate_unifi_hostname_report()
|
report_gen.generate_unifi_hostname_report()
|
||||||
@ -422,7 +286,7 @@ def main():
|
|||||||
|
|
||||||
# Process all devices
|
# Process all devices
|
||||||
logger.info(f"\nProcessing {len(devices)} devices...")
|
logger.info(f"\nProcessing {len(devices)} devices...")
|
||||||
device_details_list, stats = process_devices(devices, config_manager, console_manager, logger, report_gen)
|
device_details_list, stats = process_devices(devices, config_manager, console_manager, logger)
|
||||||
|
|
||||||
# Save device details
|
# Save device details
|
||||||
report_gen.save_device_details(device_details_list)
|
report_gen.save_device_details(device_details_list)
|
||||||
@ -437,9 +301,6 @@ def main():
|
|||||||
|
|
||||||
console_manager.print_failure_summary()
|
console_manager.print_failure_summary()
|
||||||
|
|
||||||
# Print errors and warnings summary
|
|
||||||
report_gen.print_errors_and_warnings_summary()
|
|
||||||
|
|
||||||
logger.info("TasmotaManager completed")
|
logger.info("TasmotaManager completed")
|
||||||
return 0
|
return 0
|
||||||
|
|
||||||
|
|||||||
@ -128,14 +128,13 @@ class ConfigurationManager:
|
|||||||
|
|
||||||
return False
|
return False
|
||||||
|
|
||||||
def configure_mqtt_settings(self, device: dict, device_details: dict, force_password_update: bool = False) -> Tuple[bool, str]:
|
def configure_mqtt_settings(self, device: dict, device_details: dict) -> Tuple[bool, str]:
|
||||||
"""
|
"""
|
||||||
Configure MQTT settings on a device.
|
Configure MQTT settings on a device.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
device: Device info dictionary
|
device: Device info dictionary
|
||||||
device_details: Detailed device information
|
device_details: Detailed device information
|
||||||
force_password_update: Force update of password even if user hasn't changed
|
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
Tuple of (success, status_message)
|
Tuple of (success, status_message)
|
||||||
@ -155,9 +154,6 @@ class ConfigurationManager:
|
|||||||
# Get current MQTT settings
|
# Get current MQTT settings
|
||||||
current_mqtt = device_details.get('StatusMQT', {})
|
current_mqtt = device_details.get('StatusMQT', {})
|
||||||
|
|
||||||
# Log current MQTT state for debugging
|
|
||||||
self.logger.debug(f"{device_name}: Current MQTT settings from device: {current_mqtt}")
|
|
||||||
|
|
||||||
# Check if MQTT needs to be enabled
|
# Check if MQTT needs to be enabled
|
||||||
mqtt_enabled = current_mqtt.get('MqttHost', '') != ''
|
mqtt_enabled = current_mqtt.get('MqttHost', '') != ''
|
||||||
|
|
||||||
@ -172,83 +168,39 @@ class ConfigurationManager:
|
|||||||
|
|
||||||
# Build list of settings to update
|
# Build list of settings to update
|
||||||
updates_needed = []
|
updates_needed = []
|
||||||
password_needs_update = False
|
|
||||||
|
|
||||||
# Check each MQTT setting
|
# Check each MQTT setting
|
||||||
mqtt_host = mqtt_config.get('Host', '')
|
mqtt_host = mqtt_config.get('Host', '')
|
||||||
current_host = current_mqtt.get('MqttHost', '')
|
if mqtt_host and current_mqtt.get('MqttHost', '') != mqtt_host:
|
||||||
self.logger.debug(f"{device_name}: Comparing MqttHost: current='{current_host}' vs expected='{mqtt_host}'")
|
|
||||||
if mqtt_host and current_host != mqtt_host:
|
|
||||||
updates_needed.append(('MqttHost', mqtt_host))
|
updates_needed.append(('MqttHost', mqtt_host))
|
||||||
|
|
||||||
mqtt_port = mqtt_config.get('Port', 1883)
|
mqtt_port = mqtt_config.get('Port', 1883)
|
||||||
current_port = current_mqtt.get('MqttPort', 0)
|
if current_mqtt.get('MqttPort', 0) != mqtt_port:
|
||||||
self.logger.debug(f"{device_name}: Comparing MqttPort: current={current_port} vs expected={mqtt_port}")
|
|
||||||
if current_port != mqtt_port:
|
|
||||||
updates_needed.append(('MqttPort', mqtt_port))
|
updates_needed.append(('MqttPort', mqtt_port))
|
||||||
|
|
||||||
mqtt_user = mqtt_config.get('User', '')
|
mqtt_user = mqtt_config.get('User', '')
|
||||||
current_user = current_mqtt.get('MqttUser', '')
|
if mqtt_user and current_mqtt.get('MqttUser', '') != mqtt_user:
|
||||||
self.logger.debug(f"{device_name}: Comparing MqttUser: current='{current_user}' vs expected='{mqtt_user}'")
|
|
||||||
if mqtt_user and current_user != mqtt_user:
|
|
||||||
updates_needed.append(('MqttUser', mqtt_user))
|
updates_needed.append(('MqttUser', mqtt_user))
|
||||||
password_needs_update = True # If user changed, update password too
|
|
||||||
|
|
||||||
# Only update password if:
|
|
||||||
# 1. force_password_update is True, OR
|
|
||||||
# 2. The username is being updated (password likely needs to match)
|
|
||||||
mqtt_password = mqtt_config.get('Password', '')
|
mqtt_password = mqtt_config.get('Password', '')
|
||||||
if mqtt_password and (force_password_update or password_needs_update):
|
# Note: Can't verify password from status, so always set it
|
||||||
|
if mqtt_password:
|
||||||
updates_needed.append(('MqttPassword', mqtt_password))
|
updates_needed.append(('MqttPassword', mqtt_password))
|
||||||
self.logger.debug(f"{device_name}: Password will be updated (force={force_password_update}, user_changed={password_needs_update})")
|
|
||||||
|
|
||||||
# Handle Topic with %hostname_base% substitution
|
# Handle Topic with %hostname_base% substitution
|
||||||
# Note: Topic is not always in StatusMQT, so query it directly
|
|
||||||
mqtt_topic = mqtt_config.get('Topic', '')
|
mqtt_topic = mqtt_config.get('Topic', '')
|
||||||
if mqtt_topic:
|
if mqtt_topic:
|
||||||
mqtt_topic = mqtt_topic.replace('%hostname_base%', hostname_base)
|
mqtt_topic = mqtt_topic.replace('%hostname_base%', hostname_base)
|
||||||
|
if current_mqtt.get('Topic', '') != mqtt_topic:
|
||||||
# Query current Topic value directly
|
|
||||||
result, success = send_tasmota_command(device_ip, "Topic", timeout=5, logger=self.logger)
|
|
||||||
current_topic = result.get('Topic', '') if success and result else ''
|
|
||||||
|
|
||||||
self.logger.debug(f"{device_name}: Comparing Topic: current='{current_topic}' vs expected='{mqtt_topic}'")
|
|
||||||
if current_topic != mqtt_topic:
|
|
||||||
updates_needed.append(('Topic', mqtt_topic))
|
updates_needed.append(('Topic', mqtt_topic))
|
||||||
|
|
||||||
# Handle FullTopic
|
|
||||||
# Note: FullTopic is not always in StatusMQT, so query it directly
|
|
||||||
mqtt_full_topic = mqtt_config.get('FullTopic', '')
|
mqtt_full_topic = mqtt_config.get('FullTopic', '')
|
||||||
if mqtt_full_topic:
|
if mqtt_full_topic and current_mqtt.get('FullTopic', '') != mqtt_full_topic:
|
||||||
# Query current FullTopic value directly
|
updates_needed.append(('FullTopic', mqtt_full_topic))
|
||||||
result, success = send_tasmota_command(device_ip, "FullTopic", timeout=5, logger=self.logger)
|
|
||||||
current_full_topic = result.get('FullTopic', '') if success and result else ''
|
|
||||||
|
|
||||||
self.logger.debug(f"{device_name}: Raw FullTopic from device: '{current_full_topic}'")
|
|
||||||
|
|
||||||
# Check if device has URL-encoded spaces at the beginning (broken state)
|
|
||||||
has_leading_encoded_space = current_full_topic.startswith('%20')
|
|
||||||
|
|
||||||
# Normalize: remove any URL-encoded spaces from the beginning of current value
|
|
||||||
# This handles the case where the device returns '%20%prefix%' instead of '%prefix%'
|
|
||||||
while current_full_topic.startswith('%20'):
|
|
||||||
current_full_topic = current_full_topic[3:]
|
|
||||||
|
|
||||||
# Also normalize expected value in case config has leading spaces
|
|
||||||
mqtt_full_topic_normalized = mqtt_full_topic.lstrip()
|
|
||||||
|
|
||||||
self.logger.debug(f"{device_name}: Comparing FullTopic: current='{current_full_topic}' vs expected='{mqtt_full_topic_normalized}'")
|
|
||||||
|
|
||||||
# Update if values don't match OR if device had leading encoded space (needs fixing)
|
|
||||||
if current_full_topic != mqtt_full_topic_normalized or has_leading_encoded_space:
|
|
||||||
if has_leading_encoded_space:
|
|
||||||
self.logger.info(f"{device_name}: FullTopic has invalid leading space, will fix")
|
|
||||||
updates_needed.append(('FullTopic', mqtt_full_topic_normalized))
|
|
||||||
|
|
||||||
# Handle NoRetain (SetOption62)
|
# Handle NoRetain (SetOption62)
|
||||||
no_retain = mqtt_config.get('NoRetain', False)
|
no_retain = mqtt_config.get('NoRetain', False)
|
||||||
current_no_retain = current_mqtt.get('NoRetain', False)
|
current_no_retain = current_mqtt.get('NoRetain', False)
|
||||||
self.logger.debug(f"{device_name}: Comparing NoRetain: current={current_no_retain} vs expected={no_retain}")
|
|
||||||
if no_retain != current_no_retain:
|
if no_retain != current_no_retain:
|
||||||
updates_needed.append(('SetOption62', '1' if no_retain else '0'))
|
updates_needed.append(('SetOption62', '1' if no_retain else '0'))
|
||||||
|
|
||||||
@ -256,18 +208,12 @@ class ConfigurationManager:
|
|||||||
self.logger.debug(f"{device_name}: MQTT settings already correct")
|
self.logger.debug(f"{device_name}: MQTT settings already correct")
|
||||||
return True, "Already configured"
|
return True, "Already configured"
|
||||||
|
|
||||||
# Log what will be updated
|
# Apply updates
|
||||||
self.logger.info(f"{device_name}: Updating {len(updates_needed)} MQTT settings: {[name for name, _ in updates_needed]}")
|
self.logger.info(f"{device_name}: Updating {len(updates_needed)} MQTT settings")
|
||||||
|
|
||||||
failed_updates = []
|
failed_updates = []
|
||||||
for setting_name, setting_value in updates_needed:
|
for setting_name, setting_value in updates_needed:
|
||||||
# URL encode the value, especially important for FullTopic which contains % and /
|
command = f"{setting_name}%20{setting_value}"
|
||||||
from urllib.parse import quote
|
|
||||||
# Convert value to string and URL encode it
|
|
||||||
encoded_value = quote(str(setting_value), safe='')
|
|
||||||
command = f"{setting_name}%20{encoded_value}"
|
|
||||||
|
|
||||||
self.logger.debug(f"{device_name}: Sending command: {command}")
|
|
||||||
|
|
||||||
result, success = retry_command(
|
result, success = retry_command(
|
||||||
lambda: send_tasmota_command(device_ip, command, timeout=5, logger=self.logger),
|
lambda: send_tasmota_command(device_ip, command, timeout=5, logger=self.logger),
|
||||||
|
|||||||
@ -2,7 +2,6 @@
|
|||||||
|
|
||||||
import logging
|
import logging
|
||||||
import time
|
import time
|
||||||
import threading
|
|
||||||
from typing import Dict, List, Optional, Tuple
|
from typing import Dict, List, Optional, Tuple
|
||||||
|
|
||||||
from utils import send_tasmota_command, retry_command, get_hostname_base
|
from utils import send_tasmota_command, retry_command, get_hostname_base
|
||||||
@ -22,7 +21,6 @@ class ConsoleSettingsManager:
|
|||||||
self.config = config
|
self.config = config
|
||||||
self.logger = logger or logging.getLogger(__name__)
|
self.logger = logger or logging.getLogger(__name__)
|
||||||
self.command_failures = {} # Track failed commands by device
|
self.command_failures = {} # Track failed commands by device
|
||||||
self._lock = threading.Lock() # Thread-safe access to command_failures
|
|
||||||
|
|
||||||
def apply_console_settings(self, device: dict, device_details: dict) -> Tuple[bool, str]:
|
def apply_console_settings(self, device: dict, device_details: dict) -> Tuple[bool, str]:
|
||||||
"""
|
"""
|
||||||
@ -41,11 +39,12 @@ class ConsoleSettingsManager:
|
|||||||
if not device_ip:
|
if not device_ip:
|
||||||
return False, "No IP address"
|
return False, "No IP address"
|
||||||
|
|
||||||
# Get device type from DeviceName for template matching
|
# Get hostname base for template matching
|
||||||
device_type = device_details.get('Status', {}).get('DeviceName', '')
|
hostname = device_details.get('StatusNET', {}).get('Hostname', device_name)
|
||||||
|
hostname_base = get_hostname_base(hostname)
|
||||||
|
|
||||||
# Find which console_set to use for this device
|
# Find which console_set to use for this device
|
||||||
console_set_name = self._get_console_set_name(device_type)
|
console_set_name = self._get_console_set_name(hostname_base)
|
||||||
|
|
||||||
if not console_set_name:
|
if not console_set_name:
|
||||||
self.logger.debug(f"{device_name}: No console settings configured")
|
self.logger.debug(f"{device_name}: No console settings configured")
|
||||||
@ -74,7 +73,6 @@ class ConsoleSettingsManager:
|
|||||||
|
|
||||||
# Track failures for summary
|
# Track failures for summary
|
||||||
if failed_commands:
|
if failed_commands:
|
||||||
with self._lock:
|
|
||||||
if device_name not in self.command_failures:
|
if device_name not in self.command_failures:
|
||||||
self.command_failures[device_name] = []
|
self.command_failures[device_name] = []
|
||||||
self.command_failures[device_name].extend(failed_commands)
|
self.command_failures[device_name].extend(failed_commands)
|
||||||
@ -85,42 +83,22 @@ class ConsoleSettingsManager:
|
|||||||
self.logger.info(f"{device_name}: All console settings applied successfully")
|
self.logger.info(f"{device_name}: All console settings applied successfully")
|
||||||
return True, "Applied"
|
return True, "Applied"
|
||||||
|
|
||||||
def _get_console_set_name(self, device_type: str) -> Optional[str]:
|
def _get_console_set_name(self, hostname_base: str) -> Optional[str]:
|
||||||
"""
|
"""
|
||||||
Get the console_set name for a device based on DeviceName.
|
Get the console_set name for a device based on hostname.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
device_type: Device type/DeviceName from Status
|
hostname_base: Base hostname of device
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
str: Console set name or None
|
str: Console set name or None
|
||||||
"""
|
"""
|
||||||
device_list = self.config.get('device_list', {})
|
device_list = self.config.get('device_list', {})
|
||||||
|
|
||||||
# First try exact match on key
|
|
||||||
if device_type in device_list:
|
|
||||||
return device_list[device_type].get('console_set')
|
|
||||||
|
|
||||||
# Then try case-insensitive match on key
|
|
||||||
device_type_lower = device_type.lower()
|
|
||||||
for template_name, template_data in device_list.items():
|
for template_name, template_data in device_list.items():
|
||||||
if template_name.lower() == device_type_lower:
|
if hostname_base.lower() in template_name.lower():
|
||||||
return template_data.get('console_set')
|
return template_data.get('console_set')
|
||||||
|
|
||||||
# Finally, try matching against the template NAME field
|
|
||||||
# DeviceName comes from the template's NAME field
|
|
||||||
import json
|
|
||||||
for template_name, template_data in device_list.items():
|
|
||||||
template_str = template_data.get('template', '')
|
|
||||||
if template_str:
|
|
||||||
try:
|
|
||||||
template_obj = json.loads(template_str)
|
|
||||||
template_device_name = template_obj.get('NAME', '')
|
|
||||||
if template_device_name.lower() == device_type_lower:
|
|
||||||
return template_data.get('console_set')
|
|
||||||
except json.JSONDecodeError:
|
|
||||||
continue
|
|
||||||
|
|
||||||
return None
|
return None
|
||||||
|
|
||||||
def _get_console_commands(self, console_set_name: str) -> List[str]:
|
def _get_console_commands(self, console_set_name: str) -> List[str]:
|
||||||
@ -179,13 +157,7 @@ class ConsoleSettingsManager:
|
|||||||
|
|
||||||
time.sleep(0.5) # Brief delay between commands
|
time.sleep(0.5) # Brief delay between commands
|
||||||
|
|
||||||
# Send the actual command - properly URL encode
|
# Send the actual command
|
||||||
from urllib.parse import quote
|
|
||||||
# Split command into param and value, then URL encode the value part
|
|
||||||
parts = command.split(None, 1)
|
|
||||||
if len(parts) == 2:
|
|
||||||
escaped_command = parts[0] + '%20' + quote(parts[1], safe='')
|
|
||||||
else:
|
|
||||||
escaped_command = command.replace(' ', '%20')
|
escaped_command = command.replace(' ', '%20')
|
||||||
|
|
||||||
result, success = retry_command(
|
result, success = retry_command(
|
||||||
@ -200,33 +172,25 @@ class ConsoleSettingsManager:
|
|||||||
self.logger.error(f"{device_name}: Failed to set {param_name} after 3 attempts")
|
self.logger.error(f"{device_name}: Failed to set {param_name} after 3 attempts")
|
||||||
return False
|
return False
|
||||||
|
|
||||||
# Verification disabled - it overwhelms devices with extra queries
|
# Verify the command was applied (if possible)
|
||||||
# Trust that the command was applied if no error was returned
|
if not self._verify_command(device_ip, device_name, param_name, param_value):
|
||||||
|
self.logger.warning(f"{device_name}: Verification failed for {param_name}")
|
||||||
|
# Don't return False here - some commands can't be verified
|
||||||
|
|
||||||
# Check if this is a rule definition - if so, enable it
|
# Check if this is a rule definition - if so, enable it
|
||||||
if param_name.lower().startswith('rule'):
|
if param_name.lower().startswith('rule'):
|
||||||
rule_number = param_name.lower().replace('rule', '')
|
rule_number = param_name.lower().replace('rule', '')
|
||||||
if rule_number.isdigit():
|
if rule_number.isdigit():
|
||||||
# Wait for device to process the rule before enabling
|
enable_command = f"Rule{rule_number}%201"
|
||||||
time.sleep(0.5)
|
|
||||||
|
|
||||||
# Enable the rule with ON command
|
self.logger.debug(f"{device_name}: Enabling rule{rule_number}")
|
||||||
# Note: Rule{N} 4 doesn't work reliably, use ON instead
|
|
||||||
enable_command = f"Rule{rule_number}%20ON"
|
|
||||||
|
|
||||||
self.logger.debug(f"{device_name}: Enabling rule{rule_number} (Once=OFF)")
|
result, success = send_tasmota_command(
|
||||||
|
device_ip, enable_command, timeout=5, logger=self.logger
|
||||||
result, success = retry_command(
|
|
||||||
lambda: send_tasmota_command(device_ip, enable_command, timeout=5, logger=self.logger),
|
|
||||||
max_attempts=3,
|
|
||||||
delay=1.0,
|
|
||||||
logger=self.logger,
|
|
||||||
device_name=device_name
|
|
||||||
)
|
)
|
||||||
|
|
||||||
if not success:
|
if not success:
|
||||||
self.logger.error(f"{device_name}: Failed to enable rule{rule_number} after 3 attempts")
|
self.logger.warning(f"{device_name}: Failed to enable rule{rule_number}")
|
||||||
return False
|
|
||||||
|
|
||||||
time.sleep(0.3) # Brief delay between commands
|
time.sleep(0.3) # Brief delay between commands
|
||||||
return True
|
return True
|
||||||
@ -260,26 +224,15 @@ class ConsoleSettingsManager:
|
|||||||
return True # Can't verify, assume success
|
return True # Can't verify, assume success
|
||||||
|
|
||||||
# Check if value matches
|
# Check if value matches
|
||||||
current_value = str(result.get(param_name, ''))
|
current_value = result.get(param_name, '')
|
||||||
expected = str(expected_value)
|
|
||||||
|
|
||||||
# Normalize values for comparison (Tasmota returns ON/OFF for 1/0)
|
if str(current_value) == str(expected_value):
|
||||||
def normalize_value(val: str) -> str:
|
|
||||||
val_lower = val.lower()
|
|
||||||
if val_lower in ['on', '1', 'true']:
|
|
||||||
return '1'
|
|
||||||
elif val_lower in ['off', '0', 'false']:
|
|
||||||
return '0'
|
|
||||||
return val_lower
|
|
||||||
|
|
||||||
if normalize_value(current_value) == normalize_value(expected):
|
|
||||||
return True
|
return True
|
||||||
|
|
||||||
return False
|
return False
|
||||||
|
|
||||||
def print_failure_summary(self):
|
def print_failure_summary(self):
|
||||||
"""Print summary of all command failures."""
|
"""Print summary of all command failures."""
|
||||||
with self._lock:
|
|
||||||
if not self.command_failures:
|
if not self.command_failures:
|
||||||
return
|
return
|
||||||
|
|
||||||
@ -287,9 +240,7 @@ class ConsoleSettingsManager:
|
|||||||
self.logger.error("COMMAND FAILURE SUMMARY")
|
self.logger.error("COMMAND FAILURE SUMMARY")
|
||||||
self.logger.error("=" * 60)
|
self.logger.error("=" * 60)
|
||||||
|
|
||||||
# Sort by device name for consistent output
|
for device_name, failed_commands in self.command_failures.items():
|
||||||
for device_name in sorted(self.command_failures.keys()):
|
|
||||||
failed_commands = self.command_failures[device_name]
|
|
||||||
self.logger.error(f"\n{device_name}:")
|
self.logger.error(f"\n{device_name}:")
|
||||||
for cmd in failed_commands:
|
for cmd in failed_commands:
|
||||||
self.logger.error(f" - {cmd}")
|
self.logger.error(f" - {cmd}")
|
||||||
|
|||||||
512
device_diff.py
512
device_diff.py
@ -1,512 +0,0 @@
|
|||||||
"""Device comparison and diagnostics module."""
|
|
||||||
|
|
||||||
import logging
|
|
||||||
from typing import Dict, List, Tuple, Optional
|
|
||||||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
|
||||||
from utils import send_tasmota_command
|
|
||||||
|
|
||||||
|
|
||||||
class DeviceComparison:
|
|
||||||
"""Compare configuration between two Tasmota devices."""
|
|
||||||
|
|
||||||
def __init__(self, config: Optional[Dict] = None, logger: Optional[logging.Logger] = None):
|
|
||||||
"""
|
|
||||||
Initialize device comparison.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
config: Configuration dictionary (for comparing to config file)
|
|
||||||
logger: Optional logger instance
|
|
||||||
"""
|
|
||||||
self.config = config
|
|
||||||
self.logger = logger or logging.getLogger(__name__)
|
|
||||||
|
|
||||||
def get_device_full_status(self, device_ip: str, device_name: str) -> Dict:
|
|
||||||
"""
|
|
||||||
Get complete device status and configuration.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
device_ip: Device IP address
|
|
||||||
device_name: Device name for logging
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
Dictionary with all device status information
|
|
||||||
"""
|
|
||||||
self.logger.info(f"Querying full status from {device_name} ({device_ip})")
|
|
||||||
|
|
||||||
device_info = {
|
|
||||||
'name': device_name,
|
|
||||||
'ip': device_ip,
|
|
||||||
'firmware': {},
|
|
||||||
'network': {},
|
|
||||||
'mqtt': {},
|
|
||||||
'setoptions': {},
|
|
||||||
'rules': {},
|
|
||||||
'gpio': {},
|
|
||||||
'other': {}
|
|
||||||
}
|
|
||||||
|
|
||||||
# Get Status 0 (all status)
|
|
||||||
result, success = send_tasmota_command(device_ip, "Status%200", timeout=10, logger=self.logger)
|
|
||||||
if success and result:
|
|
||||||
# Extract firmware info
|
|
||||||
if 'StatusFWR' in result:
|
|
||||||
device_info['firmware'] = result['StatusFWR']
|
|
||||||
|
|
||||||
# Extract network info
|
|
||||||
if 'StatusNET' in result:
|
|
||||||
device_info['network'] = result['StatusNET']
|
|
||||||
|
|
||||||
# Extract MQTT info
|
|
||||||
if 'StatusMQT' in result:
|
|
||||||
device_info['mqtt'] = result['StatusMQT']
|
|
||||||
|
|
||||||
# Extract basic status
|
|
||||||
if 'Status' in result:
|
|
||||||
device_info['other']['status'] = result['Status']
|
|
||||||
|
|
||||||
# Get all SetOptions (0-150)
|
|
||||||
self.logger.debug(f"Querying SetOptions from {device_name}")
|
|
||||||
for i in range(0, 151):
|
|
||||||
result, success = send_tasmota_command(device_ip, f"SetOption{i}", timeout=5, logger=self.logger)
|
|
||||||
if success and result:
|
|
||||||
key = f"SetOption{i}"
|
|
||||||
if key in result:
|
|
||||||
device_info['setoptions'][key] = result[key]
|
|
||||||
|
|
||||||
# Get Rules (1-3)
|
|
||||||
self.logger.debug(f"Querying Rules from {device_name}")
|
|
||||||
for i in range(1, 4):
|
|
||||||
result, success = send_tasmota_command(device_ip, f"Rule{i}%204", timeout=5, logger=self.logger)
|
|
||||||
if success and result:
|
|
||||||
key = f"Rule{i}"
|
|
||||||
if key in result:
|
|
||||||
device_info['rules'][key] = result[key]
|
|
||||||
|
|
||||||
# Get other important settings
|
|
||||||
for cmd in ['ButtonDebounce', 'SwitchDebounce', 'Template', 'Module']:
|
|
||||||
result, success = send_tasmota_command(device_ip, cmd, timeout=5, logger=self.logger)
|
|
||||||
if success and result:
|
|
||||||
device_info['other'][cmd] = result
|
|
||||||
|
|
||||||
return device_info
|
|
||||||
|
|
||||||
def get_expected_config(self, device_type: str) -> Dict:
|
|
||||||
"""
|
|
||||||
Get expected configuration from config file for a device type.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
device_type: Device type/DeviceName
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
Dictionary with expected configuration
|
|
||||||
"""
|
|
||||||
expected = {
|
|
||||||
'name': 'Expected (from config)',
|
|
||||||
'setoptions': {},
|
|
||||||
'rules': {},
|
|
||||||
'console_set': None
|
|
||||||
}
|
|
||||||
|
|
||||||
if not self.config:
|
|
||||||
return expected
|
|
||||||
|
|
||||||
# Find console_set for this device type
|
|
||||||
device_list = self.config.get('device_list', {})
|
|
||||||
console_set_name = None
|
|
||||||
|
|
||||||
# Try exact match first
|
|
||||||
if device_type in device_list:
|
|
||||||
console_set_name = device_list[device_type].get('console_set')
|
|
||||||
else:
|
|
||||||
# Try case-insensitive match on key
|
|
||||||
device_type_lower = device_type.lower()
|
|
||||||
for template_name, template_data in device_list.items():
|
|
||||||
if template_name.lower() == device_type_lower:
|
|
||||||
console_set_name = template_data.get('console_set')
|
|
||||||
break
|
|
||||||
|
|
||||||
# If still not found, try matching against template NAME field
|
|
||||||
if not console_set_name:
|
|
||||||
import json
|
|
||||||
for template_name, template_data in device_list.items():
|
|
||||||
template_str = template_data.get('template', '')
|
|
||||||
if template_str:
|
|
||||||
try:
|
|
||||||
template_obj = json.loads(template_str)
|
|
||||||
template_device_name = template_obj.get('NAME', '')
|
|
||||||
if template_device_name.lower() == device_type_lower:
|
|
||||||
console_set_name = template_data.get('console_set')
|
|
||||||
break
|
|
||||||
except json.JSONDecodeError:
|
|
||||||
continue
|
|
||||||
|
|
||||||
if not console_set_name:
|
|
||||||
self.logger.warning(f"No console_set found for device type: {device_type}")
|
|
||||||
return expected
|
|
||||||
|
|
||||||
expected['console_set'] = console_set_name
|
|
||||||
|
|
||||||
# Get console commands for this set
|
|
||||||
console_set = self.config.get('console_set', {})
|
|
||||||
commands = console_set.get(console_set_name, [])
|
|
||||||
|
|
||||||
# Parse commands into SetOptions and Rules
|
|
||||||
for command in commands:
|
|
||||||
if not command or not command.strip():
|
|
||||||
continue
|
|
||||||
|
|
||||||
parts = command.split(None, 1)
|
|
||||||
if not parts:
|
|
||||||
continue
|
|
||||||
|
|
||||||
param_name = parts[0]
|
|
||||||
param_value = parts[1] if len(parts) > 1 else ""
|
|
||||||
|
|
||||||
if param_name.startswith('SetOption'):
|
|
||||||
# Store raw value - we'll normalize during comparison
|
|
||||||
expected['setoptions'][param_name] = param_value
|
|
||||||
elif param_name.lower().startswith('rule'):
|
|
||||||
# For rules, just store the rule text
|
|
||||||
expected['rules'][param_name] = {
|
|
||||||
'State': 'ON', # Rules should be enabled
|
|
||||||
'Once': 'OFF', # Should not be Once
|
|
||||||
'Rules': param_value
|
|
||||||
}
|
|
||||||
|
|
||||||
return expected
|
|
||||||
|
|
||||||
def compare_device_to_config(self, device_ip: str, device_name: str, device_type: str) -> Dict:
|
|
||||||
"""
|
|
||||||
Compare a device's actual configuration to expected config from file.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
device_ip: Device IP address
|
|
||||||
device_name: Device name
|
|
||||||
device_type: Device type/DeviceName
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
Dictionary with comparison results
|
|
||||||
"""
|
|
||||||
self.logger.info(f"Comparing {device_name} vs expected config for {device_type}")
|
|
||||||
|
|
||||||
# Get actual device config
|
|
||||||
device = self.get_device_full_status(device_ip, device_name)
|
|
||||||
|
|
||||||
# Get expected config
|
|
||||||
expected = self.get_expected_config(device_type)
|
|
||||||
|
|
||||||
# Compare only SetOptions and Rules that are in config
|
|
||||||
setoption_diffs = []
|
|
||||||
for key, expected_value in expected['setoptions'].items():
|
|
||||||
actual_value = device['setoptions'].get(key)
|
|
||||||
|
|
||||||
# Normalize values for comparison (0/1 vs OFF/ON)
|
|
||||||
def normalize(val):
|
|
||||||
if val is None:
|
|
||||||
return None
|
|
||||||
val_str = str(val).upper()
|
|
||||||
if val_str in ['0', 'OFF', 'FALSE']:
|
|
||||||
return 'OFF'
|
|
||||||
elif val_str in ['1', 'ON', 'TRUE']:
|
|
||||||
return 'ON'
|
|
||||||
return str(val)
|
|
||||||
|
|
||||||
if normalize(actual_value) != normalize(expected_value):
|
|
||||||
setoption_diffs.append({
|
|
||||||
'key': key,
|
|
||||||
'device1_value': actual_value,
|
|
||||||
'device2_value': expected_value
|
|
||||||
})
|
|
||||||
|
|
||||||
rule_diffs = []
|
|
||||||
for key, expected_rule in expected['rules'].items():
|
|
||||||
# Normalize key (rule1 vs Rule1)
|
|
||||||
rule_key = key[0].upper() + key[1:] # Capitalize first letter
|
|
||||||
actual_rule = device['rules'].get(rule_key, {})
|
|
||||||
|
|
||||||
# Compare rule content
|
|
||||||
rules_match = True
|
|
||||||
if not actual_rule:
|
|
||||||
rules_match = False
|
|
||||||
else:
|
|
||||||
# Compare State, Once, and Rules content
|
|
||||||
expected_state = expected_rule.get('State', 'ON')
|
|
||||||
actual_state = actual_rule.get('State', 'OFF')
|
|
||||||
if actual_state != expected_state:
|
|
||||||
rules_match = False
|
|
||||||
|
|
||||||
expected_once = expected_rule.get('Once', 'OFF')
|
|
||||||
actual_once = actual_rule.get('Once', 'ON')
|
|
||||||
if actual_once != expected_once:
|
|
||||||
rules_match = False
|
|
||||||
|
|
||||||
expected_rules = expected_rule.get('Rules', '')
|
|
||||||
actual_rules = actual_rule.get('Rules', '')
|
|
||||||
if actual_rules.strip() != expected_rules.strip():
|
|
||||||
rules_match = False
|
|
||||||
|
|
||||||
if not rules_match:
|
|
||||||
rule_diffs.append({
|
|
||||||
'key': rule_key,
|
|
||||||
'device1_value': actual_rule if actual_rule else {},
|
|
||||||
'device2_value': expected_rule
|
|
||||||
})
|
|
||||||
|
|
||||||
differences = {
|
|
||||||
'device1': {'name': device_name, 'ip': device_ip},
|
|
||||||
'device2': {'name': f"Expected ({expected['console_set']})", 'ip': 'config file'},
|
|
||||||
'firmware': [],
|
|
||||||
'network': [],
|
|
||||||
'mqtt': [],
|
|
||||||
'setoptions': setoption_diffs,
|
|
||||||
'rules': rule_diffs,
|
|
||||||
'other': []
|
|
||||||
}
|
|
||||||
|
|
||||||
return differences
|
|
||||||
|
|
||||||
def compare_devices(self, device1_ip: str, device1_name: str,
|
|
||||||
device2_ip: str, device2_name: str) -> Dict:
|
|
||||||
"""
|
|
||||||
Compare two devices and return differences.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
device1_ip: First device IP
|
|
||||||
device1_name: First device name
|
|
||||||
device2_ip: Second device IP
|
|
||||||
device2_name: Second device name
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
Dictionary with comparison results
|
|
||||||
"""
|
|
||||||
self.logger.info(f"Comparing {device1_name} vs {device2_name}")
|
|
||||||
|
|
||||||
# Get full status from both devices in parallel
|
|
||||||
self.logger.info("Querying devices in parallel...")
|
|
||||||
|
|
||||||
with ThreadPoolExecutor(max_workers=2) as executor:
|
|
||||||
future1 = executor.submit(self.get_device_full_status, device1_ip, device1_name)
|
|
||||||
future2 = executor.submit(self.get_device_full_status, device2_ip, device2_name)
|
|
||||||
|
|
||||||
device1 = future1.result()
|
|
||||||
device2 = future2.result()
|
|
||||||
|
|
||||||
# Compare and find differences
|
|
||||||
differences = {
|
|
||||||
'device1': {'name': device1_name, 'ip': device1_ip},
|
|
||||||
'device2': {'name': device2_name, 'ip': device2_ip},
|
|
||||||
'firmware': self._compare_section(device1['firmware'], device2['firmware']),
|
|
||||||
'network': self._compare_section(device1['network'], device2['network']),
|
|
||||||
'mqtt': self._compare_section(device1['mqtt'], device2['mqtt']),
|
|
||||||
'setoptions': self._compare_section(device1['setoptions'], device2['setoptions']),
|
|
||||||
'rules': self._compare_section(device1['rules'], device2['rules']),
|
|
||||||
'other': self._compare_section(device1['other'], device2['other'])
|
|
||||||
}
|
|
||||||
|
|
||||||
return differences
|
|
||||||
|
|
||||||
def _compare_section(self, section1: Dict, section2: Dict) -> List[Dict]:
|
|
||||||
"""
|
|
||||||
Compare two configuration sections.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
section1: First device section
|
|
||||||
section2: Second device section
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
List of differences
|
|
||||||
"""
|
|
||||||
differences = []
|
|
||||||
|
|
||||||
# Get all keys from both sections
|
|
||||||
all_keys = set(section1.keys()) | set(section2.keys())
|
|
||||||
|
|
||||||
for key in sorted(all_keys):
|
|
||||||
val1 = section1.get(key)
|
|
||||||
val2 = section2.get(key)
|
|
||||||
|
|
||||||
if val1 != val2:
|
|
||||||
differences.append({
|
|
||||||
'key': key,
|
|
||||||
'device1_value': val1,
|
|
||||||
'device2_value': val2
|
|
||||||
})
|
|
||||||
|
|
||||||
return differences
|
|
||||||
|
|
||||||
def print_comparison_report(self, comparison: Dict) -> None:
|
|
||||||
"""
|
|
||||||
Print human-readable comparison report.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
comparison: Comparison results dictionary
|
|
||||||
"""
|
|
||||||
print("\n" + "=" * 80)
|
|
||||||
print("DEVICE COMPARISON REPORT")
|
|
||||||
print("=" * 80)
|
|
||||||
|
|
||||||
device1 = comparison['device1']
|
|
||||||
device2 = comparison['device2']
|
|
||||||
|
|
||||||
print(f"\nDevice 1: {device1['name']} ({device1['ip']})")
|
|
||||||
print(f"Device 2: {device2['name']} ({device2['ip']})")
|
|
||||||
|
|
||||||
# Print firmware info
|
|
||||||
print("\n" + "-" * 80)
|
|
||||||
print("FIRMWARE DIFFERENCES")
|
|
||||||
if comparison['firmware']:
|
|
||||||
self._print_differences(comparison['firmware'], device1['name'], device2['name'])
|
|
||||||
else:
|
|
||||||
print("No differences found")
|
|
||||||
print() # Blank line after section
|
|
||||||
|
|
||||||
# Print network differences
|
|
||||||
print("-" * 80)
|
|
||||||
print("NETWORK DIFFERENCES")
|
|
||||||
if comparison['network']:
|
|
||||||
self._print_differences(comparison['network'], device1['name'], device2['name'])
|
|
||||||
else:
|
|
||||||
print("No differences found")
|
|
||||||
print() # Blank line after section
|
|
||||||
|
|
||||||
# Print MQTT differences
|
|
||||||
print("-" * 80)
|
|
||||||
print("MQTT DIFFERENCES")
|
|
||||||
if comparison['mqtt']:
|
|
||||||
self._print_differences(comparison['mqtt'], device1['name'], device2['name'])
|
|
||||||
else:
|
|
||||||
print("No differences found")
|
|
||||||
print() # Blank line after section
|
|
||||||
|
|
||||||
# Print SetOption differences
|
|
||||||
print("-" * 80)
|
|
||||||
print("SETOPTION DIFFERENCES")
|
|
||||||
if comparison['setoptions']:
|
|
||||||
self._print_differences(comparison['setoptions'], device1['name'], device2['name'])
|
|
||||||
else:
|
|
||||||
print("No differences found")
|
|
||||||
print() # Blank line after section
|
|
||||||
|
|
||||||
# Print Rule differences (special row format)
|
|
||||||
print("-" * 80)
|
|
||||||
print("RULE DIFFERENCES")
|
|
||||||
if comparison['rules']:
|
|
||||||
self._print_rule_differences(comparison['rules'], device1['name'], device2['name'])
|
|
||||||
else:
|
|
||||||
print("No differences found")
|
|
||||||
print() # Blank line after section
|
|
||||||
|
|
||||||
# Print other differences
|
|
||||||
print("-" * 80)
|
|
||||||
print("OTHER CONFIGURATION DIFFERENCES")
|
|
||||||
if comparison['other']:
|
|
||||||
self._print_differences(comparison['other'], device1['name'], device2['name'])
|
|
||||||
else:
|
|
||||||
print("No differences found")
|
|
||||||
print() # Blank line after section
|
|
||||||
|
|
||||||
print("\n" + "=" * 80)
|
|
||||||
print("END OF REPORT")
|
|
||||||
print("=" * 80 + "\n")
|
|
||||||
|
|
||||||
def _print_differences(self, differences: List[Dict], device1_name: str, device2_name: str) -> None:
|
|
||||||
"""
|
|
||||||
Print list of differences in columnar format.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
differences: List of difference dictionaries
|
|
||||||
device1_name: First device name
|
|
||||||
device2_name: Second device name
|
|
||||||
"""
|
|
||||||
if not differences:
|
|
||||||
return
|
|
||||||
|
|
||||||
# Calculate column widths
|
|
||||||
max_key_len = max(len(str(diff['key'])) for diff in differences)
|
|
||||||
max_val1_len = max(len(str(diff['device1_value'])) for diff in differences)
|
|
||||||
max_val2_len = max(len(str(diff['device2_value'])) for diff in differences)
|
|
||||||
|
|
||||||
# Ensure minimum column widths
|
|
||||||
key_width = max(max_key_len, 15)
|
|
||||||
val1_width = max(max_val1_len, len(device1_name), 20)
|
|
||||||
val2_width = max(max_val2_len, len(device2_name), 20)
|
|
||||||
|
|
||||||
# Truncate device names if too long for column headers
|
|
||||||
dev1_header = device1_name[:val1_width]
|
|
||||||
dev2_header = device2_name[:val2_width]
|
|
||||||
|
|
||||||
# Print header row
|
|
||||||
print(f"\n{'Parameter':<{key_width}} {dev1_header:<{val1_width}} {dev2_header:<{val2_width}}")
|
|
||||||
print("-" * key_width + " " + "-" * val1_width + " " + "-" * val2_width)
|
|
||||||
|
|
||||||
# Print each difference
|
|
||||||
for diff in differences:
|
|
||||||
key = str(diff['key'])
|
|
||||||
val1 = str(diff['device1_value'])
|
|
||||||
val2 = str(diff['device2_value'])
|
|
||||||
|
|
||||||
# Truncate values if too long
|
|
||||||
if len(val1) > 60:
|
|
||||||
val1 = val1[:57] + "..."
|
|
||||||
if len(val2) > 60:
|
|
||||||
val2 = val2[:57] + "..."
|
|
||||||
|
|
||||||
print(f"{key:<{key_width}} {val1:<{val1_width}} {val2:<{val2_width}}")
|
|
||||||
|
|
||||||
def _print_rule_differences(self, differences: List[Dict], device1_name: str, device2_name: str) -> None:
|
|
||||||
"""
|
|
||||||
Print rule differences in columnar format for easier field comparison.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
differences: List of difference dictionaries
|
|
||||||
device1_name: First device name
|
|
||||||
device2_name: Second device name
|
|
||||||
"""
|
|
||||||
if not differences:
|
|
||||||
return
|
|
||||||
|
|
||||||
# Group differences by rule number
|
|
||||||
rules_by_number = {}
|
|
||||||
for diff in differences:
|
|
||||||
rule_key = str(diff['key'])
|
|
||||||
rules_by_number[rule_key] = diff
|
|
||||||
|
|
||||||
# Print each rule's details in columnar format
|
|
||||||
for rule_num in sorted(rules_by_number.keys()):
|
|
||||||
diff = rules_by_number[rule_num]
|
|
||||||
val1 = diff['device1_value']
|
|
||||||
val2 = diff['device2_value']
|
|
||||||
|
|
||||||
print(f"\n{rule_num}:")
|
|
||||||
|
|
||||||
# Extract all fields from both devices
|
|
||||||
if isinstance(val1, dict) and isinstance(val2, dict):
|
|
||||||
all_fields = set(val1.keys()) | set(val2.keys())
|
|
||||||
|
|
||||||
# Calculate column widths
|
|
||||||
max_field_len = max(len(field) for field in all_fields) if all_fields else 10
|
|
||||||
field_width = max(max_field_len, 10)
|
|
||||||
|
|
||||||
dev1_header = device1_name
|
|
||||||
dev2_header = device2_name
|
|
||||||
|
|
||||||
# Calculate value column widths
|
|
||||||
val1_width = max(len(dev1_header), 20)
|
|
||||||
val2_width = max(len(dev2_header), 20)
|
|
||||||
|
|
||||||
# Print header
|
|
||||||
print(f"{'Field':<{field_width}} {dev1_header:<{val1_width}} {dev2_header:<{val2_width}}")
|
|
||||||
print("-" * field_width + " " + "-" * val1_width + " " + "-" * val2_width)
|
|
||||||
|
|
||||||
# Print each field
|
|
||||||
for field in sorted(all_fields):
|
|
||||||
v1 = str(val1.get(field, 'N/A'))
|
|
||||||
v2 = str(val2.get(field, 'N/A'))
|
|
||||||
|
|
||||||
# Truncate if too long
|
|
||||||
if len(v1) > val1_width:
|
|
||||||
v1 = v1[:val1_width-3] + "..."
|
|
||||||
if len(v2) > val2_width:
|
|
||||||
v2 = v2[:val2_width-3] + "..."
|
|
||||||
|
|
||||||
print(f"{field:<{field_width}} {v1:<{val1_width}} {v2:<{val2_width}}")
|
|
||||||
21
discovery.py
21
discovery.py
@ -76,9 +76,7 @@ class TasmotaDiscovery:
|
|||||||
exclude_patterns = network_config.get('exclude_patterns', [])
|
exclude_patterns = network_config.get('exclude_patterns', [])
|
||||||
|
|
||||||
for pattern in exclude_patterns:
|
for pattern in exclude_patterns:
|
||||||
# Use match_entire_string=False to allow partial matching with wildcards
|
if match_pattern(device_name, pattern) or match_pattern(device_hostname, pattern):
|
||||||
if match_pattern(device_name, pattern, match_entire_string=False) or \
|
|
||||||
match_pattern(device_hostname, pattern, match_entire_string=False):
|
|
||||||
return True
|
return True
|
||||||
|
|
||||||
return False
|
return False
|
||||||
@ -230,23 +228,8 @@ class TasmotaDiscovery:
|
|||||||
|
|
||||||
# Track deprecated devices
|
# Track deprecated devices
|
||||||
if previous_data:
|
if previous_data:
|
||||||
# Handle case where previous_data might be a list directly or wrapped in a dict
|
|
||||||
if isinstance(previous_data, list):
|
|
||||||
previous_devices = previous_data
|
|
||||||
elif isinstance(previous_data, dict):
|
|
||||||
# Check if it has a 'devices' key (wrapped format)
|
|
||||||
if 'devices' in previous_data:
|
|
||||||
previous_devices = previous_data['devices']
|
|
||||||
else:
|
|
||||||
# Assume it's a single device dict, wrap it
|
|
||||||
previous_devices = [previous_data]
|
|
||||||
else:
|
|
||||||
# Invalid format, skip deprecated device tracking
|
|
||||||
self.logger.warning(f"Previous data has unexpected type: {type(previous_data)}")
|
|
||||||
return
|
|
||||||
|
|
||||||
current_ips = {d['ip'] for d in devices}
|
current_ips = {d['ip'] for d in devices}
|
||||||
deprecated = [d for d in previous_devices if d.get('ip') not in current_ips]
|
deprecated = [d for d in previous_data if d.get('ip') not in current_ips]
|
||||||
|
|
||||||
if deprecated:
|
if deprecated:
|
||||||
self.logger.info(f"Found {len(deprecated)} deprecated devices")
|
self.logger.info(f"Found {len(deprecated)} deprecated devices")
|
||||||
|
|||||||
@ -10,14 +10,14 @@
|
|||||||
"name": "NoT",
|
"name": "NoT",
|
||||||
"subnet": "192.168.8",
|
"subnet": "192.168.8",
|
||||||
"exclude_patterns": [
|
"exclude_patterns": [
|
||||||
"homeassistant*",
|
"^homeassistant*",
|
||||||
"*sonos*"
|
"^.*sonos.*"
|
||||||
],
|
],
|
||||||
"unknown_device_patterns": [
|
"unknown_device_patterns": [
|
||||||
"tasmota_*",
|
"^tasmota_*",
|
||||||
"tasmota-*",
|
"^tasmota-*",
|
||||||
"esp-*",
|
"^esp-*",
|
||||||
"ESP-*"
|
"^ESP-*"
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -54,19 +54,19 @@
|
|||||||
},
|
},
|
||||||
"Gosund_WP5_Plug": {
|
"Gosund_WP5_Plug": {
|
||||||
"template": "{\"NAME\":\"Gosund-WP5\",\"GPIO\":[0,0,0,0,17,0,0,0,56,57,21,0,0],\"FLAG\":0,\"BASE\":18}",
|
"template": "{\"NAME\":\"Gosund-WP5\",\"GPIO\":[0,0,0,0,17,0,0,0,56,57,21,0,0],\"FLAG\":0,\"BASE\":18}",
|
||||||
"console_set": "Plug"
|
"console_set": "Traditional"
|
||||||
},
|
},
|
||||||
"Gosund_Plug": {
|
"Gosund_Plug": {
|
||||||
"template": "{\"NAME\":\"Gosund-WP5\",\"GPIO\":[0,0,0,0,32,0,0,0,320,321,224,0,0,0],\"FLAG\":0,\"BASE\":18}",
|
"template": "{\"NAME\":\"Gosund-WP5\",\"GPIO\":[0,0,0,0,32,0,0,0,320,321,224,0,0,0],\"FLAG\":0,\"BASE\":18}",
|
||||||
"console_set": "Plug"
|
"console_set": "Traditional"
|
||||||
},
|
},
|
||||||
"CloudFree_X10S_Plug": {
|
"CloudFree_X10S_Plug": {
|
||||||
"template": "{\"NAME\":\"Aoycocr X10S\",\"GPIO\":[56,0,57,0,21,134,0,0,131,17,132,0,0],\"FLAG\":0,\"BASE\":45}",
|
"template": "{\"NAME\":\"Aoycocr X10S\",\"GPIO\":[56,0,57,0,21,134,0,0,131,17,132,0,0],\"FLAG\":0,\"BASE\":45}",
|
||||||
"console_set": "Plug"
|
"console_set": "Traditional"
|
||||||
},
|
},
|
||||||
"Sonoff_S31_PM_Plug": {
|
"Sonoff_S31_PM_Plug": {
|
||||||
"template": "{\"NAME\":\"Sonoff S31\",\"GPIO\":[17,145,0,146,0,0,0,0,21,56,0,0,0],\"FLAG\":0,\"BASE\":41}",
|
"template": "{\"NAME\":\"Sonoff S31\",\"GPIO\":[17,145,0,146,0,0,0,0,21,56,0,0,0],\"FLAG\":0,\"BASE\":41}",
|
||||||
"console_set": "Plug"
|
"console_set": "Traditional"
|
||||||
},
|
},
|
||||||
"Sonoff TX Ultimate 1": {
|
"Sonoff TX Ultimate 1": {
|
||||||
"template": "{\"NAME\":\"Sonoff T5-1C-120\",\"GPIO\":[0,0,7808,0,7840,3872,0,0,0,1376,0,7776,0,0,224,3232,0,480,3200,0,0,0,3840,0,0,0,0,0,0,0,0,0,0,0,0,0],\"FLAG\":0,\"BASE\":1}",
|
"template": "{\"NAME\":\"Sonoff T5-1C-120\",\"GPIO\":[0,0,7808,0,7840,3872,0,0,0,1376,0,7776,0,0,224,3232,0,480,3200,0,0,0,3840,0,0,0,0,0,0,0,0,0,0,0,0,0],\"FLAG\":0,\"BASE\":1}",
|
||||||
@ -85,7 +85,7 @@
|
|||||||
"SetOption13 0",
|
"SetOption13 0",
|
||||||
"SetOption19 0",
|
"SetOption19 0",
|
||||||
"SetOption32 8",
|
"SetOption32 8",
|
||||||
"SetOption40 0",
|
"SetOption40 40",
|
||||||
"SetOption53 1",
|
"SetOption53 1",
|
||||||
"SetOption73 1",
|
"SetOption73 1",
|
||||||
"rule1 on button1#state=10 do power0 toggle endon"
|
"rule1 on button1#state=10 do power0 toggle endon"
|
||||||
@ -102,19 +102,11 @@
|
|||||||
"SetOption13 0",
|
"SetOption13 0",
|
||||||
"SetOption19 0",
|
"SetOption19 0",
|
||||||
"SetOption32 8",
|
"SetOption32 8",
|
||||||
"SetOption40 0",
|
"SetOption40 40",
|
||||||
|
|
||||||
"SetOption53 1",
|
"SetOption53 1",
|
||||||
"SetOption73 1"
|
"SetOption73 1"
|
||||||
],
|
|
||||||
"Plug": [
|
|
||||||
"SwitchRetain Off",
|
|
||||||
"ButtonRetain Off",
|
|
||||||
"PowerRetain On",
|
|
||||||
"PowerOnState 3",
|
|
||||||
"SetOption1 0",
|
|
||||||
"SetOption3 1",
|
|
||||||
"SetOption73 0",
|
|
||||||
"rule1 on button1#state=10 do power0 toggle endon"
|
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
47
reporting.py
47
reporting.py
@ -24,53 +24,6 @@ class ReportGenerator:
|
|||||||
self.config = config
|
self.config = config
|
||||||
self.discovery = discovery
|
self.discovery = discovery
|
||||||
self.logger = logger or logging.getLogger(__name__)
|
self.logger = logger or logging.getLogger(__name__)
|
||||||
self.errors_and_warnings = [] # Collect errors and warnings
|
|
||||||
|
|
||||||
def add_error(self, device_name: str, message: str):
|
|
||||||
"""Add an error message to the collection."""
|
|
||||||
self.errors_and_warnings.append(('ERROR', device_name, message))
|
|
||||||
|
|
||||||
def add_warning(self, device_name: str, message: str):
|
|
||||||
"""Add a warning message to the collection."""
|
|
||||||
self.errors_and_warnings.append(('WARNING', device_name, message))
|
|
||||||
|
|
||||||
def print_errors_and_warnings_summary(self):
|
|
||||||
"""Print summary of all errors and warnings that require user attention."""
|
|
||||||
if not self.errors_and_warnings:
|
|
||||||
return
|
|
||||||
|
|
||||||
self.logger.info("")
|
|
||||||
self.logger.error("=" * 60)
|
|
||||||
self.logger.error("ERRORS AND WARNINGS REQUIRING ATTENTION")
|
|
||||||
self.logger.error("=" * 60)
|
|
||||||
|
|
||||||
# Sort by severity (ERROR first, then WARNING) and then by device name
|
|
||||||
sorted_issues = sorted(self.errors_and_warnings,
|
|
||||||
key=lambda x: (0 if x[0] == 'ERROR' else 1, x[1]))
|
|
||||||
|
|
||||||
for severity, device_name, message in sorted_issues:
|
|
||||||
if severity == 'ERROR':
|
|
||||||
self.logger.error(f" ✗ {device_name}: {message}")
|
|
||||||
else:
|
|
||||||
self.logger.warning(f" ⚠ {device_name}: {message}")
|
|
||||||
|
|
||||||
# Print action items
|
|
||||||
self.logger.error("")
|
|
||||||
self.logger.error("ACTION REQUIRED:")
|
|
||||||
|
|
||||||
# Group by issue type
|
|
||||||
connection_errors = [x for x in sorted_issues if 'connection' in x[2].lower() or 'refused' in x[2].lower()]
|
|
||||||
mqtt_errors = [x for x in sorted_issues if 'mqtt' in x[2].lower()]
|
|
||||||
other_errors = [x for x in sorted_issues if x not in connection_errors and x not in mqtt_errors]
|
|
||||||
|
|
||||||
if connection_errors:
|
|
||||||
self.logger.error(f" • {len(connection_errors)} device(s) unreachable - check if devices are online")
|
|
||||||
if mqtt_errors:
|
|
||||||
self.logger.error(f" • {len(mqtt_errors)} device(s) with MQTT issues - review configuration")
|
|
||||||
if other_errors:
|
|
||||||
self.logger.error(f" • {len(other_errors)} device(s) with other issues - review above details")
|
|
||||||
|
|
||||||
self.logger.error("=" * 60)
|
|
||||||
|
|
||||||
def generate_unifi_hostname_report(self) -> Dict:
|
def generate_unifi_hostname_report(self) -> Dict:
|
||||||
"""
|
"""
|
||||||
|
|||||||
@ -1,58 +0,0 @@
|
|||||||
#!/usr/bin/env python3
|
|
||||||
"""Test UniFi connection and authentication."""
|
|
||||||
|
|
||||||
import requests
|
|
||||||
import urllib3
|
|
||||||
import json
|
|
||||||
|
|
||||||
urllib3.disable_warnings()
|
|
||||||
|
|
||||||
# Load your actual configuration
|
|
||||||
with open('network_configuration.json', 'r') as f:
|
|
||||||
config = json.load(f)
|
|
||||||
|
|
||||||
host = config['unifi']['host']
|
|
||||||
username = config['unifi']['username']
|
|
||||||
password = config['unifi']['password']
|
|
||||||
site = config['unifi'].get('site', 'default')
|
|
||||||
|
|
||||||
print(f'Testing connection to: {host}')
|
|
||||||
print(f'Username: {username}')
|
|
||||||
print(f'Site: {site}')
|
|
||||||
print('=' * 60)
|
|
||||||
|
|
||||||
# Test UniFi OS login (modern)
|
|
||||||
print('\n1. Attempting UniFi OS login (/api/auth/login)...')
|
|
||||||
try:
|
|
||||||
session = requests.Session()
|
|
||||||
response = session.post(
|
|
||||||
f'{host}/api/auth/login',
|
|
||||||
json={'username': username, 'password': password},
|
|
||||||
verify=False,
|
|
||||||
timeout=10
|
|
||||||
)
|
|
||||||
print(f' Status code: {response.status_code}')
|
|
||||||
print(f' Response: {response.text[:200]}')
|
|
||||||
if response.status_code == 200:
|
|
||||||
print(' ✓ UniFi OS authentication successful!')
|
|
||||||
except Exception as e:
|
|
||||||
print(f' ✗ Error: {e}')
|
|
||||||
|
|
||||||
# Test legacy UniFi Controller login (older controllers)
|
|
||||||
print('\n2. Attempting legacy UniFi Controller login (/api/login)...')
|
|
||||||
try:
|
|
||||||
session2 = requests.Session()
|
|
||||||
response2 = session2.post(
|
|
||||||
f'{host}/api/login',
|
|
||||||
json={'username': username, 'password': password},
|
|
||||||
verify=False,
|
|
||||||
timeout=10
|
|
||||||
)
|
|
||||||
print(f' Status code: {response2.status_code}')
|
|
||||||
print(f' Response: {response2.text[:200]}')
|
|
||||||
if response2.status_code == 200:
|
|
||||||
print(' ✓ Legacy UniFi authentication successful!')
|
|
||||||
except Exception as e:
|
|
||||||
print(f' ✗ Error: {e}')
|
|
||||||
|
|
||||||
print('\n' + '=' * 60)
|
|
||||||
@ -129,8 +129,7 @@ class UnifiClient:
|
|||||||
Raises:
|
Raises:
|
||||||
UniFiDataError: If request fails
|
UniFiDataError: If request fails
|
||||||
"""
|
"""
|
||||||
# UniFi OS (UDM-SE) uses /proxy/network prefix for Network application API
|
endpoint = f'/api/s/{self.site_id}/stat/sta'
|
||||||
endpoint = f'/proxy/network/api/s/{self.site_id}/stat/sta'
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
response = self._request_json(endpoint)
|
response = self._request_json(endpoint)
|
||||||
@ -156,8 +155,7 @@ class UnifiClient:
|
|||||||
Raises:
|
Raises:
|
||||||
UniFiDataError: If request fails
|
UniFiDataError: If request fails
|
||||||
"""
|
"""
|
||||||
# UniFi OS (UDM-SE) uses /proxy/network prefix for Network application API
|
endpoint = f'/api/s/{self.site_id}/stat/device'
|
||||||
endpoint = f'/proxy/network/api/s/{self.site_id}/stat/device'
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
response = self._request_json(endpoint)
|
response = self._request_json(endpoint)
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user