diff --git a/README.md b/README.md index 08d2fe2..a6db253 100644 --- a/README.md +++ b/README.md @@ -8,6 +8,7 @@ A Python utility for discovering, monitoring, and managing Tasmota devices on a - Tracks device changes over time (new, moved, deprecated devices) - Checks and updates MQTT settings on Tasmota devices - Generates detailed device information including firmware versions +- Processes unknown devices (matching unknown_device_patterns) to set up names and MQTT ## Requirements @@ -83,6 +84,28 @@ Command-line options: - `--config`: Path to configuration file (default: network_configuration.json) - `--debug`: Enable debug logging - `--skip-unifi`: Skip UniFi discovery and use existing current.json +- `--process-unknown`: Process unknown devices (matching unknown_device_patterns) to set up names and MQTT + +## Unknown Device Processing + +The script can process devices that match patterns in the `unknown_device_patterns` list (like "tasmota_" or "ESP-" prefixed devices). When using the `--process-unknown` flag, the script will: + +1. Identify devices matching the unknown device patterns +2. Check if each device has a toggle button (indicating it's a light switch or power plug) +3. Toggle the button at 1/2Hz (on/off every two seconds) to help identify the physical device +4. **How to enter the hostname:** + - The script will display a clear prompt in the console showing the current device name and IP address + - While the device is toggling, you'll see a prompt asking for a new name for the device + - Type the new hostname directly in the console and press Enter + - All debug messages are completely suppressed during this process to keep the console clear +5. Once a hostname is entered, the script will: + - Configure the "Friendly Name 1" field with the new hostname + - Enable MQTT if not already enabled + - Configure MQTT settings from the configuration file + - Save the configuration and reboot the device +6. Move on to the next unknown device + +This feature helps automate the setup of new Tasmota devices that haven't been properly named yet. ## Output Files diff --git a/TasmotaManager.py b/TasmotaManager.py index 92081c4..b01ece0 100644 --- a/TasmotaManager.py +++ b/TasmotaManager.py @@ -141,18 +141,18 @@ class TasmotaDiscovery: raise ConnectionError(f"Failed to connect to UniFi controller: {str(e)}") def is_tasmota_device(self, device: dict) -> bool: - """Determine if a device is a Tasmota device.""" + """Determine if a device is in the network_filter and not in exclude_patterns.""" name = device.get('name', '').lower() hostname = device.get('hostname', '').lower() ip = device.get('ip', '') - # Check if device is in the configured NoT network + # Check if device is in the configured network network_filters = self.config['unifi'].get('network_filter', {}) for network in network_filters.values(): if ip.startswith(network['subnet']): - self.logger.debug(f"Checking device in NoT network: {name} ({hostname}) IP: {ip}") + self.logger.debug(f"Checking device in network: {name} ({hostname}) IP: {ip}") - # First check exclusion patterns + # Check exclusion patterns exclude_patterns = network.get('exclude_patterns', []) for pattern in exclude_patterns: pattern = pattern.lower() @@ -162,19 +162,9 @@ class TasmotaDiscovery: self.logger.debug(f"Excluding device due to pattern '{pattern}': {name} ({hostname})") return False - # If not excluded, check if it's a Tasmota device - matches = any([ - name.startswith('tasmota'), - name.startswith('sonoff'), - name.endswith('-ts'), - hostname.startswith('tasmota'), - hostname.startswith('sonoff'), - hostname.startswith('esp-'), - any(hostname.endswith(suffix) for suffix in ['-fan', '-lamp', '-light', '-switch']) - ]) - if matches: - self.logger.debug(f"Found Tasmota device: {name}") - return True # Consider all non-excluded devices in NoT network as potential Tasmota devices + # Device is in the network and not excluded + self.logger.debug(f"Found device in network: {name}") + return True return False @@ -370,8 +360,212 @@ class TasmotaDiscovery: except Exception as e: self.logger.error(f"Error saving configuration: {e}") + def get_unknown_devices(self, use_current_json=True): + """Identify devices that match unknown_device_patterns from current.json.""" + self.logger.info("Identifying unknown devices for processing...") + unknown_devices = [] + + try: + source_file = 'current.json' if use_current_json else 'tasmota.json' + with open(source_file, 'r') as f: + data = json.load(f) + all_devices = data.get('tasmota', {}).get('devices', []) + self.logger.debug(f"Loaded {len(all_devices)} devices from {source_file}") + except FileNotFoundError: + self.logger.error(f"{source_file} not found. Run discovery first.") + return [] + except json.JSONDecodeError: + self.logger.error(f"Invalid JSON format in {source_file}") + return [] + + # Identify devices matching unknown_device_patterns + network_filters = self.config['unifi'].get('network_filter', {}) + unknown_patterns = [] + for network in network_filters.values(): + unknown_patterns.extend(network.get('unknown_device_patterns', [])) + + for device in all_devices: + name = device.get('name', '').lower() + hostname = device.get('hostname', '').lower() + + for pattern in unknown_patterns: + pattern = pattern.lower() + pattern = pattern.replace('.', r'\.').replace('*', '.*') + if re.match(f"^{pattern}", name) or re.match(f"^{pattern}", hostname): + self.logger.debug(f"Found unknown device: {name} ({hostname})") + unknown_devices.append(device) + break + + self.logger.info(f"Found {len(unknown_devices)} unknown devices to process") + return unknown_devices + + def process_unknown_devices(self): + """Process unknown devices by checking for toggle button and configuring them. + + This method: + 1. Identifies devices matching unknown_device_patterns + 2. Checks if each device has a toggle button (indicating it's a light/switch) + 3. Toggles the button at 1/2Hz while checking for hostname changes + 4. Prompts the user to enter a new name for the device in the console + 5. Once a name is entered, configures the device with the new hostname + """ + unknown_devices = self.get_unknown_devices() + if not unknown_devices: + self.logger.info("No unknown devices found to process") + return + + self.logger.info(f"Starting to process {len(unknown_devices)} unknown devices...") + + for device in unknown_devices: + name = device.get('name', 'Unknown') + ip = device.get('ip') + + if not ip: + self.logger.warning(f"Skipping device {name} - no IP address") + continue + + self.logger.info(f"Processing unknown device: {name} at {ip}") + + # Check if device has a toggle button + try: + # Get the main page to check for toggle button + url = f"http://{ip}/" + response = requests.get(url, timeout=5) + + # Check if there's a toggle button in the response + has_toggle = "toggle" in response.text.lower() + + if has_toggle: + self.logger.info(f"Device {name} has a toggle button, assuming it's a light switch or power plug") + + # Start toggling at 1/2Hz + original_hostname = device.get('hostname', '') + toggle_state = False + + # Temporarily disable all logging during toggling + logging.disable(logging.CRITICAL) + + try: + # Clear console output and show prompt + print("\n" + "="*50) + print(f"DEVICE: {name} at IP: {ip}") + print(f"Current hostname: {original_hostname}") + print("="*50) + print("The device is now toggling to help you identify it.") + + # Start toggling in background while waiting for input + import threading + stop_toggle = threading.Event() + + def toggle_device(): + toggle_state = False + while not stop_toggle.is_set(): + toggle_state = not toggle_state + toggle_cmd = "Power On" if toggle_state else "Power Off" + toggle_url = f"http://{ip}/cm?cmnd={toggle_cmd}" + try: + requests.get(toggle_url, timeout=2) + except: + pass + time.sleep(2.0) # 1/2Hz rate + + # Start toggle thread + toggle_thread = threading.Thread(target=toggle_device) + toggle_thread.daemon = True + toggle_thread.start() + + # Prompt for new hostname + print("\nPlease enter a new name for this device:") + new_hostname = input("> ").strip() + + # Stop toggling + stop_toggle.set() + toggle_thread.join(timeout=3) + + if new_hostname and new_hostname != original_hostname: + print(f"Setting new hostname to: {new_hostname}") + else: + print("No valid hostname entered, skipping device") + new_hostname = "" + + finally: + # Re-enable logging + logging.disable(logging.NOTSET) + + # If a new hostname was entered, configure the device + if new_hostname: + self.logger.info(f"Configuring device with new hostname: {new_hostname}") + self.configure_unknown_device(ip, new_hostname) + else: + self.logger.warning(f"No new hostname provided for {name}, skipping configuration") + else: + self.logger.info(f"Device {name} does not have a toggle button, skipping") + + except requests.exceptions.RequestException as e: + self.logger.error(f"Error connecting to {name} at {ip}: {str(e)}") + + def configure_unknown_device(self, ip, hostname): + """Configure an unknown device with the given hostname and MQTT settings.""" + try: + # Set Friendly Name + friendly_name_url = f"http://{ip}/cm?cmnd=FriendlyName1%20{hostname}" + response = requests.get(friendly_name_url, timeout=5) + if response.status_code == 200: + self.logger.info(f"Set Friendly Name to {hostname}") + else: + self.logger.error(f"Failed to set Friendly Name to {hostname}") + + # Enable MQTT if not already enabled + mqtt_url = f"http://{ip}/cm?cmnd=SetOption3%20ON" # Enable MQTT + response = requests.get(mqtt_url, timeout=5) + if response.status_code == 200: + self.logger.info(f"Enabled MQTT for {hostname}") + else: + self.logger.error(f"Failed to enable MQTT for {hostname}") + + # Configure MQTT settings + mqtt_config = self.config.get('mqtt', {}) + if mqtt_config: + # Get the base hostname (everything before the dash) + hostname_base = hostname.split('-')[0] if '-' in hostname else hostname + + mqtt_fields = { + "MqttHost": mqtt_config.get('Host', ''), + "MqttPort": mqtt_config.get('Port', 1883), + "MqttUser": mqtt_config.get('User', ''), + "MqttPassword": mqtt_config.get('Password', ''), + "Topic": hostname_base if mqtt_config.get('Topic') == '%hostname_base%' else mqtt_config.get('Topic', ''), + "FullTopic": mqtt_config.get('FullTopic', '%prefix%/%topic%/'), + } + + for setting, value in mqtt_fields.items(): + url = f"http://{ip}/cm?cmnd={setting}%20{value}" + response = requests.get(url, timeout=5) + if response.status_code == 200: + if setting != 'MqttPassword': + self.logger.info(f"{hostname}: Set {setting} to {value}") + else: + self.logger.info(f"{hostname}: Set MQTT Password") + else: + self.logger.error(f"{hostname}: Failed to set {setting}") + + # Save configuration (will reboot the device) + save_url = f"http://{ip}/cm?cmnd=Restart%201" + response = requests.get(save_url, timeout=5) + if response.status_code == 200: + self.logger.info(f"Saved configuration and rebooted {hostname}") + else: + self.logger.error(f"Failed to save configuration for {hostname}") + + return True + + except requests.exceptions.RequestException as e: + self.logger.error(f"Error configuring device at {ip}: {str(e)}") + return False + def get_device_details(self, use_current_json=True): - """Connect to each Tasmota device via HTTP, gather details and validate MQTT settings""" + """Connect to each Tasmota device via HTTP, gather details and validate MQTT settings. + Filters out devices matching unknown_device_patterns.""" self.logger.info("Starting to gather detailed device information...") device_details = [] @@ -379,8 +573,8 @@ class TasmotaDiscovery: source_file = 'current.json' if use_current_json else 'tasmota.json' with open(source_file, 'r') as f: data = json.load(f) - devices = data.get('tasmota', {}).get('devices', []) - self.logger.debug(f"Loaded {len(devices)} devices from {source_file}") + all_devices = data.get('tasmota', {}).get('devices', []) + self.logger.debug(f"Loaded {len(all_devices)} devices from {source_file}") except FileNotFoundError: self.logger.error(f"{source_file} not found. Run discovery first.") return @@ -388,6 +582,31 @@ class TasmotaDiscovery: self.logger.error(f"Invalid JSON format in {source_file}") return + # Filter out devices matching unknown_device_patterns + devices = [] + network_filters = self.config['unifi'].get('network_filter', {}) + unknown_patterns = [] + for network in network_filters.values(): + unknown_patterns.extend(network.get('unknown_device_patterns', [])) + + for device in all_devices: + name = device.get('name', '').lower() + hostname = device.get('hostname', '').lower() + + is_unknown = False + for pattern in unknown_patterns: + pattern = pattern.lower() + pattern = pattern.replace('.', r'\.').replace('*', '.*') + if re.match(f"^{pattern}", name) or re.match(f"^{pattern}", hostname): + self.logger.debug(f"Skipping unknown device: {name} ({hostname})") + is_unknown = True + break + + if not is_unknown: + devices.append(device) + + self.logger.debug(f"Processing {len(devices)} devices after filtering unknown devices") + mqtt_config = self.config.get('mqtt', {}) if not mqtt_config: self.logger.error("MQTT configuration missing from config file") @@ -609,6 +828,8 @@ def main(): help='Enable debug logging') parser.add_argument('--skip-unifi', action='store_true', help='Skip UniFi discovery and use existing current.json') + parser.add_argument('--process-unknown', action='store_true', + help='Process unknown devices (matching unknown_device_patterns) to set up names and MQTT') args = parser.parse_args() @@ -636,6 +857,10 @@ def main(): print("\nStep 2: Getting detailed version information...") discovery.get_device_details(use_current_json=True) + if args.process_unknown: + print("\nStep 3: Processing unknown devices...") + discovery.process_unknown_devices() + print("\nProcess completed successfully!") print("- Device list saved to: current.json") print("- Detailed information saved to: TasmotaDevices.json") diff --git a/network_configuration.json b/network_configuration.json index fe267e8..8632014 100644 --- a/network_configuration.json +++ b/network_configuration.json @@ -13,8 +13,9 @@ "*sonos*" ], "unknown_device_patterns": [ - "tasmota*", - "ESP-*" + "tasmota_", + "esp-", + "ESP-" ] } }