TasmotaManager/utils.py
Mike Geppert 9c22168f79 Refactor: Split TasmotaManager into modular structure
- Created modular Python files (main, utils, discovery, configuration, console_settings, unknown_devices, reporting, unifi_client)
- Moved documentation files to docs/
- Moved data files to data/
- Removed old monolithic TasmotaManager.py and TasmotaManager_fixed.py
- Updated .gitignore and pyproject.toml
- All functionality preserved, command-line interface unchanged
Version: 2.0.0
2025-10-29 16:38:03 +00:00

233 lines
6.1 KiB
Python

"""Common utility functions used across the TasmotaManager modules."""
import re
import logging
import time
import os
import json
from typing import Tuple, Optional, Any, Dict
import requests
def match_pattern(text: str, pattern: str, match_entire_string: bool = True) -> bool:
"""
Match a text string against a pattern that may contain wildcards.
Args:
text: The text to match against
pattern: The pattern which may contain * wildcards
match_entire_string: If True, pattern must match the entire string
Returns:
bool: True if the pattern matches, False otherwise
"""
if not text:
return False
# Convert glob pattern to regex
escaped = re.escape(pattern)
regex_pattern = escaped.replace(r'\*', '.*')
if match_entire_string:
regex_pattern = f'^{regex_pattern}$'
return bool(re.match(regex_pattern, text, re.IGNORECASE))
def get_hostname_base(hostname: str) -> str:
"""
Extract the base hostname (everything before the first dash).
Args:
hostname: Full hostname (e.g., "KitchenLamp-1234")
Returns:
str: Base hostname (e.g., "KitchenLamp")
"""
if '-' in hostname:
return hostname.split('-')[0]
return hostname
def send_tasmota_command(ip: str, command: str, timeout: int = 5,
logger: Optional[logging.Logger] = None) -> Tuple[Optional[dict], bool]:
"""
Send a command to a Tasmota device via HTTP API.
Args:
ip: Device IP address
command: Tasmota command to send
timeout: Request timeout in seconds
logger: Optional logger for debug output
Returns:
Tuple of (response_dict, success_bool)
"""
url = f"http://{ip}/cm?cmnd={command}"
try:
if logger:
logger.debug(f"Sending command to {ip}: {command}")
response = requests.get(url, timeout=timeout)
response.raise_for_status()
result = response.json()
if logger:
logger.debug(f"Response from {ip}: {result}")
return result, True
except requests.exceptions.Timeout:
if logger:
logger.warning(f"Timeout sending command to {ip}: {command}")
return None, False
except requests.exceptions.RequestException as e:
if logger:
logger.warning(f"Error sending command to {ip}: {e}")
return None, False
except Exception as e:
if logger:
logger.error(f"Unexpected error sending command to {ip}: {e}")
return None, False
def retry_command(func, max_attempts: int = 3, delay: float = 1.0,
logger: Optional[logging.Logger] = None, device_name: str = "") -> Tuple[Any, bool]:
"""
Retry a command function multiple times with delay between attempts.
Args:
func: Function to call (should return tuple of (result, success))
max_attempts: Maximum number of attempts
delay: Delay in seconds between attempts
logger: Optional logger for output
device_name: Device name for logging
Returns:
Tuple of (result, success)
"""
for attempt in range(1, max_attempts + 1):
result, success = func()
if success:
return result, True
if attempt < max_attempts:
if logger:
logger.debug(f"{device_name}: Retry attempt {attempt}/{max_attempts}")
time.sleep(delay)
return None, False
def format_device_info(device: dict) -> str:
"""
Format device information for display.
Args:
device: Device dictionary
Returns:
str: Formatted device info string
"""
name = device.get('name', 'Unknown')
ip = device.get('ip', 'Unknown')
mac = device.get('mac', 'Unknown')
connection = device.get('connection', 'Unknown')
return f"{name} ({ip}) - MAC: {mac}, Connection: {connection}"
def load_json_file(filepath: str, logger: Optional[logging.Logger] = None) -> Optional[dict]:
"""
Load and parse a JSON file.
Args:
filepath: Path to JSON file
logger: Optional logger for error output
Returns:
dict or None if file doesn't exist or can't be parsed
"""
if not os.path.exists(filepath):
if logger:
logger.debug(f"File not found: {filepath}")
return None
try:
with open(filepath, 'r') as f:
return json.load(f)
except json.JSONDecodeError as e:
if logger:
logger.error(f"Error parsing JSON file {filepath}: {e}")
return None
except Exception as e:
if logger:
logger.error(f"Error reading file {filepath}: {e}")
return None
def save_json_file(filepath: str, data: dict, logger: Optional[logging.Logger] = None) -> bool:
"""
Save data to a JSON file.
Args:
filepath: Path to save JSON file
data: Data to save
logger: Optional logger for error output
Returns:
bool: True if successful, False otherwise
"""
try:
# Ensure directory exists
os.makedirs(os.path.dirname(filepath) if os.path.dirname(filepath) else '.', exist_ok=True)
with open(filepath, 'w') as f:
json.dump(data, f, indent=4)
return True
except Exception as e:
if logger:
logger.error(f"Error saving JSON file {filepath}: {e}")
return False
def is_valid_ip(ip_string: str) -> bool:
"""
Validate if a string is a valid IP address.
Args:
ip_string: String to validate
Returns:
bool: True if valid IP address
"""
import socket
try:
socket.inet_aton(ip_string)
return True
except socket.error:
return False
def ensure_data_directory():
"""Ensure the data directory exists."""
os.makedirs('data', exist_ok=True)
os.makedirs('data/temp', exist_ok=True)
def get_data_file_path(filename: str) -> str:
"""
Get the full path for a data file.
Args:
filename: Name of the file
Returns:
str: Full path in the data directory
"""
return os.path.join('data', filename)