177 lines
6.3 KiB
Python
Executable File
177 lines
6.3 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
Test script to check if rule1 is being set when using Device mode.
|
|
This script will:
|
|
1. Run TasmotaManager with --Device parameter
|
|
2. Check if rule1 was properly set on the device
|
|
"""
|
|
|
|
import sys
|
|
import subprocess
|
|
import requests
|
|
import json
|
|
import time
|
|
import logging
|
|
|
|
# Configure logging
|
|
logging.basicConfig(level=logging.INFO,
|
|
format='%(asctime)s - %(levelname)s - %(message)s',
|
|
datefmt='%Y-%m-%d %H:%M:%S')
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Device to test - use a known device from current.json
|
|
def get_test_device():
|
|
try:
|
|
with open('current.json', 'r') as f:
|
|
data = json.load(f)
|
|
devices = data.get('tasmota', {}).get('devices', [])
|
|
if devices:
|
|
return devices[0] # Use the first device
|
|
else:
|
|
logger.error("No devices found in current.json")
|
|
return None
|
|
except Exception as e:
|
|
logger.error(f"Error reading current.json: {e}")
|
|
return None
|
|
|
|
def get_rule1_from_config():
|
|
"""Get the rule1 value from network_configuration.json"""
|
|
try:
|
|
with open('network_configuration.json', 'r') as f:
|
|
config = json.load(f)
|
|
rule1 = config.get('mqtt', {}).get('console', {}).get('rule1', '')
|
|
return rule1
|
|
except Exception as e:
|
|
logger.error(f"Error reading network_configuration.json: {e}")
|
|
return ""
|
|
|
|
def check_rule1_on_device(ip):
|
|
"""Check the current rule1 setting on the device"""
|
|
try:
|
|
# First check the rule1 definition
|
|
url = f"http://{ip}/cm?cmnd=rule1"
|
|
logger.info(f"Sending command: {url}")
|
|
response = requests.get(url, timeout=5)
|
|
if response.status_code == 200:
|
|
data = response.json()
|
|
logger.info(f"Rule1 response: {data}")
|
|
|
|
# The response format might vary, handle different possibilities
|
|
if "Rule1" in data:
|
|
rule_data = data["Rule1"]
|
|
elif "RULE1" in data:
|
|
rule_data = data["RULE1"]
|
|
else:
|
|
logger.error(f"Unexpected response format: {data}")
|
|
return None
|
|
|
|
# Now check if the rule is enabled
|
|
url = f"http://{ip}/cm?cmnd=Rule1"
|
|
logger.info(f"Checking if rule is enabled: {url}")
|
|
response = requests.get(url, timeout=5)
|
|
if response.status_code == 200:
|
|
enable_data = response.json()
|
|
logger.info(f"Rule1 enable status: {enable_data}")
|
|
|
|
# Add enable status to the rule data if it's a dict
|
|
if isinstance(rule_data, dict):
|
|
rule_data["EnableStatus"] = enable_data
|
|
|
|
return rule_data
|
|
else:
|
|
logger.error(f"Failed to get rule1: HTTP {response.status_code}")
|
|
return None
|
|
except Exception as e:
|
|
logger.error(f"Error checking rule1 on device: {e}")
|
|
return None
|
|
|
|
def run_device_mode(device_name):
|
|
"""Run TasmotaManager in Device mode"""
|
|
try:
|
|
cmd = ["python3", "TasmotaManager.py", "--Device", device_name, "--debug"]
|
|
logger.info(f"Running command: {' '.join(cmd)}")
|
|
|
|
# Run the command and capture output
|
|
process = subprocess.run(cmd, capture_output=True, text=True)
|
|
|
|
# Log the output
|
|
logger.info("Command output:")
|
|
for line in process.stdout.splitlines():
|
|
logger.info(f" {line}")
|
|
|
|
if process.returncode != 0:
|
|
logger.error(f"Command failed with return code {process.returncode}")
|
|
logger.error(f"Error output: {process.stderr}")
|
|
return False
|
|
|
|
return True
|
|
except Exception as e:
|
|
logger.error(f"Error running TasmotaManager: {e}")
|
|
return False
|
|
|
|
def main():
|
|
# Get a test device
|
|
device = get_test_device()
|
|
if not device:
|
|
logger.error("No test device available. Run discovery first.")
|
|
return 1
|
|
|
|
device_name = device.get('name')
|
|
device_ip = device.get('ip')
|
|
|
|
logger.info(f"Testing with device: {device_name} (IP: {device_ip})")
|
|
|
|
# Get expected rule1 from config
|
|
expected_rule1 = get_rule1_from_config()
|
|
logger.info(f"Expected rule1 from config: {expected_rule1}")
|
|
|
|
# Check current rule1 on device
|
|
current_rule1 = check_rule1_on_device(device_ip)
|
|
logger.info(f"Current rule1 on device: {current_rule1}")
|
|
|
|
# Run TasmotaManager in Device mode
|
|
logger.info(f"Running TasmotaManager in Device mode for {device_name}")
|
|
success = run_device_mode(device_name)
|
|
if not success:
|
|
logger.error("Failed to run TasmotaManager in Device mode")
|
|
return 1
|
|
|
|
# Wait a moment for changes to take effect
|
|
logger.info("Waiting for changes to take effect...")
|
|
time.sleep(3)
|
|
|
|
# Check rule1 after running Device mode
|
|
after_rule1 = check_rule1_on_device(device_ip)
|
|
logger.info(f"Rule1 after Device mode: {after_rule1}")
|
|
|
|
# Compare with expected value - handle different response formats
|
|
success = False
|
|
|
|
# If the response is a dict with Rules key, check that value
|
|
if isinstance(after_rule1, dict) and 'Rules' in after_rule1:
|
|
actual_rule = after_rule1['Rules']
|
|
logger.info(f"Extracted rule text from response: {actual_rule}")
|
|
if actual_rule == expected_rule1:
|
|
success = True
|
|
# If the response is a nested dict with Rule1 containing Rules
|
|
elif isinstance(after_rule1, dict) and 'EnableStatus' in after_rule1 and 'Rule1' in after_rule1['EnableStatus']:
|
|
if 'Rules' in after_rule1['EnableStatus']['Rule1']:
|
|
actual_rule = after_rule1['EnableStatus']['Rule1']['Rules']
|
|
logger.info(f"Extracted rule text from nested response: {actual_rule}")
|
|
if actual_rule == expected_rule1:
|
|
success = True
|
|
# Direct string comparison
|
|
elif after_rule1 == expected_rule1:
|
|
success = True
|
|
|
|
if success:
|
|
logger.info("SUCCESS: rule1 was correctly set!")
|
|
return 0
|
|
else:
|
|
logger.error(f"FAILURE: rule1 was not set correctly!")
|
|
logger.error(f" Expected: {expected_rule1}")
|
|
logger.error(f" Actual: {after_rule1}")
|
|
return 1
|
|
|
|
if __name__ == "__main__":
|
|
sys.exit(main()) |