"""Main entry point for TasmotaManager.""" import argparse import logging import sys from typing import Optional from utils import load_json_file, ensure_data_directory, get_data_file_path, is_valid_ip, match_pattern from unifi_client import UnifiClient, AuthenticationError from discovery import TasmotaDiscovery from configuration import ConfigurationManager from console_settings import ConsoleSettingsManager from unknown_devices import UnknownDeviceProcessor from reporting import ReportGenerator def setup_logging(debug: bool = False) -> logging.Logger: """ Setup logging configuration. Args: debug: Enable debug logging Returns: Logger instance """ level = logging.DEBUG if debug else logging.INFO logging.basicConfig( level=level, format='%(asctime)s - %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S' ) return logging.getLogger('TasmotaManager') def load_config(config_path: str, logger: logging.Logger) -> Optional[dict]: """ Load configuration file. Args: config_path: Path to configuration file logger: Logger instance Returns: Configuration dictionary or None """ config = load_json_file(config_path, logger) if not config: logger.error(f"Failed to load configuration from {config_path}") return None # Validate required sections required_sections = ['unifi', 'mqtt'] for section in required_sections: if section not in config: logger.error(f"Configuration missing required section: {section}") return None return config def setup_unifi_client(config: dict, logger: logging.Logger) -> Optional[UnifiClient]: """ Setup UniFi client. Args: config: Configuration dictionary logger: Logger instance Returns: UnifiClient instance or None """ unifi_config = config.get('unifi', {}) try: client = UnifiClient( host=unifi_config['host'], username=unifi_config['username'], password=unifi_config['password'], site=unifi_config.get('site', 'default'), verify_ssl=False, logger=logger ) return client except AuthenticationError as e: logger.error(f"UniFi authentication failed: {e}") return None except Exception as e: logger.error(f"Failed to setup UniFi client: {e}") return None def process_devices(devices: list, config_manager: ConfigurationManager, console_manager: ConsoleSettingsManager, logger: logging.Logger): """ Process all devices for configuration. Args: devices: List of devices to process config_manager: Configuration manager instance console_manager: Console settings manager instance logger: Logger instance """ device_details_list = [] stats = {'processed': 0, 'mqtt_updated': 0, 'console_updated': 0, 'failed': 0} for device in devices: device_name = device.get('name', 'Unknown') device_ip = device.get('ip', '') logger.info(f"\nProcessing: {device_name} ({device_ip})") try: # Get device details device_details = config_manager.get_device_details(device_ip, device_name) if not device_details: logger.warning(f"{device_name}: Could not get device details, skipping") stats['failed'] += 1 continue # Check and update template template_success = config_manager.check_and_update_template(device, device_details) # Refresh device details after template update if template_success: device_details = config_manager.get_device_details(device_ip, device_name) # Configure MQTT mqtt_success, mqtt_status = config_manager.configure_mqtt_settings(device, device_details) if mqtt_success and mqtt_status == "Updated": stats['mqtt_updated'] += 1 # Apply console settings console_success, console_status = console_manager.apply_console_settings(device, device_details) if console_success and console_status == "Applied": stats['console_updated'] += 1 # Save device details device_info = { **device, 'mqtt_status': mqtt_status, 'console_status': console_status, 'firmware': device_details.get('StatusFWR', {}).get('Version', 'Unknown') } device_details_list.append(device_info) stats['processed'] += 1 except Exception as e: logger.error(f"{device_name}: Error during processing: {e}") stats['failed'] += 1 return device_details_list, stats def find_device_by_identifier(devices: list, identifier: str, logger: logging.Logger) -> Optional[dict]: """ Find a device by IP address or hostname. Args: devices: List of devices identifier: IP address or hostname (with optional wildcards) logger: Logger instance Returns: Device dictionary or None """ # Check if it's an IP address if is_valid_ip(identifier): for device in devices: if device.get('ip') == identifier: return device logger.error(f"No device found with IP: {identifier}") return None # Search by hostname with pattern matching matches = [] for device in devices: device_name = device.get('name', '') device_hostname = device.get('hostname', '') # Try exact match first if device_name.lower() == identifier.lower() or device_hostname.lower() == identifier.lower(): return device # Try pattern matching if match_pattern(device_name, identifier, match_entire_string=False) or \ match_pattern(device_hostname, identifier, match_entire_string=False): matches.append(device) if len(matches) == 0: logger.error(f"No device found matching: {identifier}") return None elif len(matches) == 1: return matches[0] else: logger.warning(f"Multiple devices match '{identifier}':") for device in matches: logger.warning(f" - {device.get('name')} ({device.get('ip')})") logger.info(f"Using first match: {matches[0].get('name')}") return matches[0] def main(): """Main entry point.""" parser = argparse.ArgumentParser(description='Tasmota Device Manager') parser.add_argument('--config', default='network_configuration.json', help='Path to configuration file') parser.add_argument('--debug', action='store_true', help='Enable debug logging') parser.add_argument('--skip-unifi', action='store_true', help='Skip UniFi discovery and use existing data') parser.add_argument('--process-unknown', action='store_true', help='Process unknown devices interactively') parser.add_argument('--unifi-hostname-report', action='store_true', help='Generate UniFi hostname comparison report') parser.add_argument('--Device', type=str, help='Process single device by IP or hostname') args = parser.parse_args() # Setup logging logger = setup_logging(args.debug) logger.info("TasmotaManager v2.0 starting") # Ensure data directory exists ensure_data_directory() # Load configuration config = load_config(args.config, logger) if not config: return 1 # Setup UniFi client unifi_client = setup_unifi_client(config, logger) if not unifi_client: return 1 # Create managers discovery = TasmotaDiscovery(config, unifi_client, logger) config_manager = ConfigurationManager(config, logger) console_manager = ConsoleSettingsManager(config, logger) unknown_processor = UnknownDeviceProcessor(config, config_manager, logger) report_gen = ReportGenerator(config, discovery, logger) # Handle hostname report mode if args.unifi_hostname_report: report_gen.generate_unifi_hostname_report() return 0 # Get devices if args.skip_unifi: logger.info("Using existing device data") current_file = get_data_file_path('current.json') devices = load_json_file(current_file, logger) if not devices: logger.error("No existing device data found") return 1 else: devices = discovery.get_tasmota_devices() # Save device list previous_data = load_json_file(get_data_file_path('current.json'), logger) discovery.save_tasmota_config(devices, previous_data) # Handle single device mode if args.Device: device = find_device_by_identifier(devices, args.Device, logger) if not device: return 1 devices = [device] # Handle unknown device processing if args.process_unknown: unknown_devices = discovery.get_unknown_devices(devices) unknown_processor.process_unknown_devices(unknown_devices) return 0 # Process all devices logger.info(f"\nProcessing {len(devices)} devices...") device_details_list, stats = process_devices(devices, config_manager, console_manager, logger) # Save device details report_gen.save_device_details(device_details_list) # Print summaries report_gen.print_processing_summary( stats['processed'], stats['mqtt_updated'], stats['console_updated'], stats['failed'] ) console_manager.print_failure_summary() logger.info("TasmotaManager completed") return 0 if __name__ == '__main__': sys.exit(main())