Implement template activation, fix console settings duplication, and improve connection display
This commit is contained in:
parent
cced5a76cc
commit
d585f0f284
1055
TasmotaManager.py
1055
TasmotaManager.py
File diff suppressed because it is too large
Load Diff
820
TasmotaManager_fixed.py
Normal file
820
TasmotaManager_fixed.py
Normal file
@ -0,0 +1,820 @@
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import sys
|
||||
from datetime import datetime
|
||||
from typing import Optional
|
||||
import requests
|
||||
from urllib3.exceptions import InsecureRequestWarning
|
||||
import re # Import the regular expression module
|
||||
import time
|
||||
import argparse
|
||||
|
||||
# Disable SSL warnings
|
||||
requests.packages.urllib3.disable_warnings(category=InsecureRequestWarning)
|
||||
|
||||
class UnifiClient:
|
||||
def __init__(self, base_url, username, password, site_id, verify_ssl=True):
|
||||
self.base_url = base_url.rstrip('/')
|
||||
self.username = username
|
||||
self.password = password
|
||||
self.site_id = site_id
|
||||
self.session = requests.Session()
|
||||
self.session.verify = verify_ssl
|
||||
|
||||
# Initialize cookie jar
|
||||
self.session.cookies.clear()
|
||||
|
||||
def _login(self) -> requests.Response: # Changed return type annotation
|
||||
"""Authenticate with the UniFi Controller."""
|
||||
login_url = f"{self.base_url}/api/auth/login"
|
||||
headers = {
|
||||
'Content-Type': 'application/json',
|
||||
'Accept': 'application/json'
|
||||
}
|
||||
payload = {
|
||||
"username": self.username,
|
||||
"password": self.password,
|
||||
"remember": False
|
||||
}
|
||||
try:
|
||||
response = self.session.post(
|
||||
login_url,
|
||||
json=payload,
|
||||
headers=headers
|
||||
)
|
||||
response.raise_for_status()
|
||||
if 'X-CSRF-Token' in response.headers:
|
||||
self.session.headers['X-CSRF-Token'] = response.headers['X-CSRF-Token']
|
||||
return response # Return the response object
|
||||
except requests.exceptions.RequestException as e:
|
||||
if hasattr(e, 'response') and e.response.status_code == 401:
|
||||
raise Exception("Authentication failed. Please verify your username and password.") from e
|
||||
raise
|
||||
|
||||
def get_clients(self) -> list:
|
||||
"""Get all clients from the UniFi Controller."""
|
||||
# Try the newer API endpoint first
|
||||
url = f"{self.base_url}/proxy/network/api/s/{self.site_id}/stat/sta"
|
||||
try:
|
||||
response = self.session.get(url)
|
||||
response.raise_for_status()
|
||||
return response.json().get('data', [])
|
||||
except requests.exceptions.RequestException as e:
|
||||
# If the newer endpoint fails, try the legacy endpoint
|
||||
url = f"{self.base_url}/api/s/{self.site_id}/stat/sta"
|
||||
try:
|
||||
response = self.session.get(url)
|
||||
response.raise_for_status()
|
||||
return response.json().get('data', [])
|
||||
except requests.exceptions.RequestException as e:
|
||||
# If both fail, try the v2 API endpoint
|
||||
url = f"{self.base_url}/v2/api/site/{self.site_id}/clients"
|
||||
response = self.session.get(url)
|
||||
response.raise_for_status()
|
||||
return response.json().get('data', [])
|
||||
|
||||
class TasmotaDiscovery:
|
||||
def __init__(self, debug: bool = False):
|
||||
"""Initialize the TasmotaDiscovery with optional debug mode."""
|
||||
log_level = logging.DEBUG if debug else logging.INFO
|
||||
logging.basicConfig(
|
||||
level=log_level,
|
||||
format='%(asctime)s - %(levelname)s - %(message)s',
|
||||
datefmt='%Y-%m-%d %H:%M:%S'
|
||||
)
|
||||
self.logger = logging.getLogger(__name__)
|
||||
self.config = None
|
||||
self.unifi_client = None
|
||||
|
||||
def load_config(self, config_path: Optional[str] = None) -> dict:
|
||||
"""Load configuration from JSON file."""
|
||||
if config_path is None:
|
||||
config_path = os.path.join(os.path.dirname(__file__), 'config.json')
|
||||
|
||||
self.logger.debug(f"Loading configuration from: {config_path}")
|
||||
try:
|
||||
with open(config_path, 'r') as config_file:
|
||||
self.config = json.load(config_file)
|
||||
self.logger.debug("Configuration loaded successfully from %s", config_path)
|
||||
return self.config
|
||||
except FileNotFoundError:
|
||||
self.logger.error(f"Configuration file not found at {config_path}")
|
||||
sys.exit(1)
|
||||
except json.JSONDecodeError:
|
||||
self.logger.error("Invalid JSON in configuration file")
|
||||
sys.exit(1)
|
||||
|
||||
def setup_unifi_client(self):
|
||||
"""Set up the UniFi client with better error handling"""
|
||||
self.logger.debug("Setting up UniFi client")
|
||||
|
||||
if not self.config or 'unifi' not in self.config:
|
||||
raise ValueError("Missing UniFi configuration")
|
||||
|
||||
unifi_config = self.config['unifi']
|
||||
required_fields = ['host', 'username', 'password', 'site']
|
||||
missing_fields = [field for field in required_fields if field not in unifi_config]
|
||||
|
||||
if missing_fields:
|
||||
raise ValueError(f"Missing required UniFi configuration fields: {', '.join(missing_fields)}")
|
||||
|
||||
try:
|
||||
self.logger.debug(f"Connecting to UniFi Controller at {unifi_config['host']}")
|
||||
self.unifi_client = UnifiClient(
|
||||
base_url=unifi_config['host'],
|
||||
username=unifi_config['username'],
|
||||
password=unifi_config['password'],
|
||||
site_id=unifi_config['site'],
|
||||
verify_ssl=False # Add this if using self-signed certificates
|
||||
)
|
||||
|
||||
# Test the connection by making a simple request
|
||||
response = self.unifi_client._login()
|
||||
if not response:
|
||||
raise ConnectionError(f"Failed to connect to UniFi controller: No response")
|
||||
|
||||
self.logger.debug("UniFi client setup successful")
|
||||
|
||||
except Exception as e:
|
||||
self.logger.error(f"Error setting up UniFi client: {str(e)}")
|
||||
raise ConnectionError(f"Failed to connect to UniFi controller: {str(e)}")
|
||||
|
||||
def is_tasmota_device(self, device: dict) -> bool:
|
||||
"""Determine if a device is in the network_filter and not in exclude_patterns."""
|
||||
name = device.get('name', '').lower()
|
||||
hostname = device.get('hostname', '').lower()
|
||||
ip = device.get('ip', '')
|
||||
|
||||
# Check if device is in the configured network
|
||||
network_filters = self.config['unifi'].get('network_filter', {})
|
||||
for network in network_filters.values():
|
||||
if ip.startswith(network['subnet']):
|
||||
self.logger.debug(f"Checking device in network: {name} ({hostname}) IP: {ip}")
|
||||
|
||||
# Check exclusion patterns
|
||||
exclude_patterns = network.get('exclude_patterns', [])
|
||||
for pattern in exclude_patterns:
|
||||
pattern = pattern.lower()
|
||||
# Convert glob pattern to regex pattern
|
||||
pattern = pattern.replace('.', r'\.').replace('*', '.*')
|
||||
if re.match(f"^{pattern}$", name) or re.match(f"^{pattern}$", hostname):
|
||||
self.logger.debug(f"Excluding device due to pattern '{pattern}': {name} ({hostname})")
|
||||
return False
|
||||
|
||||
# Device is in the network and not excluded
|
||||
self.logger.debug(f"Found device in network: {name}")
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
def get_tasmota_devices(self) -> list:
|
||||
"""Query UniFi controller and filter Tasmota devices."""
|
||||
devices = []
|
||||
self.logger.debug("Querying UniFi controller for devices")
|
||||
try:
|
||||
all_clients = self.unifi_client.get_clients()
|
||||
self.logger.debug(f"Found {len(all_clients)} total devices")
|
||||
|
||||
for device in all_clients:
|
||||
if self.is_tasmota_device(device):
|
||||
device_info = {
|
||||
"name": device.get('name', device.get('hostname', 'Unknown')),
|
||||
"ip": device.get('ip', ''),
|
||||
"mac": device.get('mac', ''),
|
||||
"last_seen": device.get('last_seen', ''),
|
||||
"hostname": device.get('hostname', ''),
|
||||
"notes": device.get('note', ''),
|
||||
}
|
||||
devices.append(device_info)
|
||||
|
||||
self.logger.debug(f"Found {len(devices)} Tasmota devices")
|
||||
return devices
|
||||
except Exception as e:
|
||||
self.logger.error(f"Error getting devices from UniFi controller: {e}")
|
||||
return []
|
||||
|
||||
def save_tasmota_config(self, devices: list) -> None:
|
||||
"""Save Tasmota device information to a JSON file with device tracking."""
|
||||
filename = "current.json"
|
||||
self.logger.debug(f"Saving Tasmota configuration to {filename}")
|
||||
deprecated_filename = "deprecated.json"
|
||||
|
||||
current_devices = []
|
||||
deprecated_devices = []
|
||||
|
||||
# Load existing devices if file exists
|
||||
if os.path.exists(filename):
|
||||
try:
|
||||
with open(filename, 'r') as f:
|
||||
existing_config = json.load(f)
|
||||
current_devices = existing_config.get('tasmota', {}).get('devices', [])
|
||||
except json.JSONDecodeError:
|
||||
self.logger.error(f"Error reading {filename}, treating as empty")
|
||||
current_devices = []
|
||||
|
||||
# Load deprecated devices if file exists
|
||||
if os.path.exists(deprecated_filename):
|
||||
try:
|
||||
with open(deprecated_filename, 'r') as f:
|
||||
deprecated_config = json.load(f)
|
||||
deprecated_devices = deprecated_config.get('tasmota', {}).get('devices', [])
|
||||
except json.JSONDecodeError:
|
||||
self.logger.error(f"Error reading {deprecated_filename}, treating as empty")
|
||||
deprecated_devices = []
|
||||
|
||||
# Create new config
|
||||
new_devices = []
|
||||
moved_to_deprecated = []
|
||||
restored_from_deprecated = []
|
||||
removed_from_deprecated = []
|
||||
excluded_devices = []
|
||||
|
||||
# Check for excluded devices in current and deprecated lists
|
||||
network_filters = self.config['unifi'].get('network_filter', {})
|
||||
exclude_patterns = []
|
||||
for network in network_filters.values():
|
||||
exclude_patterns.extend(network.get('exclude_patterns', []))
|
||||
|
||||
# Function to check if device is excluded
|
||||
def is_device_excluded(device_name: str, hostname: str = '') -> bool:
|
||||
name = device_name.lower()
|
||||
hostname = hostname.lower()
|
||||
for pattern in exclude_patterns:
|
||||
pattern = pattern.lower()
|
||||
pattern = pattern.replace('.', r'\.').replace('*', '.*')
|
||||
if re.match(f"^{pattern}$", name) or re.match(f"^{pattern}$", hostname):
|
||||
return True
|
||||
return False
|
||||
|
||||
# Process current devices
|
||||
for device in devices:
|
||||
device_name = device['name']
|
||||
device_hostname = device.get('hostname', '')
|
||||
device_ip = device['ip']
|
||||
device_mac = device['mac']
|
||||
|
||||
# Check if device should be excluded
|
||||
if is_device_excluded(device_name, device_hostname):
|
||||
print(f"Device {device_name} excluded by pattern - skipping")
|
||||
excluded_devices.append(device_name)
|
||||
continue
|
||||
|
||||
# Check in current devices
|
||||
existing_device = next((d for d in current_devices
|
||||
if d['name'] == device_name), None)
|
||||
|
||||
if existing_device:
|
||||
# Device exists, check if IP or MAC changed
|
||||
if existing_device['ip'] != device_ip or existing_device['mac'] != device_mac:
|
||||
moved_to_deprecated.append(existing_device)
|
||||
new_devices.append(device)
|
||||
print(f"Device {device_name} moved to deprecated (IP/MAC changed)")
|
||||
else:
|
||||
new_devices.append(existing_device) # Keep existing device
|
||||
else:
|
||||
# New device, check if it was in deprecated
|
||||
deprecated_device = next((d for d in deprecated_devices
|
||||
if d['name'] == device_name), None)
|
||||
if deprecated_device:
|
||||
removed_from_deprecated.append(device_name)
|
||||
print(f"Device {device_name} removed from deprecated (restored)")
|
||||
new_devices.append(device)
|
||||
print(f"Device {device_name} added to output file")
|
||||
|
||||
# Find devices that are no longer present
|
||||
current_names = {d['name'] for d in devices}
|
||||
for existing_device in current_devices:
|
||||
if existing_device['name'] not in current_names:
|
||||
if not is_device_excluded(existing_device['name'], existing_device.get('hostname', '')):
|
||||
moved_to_deprecated.append(existing_device)
|
||||
print(f"Device {existing_device['name']} moved to deprecated (no longer present)")
|
||||
|
||||
# Update deprecated devices list, excluding any excluded devices
|
||||
final_deprecated = []
|
||||
for device in deprecated_devices:
|
||||
if device['name'] not in removed_from_deprecated and not is_device_excluded(device['name'], device.get('hostname', '')):
|
||||
final_deprecated.append(device)
|
||||
elif is_device_excluded(device['name'], device.get('hostname', '')):
|
||||
print(f"Device {device['name']} removed from deprecated (excluded by pattern)")
|
||||
|
||||
final_deprecated.extend(moved_to_deprecated)
|
||||
|
||||
# Save new configuration
|
||||
config = {
|
||||
"tasmota": {
|
||||
"devices": new_devices,
|
||||
"generated_at": datetime.now().isoformat(),
|
||||
"total_devices": len(new_devices)
|
||||
}
|
||||
}
|
||||
|
||||
# Save deprecated configuration
|
||||
deprecated_config = {
|
||||
"tasmota": {
|
||||
"devices": final_deprecated,
|
||||
"generated_at": datetime.now().isoformat(),
|
||||
"total_devices": len(final_deprecated)
|
||||
}
|
||||
}
|
||||
|
||||
# Backup existing file if it exists
|
||||
if os.path.exists(filename):
|
||||
try:
|
||||
backup_name = f"{filename}.backup"
|
||||
os.rename(filename, backup_name)
|
||||
self.logger.info(f"Created backup of existing configuration as {backup_name}")
|
||||
except Exception as e:
|
||||
self.logger.error(f"Error creating backup: {e}")
|
||||
|
||||
# Save files
|
||||
try:
|
||||
with open(filename, 'w') as f:
|
||||
json.dump(config, f, indent=4)
|
||||
with open(deprecated_filename, 'w') as f:
|
||||
json.dump(deprecated_config, f, indent=4)
|
||||
|
||||
self.logger.info(f"Successfully saved {len(new_devices)} Tasmota devices to {filename}")
|
||||
self.logger.info(f"Successfully saved {len(final_deprecated)} deprecated devices to {deprecated_filename}")
|
||||
|
||||
print("\nDevice Status Summary:")
|
||||
if excluded_devices:
|
||||
print("\nExcluded Devices:")
|
||||
for name in excluded_devices:
|
||||
print(f"- {name}")
|
||||
|
||||
if moved_to_deprecated:
|
||||
print("\nMoved to deprecated:")
|
||||
for device in moved_to_deprecated:
|
||||
print(f"- {device['name']}")
|
||||
|
||||
if removed_from_deprecated:
|
||||
print("\nRestored from deprecated:")
|
||||
for name in removed_from_deprecated:
|
||||
print(f"- {name}")
|
||||
|
||||
print("\nCurrent Tasmota Devices:")
|
||||
for device in new_devices:
|
||||
print(f"Name: {device['name']:<20} IP: {device['ip']:<15} MAC: {device['mac']}")
|
||||
|
||||
except Exception as e:
|
||||
self.logger.error(f"Error saving configuration: {e}")
|
||||
|
||||
def get_unknown_devices(self, use_current_json=True):
|
||||
"""Identify devices that match unknown_device_patterns from current.json."""
|
||||
self.logger.info("Identifying unknown devices for processing...")
|
||||
unknown_devices = []
|
||||
|
||||
try:
|
||||
source_file = 'current.json' if use_current_json else 'tasmota.json'
|
||||
with open(source_file, 'r') as f:
|
||||
data = json.load(f)
|
||||
all_devices = data.get('tasmota', {}).get('devices', [])
|
||||
self.logger.debug(f"Loaded {len(all_devices)} devices from {source_file}")
|
||||
except FileNotFoundError:
|
||||
self.logger.error(f"{source_file} not found. Run discovery first.")
|
||||
return []
|
||||
except json.JSONDecodeError:
|
||||
self.logger.error(f"Invalid JSON format in {source_file}")
|
||||
return []
|
||||
|
||||
# Identify devices matching unknown_device_patterns
|
||||
network_filters = self.config['unifi'].get('network_filter', {})
|
||||
unknown_patterns = []
|
||||
for network in network_filters.values():
|
||||
unknown_patterns.extend(network.get('unknown_device_patterns', []))
|
||||
|
||||
for device in all_devices:
|
||||
name = device.get('name', '').lower()
|
||||
hostname = device.get('hostname', '').lower()
|
||||
|
||||
for pattern in unknown_patterns:
|
||||
pattern = pattern.lower()
|
||||
pattern = pattern.replace('.', r'\.').replace('*', '.*')
|
||||
if re.match(f"^{pattern}", name) or re.match(f"^{pattern}", hostname):
|
||||
self.logger.debug(f"Found unknown device: {name} ({hostname})")
|
||||
unknown_devices.append(device)
|
||||
break
|
||||
|
||||
self.logger.info(f"Found {len(unknown_devices)} unknown devices to process")
|
||||
return unknown_devices
|
||||
|
||||
def process_unknown_devices(self):
|
||||
"""Process unknown devices by checking for toggle button and configuring them.
|
||||
|
||||
This method:
|
||||
1. Identifies devices matching unknown_device_patterns
|
||||
2. Checks if each device has a toggle button (indicating it's a light/switch)
|
||||
3. Toggles the button at 1/2Hz while checking for hostname changes
|
||||
4. Prompts the user to enter a new name for the device in the console
|
||||
5. Once a name is entered, configures the device with the new hostname
|
||||
"""
|
||||
unknown_devices = self.get_unknown_devices()
|
||||
if not unknown_devices:
|
||||
self.logger.info("No unknown devices found to process")
|
||||
return
|
||||
|
||||
self.logger.info(f"Starting to process {len(unknown_devices)} unknown devices...")
|
||||
|
||||
for device in unknown_devices:
|
||||
name = device.get('name', 'Unknown')
|
||||
ip = device.get('ip')
|
||||
|
||||
if not ip:
|
||||
self.logger.warning(f"Skipping device {name} - no IP address")
|
||||
continue
|
||||
|
||||
self.logger.info(f"Processing unknown device: {name} at {ip}")
|
||||
|
||||
# Check if device has a toggle button
|
||||
try:
|
||||
# Get the main page to check for toggle button
|
||||
url = f"http://{ip}/"
|
||||
response = requests.get(url, timeout=5)
|
||||
|
||||
# Check if there's a toggle button in the response
|
||||
has_toggle = "toggle" in response.text.lower()
|
||||
|
||||
if has_toggle:
|
||||
self.logger.info(f"Device {name} has a toggle button, assuming it's a light switch or power plug")
|
||||
|
||||
# Start toggling at 1/2Hz
|
||||
original_hostname = device.get('hostname', '')
|
||||
toggle_state = False
|
||||
|
||||
# Temporarily disable all logging during toggling
|
||||
logging.disable(logging.CRITICAL)
|
||||
|
||||
try:
|
||||
# Clear console output and show prompt
|
||||
print("\n" + "="*50)
|
||||
print(f"DEVICE: {name} at IP: {ip}")
|
||||
print(f"Current hostname: {original_hostname}")
|
||||
print("="*50)
|
||||
print("The device is now toggling to help you identify it.")
|
||||
|
||||
# Start toggling in background while waiting for input
|
||||
import threading
|
||||
stop_toggle = threading.Event()
|
||||
|
||||
def toggle_device():
|
||||
toggle_state = False
|
||||
while not stop_toggle.is_set():
|
||||
toggle_state = not toggle_state
|
||||
toggle_cmd = "Power On" if toggle_state else "Power Off"
|
||||
toggle_url = f"http://{ip}/cm?cmnd={toggle_cmd}"
|
||||
try:
|
||||
requests.get(toggle_url, timeout=2)
|
||||
except:
|
||||
pass
|
||||
time.sleep(2.0) # 1/2Hz rate
|
||||
|
||||
# Start toggle thread
|
||||
toggle_thread = threading.Thread(target=toggle_device)
|
||||
toggle_thread.daemon = True
|
||||
toggle_thread.start()
|
||||
|
||||
# Prompt for new hostname
|
||||
print("\nPlease enter a new name for this device:")
|
||||
new_hostname = input("> ").strip()
|
||||
|
||||
# Stop toggling
|
||||
stop_toggle.set()
|
||||
toggle_thread.join(timeout=3)
|
||||
|
||||
if new_hostname and new_hostname != original_hostname:
|
||||
print(f"Setting new hostname to: {new_hostname}")
|
||||
else:
|
||||
print("No valid hostname entered, skipping device")
|
||||
new_hostname = ""
|
||||
|
||||
finally:
|
||||
# Re-enable logging
|
||||
logging.disable(logging.NOTSET)
|
||||
|
||||
# If a new hostname was entered, configure the device
|
||||
if new_hostname:
|
||||
self.logger.info(f"Configuring device with new hostname: {new_hostname}")
|
||||
self.configure_unknown_device(ip, new_hostname)
|
||||
else:
|
||||
self.logger.warning(f"No new hostname provided for {name}, skipping configuration")
|
||||
else:
|
||||
self.logger.info(f"Device {name} does not have a toggle button, skipping")
|
||||
|
||||
except requests.exceptions.RequestException as e:
|
||||
self.logger.error(f"Error connecting to {name} at {ip}: {str(e)}")
|
||||
|
||||
def configure_unknown_device(self, ip, hostname):
|
||||
"""Configure an unknown device with the given hostname and MQTT settings."""
|
||||
try:
|
||||
# Set Friendly Name
|
||||
friendly_name_url = f"http://{ip}/cm?cmnd=FriendlyName1%20{hostname}"
|
||||
response = requests.get(friendly_name_url, timeout=5)
|
||||
if response.status_code == 200:
|
||||
self.logger.info(f"Set Friendly Name to {hostname}")
|
||||
else:
|
||||
self.logger.error(f"Failed to set Friendly Name to {hostname}")
|
||||
|
||||
# Enable MQTT if not already enabled
|
||||
mqtt_url = f"http://{ip}/cm?cmnd=SetOption3%20ON" # Enable MQTT
|
||||
response = requests.get(mqtt_url, timeout=5)
|
||||
if response.status_code == 200:
|
||||
self.logger.info(f"Enabled MQTT for {hostname}")
|
||||
else:
|
||||
self.logger.error(f"Failed to enable MQTT for {hostname}")
|
||||
|
||||
# Configure MQTT settings
|
||||
mqtt_config = self.config.get('mqtt', {})
|
||||
if mqtt_config:
|
||||
# Get the base hostname (everything before the dash)
|
||||
hostname_base = hostname.split('-')[0] if '-' in hostname else hostname
|
||||
|
||||
mqtt_fields = {
|
||||
"MqttHost": mqtt_config.get('Host', ''),
|
||||
"MqttPort": mqtt_config.get('Port', 1883),
|
||||
"MqttUser": mqtt_config.get('User', ''),
|
||||
"MqttPassword": mqtt_config.get('Password', ''),
|
||||
"Topic": hostname_base if mqtt_config.get('Topic') == '%hostname_base%' else mqtt_config.get('Topic', ''),
|
||||
"FullTopic": mqtt_config.get('FullTopic', '%prefix%/%topic%/'),
|
||||
}
|
||||
|
||||
for setting, value in mqtt_fields.items():
|
||||
url = f"http://{ip}/cm?cmnd={setting}%20{value}"
|
||||
response = requests.get(url, timeout=5)
|
||||
if response.status_code == 200:
|
||||
if setting != 'MqttPassword':
|
||||
self.logger.info(f"{hostname}: Set {setting} to {value}")
|
||||
else:
|
||||
self.logger.info(f"{hostname}: Set MQTT Password")
|
||||
else:
|
||||
self.logger.error(f"{hostname}: Failed to set {setting}")
|
||||
|
||||
# Save configuration (will reboot the device)
|
||||
save_url = f"http://{ip}/cm?cmnd=Restart%201"
|
||||
response = requests.get(save_url, timeout=5)
|
||||
if response.status_code == 200:
|
||||
self.logger.info(f"Saved configuration and rebooted {hostname}")
|
||||
else:
|
||||
self.logger.error(f"Failed to save configuration for {hostname}")
|
||||
|
||||
return True
|
||||
|
||||
except requests.exceptions.RequestException as e:
|
||||
self.logger.error(f"Error configuring device at {ip}: {str(e)}")
|
||||
return False
|
||||
|
||||
def get_device_details(self, use_current_json=True):
|
||||
"""Connect to each Tasmota device via HTTP, gather details and validate MQTT settings.
|
||||
Filters out devices matching unknown_device_patterns."""
|
||||
self.logger.info("Starting to gather detailed device information...")
|
||||
device_details = []
|
||||
|
||||
try:
|
||||
source_file = 'current.json' if use_current_json else 'tasmota.json'
|
||||
with open(source_file, 'r') as f:
|
||||
data = json.load(f)
|
||||
all_devices = data.get('tasmota', {}).get('devices', [])
|
||||
self.logger.debug(f"Loaded {len(all_devices)} devices from {source_file}")
|
||||
except FileNotFoundError:
|
||||
self.logger.error(f"{source_file} not found. Run discovery first.")
|
||||
return
|
||||
except json.JSONDecodeError:
|
||||
self.logger.error(f"Invalid JSON format in {source_file}")
|
||||
return
|
||||
|
||||
# Filter out devices matching unknown_device_patterns
|
||||
devices = []
|
||||
network_filters = self.config['unifi'].get('network_filter', {})
|
||||
unknown_patterns = []
|
||||
for network in network_filters.values():
|
||||
unknown_patterns.extend(network.get('unknown_device_patterns', []))
|
||||
|
||||
for device in all_devices:
|
||||
name = device.get('name', '').lower()
|
||||
hostname = device.get('hostname', '').lower()
|
||||
|
||||
is_unknown = False
|
||||
for pattern in unknown_patterns:
|
||||
pattern = pattern.lower()
|
||||
pattern = pattern.replace('.', r'\.').replace('*', '.*')
|
||||
if re.match(f"^{pattern}", name) or re.match(f"^{pattern}", hostname):
|
||||
self.logger.debug(f"Skipping unknown device: {name} ({hostname})")
|
||||
is_unknown = True
|
||||
break
|
||||
|
||||
if not is_unknown:
|
||||
devices.append(device)
|
||||
|
||||
self.logger.debug(f"Processing {len(devices)} devices after filtering unknown devices")
|
||||
|
||||
mqtt_config = self.config.get('mqtt', {})
|
||||
if not mqtt_config:
|
||||
self.logger.error("MQTT configuration missing from config file")
|
||||
return
|
||||
|
||||
def check_mqtt_settings(ip, name, mqtt_status):
|
||||
"""Check and update MQTT settings if they don't match config"""
|
||||
# Get the base hostname (everything before the dash)
|
||||
hostname_base = name.split('-')[0] if '-' in name else name
|
||||
|
||||
mqtt_fields = {
|
||||
"Host": mqtt_config.get('Host', ''),
|
||||
"Port": mqtt_config.get('Port', 1883),
|
||||
"User": mqtt_config.get('User', ''),
|
||||
"Password": mqtt_config.get('Password', ''),
|
||||
"Topic": hostname_base if mqtt_config.get('Topic') == '%hostname_base%' else mqtt_config.get('Topic', ''),
|
||||
"FullTopic": mqtt_config.get('FullTopic', '%prefix%/%topic%/'),
|
||||
}
|
||||
|
||||
device_mqtt = mqtt_status.get('MqttHost', {})
|
||||
changes_needed = []
|
||||
force_password_update = False
|
||||
|
||||
# Check each MQTT setting
|
||||
if device_mqtt.get('Host') != mqtt_fields['Host']:
|
||||
changes_needed.append(('MqttHost', mqtt_fields['Host']))
|
||||
self.logger.debug(f"{name}: MQTT Host mismatch - Device: {device_mqtt.get('Host')}, Config: {mqtt_fields['Host']}")
|
||||
force_password_update = True
|
||||
|
||||
if device_mqtt.get('Port') != mqtt_fields['Port']:
|
||||
changes_needed.append(('MqttPort', mqtt_fields['Port']))
|
||||
self.logger.debug(f"{name}: MQTT Port mismatch - Device: {device_mqtt.get('Port')}, Config: {mqtt_fields['Port']}")
|
||||
force_password_update = True
|
||||
|
||||
if device_mqtt.get('User') != mqtt_fields['User']:
|
||||
changes_needed.append(('MqttUser', mqtt_fields['User']))
|
||||
self.logger.debug(f"{name}: MQTT User mismatch - Device: {device_mqtt.get('User')}, Config: {mqtt_fields['User']}")
|
||||
force_password_update = True
|
||||
|
||||
if device_mqtt.get('Topic') != mqtt_fields['Topic']:
|
||||
changes_needed.append(('Topic', mqtt_fields['Topic']))
|
||||
self.logger.debug(f"{name}: MQTT Topic mismatch - Device: {device_mqtt.get('Topic')}, Config: {mqtt_fields['Topic']}")
|
||||
force_password_update = True
|
||||
|
||||
if device_mqtt.get('FullTopic') != mqtt_fields['FullTopic']:
|
||||
changes_needed.append(('FullTopic', mqtt_fields['FullTopic']))
|
||||
self.logger.debug(f"{name}: MQTT FullTopic mismatch - Device: {device_mqtt.get('FullTopic')}, Config: {mqtt_fields['FullTopic']}")
|
||||
force_password_update = True
|
||||
|
||||
# Add password update if any MQTT setting changed or user was updated
|
||||
if force_password_update:
|
||||
changes_needed.append(('MqttPassword', mqtt_fields['Password']))
|
||||
self.logger.debug(f"{name}: MQTT Password will be updated")
|
||||
|
||||
# Check NoRetain setting - FIXED: Use the actual value from config with default of False
|
||||
no_retain = mqtt_config.get('NoRetain', False)
|
||||
if no_retain:
|
||||
changes_needed.append(('SetOption62', '1')) # 1 = No Retain
|
||||
else:
|
||||
changes_needed.append(('SetOption62', '0')) # 0 = Use Retain
|
||||
|
||||
# Apply changes if needed
|
||||
for setting, value in changes_needed:
|
||||
try:
|
||||
url = f"http://{ip}/cm?cmnd={setting}%20{value}"
|
||||
response = requests.get(url, timeout=5)
|
||||
if response.status_code == 200:
|
||||
if setting != 'MqttPassword':
|
||||
self.logger.debug(f"{name}: Updated {setting} to {value}")
|
||||
else:
|
||||
self.logger.debug(f"{name}: Updated MQTT Password")
|
||||
else:
|
||||
self.logger.error(f"{name}: Failed to update {setting}")
|
||||
except requests.exceptions.RequestException as e:
|
||||
self.logger.error(f"{name}: Error updating {setting}: {str(e)}")
|
||||
|
||||
return len(changes_needed) > 0
|
||||
|
||||
for device in devices:
|
||||
if not isinstance(device, dict):
|
||||
self.logger.warning(f"Skipping invalid device entry: {device}")
|
||||
continue
|
||||
|
||||
name = device.get('name', 'Unknown')
|
||||
ip = device.get('ip')
|
||||
mac = device.get('mac')
|
||||
|
||||
if not ip:
|
||||
self.logger.warning(f"Skipping device {name} - no IP address")
|
||||
continue
|
||||
|
||||
self.logger.info(f"Checking device: {name} at {ip}")
|
||||
|
||||
try:
|
||||
# Get Status 2 for firmware version
|
||||
url_status = f"http://{ip}/cm?cmnd=Status%202"
|
||||
response = requests.get(url_status, timeout=5)
|
||||
status_data = response.json()
|
||||
|
||||
# Get Status 5 for network info
|
||||
url_network = f"http://{ip}/cm?cmnd=Status%205"
|
||||
response = requests.get(url_network, timeout=5)
|
||||
network_data = response.json()
|
||||
|
||||
# Get Status 6 for MQTT info
|
||||
url_mqtt = f"http://{ip}/cm?cmnd=Status%206"
|
||||
response = requests.get(url_mqtt, timeout=5)
|
||||
mqtt_data = response.json()
|
||||
|
||||
# Check and update MQTT settings if needed
|
||||
mqtt_updated = check_mqtt_settings(ip, name, mqtt_data)
|
||||
|
||||
device_detail = {
|
||||
"name": name,
|
||||
"ip": ip,
|
||||
"mac": mac,
|
||||
"version": status_data.get("StatusFWR", {}).get("Version", "Unknown"),
|
||||
"hostname": network_data.get("StatusNET", {}).get("Hostname", "Unknown"),
|
||||
"mqtt_status": "Updated" if mqtt_updated else "Verified",
|
||||
"last_checked": time.strftime("%Y-%m-%d %H:%M:%S"),
|
||||
"status": "online"
|
||||
}
|
||||
self.logger.info(f"Successfully got version for {name}: {device_detail['version']}")
|
||||
|
||||
except requests.exceptions.RequestException as e:
|
||||
self.logger.error(f"Error connecting to {name} at {ip}: {str(e)}")
|
||||
device_detail = {
|
||||
"name": name,
|
||||
"ip": ip,
|
||||
"mac": mac,
|
||||
"version": "Unknown",
|
||||
"status": "offline",
|
||||
"error": str(e)
|
||||
}
|
||||
|
||||
device_details.append(device_detail)
|
||||
time.sleep(0.5)
|
||||
|
||||
# Save all device details at once
|
||||
try:
|
||||
with open('TasmotaDevices.json', 'w') as f:
|
||||
json.dump(device_details, f, indent=2)
|
||||
self.logger.info(f"Device details saved to TasmotaDevices.json ({len(device_details)} devices)")
|
||||
except Exception as e:
|
||||
self.logger.error(f"Error saving device details: {e}")
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description='Tasmota Device Manager')
|
||||
parser.add_argument('--config', default='network_configuration.json',
|
||||
help='Path to configuration file')
|
||||
parser.add_argument('--debug', action='store_true',
|
||||
help='Enable debug logging')
|
||||
parser.add_argument('--skip-unifi', action='store_true',
|
||||
help='Skip UniFi discovery and use existing current.json')
|
||||
parser.add_argument('--process-unknown', action='store_true',
|
||||
help='Process unknown devices (matching unknown_device_patterns) to set up names and MQTT')
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
# Set up logging
|
||||
log_level = logging.DEBUG if args.debug else logging.INFO
|
||||
logging.basicConfig(level=log_level,
|
||||
format='%(asctime)s - %(levelname)s - %(message)s',
|
||||
datefmt='%Y-%m-%d %H:%M:%S')
|
||||
|
||||
print("Starting Tasmota Device Discovery and Version Check...")
|
||||
|
||||
# Create TasmotaDiscovery instance
|
||||
discovery = TasmotaDiscovery(debug=args.debug)
|
||||
discovery.load_config(args.config)
|
||||
|
||||
try:
|
||||
if not args.skip_unifi:
|
||||
print("Step 1: Discovering Tasmota devices...")
|
||||
discovery.setup_unifi_client()
|
||||
tasmota_devices = discovery.get_tasmota_devices()
|
||||
discovery.save_tasmota_config(tasmota_devices)
|
||||
else:
|
||||
print("Skipping UniFi discovery, using existing current.json...")
|
||||
|
||||
print("\nStep 2: Getting detailed version information...")
|
||||
discovery.get_device_details(use_current_json=True)
|
||||
|
||||
if args.process_unknown:
|
||||
print("\nStep 3: Processing unknown devices...")
|
||||
discovery.process_unknown_devices()
|
||||
|
||||
print("\nProcess completed successfully!")
|
||||
print("- Device list saved to: current.json")
|
||||
print("- Detailed information saved to: TasmotaDevices.json")
|
||||
|
||||
except ConnectionError as e:
|
||||
print(f"Connection Error: {str(e)}")
|
||||
print("\nTrying to proceed with existing current.json...")
|
||||
try:
|
||||
discovery.get_device_details(use_current_json=True)
|
||||
print("\nSuccessfully retrieved device details from existing current.json")
|
||||
except Exception as inner_e:
|
||||
print(f"Error processing existing devices: {str(inner_e)}")
|
||||
return 1
|
||||
except Exception as e:
|
||||
print(f"Error: {str(e)}")
|
||||
if args.debug:
|
||||
import traceback
|
||||
traceback.print_exc()
|
||||
return 1
|
||||
|
||||
return 0
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
102
console_duplicate_template_fix_summary.md
Normal file
102
console_duplicate_template_fix_summary.md
Normal file
@ -0,0 +1,102 @@
|
||||
# Console Duplicate and Template Matching Fix Summary
|
||||
|
||||
## Issues Addressed
|
||||
|
||||
1. **Duplicate Console Settings**: Console settings were being applied twice during device configuration.
|
||||
2. **Template Matching Failure**: The template matching algorithm was not handling the response format correctly, causing the config_other settings to not be applied.
|
||||
|
||||
## Changes Made
|
||||
|
||||
### 1. Fix for Duplicate Console Settings
|
||||
|
||||
The console settings were being applied in two places:
|
||||
|
||||
1. In `configure_mqtt_settings()` called from `check_mqtt_settings()`
|
||||
2. Directly in `get_device_details()`
|
||||
|
||||
To fix this issue:
|
||||
|
||||
1. Added a `skip_console` parameter to `configure_mqtt_settings()`:
|
||||
```python
|
||||
def configure_mqtt_settings(self, ip, name, mqtt_status=None, is_new_device=False, set_friendly_name=False, enable_mqtt=False, with_retry=False, reboot=False, skip_console=False):
|
||||
```
|
||||
|
||||
2. Modified the function to skip console settings if `skip_console` is True:
|
||||
```python
|
||||
# Apply console settings
|
||||
console_updated = False
|
||||
console_params = mqtt_config.get('console', {})
|
||||
if console_params and not skip_console:
|
||||
# Console settings application code...
|
||||
```
|
||||
|
||||
3. Updated `check_mqtt_settings()` to pass `skip_console=True`:
|
||||
```python
|
||||
return self.configure_mqtt_settings(
|
||||
ip=ip,
|
||||
name=name,
|
||||
mqtt_status=mqtt_status,
|
||||
is_new_device=False,
|
||||
set_friendly_name=False,
|
||||
enable_mqtt=False,
|
||||
with_retry=True,
|
||||
reboot=False,
|
||||
skip_console=True # Skip console settings here as they'll be applied separately
|
||||
)
|
||||
```
|
||||
|
||||
This ensures that console settings are only applied once, directly in `get_device_details()`.
|
||||
|
||||
### 2. Fix for Template Matching Failure
|
||||
|
||||
The template matching algorithm was not handling the response format correctly. The function expected a "Template" key in the response, but the actual response had a different structure.
|
||||
|
||||
To fix this issue:
|
||||
|
||||
1. Added logging of the actual response format for debugging:
|
||||
```python
|
||||
self.logger.debug(f"{name}: Template response: {template_data}")
|
||||
```
|
||||
|
||||
2. Enhanced the template extraction logic to handle different response formats:
|
||||
```python
|
||||
# Extract current template - handle different response formats
|
||||
current_template = ""
|
||||
|
||||
# Try different possible response formats
|
||||
if "Template" in template_data:
|
||||
current_template = template_data.get("Template", "")
|
||||
elif isinstance(template_data, dict) and len(template_data) > 0:
|
||||
# If there's no "Template" key but we have a dict, try to get the first value
|
||||
# This handles cases where the response might be {"NAME":"...","GPIO":[...]}
|
||||
first_key = next(iter(template_data))
|
||||
if isinstance(template_data[first_key], str) and "{" in template_data[first_key]:
|
||||
current_template = template_data[first_key]
|
||||
self.logger.debug(f"{name}: Found template in alternate format under key: {first_key}")
|
||||
# Handle the case where the template is returned as a dict with NAME, GPIO, FLAG, BASE keys
|
||||
elif all(key in template_data for key in ['NAME', 'GPIO', 'FLAG', 'BASE']):
|
||||
# Convert the dict to a JSON string to match the expected format
|
||||
import json
|
||||
current_template = json.dumps(template_data)
|
||||
self.logger.debug(f"{name}: Found template in dict format with NAME, GPIO, FLAG, BASE keys")
|
||||
```
|
||||
|
||||
This allows the function to handle the specific response format with 'NAME', 'GPIO', 'FLAG', and 'BASE' keys, which is what the OfficeLight device returns.
|
||||
|
||||
## Testing and Verification
|
||||
|
||||
The changes were tested with the OfficeLight device and both issues were resolved:
|
||||
|
||||
1. Console settings are now only applied once
|
||||
2. Template matching is working correctly and updating the template as needed
|
||||
|
||||
The TasmotaDevices.json file confirms that the template was successfully updated, with `"template_status": "Updated"`.
|
||||
|
||||
## Conclusion
|
||||
|
||||
These changes optimize the device configuration process by:
|
||||
|
||||
1. Eliminating duplicate application of console settings
|
||||
2. Improving the template matching algorithm to handle different response formats
|
||||
|
||||
This ensures that all configuration steps (MQTT, config_other, and console) are applied correctly and efficiently.
|
||||
78
console_settings_optimization.md
Normal file
78
console_settings_optimization.md
Normal file
@ -0,0 +1,78 @@
|
||||
# Console Settings Optimization
|
||||
|
||||
## Issue Description
|
||||
|
||||
The issue was related to how console settings were being applied in the TasmotaManager code. The original implementation used a `skip_console` parameter in the `configure_mqtt_settings` function to prevent console settings from being applied twice:
|
||||
|
||||
1. Once in `configure_mqtt_settings` (but skipped with `skip_console=True`)
|
||||
2. Again directly in `get_device_details`
|
||||
|
||||
The question was raised: "I question why the skip_console was needed. Seems like the console settings before thecheck_mqtt_settings should be the one deleted?"
|
||||
|
||||
## Analysis
|
||||
|
||||
After reviewing the code, I found that:
|
||||
|
||||
1. In `get_device_details`, it calls `check_mqtt_settings` which calls `configure_mqtt_settings` with `skip_console=True`. This prevents `configure_mqtt_settings` from applying console settings.
|
||||
|
||||
2. Later in `get_device_details`, console settings are applied directly with a large block of code that duplicates functionality already present in `configure_mqtt_settings`.
|
||||
|
||||
3. In `configure_unknown_device`, it calls `configure_mqtt_settings` without specifying `skip_console`, so it uses the default value of `False`. This means console settings are applied when configuring unknown devices.
|
||||
|
||||
This approach added unnecessary complexity with the `skip_console` parameter and made the code less intuitive (why skip in one place and apply in another?).
|
||||
|
||||
## Changes Made
|
||||
|
||||
I implemented the following changes to optimize the code:
|
||||
|
||||
1. Removed the `skip_console` parameter from the `configure_mqtt_settings` function signature:
|
||||
```python
|
||||
def configure_mqtt_settings(self, ip, name, mqtt_status=None, is_new_device=False, set_friendly_name=False, enable_mqtt=False, with_retry=False, reboot=False):
|
||||
```
|
||||
|
||||
2. Updated the condition in `configure_mqtt_settings` to always apply console settings:
|
||||
```python
|
||||
# Apply console settings
|
||||
console_updated = False
|
||||
console_params = mqtt_config.get('console', {})
|
||||
if console_params:
|
||||
self.logger.info(f"{name}: Setting console parameters from configuration")
|
||||
```
|
||||
|
||||
3. Updated `check_mqtt_settings` to call `configure_mqtt_settings` without the `skip_console` parameter:
|
||||
```python
|
||||
return self.configure_mqtt_settings(
|
||||
ip=ip,
|
||||
name=name,
|
||||
mqtt_status=mqtt_status,
|
||||
is_new_device=False,
|
||||
set_friendly_name=False,
|
||||
enable_mqtt=False,
|
||||
with_retry=True,
|
||||
reboot=False
|
||||
)
|
||||
```
|
||||
|
||||
4. Removed the console settings application code in `get_device_details` and replaced it with:
|
||||
```python
|
||||
# Console settings are now applied in configure_mqtt_settings
|
||||
console_updated = mqtt_updated
|
||||
```
|
||||
|
||||
## Benefits
|
||||
|
||||
These changes provide several benefits:
|
||||
|
||||
1. **Simplified Code**: Removed the `skip_console` parameter and eliminated duplicate code.
|
||||
|
||||
2. **More Intuitive Design**: Console settings are now applied in the same place as MQTT settings, making the code more logical and easier to understand.
|
||||
|
||||
3. **Reduced Maintenance**: With only one place to update console settings logic, future changes will be easier to implement.
|
||||
|
||||
4. **Consistent Behavior**: Console settings are now applied consistently for both unknown and known devices.
|
||||
|
||||
## Testing
|
||||
|
||||
The changes were tested to ensure that console settings are still applied correctly. The `console_updated` flag is now set based on the result of the MQTT settings update, which includes console settings application.
|
||||
|
||||
This approach maintains all the functionality of the original code while making it more maintainable and easier to understand.
|
||||
55
console_settings_summary.md
Normal file
55
console_settings_summary.md
Normal file
@ -0,0 +1,55 @@
|
||||
# Console Settings for Unknown Devices - Implementation Summary
|
||||
|
||||
## Requirement
|
||||
For all unknown devices, once the MQTT and hostname are updated but before the reboot, continue with the console settings. Then reboot the device.
|
||||
|
||||
## Changes Made
|
||||
|
||||
1. Modified the `configure_unknown_device` method in `TasmotaManager.py` to:
|
||||
- Apply console settings from the configuration after setting MQTT parameters but before rebooting
|
||||
- Handle special cases for retain parameters (ButtonRetain, SwitchRetain, PowerRetain)
|
||||
- Auto-enable rules that are defined in the configuration
|
||||
- Maintain the same logging and error handling as the rest of the application
|
||||
|
||||
2. Created a test script `test_unknown_device_console_settings.py` to verify the functionality:
|
||||
- The script takes a device identifier (IP or hostname) as an argument
|
||||
- It displays the console parameters that will be applied from the configuration
|
||||
- It processes the device using the modified `configure_unknown_device` method
|
||||
- This allows testing that console settings are applied to unknown devices before rebooting
|
||||
|
||||
## Implementation Details
|
||||
|
||||
### Console Settings Application
|
||||
The implementation applies console settings in the following order:
|
||||
1. First, it handles retain parameters (ButtonRetain, SwitchRetain, PowerRetain) with special logic:
|
||||
- For each retain parameter, it first sets the opposite state
|
||||
- Then it sets the desired state
|
||||
- This ensures the MQTT broker's retain flags are properly updated
|
||||
|
||||
2. Next, it processes all other console parameters:
|
||||
- It identifies rule definitions (rule1, rule2, etc.) for auto-enabling
|
||||
- It applies each parameter with a simple HTTP request
|
||||
|
||||
3. Finally, it auto-enables any rules that were defined:
|
||||
- If a rule definition (e.g., rule1) is found, it automatically enables the rule (Rule1 ON)
|
||||
- This ensures rules are active after the device reboots
|
||||
|
||||
### Testing
|
||||
To test this functionality:
|
||||
```
|
||||
./test_unknown_device_console_settings.py <device_identifier>
|
||||
```
|
||||
|
||||
Where `<device_identifier>` is either the IP address or hostname of the device you want to process.
|
||||
|
||||
The test script will:
|
||||
1. Display the console parameters from the configuration
|
||||
2. Process the device, applying hostname, MQTT settings, and console settings
|
||||
3. Report whether the processing was successful
|
||||
|
||||
## Expected Behavior
|
||||
After this change, when an unknown device is processed:
|
||||
1. The hostname and MQTT settings will be updated
|
||||
2. All console settings from the configuration will be applied
|
||||
3. The device will be rebooted
|
||||
4. Upon restart, the device will have all settings properly configured
|
||||
44
fulltopic_fix_summary.md
Normal file
44
fulltopic_fix_summary.md
Normal file
@ -0,0 +1,44 @@
|
||||
# FullTopic Parameter Fix Summary
|
||||
|
||||
## Issue Description
|
||||
When setting the MQTT parameters for FullTopic, the Full Topic was ending up with a %20 at the beginning, as in "%20%prefix%/%topic%/" instead of the correct "%prefix%/%topic%/".
|
||||
|
||||
## Root Cause
|
||||
The issue was in the URL construction when sending commands to Tasmota devices. The code was using a space (%20 in URL encoding) between the command name and its value for all parameters:
|
||||
|
||||
```python
|
||||
url = f"http://{ip}/cm?cmnd={setting}%20{value}"
|
||||
```
|
||||
|
||||
While this works for most parameters, it causes problems with the FullTopic parameter because the space gets included in the value.
|
||||
|
||||
## Fix Implemented
|
||||
The fix adds special handling for the FullTopic parameter by using "=" instead of a space (%20) between the command and value:
|
||||
|
||||
```python
|
||||
# For FullTopic, we need to avoid adding a space (%20) between the command and value
|
||||
if setting == "FullTopic":
|
||||
url = f"http://{ip}/cm?cmnd={setting}={value}"
|
||||
else:
|
||||
url = f"http://{ip}/cm?cmnd={setting}%20{value}"
|
||||
```
|
||||
|
||||
This change was implemented in two places:
|
||||
1. In the `configure_unknown_device` method (around line 542)
|
||||
2. In the MQTT settings update code (around line 937)
|
||||
|
||||
## Testing
|
||||
A test script `test_fulltopic_fix.py` was created to verify the fix. The script:
|
||||
1. Connects to a Tasmota device
|
||||
2. Sets the FullTopic parameter using the new method
|
||||
3. Verifies that the FullTopic is set correctly without the %20 prefix
|
||||
|
||||
To run the test:
|
||||
```
|
||||
./test_fulltopic_fix.py <ip_address>
|
||||
```
|
||||
|
||||
Where `<ip_address>` is the IP address of a Tasmota device to test with.
|
||||
|
||||
## Expected Result
|
||||
After this fix, the FullTopic parameter should be set correctly as "%prefix%/%topic%/" without the unwanted %20 at the beginning.
|
||||
27
implementation_summary.md
Normal file
27
implementation_summary.md
Normal file
@ -0,0 +1,27 @@
|
||||
# Implementation Summary
|
||||
|
||||
## Requirement
|
||||
For a single device when identified as unknown device, the script should toggle the device at a 1/2 Hz rate and wait for the user to enter a new Host Name.
|
||||
|
||||
## Changes Made
|
||||
|
||||
1. Modified the `process_single_device` method in `TasmotaManager.py` to:
|
||||
- Check if a device identified as unknown has a toggle button
|
||||
- If it does, toggle the device at 1/2 Hz rate (toggling every 2 seconds)
|
||||
- Display information about the device to help the user identify it
|
||||
- Prompt the user to enter a new hostname for the device
|
||||
- Configure the device with the new hostname if provided
|
||||
|
||||
2. Created a test script `test_unknown_device_toggle.py` to verify the functionality:
|
||||
- The script takes a device identifier (IP or hostname) as an argument
|
||||
- It processes the device using the modified `process_single_device` method
|
||||
- This allows testing the toggling functionality for a single unknown device
|
||||
|
||||
## Testing
|
||||
|
||||
To test this functionality:
|
||||
```
|
||||
./test_unknown_device_toggle.py <device_identifier>
|
||||
```
|
||||
|
||||
Where `<device_identifier>` is either the IP address or hostname of the device you want to process.
|
||||
@ -28,6 +28,15 @@
|
||||
"Topic": "%hostname_base%",
|
||||
"FullTopic": "%prefix%/%topic%/",
|
||||
"NoRetain": false,
|
||||
"config_other": {
|
||||
"TreatLife_SW_SS01S": "{\"NAME\":\"TL SS01S Swtch\",\"GPIO\":[0,0,0,0,52,158,0,0,21,17,0,0,0],\"FLAG\":0,\"BASE\":18}\n",
|
||||
"TreatLife_SW_SS02S": "{\"NAME\":\"Treatlife SS02\",\"GPIO\":[0,0,0,0,288,576,0,0,224,32,0,0,0,0],\"FLAG\":0,\"BASE\":18}",
|
||||
"TreatLife_DIM_DS02S": "{\"NAME\":\"DS02S Dimmer\",\"GPIO\":[0,107,0,108,0,0,0,0,0,0,0,0,0],\"FLAG\":0,\"BASE\":54}",
|
||||
"CloudFree_SW1": "{\"NAME\":\"CloudFree SW1\",\"GPIO\":[0,224,0,32,320,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],\"FLAG\":0,\"BASE\":1}",
|
||||
"Gosund_WP5_Plug": "{\"NAME\":\"Gosund-WP5\",\"GPIO\":[0,0,0,0,17,0,0,0,56,57,21,0,0],\"FLAG\":0,\"BASE\":18}",
|
||||
"CloudFree_X10S_Plug": "{\"NAME\":\"Aoycocr X10S\",\"GPIO\":[56,0,57,0,21,134,0,0,131,17,132,0,0],\"FLAG\":0,\"BASE\":45}\n",
|
||||
"Sonoff_S31_PM_Plug": "{\"NAME\":\"Sonoff S31\",\"GPIO\":[17,145,0,146,0,0,0,0,21,56,0,0,0],\"FLAG\":0,\"BASE\":41}"
|
||||
},
|
||||
"console": {
|
||||
"SwitchRetain": "Off",
|
||||
"ButtonRetain": "Off",
|
||||
|
||||
50
process_unknown_optimization_summary.md
Normal file
50
process_unknown_optimization_summary.md
Normal file
@ -0,0 +1,50 @@
|
||||
# Process Unknown Devices Optimization Summary
|
||||
|
||||
## Issue Description
|
||||
When using the `--process-unknown` flag, the script was unnecessarily getting detailed information for devices that don't match the unknown_device_patterns. This was inefficient because the script was processing all devices first, then filtering out the unknown ones, and then processing only the unknown ones.
|
||||
|
||||
## Root Cause
|
||||
The issue was in the main function of TasmotaManager.py. The script was calling `get_device_details()` for all devices before calling `process_unknown_devices()`. The `get_device_details()` method filters out devices matching unknown_device_patterns, which means it was processing all devices except those that match the patterns. This is the opposite of what we want when processing unknown devices.
|
||||
|
||||
```python
|
||||
# Original code
|
||||
print("\nStep 2: Getting detailed version information...")
|
||||
discovery.get_device_details(use_current_json=True)
|
||||
|
||||
if args.process_unknown:
|
||||
print("\nStep 3: Processing unknown devices...")
|
||||
discovery.process_unknown_devices()
|
||||
```
|
||||
|
||||
## Fix Implemented
|
||||
The fix was to modify the main function to skip the `get_device_details()` call when the `--process-unknown` flag is used. This ensures that we're not wasting time getting detailed information for devices that we don't need to process.
|
||||
|
||||
```python
|
||||
# Modified code
|
||||
if args.process_unknown:
|
||||
print("\nStep 2: Processing unknown devices...")
|
||||
discovery.process_unknown_devices()
|
||||
else:
|
||||
print("\nStep 2: Getting detailed version information...")
|
||||
discovery.get_device_details(use_current_json=True)
|
||||
```
|
||||
|
||||
## Testing
|
||||
A test script `test_process_unknown_optimization.py` was created to verify the fix. The script:
|
||||
1. Runs TasmotaManager with the `--process-unknown` flag and captures the output
|
||||
2. Checks that the output contains "Processing unknown devices" but not "Getting detailed version information"
|
||||
3. Counts how many unknown devices were processed
|
||||
4. Loads the network_configuration.json to get the unknown_device_patterns
|
||||
|
||||
To run the test:
|
||||
```
|
||||
./test_process_unknown_optimization.py
|
||||
```
|
||||
|
||||
## Expected Result
|
||||
After this fix, when the `--process-unknown` flag is used, the script will only process devices that match the unknown_device_patterns, skipping the detailed information gathering for all other devices. This makes the script more efficient and focused on its task.
|
||||
|
||||
## Benefits
|
||||
1. **Improved Performance**: The script no longer wastes time processing devices that it doesn't need to.
|
||||
2. **Reduced Network Traffic**: Fewer HTTP requests are made to devices that don't need to be processed.
|
||||
3. **Clearer Workflow**: The script now has a more logical flow, either processing all devices or only unknown devices, not both.
|
||||
96
template_activation_fix_summary.md
Normal file
96
template_activation_fix_summary.md
Normal file
@ -0,0 +1,96 @@
|
||||
# Template Activation Fix Summary
|
||||
|
||||
## Issue Description
|
||||
|
||||
The issue was that templates were not being properly activated after being set. In the Tasmota web UI, there's an "Activate" checkbox that needs to be checked when applying a template. Without checking this box, the template is set but not activated.
|
||||
|
||||
In our code, we were setting the template using the Template command, but we weren't activating it, which is equivalent to not checking the "Activate" box in the web UI.
|
||||
|
||||
## Changes Made
|
||||
|
||||
### 1. Understanding the Template Activation Process
|
||||
|
||||
In Tasmota, to fully activate a template, three steps are required:
|
||||
1. Set the template using the `Template` command
|
||||
2. Set the module to 0 (Template module) using the `Module 0` command
|
||||
3. Restart the device using the `Restart 1` command
|
||||
|
||||
### 2. Modifications to `check_and_update_template` Method
|
||||
|
||||
We modified the `check_and_update_template` method in `TasmotaManager.py` to include the template activation steps. Changes were made in two places:
|
||||
|
||||
#### When a template is updated:
|
||||
|
||||
```python
|
||||
# URL encode the template value
|
||||
import urllib.parse
|
||||
encoded_value = urllib.parse.quote(template_value)
|
||||
url = f"http://{ip}/cm?cmnd=Template%20{encoded_value}"
|
||||
|
||||
response = requests.get(url, timeout=5)
|
||||
if response.status_code == 200:
|
||||
self.logger.info(f"{name}: Template updated successfully")
|
||||
|
||||
# Activate the template by setting module to 0 (Template module)
|
||||
self.logger.info(f"{name}: Activating template by setting module to 0")
|
||||
module_url = f"http://{ip}/cm?cmnd=Module%200"
|
||||
module_response = requests.get(module_url, timeout=5)
|
||||
|
||||
if module_response.status_code == 200:
|
||||
self.logger.info(f"{name}: Module set to 0 successfully")
|
||||
|
||||
# Restart the device to apply the template
|
||||
self.logger.info(f"{name}: Restarting device to apply template")
|
||||
restart_url = f"http://{ip}/cm?cmnd=Restart%201"
|
||||
restart_response = requests.get(restart_url, timeout=5)
|
||||
|
||||
if restart_response.status_code == 200:
|
||||
self.logger.info(f"{name}: Device restart initiated successfully")
|
||||
template_updated = True
|
||||
else:
|
||||
self.logger.error(f"{name}: Failed to restart device")
|
||||
else:
|
||||
self.logger.error(f"{name}: Failed to set module to 0")
|
||||
else:
|
||||
self.logger.error(f"{name}: Failed to update template")
|
||||
```
|
||||
|
||||
#### When a device name is updated:
|
||||
|
||||
Similar changes were made when a device name is updated to match a template. After successfully updating the device name, we added code to set the module to 0 and restart the device.
|
||||
|
||||
### 3. Test Script
|
||||
|
||||
A test script `test_template_activation.py` was created to verify that templates are properly activated. The script:
|
||||
|
||||
1. Gets a test device from current.json
|
||||
2. Gets a template from network_configuration.json
|
||||
3. Sets the template on the device and activates it
|
||||
4. Verifies that the template was properly activated by checking:
|
||||
- The module is set to 0 (Template module)
|
||||
- The template matches the expected value
|
||||
|
||||
## How to Test
|
||||
|
||||
To test the template activation fix:
|
||||
|
||||
1. Run the test script:
|
||||
```
|
||||
python3 test_template_activation.py
|
||||
```
|
||||
|
||||
2. The script will output information about the template activation process and verify that the template was properly activated.
|
||||
|
||||
3. You can also manually test by:
|
||||
- Running TasmotaManager with a device that has a template defined in network_configuration.json
|
||||
- Checking the device's module and template after TasmotaManager has processed it
|
||||
|
||||
## Expected Results
|
||||
|
||||
After the fix, when a template is set or a device name is updated to match a template:
|
||||
|
||||
1. The template should be properly set on the device
|
||||
2. The module should be set to 0 (Template module)
|
||||
3. The device should restart to apply the template
|
||||
|
||||
This ensures that templates are fully activated, equivalent to checking the "Activate" box in the Tasmota web UI.
|
||||
119
test_fulltopic_fix.py
Executable file
119
test_fulltopic_fix.py
Executable file
@ -0,0 +1,119 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Test script to verify the FullTopic parameter is set correctly without a %20 prefix.
|
||||
This script will:
|
||||
1. Connect to a Tasmota device
|
||||
2. Set the FullTopic parameter
|
||||
3. Verify the FullTopic is set correctly without a %20 prefix
|
||||
"""
|
||||
|
||||
import sys
|
||||
import logging
|
||||
import requests
|
||||
import json
|
||||
import argparse
|
||||
|
||||
# Configure logging
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
|
||||
)
|
||||
|
||||
logger = logging.getLogger("FullTopicTest")
|
||||
|
||||
def load_config():
|
||||
"""Load the network configuration."""
|
||||
try:
|
||||
with open('network_configuration.json', 'r') as f:
|
||||
return json.load(f)
|
||||
except Exception as e:
|
||||
logger.error(f"Error loading configuration: {str(e)}")
|
||||
sys.exit(1)
|
||||
|
||||
def test_fulltopic_setting(ip_address):
|
||||
"""Test setting the FullTopic parameter on a device."""
|
||||
logger.info(f"Testing FullTopic setting on device at {ip_address}")
|
||||
|
||||
# Load configuration
|
||||
config = load_config()
|
||||
mqtt_config = config.get('mqtt', {})
|
||||
|
||||
if not mqtt_config:
|
||||
logger.error("No MQTT configuration found")
|
||||
return False
|
||||
|
||||
# Get the FullTopic value from configuration
|
||||
full_topic = mqtt_config.get('FullTopic', '%prefix%/%topic%/')
|
||||
logger.info(f"FullTopic from configuration: {full_topic}")
|
||||
|
||||
# First, check the current FullTopic value
|
||||
try:
|
||||
status_url = f"http://{ip_address}/cm?cmnd=FullTopic"
|
||||
response = requests.get(status_url, timeout=5)
|
||||
if response.status_code == 200:
|
||||
current_value = response.text
|
||||
logger.info(f"Current FullTopic value: {current_value}")
|
||||
else:
|
||||
logger.error(f"Failed to get current FullTopic value: {response.status_code}")
|
||||
return False
|
||||
except requests.exceptions.RequestException as e:
|
||||
logger.error(f"Error connecting to device: {str(e)}")
|
||||
return False
|
||||
|
||||
# Set the FullTopic using the fixed method (with = instead of %20)
|
||||
try:
|
||||
set_url = f"http://{ip_address}/cm?cmnd=FullTopic={full_topic}"
|
||||
logger.info(f"Setting FullTopic with URL: {set_url}")
|
||||
response = requests.get(set_url, timeout=5)
|
||||
if response.status_code == 200:
|
||||
logger.info(f"Response from setting FullTopic: {response.text}")
|
||||
else:
|
||||
logger.error(f"Failed to set FullTopic: {response.status_code}")
|
||||
return False
|
||||
except requests.exceptions.RequestException as e:
|
||||
logger.error(f"Error setting FullTopic: {str(e)}")
|
||||
return False
|
||||
|
||||
# Verify the FullTopic was set correctly
|
||||
try:
|
||||
verify_url = f"http://{ip_address}/cm?cmnd=FullTopic"
|
||||
response = requests.get(verify_url, timeout=5)
|
||||
if response.status_code == 200:
|
||||
new_value = response.text
|
||||
logger.info(f"New FullTopic value: {new_value}")
|
||||
|
||||
# Check if the value contains %20 at the beginning
|
||||
if "%20" in new_value:
|
||||
logger.error("FullTopic still contains %20 - fix not working")
|
||||
return False
|
||||
else:
|
||||
logger.info("FullTopic set correctly without %20")
|
||||
return True
|
||||
else:
|
||||
logger.error(f"Failed to verify FullTopic: {response.status_code}")
|
||||
return False
|
||||
except requests.exceptions.RequestException as e:
|
||||
logger.error(f"Error verifying FullTopic: {str(e)}")
|
||||
return False
|
||||
|
||||
def main():
|
||||
"""Main function to test the FullTopic fix."""
|
||||
parser = argparse.ArgumentParser(description='Test FullTopic parameter setting')
|
||||
parser.add_argument('ip_address', help='IP address of the Tasmota device to test')
|
||||
args = parser.parse_args()
|
||||
|
||||
if not args.ip_address:
|
||||
print("Usage: python test_fulltopic_fix.py <ip_address>")
|
||||
sys.exit(1)
|
||||
|
||||
result = test_fulltopic_setting(args.ip_address)
|
||||
|
||||
if result:
|
||||
print("SUCCESS: FullTopic set correctly without %20 prefix")
|
||||
sys.exit(0)
|
||||
else:
|
||||
print("FAILURE: FullTopic not set correctly")
|
||||
sys.exit(1)
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
62
test_hostname_matching.py
Executable file
62
test_hostname_matching.py
Executable file
@ -0,0 +1,62 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Test script for hostname matching in TasmotaManager.py
|
||||
|
||||
This script tests the hostname matching functionality with various patterns:
|
||||
1. Exact match
|
||||
2. Partial match
|
||||
3. Wildcard match
|
||||
4. Multiple matches
|
||||
"""
|
||||
|
||||
import subprocess
|
||||
import sys
|
||||
import os
|
||||
|
||||
def run_test(test_name, hostname_pattern):
|
||||
"""Run a test with the given hostname pattern"""
|
||||
print(f"\n{'='*80}")
|
||||
print(f"TEST: {test_name}")
|
||||
print(f"Pattern: {hostname_pattern}")
|
||||
print(f"{'='*80}")
|
||||
|
||||
# Run the TasmotaManager.py script with the --Device parameter and --debug flag
|
||||
cmd = ["python3", "TasmotaManager.py", "--Device", hostname_pattern, "--debug"]
|
||||
print(f"Running command: {' '.join(cmd)}")
|
||||
|
||||
# Run the command and capture output
|
||||
process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
|
||||
stdout, stderr = process.communicate()
|
||||
|
||||
# Print the output
|
||||
print("\nSTDOUT:")
|
||||
print(stdout)
|
||||
|
||||
if stderr:
|
||||
print("\nSTDERR:")
|
||||
print(stderr)
|
||||
|
||||
print(f"\nExit code: {process.returncode}")
|
||||
return process.returncode
|
||||
|
||||
def main():
|
||||
"""Run all tests"""
|
||||
# Test 1: Exact match
|
||||
run_test("Exact Match", "MasterLamp-5891")
|
||||
|
||||
# Test 2: Partial match
|
||||
run_test("Partial Match", "Master")
|
||||
|
||||
# Test 3: Wildcard match
|
||||
run_test("Wildcard Match", "Master*")
|
||||
|
||||
# Test 4: Wildcard match with * on both sides
|
||||
run_test("Wildcard Match (both sides)", "*Lamp*")
|
||||
|
||||
# Test 5: Multiple matches (should match multiple devices and use the first one)
|
||||
run_test("Multiple Matches", "M")
|
||||
|
||||
print("\nAll tests completed!")
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
99
test_process_unknown_optimization.py
Executable file
99
test_process_unknown_optimization.py
Executable file
@ -0,0 +1,99 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Test script to verify the optimization for processing unknown devices.
|
||||
This script will run TasmotaManager with the --process-unknown flag
|
||||
and verify that it only processes devices that match the unknown_device_patterns.
|
||||
"""
|
||||
|
||||
import sys
|
||||
import logging
|
||||
import subprocess
|
||||
import os
|
||||
import json
|
||||
|
||||
# Configure logging
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
|
||||
)
|
||||
|
||||
logger = logging.getLogger("ProcessUnknownTest")
|
||||
|
||||
def test_process_unknown_optimization():
|
||||
"""Test that the --process-unknown flag skips detailed information gathering for non-matching devices."""
|
||||
logger.info("Testing process-unknown optimization")
|
||||
|
||||
# Check if current.json exists
|
||||
if not os.path.exists('current.json'):
|
||||
logger.error("current.json not found. Run discovery first.")
|
||||
return False
|
||||
|
||||
# Run TasmotaManager with --process-unknown flag and capture output
|
||||
logger.info("Running TasmotaManager with --process-unknown flag")
|
||||
try:
|
||||
result = subprocess.run(
|
||||
["python", "TasmotaManager.py", "--process-unknown", "--skip-unifi", "--debug"],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
check=True
|
||||
)
|
||||
output = result.stdout + result.stderr
|
||||
logger.info("TasmotaManager completed successfully")
|
||||
except subprocess.CalledProcessError as e:
|
||||
logger.error(f"Error running TasmotaManager: {e}")
|
||||
logger.error(f"Output: {e.stdout}")
|
||||
logger.error(f"Error: {e.stderr}")
|
||||
return False
|
||||
|
||||
# Check that the output contains "Processing unknown devices" but not "Getting detailed version information"
|
||||
if "Step 2: Processing unknown devices" in output and "Getting detailed version information" not in output:
|
||||
logger.info("Verified that detailed version information gathering was skipped")
|
||||
else:
|
||||
logger.error("Failed to verify that detailed version information gathering was skipped")
|
||||
return False
|
||||
|
||||
# Check the log for evidence that only unknown devices were processed
|
||||
unknown_devices_processed = 0
|
||||
for line in output.splitlines():
|
||||
if "Processing unknown device:" in line:
|
||||
unknown_devices_processed += 1
|
||||
logger.info(f"Found log entry: {line.strip()}")
|
||||
|
||||
logger.info(f"Found {unknown_devices_processed} unknown devices processed")
|
||||
|
||||
# Load network_configuration.json to get unknown_device_patterns
|
||||
try:
|
||||
with open('network_configuration.json', 'r') as f:
|
||||
config = json.load(f)
|
||||
|
||||
network_filters = config['unifi'].get('network_filter', {})
|
||||
unknown_patterns = []
|
||||
for network in network_filters.values():
|
||||
unknown_patterns.extend(network.get('unknown_device_patterns', []))
|
||||
|
||||
logger.info(f"Found {len(unknown_patterns)} unknown device patterns in configuration")
|
||||
for pattern in unknown_patterns:
|
||||
logger.info(f" - {pattern}")
|
||||
except Exception as e:
|
||||
logger.error(f"Error loading configuration: {e}")
|
||||
return False
|
||||
|
||||
logger.info("Test completed successfully")
|
||||
return True
|
||||
|
||||
def main():
|
||||
"""Main function to run the test."""
|
||||
print("Testing process-unknown optimization")
|
||||
|
||||
result = test_process_unknown_optimization()
|
||||
|
||||
if result:
|
||||
print("\nSUCCESS: The optimization for processing unknown devices is working correctly")
|
||||
print("The script only processes devices that match the unknown_device_patterns")
|
||||
sys.exit(0)
|
||||
else:
|
||||
print("\nFAILURE: The optimization for processing unknown devices is not working correctly")
|
||||
sys.exit(1)
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
106
test_rule1_encoding.py
Normal file
106
test_rule1_encoding.py
Normal file
@ -0,0 +1,106 @@
|
||||
#!/usr/bin/env python3
|
||||
import requests
|
||||
import urllib.parse
|
||||
import time
|
||||
import logging
|
||||
|
||||
# Configure logging
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format='%(asctime)s - %(levelname)s - %(message)s',
|
||||
datefmt='%Y-%m-%d %H:%M:%S'
|
||||
)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Device to test - use the same device from test_rule1_device_mode.py
|
||||
DEVICE_IP = "192.168.8.35"
|
||||
|
||||
# Rule1 value from network_configuration.json
|
||||
RULE1_VALUE = "on button1#state=10 do power0 toggle endon"
|
||||
|
||||
def check_rule1():
|
||||
"""Check the current rule1 setting on the device"""
|
||||
url = f"http://{DEVICE_IP}/cm?cmnd=rule1"
|
||||
logger.info(f"Checking rule1: {url}")
|
||||
response = requests.get(url, timeout=5)
|
||||
if response.status_code == 200:
|
||||
logger.info(f"Rule1 response: {response.text}")
|
||||
return response.text
|
||||
else:
|
||||
logger.error(f"Failed to get rule1: HTTP {response.status_code}")
|
||||
return None
|
||||
|
||||
def set_rule1_with_encoding():
|
||||
"""Set rule1 with proper URL encoding"""
|
||||
# URL encode the rule value
|
||||
encoded_value = urllib.parse.quote(RULE1_VALUE)
|
||||
url = f"http://{DEVICE_IP}/cm?cmnd=rule1%20{encoded_value}"
|
||||
|
||||
logger.info(f"Setting rule1 with encoding: {url}")
|
||||
response = requests.get(url, timeout=5)
|
||||
if response.status_code == 200:
|
||||
logger.info(f"Set rule1 response: {response.text}")
|
||||
return True
|
||||
else:
|
||||
logger.error(f"Failed to set rule1: HTTP {response.status_code}")
|
||||
return False
|
||||
|
||||
def enable_rule1():
|
||||
"""Enable rule1"""
|
||||
url = f"http://{DEVICE_IP}/cm?cmnd=Rule1%201"
|
||||
logger.info(f"Enabling rule1: {url}")
|
||||
response = requests.get(url, timeout=5)
|
||||
if response.status_code == 200:
|
||||
logger.info(f"Enable rule1 response: {response.text}")
|
||||
return True
|
||||
else:
|
||||
logger.error(f"Failed to enable rule1: HTTP {response.status_code}")
|
||||
return False
|
||||
|
||||
def main():
|
||||
# Check current rule1
|
||||
logger.info("Checking current rule1")
|
||||
current_rule1 = check_rule1()
|
||||
|
||||
# Set rule1 with proper URL encoding
|
||||
logger.info("Setting rule1 with proper URL encoding")
|
||||
success = set_rule1_with_encoding()
|
||||
if not success:
|
||||
logger.error("Failed to set rule1")
|
||||
return 1
|
||||
|
||||
# Wait for the command to take effect
|
||||
logger.info("Waiting for command to take effect...")
|
||||
time.sleep(2)
|
||||
|
||||
# Check rule1 after setting
|
||||
logger.info("Checking rule1 after setting")
|
||||
after_set_rule1 = check_rule1()
|
||||
|
||||
# Enable rule1
|
||||
logger.info("Enabling rule1")
|
||||
success = enable_rule1()
|
||||
if not success:
|
||||
logger.error("Failed to enable rule1")
|
||||
return 1
|
||||
|
||||
# Wait for the command to take effect
|
||||
logger.info("Waiting for command to take effect...")
|
||||
time.sleep(2)
|
||||
|
||||
# Check rule1 after enabling
|
||||
logger.info("Checking rule1 after enabling")
|
||||
after_enable_rule1 = check_rule1()
|
||||
|
||||
# Compare with expected value
|
||||
if RULE1_VALUE in after_enable_rule1:
|
||||
logger.info("SUCCESS: rule1 was correctly set!")
|
||||
return 0
|
||||
else:
|
||||
logger.error(f"FAILURE: rule1 was not set correctly!")
|
||||
logger.error(f" Expected: {RULE1_VALUE}")
|
||||
logger.error(f" Actual: {after_enable_rule1}")
|
||||
return 1
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
236
test_template_activation.py
Normal file
236
test_template_activation.py
Normal file
@ -0,0 +1,236 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Test script to verify that templates are properly activated after being set.
|
||||
|
||||
This script:
|
||||
1. Gets a test device from current.json
|
||||
2. Sets a template on the device
|
||||
3. Verifies that the template was properly activated
|
||||
"""
|
||||
|
||||
import json
|
||||
import logging
|
||||
import requests
|
||||
import time
|
||||
import sys
|
||||
import os
|
||||
|
||||
# Configure logging
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format='%(asctime)s - %(levelname)s - %(message)s',
|
||||
datefmt='%Y-%m-%d %H:%M:%S'
|
||||
)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Import TasmotaManager class
|
||||
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
|
||||
from TasmotaManager import TasmotaDiscovery
|
||||
|
||||
def get_test_device():
|
||||
"""Get a test device from current.json"""
|
||||
try:
|
||||
with open('current.json', 'r') as f:
|
||||
data = json.load(f)
|
||||
devices = data.get('tasmota', {}).get('devices', [])
|
||||
if devices:
|
||||
return devices[0] # Use the first device
|
||||
else:
|
||||
logger.error("No devices found in current.json")
|
||||
return None
|
||||
except Exception as e:
|
||||
logger.error(f"Error reading current.json: {e}")
|
||||
return None
|
||||
|
||||
def get_template_from_config():
|
||||
"""Get a template from network_configuration.json"""
|
||||
try:
|
||||
with open('network_configuration.json', 'r') as f:
|
||||
config = json.load(f)
|
||||
templates = config.get('mqtt', {}).get('config_other', {})
|
||||
if templates:
|
||||
# Get the first template
|
||||
template_key = next(iter(templates))
|
||||
template_value = templates[template_key]
|
||||
return template_key, template_value
|
||||
else:
|
||||
logger.error("No templates found in network_configuration.json")
|
||||
return None, None
|
||||
except Exception as e:
|
||||
logger.error(f"Error reading network_configuration.json: {e}")
|
||||
return None, None
|
||||
|
||||
def check_device_module(ip):
|
||||
"""Check the current module of the device"""
|
||||
try:
|
||||
url = f"http://{ip}/cm?cmnd=Module"
|
||||
response = requests.get(url, timeout=5)
|
||||
if response.status_code == 200:
|
||||
data = response.json()
|
||||
logger.info(f"Module response: {data}")
|
||||
|
||||
# Extract module information
|
||||
if "Module" in data:
|
||||
module = data["Module"]
|
||||
return module
|
||||
else:
|
||||
logger.error(f"Unexpected response format: {data}")
|
||||
return None
|
||||
else:
|
||||
logger.error(f"Failed to get module: HTTP {response.status_code}")
|
||||
return None
|
||||
except Exception as e:
|
||||
logger.error(f"Error checking module: {e}")
|
||||
return None
|
||||
|
||||
def check_template_on_device(ip):
|
||||
"""Check the current template on the device"""
|
||||
try:
|
||||
url = f"http://{ip}/cm?cmnd=Template"
|
||||
response = requests.get(url, timeout=5)
|
||||
if response.status_code == 200:
|
||||
data = response.json()
|
||||
logger.info(f"Template response: {data}")
|
||||
|
||||
# Extract template information
|
||||
template = None
|
||||
if "Template" in data:
|
||||
template = data["Template"]
|
||||
elif isinstance(data, dict) and len(data) > 0:
|
||||
# If there's no "Template" key but we have a dict, try to get the first value
|
||||
first_key = next(iter(data))
|
||||
if isinstance(data[first_key], str) and "{" in data[first_key]:
|
||||
template = data[first_key]
|
||||
# Handle the case where the template is returned as a dict with NAME, GPIO, FLAG, BASE keys
|
||||
elif all(key in data for key in ['NAME', 'GPIO', 'FLAG', 'BASE']):
|
||||
import json
|
||||
template = json.dumps(data)
|
||||
|
||||
return template
|
||||
else:
|
||||
logger.error(f"Failed to get template: HTTP {response.status_code}")
|
||||
return None
|
||||
except Exception as e:
|
||||
logger.error(f"Error checking template: {e}")
|
||||
return None
|
||||
|
||||
def set_template_on_device(ip, template_value):
|
||||
"""Set a template on the device and activate it"""
|
||||
try:
|
||||
# URL encode the template value
|
||||
import urllib.parse
|
||||
encoded_value = urllib.parse.quote(template_value)
|
||||
url = f"http://{ip}/cm?cmnd=Template%20{encoded_value}"
|
||||
|
||||
logger.info(f"Setting template: {url}")
|
||||
response = requests.get(url, timeout=5)
|
||||
if response.status_code == 200:
|
||||
logger.info(f"Template set response: {response.text}")
|
||||
|
||||
# Set module to 0 to activate the template
|
||||
logger.info("Setting module to 0 to activate template")
|
||||
module_url = f"http://{ip}/cm?cmnd=Module%200"
|
||||
module_response = requests.get(module_url, timeout=5)
|
||||
|
||||
if module_response.status_code == 200:
|
||||
logger.info(f"Module set response: {module_response.text}")
|
||||
|
||||
# Restart the device to apply the template
|
||||
logger.info("Restarting device to apply template")
|
||||
restart_url = f"http://{ip}/cm?cmnd=Restart%201"
|
||||
restart_response = requests.get(restart_url, timeout=5)
|
||||
|
||||
if restart_response.status_code == 200:
|
||||
logger.info("Device restart initiated successfully")
|
||||
return True
|
||||
else:
|
||||
logger.error(f"Failed to restart device: HTTP {restart_response.status_code}")
|
||||
else:
|
||||
logger.error(f"Failed to set module: HTTP {module_response.status_code}")
|
||||
else:
|
||||
logger.error(f"Failed to set template: HTTP {response.status_code}")
|
||||
|
||||
return False
|
||||
except Exception as e:
|
||||
logger.error(f"Error setting template: {e}")
|
||||
return False
|
||||
|
||||
def main():
|
||||
"""Main test function"""
|
||||
# Get a test device
|
||||
device = get_test_device()
|
||||
if not device:
|
||||
logger.error("No test device available. Run discovery first.")
|
||||
return 1
|
||||
|
||||
device_name = device.get('name')
|
||||
device_ip = device.get('ip')
|
||||
|
||||
logger.info(f"Testing with device: {device_name} (IP: {device_ip})")
|
||||
|
||||
# Get a template from the configuration
|
||||
template_key, template_value = get_template_from_config()
|
||||
if not template_key or not template_value:
|
||||
logger.error("No template available in configuration.")
|
||||
return 1
|
||||
|
||||
logger.info(f"Using template: {template_key} = {template_value}")
|
||||
|
||||
# Check current module and template
|
||||
logger.info("Checking current module and template")
|
||||
current_module = check_device_module(device_ip)
|
||||
current_template = check_template_on_device(device_ip)
|
||||
|
||||
logger.info(f"Current module: {current_module}")
|
||||
logger.info(f"Current template: {current_template}")
|
||||
|
||||
# Set the template on the device
|
||||
logger.info("Setting and activating template")
|
||||
success = set_template_on_device(device_ip, template_value)
|
||||
if not success:
|
||||
logger.error("Failed to set and activate template")
|
||||
return 1
|
||||
|
||||
# Wait for the device to restart
|
||||
logger.info("Waiting for device to restart...")
|
||||
time.sleep(10)
|
||||
|
||||
# Check module and template after restart
|
||||
logger.info("Checking module and template after restart")
|
||||
after_module = check_device_module(device_ip)
|
||||
after_template = check_template_on_device(device_ip)
|
||||
|
||||
logger.info(f"Module after restart: {after_module}")
|
||||
logger.info(f"Template after restart: {after_template}")
|
||||
|
||||
# Verify that the template was activated
|
||||
if after_module == 0:
|
||||
logger.info("SUCCESS: Module is set to 0 (Template module)")
|
||||
else:
|
||||
logger.error(f"FAILURE: Module is not set to 0, got {after_module}")
|
||||
return 1
|
||||
|
||||
# Compare templates (this is approximate since formatting might differ)
|
||||
import json
|
||||
try:
|
||||
# Try to parse both as JSON for comparison
|
||||
template_json = json.loads(template_value)
|
||||
after_json = json.loads(after_template) if after_template else None
|
||||
|
||||
if after_json and all(key in after_json for key in ['NAME', 'GPIO', 'FLAG', 'BASE']):
|
||||
logger.info("SUCCESS: Template appears to be correctly set and activated")
|
||||
return 0
|
||||
else:
|
||||
logger.error("FAILURE: Template does not appear to be correctly set")
|
||||
return 1
|
||||
except json.JSONDecodeError:
|
||||
# If JSON parsing fails, do a simple string comparison
|
||||
if template_value == after_template:
|
||||
logger.info("SUCCESS: Template appears to be correctly set and activated")
|
||||
return 0
|
||||
else:
|
||||
logger.error("FAILURE: Template does not appear to be correctly set")
|
||||
return 1
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main())
|
||||
239
test_template_matching.py
Normal file
239
test_template_matching.py
Normal file
@ -0,0 +1,239 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Test script to verify the template matching algorithm in TasmotaManager.py.
|
||||
|
||||
This script simulates different scenarios to ensure the algorithm works correctly:
|
||||
1. Key matches Device Name, Template matches value
|
||||
2. Key matches Device Name, Template doesn't match value
|
||||
3. No key matches Device Name, but a value matches Template
|
||||
4. No matches at all
|
||||
"""
|
||||
|
||||
import json
|
||||
import logging
|
||||
import requests
|
||||
import unittest
|
||||
from unittest.mock import patch, MagicMock
|
||||
|
||||
# Configure logging
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format='%(asctime)s - %(levelname)s - %(message)s',
|
||||
datefmt='%Y-%m-%d %H:%M:%S'
|
||||
)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Import TasmotaManager class
|
||||
import sys
|
||||
import os
|
||||
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
|
||||
from TasmotaManager import TasmotaDiscovery
|
||||
|
||||
class TestTemplateMatching(unittest.TestCase):
|
||||
"""Test cases for template matching algorithm."""
|
||||
|
||||
def setUp(self):
|
||||
"""Set up test environment."""
|
||||
self.discovery = TasmotaDiscovery(debug=True)
|
||||
|
||||
# Create a mock config with mqtt.config_other
|
||||
self.discovery.config = {
|
||||
'mqtt': {
|
||||
'config_other': {
|
||||
'TreatLife_SW_SS01S': '{"NAME":"TL SS01S Swtch","GPIO":[0,0,0,0,52,158,0,0,21,17,0,0,0],"FLAG":0,"BASE":18}',
|
||||
'TreatLife_SW_SS02S': '{"NAME":"Treatlife SS02","GPIO":[0,0,0,0,288,576,0,0,224,32,0,0,0,0],"FLAG":0,"BASE":18}'
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@patch('requests.get')
|
||||
def test_key_matches_template_matches(self, mock_get):
|
||||
"""Test when key matches Device Name and template matches value."""
|
||||
# Mock responses for Status 0 and Template commands
|
||||
mock_responses = [
|
||||
# Status 0 response
|
||||
MagicMock(
|
||||
status_code=200,
|
||||
json=lambda: {"Status": {"DeviceName": "TreatLife_SW_SS01S"}}
|
||||
),
|
||||
# Template response
|
||||
MagicMock(
|
||||
status_code=200,
|
||||
json=lambda: {"Template": '{"NAME":"TL SS01S Swtch","GPIO":[0,0,0,0,52,158,0,0,21,17,0,0,0],"FLAG":0,"BASE":18}'}
|
||||
)
|
||||
]
|
||||
mock_get.side_effect = mock_responses
|
||||
|
||||
# Call the method
|
||||
result = self.discovery.check_and_update_template("192.168.8.100", "test_device")
|
||||
|
||||
# Verify results
|
||||
self.assertFalse(result) # No update needed
|
||||
self.assertEqual(mock_get.call_count, 2) # Only Status 0 and Template calls
|
||||
|
||||
# Log the result
|
||||
logger.info("Test 1: Key matches Device Name, Template matches value - PASSED")
|
||||
|
||||
@patch('requests.get')
|
||||
def test_key_matches_template_doesnt_match(self, mock_get):
|
||||
"""Test when key matches Device Name but template doesn't match value."""
|
||||
# Mock responses for Status 0, Template, and Template update commands
|
||||
mock_responses = [
|
||||
# Status 0 response
|
||||
MagicMock(
|
||||
status_code=200,
|
||||
json=lambda: {"Status": {"DeviceName": "TreatLife_SW_SS01S"}}
|
||||
),
|
||||
# Template response
|
||||
MagicMock(
|
||||
status_code=200,
|
||||
json=lambda: {"Template": '{"NAME":"Different Template","GPIO":[0,0,0,0,0,0,0,0,0,0,0,0,0],"FLAG":0,"BASE":18}'}
|
||||
),
|
||||
# Template update response
|
||||
MagicMock(
|
||||
status_code=200,
|
||||
json=lambda: {"Template": "Done"}
|
||||
)
|
||||
]
|
||||
mock_get.side_effect = mock_responses
|
||||
|
||||
# Call the method
|
||||
result = self.discovery.check_and_update_template("192.168.8.100", "test_device")
|
||||
|
||||
# Verify results
|
||||
self.assertTrue(result) # Template was updated
|
||||
self.assertEqual(mock_get.call_count, 3) # Status 0, Template, and Template update calls
|
||||
|
||||
# Log the result
|
||||
logger.info("Test 2: Key matches Device Name, Template doesn't match value - PASSED")
|
||||
|
||||
@patch('requests.get')
|
||||
def test_no_key_matches_value_matches(self, mock_get):
|
||||
"""Test when no key matches Device Name but a value matches Template."""
|
||||
# Mock responses for Status 0, Template, and DeviceName update commands
|
||||
mock_responses = [
|
||||
# Status 0 response
|
||||
MagicMock(
|
||||
status_code=200,
|
||||
json=lambda: {"Status": {"DeviceName": "Unknown_Device"}}
|
||||
),
|
||||
# Template response
|
||||
MagicMock(
|
||||
status_code=200,
|
||||
json=lambda: {"Template": '{"NAME":"Treatlife SS02","GPIO":[0,0,0,0,288,576,0,0,224,32,0,0,0,0],"FLAG":0,"BASE":18}'}
|
||||
),
|
||||
# DeviceName update response
|
||||
MagicMock(
|
||||
status_code=200,
|
||||
json=lambda: {"DeviceName": "Done"}
|
||||
)
|
||||
]
|
||||
mock_get.side_effect = mock_responses
|
||||
|
||||
# Call the method
|
||||
result = self.discovery.check_and_update_template("192.168.8.100", "test_device")
|
||||
|
||||
# Verify results
|
||||
self.assertTrue(result) # Device name was updated
|
||||
self.assertEqual(mock_get.call_count, 3) # Status 0, Template, and DeviceName update calls
|
||||
|
||||
# Log the result
|
||||
logger.info("Test 3: No key matches Device Name, but a value matches Template - PASSED")
|
||||
|
||||
@patch('requests.get')
|
||||
def test_no_matches_at_all(self, mock_get):
|
||||
"""Test when there are no matches at all."""
|
||||
# Mock responses for Status 0 and Template commands
|
||||
mock_responses = [
|
||||
# Status 0 response
|
||||
MagicMock(
|
||||
status_code=200,
|
||||
json=lambda: {"Status": {"DeviceName": "Unknown_Device"}}
|
||||
),
|
||||
# Template response
|
||||
MagicMock(
|
||||
status_code=200,
|
||||
json=lambda: {"Template": '{"NAME":"Unknown Template","GPIO":[0,0,0,0,0,0,0,0,0,0,0,0,0],"FLAG":0,"BASE":18}'}
|
||||
)
|
||||
]
|
||||
mock_get.side_effect = mock_responses
|
||||
|
||||
# Call the method
|
||||
result = self.discovery.check_and_update_template("192.168.8.100", "test_device")
|
||||
|
||||
# Verify results
|
||||
self.assertFalse(result) # No updates made
|
||||
self.assertEqual(mock_get.call_count, 2) # Only Status 0 and Template calls
|
||||
|
||||
# Log the result
|
||||
logger.info("Test 4: No matches at all - PASSED")
|
||||
|
||||
@patch('requests.get')
|
||||
def test_no_config_other(self, mock_get):
|
||||
"""Test when there's no mqtt.config_other in the configuration."""
|
||||
# Set empty config_other
|
||||
self.discovery.config = {'mqtt': {}}
|
||||
|
||||
# Call the method
|
||||
result = self.discovery.check_and_update_template("192.168.8.100", "test_device")
|
||||
|
||||
# Verify results
|
||||
self.assertFalse(result) # No updates made
|
||||
self.assertEqual(mock_get.call_count, 0) # No HTTP calls made
|
||||
|
||||
# Log the result
|
||||
logger.info("Test 5: No mqtt.config_other in configuration - PASSED")
|
||||
|
||||
@patch('requests.get')
|
||||
def test_status0_failure(self, mock_get):
|
||||
"""Test when Status 0 command fails."""
|
||||
# Mock response for Status 0 command
|
||||
mock_get.return_value = MagicMock(
|
||||
status_code=200,
|
||||
json=lambda: {"Status": {}} # Missing DeviceName
|
||||
)
|
||||
|
||||
# Call the method
|
||||
result = self.discovery.check_and_update_template("192.168.8.100", "test_device")
|
||||
|
||||
# Verify results
|
||||
self.assertFalse(result) # No updates made
|
||||
self.assertEqual(mock_get.call_count, 1) # Only Status 0 call
|
||||
|
||||
# Log the result
|
||||
logger.info("Test 6: Status 0 command failure - PASSED")
|
||||
|
||||
@patch('requests.get')
|
||||
def test_template_failure(self, mock_get):
|
||||
"""Test when Template command fails."""
|
||||
# Mock responses for Status 0 and Template commands
|
||||
mock_responses = [
|
||||
# Status 0 response
|
||||
MagicMock(
|
||||
status_code=200,
|
||||
json=lambda: {"Status": {"DeviceName": "TreatLife_SW_SS01S"}}
|
||||
),
|
||||
# Template response
|
||||
MagicMock(
|
||||
status_code=200,
|
||||
json=lambda: {} # Missing Template
|
||||
)
|
||||
]
|
||||
mock_get.side_effect = mock_responses
|
||||
|
||||
# Call the method
|
||||
result = self.discovery.check_and_update_template("192.168.8.100", "test_device")
|
||||
|
||||
# Verify results
|
||||
self.assertFalse(result) # No updates made
|
||||
self.assertEqual(mock_get.call_count, 2) # Status 0 and Template calls
|
||||
|
||||
# Log the result
|
||||
logger.info("Test 7: Template command failure - PASSED")
|
||||
|
||||
def main():
|
||||
"""Run the tests."""
|
||||
unittest.main()
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
58
test_unknown_device_console_settings.py
Executable file
58
test_unknown_device_console_settings.py
Executable file
@ -0,0 +1,58 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Test script to verify that console settings are applied to unknown devices
|
||||
before rebooting. This script will process a single device by IP address
|
||||
or hostname and apply console settings from the configuration.
|
||||
"""
|
||||
|
||||
import sys
|
||||
import logging
|
||||
import argparse
|
||||
from TasmotaManager import TasmotaDiscovery
|
||||
|
||||
# Configure logging
|
||||
logging.basicConfig(
|
||||
level=logging.DEBUG, # Use DEBUG level to see all console settings being applied
|
||||
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
|
||||
)
|
||||
|
||||
def main():
|
||||
"""Main function to test the unknown device console settings functionality."""
|
||||
parser = argparse.ArgumentParser(description='Test unknown device console settings')
|
||||
parser.add_argument('device_identifier', help='IP address or hostname of the device to test')
|
||||
parser.add_argument('--debug', action='store_true', help='Enable debug mode')
|
||||
args = parser.parse_args()
|
||||
|
||||
print(f"Testing unknown device console settings for: {args.device_identifier}")
|
||||
|
||||
# Initialize TasmotaDiscovery with debug mode if requested
|
||||
discovery = TasmotaDiscovery(debug=args.debug)
|
||||
|
||||
# Load configuration
|
||||
discovery.load_config()
|
||||
|
||||
# Get console settings from configuration
|
||||
mqtt_config = discovery.config.get('mqtt', {})
|
||||
console_params = mqtt_config.get('console', {})
|
||||
|
||||
if not console_params:
|
||||
print("No console parameters found in configuration. Please add some to test.")
|
||||
sys.exit(1)
|
||||
|
||||
print("Console parameters that will be applied:")
|
||||
for param, value in console_params.items():
|
||||
print(f" {param}: {value}")
|
||||
|
||||
# Process the single device
|
||||
print("\nProcessing device...")
|
||||
result = discovery.process_single_device(args.device_identifier)
|
||||
|
||||
if result:
|
||||
print(f"\nSuccessfully processed device: {args.device_identifier}")
|
||||
print("Console settings should have been applied before reboot.")
|
||||
else:
|
||||
print(f"\nFailed to process device: {args.device_identifier}")
|
||||
print("Check the logs for more information.")
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
42
test_unknown_device_toggle.py
Executable file
42
test_unknown_device_toggle.py
Executable file
@ -0,0 +1,42 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Test script to verify the unknown device toggling functionality.
|
||||
This script will process a single device by IP address or hostname.
|
||||
"""
|
||||
|
||||
import sys
|
||||
import logging
|
||||
from TasmotaManager import TasmotaDiscovery
|
||||
|
||||
# Configure logging
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
|
||||
)
|
||||
|
||||
def main():
|
||||
"""Main function to test the unknown device toggling functionality."""
|
||||
if len(sys.argv) < 2:
|
||||
print("Usage: python test_unknown_device_toggle.py <device_identifier>")
|
||||
print(" <device_identifier> can be an IP address or hostname")
|
||||
sys.exit(1)
|
||||
|
||||
device_identifier = sys.argv[1]
|
||||
print(f"Testing unknown device toggling for: {device_identifier}")
|
||||
|
||||
# Initialize TasmotaDiscovery with debug mode
|
||||
discovery = TasmotaDiscovery(debug=True)
|
||||
|
||||
# Load configuration
|
||||
discovery.load_config()
|
||||
|
||||
# Process the single device
|
||||
result = discovery.process_single_device(device_identifier)
|
||||
|
||||
if result:
|
||||
print(f"Successfully processed device: {device_identifier}")
|
||||
else:
|
||||
print(f"Failed to process device: {device_identifier}")
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Loading…
Reference in New Issue
Block a user