# - Changed from simple space replacement to full URL encoding using quote()
# - Rules contain # and = characters that must be encoded
# - # was being treated as URL fragment, truncating rule commands
# - Now encodes # as %23, = as %3D, spaces as %20, etc.
# - Fixes issue where rules weren't being applied correctly to devices
277 lines
9.9 KiB
Python
277 lines
9.9 KiB
Python
"""Console settings and parameter management."""
|
|
|
|
import logging
import threading
import time
from typing import Dict, List, Optional, Tuple
from urllib.parse import quote

from utils import send_tasmota_command, retry_command, get_hostname_base
|
|
|
|
|
|
class ConsoleSettingsManager:
    """Handles console parameter configuration for Tasmota devices.

    The manager looks up which named "console set" (a list of console
    command strings) applies to a device, sends each command to the device
    and records the commands that could not be applied so they can be
    reported later with :meth:`print_failure_summary`.
    """

    # Per-request timeout (seconds) passed to send_tasmota_command.
    _COMMAND_TIMEOUT = 5
    # Retry policy for the main command.
    _MAX_ATTEMPTS = 3
    _RETRY_DELAY = 1.0
    # Pauses (seconds) used to avoid flooding a device with requests.
    _RETAIN_TOGGLE_DELAY = 0.5
    _POST_COMMAND_DELAY = 0.3
    # Spellings treated as "on" / "off" when interpreting a value.
    _ON_VALUES = frozenset({'on', '1', 'true'})
    _OFF_VALUES = frozenset({'off', '0', 'false'})
    # Parameter-name prefixes that _verify_command knows how to read back.
    _VERIFIABLE_PREFIXES = ('PowerOnState', 'SetOption')

    def __init__(self, config: dict, logger: Optional[logging.Logger] = None):
        """
        Initialize console settings manager.

        Args:
            config: Configuration dictionary. The keys read by this class are
                'device_list' and 'console_set'.
            logger: Optional logger instance; defaults to this module's logger.
        """
        self.config = config
        self.logger = logger or logging.getLogger(__name__)
        # Failed commands keyed by device name, for the final summary.
        self.command_failures: Dict[str, List[str]] = {}
        # Guards command_failures, which may be updated from several threads.
        self._lock = threading.Lock()

    def apply_console_settings(self, device: dict, device_details: dict) -> Tuple[bool, str]:
        """
        Apply console settings to a device.

        Args:
            device: Device info dictionary ('name' and 'ip' are read).
            device_details: Detailed device information; the template is
                selected from device_details['Status']['DeviceName'].

        Returns:
            Tuple of (success, status_message)
        """
        device_name = device.get('name', 'Unknown')
        device_ip = device.get('ip', '')

        if not device_ip:
            return False, "No IP address"

        # Get device type from DeviceName for template matching. A missing or
        # null 'Status' section is treated the same as an unknown device type.
        status = (device_details or {}).get('Status')
        device_type = status.get('DeviceName', '') if isinstance(status, dict) else ''

        # Find which console_set to use for this device
        console_set_name = self._get_console_set_name(device_type)
        if not console_set_name:
            self.logger.debug(f"{device_name}: No console settings configured")
            return True, "No console settings"

        # Get the console command list
        console_commands = self._get_console_commands(console_set_name)
        if not console_commands:
            self.logger.debug(f"{device_name}: Console set '{console_set_name}' is empty")
            return True, "Empty console set"

        self.logger.info(
            f"{device_name}: Applying {len(console_commands)} console settings from '{console_set_name}'"
        )

        failed_commands = []
        for command in console_commands:
            if not isinstance(command, str):
                if command is None:
                    continue  # Null entry, nothing to send
                # e.g. a YAML entry parsed as a number or mapping: it cannot be
                # sent as a console command, so report it instead of crashing.
                self.logger.error(
                    f"{device_name}: Skipping non-string console command {command!r}"
                )
                failed_commands.append(str(command))
                continue

            if not command.strip():
                continue  # Skip empty commands

            if not self._apply_single_command(device_ip, device_name, command):
                failed_commands.append(command)

        if failed_commands:
            # Track failures for summary
            with self._lock:
                self.command_failures.setdefault(device_name, []).extend(failed_commands)
            return False, f"Failed: {len(failed_commands)} commands"

        self.logger.info(f"{device_name}: All console settings applied successfully")
        return True, "Applied"

    def _get_console_set_name(self, device_type: str) -> Optional[str]:
        """
        Get the console_set name for a device based on DeviceName.

        An exact template-name match wins; otherwise the first template whose
        name matches case-insensitively is used.

        Args:
            device_type: Device type/DeviceName from Status

        Returns:
            str: Console set name or None
        """
        device_list = self.config.get('device_list', {})
        if not isinstance(device_list, dict) or not isinstance(device_type, str):
            return None

        # First try exact match
        if device_type in device_list:
            template_data = device_list[device_type]
            return template_data.get('console_set') if isinstance(template_data, dict) else None

        # Then try case-insensitive match
        wanted = device_type.lower()
        for template_name, template_data in device_list.items():
            if isinstance(template_name, str) and template_name.lower() == wanted:
                return template_data.get('console_set') if isinstance(template_data, dict) else None

        return None

    def _get_console_commands(self, console_set_name: str) -> List[str]:
        """
        Get console commands from a named console set.

        Args:
            console_set_name: Name of the console set

        Returns:
            list: List of console commands; empty when the set is missing or
            is not configured as a list.
        """
        console_sets = self.config.get('console_set', {})
        if not isinstance(console_sets, dict):
            return []

        commands = console_sets.get(console_set_name, [])
        return commands if isinstance(commands, list) else []

    @staticmethod
    def _split_command(command: str) -> Tuple[str, str]:
        """Split a console command into (parameter name, value).

        The value is everything after the first run of whitespace and is ''
        when the command is a bare parameter name. ('', '') is returned for a
        blank command.
        """
        parts = command.split(None, 1)
        if not parts:
            return '', ''
        return parts[0], (parts[1] if len(parts) > 1 else '')

    @staticmethod
    def _encode_command(command: str) -> str:
        """URL-encode a console command for use in an HTTP query string.

        The parameter name is sent as-is and joined to the value with '%20'.
        The value is fully percent-encoded (safe='') because rule definitions
        contain '#' and '=', and an unencoded '#' would be taken as a URL
        fragment, truncating the command.
        """
        parts = command.split(None, 1)
        if not parts:
            return ''
        if len(parts) == 1:
            # Bare parameter name: surrounding whitespace must not leak into the URL.
            return parts[0]
        return parts[0] + '%20' + quote(parts[1], safe='')

    @classmethod
    def _retain_opposite(cls, param_value: str) -> Optional[str]:
        """Return the value opposite to param_value for a *Retain parameter.

        Returns 'Off' when the value is an "on" spelling and 'On' otherwise.
        Returns None when there is no value, i.e. the command only queries the
        setting and the device state must not be touched.
        """
        value = param_value.strip().lower()
        if not value:
            return None
        return 'Off' if value in cls._ON_VALUES else 'On'

    @staticmethod
    def _rule_number(param_name: str) -> Optional[str]:
        """Return N (as a string) when param_name is 'Rule<N>', else None."""
        if not param_name.lower().startswith('rule'):
            return None
        suffix = param_name[len('rule'):]
        return suffix if suffix.isdigit() else None

    def _apply_single_command(self, device_ip: str, device_name: str, command: str) -> bool:
        """
        Apply a single console command to a device.

        Args:
            device_ip: Device IP address
            device_name: Device name for logging
            command: Console command to apply ("<Parameter> [value]")

        Returns:
            bool: True if the command was sent successfully (or was blank).
            Failures of the auxiliary Retain toggle or rule-enable requests
            are logged as warnings and do not affect the result.
        """
        # Parse command into parameter and value
        param_name, param_value = self._split_command(command)
        if not param_name:
            return True  # Empty command, skip

        self.logger.debug(f"{device_name}: Setting {param_name} = {param_value}")

        # Handle Retain parameters - set opposite first, then desired state.
        # NOTE(review): presumably this forces an actual state transition on
        # the device - confirm the intent. A value-less command is a query and
        # is never toggled.
        if param_name.lower().endswith('retain'):
            opposite_value = self._retain_opposite(param_value)
            if opposite_value is not None:
                _, toggled = send_tasmota_command(
                    device_ip,
                    f"{param_name}%20{opposite_value}",
                    timeout=self._COMMAND_TIMEOUT,
                    logger=self.logger,
                )
                if not toggled:
                    self.logger.warning(
                        f"{device_name}: Failed to set {param_name} to opposite state"
                    )
                time.sleep(self._RETAIN_TOGGLE_DELAY)  # Brief delay between commands

        # Send the actual command - properly URL encoded
        escaped_command = self._encode_command(command)
        _, success = retry_command(
            lambda: send_tasmota_command(
                device_ip, escaped_command, timeout=self._COMMAND_TIMEOUT, logger=self.logger
            ),
            max_attempts=self._MAX_ATTEMPTS,
            delay=self._RETRY_DELAY,
            logger=self.logger,
            device_name=device_name,
        )

        if not success:
            self.logger.error(
                f"{device_name}: Failed to set {param_name} after {self._MAX_ATTEMPTS} attempts"
            )
            return False

        # Verification disabled - it overwhelms devices with extra queries.
        # Trust that the command was applied if no error was returned.

        # Check if this is a rule definition - if so, follow up with Rule<N> 4.
        rule_number = self._rule_number(param_name)
        if rule_number is not None:
            # Payload 4 is used instead of 1 because, per the original author,
            # payload 1 led to single-fire ("Once") behaviour.
            # NOTE(review): Tasmota's command reference appears to list
            # `Rule<x> 1` as "enable" and `Rule<x> 4` as "disable one-shot";
            # confirm that payload 4 alone leaves the rule enabled.
            enable_command = f"Rule{rule_number}%204"
            self.logger.debug(f"{device_name}: Enabling rule{rule_number} (Once=OFF)")

            _, enabled = send_tasmota_command(
                device_ip, enable_command, timeout=self._COMMAND_TIMEOUT, logger=self.logger
            )
            if not enabled:
                self.logger.warning(f"{device_name}: Failed to enable rule{rule_number}")

        time.sleep(self._POST_COMMAND_DELAY)  # Brief delay between commands
        return True

    @classmethod
    def _normalize_switch_value(cls, value: str) -> str:
        """Map on/1/true to '1' and off/0/false to '0'; lower-case anything else."""
        lowered = value.lower()
        if lowered in cls._ON_VALUES:
            return '1'
        if lowered in cls._OFF_VALUES:
            return '0'
        return lowered

    def _verify_command(self, device_ip: str, device_name: str,
                        param_name: str, expected_value: str) -> bool:
        """
        Verify a command was applied (where possible).

        Not called by _apply_single_command (verification is disabled there
        to limit the number of requests sent to a device).

        Args:
            device_ip: Device IP address
            device_name: Device name for logging
            param_name: Parameter name
            expected_value: Expected value

        Returns:
            bool: True if verified or verification not possible
        """
        # Only verify certain parameters
        if not param_name.startswith(self._VERIFIABLE_PREFIXES):
            return True  # Can't verify, assume success

        # Get current value by sending the bare parameter name
        result, success = send_tasmota_command(
            device_ip, param_name, timeout=self._COMMAND_TIMEOUT, logger=self.logger
        )
        if not success or not result or not isinstance(result, dict):
            return True  # Can't verify, assume success

        # Normalize both sides before comparing (e.g. "ON" vs "1")
        current_value = str(result.get(param_name, ''))
        return (
            self._normalize_switch_value(current_value)
            == self._normalize_switch_value(str(expected_value))
        )

    def print_failure_summary(self):
        """Print summary of all command failures (nothing is logged if none)."""
        # Snapshot under the lock, then log without holding it.
        with self._lock:
            failures = {name: list(cmds) for name, cmds in self.command_failures.items()}

        if not failures:
            return

        banner = "=" * 60
        self.logger.error(banner)
        self.logger.error("COMMAND FAILURE SUMMARY")
        self.logger.error(banner)

        # Sort by device name for consistent output
        for device_name in sorted(failures):
            self.logger.error(f"\n{device_name}:")
            for cmd in failures[device_name]:
                self.logger.error(f" - {cmd}")

        self.logger.error(banner)
|