Major improvements in v2.1: - Fixed console settings matching to use DeviceName and template NAME fields - Proper URL encoding for rules (handles # and = characters) - Fixed rule enabling (Rule1 ON vs Rule1 4) - Fixed MQTT FullTopic %20 prefix issue - Added single-device diff (--diff DEVICE) - Improved button timing (SetOption32=8 for 0.8s hold) - Created Plug profile separate from switch settings - Fixed SetOption verification and disabled to prevent device overload - Added delays and retry logic for reliable rule enabling
448 lines
16 KiB
Python
448 lines
16 KiB
Python
"""Main entry point for TasmotaManager."""
|
|
|
|
import argparse
|
|
import logging
|
|
import sys
|
|
from typing import Optional
|
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
|
|
|
from utils import load_json_file, ensure_data_directory, get_data_file_path, is_valid_ip, match_pattern
|
|
from unifi_client import UnifiClient, AuthenticationError
|
|
from discovery import TasmotaDiscovery
|
|
from configuration import ConfigurationManager
|
|
from console_settings import ConsoleSettingsManager
|
|
from unknown_devices import UnknownDeviceProcessor
|
|
from reporting import ReportGenerator
|
|
from device_diff import DeviceComparison
|
|
|
|
|
|
def setup_logging(debug: bool = False) -> logging.Logger:
|
|
"""
|
|
Setup logging configuration.
|
|
|
|
Args:
|
|
debug: Enable debug logging
|
|
|
|
Returns:
|
|
Logger instance
|
|
"""
|
|
level = logging.DEBUG if debug else logging.INFO
|
|
|
|
logging.basicConfig(
|
|
level=level,
|
|
format='%(asctime)s - %(levelname)s - %(message)s',
|
|
datefmt='%Y-%m-%d %H:%M:%S'
|
|
)
|
|
|
|
return logging.getLogger('TasmotaManager')
|
|
|
|
|
|
def load_config(config_path: str, logger: logging.Logger) -> Optional[dict]:
|
|
"""
|
|
Load configuration file.
|
|
|
|
Args:
|
|
config_path: Path to configuration file
|
|
logger: Logger instance
|
|
|
|
Returns:
|
|
Configuration dictionary or None
|
|
"""
|
|
config = load_json_file(config_path, logger)
|
|
|
|
if not config:
|
|
logger.error(f"Failed to load configuration from {config_path}")
|
|
return None
|
|
|
|
# Validate required sections
|
|
required_sections = ['unifi', 'mqtt']
|
|
for section in required_sections:
|
|
if section not in config:
|
|
logger.error(f"Configuration missing required section: {section}")
|
|
return None
|
|
|
|
return config
|
|
|
|
|
|
def setup_unifi_client(config: dict, logger: logging.Logger) -> Optional[UnifiClient]:
|
|
"""
|
|
Setup UniFi client.
|
|
|
|
Args:
|
|
config: Configuration dictionary
|
|
logger: Logger instance
|
|
|
|
Returns:
|
|
UnifiClient instance or None
|
|
"""
|
|
unifi_config = config.get('unifi', {})
|
|
|
|
try:
|
|
client = UnifiClient(
|
|
host=unifi_config['host'],
|
|
username=unifi_config['username'],
|
|
password=unifi_config['password'],
|
|
site=unifi_config.get('site', 'default'),
|
|
verify_ssl=False,
|
|
logger=logger
|
|
)
|
|
return client
|
|
|
|
except AuthenticationError as e:
|
|
logger.error(f"UniFi authentication failed: {e}")
|
|
return None
|
|
except Exception as e:
|
|
logger.error(f"Failed to setup UniFi client: {e}")
|
|
return None
|
|
|
|
|
|
def process_single_device(device: dict, config_manager: ConfigurationManager,
|
|
console_manager: ConsoleSettingsManager,
|
|
logger: logging.Logger,
|
|
report_gen: 'ReportGenerator') -> tuple:
|
|
"""
|
|
Process a single device for configuration.
|
|
|
|
Args:
|
|
device: Device to process
|
|
config_manager: Configuration manager instance
|
|
console_manager: Console settings manager instance
|
|
logger: Logger instance
|
|
report_gen: Report generator for error collection
|
|
|
|
Returns:
|
|
Tuple of (device_info, success, messages)
|
|
"""
|
|
device_name = device.get('name', 'Unknown')
|
|
device_ip = device.get('ip', '')
|
|
messages = []
|
|
|
|
try:
|
|
logger.debug(f"Processing: {device_name} ({device_ip})")
|
|
|
|
# Get device details
|
|
device_details = config_manager.get_device_details(device_ip, device_name)
|
|
|
|
if not device_details:
|
|
messages.append(f" {device_name}: Could not get device details, skipping")
|
|
report_gen.add_error(device_name, f"Connection failed to {device_ip}")
|
|
return None, False, messages
|
|
|
|
# Check and update template
|
|
template_success = config_manager.check_and_update_template(device, device_details)
|
|
|
|
if template_success:
|
|
logger.debug(f" {device_name}: Template checked/updated")
|
|
# Refresh device details after template update
|
|
device_details = config_manager.get_device_details(device_ip, device_name)
|
|
|
|
# Configure MQTT
|
|
mqtt_success, mqtt_status = config_manager.configure_mqtt_settings(device, device_details)
|
|
|
|
if mqtt_success:
|
|
if mqtt_status == "Updated":
|
|
logger.debug(f" {device_name}: MQTT updated")
|
|
else:
|
|
logger.debug(f" {device_name}: MQTT already configured")
|
|
else:
|
|
messages.append(f" {device_name}: MQTT configuration failed - {mqtt_status}")
|
|
report_gen.add_error(device_name, f"MQTT configuration failed: {mqtt_status}")
|
|
|
|
# Apply console settings
|
|
console_success, console_status = console_manager.apply_console_settings(device, device_details)
|
|
|
|
if console_success:
|
|
if console_status == "Applied":
|
|
logger.debug(f" {device_name}: Console settings applied")
|
|
elif console_status != "No console settings" and console_status != "Empty console set":
|
|
logger.debug(f" {device_name}: Console settings - {console_status}")
|
|
else:
|
|
messages.append(f" {device_name}: Console settings failed - {console_status}")
|
|
report_gen.add_warning(device_name, f"Console settings failed: {console_status}")
|
|
|
|
# Save device details
|
|
device_info = {
|
|
**device,
|
|
'mqtt_status': mqtt_status,
|
|
'console_status': console_status,
|
|
'firmware': device_details.get('StatusFWR', {}).get('Version', 'Unknown')
|
|
}
|
|
|
|
logger.debug(f" {device_name}: ✓ Processing completed successfully")
|
|
return device_info, True, messages
|
|
|
|
except Exception as e:
|
|
messages.append(f" {device_name}: ✗ Error during processing: {e}")
|
|
report_gen.add_error(device_name, f"Unexpected error: {str(e)}")
|
|
return None, False, messages
|
|
|
|
|
|
def process_devices(devices: list, config_manager: ConfigurationManager,
|
|
console_manager: ConsoleSettingsManager, logger: logging.Logger,
|
|
report_gen: 'ReportGenerator', max_workers: int = 10):
|
|
"""
|
|
Process all devices for configuration in parallel.
|
|
|
|
Args:
|
|
devices: List of devices to process
|
|
config_manager: Configuration manager instance
|
|
console_manager: Console settings manager instance
|
|
logger: Logger instance
|
|
report_gen: Report generator for error collection
|
|
max_workers: Maximum number of parallel workers (default: 10)
|
|
"""
|
|
device_details_list = []
|
|
stats = {'processed': 0, 'mqtt_updated': 0, 'console_updated': 0, 'failed': 0}
|
|
all_messages = []
|
|
|
|
logger.info(f"\nProcessing {len(devices)} devices in parallel (max {max_workers} workers)...")
|
|
|
|
# Process devices in parallel using ThreadPoolExecutor
|
|
with ThreadPoolExecutor(max_workers=max_workers) as executor:
|
|
# Submit all device processing tasks
|
|
future_to_device = {
|
|
executor.submit(process_single_device, device, config_manager,
|
|
console_manager, logger, report_gen): device
|
|
for device in devices
|
|
}
|
|
|
|
# Collect results as they complete
|
|
for future in as_completed(future_to_device):
|
|
device = future_to_device[future]
|
|
device_name = device.get('name', 'Unknown')
|
|
|
|
try:
|
|
device_info, success, messages = future.result()
|
|
|
|
# Store messages for reporting (only errors/warnings)
|
|
all_messages.extend(messages)
|
|
|
|
if success and device_info:
|
|
device_details_list.append(device_info)
|
|
stats['processed'] += 1
|
|
|
|
# Track MQTT and console updates
|
|
if device_info.get('mqtt_status') == 'Updated':
|
|
stats['mqtt_updated'] += 1
|
|
if device_info.get('console_status') == 'Applied':
|
|
stats['console_updated'] += 1
|
|
else:
|
|
stats['failed'] += 1
|
|
|
|
except Exception as e:
|
|
all_messages.append(f"{device_name}: Unexpected error: {e}")
|
|
report_gen.add_error(device_name, f"Processing exception: {str(e)}")
|
|
stats['failed'] += 1
|
|
|
|
# Print only error/warning messages (messages list now only contains issues)
|
|
if all_messages:
|
|
logger.info("\n" + "=" * 60)
|
|
logger.info("PROCESSING ISSUES")
|
|
logger.info("=" * 60)
|
|
for message in all_messages:
|
|
logger.info(message)
|
|
|
|
return device_details_list, stats
|
|
|
|
def find_device_by_identifier(devices: list, identifier: str, logger: logging.Logger) -> Optional[dict]:
|
|
"""
|
|
Find a device by IP address or hostname.
|
|
|
|
Args:
|
|
devices: List of devices
|
|
identifier: IP address or hostname (with optional wildcards)
|
|
logger: Logger instance
|
|
|
|
Returns:
|
|
Device dictionary or None
|
|
"""
|
|
# Check if it's an IP address
|
|
if is_valid_ip(identifier):
|
|
for device in devices:
|
|
if device.get('ip') == identifier:
|
|
return device
|
|
logger.error(f"No device found with IP: {identifier}")
|
|
return None
|
|
|
|
# Search by hostname with pattern matching
|
|
matches = []
|
|
|
|
for device in devices:
|
|
device_name = device.get('name', '')
|
|
device_hostname = device.get('hostname', '')
|
|
|
|
# Try exact match first
|
|
if device_name.lower() == identifier.lower() or device_hostname.lower() == identifier.lower():
|
|
return device
|
|
|
|
# Try pattern matching
|
|
if match_pattern(device_name, identifier, match_entire_string=False) or \
|
|
match_pattern(device_hostname, identifier, match_entire_string=False):
|
|
matches.append(device)
|
|
|
|
if len(matches) == 0:
|
|
logger.error(f"No device found matching: {identifier}")
|
|
return None
|
|
elif len(matches) == 1:
|
|
return matches[0]
|
|
else:
|
|
logger.warning(f"Multiple devices match '{identifier}':")
|
|
for device in matches:
|
|
logger.warning(f" - {device.get('name')} ({device.get('ip')})")
|
|
logger.info(f"Using first match: {matches[0].get('name')}")
|
|
return matches[0]
|
|
|
|
|
|
def main():
|
|
"""Main entry point."""
|
|
parser = argparse.ArgumentParser(description='Tasmota Device Manager')
|
|
parser.add_argument('--config', default='network_configuration.json',
|
|
help='Path to configuration file')
|
|
parser.add_argument('--debug', action='store_true',
|
|
help='Enable debug logging')
|
|
parser.add_argument('--skip-unifi', action='store_true',
|
|
help='Skip UniFi discovery and use existing data')
|
|
parser.add_argument('--process-unknown', action='store_true',
|
|
help='Process unknown devices interactively')
|
|
parser.add_argument('--unifi-hostname-report', action='store_true',
|
|
help='Generate UniFi hostname comparison report')
|
|
parser.add_argument('--Device', type=str,
|
|
help='Process single device by IP or hostname')
|
|
parser.add_argument('--diff', nargs='+', metavar='DEVICE',
|
|
help='Compare devices: --diff DEVICE (vs config) or --diff DEVICE1 DEVICE2 (vs each other)')
|
|
|
|
args = parser.parse_args()
|
|
|
|
# Setup logging
|
|
logger = setup_logging(args.debug)
|
|
logger.info("TasmotaManager v2.1 starting")
|
|
|
|
# Ensure data directory exists
|
|
ensure_data_directory()
|
|
|
|
# Load configuration
|
|
config = load_config(args.config, logger)
|
|
if not config:
|
|
return 1
|
|
|
|
# Setup UniFi client
|
|
unifi_client = setup_unifi_client(config, logger)
|
|
if not unifi_client:
|
|
return 1
|
|
|
|
# Create managers
|
|
discovery = TasmotaDiscovery(config, unifi_client, logger)
|
|
config_manager = ConfigurationManager(config, logger)
|
|
console_manager = ConsoleSettingsManager(config, logger)
|
|
unknown_processor = UnknownDeviceProcessor(config, config_manager, logger)
|
|
report_gen = ReportGenerator(config, discovery, logger)
|
|
|
|
# Handle device comparison mode
|
|
if args.diff:
|
|
if len(args.diff) < 1 or len(args.diff) > 2:
|
|
logger.error("--diff requires 1 or 2 device arguments")
|
|
return 1
|
|
|
|
device_comp = DeviceComparison(config, logger)
|
|
|
|
# Get devices list to resolve names/IPs
|
|
devices = discovery.get_tasmota_devices()
|
|
|
|
# Find device 1
|
|
device1 = find_device_by_identifier(devices, args.diff[0], logger)
|
|
if not device1:
|
|
logger.error(f"Device not found: {args.diff[0]}")
|
|
return 1
|
|
|
|
if len(args.diff) == 1:
|
|
# Single device: compare to config file
|
|
# Get device type from Status
|
|
from utils import send_tasmota_command
|
|
result, success = send_tasmota_command(device1['ip'], "Status%200", timeout=10, logger=logger)
|
|
if not success or not result:
|
|
logger.error(f"Failed to get device status for {device1['name']}")
|
|
return 1
|
|
|
|
device_type = result.get('Status', {}).get('DeviceName', '')
|
|
if not device_type:
|
|
logger.error(f"Could not determine device type for {device1['name']}")
|
|
return 1
|
|
|
|
comparison = device_comp.compare_device_to_config(
|
|
device1['ip'], device1['name'], device_type
|
|
)
|
|
else:
|
|
# Two devices: compare to each other
|
|
device2 = find_device_by_identifier(devices, args.diff[1], logger)
|
|
if not device2:
|
|
logger.error(f"Device 2 not found: {args.diff[1]}")
|
|
return 1
|
|
|
|
comparison = device_comp.compare_devices(
|
|
device1['ip'], device1['name'],
|
|
device2['ip'], device2['name']
|
|
)
|
|
|
|
# Print report
|
|
device_comp.print_comparison_report(comparison)
|
|
return 0
|
|
|
|
# Handle hostname report mode
|
|
if args.unifi_hostname_report:
|
|
report_gen.generate_unifi_hostname_report()
|
|
return 0
|
|
|
|
# Get devices
|
|
if args.skip_unifi:
|
|
logger.info("Using existing device data")
|
|
current_file = get_data_file_path('current.json')
|
|
devices = load_json_file(current_file, logger)
|
|
if not devices:
|
|
logger.error("No existing device data found")
|
|
return 1
|
|
else:
|
|
devices = discovery.get_tasmota_devices()
|
|
|
|
# Save device list
|
|
previous_data = load_json_file(get_data_file_path('current.json'), logger)
|
|
discovery.save_tasmota_config(devices, previous_data)
|
|
|
|
# Handle single device mode
|
|
if args.Device:
|
|
device = find_device_by_identifier(devices, args.Device, logger)
|
|
if not device:
|
|
return 1
|
|
devices = [device]
|
|
|
|
# Handle unknown device processing
|
|
if args.process_unknown:
|
|
unknown_devices = discovery.get_unknown_devices(devices)
|
|
unknown_processor.process_unknown_devices(unknown_devices)
|
|
return 0
|
|
|
|
# Process all devices
|
|
logger.info(f"\nProcessing {len(devices)} devices...")
|
|
device_details_list, stats = process_devices(devices, config_manager, console_manager, logger, report_gen)
|
|
|
|
# Save device details
|
|
report_gen.save_device_details(device_details_list)
|
|
|
|
# Print summaries
|
|
report_gen.print_processing_summary(
|
|
stats['processed'],
|
|
stats['mqtt_updated'],
|
|
stats['console_updated'],
|
|
stats['failed']
|
|
)
|
|
|
|
console_manager.print_failure_summary()
|
|
|
|
# Print errors and warnings summary
|
|
report_gen.print_errors_and_warnings_summary()
|
|
|
|
logger.info("TasmotaManager completed")
|
|
return 0
|
|
|
|
|
|
if __name__ == '__main__':
|
|
sys.exit(main()) |