- Renamed main.py to TasmotaManager.py for consistency with project name
- Updated pyproject.toml to reflect the new module name
- All functionality remains the same
309 lines
9.7 KiB
Python
309 lines
9.7 KiB
Python
"""Main entry point for TasmotaManager."""
|
|
|
|
import argparse
|
|
import logging
|
|
import sys
|
|
from typing import Optional
|
|
|
|
from utils import load_json_file, ensure_data_directory, get_data_file_path, is_valid_ip, match_pattern
|
|
from unifi_client import UnifiClient, AuthenticationError
|
|
from discovery import TasmotaDiscovery
|
|
from configuration import ConfigurationManager
|
|
from console_settings import ConsoleSettingsManager
|
|
from unknown_devices import UnknownDeviceProcessor
|
|
from reporting import ReportGenerator
|
|
|
|
|
|
def setup_logging(debug: bool = False) -> logging.Logger:
    """
    Configure logging and return the application logger.

    Calls ``logging.basicConfig`` with a timestamped
    ``time - level - message`` format.

    Args:
        debug: When true, request DEBUG verbosity; otherwise INFO

    Returns:
        The logger registered under the name 'TasmotaManager'
    """
    if debug:
        verbosity = logging.DEBUG
    else:
        verbosity = logging.INFO

    basic_config_options = {
        'level': verbosity,
        'format': '%(asctime)s - %(levelname)s - %(message)s',
        'datefmt': '%Y-%m-%d %H:%M:%S',
    }
    logging.basicConfig(**basic_config_options)

    return logging.getLogger('TasmotaManager')
|
|
|
|
|
|
def load_config(config_path: str, logger: logging.Logger) -> Optional[dict]:
    """
    Read the configuration file and check its required sections.

    Args:
        config_path: Path to configuration file
        logger: Logger instance used for error reporting

    Returns:
        The loaded configuration, or None when loading yields nothing
        or a required section ('unifi', 'mqtt') is absent
    """
    loaded = load_json_file(config_path, logger)
    if not loaded:
        logger.error(f"Failed to load configuration from {config_path}")
        return None

    # Only the first absent section is reported, in declaration order.
    first_missing = next(
        (name for name in ('unifi', 'mqtt') if name not in loaded),
        None,
    )
    if first_missing is not None:
        logger.error(f"Configuration missing required section: {first_missing}")
        return None

    return loaded
|
|
|
|
|
|
def setup_unifi_client(config: dict, logger: logging.Logger) -> Optional[UnifiClient]:
    """
    Setup UniFi client.

    Args:
        config: Configuration dictionary. Its 'unifi' section must provide
            'host', 'username' and 'password'. Optional keys: 'site'
            (defaults to 'default') and 'verify_ssl' (defaults to False,
            passed through unchanged to UnifiClient).
        logger: Logger instance

    Returns:
        UnifiClient instance, or None when a required setting is missing,
        authentication fails, or client construction raises
    """
    unifi_config = config.get('unifi') or {}

    # Report missing settings by name instead of letting a KeyError surface
    # through the generic handler as a bare "'host'" message.
    required_keys = ('host', 'username', 'password')
    missing_keys = [key for key in required_keys if key not in unifi_config]
    if missing_keys:
        logger.error(
            "Failed to setup UniFi client: missing required 'unifi' "
            f"setting(s): {', '.join(missing_keys)}"
        )
        return None

    try:
        return UnifiClient(
            host=unifi_config['host'],
            username=unifi_config['username'],
            password=unifi_config['password'],
            site=unifi_config.get('site', 'default'),
            # Certificate verification stays off unless the configuration
            # opts in, which keeps existing deployments working as before.
            verify_ssl=unifi_config.get('verify_ssl', False),
            logger=logger
        )
    except AuthenticationError as e:
        logger.error(f"UniFi authentication failed: {e}")
        return None
    except Exception as e:
        # Top-level boundary: any other construction failure is reported
        # and turned into a None result for the caller to act on.
        logger.error(f"Failed to setup UniFi client: {e}")
        return None
