"""Main entry point for TasmotaManager.""" import argparse import logging import sys from typing import Optional from concurrent.futures import ThreadPoolExecutor, as_completed from utils import load_json_file, ensure_data_directory, get_data_file_path, is_valid_ip, match_pattern from unifi_client import UnifiClient, AuthenticationError from discovery import TasmotaDiscovery from configuration import ConfigurationManager from console_settings import ConsoleSettingsManager from unknown_devices import UnknownDeviceProcessor from reporting import ReportGenerator def setup_logging(debug: bool = False) -> logging.Logger: """ Setup logging configuration. Args: debug: Enable debug logging Returns: Logger instance """ level = logging.DEBUG if debug else logging.INFO logging.basicConfig( level=level, format='%(asctime)s - %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S' ) return logging.getLogger('TasmotaManager') def load_config(config_path: str, logger: logging.Logger) -> Optional[dict]: """ Load configuration file. Args: config_path: Path to configuration file logger: Logger instance Returns: Configuration dictionary or None """ config = load_json_file(config_path, logger) if not config: logger.error(f"Failed to load configuration from {config_path}") return None # Validate required sections required_sections = ['unifi', 'mqtt'] for section in required_sections: if section not in config: logger.error(f"Configuration missing required section: {section}") return None return config def setup_unifi_client(config: dict, logger: logging.Logger) -> Optional[UnifiClient]: """ Setup UniFi client. Args: config: Configuration dictionary logger: Logger instance Returns: UnifiClient instance or None """ unifi_config = config.get('unifi', {}) try: client = UnifiClient( host=unifi_config['host'], username=unifi_config['username'], password=unifi_config['password'], site=unifi_config.get('site', 'default'), verify_ssl=False, logger=logger ) return client except AuthenticationError as e: logger.error(f"UniFi authentication failed: {e}") return None except Exception as e: logger.error(f"Failed to setup UniFi client: {e}") return None def process_single_device(device: dict, config_manager: ConfigurationManager, console_manager: ConsoleSettingsManager, logger: logging.Logger, report_gen: 'ReportGenerator') -> tuple: """ Process a single device for configuration. Args: device: Device to process config_manager: Configuration manager instance console_manager: Console settings manager instance logger: Logger instance report_gen: Report generator for error collection Returns: Tuple of (device_info, success, messages) """ device_name = device.get('name', 'Unknown') device_ip = device.get('ip', '') messages = [] try: logger.debug(f"Processing: {device_name} ({device_ip})") # Get device details device_details = config_manager.get_device_details(device_ip, device_name) if not device_details: messages.append(f" {device_name}: Could not get device details, skipping") report_gen.add_error(device_name, f"Connection failed to {device_ip}") return None, False, messages # Check and update template template_success = config_manager.check_and_update_template(device, device_details) if template_success: logger.debug(f" {device_name}: Template checked/updated") # Refresh device details after template update device_details = config_manager.get_device_details(device_ip, device_name) # Configure MQTT mqtt_success, mqtt_status = config_manager.configure_mqtt_settings(device, device_details) if mqtt_success: if mqtt_status == "Updated": logger.debug(f" {device_name}: MQTT updated") else: logger.debug(f" {device_name}: MQTT already configured") else: messages.append(f" {device_name}: MQTT configuration failed - {mqtt_status}") report_gen.add_error(device_name, f"MQTT configuration failed: {mqtt_status}") # Apply console settings console_success, console_status = console_manager.apply_console_settings(device, device_details) if console_success: if console_status == "Applied": logger.debug(f" {device_name}: Console settings applied") elif console_status != "No console settings" and console_status != "Empty console set": logger.debug(f" {device_name}: Console settings - {console_status}") else: messages.append(f" {device_name}: Console settings failed - {console_status}") report_gen.add_warning(device_name, f"Console settings failed: {console_status}") # Save device details device_info = { **device, 'mqtt_status': mqtt_status, 'console_status': console_status, 'firmware': device_details.get('StatusFWR', {}).get('Version', 'Unknown') } logger.debug(f" {device_name}: ✓ Processing completed successfully") return device_info, True, messages except Exception as e: messages.append(f" {device_name}: ✗ Error during processing: {e}") report_gen.add_error(device_name, f"Unexpected error: {str(e)}") return None, False, messages def process_devices(devices: list, config_manager: ConfigurationManager, console_manager: ConsoleSettingsManager, logger: logging.Logger, report_gen: 'ReportGenerator', max_workers: int = 10): """ Process all devices for configuration in parallel. Args: devices: List of devices to process config_manager: Configuration manager instance console_manager: Console settings manager instance logger: Logger instance report_gen: Report generator for error collection max_workers: Maximum number of parallel workers (default: 10) """ device_details_list = [] stats = {'processed': 0, 'mqtt_updated': 0, 'console_updated': 0, 'failed': 0} all_messages = [] logger.info(f"\nProcessing {len(devices)} devices in parallel (max {max_workers} workers)...") # Process devices in parallel using ThreadPoolExecutor with ThreadPoolExecutor(max_workers=max_workers) as executor: # Submit all device processing tasks future_to_device = { executor.submit(process_single_device, device, config_manager, console_manager, logger, report_gen): device for device in devices } # Collect results as they complete for future in as_completed(future_to_device): device = future_to_device[future] device_name = device.get('name', 'Unknown') try: device_info, success, messages = future.result() # Store messages for reporting (only errors/warnings) all_messages.extend(messages) if success and device_info: device_details_list.append(device_info) stats['processed'] += 1 # Track MQTT and console updates if device_info.get('mqtt_status') == 'Updated': stats['mqtt_updated'] += 1 if device_info.get('console_status') == 'Applied': stats['console_updated'] += 1 else: stats['failed'] += 1 except Exception as e: all_messages.append(f"{device_name}: Unexpected error: {e}") report_gen.add_error(device_name, f"Processing exception: {str(e)}") stats['failed'] += 1 # Print only error/warning messages (messages list now only contains issues) if all_messages: logger.info("\n" + "=" * 60) logger.info("PROCESSING ISSUES") logger.info("=" * 60) for message in all_messages: logger.info(message) return device_details_list, stats def find_device_by_identifier(devices: list, identifier: str, logger: logging.Logger) -> Optional[dict]: """ Find a device by IP address or hostname. Args: devices: List of devices identifier: IP address or hostname (with optional wildcards) logger: Logger instance Returns: Device dictionary or None """ # Check if it's an IP address if is_valid_ip(identifier): for device in devices: if device.get('ip') == identifier: return device logger.error(f"No device found with IP: {identifier}") return None # Search by hostname with pattern matching matches = [] for device in devices: device_name = device.get('name', '') device_hostname = device.get('hostname', '') # Try exact match first if device_name.lower() == identifier.lower() or device_hostname.lower() == identifier.lower(): return device # Try pattern matching if match_pattern(device_name, identifier, match_entire_string=False) or \ match_pattern(device_hostname, identifier, match_entire_string=False): matches.append(device) if len(matches) == 0: logger.error(f"No device found matching: {identifier}") return None elif len(matches) == 1: return matches[0] else: logger.warning(f"Multiple devices match '{identifier}':") for device in matches: logger.warning(f" - {device.get('name')} ({device.get('ip')})") logger.info(f"Using first match: {matches[0].get('name')}") return matches[0] def main(): """Main entry point.""" parser = argparse.ArgumentParser(description='Tasmota Device Manager') parser.add_argument('--config', default='network_configuration.json', help='Path to configuration file') parser.add_argument('--debug', action='store_true', help='Enable debug logging') parser.add_argument('--skip-unifi', action='store_true', help='Skip UniFi discovery and use existing data') parser.add_argument('--process-unknown', action='store_true', help='Process unknown devices interactively') parser.add_argument('--unifi-hostname-report', action='store_true', help='Generate UniFi hostname comparison report') parser.add_argument('--Device', type=str, help='Process single device by IP or hostname') args = parser.parse_args() # Setup logging logger = setup_logging(args.debug) logger.info("TasmotaManager v2.0 starting") # Ensure data directory exists ensure_data_directory() # Load configuration config = load_config(args.config, logger) if not config: return 1 # Setup UniFi client unifi_client = setup_unifi_client(config, logger) if not unifi_client: return 1 # Create managers discovery = TasmotaDiscovery(config, unifi_client, logger) config_manager = ConfigurationManager(config, logger) console_manager = ConsoleSettingsManager(config, logger) unknown_processor = UnknownDeviceProcessor(config, config_manager, logger) report_gen = ReportGenerator(config, discovery, logger) # Handle hostname report mode if args.unifi_hostname_report: report_gen.generate_unifi_hostname_report() return 0 # Get devices if args.skip_unifi: logger.info("Using existing device data") current_file = get_data_file_path('current.json') devices = load_json_file(current_file, logger) if not devices: logger.error("No existing device data found") return 1 else: devices = discovery.get_tasmota_devices() # Save device list previous_data = load_json_file(get_data_file_path('current.json'), logger) discovery.save_tasmota_config(devices, previous_data) # Handle single device mode if args.Device: device = find_device_by_identifier(devices, args.Device, logger) if not device: return 1 devices = [device] # Handle unknown device processing if args.process_unknown: unknown_devices = discovery.get_unknown_devices(devices) unknown_processor.process_unknown_devices(unknown_devices) return 0 # Process all devices logger.info(f"\nProcessing {len(devices)} devices...") device_details_list, stats = process_devices(devices, config_manager, console_manager, logger, report_gen) # Save device details report_gen.save_device_details(device_details_list) # Print summaries report_gen.print_processing_summary( stats['processed'], stats['mqtt_updated'], stats['console_updated'], stats['failed'] ) console_manager.print_failure_summary() # Print errors and warnings summary report_gen.print_errors_and_warnings_summary() logger.info("TasmotaManager completed") return 0 if __name__ == '__main__': sys.exit(main())