From 60ab8f13094d74b6b11a1972e045b8acd40a254d Mon Sep 17 00:00:00 2001 From: Mike Geppert Date: Wed, 6 Aug 2025 03:22:26 -0500 Subject: [PATCH] Add single device processing with --Device parameter and hostname matching features --- README.md | 48 ++++++ TasmotaManager.py | 395 +++++++++++++++++++++++++++++++++++++++++++--- 2 files changed, 424 insertions(+), 19 deletions(-) diff --git a/README.md b/README.md index 724a36b..c97af0d 100644 --- a/README.md +++ b/README.md @@ -99,6 +99,54 @@ Command-line options: - `--debug`: Enable debug logging - `--skip-unifi`: Skip UniFi discovery and use existing current.json - `--process-unknown`: Process unknown devices (matching unknown_device_patterns) to set up names and MQTT +- `--Device`: Process a single device by hostname or IP address + +## Single Device Processing + +The script can process a single device by hostname or IP address using the `--Device` parameter. When this parameter is provided, the script will: + +1. Connect to the UniFi controller to find the device +2. If a hostname is provided, the script will find the corresponding IP address +3. If an IP address is provided, the script will find the corresponding hostname +4. Verify the device is in the correct network (as defined in network_filter) +5. Check if the device is in the exclude_patterns list (if so, processing will be skipped) +6. Check if the device is in the unknown_device_patterns list: + - If it is, the script will run the unknown device procedure for just this one device + - If not, the script will run the normal MQTT configuration procedure for just this one device +7. Save the device details to TasmotaDevices.json + +### Hostname Matching Features + +When using a hostname with the `--Device` parameter, the script supports: + +- **Exact matching**: The provided hostname matches exactly (case-insensitive) +- **Partial matching**: The provided hostname is contained within a device's hostname + - Example: `--Device Master` will match devices named "MasterLamp-5891" or "MasterBedroom" +- **Wildcard matching**: The provided hostname contains wildcards (*) that match any characters + - Example: `--Device Master*` will match "MasterLamp-5891" but not "BedroomMaster" + - Example: `--Device *Lamp*` will match any device with "Lamp" in its name + +If multiple devices match the provided hostname pattern, the script will: +1. Log a warning showing all matching devices +2. Automatically use the first match found +3. Continue processing with that device + +This feature is useful for: +- Setting up or updating a single new device without processing all devices +- Troubleshooting a specific device +- Quickly checking if a device is properly configured +- Working with devices when you only remember part of the hostname + +Example usage: +```bash +python TasmotaManager.py --Device mydevice.local +# or +python TasmotaManager.py --Device 192.168.8.123 +# Partial match example +python TasmotaManager.py --Device Master +# Wildcard match example +python TasmotaManager.py --Device *Lamp* +``` ## Unknown Device Processing diff --git a/TasmotaManager.py b/TasmotaManager.py index e9d8abb..e42f100 100644 --- a/TasmotaManager.py +++ b/TasmotaManager.py @@ -539,7 +539,11 @@ class TasmotaDiscovery: } for setting, value in mqtt_fields.items(): - url = f"http://{ip}/cm?cmnd={setting}%20{value}" + # For FullTopic, we need to avoid adding a space (%20) between the command and value + if setting == "FullTopic": + url = f"http://{ip}/cm?cmnd={setting}={value}" + else: + url = f"http://{ip}/cm?cmnd={setting}%20{value}" response = requests.get(url, timeout=5) if response.status_code == 200: if setting != 'MqttPassword': @@ -549,6 +553,83 @@ class TasmotaDiscovery: else: self.logger.error(f"{hostname}: Failed to set {setting}") + # Apply console settings before rebooting + console_params = mqtt_config.get('console', {}) + if console_params: + self.logger.info(f"{hostname}: Setting console parameters from configuration") + + # Special handling for Retain parameters - need to send opposite state first, then final state + # This is necessary because the changes are what create the update of the Retain state at the MQTT server + retain_params = ["ButtonRetain", "SwitchRetain", "PowerRetain"] + + # Process Retain parameters first + for param in retain_params: + if param in console_params: + try: + final_value = console_params[param] + # Set opposite state first + opposite_value = "On" if final_value.lower() == "off" else "Off" + + # First command (opposite state) + url = f"http://{ip}/cm?cmnd={param}%20{opposite_value}" + response = requests.get(url, timeout=5) + if response.status_code == 200: + self.logger.debug(f"{hostname}: Set {param} to {opposite_value} (step 1 of 2 to update MQTT broker retain settings)") + else: + self.logger.error(f"{hostname}: Failed to set {param} to {opposite_value}") + + # Small delay to ensure commands are processed in order + time.sleep(0.5) + + # Second command (final state) + url = f"http://{ip}/cm?cmnd={param}%20{final_value}" + response = requests.get(url, timeout=5) + if response.status_code == 200: + self.logger.debug(f"{hostname}: Set {param} to {final_value} (step 2 of 2 to update MQTT broker retain settings)") + else: + self.logger.error(f"{hostname}: Failed to set {param} to {final_value}") + except Exception as e: + self.logger.error(f"{hostname}: Unexpected error setting {param} commands: {str(e)}") + + # Process all other console parameters + # Track rules that need to be enabled + rules_to_enable = {} + + for param, value in console_params.items(): + # Skip Retain parameters as they're handled specially above + if param in retain_params: + continue + + # Check if this is a rule definition (lowercase rule1, rule2, etc.) + if param.lower().startswith('rule') and param.lower() == param and param[-1].isdigit(): + # Store the rule number for later enabling + rule_num = param[-1] + rules_to_enable[rule_num] = True + self.logger.debug(f"{hostname}: Detected rule definition {param}, will auto-enable") + + # Regular console parameter + url = f"http://{ip}/cm?cmnd={param}%20{value}" + response = requests.get(url, timeout=5) + if response.status_code == 200: + self.logger.debug(f"{hostname}: Set console parameter {param} to {value}") + else: + self.logger.error(f"{hostname}: Failed to set console parameter {param}") + + # Auto-enable any rules that were defined + for rule_num in rules_to_enable: + rule_enable_param = f"Rule{rule_num}" + # Skip if the rule enable command was already in the config + if any(p.lower() == rule_enable_param.lower() for p in console_params): + continue + + # Rule auto-enabling + url = f"http://{ip}/cm?cmnd={rule_enable_param}%201" + response = requests.get(url, timeout=5) + if response.status_code == 200: + self.logger.info(f"{hostname}: Auto-enabled {rule_enable_param}") + else: + self.logger.error(f"{hostname}: Failed to auto-enable {rule_enable_param}") + # Save configuration (will reboot the device) save_url = f"http://{ip}/cm?cmnd=Restart%201" response = requests.get(save_url, timeout=5) @@ -563,6 +644,263 @@ class TasmotaDiscovery: self.logger.error(f"Error configuring device at {ip}: {str(e)}") return False + def is_ip_in_network_filter(self, ip_address): + """Check if an IP address is in any of the configured network filters. + + Args: + ip_address: The IP address to check + + Returns: + tuple: (is_in_network, target_network, network_name) where: + - is_in_network is a boolean indicating if the IP is in a network + - target_network is the network configuration dict or None + - network_name is the name of the network or None + """ + network_filters = self.config['unifi'].get('network_filter', {}) + + for network_name, network in network_filters.items(): + if ip_address.startswith(network['subnet']): + self.logger.info(f"IP {ip_address} is in network: {network_name}") + return True, network, network_name + + self.logger.error(f"IP {ip_address} is not in any configured network") + return False, None, None + + def process_single_device(self, device_identifier): + """Process a single device by hostname or IP address. + + Args: + device_identifier: Either a hostname or IP address + + Returns: + bool: True if device was processed successfully, False otherwise + """ + self.logger.info(f"Processing single device: {device_identifier}") + + # Check if device_identifier is an IP address or hostname + is_ip = bool(re.match(r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$", device_identifier)) + + # If it's an IP address, check if it's in the network_filter first + if is_ip: + in_network, target_network, network_name = self.is_ip_in_network_filter(device_identifier) + if not in_network: + return False + + # Setup Unifi client if not already done + if not self.unifi_client: + try: + self.setup_unifi_client() + except ConnectionError as e: + self.logger.error(f"Failed to connect to UniFi controller: {str(e)}") + return False + + # Get all clients from Unifi + try: + all_clients = self.unifi_client.get_clients() + self.logger.debug(f"Found {len(all_clients)} total devices") + except Exception as e: + self.logger.error(f"Error getting devices from UniFi controller: {e}") + return False + + # Find the device in Unifi + target_device = None + if is_ip: + # Search by IP + self.logger.debug(f"Searching for device with IP: {device_identifier}") + target_device = next((device for device in all_clients if device.get('ip') == device_identifier), None) + if not target_device: + self.logger.error(f"No device found with IP: {device_identifier}") + return False + else: + # Search by hostname - support partial and wildcard matches + self.logger.debug(f"Searching for device with hostname: {device_identifier}") + + # Check if the identifier contains wildcards + has_wildcards = '*' in device_identifier + + # Convert wildcards to regex pattern if present + if has_wildcards: + pattern = device_identifier.lower().replace('.', r'\.').replace('*', '.*') + self.logger.debug(f"Using wildcard pattern: {pattern}") + else: + # For partial matches, we'll use the identifier as a substring + pattern = device_identifier.lower() + self.logger.debug(f"Using partial match pattern: {pattern}") + + # Find all matching devices + matching_devices = [] + for device in all_clients: + hostname = device.get('hostname', '').lower() + name = device.get('name', '').lower() + + if has_wildcards: + # For wildcard matches, use regex + if (re.search(f"^{pattern}$", hostname) or re.search(f"^{pattern}$", name)): + matching_devices.append(device) + else: + # For partial matches, check if pattern is a substring + if pattern in hostname or pattern in name: + matching_devices.append(device) + + # Handle the results + if not matching_devices: + self.logger.error(f"No devices found matching: {device_identifier}") + return False + elif len(matching_devices) > 1: + # Multiple matches found - log them and use the first one + self.logger.warning(f"Multiple devices found matching '{device_identifier}':") + for i, device in enumerate(matching_devices, 1): + device_name = device.get('name', device.get('hostname', 'Unknown')) + device_ip = device.get('ip', '') + self.logger.warning(f" {i}. {device_name} (IP: {device_ip})") + self.logger.warning(f"Using the first match: {matching_devices[0].get('name', matching_devices[0].get('hostname', 'Unknown'))}") + + # Use the first (or only) matching device + target_device = matching_devices[0] + + # Get device details + device_name = target_device.get('name', target_device.get('hostname', 'Unknown')) + device_hostname = target_device.get('hostname', '') + device_ip = target_device.get('ip', '') + device_mac = target_device.get('mac', '') + + self.logger.info(f"Found device: {device_name} (IP: {device_ip}, Hostname: {device_hostname})") + + # If we're processing a hostname (not an IP), check if the device's IP is in the network_filter + if not is_ip: + in_network, target_network, network_name = self.is_ip_in_network_filter(device_ip) + if not in_network: + self.logger.error(f"Device {device_name} is not in any configured network") + return False + # For IP addresses, we already have the target_network from the earlier check + + # Check if device is excluded + exclude_patterns = target_network.get('exclude_patterns', []) + for pattern in exclude_patterns: + pattern_lower = pattern.lower() + pattern_regex = pattern_lower.replace('.', r'\.').replace('*', '.*') + if (re.match(f"^{pattern_regex}$", device_name.lower()) or + re.match(f"^{pattern_regex}$", device_hostname.lower())): + self.logger.error(f"Device {device_name} is excluded by pattern: {pattern}") + return False + + # Check if device is in unknown_device_patterns + unknown_patterns = target_network.get('unknown_device_patterns', []) + is_unknown = False + + for pattern in unknown_patterns: + pattern_lower = pattern.lower() + pattern_regex = pattern_lower.replace('.', r'\.').replace('*', '.*') + if (re.match(f"^{pattern_regex}", device_name.lower()) or + re.match(f"^{pattern_regex}", device_hostname.lower())): + is_unknown = True + self.logger.info(f"Device {device_name} matches unknown device pattern: {pattern}") + break + + # Create a device info dictionary + device_info = { + "name": device_name, + "ip": device_ip, + "mac": device_mac, + "last_seen": target_device.get('last_seen', ''), + "hostname": device_hostname, + "notes": target_device.get('note', ''), + } + + # Process the device based on whether it's unknown or not + if is_unknown: + self.logger.info(f"Processing unknown device: {device_name}") + + # Check if device has a toggle button + try: + # Get the main page to check for toggle button + url = f"http://{device_ip}/" + response = requests.get(url, timeout=5) + + # Check if there's a toggle button in the response + has_toggle = "toggle" in response.text.lower() + + if has_toggle: + self.logger.info(f"Device {device_name} has a toggle button, toggling at 1/2Hz rate") + + # Start toggling at 1/2Hz + toggle_state = False + + # Temporarily disable all logging during toggling + logging.disable(logging.CRITICAL) + + try: + # Clear console output and show prompt + print("\n" + "="*50) + print(f"DEVICE: {device_name} at IP: {device_ip}") + print(f"Current hostname: {device_hostname}") + print("="*50) + print("The device is now toggling to help you identify it.") + + # Start toggling in background while waiting for input + import threading + stop_toggle = threading.Event() + + def toggle_device(): + toggle_state = False + while not stop_toggle.is_set(): + toggle_state = not toggle_state + toggle_cmd = "Power On" if toggle_state else "Power Off" + toggle_url = f"http://{device_ip}/cm?cmnd={toggle_cmd}" + try: + requests.get(toggle_url, timeout=2) + except: + pass + time.sleep(2.0) # 1/2Hz rate + + # Start toggle thread + toggle_thread = threading.Thread(target=toggle_device) + toggle_thread.daemon = True + toggle_thread.start() + + # Prompt for new hostname + print("\nPlease enter a new name for this device:") + new_hostname = input("> ").strip() + + # Stop toggling + stop_toggle.set() + toggle_thread.join(timeout=3) + + if new_hostname and new_hostname != device_hostname: + print(f"Setting new hostname to: {new_hostname}") + # Re-enable logging + logging.disable(logging.NOTSET) + return self.configure_unknown_device(device_ip, new_hostname) + else: + print("No valid hostname entered, skipping device") + # Re-enable logging + logging.disable(logging.NOTSET) + return False + + finally: + # Re-enable logging + logging.disable(logging.NOTSET) + else: + self.logger.info(f"Device {device_name} does not have a toggle button") + return self.configure_unknown_device(device_ip, device_hostname) + + except requests.exceptions.RequestException as e: + self.logger.error(f"Error connecting to {device_name} at {device_ip}: {str(e)}") + return False + else: + self.logger.info(f"Processing normal device: {device_name}") + # Create a temporary list with just this device + temp_devices = [device_info] + + # Save to current.json temporarily + current_config = {"tasmota": {"devices": temp_devices}} + with open('current.json', 'w') as f: + json.dump(current_config, f, indent=2) + + # Process the device + self.get_device_details(use_current_json=True) + return True + def get_device_details(self, use_current_json=True): """Connect to each Tasmota device via HTTP, gather details and validate MQTT settings. Filters out devices matching unknown_device_patterns. @@ -673,7 +1011,11 @@ class TasmotaDiscovery: # Apply changes if needed for setting, value in changes_needed: try: - url = f"http://{ip}/cm?cmnd={setting}%20{value}" + # For FullTopic, we need to avoid adding a space (%20) between the command and value + if setting == "FullTopic": + url = f"http://{ip}/cm?cmnd={setting}={value}" + else: + url = f"http://{ip}/cm?cmnd={setting}%20{value}" response = requests.get(url, timeout=5) if response.status_code == 200: if setting != 'MqttPassword': @@ -1022,6 +1364,8 @@ def main(): help='Skip UniFi discovery and use existing current.json') parser.add_argument('--process-unknown', action='store_true', help='Process unknown devices (matching unknown_device_patterns) to set up names and MQTT') + parser.add_argument('--Device', + help='Process a single device by hostname or IP address') args = parser.parse_args() @@ -1038,24 +1382,37 @@ def main(): discovery.load_config(args.config) try: - if not args.skip_unifi: - print("Step 1: Discovering Tasmota devices...") - discovery.setup_unifi_client() - tasmota_devices = discovery.get_tasmota_devices() - discovery.save_tasmota_config(tasmota_devices) + # Process a single device if --Device parameter is provided + if args.Device: + print(f"Processing single device: {args.Device}") + # Let process_single_device handle the UniFi client setup as needed + success = discovery.process_single_device(args.Device) + if success: + print(f"\nDevice {args.Device} processed successfully!") + print("- Detailed information saved to: TasmotaDevices.json") + else: + print(f"\nFailed to process device: {args.Device}") + return 1 else: - print("Skipping UniFi discovery, using existing current.json...") - - print("\nStep 2: Getting detailed version information...") - discovery.get_device_details(use_current_json=True) - - if args.process_unknown: - print("\nStep 3: Processing unknown devices...") - discovery.process_unknown_devices() - - print("\nProcess completed successfully!") - print("- Device list saved to: current.json") - print("- Detailed information saved to: TasmotaDevices.json") + # Normal processing flow + if not args.skip_unifi: + print("Step 1: Discovering Tasmota devices...") + discovery.setup_unifi_client() + tasmota_devices = discovery.get_tasmota_devices() + discovery.save_tasmota_config(tasmota_devices) + else: + print("Skipping UniFi discovery, using existing current.json...") + + if args.process_unknown: + print("\nStep 2: Processing unknown devices...") + discovery.process_unknown_devices() + else: + print("\nStep 2: Getting detailed version information...") + discovery.get_device_details(use_current_json=True) + + print("\nProcess completed successfully!") + print("- Device list saved to: current.json") + print("- Detailed information saved to: TasmotaDevices.json") except ConnectionError as e: print(f"Connection Error: {str(e)}")