Major changes: - Implement parallel device processing using ThreadPoolExecutor (10 workers) - Add comprehensive error and warning tracking in ReportGenerator - Fix MQTT configuration verification (query Topic/FullTopic directly) - Improve console settings thread safety with locks - Fix UniFi client for UniFi OS API endpoints - Normalize FullTopic handling (strip URL-encoded spaces) - Update network exclude patterns to support wildcards - Add test_unifi_connection.py for debugging UniFi connectivity Performance improvements: - Process devices concurrently for faster execution - Reduced verbose logging during parallel processing Bug fixes: - Handle deprecated.json format correctly (list vs dict) - Fix exclude_patterns matching with partial string support - Fix UniFi API authentication and endpoint paths for UniFi OS 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
204 lines
7.5 KiB
Python
204 lines
7.5 KiB
Python
"""Report generation for Tasmota devices."""
|
|
|
|
import logging
|
|
from typing import List, Dict, Optional
|
|
from datetime import datetime
|
|
|
|
from utils import get_data_file_path, save_json_file, format_device_info
|
|
from discovery import TasmotaDiscovery
|
|
|
|
|
|
class ReportGenerator:
|
|
"""Generates various reports for Tasmota devices."""
|
|
|
|
def __init__(self, config: dict, discovery: TasmotaDiscovery,
|
|
logger: Optional[logging.Logger] = None):
|
|
"""
|
|
Initialize report generator.
|
|
|
|
Args:
|
|
config: Configuration dictionary
|
|
discovery: Discovery handler instance
|
|
logger: Optional logger instance
|
|
"""
|
|
self.config = config
|
|
self.discovery = discovery
|
|
self.logger = logger or logging.getLogger(__name__)
|
|
self.errors_and_warnings = [] # Collect errors and warnings
|
|
|
|
def add_error(self, device_name: str, message: str):
|
|
"""Add an error message to the collection."""
|
|
self.errors_and_warnings.append(('ERROR', device_name, message))
|
|
|
|
def add_warning(self, device_name: str, message: str):
|
|
"""Add a warning message to the collection."""
|
|
self.errors_and_warnings.append(('WARNING', device_name, message))
|
|
|
|
def print_errors_and_warnings_summary(self):
|
|
"""Print summary of all errors and warnings that require user attention."""
|
|
if not self.errors_and_warnings:
|
|
return
|
|
|
|
self.logger.info("")
|
|
self.logger.error("=" * 60)
|
|
self.logger.error("ERRORS AND WARNINGS REQUIRING ATTENTION")
|
|
self.logger.error("=" * 60)
|
|
|
|
# Sort by severity (ERROR first, then WARNING) and then by device name
|
|
sorted_issues = sorted(self.errors_and_warnings,
|
|
key=lambda x: (0 if x[0] == 'ERROR' else 1, x[1]))
|
|
|
|
for severity, device_name, message in sorted_issues:
|
|
if severity == 'ERROR':
|
|
self.logger.error(f" ✗ {device_name}: {message}")
|
|
else:
|
|
self.logger.warning(f" ⚠ {device_name}: {message}")
|
|
|
|
# Print action items
|
|
self.logger.error("")
|
|
self.logger.error("ACTION REQUIRED:")
|
|
|
|
# Group by issue type
|
|
connection_errors = [x for x in sorted_issues if 'connection' in x[2].lower() or 'refused' in x[2].lower()]
|
|
mqtt_errors = [x for x in sorted_issues if 'mqtt' in x[2].lower()]
|
|
other_errors = [x for x in sorted_issues if x not in connection_errors and x not in mqtt_errors]
|
|
|
|
if connection_errors:
|
|
self.logger.error(f" • {len(connection_errors)} device(s) unreachable - check if devices are online")
|
|
if mqtt_errors:
|
|
self.logger.error(f" • {len(mqtt_errors)} device(s) with MQTT issues - review configuration")
|
|
if other_errors:
|
|
self.logger.error(f" • {len(other_errors)} device(s) with other issues - review above details")
|
|
|
|
self.logger.error("=" * 60)
|
|
|
|
def generate_unifi_hostname_report(self) -> Dict:
|
|
"""
|
|
Generate a report comparing UniFi and Tasmota hostnames.
|
|
|
|
Returns:
|
|
dict: Report data
|
|
"""
|
|
self.logger.info("Generating UniFi hostname report")
|
|
|
|
devices = self.discovery.get_tasmota_devices()
|
|
|
|
report = {
|
|
'generated_at': datetime.now().isoformat(),
|
|
'total_devices': len(devices),
|
|
'devices': []
|
|
}
|
|
|
|
for device in devices:
|
|
device_ip = device.get('ip', '')
|
|
device_name = device.get('name', 'Unknown')
|
|
unifi_hostname = device.get('hostname', '')
|
|
|
|
# Get self-reported hostname
|
|
tasmota_hostname, success = self.discovery.get_device_hostname(
|
|
device_ip, device_name, timeout=5
|
|
)
|
|
|
|
device_report = {
|
|
'name': device_name,
|
|
'ip': device_ip,
|
|
'mac': device.get('mac', ''),
|
|
'unifi_hostname': unifi_hostname,
|
|
'tasmota_hostname': tasmota_hostname if success else 'N/A',
|
|
'hostnames_match': tasmota_hostname == unifi_hostname if success else False,
|
|
'connection': device.get('connection', 'Unknown'),
|
|
'bug_detected': device.get('unifi_hostname_bug_detected', False)
|
|
}
|
|
|
|
report['devices'].append(device_report)
|
|
|
|
# Save report
|
|
report_file = get_data_file_path('TasmotaHostnameReport.json')
|
|
save_json_file(report_file, report, self.logger)
|
|
|
|
# Print summary
|
|
self._print_hostname_report_summary(report)
|
|
|
|
return report
|
|
|
|
def _print_hostname_report_summary(self, report: Dict):
|
|
"""
|
|
Print a summary of the hostname report.
|
|
|
|
Args:
|
|
report: Report data dictionary
|
|
"""
|
|
print(f"\n{'='*70}")
|
|
print("UniFi vs Tasmota Hostname Report")
|
|
print(f"{'='*70}")
|
|
print(f"Total devices: {report['total_devices']}")
|
|
print(f"Generated: {report['generated_at']}")
|
|
print(f"{'='*70}\n")
|
|
|
|
mismatches = 0
|
|
bug_detected = 0
|
|
|
|
for device in report['devices']:
|
|
if not device['hostnames_match']:
|
|
mismatches += 1
|
|
if device['bug_detected']:
|
|
bug_detected += 1
|
|
|
|
print(f"Hostname mismatches: {mismatches}")
|
|
print(f"UniFi bug detected: {bug_detected}")
|
|
print(f"\n{'='*70}")
|
|
|
|
if mismatches > 0:
|
|
print("\nDevices with hostname mismatches:")
|
|
print(f"{'Device':<25} {'UniFi Hostname':<25} {'Tasmota Hostname':<25}")
|
|
print("-" * 75)
|
|
|
|
for device in report['devices']:
|
|
if not device['hostnames_match']:
|
|
name = device['name'][:24]
|
|
unifi = device['unifi_hostname'][:24]
|
|
tasmota = device['tasmota_hostname'][:24]
|
|
bug = " [BUG]" if device['bug_detected'] else ""
|
|
print(f"{name:<25} {unifi:<25} {tasmota:<25}{bug}")
|
|
|
|
print(f"\n{'='*70}\n")
|
|
|
|
def save_device_details(self, device_details: List[Dict]):
|
|
"""
|
|
Save detailed device information to file.
|
|
|
|
Args:
|
|
device_details: List of detailed device info dictionaries
|
|
"""
|
|
output_file = get_data_file_path('TasmotaDevices.json')
|
|
|
|
# Add metadata
|
|
output = {
|
|
'generated_at': datetime.now().isoformat(),
|
|
'total_devices': len(device_details),
|
|
'devices': device_details
|
|
}
|
|
|
|
save_json_file(output_file, output, self.logger)
|
|
self.logger.info(f"Saved details for {len(device_details)} devices")
|
|
|
|
def print_processing_summary(self, processed: int, mqtt_updated: int,
|
|
console_updated: int, failed: int):
|
|
"""
|
|
Print summary of processing results.
|
|
|
|
Args:
|
|
processed: Number of devices processed
|
|
mqtt_updated: Number with MQTT updates
|
|
console_updated: Number with console updates
|
|
failed: Number that failed
|
|
"""
|
|
print(f"\n{'='*60}")
|
|
print("Processing Summary")
|
|
print(f"{'='*60}")
|
|
print(f"Total devices processed: {processed}")
|
|
print(f"MQTT settings updated: {mqtt_updated}")
|
|
print(f"Console settings applied: {console_updated}")
|
|
print(f"Failed: {failed}")
|
|
print(f"{'='*60}\n")
|