Add single device processing with --Device parameter and hostname matching features

This commit is contained in:
Mike Geppert 2025-08-06 03:22:26 -05:00
parent 9216619cd1
commit 60ab8f1309
2 changed files with 424 additions and 19 deletions

View File

@ -99,6 +99,54 @@ Command-line options:
- `--debug`: Enable debug logging
- `--skip-unifi`: Skip UniFi discovery and use existing current.json
- `--process-unknown`: Process unknown devices (matching unknown_device_patterns) to set up names and MQTT
- `--Device`: Process a single device by hostname or IP address
## Single Device Processing
The script can process a single device by hostname or IP address using the `--Device` parameter. When this parameter is provided, the script will:
1. Connect to the UniFi controller to find the device
2. If a hostname is provided, the script will find the corresponding IP address
3. If an IP address is provided, the script will find the corresponding hostname
4. Verify the device is in the correct network (as defined in network_filter)
5. Check if the device is in the exclude_patterns list (if so, processing will be skipped)
6. Check if the device is in the unknown_device_patterns list:
- If it is, the script will run the unknown device procedure for just this one device
- If not, the script will run the normal MQTT configuration procedure for just this one device
7. Save the device details to TasmotaDevices.json
### Hostname Matching Features
When using a hostname with the `--Device` parameter, the script supports:
- **Exact matching**: The provided hostname matches exactly (case-insensitive)
- **Partial matching**: The provided hostname is contained within a device's hostname
- Example: `--Device Master` will match devices named "MasterLamp-5891" or "MasterBedroom"
- **Wildcard matching**: The provided hostname contains wildcards (*) that match any characters
- Example: `--Device Master*` will match "MasterLamp-5891" but not "BedroomMaster"
- Example: `--Device *Lamp*` will match any device with "Lamp" in its name
If multiple devices match the provided hostname pattern, the script will:
1. Log a warning showing all matching devices
2. Automatically use the first match found
3. Continue processing with that device
This feature is useful for:
- Setting up or updating a single new device without processing all devices
- Troubleshooting a specific device
- Quickly checking if a device is properly configured
- Working with devices when you only remember part of the hostname
Example usage:
```bash
python TasmotaManager.py --Device mydevice.local
# or
python TasmotaManager.py --Device 192.168.8.123
# Partial match example
python TasmotaManager.py --Device Master
# Wildcard match example
python TasmotaManager.py --Device *Lamp*
```
## Unknown Device Processing

View File

