- Created modular Python files (main, utils, discovery, configuration, console_settings, unknown_devices, reporting, unifi_client) - Moved documentation files to docs/ - Moved data files to data/ - Removed old monolithic TasmotaManager.py and TasmotaManager_fixed.py - Updated .gitignore and pyproject.toml - All functionality preserved, command-line interface unchanged Version: 2.0.0
233 lines
6.1 KiB
Python
233 lines
6.1 KiB
Python
"""Common utility functions used across the TasmotaManager modules."""
|
|
|
|
import re
|
|
import logging
|
|
import time
|
|
import os
|
|
import json
|
|
from typing import Tuple, Optional, Any, Dict
|
|
import requests
|
|
|
|
|
|
def match_pattern(text: str, pattern: str, match_entire_string: bool = True) -> bool:
    """
    Match a text string against a pattern that may contain wildcards.

    The comparison is case-insensitive. Every character of the pattern
    other than ``*`` is matched literally; ``*`` matches any run of
    characters except a newline.

    Args:
        text: The text to match against
        pattern: The pattern which may contain * wildcards
        match_entire_string: If True, pattern must match the entire string.
            If False, the pattern only has to match a prefix of the text
            (matching still starts at the first character).

    Returns:
        bool: True if the pattern matches, False otherwise. An empty or
        None text, or a None pattern, never matches.
    """
    if not text or pattern is None:
        return False

    # Convert glob pattern to regex: escape everything, then turn the
    # escaped '*' back into "any run of characters".
    regex_pattern = re.escape(pattern).replace(r'\*', '.*')

    # fullmatch is used instead of wrapping the pattern in ^...$ because
    # '$' also matches just before a trailing newline, which would let
    # "abc\n" match the pattern "abc".
    matcher = re.fullmatch if match_entire_string else re.match
    return matcher(regex_pattern, text, re.IGNORECASE) is not None
|
|
|
|
|
|
def get_hostname_base(hostname: str) -> str:
    """
    Return the part of a hostname that precedes its first dash.

    Args:
        hostname: Full hostname (e.g., "KitchenLamp-1234")

    Returns:
        str: Base hostname (e.g., "KitchenLamp"). A hostname without
        any dash is returned unchanged.
    """
    if '-' not in hostname:
        return hostname

    # A dash is present, so splitting once always yields two pieces.
    base, _remainder = hostname.split('-', 1)
    return base
|
|
|
|
|
|
def send_tasmota_command(ip: str, command: str, timeout: int = 5,
                         logger: Optional[logging.Logger] = None) -> Tuple[Optional[dict], bool]:
    """
    Send a command to a Tasmota device via HTTP API.

    The command is percent-encoded before it is placed in the ``cmnd``
    query parameter, so characters such as ``#``, ``&`` and ``+`` are sent
    as part of the command instead of being read as URL syntax. ``%`` is
    left untouched so that a command which already contains ``%XX``
    escapes is not encoded a second time.

    Args:
        ip: Device IP address
        command: Tasmota command to send
        timeout: Request timeout in seconds
        logger: Optional logger for debug output

    Returns:
        Tuple of (response_dict, success_bool). On a timeout, any other
        request failure (including an HTTP error status) or an unexpected
        error such as an unparsable response body, the result is
        (None, False); the function does not raise.
    """
    from urllib.parse import quote

    # str() mirrors how the value would be rendered by string formatting,
    # so a non-string command cannot raise here, outside the try block.
    encoded_command = quote(str(command), safe='%')
    url = f"http://{ip}/cm?cmnd={encoded_command}"

    try:
        if logger:
            logger.debug(f"Sending command to {ip}: {command}")

        response = requests.get(url, timeout=timeout)
        response.raise_for_status()

        result = response.json()
        if logger:
            logger.debug(f"Response from {ip}: {result}")

        return result, True

    except requests.exceptions.Timeout:
        # Handled before the generic RequestException branch so timeouts
        # get their own log message.
        if logger:
            logger.warning(f"Timeout sending command to {ip}: {command}")
        return None, False

    except requests.exceptions.RequestException as e:
        if logger:
            logger.warning(f"Error sending command to {ip}: {e}")
        return None, False

    except Exception as e:
        # Deliberate catch-all: callers rely on the (None, False) result
        # rather than on exceptions.
        if logger:
            logger.error(f"Unexpected error sending command to {ip}: {e}")
        return None, False