|
|
|
|
|
|
def process_devices(devices: list, config_manager: ConfigurationManager,
                    console_manager: ConsoleSettingsManager, logger: logging.Logger) -> tuple:
    """
    Process all devices for configuration.

    For each device this fetches its details, checks/updates the template,
    configures MQTT, applies console settings and records the outcome.
    An exception raised while handling one device is logged and counted
    as a failure; processing continues with the next device.

    Args:
        devices: List of devices to process
        config_manager: Configuration manager instance
        console_manager: Console settings manager instance
        logger: Logger instance

    Returns:
        Tuple ``(device_details_list, stats)``. ``device_details_list`` holds
        one dict per successfully processed device (the device entry plus
        'mqtt_status', 'console_status' and 'firmware'). ``stats`` has the
        counters 'processed', 'mqtt_updated', 'console_updated' and 'failed'.
    """
    device_details_list = []
    stats = {'processed': 0, 'mqtt_updated': 0, 'console_updated': 0, 'failed': 0}

    for device in devices:
        device_name = device.get('name', 'Unknown')
        device_ip = device.get('ip', '')

        logger.info(f"\nProcessing: {device_name} ({device_ip})")

        try:
            # Get device details
            device_details = config_manager.get_device_details(device_ip, device_name)

            if not device_details:
                logger.warning(f"{device_name}: Could not get device details, skipping")
                stats['failed'] += 1
                continue

            # Check and update template
            template_success = config_manager.check_and_update_template(device, device_details)

            # Refresh device details after template update. If the refresh
            # yields nothing, keep the details fetched above rather than
            # handing an empty value to the steps below.
            if template_success:
                refreshed_details = config_manager.get_device_details(device_ip, device_name)
                if refreshed_details:
                    device_details = refreshed_details
                else:
                    logger.warning(
                        f"{device_name}: Could not refresh device details after "
                        "template update, using previously fetched details"
                    )

            # Configure MQTT
            mqtt_success, mqtt_status = config_manager.configure_mqtt_settings(device, device_details)

            if mqtt_success and mqtt_status == "Updated":
                stats['mqtt_updated'] += 1

            # Apply console settings
            console_success, console_status = console_manager.apply_console_settings(device, device_details)

            if console_success and console_status == "Applied":
                stats['console_updated'] += 1

            # Save device details; `or {}` tolerates a 'StatusFWR' key
            # that is present but holds None.
            firmware_info = device_details.get('StatusFWR') or {}
            device_info = {
                **device,
                'mqtt_status': mqtt_status,
                'console_status': console_status,
                'firmware': firmware_info.get('Version', 'Unknown')
            }
            device_details_list.append(device_info)

            stats['processed'] += 1

        except Exception as e:
            # Per-device boundary: one failing device must not stop the run.
            logger.error(f"{device_name}: Error during processing: {e}")
            stats['failed'] += 1

    return device_details_list, stats
|
|
|
|
|
|
def find_device_by_identifier(devices: list, identifier: str, logger: logging.Logger) -> Optional[dict]:
    """
    Find a device by IP address or hostname.

    An identifier that is a valid IP is compared against each device's 'ip'
    only. Otherwise a case-insensitive exact match on 'name' or 'hostname'
    wins immediately; failing that, pattern matches are collected and the
    first one is used (with a warning when several devices match).

    Args:
        devices: List of devices
        identifier: IP address or hostname (with optional wildcards)
        logger: Logger instance

    Returns:
        Device dictionary or None
    """
    # Check if it's an IP address
    if is_valid_ip(identifier):
        for device in devices:
            if device.get('ip') == identifier:
                return device
        logger.error(f"No device found with IP: {identifier}")
        return None

    # Search by hostname with pattern matching
    wanted = identifier.lower()
    matches = []

    for device in devices:
        # `or ''` also covers keys that are present but hold None, which
        # would otherwise crash on .lower().
        device_name = device.get('name') or ''
        device_hostname = device.get('hostname') or ''

        # Try exact match first
        if wanted in (device_name.lower(), device_hostname.lower()):
            return device

        # Try pattern matching
        if (match_pattern(device_name, identifier, match_entire_string=False)
                or match_pattern(device_hostname, identifier, match_entire_string=False)):
            matches.append(device)

    if not matches:
        logger.error(f"No device found matching: {identifier}")
        return None

    if len(matches) > 1:
        logger.warning(f"Multiple devices match '{identifier}':")
        for device in matches:
            logger.warning(f"  - {device.get('name')} ({device.get('ip')})")
        logger.info(f"Using first match: {matches[0].get('name')}")

    return matches[0]
