TasmotaManager/TasmotaManager.py
Mike Geppert 41ab2e930b Bump version to 2.1
Major improvements in v2.1:
- Fixed console settings matching to use DeviceName and template NAME fields
- Proper URL encoding for rules (handles # and = characters)
- Fixed rule enabling (Rule1 ON vs Rule1 4)
- Fixed MQTT FullTopic %20 prefix issue
- Added single-device diff (--diff DEVICE)
- Improved button timing (SetOption32=8 for 0.8s hold)
- Created Plug profile separate from switch settings
- Fixed SetOption verification and disabled to prevent device overload
- Added delays and retry logic for reliable rule enabling
2026-01-07 22:14:57 -06:00

448 lines
16 KiB
Python

"""Main entry point for TasmotaManager."""
import argparse
import logging
import sys
from typing import Optional
from concurrent.futures import ThreadPoolExecutor, as_completed
from utils import load_json_file, ensure_data_directory, get_data_file_path, is_valid_ip, match_pattern
from unifi_client import UnifiClient, AuthenticationError
from discovery import TasmotaDiscovery
from configuration import ConfigurationManager
from console_settings import ConsoleSettingsManager
from unknown_devices import UnknownDeviceProcessor
from reporting import ReportGenerator
from device_diff import DeviceComparison
def setup_logging(debug: bool = False) -> logging.Logger:
    """
    Configure the logging system and hand back the application logger.

    Args:
        debug: When truthy, log at DEBUG level; otherwise log at INFO level.

    Returns:
        The logger registered under the name 'TasmotaManager'.
    """
    if debug:
        log_level = logging.DEBUG
    else:
        log_level = logging.INFO

    # Timestamped "time - LEVEL - message" lines for every record
    basic_config_options = {
        'level': log_level,
        'format': '%(asctime)s - %(levelname)s - %(message)s',
        'datefmt': '%Y-%m-%d %H:%M:%S',
    }
    logging.basicConfig(**basic_config_options)

    return logging.getLogger('TasmotaManager')
def load_config(config_path: str, logger: logging.Logger) -> Optional[dict]:
    """
    Read the configuration file and confirm the mandatory sections exist.

    Args:
        config_path: Path to configuration file
        logger: Logger instance used for error reporting (also passed to
            the JSON loader)

    Returns:
        The loaded configuration, or None when loading yields nothing or a
        required section ('unifi', 'mqtt') is absent
    """
    loaded = load_json_file(config_path, logger)
    if not loaded:
        logger.error(f"Failed to load configuration from {config_path}")
        return None

    # Only the first absent section (in declaration order) is reported
    missing_section = next(
        (name for name in ('unifi', 'mqtt') if name not in loaded),
        None,
    )
    if missing_section is not None:
        logger.error(f"Configuration missing required section: {missing_section}")
        return None

    return loaded
def setup_unifi_client(config: dict, logger: logging.Logger) -> Optional[UnifiClient]:
    """
    Setup UniFi client.

    The 'unifi' section must provide 'host', 'username' and 'password';
    'site' is optional and falls back to 'default'. Every failure is logged
    and reported by returning None rather than by raising.

    Args:
        config: Configuration dictionary
        logger: Logger instance

    Returns:
        UnifiClient instance, or None when the section is malformed, a
        required setting is missing, authentication fails, or the client
        raises any other exception during construction
    """
    unifi_config = config.get('unifi', {})

    # Validate up front: a missing key would otherwise surface as a bare
    # KeyError whose message is only the key name (e.g. "'host'").
    if not isinstance(unifi_config, dict):
        logger.error("Failed to setup UniFi client: 'unifi' configuration section is not an object")
        return None

    missing_keys = [key for key in ('host', 'username', 'password')
                    if key not in unifi_config]
    if missing_keys:
        logger.error(
            f"Failed to setup UniFi client: missing required setting(s): {', '.join(missing_keys)}"
        )
        return None

    try:
        return UnifiClient(
            host=unifi_config['host'],
            username=unifi_config['username'],
            password=unifi_config['password'],
            site=unifi_config.get('site', 'default'),
            # NOTE(review): certificate verification is always disabled here;
            # presumably for controllers with self-signed certificates -
            # confirm, and consider making this configurable.
            verify_ssl=False,
            logger=logger
        )
    except AuthenticationError as e:
        logger.error(f"UniFi authentication failed: {e}")
        return None
    except Exception as e:
        # Top-level boundary: any other construction failure is logged and
        # turned into a None result so the caller can exit cleanly.
        logger.error(f"Failed to setup UniFi client: {e}")
        return None
def process_single_device(device: dict, config_manager: ConfigurationManager,
                          console_manager: ConsoleSettingsManager,
                          logger: logging.Logger,
                          report_gen: 'ReportGenerator') -> tuple:
    """
    Process a single device for configuration.

    Steps, in order: fetch device details, check/update the template,
    configure MQTT, apply console settings. A failed MQTT or console step is
    recorded (message plus report entry) but does not abort processing; only
    a failed initial detail fetch or an unexpected exception makes the
    device count as failed.

    Args:
        device: Device to process
        config_manager: Configuration manager instance
        console_manager: Console settings manager instance
        logger: Logger instance
        report_gen: Report generator for error collection

    Returns:
        Tuple of (device_info, success, messages). device_info is the input
        device dict extended with 'mqtt_status', 'console_status' and
        'firmware', or None on failure. messages holds only issue lines.
    """
    device_name = device.get('name', 'Unknown')
    device_ip = device.get('ip', '')
    messages = []

    try:
        logger.debug(f"Processing: {device_name} ({device_ip})")

        # Get device details
        device_details = config_manager.get_device_details(device_ip, device_name)
        if not device_details:
            messages.append(f" {device_name}: Could not get device details, skipping")
            report_gen.add_error(device_name, f"Connection failed to {device_ip}")
            return None, False, messages

        # Check and update template
        template_success = config_manager.check_and_update_template(device, device_details)
        if template_success:
            logger.debug(f" {device_name}: Template checked/updated")
            # Refresh device details after template update. If the refresh
            # returns nothing, keep the details fetched above: passing None
            # on would make the firmware lookup at the end raise
            # AttributeError and mark the whole device as failed.
            refreshed_details = config_manager.get_device_details(device_ip, device_name)
            if refreshed_details:
                device_details = refreshed_details
            else:
                logger.debug(
                    f" {device_name}: Could not refresh device details, using previously fetched details"
                )

        # Configure MQTT
        mqtt_success, mqtt_status = config_manager.configure_mqtt_settings(device, device_details)
        if mqtt_success:
            if mqtt_status == "Updated":
                logger.debug(f" {device_name}: MQTT updated")
            else:
                logger.debug(f" {device_name}: MQTT already configured")
        else:
            messages.append(f" {device_name}: MQTT configuration failed - {mqtt_status}")
            report_gen.add_error(device_name, f"MQTT configuration failed: {mqtt_status}")

        # Apply console settings
        console_success, console_status = console_manager.apply_console_settings(device, device_details)
        if console_success:
            if console_status == "Applied":
                logger.debug(f" {device_name}: Console settings applied")
            elif console_status != "No console settings" and console_status != "Empty console set":
                logger.debug(f" {device_name}: Console settings - {console_status}")
        else:
            messages.append(f" {device_name}: Console settings failed - {console_status}")
            report_gen.add_warning(device_name, f"Console settings failed: {console_status}")

        # Save device details
        device_info = {
            **device,
            'mqtt_status': mqtt_status,
            'console_status': console_status,
            'firmware': device_details.get('StatusFWR', {}).get('Version', 'Unknown')
        }
        logger.debug(f" {device_name}: ✓ Processing completed successfully")
        return device_info, True, messages

    except Exception as e:
        # Worker boundary: one misbehaving device must not take down the
        # batch, so every error is recorded and reported as a failure.
        messages.append(f" {device_name}: ✗ Error during processing: {e}")
        report_gen.add_error(device_name, f"Unexpected error: {str(e)}")
        return None, False, messages
def process_devices(devices: list, config_manager: ConfigurationManager,
                    console_manager: ConsoleSettingsManager, logger: logging.Logger,
                    report_gen: 'ReportGenerator', max_workers: int = 10):
    """
    Configure every device concurrently and tally the outcomes.

    Args:
        devices: List of devices to process
        config_manager: Configuration manager instance
        console_manager: Console settings manager instance
        logger: Logger instance
        report_gen: Report generator for error collection
        max_workers: Maximum number of parallel workers (default: 10)

    Returns:
        Tuple of (device_details_list, stats): the info dicts of the devices
        that succeeded, and a counter dict with the keys 'processed',
        'mqtt_updated', 'console_updated' and 'failed'.
    """
    results = []
    counters = dict.fromkeys(
        ('processed', 'mqtt_updated', 'console_updated', 'failed'), 0
    )
    issues = []

    logger.info(f"\nProcessing {len(devices)} devices in parallel (max {max_workers} workers)...")

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Queue one task per device, remembering which device owns each future
        pending = {}
        for entry in devices:
            task = pool.submit(process_single_device, entry, config_manager,
                               console_manager, logger, report_gen)
            pending[task] = entry

        # Handle results in completion order, not submission order
        for finished in as_completed(pending):
            name = pending[finished].get('name', 'Unknown')
            try:
                info, ok, notes = finished.result()
                # notes carries only error/warning lines
                issues.extend(notes)

                if not (ok and info):
                    counters['failed'] += 1
                    continue

                results.append(info)
                counters['processed'] += 1

                # Count devices whose MQTT / console settings actually changed
                if info.get('mqtt_status') == 'Updated':
                    counters['mqtt_updated'] += 1
                if info.get('console_status') == 'Applied':
                    counters['console_updated'] += 1
            except Exception as exc:
                issues.append(f"{name}: Unexpected error: {exc}")
                report_gen.add_error(name, f"Processing exception: {str(exc)}")
                counters['failed'] += 1

    # Emit the collected issue lines under a banner, only if there are any
    if issues:
        banner = "=" * 60
        logger.info("\n" + banner)
        logger.info("PROCESSING ISSUES")
        logger.info(banner)
        for line in issues:
            logger.info(line)

    return results, counters
def find_device_by_identifier(devices: list, identifier: str, logger: logging.Logger) -> Optional[dict]:
    """
    Find a device by IP address or hostname.

    An identifier that is a valid IP address is compared against each
    device's 'ip' only. Any other identifier is compared against 'name' and
    'hostname': the first case-insensitive exact match wins immediately;
    failing that, pattern matches are collected and the first is returned
    (with a warning listing all candidates when several match).

    Args:
        devices: List of devices
        identifier: IP address or hostname (with optional wildcards)
        logger: Logger instance

    Returns:
        Device dictionary or None
    """
    # Check if it's an IP address
    if is_valid_ip(identifier):
        for device in devices:
            if device.get('ip') == identifier:
                return device
        logger.error(f"No device found with IP: {identifier}")
        return None

    # Search by hostname with pattern matching
    wanted = identifier.lower()
    matches = []
    for device in devices:
        # `or ''` also covers keys that exist with a null value; the default
        # of dict.get() applies only when the key is absent, so a null
        # name/hostname would otherwise crash on .lower().
        device_name = device.get('name') or ''
        device_hostname = device.get('hostname') or ''

        # Try exact match first
        if device_name.lower() == wanted or device_hostname.lower() == wanted:
            return device

        # Try pattern matching
        if match_pattern(device_name, identifier, match_entire_string=False) or \
                match_pattern(device_hostname, identifier, match_entire_string=False):
            matches.append(device)

    if not matches:
        logger.error(f"No device found matching: {identifier}")
        return None

    if len(matches) > 1:
        logger.warning(f"Multiple devices match '{identifier}':")
        for device in matches:
            logger.warning(f" - {device.get('name')} ({device.get('ip')})")
        logger.info(f"Using first match: {matches[0].get('name')}")

    return matches[0]
def _build_arg_parser() -> argparse.ArgumentParser:
    """Create the command-line parser with every option the tool accepts."""
    parser = argparse.ArgumentParser(description='Tasmota Device Manager')
    parser.add_argument('--config', default='network_configuration.json',
                        help='Path to configuration file')
    parser.add_argument('--debug', action='store_true',
                        help='Enable debug logging')
    parser.add_argument('--skip-unifi', action='store_true',
                        help='Skip UniFi discovery and use existing data')
    parser.add_argument('--process-unknown', action='store_true',
                        help='Process unknown devices interactively')
    parser.add_argument('--unifi-hostname-report', action='store_true',
                        help='Generate UniFi hostname comparison report')
    parser.add_argument('--Device', type=str,
                        help='Process single device by IP or hostname')
    parser.add_argument('--diff', nargs='+', metavar='DEVICE',
                        help='Compare devices: --diff DEVICE (vs config) or --diff DEVICE1 DEVICE2 (vs each other)')
    return parser


def _run_diff_mode(device_args: list, config: dict, discovery: 'TasmotaDiscovery',
                   logger: logging.Logger) -> int:
    """Run the --diff comparison (one device vs config, or two devices) and return the exit code."""
    # nargs='+' guarantees at least one argument, so only the upper bound
    # needs checking here.
    if len(device_args) > 2:
        logger.error("--diff requires 1 or 2 device arguments")
        return 1

    device_comp = DeviceComparison(config, logger)

    # Get devices list to resolve names/IPs
    devices = discovery.get_tasmota_devices()

    # Find device 1
    device1 = find_device_by_identifier(devices, device_args[0], logger)
    if not device1:
        logger.error(f"Device not found: {device_args[0]}")
        return 1

    if len(device_args) == 1:
        # Single device: compare to config file.
        # The device type is read from the DeviceName field of the Status
        # reply.
        from utils import send_tasmota_command
        result, success = send_tasmota_command(device1['ip'], "Status%200", timeout=10, logger=logger)
        if not success or not result:
            logger.error(f"Failed to get device status for {device1['name']}")
            return 1
        device_type = result.get('Status', {}).get('DeviceName', '')
        if not device_type:
            logger.error(f"Could not determine device type for {device1['name']}")
            return 1
        comparison = device_comp.compare_device_to_config(
            device1['ip'], device1['name'], device_type
        )
    else:
        # Two devices: compare to each other
        device2 = find_device_by_identifier(devices, device_args[1], logger)
        if not device2:
            logger.error(f"Device 2 not found: {device_args[1]}")
            return 1
        comparison = device_comp.compare_devices(
            device1['ip'], device1['name'],
            device2['ip'], device2['name']
        )

    # Print report
    device_comp.print_comparison_report(comparison)
    return 0


def main() -> int:
    """
    Main entry point.

    Parses the command line, loads the configuration, connects to UniFi and
    then runs exactly one mode: device comparison (--diff), the UniFi
    hostname report, unknown-device processing, or (the default) configuring
    all discovered devices, optionally narrowed to one by --Device.

    Returns:
        Process exit code: 0 on success, 1 when configuration loading, UniFi
        setup, device lookup or device data loading fails.
    """
    args = _build_arg_parser().parse_args()

    # Setup logging
    logger = setup_logging(args.debug)
    logger.info("TasmotaManager v2.1 starting")

    # Ensure data directory exists
    ensure_data_directory()

    # Load configuration
    config = load_config(args.config, logger)
    if not config:
        return 1

    # Setup UniFi client
    unifi_client = setup_unifi_client(config, logger)
    if not unifi_client:
        return 1

    # Create managers
    discovery = TasmotaDiscovery(config, unifi_client, logger)
    config_manager = ConfigurationManager(config, logger)
    console_manager = ConsoleSettingsManager(config, logger)
    unknown_processor = UnknownDeviceProcessor(config, config_manager, logger)
    report_gen = ReportGenerator(config, discovery, logger)

    # Handle device comparison mode
    if args.diff:
        return _run_diff_mode(args.diff, config, discovery, logger)

    # Handle hostname report mode
    if args.unifi_hostname_report:
        report_gen.generate_unifi_hostname_report()
        return 0

    # Get devices
    if args.skip_unifi:
        logger.info("Using existing device data")
        current_file = get_data_file_path('current.json')
        devices = load_json_file(current_file, logger)
        if not devices:
            logger.error("No existing device data found")
            return 1
    else:
        devices = discovery.get_tasmota_devices()
        # Save device list; the previous contents of current.json are loaded
        # first and handed to the save call alongside the new list.
        previous_data = load_json_file(get_data_file_path('current.json'), logger)
        discovery.save_tasmota_config(devices, previous_data)

    # Handle single device mode
    if args.Device:
        device = find_device_by_identifier(devices, args.Device, logger)
        if not device:
            return 1
        devices = [device]

    # Handle unknown device processing
    if args.process_unknown:
        unknown_devices = discovery.get_unknown_devices(devices)
        unknown_processor.process_unknown_devices(unknown_devices)
        return 0

    # Process all devices
    logger.info(f"\nProcessing {len(devices)} devices...")
    device_details_list, stats = process_devices(devices, config_manager, console_manager, logger, report_gen)

    # Save device details
    report_gen.save_device_details(device_details_list)

    # Print summaries
    report_gen.print_processing_summary(
        stats['processed'],
        stats['mqtt_updated'],
        stats['console_updated'],
        stats['failed']
    )
    console_manager.print_failure_summary()

    # Print errors and warnings summary
    report_gen.print_errors_and_warnings_summary()

    logger.info("TasmotaManager completed")
    return 0
if __name__ == "__main__":
    # Run only when executed as a script; main()'s return value becomes the
    # process exit status.
    exit_code = main()
    sys.exit(exit_code)