|
|
|
|
|
|
def retry_command(func, max_attempts: int = 3, delay: float = 1.0,
                  logger: Optional[logging.Logger] = None, device_name: str = "") -> Tuple[Any, bool]:
    """
    Call a command function repeatedly until it reports success.

    Args:
        func: Zero-argument callable returning a (result, success) tuple
        max_attempts: Maximum number of times func is called
        delay: Seconds to sleep between two consecutive attempts
        logger: Optional logger; a debug line is written before each wait
        device_name: Device name used as prefix in the log line

    Returns:
        Tuple of (result, True) from the first successful call, or
        (None, False) when no attempt succeeded.
    """
    for attempt in range(1, max_attempts + 1):
        result, success = func()
        if success:
            return result, True

        # After the final attempt there is nothing left to wait for.
        if attempt >= max_attempts:
            continue

        if logger:
            logger.debug(f"{device_name}: Retry attempt {attempt}/{max_attempts}")
        time.sleep(delay)

    return None, False
|
|
|
|
|
|
def format_device_info(device: dict) -> str:
    """
    Build a one-line, human-readable summary of a device.

    Args:
        device: Device dictionary; the keys 'name', 'ip', 'mac' and
            'connection' are read, each falling back to 'Unknown'

    Returns:
        str: Formatted device info string
    """
    name, ip, mac, connection = (
        device.get(key, 'Unknown') for key in ('name', 'ip', 'mac', 'connection')
    )
    return f"{name} ({ip}) - MAC: {mac}, Connection: {connection}"
|
|
|
|
|
|
def load_json_file(filepath: str, logger: Optional[logging.Logger] = None) -> Optional[dict]:
    """
    Load and parse a JSON file.

    The file is read as UTF-8 so the result does not depend on the
    platform's default text encoding.

    Args:
        filepath: Path to JSON file
        logger: Optional logger for error output

    Returns:
        The parsed JSON content, or None if the file doesn't exist,
        can't be read or can't be parsed. The function does not raise.
    """
    try:
        # Open directly instead of checking for existence first: this
        # avoids a second file-system lookup and the window in which the
        # file could disappear between the check and the open.
        with open(filepath, 'r', encoding='utf-8') as f:
            return json.load(f)
    except FileNotFoundError:
        # A missing file is an expected situation, hence only debug level.
        if logger:
            logger.debug(f"File not found: {filepath}")
        return None
    except json.JSONDecodeError as e:
        if logger:
            logger.error(f"Error parsing JSON file {filepath}: {e}")
        return None
    except Exception as e:
        # Deliberate catch-all (permissions, directories, bad encoding,
        # ...): callers rely on the None result rather than on exceptions.
        if logger:
            logger.error(f"Error reading file {filepath}: {e}")
        return None
|
|
|
|
|
|
def save_json_file(filepath: str, data: dict, logger: Optional[logging.Logger] = None) -> bool:
    """
    Save data to a JSON file.

    The data is serialized completely before the target file is opened,
    so a value that cannot be converted to JSON leaves an already
    existing file untouched instead of truncating it.

    Args:
        filepath: Path to save JSON file; missing parent directories
            are created
        data: Data to save
        logger: Optional logger for error output

    Returns:
        bool: True if successful, False otherwise. The function does
        not raise.
    """
    try:
        # Serialize first: opening the file for writing truncates it, and
        # a failure after that point would destroy the previous content.
        payload = json.dumps(data, indent=4)

        # Ensure directory exists ('.' when the path has no directory part)
        target_dir = os.path.dirname(filepath) or '.'
        os.makedirs(target_dir, exist_ok=True)

        with open(filepath, 'w', encoding='utf-8') as f:
            f.write(payload)
        return True
    except Exception as e:
        # Deliberate catch-all: callers rely on the boolean result.
        if logger:
            logger.error(f"Error saving JSON file {filepath}: {e}")
        return False
|
|
|
|
|
|
def is_valid_ip(ip_string: str) -> bool:
    """
    Validate if a string is a valid IPv4 address in dotted-quad form.

    Only the full four-octet notation (e.g. "192.168.1.10") is accepted.
    Shorthand forms such as "1" or "127.1" are rejected, so a bare number
    or a partial address is not mistaken for an IP address.

    Args:
        ip_string: String to validate

    Returns:
        bool: True if valid IP address, False otherwise (including for
        values that are not strings)
    """
    import ipaddress

    # IPv4Address also accepts integers and packed bytes; only textual
    # addresses are meaningful here.
    if not isinstance(ip_string, str):
        return False

    try:
        ipaddress.IPv4Address(ip_string)
    except ValueError:
        # AddressValueError is a ValueError subclass.
        return False
    return True
|
|
|
|
|
|
def ensure_data_directory():
    """Create the 'data' and 'data/temp' directories if they are missing.

    Both paths are relative to the current working directory.
    """
    for folder in ('data', 'data/temp'):
        os.makedirs(folder, exist_ok=True)
|
|
|
|
|
|
def get_data_file_path(filename: str) -> str:
    """
    Build the path of a file located in the data directory.

    Args:
        filename: Name of the file

    Returns:
        str: filename joined onto the relative 'data' directory
    """
    data_dir = 'data'
    return os.path.join(data_dir, filename)