|
|
|
|
|
|
def _build_arg_parser() -> argparse.ArgumentParser:
    """Create the command-line parser with every TasmotaManager option."""
    parser = argparse.ArgumentParser(description='Tasmota Device Manager')
    parser.add_argument('--config', default='network_configuration.json',
                        help='Path to configuration file')

    # The boolean switches all share one shape, so register them from a table.
    switches = (
        ('--debug', 'Enable debug logging'),
        ('--skip-unifi', 'Skip UniFi discovery and use existing data'),
        ('--process-unknown', 'Process unknown devices interactively'),
        ('--unifi-hostname-report', 'Generate UniFi hostname comparison report'),
    )
    for flag, help_text in switches:
        parser.add_argument(flag, action='store_true', help=help_text)

    parser.add_argument('--Device', type=str,
                        help='Process single device by IP or hostname')
    return parser


def main():
    """
    Run TasmotaManager from the command line.

    Returns:
        0 on success; 1 when the configuration, the UniFi client, the
        stored device data or the requested single device is unavailable
    """
    options = _build_arg_parser().parse_args()

    log = setup_logging(options.debug)
    log.info("TasmotaManager v2.0 starting")

    ensure_data_directory()

    settings = load_config(options.config, log)
    if not settings:
        return 1

    unifi = setup_unifi_client(settings, log)
    if not unifi:
        return 1

    # Collaborators are created in the same order as before.
    discovery = TasmotaDiscovery(settings, unifi, log)
    config_manager = ConfigurationManager(settings, log)
    console_manager = ConsoleSettingsManager(settings, log)
    unknown_processor = UnknownDeviceProcessor(settings, config_manager, log)
    reporter = ReportGenerator(settings, discovery, log)

    # Report-only mode ends the run before any device is touched.
    if options.unifi_hostname_report:
        reporter.generate_unifi_hostname_report()
        return 0

    if options.skip_unifi:
        log.info("Using existing device data")
        devices = load_json_file(get_data_file_path('current.json'), log)
        if not devices:
            log.error("No existing device data found")
            return 1
    else:
        devices = discovery.get_tasmota_devices()

        # NOTE(review): the save step is taken to belong to the fresh
        # discovery branch only - confirm against the original indentation.
        earlier_snapshot = load_json_file(get_data_file_path('current.json'), log)
        discovery.save_tasmota_config(devices, earlier_snapshot)

    # Single-device mode narrows the work list to one entry.
    if options.Device:
        selected = find_device_by_identifier(devices, options.Device, log)
        if not selected:
            return 1
        devices = [selected]

    # Unknown-device mode replaces the normal processing pass.
    if options.process_unknown:
        unknown_processor.process_unknown_devices(
            discovery.get_unknown_devices(devices)
        )
        return 0

    log.info(f"\nProcessing {len(devices)} devices...")
    details, counters = process_devices(devices, config_manager, console_manager, log)

    reporter.save_device_details(details)

    summary_order = ('processed', 'mqtt_updated', 'console_updated', 'failed')
    reporter.print_processing_summary(*(counters[key] for key in summary_order))

    console_manager.print_failure_summary()

    log.info("TasmotaManager completed")
    return 0
|
|
|
|
|
|
if __name__ == '__main__':
    # Hand main()'s return value to the interpreter as the exit status.
    exit_status = main()
    sys.exit(exit_status)