@ -539,7 +539,11 @@ class TasmotaDiscovery:
}
for setting, value in mqtt_fields.items():
url = f"http://{ip}/cm?cmnd={setting}%20{value}"
# For FullTopic, we need to avoid adding a space (%20) between the command and value
if setting == "FullTopic":
url = f"http://{ip}/cm?cmnd={setting}={value}"
else:
url = f"http://{ip}/cm?cmnd={setting}%20{value}"
response = requests.get(url, timeout=5)
if response.status_code == 200:
if setting != 'MqttPassword':
@ -549,6 +553,83 @@ class TasmotaDiscovery:
else:
self.logger.error(f"{hostname}: Failed to set {setting}")
# Apply console settings before rebooting
console_params = mqtt_config.get('console', {})
if console_params:
self.logger.info(f"{hostname}: Setting console parameters from configuration")
# Special handling for Retain parameters - need to send opposite state first, then final state
# This is necessary because the changes are what create the update of the Retain state at the MQTT server
retain_params = ["ButtonRetain", "SwitchRetain", "PowerRetain"]
# Process Retain parameters first
for param in retain_params:
if param in console_params:
try:
final_value = console_params[param]
# Set opposite state first
opposite_value = "On" if final_value.lower() == "off" else "Off"
# First command (opposite state)
url = f"http://{ip}/cm?cmnd={param}%20{opposite_value}"
response = requests.get(url, timeout=5)
if response.status_code == 200:
self.logger.debug(f"{hostname}: Set {param} to {opposite_value} (step 1 of 2 to update MQTT broker retain settings)")
else:
self.logger.error(f"{hostname}: Failed to set {param} to {opposite_value}")
# Small delay to ensure commands are processed in order
time.sleep(0.5)
# Second command (final state)
url = f"http://{ip}/cm?cmnd={param}%20{final_value}"
response = requests.get(url, timeout=5)
if response.status_code == 200:
self.logger.debug(f"{hostname}: Set {param} to {final_value} (step 2 of 2 to update MQTT broker retain settings)")
else:
self.logger.error(f"{hostname}: Failed to set {param} to {final_value}")
except Exception as e:
self.logger.error(f"{hostname}: Unexpected error setting {param} commands: {str(e)}")
# Process all other console parameters
# Track rules that need to be enabled
rules_to_enable = {}
for param, value in console_params.items():
# Skip Retain parameters as they're handled specially above
if param in retain_params:
continue
# Check if this is a rule definition (lowercase rule1, rule2, etc.)
if param.lower().startswith('rule') and param.lower() == param and param[-1].isdigit():
# Store the rule number for later enabling
rule_num = param[-1]
rules_to_enable[rule_num] = True
self.logger.debug(f"{hostname}: Detected rule definition {param}, will auto-enable")
# Regular console parameter
url = f"http://{ip}/cm?cmnd={param}%20{value}"
response = requests.get(url, timeout=5)
if response.status_code == 200:
self.logger.debug(f"{hostname}: Set console parameter {param} to {value}")
else:
self.logger.error(f"{hostname}: Failed to set console parameter {param}")
# Auto-enable any rules that were defined
for rule_num in rules_to_enable:
rule_enable_param = f"Rule{rule_num}"
# Skip if the rule enable command was already in the config
if any(p.lower() == rule_enable_param.lower() for p in console_params):
continue
# Rule auto-enabling
url = f"http://{ip}/cm?cmnd={rule_enable_param}%201"
response = requests.get(url, timeout=5)
if response.status_code == 200:
self.logger.info(f"{hostname}: Auto-enabled {rule_enable_param}")
else:
self.logger.error(f"{hostname}: Failed to auto-enable {rule_enable_param}")
# Save configuration (will reboot the device)
save_url = f"http://{ip}/cm?cmnd=Restart%201"
response = requests.get(save_url, timeout=5)
@ -563,6 +644,263 @@ class TasmotaDiscovery:
self.logger.error(f"Error configuring device at {ip}: {str(e)}")
return False
def is_ip_in_network_filter(self, ip_address):
"""Check if an IP address is in any of the configured network filters.
Args:
ip_address: The IP address to check
Returns:
tuple: (is_in_network, target_network, network_name) where:
- is_in_network is a boolean indicating if the IP is in a network
- target_network is the network configuration dict or None
- network_name is the name of the network or None
"""
network_filters = self.config['unifi'].get('network_filter', {})
for network_name, network in network_filters.items():
if ip_address.startswith(network['subnet']):
self.logger.info(f"IP {ip_address} is in network: {network_name}")
return True, network, network_name
self.logger.error(f"IP {ip_address} is not in any configured network")
return False, None, None
def process_single_device(self, device_identifier):
"""Process a single device by hostname or IP address.
Args:
device_identifier: Either a hostname or IP address
Returns:
bool: True if device was processed successfully, False otherwise
"""
self.logger.info(f"Processing single device: {device_identifier}")
# Check if device_identifier is an IP address or hostname
is_ip = bool(re.match(r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$", device_identifier))
# If it's an IP address, check if it's in the network_filter first
if is_ip:
in_network, target_network, network_name = self.is_ip_in_network_filter(device_identifier)
if not in_network:
return False
# Setup Unifi client if not already done
if not self.unifi_client:
try:
self.setup_unifi_client()
except ConnectionError as e:
self.logger.error(f"Failed to connect to UniFi controller: {str(e)}")
return False
# Get all clients from Unifi
try:
all_clients = self.unifi_client.get_clients()
self.logger.debug(f"Found {len(all_clients)} total devices")
except Exception as e:
self.logger.error(f"Error getting devices from UniFi controller: {e}")
return False
# Find the device in Unifi
target_device = None
if is_ip:
# Search by IP
self.logger.debug(f"Searching for device with IP: {device_identifier}")
target_device = next((device for device in all_clients if device.get('ip') == device_identifier), None)
if not target_device:
self.logger.error(f"No device found with IP: {device_identifier}")
return False
else:
# Search by hostname - support partial and wildcard matches
self.logger.debug(f"Searching for device with hostname: {device_identifier}")
# Check if the identifier contains wildcards
has_wildcards = '*' in device_identifier
# Convert wildcards to regex pattern if present
if has_wildcards:
pattern = device_identifier.lower().replace('.', r'\.').replace('*', '.*')
self.logger.debug(f"Using wildcard pattern: {pattern}")
else:
# For partial matches, we'll use the identifier as a substring
pattern = device_identifier.lower()
self.logger.debug(f"Using partial match pattern: {pattern}")
# Find all matching devices
matching_devices = []
for device in all_clients:
hostname = device.get('hostname', '').lower()
name = device.get('name', '').lower()
if has_wildcards:
# For wildcard matches, use regex
if (re.search(f"^{pattern}$", hostname) or re.search(f"^{pattern}$", name)):
matching_devices.append(device)
else:
# For partial matches, check if pattern is a substring
if pattern in hostname or pattern in name:
matching_devices.append(device)
# Handle the results
if not matching_devices:
self.logger.error(f"No devices found matching: {device_identifier}")
return False
elif len(matching_devices) > 1:
# Multiple matches found - log them and use the first one
self.logger.warning(f"Multiple devices found matching '{device_identifier}':")
for i, device in enumerate(matching_devices, 1):
device_name = device.get('name', device.get('hostname', 'Unknown'))
device_ip = device.get('ip', '')
self.logger.warning(f" {i}. {device_name} (IP: {device_ip})")
self.logger.warning(f"Using the first match: {matching_devices[0].get('name', matching_devices[0].get('hostname', 'Unknown'))}")
# Use the first (or only) matching device
target_device = matching_devices[0]
# Get device details
device_name = target_device.get('name', target_device.get('hostname', 'Unknown'))
device_hostname = target_device.get('hostname', '')
device_ip = target_device.get('ip', '')
device_mac = target_device.get('mac', '')
self.logger.info(f"Found device: {device_name} (IP: {device_ip}, Hostname: {device_hostname})")
# If we're processing a hostname (not an IP), check if the device's IP is in the network_filter
if not is_ip:
in_network, target_network, network_name = self.is_ip_in_network_filter(device_ip)
if not in_network:
self.logger.error(f"Device {device_name} is not in any configured network")
return False
# For IP addresses, we already have the target_network from the earlier check
# Check if device is excluded
exclude_patterns = target_network.get('exclude_patterns', [])
for pattern in exclude_patterns:
pattern_lower = pattern.lower()
pattern_regex = pattern_lower.replace('.', r'\.').replace('*', '.*')
if (re.match(f"^{pattern_regex}$", device_name.lower()) or
re.match(f"^{pattern_regex}$", device_hostname.lower())):
self.logger.error(f"Device {device_name} is excluded by pattern: {pattern}")
return False
# Check if device is in unknown_device_patterns
unknown_patterns = target_network.get('unknown_device_patterns', [])
is_unknown = False
for pattern in unknown_patterns:
pattern_lower = pattern.lower()
pattern_regex = pattern_lower.replace('.', r'\.').replace('*', '.*')
if (re.match(f"^{pattern_regex}", device_name.lower()) or
re.match(f"^{pattern_regex}", device_hostname.lower())):
is_unknown = True
self.logger.info(f"Device {device_name} matches unknown device pattern: {pattern}")
break
# Create a device info dictionary
device_info = {
"name": device_name,
"ip": device_ip,
"mac": device_mac,
"last_seen": target_device.get('last_seen', ''),
"hostname": device_hostname,
"notes": target_device.get('note', ''),
}
# Process the device based on whether it's unknown or not
if is_unknown:
self.logger.info(f"Processing unknown device: {device_name}")
# Check if device has a toggle button
try:
# Get the main page to check for toggle button
url = f"http://{device_ip}/"
response = requests.get(url, timeout=5)
# Check if there's a toggle button in the response
has_toggle = "toggle" in response.text.lower()
if has_toggle:
self.logger.info(f"Device {device_name} has a toggle button, toggling at 1/2Hz rate")
# Start toggling at 1/2Hz
toggle_state = False
# Temporarily disable all logging during toggling
logging.disable(logging.CRITICAL)
try:
# Clear console output and show prompt
print("\n" + "="*50)
print(f"DEVICE: {device_name} at IP: {device_ip}")
print(f"Current hostname: {device_hostname}")
print("="*50)
print("The device is now toggling to help you identify it.")
# Start toggling in background while waiting for input
import threading
stop_toggle = threading.Event()
def toggle_device():
toggle_state = False
while not stop_toggle.is_set():
toggle_state = not toggle_state
toggle_cmd = "Power On" if toggle_state else "Power Off"
toggle_url = f"http://{device_ip}/cm?cmnd={toggle_cmd}"
try:
requests.get(toggle_url, timeout=2)
except:
pass
time.sleep(2.0) # 1/2Hz rate
# Start toggle thread
toggle_thread = threading.Thread(target=toggle_device)
toggle_thread.daemon = True
toggle_thread.start()
# Prompt for new hostname
print("\nPlease enter a new name for this device:")
new_hostname = input("> ").strip()
# Stop toggling
stop_toggle.set()
toggle_thread.join(timeout=3)
if new_hostname and new_hostname != device_hostname:
print(f"Setting new hostname to: {new_hostname}")
# Re-enable logging
logging.disable(logging.NOTSET)
return self.configure_unknown_device(device_ip, new_hostname)
else:
print("No valid hostname entered, skipping device")
# Re-enable logging
logging.disable(logging.NOTSET)
return False
finally:
# Re-enable logging
logging.disable(logging.NOTSET)
else:
self.logger.info(f"Device {device_name} does not have a toggle button")
return self.configure_unknown_device(device_ip, device_hostname)
except requests.exceptions.RequestException as e:
self.logger.error(f"Error connecting to {device_name} at {device_ip}: {str(e)}")
return False
else:
self.logger.info(f"Processing normal device: {device_name}")
# Create a temporary list with just this device
temp_devices = [device_info]
# Save to current.json temporarily
current_config = {"tasmota": {"devices": temp_devices}}
with open('current.json', 'w') as f:
json.dump(current_config, f, indent=2)
# Process the device
self.get_device_details(use_current_json=True)
return True
def get_device_details(self, use_current_json=True):
"""Connect to each Tasmota device via HTTP, gather details and validate MQTT settings.
Filters out devices matching unknown_device_patterns.
@ -673,7 +1011,11 @@ class TasmotaDiscovery:
# Apply changes if needed
for setting, value in changes_needed:
try:
url = f"http://{ip}/cm?cmnd={setting}%20{value}"
# For FullTopic, we need to avoid adding a space (%20) between the command and value
if setting == "FullTopic":
url = f"http://{ip}/cm?cmnd={setting}={value}"
else:
url = f"http://{ip}/cm?cmnd={setting}%20{value}"
response = requests.get(url, timeout=5)
if response.status_code == 200:
if setting != 'MqttPassword':
@ -1022,6 +1364,8 @@ def main():
help='Skip UniFi discovery and use existing current.json')
parser.add_argument('--process-unknown', action='store_true',
help='Process unknown devices (matching unknown_device_patterns) to set up names and MQTT')
parser.add_argument('--Device',
help='Process a single device by hostname or IP address')
args = parser.parse_args()
@ -1038,24 +1382,37 @@ def main():
discovery.load_config(args.config)
try:
if not args.skip_unifi:
print("Step 1: Discovering Tasmota devices...")
discovery.setup_unifi_client()
tasmota_devices = discovery.get_tasmota_devices()
discovery.save_tasmota_config(tasmota_devices)
# Process a single device if --Device parameter is provided
if args.Device:
print(f"Processing single device: {args.Device}")
# Let process_single_device handle the UniFi client setup as needed
success = discovery.process_single_device(args.Device)
if success:
print(f"\nDevice {args.Device} processed successfully!")
print("- Detailed information saved to: TasmotaDevices.json")
else:
print(f"\nFailed to process device: {args.Device}")
return 1
else:
print("Skipping UniFi discovery, using existing current.json...")
print("\nStep 2: Getting detailed version information...")
discovery.get_device_details(use_current_json=True)
if args.process_unknown:
print("\nStep 3: Processing unknown devices...")
discovery.process_unknown_devices()
print("\nProcess completed successfully!")
print("- Device list saved to: current.json")
print("- Detailed information saved to: TasmotaDevices.json")
# Normal processing flow
if not args.skip_unifi:
print("Step 1: Discovering Tasmota devices...")
discovery.setup_unifi_client()
tasmota_devices = discovery.get_tasmota_devices()
discovery.save_tasmota_config(tasmota_devices)
else:
print("Skipping UniFi discovery, using existing current.json...")
if args.process_unknown:
print("\nStep 2: Processing unknown devices...")
discovery.process_unknown_devices()
else:
print("\nStep 2: Getting detailed version information...")
discovery.get_device_details(use_current_json=True)
print("\nProcess completed successfully!")
print("- Device list saved to: current.json")
print("- Detailed information saved to: TasmotaDevices.json")
except ConnectionError as e:
print(f"Connection Error: {str(e)}")