From 488afdbb3dc81b35461a09b504b507f6c168b8d5 Mon Sep 17 00:00:00 2001 From: Mike Geppert Date: Tue, 15 Jul 2025 01:32:57 -0500 Subject: [PATCH 1/7] Initial Creation --- TasmotaManager.py | 185 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 185 insertions(+) create mode 100644 TasmotaManager.py diff --git a/TasmotaManager.py b/TasmotaManager.py new file mode 100644 index 0000000..58b0c4a --- /dev/null +++ b/TasmotaManager.py @@ -0,0 +1,185 @@ +import json +import logging +import os +import sys +from datetime import datetime +from typing import Optional +import requests +from urllib3.exceptions import InsecureRequestWarning +requests.packages.urllib3.disable_warnings(category=InsecureRequestWarning) + +class UnifiClient: + def __init__(self, host: str, username: str, password: str, site_id: str = 'default', ssl_verify: bool = True): + self.base_url = f"https://{host}" + self.site_id = site_id + self.session = requests.Session() + self.session.verify = ssl_verify + self._login(username, password) + + def _login(self, username: str, password: str) -> None: + """Authenticate with the UniFi Controller.""" + login_url = f"{self.base_url}/api/login" + response = self.session.post( + login_url, + json={"username": username, "password": password} + ) + response.raise_for_status() + + def get_clients(self) -> list: + """Get all clients from the UniFi Controller.""" + url = f"{self.base_url}/api/s/{self.site_id}/stat/sta" + response = self.session.get(url) + response.raise_for_status() + return response.json().get('data', []) + +class TasmotaDiscovery: + def __init__(self, debug: bool = False): + """Initialize the TasmotaDiscovery with optional debug mode.""" + log_level = logging.DEBUG if debug else logging.INFO + logging.basicConfig( + level=log_level, + format='%(asctime)s - %(levelname)s - %(message)s', + datefmt='%Y-%m-%d %H:%M:%S' + ) + self.logger = logging.getLogger(__name__) + self.config = None + self.unifi_client = None + + def load_config(self, config_path: Optional[str] = None) -> dict: + """Load configuration from JSON file.""" + if config_path is None: + config_path = os.path.join(os.path.dirname(__file__), 'config.json') + + self.logger.debug(f"Loading configuration from: {config_path}") + try: + with open(config_path, 'r') as config_file: + self.config = json.load(config_file) + self.logger.debug("Configuration loaded successfully from %s", config_path) + return self.config + except FileNotFoundError: + self.logger.error(f"Configuration file not found at {config_path}") + sys.exit(1) + except json.JSONDecodeError: + self.logger.error("Invalid JSON in configuration file") + sys.exit(1) + + def setup_unifi_client(self) -> UnifiClient: + """Initialize UniFi client with configuration.""" + self.logger.debug("Setting up UniFi client") + try: + host = self.config['unifi']['host'] + # Remove https:// if present + if host.startswith('https://'): + host = host[8:] + elif host.startswith('http://'): + host = host[7:] + + self.logger.debug(f"Connecting to UniFi Controller at {host}") + + self.unifi_client = UnifiClient( + host=host, + username=self.config['unifi']['username'], + password=self.config['unifi']['password'], + site_id=self.config['unifi'].get('site', 'default'), + ssl_verify=False + ) + self.logger.debug("UniFi client setup successful") + return self.unifi_client + except KeyError as e: + self.logger.error(f"Missing required UniFi configuration: {e}") + self.logger.debug("Connection details:") + self.logger.debug(f"Host: {self.config['unifi'].get('host', 'Not set')}") + self.logger.debug(f"Username: {self.config['unifi'].get('username', 'Not set')}") + self.logger.debug("Please verify your configuration file") + sys.exit(1) + except Exception as e: + self.logger.error(f"Error connecting to UniFi controller: {e}") + sys.exit(1) + + def is_tasmota_device(self, device: dict) -> bool: + """Determine if a device is a Tasmota device.""" + name = device.get('name', '').lower() + hostname = device.get('hostname', '').lower() + self.logger.debug(f"Checking device: {name} ({hostname})") + matches = any([ + name.startswith('tasmota'), + name.startswith('sonoff'), + name.endswith('-ts'), + hostname.startswith('tasmota'), + hostname.startswith('sonoff') + ]) + if matches: + self.logger.debug(f"Found Tasmota device: {name}") + return matches + + def get_tasmota_devices(self) -> list: + """Query UniFi controller and filter Tasmota devices.""" + devices = [] + self.logger.debug("Querying UniFi controller for devices") + try: + all_clients = self.unifi_client.get_clients() + self.logger.debug(f"Found {len(all_clients)} total devices") + + for device in all_clients: + if self.is_tasmota_device(device): + device_info = { + "name": device.get('name', device.get('hostname', 'Unknown')), + "ip": device.get('ip', ''), + "mac": device.get('mac', ''), + "last_seen": device.get('last_seen', ''), + "hostname": device.get('hostname', ''), + "notes": device.get('note', ''), + } + devices.append(device_info) + + self.logger.debug(f"Found {len(devices)} Tasmota devices") + return devices + except Exception as e: + self.logger.error(f"Error getting devices from UniFi controller: {e}") + return [] + + def save_tasmota_config(self, devices: list, filename: str = "tasmota_devices.json") -> None: + """Save Tasmota device information to a JSON file.""" + self.logger.debug(f"Saving Tasmota configuration to {filename}") + config = { + "tasmota": { + "devices": devices, + "generated_at": datetime.now().isoformat(), + "total_devices": len(devices) + } + } + + try: + if os.path.exists(filename): + backup_name = f"{filename}.backup" + os.rename(filename, backup_name) + self.logger.info(f"Created backup of existing configuration as {backup_name}") + + with open(filename, 'w') as f: + json.dump(config, f, indent=4) + self.logger.info(f"Successfully saved {len(devices)} Tasmota devices to {filename}") + + print("\nFound Tasmota Devices:") + for device in devices: + print(f"Name: {device['name']:<20} IP: {device['ip']:<15} MAC: {device['mac']}") + + except Exception as e: + self.logger.error(f"Error saving Tasmota configuration: {e}") + +def main(): + import argparse + parser = argparse.ArgumentParser(description='Discover Tasmota devices on UniFi network') + parser.add_argument('--debug', action='store_true', help='Enable debug logging') + parser.add_argument('--config', type=str, help='Path to configuration file') + parser.add_argument('--output', type=str, default='tasmota_devices.json', + help='Output file for device list (default: tasmota_devices.json)') + args = parser.parse_args() + + discovery = TasmotaDiscovery(debug=args.debug) + discovery.load_config(args.config) + discovery.setup_unifi_client() + devices = discovery.get_tasmota_devices() + discovery.save_tasmota_config(devices, args.output) + +if __name__ == '__main__': + main() \ No newline at end of file From 245646a7b7c525471a76c5f086943258643804e5 Mon Sep 17 00:00:00 2001 From: Mike Geppert Date: Tue, 15 Jul 2025 01:48:20 -0500 Subject: [PATCH 2/7] Added config file --- network_configuration.json | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) create mode 100644 network_configuration.json diff --git a/network_configuration.json b/network_configuration.json new file mode 100644 index 0000000..cffc988 --- /dev/null +++ b/network_configuration.json @@ -0,0 +1,25 @@ +{ + "unifi": { + "host": "https://192.168.6.1", + "username": "Tasmota", + "password": "TasmotaManager12!@", + "site": "default", + "network_filter": { + "NoT_network": { + "name": "NoT", + "subnet": "192.168.8", + "exclude_patterns": [ + "homeassistant*", + "*sonos*" + ] + } + } + }, + "mqtt": { + "broker": "homeassistant.Not.mgeppert.com", + "port": 1883, + "username": "mgeppert", + "password": "mgeppert", + "topic_prefix": "tasmota/" + } +} \ No newline at end of file From e106dc50fd9b43f640b68b1dcee2eb94cfe95d68 Mon Sep 17 00:00:00 2001 From: Mike Geppert Date: Tue, 15 Jul 2025 09:46:16 -0500 Subject: [PATCH 3/7] More features added --- TasmotaManager.py | 624 ++++++++++++++++++++++++++++++++----- network_configuration.json | 16 +- 2 files changed, 562 insertions(+), 78 deletions(-) diff --git a/TasmotaManager.py b/TasmotaManager.py index 58b0c4a..206f445 100644 --- a/TasmotaManager.py +++ b/TasmotaManager.py @@ -6,31 +6,74 @@ from datetime import datetime from typing import Optional import requests from urllib3.exceptions import InsecureRequestWarning +import re # Import the regular expression module +import telnetlib +import time +import argparse + +# Disable SSL warnings requests.packages.urllib3.disable_warnings(category=InsecureRequestWarning) class UnifiClient: - def __init__(self, host: str, username: str, password: str, site_id: str = 'default', ssl_verify: bool = True): - self.base_url = f"https://{host}" + def __init__(self, base_url, username, password, site_id, verify_ssl=True): + self.base_url = base_url.rstrip('/') + self.username = username + self.password = password self.site_id = site_id self.session = requests.Session() - self.session.verify = ssl_verify - self._login(username, password) + self.session.verify = verify_ssl + + # Initialize cookie jar + self.session.cookies.clear() - def _login(self, username: str, password: str) -> None: + def _login(self) -> requests.Response: # Changed return type annotation """Authenticate with the UniFi Controller.""" - login_url = f"{self.base_url}/api/login" - response = self.session.post( - login_url, - json={"username": username, "password": password} - ) - response.raise_for_status() + login_url = f"{self.base_url}/api/auth/login" + headers = { + 'Content-Type': 'application/json', + 'Accept': 'application/json' + } + payload = { + "username": self.username, + "password": self.password, + "remember": False + } + try: + response = self.session.post( + login_url, + json=payload, + headers=headers + ) + response.raise_for_status() + if 'X-CSRF-Token' in response.headers: + self.session.headers['X-CSRF-Token'] = response.headers['X-CSRF-Token'] + return response # Return the response object + except requests.exceptions.RequestException as e: + if hasattr(e, 'response') and e.response.status_code == 401: + raise Exception("Authentication failed. Please verify your username and password.") from e + raise def get_clients(self) -> list: """Get all clients from the UniFi Controller.""" - url = f"{self.base_url}/api/s/{self.site_id}/stat/sta" - response = self.session.get(url) - response.raise_for_status() - return response.json().get('data', []) + # Try the newer API endpoint first + url = f"{self.base_url}/proxy/network/api/s/{self.site_id}/stat/sta" + try: + response = self.session.get(url) + response.raise_for_status() + return response.json().get('data', []) + except requests.exceptions.RequestException as e: + # If the newer endpoint fails, try the legacy endpoint + url = f"{self.base_url}/api/s/{self.site_id}/stat/sta" + try: + response = self.session.get(url) + response.raise_for_status() + return response.json().get('data', []) + except requests.exceptions.RequestException as e: + # If both fail, try the v2 API endpoint + url = f"{self.base_url}/v2/api/site/{self.site_id}/clients" + response = self.session.get(url) + response.raise_for_status() + return response.json().get('data', []) class TasmotaDiscovery: def __init__(self, debug: bool = False): @@ -63,54 +106,78 @@ class TasmotaDiscovery: self.logger.error("Invalid JSON in configuration file") sys.exit(1) - def setup_unifi_client(self) -> UnifiClient: - """Initialize UniFi client with configuration.""" + def setup_unifi_client(self): + """Set up the UniFi client with better error handling""" self.logger.debug("Setting up UniFi client") + + if not self.config or 'unifi' not in self.config: + raise ValueError("Missing UniFi configuration") + + unifi_config = self.config['unifi'] + required_fields = ['host', 'username', 'password', 'site'] + missing_fields = [field for field in required_fields if field not in unifi_config] + + if missing_fields: + raise ValueError(f"Missing required UniFi configuration fields: {', '.join(missing_fields)}") + try: - host = self.config['unifi']['host'] - # Remove https:// if present - if host.startswith('https://'): - host = host[8:] - elif host.startswith('http://'): - host = host[7:] - - self.logger.debug(f"Connecting to UniFi Controller at {host}") - + self.logger.debug(f"Connecting to UniFi Controller at {unifi_config['host']}") self.unifi_client = UnifiClient( - host=host, - username=self.config['unifi']['username'], - password=self.config['unifi']['password'], - site_id=self.config['unifi'].get('site', 'default'), - ssl_verify=False + base_url=unifi_config['host'], + username=unifi_config['username'], + password=unifi_config['password'], + site_id=unifi_config['site'], + verify_ssl=False # Add this if using self-signed certificates ) + + # Test the connection by making a simple request + response = self.unifi_client._login() + if not response: + raise ConnectionError(f"Failed to connect to UniFi controller: No response") + self.logger.debug("UniFi client setup successful") - return self.unifi_client - except KeyError as e: - self.logger.error(f"Missing required UniFi configuration: {e}") - self.logger.debug("Connection details:") - self.logger.debug(f"Host: {self.config['unifi'].get('host', 'Not set')}") - self.logger.debug(f"Username: {self.config['unifi'].get('username', 'Not set')}") - self.logger.debug("Please verify your configuration file") - sys.exit(1) + except Exception as e: - self.logger.error(f"Error connecting to UniFi controller: {e}") - sys.exit(1) + self.logger.error(f"Error setting up UniFi client: {str(e)}") + raise ConnectionError(f"Failed to connect to UniFi controller: {str(e)}") def is_tasmota_device(self, device: dict) -> bool: """Determine if a device is a Tasmota device.""" name = device.get('name', '').lower() hostname = device.get('hostname', '').lower() - self.logger.debug(f"Checking device: {name} ({hostname})") - matches = any([ - name.startswith('tasmota'), - name.startswith('sonoff'), - name.endswith('-ts'), - hostname.startswith('tasmota'), - hostname.startswith('sonoff') - ]) - if matches: - self.logger.debug(f"Found Tasmota device: {name}") - return matches + ip = device.get('ip', '') + + # Check if device is in the configured NoT network + network_filters = self.config['unifi'].get('network_filter', {}) + for network in network_filters.values(): + if ip.startswith(network['subnet']): + self.logger.debug(f"Checking device in NoT network: {name} ({hostname}) IP: {ip}") + + # First check exclusion patterns + exclude_patterns = network.get('exclude_patterns', []) + for pattern in exclude_patterns: + pattern = pattern.lower() + # Convert glob pattern to regex pattern + pattern = pattern.replace('.', r'\.').replace('*', '.*') + if re.match(f"^{pattern}$", name) or re.match(f"^{pattern}$", hostname): + self.logger.debug(f"Excluding device due to pattern '{pattern}': {name} ({hostname})") + return False + + # If not excluded, check if it's a Tasmota device + matches = any([ + name.startswith('tasmota'), + name.startswith('sonoff'), + name.endswith('-ts'), + hostname.startswith('tasmota'), + hostname.startswith('sonoff'), + hostname.startswith('esp-'), + any(hostname.endswith(suffix) for suffix in ['-fan', '-lamp', '-light', '-switch']) + ]) + if matches: + self.logger.debug(f"Found Tasmota device: {name}") + return True # Consider all non-excluded devices in NoT network as potential Tasmota devices + + return False def get_tasmota_devices(self) -> list: """Query UniFi controller and filter Tasmota devices.""" @@ -138,48 +205,459 @@ class TasmotaDiscovery: self.logger.error(f"Error getting devices from UniFi controller: {e}") return [] - def save_tasmota_config(self, devices: list, filename: str = "tasmota_devices.json") -> None: - """Save Tasmota device information to a JSON file.""" + def save_tasmota_config(self, devices: list) -> None: + """Save Tasmota device information to a JSON file with device tracking.""" + filename = "current.json" self.logger.debug(f"Saving Tasmota configuration to {filename}") + deprecated_filename = "deprecated.json" + + current_devices = [] + deprecated_devices = [] + + # Load existing devices if file exists + if os.path.exists(filename): + try: + with open(filename, 'r') as f: + existing_config = json.load(f) + current_devices = existing_config.get('tasmota', {}).get('devices', []) + except json.JSONDecodeError: + self.logger.error(f"Error reading {filename}, treating as empty") + current_devices = [] + + # Load deprecated devices if file exists + if os.path.exists(deprecated_filename): + try: + with open(deprecated_filename, 'r') as f: + deprecated_config = json.load(f) + deprecated_devices = deprecated_config.get('tasmota', {}).get('devices', []) + except json.JSONDecodeError: + self.logger.error(f"Error reading {deprecated_filename}, treating as empty") + deprecated_devices = [] + + # Create new config + new_devices = [] + moved_to_deprecated = [] + restored_from_deprecated = [] + removed_from_deprecated = [] + excluded_devices = [] + + # Check for excluded devices in current and deprecated lists + network_filters = self.config['unifi'].get('network_filter', {}) + exclude_patterns = [] + for network in network_filters.values(): + exclude_patterns.extend(network.get('exclude_patterns', [])) + + # Function to check if device is excluded + def is_device_excluded(device_name: str, hostname: str = '') -> bool: + name = device_name.lower() + hostname = hostname.lower() + for pattern in exclude_patterns: + pattern = pattern.lower() + pattern = pattern.replace('.', r'\.').replace('*', '.*') + if re.match(f"^{pattern}$", name) or re.match(f"^{pattern}$", hostname): + return True + return False + + # Process current devices + for device in devices: + device_name = device['name'] + device_hostname = device.get('hostname', '') + device_ip = device['ip'] + device_mac = device['mac'] + + # Check if device should be excluded + if is_device_excluded(device_name, device_hostname): + print(f"Device {device_name} excluded by pattern - skipping") + excluded_devices.append(device_name) + continue + + # Check in current devices + existing_device = next((d for d in current_devices + if d['name'] == device_name), None) + + if existing_device: + # Device exists, check if IP or MAC changed + if existing_device['ip'] != device_ip or existing_device['mac'] != device_mac: + moved_to_deprecated.append(existing_device) + new_devices.append(device) + print(f"Device {device_name} moved to deprecated (IP/MAC changed)") + else: + new_devices.append(existing_device) # Keep existing device + else: + # New device, check if it was in deprecated + deprecated_device = next((d for d in deprecated_devices + if d['name'] == device_name), None) + if deprecated_device: + removed_from_deprecated.append(device_name) + print(f"Device {device_name} removed from deprecated (restored)") + new_devices.append(device) + print(f"Device {device_name} added to output file") + + # Find devices that are no longer present + current_names = {d['name'] for d in devices} + for existing_device in current_devices: + if existing_device['name'] not in current_names: + if not is_device_excluded(existing_device['name'], existing_device.get('hostname', '')): + moved_to_deprecated.append(existing_device) + print(f"Device {existing_device['name']} moved to deprecated (no longer present)") + + # Update deprecated devices list, excluding any excluded devices + final_deprecated = [] + for device in deprecated_devices: + if device['name'] not in removed_from_deprecated and not is_device_excluded(device['name'], device.get('hostname', '')): + final_deprecated.append(device) + elif is_device_excluded(device['name'], device.get('hostname', '')): + print(f"Device {device['name']} removed from deprecated (excluded by pattern)") + + final_deprecated.extend(moved_to_deprecated) + + # Save new configuration config = { "tasmota": { - "devices": devices, + "devices": new_devices, "generated_at": datetime.now().isoformat(), - "total_devices": len(devices) + "total_devices": len(new_devices) } } - try: - if os.path.exists(filename): + # Save deprecated configuration + deprecated_config = { + "tasmota": { + "devices": final_deprecated, + "generated_at": datetime.now().isoformat(), + "total_devices": len(final_deprecated) + } + } + + # Backup existing file if it exists + if os.path.exists(filename): + try: backup_name = f"{filename}.backup" os.rename(filename, backup_name) self.logger.info(f"Created backup of existing configuration as {backup_name}") - + except Exception as e: + self.logger.error(f"Error creating backup: {e}") + + # Save files + try: with open(filename, 'w') as f: json.dump(config, f, indent=4) - self.logger.info(f"Successfully saved {len(devices)} Tasmota devices to {filename}") + with open(deprecated_filename, 'w') as f: + json.dump(deprecated_config, f, indent=4) - print("\nFound Tasmota Devices:") - for device in devices: + self.logger.info(f"Successfully saved {len(new_devices)} Tasmota devices to {filename}") + self.logger.info(f"Successfully saved {len(final_deprecated)} deprecated devices to {deprecated_filename}") + + print("\nDevice Status Summary:") + if excluded_devices: + print("\nExcluded Devices:") + for name in excluded_devices: + print(f"- {name}") + + if moved_to_deprecated: + print("\nMoved to deprecated:") + for device in moved_to_deprecated: + print(f"- {device['name']}") + + if removed_from_deprecated: + print("\nRestored from deprecated:") + for name in removed_from_deprecated: + print(f"- {name}") + + print("\nCurrent Tasmota Devices:") + for device in new_devices: print(f"Name: {device['name']:<20} IP: {device['ip']:<15} MAC: {device['mac']}") except Exception as e: - self.logger.error(f"Error saving Tasmota configuration: {e}") + self.logger.error(f"Error saving configuration: {e}") + + def get_device_details(self, use_current_json=True): + """Connect to each Tasmota device via HTTP, gather details and validate MQTT settings""" + self.logger.info("Starting to gather detailed device information...") + device_details = [] + + try: + source_file = 'current.json' if use_current_json else 'tasmota.json' + with open(source_file, 'r') as f: + data = json.load(f) + devices = data.get('tasmota', {}).get('devices', []) + self.logger.debug(f"Loaded {len(devices)} devices from {source_file}") + except FileNotFoundError: + self.logger.error(f"{source_file} not found. Run discovery first.") + return + except json.JSONDecodeError: + self.logger.error(f"Invalid JSON format in {source_file}") + return + + mqtt_config = self.config.get('mqtt', {}) + if not mqtt_config: + self.logger.error("MQTT configuration missing from config file") + return + + def check_mqtt_settings(ip, name, mqtt_status): + """Check and update MQTT settings if they don't match config""" + # Get the base hostname (everything before the dash) + hostname_base = name.split('-')[0] if '-' in name else name + + mqtt_fields = { + "Host": mqtt_config.get('Host', ''), + "Port": mqtt_config.get('Port', 1883), + "User": mqtt_config.get('User', ''), + "Password": mqtt_config.get('Password', ''), + "Topic": hostname_base if mqtt_config.get('Topic') == '%hostname_base%' else mqtt_config.get('Topic', ''), + "FullTopic": mqtt_config.get('FullTopic', '%prefix%/%topic%/'), + } + + device_mqtt = mqtt_status.get('MqttHost', {}) + changes_needed = [] + force_password_update = False + + # Check each MQTT setting + if device_mqtt.get('Host') != mqtt_fields['Host']: + changes_needed.append(('MqttHost', mqtt_fields['Host'])) + self.logger.debug(f"{name}: MQTT Host mismatch - Device: {device_mqtt.get('Host')}, Config: {mqtt_fields['Host']}") + force_password_update = True + + if device_mqtt.get('Port') != mqtt_fields['Port']: + changes_needed.append(('MqttPort', mqtt_fields['Port'])) + self.logger.debug(f"{name}: MQTT Port mismatch - Device: {device_mqtt.get('Port')}, Config: {mqtt_fields['Port']}") + force_password_update = True + + if device_mqtt.get('User') != mqtt_fields['User']: + changes_needed.append(('MqttUser', mqtt_fields['User'])) + self.logger.debug(f"{name}: MQTT User mismatch - Device: {device_mqtt.get('User')}, Config: {mqtt_fields['User']}") + force_password_update = True + + if device_mqtt.get('Topic') != mqtt_fields['Topic']: + changes_needed.append(('Topic', mqtt_fields['Topic'])) + self.logger.debug(f"{name}: MQTT Topic mismatch - Device: {device_mqtt.get('Topic')}, Config: {mqtt_fields['Topic']}") + force_password_update = True + + if device_mqtt.get('FullTopic') != mqtt_fields['FullTopic']: + changes_needed.append(('FullTopic', mqtt_fields['FullTopic'])) + self.logger.debug(f"{name}: MQTT FullTopic mismatch - Device: {device_mqtt.get('FullTopic')}, Config: {mqtt_fields['FullTopic']}") + force_password_update = True + + # Add password update if any MQTT setting changed or user was updated + if force_password_update: + changes_needed.append(('MqttPassword', mqtt_fields['Password'])) + self.logger.debug(f"{name}: MQTT Password will be updated") + + # Check NoRetain setting + if mqtt_config.get('NoRetain', True): + changes_needed.append(('SetOption62', '1')) # 1 = No Retain + + # Apply changes if needed + for setting, value in changes_needed: + try: + url = f"http://{ip}/cm?cmnd={setting}%20{value}" + response = requests.get(url, timeout=5) + if response.status_code == 200: + if setting != 'MqttPassword': + self.logger.debug(f"{name}: Updated {setting} to {value}") + else: + self.logger.debug(f"{name}: Updated MQTT Password") + else: + self.logger.error(f"{name}: Failed to update {setting}") + except requests.exceptions.RequestException as e: + self.logger.error(f"{name}: Error updating {setting}: {str(e)}") + + return len(changes_needed) > 0 + + for device in devices: + if not isinstance(device, dict): + self.logger.warning(f"Skipping invalid device entry: {device}") + continue + + name = device.get('name', 'Unknown') + ip = device.get('ip') + mac = device.get('mac') + + if not ip: + self.logger.warning(f"Skipping device {name} - no IP address") + continue + + self.logger.info(f"Checking device: {name} at {ip}") + + try: + # Get Status 2 for firmware version + url_status = f"http://{ip}/cm?cmnd=Status%202" + response = requests.get(url_status, timeout=5) + status_data = response.json() + + # Get Status 5 for network info + url_network = f"http://{ip}/cm?cmnd=Status%205" + response = requests.get(url_network, timeout=5) + network_data = response.json() + + # Get Status 6 for MQTT info + url_mqtt = f"http://{ip}/cm?cmnd=Status%206" + response = requests.get(url_mqtt, timeout=5) + mqtt_data = response.json() + + # Check and update MQTT settings if needed + mqtt_updated = check_mqtt_settings(ip, name, mqtt_data) + + device_detail = { + "name": name, + "ip": ip, + "mac": mac, + "version": status_data.get("StatusFWR", {}).get("Version", "Unknown"), + "hostname": network_data.get("StatusNET", {}).get("Hostname", "Unknown"), + "mqtt_status": "Updated" if mqtt_updated else "Verified", + "last_checked": time.strftime("%Y-%m-%d %H:%M:%S"), + "status": "online" + } + self.logger.info(f"Successfully got version for {name}: {device_detail['version']}") + + except requests.exceptions.RequestException as e: + self.logger.error(f"Error connecting to {name} at {ip}: {str(e)}") + device_detail = { + "name": name, + "ip": ip, + "mac": mac, + "version": "Unknown", + "status": "offline", + "error": str(e) + } + + device_details.append(device_detail) + time.sleep(0.5) + + # Save all device details at once + try: + with open('TasmotaDevices.json', 'w') as f: + json.dump(device_details, f, indent=2) + self.logger.info(f"Device details saved to TasmotaDevices.json ({len(device_details)} devices)") + except Exception as e: + self.logger.error(f"Error saving device details: {e}") + + def check_mqtt_settings(ip, name, mqtt_status): + """Check and update MQTT settings if they don't match config""" + # Get the base hostname (everything before the dash) + hostname_base = name.split('-')[0] if '-' in name else name + + mqtt_fields = { + "Host": mqtt_config.get('Host', ''), + "Port": mqtt_config.get('Port', 1883), + "User": mqtt_config.get('User', ''), + "Password": mqtt_config.get('Password', ''), + "Topic": hostname_base if mqtt_config.get('Topic') == '%hostname_base%' else mqtt_config.get('Topic', ''), + "FullTopic": mqtt_config.get('FullTopic', '%prefix%/%topic%/'), + } + + device_mqtt = mqtt_status.get('MqttHost', {}) + changes_needed = [] + force_password_update = False + + # Check each MQTT setting + if device_mqtt.get('Host') != mqtt_fields['Host']: + changes_needed.append(('MqttHost', mqtt_fields['Host'])) + self.logger.debug(f"{name}: MQTT Host mismatch - Device: {device_mqtt.get('Host')}, Config: {mqtt_fields['Host']}") + force_password_update = True + + if device_mqtt.get('Port') != mqtt_fields['Port']: + changes_needed.append(('MqttPort', mqtt_fields['Port'])) + self.logger.debug(f"{name}: MQTT Port mismatch - Device: {device_mqtt.get('Port')}, Config: {mqtt_fields['Port']}") + force_password_update = True + + if device_mqtt.get('User') != mqtt_fields['User']: + changes_needed.append(('MqttUser', mqtt_fields['User'])) + self.logger.debug(f"{name}: MQTT User mismatch - Device: {device_mqtt.get('User')}, Config: {mqtt_fields['User']}") + force_password_update = True + + if device_mqtt.get('Topic') != mqtt_fields['Topic']: + changes_needed.append(('Topic', mqtt_fields['Topic'])) + self.logger.debug(f"{name}: MQTT Topic mismatch - Device: {device_mqtt.get('Topic')}, Config: {mqtt_fields['Topic']}") + force_password_update = True + + if device_mqtt.get('FullTopic') != mqtt_fields['FullTopic']: + changes_needed.append(('FullTopic', mqtt_fields['FullTopic'])) + self.logger.debug(f"{name}: MQTT FullTopic mismatch - Device: {device_mqtt.get('FullTopic')}, Config: {mqtt_fields['FullTopic']}") + force_password_update = True + + # Add password update if any MQTT setting changed or user was updated + if force_password_update: + changes_needed.append(('MqttPassword', mqtt_fields['Password'])) + self.logger.debug(f"{name}: MQTT Password will be updated") + + # Check NoRetain setting + if mqtt_config.get('NoRetain', True): + changes_needed.append(('SetOption62', '1')) # 1 = No Retain + + # Apply changes if needed + for setting, value in changes_needed: + try: + url = f"http://{ip}/cm?cmnd={setting}%20{value}" + response = requests.get(url, timeout=5) + if response.status_code == 200: + if setting != 'MqttPassword': + self.logger.debug(f"{name}: Updated {setting} to {value}") + else: + self.logger.debug(f"{name}: Updated MQTT Password") + else: + self.logger.error(f"{name}: Failed to update {setting}") + except requests.exceptions.RequestException as e: + self.logger.error(f"{name}: Error updating {setting}: {str(e)}") + + return len(changes_needed) > 0 def main(): - import argparse - parser = argparse.ArgumentParser(description='Discover Tasmota devices on UniFi network') - parser.add_argument('--debug', action='store_true', help='Enable debug logging') - parser.add_argument('--config', type=str, help='Path to configuration file') - parser.add_argument('--output', type=str, default='tasmota_devices.json', - help='Output file for device list (default: tasmota_devices.json)') + parser = argparse.ArgumentParser(description='Tasmota Device Manager') + parser.add_argument('--config', default='network_configuration.json', + help='Path to configuration file') + parser.add_argument('--debug', action='store_true', + help='Enable debug logging') + parser.add_argument('--skip-unifi', action='store_true', + help='Skip UniFi discovery and use existing current.json') + args = parser.parse_args() - + + # Set up logging + log_level = logging.DEBUG if args.debug else logging.INFO + logging.basicConfig(level=log_level, + format='%(asctime)s - %(levelname)s - %(message)s', + datefmt='%Y-%m-%d %H:%M:%S') + + print("Starting Tasmota Device Discovery and Version Check...") + + # Create TasmotaDiscovery instance discovery = TasmotaDiscovery(debug=args.debug) discovery.load_config(args.config) - discovery.setup_unifi_client() - devices = discovery.get_tasmota_devices() - discovery.save_tasmota_config(devices, args.output) + + try: + if not args.skip_unifi: + print("Step 1: Discovering Tasmota devices...") + discovery.setup_unifi_client() + tasmota_devices = discovery.get_tasmota_devices() + discovery.save_tasmota_config(tasmota_devices) + else: + print("Skipping UniFi discovery, using existing current.json...") + + print("\nStep 2: Getting detailed version information...") + discovery.get_device_details(use_current_json=True) + + print("\nProcess completed successfully!") + print("- Device list saved to: current.json") + print("- Detailed information saved to: TasmotaDevices.json") + + except ConnectionError as e: + print(f"Connection Error: {str(e)}") + print("\nTrying to proceed with existing current.json...") + try: + discovery.get_device_details(use_current_json=True) + print("\nSuccessfully retrieved device details from existing current.json") + except Exception as inner_e: + print(f"Error processing existing devices: {str(inner_e)}") + return 1 + except Exception as e: + print(f"Error: {str(e)}") + if args.debug: + import traceback + traceback.print_exc() + return 1 + + return 0 if __name__ == '__main__': main() \ No newline at end of file diff --git a/network_configuration.json b/network_configuration.json index cffc988..c5484f1 100644 --- a/network_configuration.json +++ b/network_configuration.json @@ -11,15 +11,21 @@ "exclude_patterns": [ "homeassistant*", "*sonos*" + ], + "unknown_device_patterns": [ + "tasmota*", + "ESP-*" ] } } }, "mqtt": { - "broker": "homeassistant.Not.mgeppert.com", - "port": 1883, - "username": "mgeppert", - "password": "mgeppert", - "topic_prefix": "tasmota/" + "Host": "homeassistant.Not.mgeppert.com", + "Port": 1883, + "User": "mgeppert", + "Password": "mgeppert", + "Topic": "%hostname_base%", + "FullTopic": "%prefix%/%topic%/", + "NoRetain": false } } \ No newline at end of file From 3342a3620fd164e22fcefccf91ebcfc4a7cf9c70 Mon Sep 17 00:00:00 2001 From: Mike Geppert Date: Sat, 19 Jul 2025 15:49:53 -0500 Subject: [PATCH 4/7] Pushing saves --- README.md | 161 ++++++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 156 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index 047477f..107fb8d 100644 --- a/README.md +++ b/README.md @@ -1,8 +1,159 @@ -# Sample GitLab Project +# Tasmota Device Manager -This sample project shows how a project in GitLab looks for demonstration purposes. It contains issues, merge requests and Markdown files in many branches, -named and filled with lorem ipsum. +A tool for discovering, configuring, and managing Tasmota devices on a network. -You can look around to get an idea how to structure your project and, when done, you can safely delete this project. +## Device Discovery Script -[Learn more about creating GitLab projects.](https://docs.gitlab.com/ee/gitlab-basics/create-project.html) +The `discover_devices.py` script implements the Device Discovery process for Tasmota devices on a network. It connects to a Unifi Switch, retrieves a list of connected devices, and filters for potential Tasmota devices based on network_filter information in the config file. + +### Features + +- Connects to Unifi Switch using configured credentials +- Retrieves list of all connected devices +- Filters devices based on network and subnet configuration +- Classifies devices into three categories: + - Valid hostname devices (ready for configuration) + - Default hostname devices (need proper naming) + - Deprecated devices (no longer available) +- Generates JSON files for each device category +- Provides detailed logging +- Supports debug mode for troubleshooting + +### Prerequisites + +- Python 3.8+ +- Network access to Unifi Switch +- API Key for the Unifi Switch +- Network access to all potential Tasmota devices + +### Installation + +1. Clone this repository +2. Create and activate a virtual environment: + ``` + python -m venv .venv + source .venv/bin/activate # On Windows: .venv\Scripts\activate + ``` +3. Install dependencies: + ``` + pip install -r requirements.txt + ``` +4. Configure access credentials in `config.yaml` (see Configuration section) + +### Configuration + +Create a `config.yaml` file with the following structure: + +```yaml +unifi: + type: UDMSE # Unifi Dream Machine SE + host: https://192.168.6.1 + port: 8443 # Default Unifi Controller port + API_Key: "nIfTdZAXVUGQgyNqsATluTja-noaNLAk" # API Key for Unifi Controller + + network_filter: + NoT_network: + name: "NoT" + subnet: "192.168.8" + exclude_patterns: + - "homeassistant*" + - "*sonos*" + default_name_patterns: + - "tasmota*" + - "ESP-*" + +tasmota: + mqtt_settings: + host: "homeassistant.NoT.mgeppert.com" + port: 1883 + user: "mgeppert" + password: "mgeppert" + topic: "%hostname_base%" + full_topic: "%prefix%/%topic%/" + no_retain: false + +other_settings: # Yet to be defined +``` + +### Usage + +Run the device discovery script: + +``` +python discover_devices.py +``` + +#### Command-line Options + +- `-h, --help`: Show help message and exit +- `-d, --debug`: Enable debug mode with verbose logging +- `-c, --config`: Specify a custom config file path (default: config.yaml) + +### Output Files + +The script generates the following JSON files: + +- `valid_hostnames.json`: Devices with valid hostnames that can be configured +- `default_hostnames.json`: Devices with default hostnames that need proper naming +- `deprecated_hostnames.json`: Devices that were previously valid but are no longer available + +### Logging + +Logs are stored in the `logs/` directory: + +- `discovery.log`: Device discovery process logs + +Log level is set to INFO by default, or DEBUG when using the `-d` flag. + +## Next Steps + +After running the device discovery script: + +1. For devices with default hostnames, assign proper hostnames based on location and function +2. For devices with valid hostnames, proceed with configuration auditing and standardization + +## Testing + +The project includes scripts to verify and test the device discovery functionality without requiring the actual dependencies to be installed. + +### Syntax Verification + +To verify the syntax of the device discovery script without running it: + +``` +python verify_syntax.py +``` + +This will check that the script has valid Python syntax without executing it or requiring any dependencies. + +### Functional Testing + +To test the core functionality of the device discovery script with mocked dependencies: + +``` +python test_discovery.py +``` + +This script mocks the external dependencies (yaml, requests, urllib3) and tests: +- Pattern matching functionality +- Device filtering based on network and subnet +- Device classification into valid and default hostname groups + +The test script uses sample device data to simulate the discovery process and verify that the filtering and classification logic works correctly. + +## Troubleshooting + +### Common Issues + +- **No devices retrieved from Unifi Controller**: Check Unifi credentials and network connectivity +- **No devices match the filter criteria**: Check network_filter configuration in config.yaml +- **Error loading configuration**: Verify config.yaml syntax and structure +- **Missing dependencies**: Make sure to install all required packages from requirements.txt + +### Debug Mode + +Run the script with the `-d` flag to enable debug mode with verbose logging: + +``` +python discover_devices.py -d +``` From d4bdf43ad600fbc8546a4dd7e70b099066442ed3 Mon Sep 17 00:00:00 2001 From: Mike Geppert Date: Sun, 20 Jul 2025 12:10:05 -0500 Subject: [PATCH 5/7] Add discover_devices.py and config.yaml to repository --- config.yaml | 36 ++ discover_devices.py | 869 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 905 insertions(+) create mode 100644 config.yaml create mode 100644 discover_devices.py diff --git a/config.yaml b/config.yaml new file mode 100644 index 0000000..3e42dac --- /dev/null +++ b/config.yaml @@ -0,0 +1,36 @@ +unifi: + type: UDMSE # Unifi Dream Machine SE + host: https://192.168.6.1 + username: Tasmota + password: TasmotaManager12!@ + port: 8443 # Default Unifi Controller port + site: "default" # Site ID for Unifi Controller + API_Key: "nIfTdZAXVUGQgyNqsATluTja-noaNLAk" # API Key for Unifi Controller + + network_filter: + NoT_network: + name: "NoT" + # Updated to support multiple subnets + subnets: + - "192.168.5" # Main network + - "192.168.7" # IoT network + - "192.168.8" # Tasmota network + - "192.168.9" # Camera network + exclude_patterns: + - "homeassistant*" + - "*sonos*" + default_name_patterns: + - "tasmota*" + - "ESP-*" + +tasmota: + mqtt_settings: + host: "homeassistant.NoT.mgeppert.com" + port: 1883 + user: "mgeppert" + password: "mgeppert" + topic: "%hostname_base%" + full_topic: "%prefix%/%topic%/" + no_retain: false + +other_settings: # Yet to be defined \ No newline at end of file diff --git a/discover_devices.py b/discover_devices.py new file mode 100644 index 0000000..baafa98 --- /dev/null +++ b/discover_devices.py @@ -0,0 +1,869 @@ +#!/usr/bin/env python3 +""" +Tasmota Device Discovery Script + +This script implements the Device Discovery process for Tasmota devices on a network. +It connects to a Unifi Switch, retrieves a list of connected devices, and filters for +potential Tasmota devices based on network_filter information in the config file. + +Usage: + python discover_devices.py [options] + +Options: + -h, --help Show this help message and exit + -d, --debug Enable debug mode with verbose logging + -c, --config Specify a custom config file path (default: config.yaml) +""" + +import argparse +import yaml +import logging +import os +import sys +import re +import json +from typing import Dict, List, Any, Optional, Tuple +import requests +from urllib3.exceptions import InsecureRequestWarning + +# Suppress only the single InsecureRequestWarning from urllib3 +requests.packages.urllib3.disable_warnings(category=InsecureRequestWarning) + +# Set up logging +logger = logging.getLogger("tasmota_discovery") + +# File paths +VALID_HOSTNAMES_FILE = "valid_hostnames.json" +DEFAULT_HOSTNAMES_FILE = "default_hostnames.json" +DEPRECATED_HOSTNAMES_FILE = "deprecated_hostnames.json" +UNIDENTIFIED_DEVICES_FILE = "unidentified_devices.json" + +class UnifiClient: + """Client for interacting with the Unifi Controller API.""" + + def __init__(self, config: Dict[str, Any], debug: bool = False): + """ + Initialize the Unifi client with configuration. + + Args: + config: Dictionary containing Unifi configuration + debug: Whether to enable debug mode + """ + self.host = config.get('host') + self.port = config.get('port', 8443) + self.type = config.get('type', 'UDMSE') + self.site_id = config.get('site', 'default') # Site ID parameter from config + self.api_key = config.get('API_Key') # API Key for authentication + self.debug = debug + self.session = requests.Session() + self.base_url = self.host # Using host without port number as per issue requirement + self.is_authenticated = False + self.available_sites = [] # Will store available sites from the controller + self.application_version = "Unknown" # Will store the application version + + # Check for required configuration parameters + if not self.host: + raise ValueError("Missing required Unifi host parameter") + + # Set up API key authentication + if self.api_key: + logger.debug("API Key authentication will be used") + # Add API key to session headers + self.session.headers.update({ + 'X-API-KEY': self.api_key, + 'Accept': 'application/json' + }) + self.is_authenticated = True # With API key, we're pre-authenticated + + # Retrieve and validate the site ID + self.retrieve_available_sites() + + # Get the application version + self.get_application_version() + else: + raise ValueError("Missing required Unifi API_Key parameter") + + + def retrieve_available_sites(self) -> bool: + """ + Retrieve available sites from the Unifi Controller and validate the configured site ID. + + Returns: + bool: True if site ID is valid, False otherwise + """ + if not self.is_authenticated: + logger.error("Not authenticated with Unifi Controller. API key authentication is required.") + return False + + try: + # Try different API endpoints for retrieving sites + endpoints = [ + "/proxy/network/integration/v1/sites", # Integration API endpoint + "/proxy/network/api/self/sites", # Newer API endpoint + "/api/self/sites", # Legacy API endpoint + "/v2/api/sites" # v2 API endpoint + ] + + logger.info("Attempting to retrieve available sites from Unifi Controller") + + for endpoint in endpoints: + sites_url = f"{self.base_url}{endpoint}" + logger.debug(f"Trying sites API endpoint: {sites_url}") + + try: + response = self.session.get(sites_url, verify=False, timeout=5) + + if response.status_code == 200: + sites_data = response.json() + + # Handle different response formats + if isinstance(sites_data, list): + self.available_sites = sites_data + elif isinstance(sites_data, dict) and 'data' in sites_data: + self.available_sites = sites_data.get('data', []) + else: + logger.debug(f"Unexpected response format from sites API endpoint: {sites_url}") + continue + + # Log the available sites + site_names = [site.get('name', site.get('desc', 'Unknown')) for site in self.available_sites] + site_ids = [site.get('id', site.get('name', 'Unknown')) for site in self.available_sites] + + logger.info(f"Retrieved {len(self.available_sites)} sites from Unifi Controller") + logger.debug(f"Available sites: {', '.join(site_names)}") + logger.debug(f"Available site IDs: {', '.join(site_ids)}") + + # Validate the configured site ID + valid_site = False + for site in self.available_sites: + site_id = site.get('id', site.get('name', '')) + if site_id == self.site_id: + valid_site = True + site_name = site.get('name', site.get('desc', 'Unknown')) + logger.info(f"Configured site ID '{self.site_id}' is valid (Site name: {site_name})") + break + + if not valid_site: + logger.warning(f"Configured site ID '{self.site_id}' not found in available sites") + if self.site_id == 'default' and len(self.available_sites) > 0: + # Try to use the first available site as default + first_site = self.available_sites[0] + self.site_id = first_site.get('id', first_site.get('name', 'default')) + site_name = first_site.get('name', first_site.get('desc', 'Unknown')) + logger.info(f"Using first available site as default: '{self.site_id}' (Site name: {site_name})") + valid_site = True + + return valid_site + else: + logger.debug(f"Sites API endpoint failed: {sites_url} - Status code: {response.status_code}") + + except Exception as e: + logger.debug(f"Error with sites API endpoint {sites_url}: {str(e)}") + + logger.error("Failed to retrieve available sites from Unifi Controller") + return False + + except Exception as e: + logger.error(f"Error retrieving available sites from Unifi Controller: {str(e)}") + return False + + def get_application_version(self) -> str: + """ + Retrieve the application version from the Unifi Controller. + + Returns: + str: The application version or hardware/model information if version not found + """ + if not self.is_authenticated: + logger.error("Not authenticated with Unifi Controller. API key authentication is required.") + return "Unknown" + + try: + # Try different API endpoints for retrieving application version + endpoints = [ + "/proxy/network/integration/v1/info", # Integration API info endpoint (from issue description) + "/proxy/network/api/s/{site_id}/status", # Site-specific status endpoint + "/proxy/network/api/status", # Network status endpoint + "/api/system", # System info endpoint + "/proxy/network/api/system", # Network system endpoint + "/v1/api/system/info", # v1 system info endpoint + "/proxy/protect/api/system", # Protect system endpoint + "/proxy/network/v1/api/system/info", # Network v1 system info + "/proxy/network/api/s/{site_id}/stat/sysinfo", # Site-specific system info + "/api/s/{site_id}/stat/sysinfo" # Legacy site-specific system info + ] + + logger.info("Attempting to retrieve application version from Unifi Controller") + + # Variables to store fallback information if no explicit version is found + hardware_info = None + model_name = None + + for endpoint in endpoints: + # Replace {site_id} placeholder with actual site_id if present + current_endpoint = endpoint.replace("{site_id}", self.site_id) + version_url = f"{self.base_url}{current_endpoint}" + logger.debug(f"Trying version API endpoint: {version_url}") + + try: + response = self.session.get(version_url, verify=False, timeout=5) + + if response.status_code == 200: + try: + version_data = response.json() + logger.debug(f"Response from {current_endpoint}: {version_data}") + + # Handle different response formats + # Try common paths where version information might be found + version = None + + # Check for version in meta.version + if isinstance(version_data, dict) and 'meta' in version_data and 'version' in version_data['meta']: + version = version_data['meta']['version'] + + # Check for version in data.version + elif isinstance(version_data, dict) and 'data' in version_data: + data = version_data['data'] + if isinstance(data, list) and len(data) > 0 and 'version' in data[0]: + version = data[0]['version'] + elif isinstance(data, dict) and 'version' in data: + version = data['version'] + + # Check for version directly in the response + elif isinstance(version_data, dict) and 'version' in version_data: + version = version_data['version'] + + # Check for firmware_version + elif isinstance(version_data, dict) and 'firmware_version' in version_data: + version = version_data['firmware_version'] + + # Check for firmware + elif isinstance(version_data, dict) and 'firmware' in version_data: + version = version_data['firmware'] + + # Check for controller.version + elif isinstance(version_data, dict) and 'controller' in version_data and 'version' in version_data['controller']: + version = version_data['controller']['version'] + + # Check for applicationVersion (from issue description) + elif isinstance(version_data, dict) and 'applicationVersion' in version_data: + version = version_data['applicationVersion'] + logger.debug(f"Found applicationVersion in response: {version}") + + # Store hardware and model information as fallback + if isinstance(version_data, dict): + # Check for hardware information + if 'hardware' in version_data and isinstance(version_data['hardware'], dict) and 'shortname' in version_data['hardware']: + hardware_info = version_data['hardware']['shortname'] + + # Check for model/name information + if 'name' in version_data: + model_name = version_data['name'] + + if version: + self.application_version = str(version) + logger.info(f"Retrieved application version from Unifi Controller: {self.application_version}") + return self.application_version + + except ValueError as json_error: + logger.debug(f"Invalid JSON response from endpoint: {version_url} - Error: {str(json_error)}") + # Try to extract version from raw response if JSON parsing fails + try: + raw_response = response.text + logger.debug(f"Raw response from {current_endpoint}: {raw_response[:200]}...") # Log first 200 chars + + # Look for version patterns in raw response + import re + version_match = re.search(r'version["\']?\s*:\s*["\']([^"\']+)["\']', raw_response, re.IGNORECASE) + if version_match: + version = version_match.group(1) + self.application_version = str(version) + logger.info(f"Retrieved application version from raw response: {self.application_version}") + return self.application_version + except Exception as raw_error: + logger.debug(f"Error processing raw response: {str(raw_error)}") + else: + logger.debug(f"Version API endpoint failed: {version_url} - Status code: {response.status_code}") + + except Exception as e: + logger.debug(f"Error with version API endpoint {version_url}: {str(e)}") + + # If we have hardware or model information, use that as a fallback + if hardware_info and model_name: + fallback_version = f"{hardware_info} - {model_name}" + self.application_version = fallback_version + logger.info(f"Using hardware and model information as version: {fallback_version}") + return fallback_version + elif hardware_info: + self.application_version = hardware_info + logger.info(f"Using hardware information as version: {hardware_info}") + return hardware_info + elif model_name: + self.application_version = model_name + logger.info(f"Using model name as version: {model_name}") + return model_name + + logger.warning("Failed to retrieve application version from Unifi Controller") + return "Unknown" + + except Exception as e: + logger.error(f"Error retrieving application version from Unifi Controller: {str(e)}") + return "Unknown" + + def get_devices(self) -> List[Dict[str, Any]]: + """ + Retrieve list of all connected devices from the Unifi Controller using the specified + site-specific integration API endpoint. Handles pagination to retrieve all devices. + + Returns: + List of dictionaries containing device information + """ + if not self.is_authenticated: + logger.error("Not authenticated with Unifi Controller. API key authentication is required.") + return [] + + try: + logger.info("Retrieving devices from Unifi Controller using site-specific integration API endpoint") + + # Ensure we have a valid site ID + if not self.available_sites: + logger.debug("No available sites found, attempting to retrieve sites") + if not self.retrieve_available_sites(): + logger.error("Failed to retrieve and validate site ID, cannot use site-specific endpoint") + return [] + + # Use the specific site-specific integration API endpoint from the issue description + # Format: /proxy/network/integration/v1/sites/{site_id}/clients + site_integration_url = f"{self.base_url}/proxy/network/integration/v1/sites/{self.site_id}/clients" + logger.debug(f"Using site-specific integration API endpoint: {site_integration_url}") + + # Initialize variables for pagination + all_devices = [] + offset = 0 + limit = 100 # Increase the limit to reduce the number of API calls + total_count = None + + # Loop until we've retrieved all devices + while True: + try: + # Add pagination parameters to the URL + paginated_url = f"{site_integration_url}?offset={offset}&limit={limit}" + logger.debug(f"Retrieving page with offset={offset}, limit={limit}") + + response = self.session.get(paginated_url, verify=False, timeout=10) # Increased timeout for larger responses + + if response.status_code == 200: + response_data = response.json() + + # Based on the test results, the response is a dictionary with keys: offset, limit, count, totalCount, data + # The 'data' field contains the actual client information + if isinstance(response_data, dict): + # Extract pagination information + current_offset = response_data.get('offset', 0) + current_limit = response_data.get('limit', 0) + current_count = response_data.get('count', 0) + + # Set total_count if not already set + if total_count is None: + total_count = response_data.get('totalCount', 0) + logger.info(f"Total number of devices: {total_count}") + + if 'data' in response_data: + devices = response_data.get('data', []) + logger.info(f"Retrieved {len(devices)} devices from page {offset//limit + 1} (offset={offset}, limit={limit})") + + # Log the first device for debugging (if available and first page) + if len(devices) > 0 and offset == 0: + logger.debug(f"First device sample: {devices[0]}") + logger.debug(f"Keys in first device: {', '.join(devices[0].keys())}") + + # Add devices to the all_devices list + all_devices.extend(devices) + + # Update offset for next page + offset += current_count + + # Check if we've retrieved all devices + if offset >= total_count or current_count == 0: + logger.info(f"Retrieved all {len(all_devices)} devices") + break + else: + logger.error("No 'data' field in response") + break + elif isinstance(response_data, list): + devices = response_data + logger.info(f"Retrieved {len(devices)} devices from Unifi Controller (direct list)") + + # Log the first device for debugging (if available and first page) + if len(devices) > 0 and offset == 0: + logger.debug(f"First device sample: {devices[0]}") + + # Add devices to the all_devices list + all_devices.extend(devices) + + # Since we don't have pagination information, we can't continue + break + else: + logger.error(f"Unexpected response format: {type(response_data)}") + logger.debug(f"Response content: {response_data}") + break + else: + logger.error(f"Site-specific integration API endpoint failed: {response.status_code}") + try: + error_content = response.text + logger.debug(f"Error response content: {error_content}") + except Exception: + pass + break + except Exception as e: + logger.error(f"Error with site-specific integration API endpoint: {str(e)}") + break + + # Transform the response to match the expected format for our application + logger.info(f"Transforming {len(all_devices)} devices to application format") + transformed_devices = [] + for device in all_devices: + transformed_device = { + 'hostname': device.get('name', ''), + 'ip': device.get('ipAddress', ''), + 'mac': device.get('macAddress', ''), + 'status': 'connected' if device.get('connectedAt') else 'disconnected', + 'id': device.get('id', ''), + 'type': device.get('type', ''), + 'connected_at': device.get('connectedAt', ''), + 'uplink_device_id': device.get('uplinkDeviceId', '') + } + transformed_devices.append(transformed_device) + + return transformed_devices + + except Exception as e: + logger.error(f"Error in get_devices method: {str(e)}") + return [] + + +class DeviceDiscovery: + """Main class for Tasmota device discovery.""" + + def __init__(self, config_path: str, debug: bool = False): + """ + Initialize the device discovery with configuration. + + Args: + config_path: Path to the configuration file + debug: Whether to enable debug mode + """ + self.config_path = config_path + self.debug = debug + self.config = self._load_config() + self.unifi_client = UnifiClient(self.config.get('unifi', {}), debug) + + def _load_config(self) -> Dict[str, Any]: + """ + Load configuration from YAML file. + + Returns: + Dictionary containing configuration + """ + try: + with open(self.config_path, 'r') as file: + config = yaml.safe_load(file) + logger.debug(f"Loaded configuration from {self.config_path}") + return config + except Exception as e: + logger.error(f"Error loading configuration from {self.config_path}: {str(e)}") + sys.exit(1) + + def _match_pattern(self, hostname: str, patterns: List[str]) -> bool: + """ + Check if hostname matches any of the given patterns. + + Args: + hostname: Hostname to check + patterns: List of glob patterns to match against + + Returns: + True if hostname matches any pattern, False otherwise + """ + if not hostname or not patterns: + return False + + for pattern in patterns: + # Convert glob pattern to regex + regex_pattern = pattern.replace("*", ".*") + if re.match(f"^{regex_pattern}$", hostname, re.IGNORECASE): + return True + return False + + def filter_devices(self, devices: List[Dict[str, Any]]) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]], List[Dict[str, Any]]]: + """ + Filter devices based on network_filter criteria. + + Args: + devices: List of devices from Unifi Controller + + Returns: + Tuple of (included_devices, excluded_devices, unidentified_devices) + """ + included_devices = [] + excluded_devices = [] + unidentified_devices = [] + + # Track devices that have been processed to avoid duplicates + processed_devices = set() + + network_filters = self.config.get('unifi', {}).get('network_filter', {}) + + for network_name, network_config in network_filters.items(): + # Support both old (subnet) and new (subnets) configuration formats + subnet = network_config.get('subnet') + subnets = network_config.get('subnets', []) + + # If subnet is specified but subnets is not, add subnet to subnets for backward compatibility + if subnet and not subnets: + subnets = [subnet] + + exclude_patterns = network_config.get('exclude_patterns', []) + + logger.debug(f"Filtering devices for network {network_name} with subnets {subnets}") + + for device in devices: + # Skip devices that have already been processed + device_id = device.get('id') + if device_id in processed_devices: + continue + + ip = device.get('ip') + hostname = device.get('hostname', '') + + # Check if device is in any of the specified subnets + in_subnet = False + matching_subnet = None + + if ip: + for subnet_prefix in subnets: + if ip.startswith(subnet_prefix): + in_subnet = True + matching_subnet = subnet_prefix + break + + if not in_subnet: + # Add to unidentified devices if not in any of the specified subnets + logger.debug(f"Unidentified device {hostname} ({ip}) - not in any configured subnet") + unidentified_devices.append(device) + processed_devices.add(device_id) + continue + + # Check if device should be excluded + if self._match_pattern(hostname, exclude_patterns): + logger.debug(f"Excluding device {hostname} ({ip}) - matches exclude pattern") + excluded_devices.append(device) + else: + logger.debug(f"Including device {hostname} ({ip}) - matched subnet {matching_subnet}") + included_devices.append(device) + + # Mark device as processed + processed_devices.add(device_id) + + logger.info(f"Filtered {len(included_devices)} devices (excluded {len(excluded_devices)}, unidentified {len(unidentified_devices)})") + return included_devices, excluded_devices, unidentified_devices + + def classify_devices(self, devices: List[Dict[str, Any]]) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]: + """ + Classify devices into valid hostname and default hostname groups. + + Args: + devices: List of filtered devices + + Returns: + Tuple of (valid_hostname_devices, default_hostname_devices) + """ + valid_hostname_devices = [] + default_hostname_devices = [] + + network_filters = self.config.get('unifi', {}).get('network_filter', {}) + + for device in devices: + hostname = device.get('hostname', '') + + # Check if hostname matches any default name pattern + is_default = False + for network_name, network_config in network_filters.items(): + default_patterns = network_config.get('default_name_patterns', []) + if self._match_pattern(hostname, default_patterns): + logger.debug(f"Device {hostname} matches default name pattern") + default_hostname_devices.append(device) + is_default = True + break + + if not is_default: + logger.debug(f"Device {hostname} has valid hostname") + valid_hostname_devices.append(device) + + logger.info(f"Classified devices: {len(valid_hostname_devices)} valid, {len(default_hostname_devices)} default") + return valid_hostname_devices, default_hostname_devices + + def process_existing_files(self, valid_devices: List[Dict[str, Any]]) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]: + """ + Process existing hostname files and update them based on current devices. + + Args: + valid_devices: List of devices with valid hostnames + + Returns: + Tuple of (updated_valid_devices, deprecated_devices) + """ + existing_valid_devices = [] + deprecated_devices = [] + + # Load existing valid hostnames if file exists + if os.path.exists(VALID_HOSTNAMES_FILE): + try: + with open(VALID_HOSTNAMES_FILE, 'r') as file: + existing_valid_devices = json.load(file) + logger.debug(f"Loaded {len(existing_valid_devices)} existing valid hostnames") + except Exception as e: + logger.error(f"Error loading existing valid hostnames: {str(e)}") + existing_valid_devices = [] + + # Delete deprecated file if it exists + if os.path.exists(DEPRECATED_HOSTNAMES_FILE): + try: + os.remove(DEPRECATED_HOSTNAMES_FILE) + logger.debug("Deleted existing deprecated hostnames file") + except Exception as e: + logger.error(f"Error deleting deprecated hostnames file: {str(e)}") + + if not existing_valid_devices: + # If no existing valid hostnames, just return the current valid devices + return valid_devices, [] + + # Create lookup dictionaries for faster processing + current_devices_by_hostname = {device.get('hostname'): device for device in valid_devices if device.get('hostname')} + existing_devices_by_hostname = {device.get('hostname'): device for device in existing_valid_devices if device.get('hostname')} + + updated_valid_devices = [] + + # Process current devices + for hostname, device in current_devices_by_hostname.items(): + if hostname in existing_devices_by_hostname: + # Device exists in both current and existing lists + existing_device = existing_devices_by_hostname[hostname] + + # Check if any fields have changed + changed_fields = [] + for key, value in device.items(): + if key in existing_device and existing_device[key] != value: + changed_fields.append(key) + + if changed_fields: + logger.info(f"Device {hostname} has changed fields: {', '.join(changed_fields)}") + + # Use the current device data + updated_valid_devices.append(device) + else: + # New device not in existing list + logger.info(f"New device found: {hostname}") + updated_valid_devices.append(device) + + # Find deprecated devices (in existing but not in current) + for hostname, device in existing_devices_by_hostname.items(): + if hostname not in current_devices_by_hostname: + logger.info(f"Device {hostname} is no longer available, marking as deprecated") + deprecated_devices.append(device) + + return updated_valid_devices, deprecated_devices + + def save_device_files(self, valid_devices: List[Dict[str, Any]], default_devices: List[Dict[str, Any]], + deprecated_devices: List[Dict[str, Any]]) -> None: + """ + Save device information to respective files. + + Args: + valid_devices: List of devices with valid hostnames + default_devices: List of devices with default hostnames + deprecated_devices: List of devices that are deprecated + """ + try: + with open(VALID_HOSTNAMES_FILE, 'w') as file: + json.dump(valid_devices, file, indent=2) + logger.info(f"Saved {len(valid_devices)} valid hostnames to {VALID_HOSTNAMES_FILE}") + + with open(DEFAULT_HOSTNAMES_FILE, 'w') as file: + json.dump(default_devices, file, indent=2) + logger.info(f"Saved {len(default_devices)} default hostnames to {DEFAULT_HOSTNAMES_FILE}") + + if deprecated_devices: + with open(DEPRECATED_HOSTNAMES_FILE, 'w') as file: + json.dump(deprecated_devices, file, indent=2) + logger.info(f"Saved {len(deprecated_devices)} deprecated hostnames to {DEPRECATED_HOSTNAMES_FILE}") + except Exception as e: + logger.error(f"Error saving device files: {str(e)}") + + def save_unidentified_devices(self, unidentified_devices: List[Dict[str, Any]]) -> None: + """ + Save unidentified devices to a file when debug mode is enabled. + + Args: + unidentified_devices: List of devices that were not identified during filtering + """ + if not self.debug: + return + + if not unidentified_devices: + logger.debug("No unidentified devices to save") + return + + try: + with open(UNIDENTIFIED_DEVICES_FILE, 'w') as file: + json.dump(unidentified_devices, file, indent=2) + logger.info(f"Saved {len(unidentified_devices)} unidentified devices to {UNIDENTIFIED_DEVICES_FILE}") + except Exception as e: + logger.error(f"Error saving unidentified devices file: {str(e)}") + + def discover(self) -> None: + """ + Run the device discovery process. + + This method retrieves devices from the Unifi Controller using the site-specific + integration API endpoint, filters and classifies them, and then processes them + against existing device files. + """ + logger.info("Starting Tasmota device discovery") + + # Get devices from Unifi Controller using the site-specific integration API endpoint + devices = self.unifi_client.get_devices() + + if not devices: + logger.warning("No devices retrieved from Unifi Controller") + + # Try to use existing valid_hostnames.json file as fallback + if os.path.exists(VALID_HOSTNAMES_FILE): + try: + with open(VALID_HOSTNAMES_FILE, 'r') as file: + valid_hostname_devices = json.load(file) + logger.info(f"Loaded {len(valid_hostname_devices)} devices from {VALID_HOSTNAMES_FILE} as fallback") + + # Process existing files and update device lists + updated_valid_devices, deprecated_devices = self.process_existing_files(valid_hostname_devices) + + # Initialize default_hostname_devices as an empty list + default_hostname_devices = [] + + # Save device information to files + self.save_device_files(updated_valid_devices, default_hostname_devices, deprecated_devices) + + logger.info("Device discovery completed successfully using fallback method") + return + except Exception as e: + logger.error(f"Error loading {VALID_HOSTNAMES_FILE}: {str(e)}") + logger.warning("Cannot proceed with device discovery") + return + else: + logger.warning("No valid hostnames file found for fallback") + logger.warning("Cannot proceed with device discovery") + return + + logger.info(f"Retrieved {len(devices)} devices from Unifi Controller") + + # Filter devices based on network_filter criteria + included_devices, excluded_devices, unidentified_devices = self.filter_devices(devices) + + # Save unidentified devices to a file when debug mode is enabled + self.save_unidentified_devices(unidentified_devices) + + if not included_devices: + logger.warning("No devices match the filter criteria") + return + + logger.info(f"Filtered {len(included_devices)} devices (excluded {len(excluded_devices)}, unidentified {len(unidentified_devices)})") + + # Classify devices into valid and default hostname groups + valid_hostname_devices, default_hostname_devices = self.classify_devices(included_devices) + + logger.info(f"Classified devices: {len(valid_hostname_devices)} valid, {len(default_hostname_devices)} default") + + # Process existing files and update device lists + updated_valid_devices, deprecated_devices = self.process_existing_files(valid_hostname_devices) + + # Save device information to files + self.save_device_files(updated_valid_devices, default_hostname_devices, deprecated_devices) + + logger.info("Device discovery completed successfully") + + +def setup_logging(debug: bool) -> None: + """ + Set up logging configuration. + + Args: + debug: Whether to enable debug mode + """ + log_level = logging.DEBUG if debug else logging.INFO + log_format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s' + + # Create logs directory if it doesn't exist + os.makedirs('logs', exist_ok=True) + + # Configure file handler + file_handler = logging.FileHandler('logs/discovery.log') + file_handler.setLevel(log_level) + file_handler.setFormatter(logging.Formatter(log_format)) + + # Configure console handler + console_handler = logging.StreamHandler() + console_handler.setLevel(log_level) + console_handler.setFormatter(logging.Formatter(log_format)) + + # Configure root logger + root_logger = logging.getLogger() + root_logger.setLevel(log_level) + root_logger.addHandler(file_handler) + root_logger.addHandler(console_handler) + + # Configure tasmota_discovery logger + logger.setLevel(log_level) + + if debug: + logger.debug("Debug mode enabled") + + +def parse_arguments() -> argparse.Namespace: + """ + Parse command-line arguments. + + Returns: + Parsed arguments + """ + parser = argparse.ArgumentParser(description='Tasmota Device Discovery') + parser.add_argument('-d', '--debug', action='store_true', help='Enable debug mode with verbose logging') + parser.add_argument('-c', '--config', default='config.yaml', help='Specify a custom config file path') + return parser.parse_args() + + +def main() -> None: + """ + Main entry point for the script. + """ + args = parse_arguments() + + # Set up logging + setup_logging(args.debug) + + try: + # Initialize device discovery + discovery = DeviceDiscovery(args.config, args.debug) + + # Run discovery process + discovery.discover() + + except Exception as e: + logger.error(f"Error during device discovery: {str(e)}") + if args.debug: + import traceback + logger.debug(traceback.format_exc()) + sys.exit(1) + + +if __name__ == "__main__": + main() \ No newline at end of file From 73f7acfd8c3b61677eae4eb0089326f75b20026e Mon Sep 17 00:00:00 2001 From: Mike Geppert Date: Sun, 3 Aug 2025 21:49:38 -0500 Subject: [PATCH 6/7] Prepare for GitLab migration: Add README, .gitignore, and update TasmotaManager.py --- .gitignore | 35 +++++++++++++++++ README.md | 96 +++++++++++++++++++++++++++++++++++++++++++++++ TasmotaManager.py | 1 - 3 files changed, 131 insertions(+), 1 deletion(-) create mode 100644 .gitignore create mode 100644 README.md diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..277dfc2 --- /dev/null +++ b/.gitignore @@ -0,0 +1,35 @@ +# Python bytecode files +__pycache__/ +*.py[cod] +*$py.class + +# Distribution / packaging +dist/ +build/ +*.egg-info/ + +# Virtual environments +venv/ +env/ +ENV/ + +# IDE files +.idea/ +.vscode/ +*.swp +*.swo + +# Logs +*.log + +# Local configuration that might contain sensitive information +network_configuration.json + +# Backup files +*.backup + +# Generated data files with sensitive network information +current.json +deprecated.json +TasmotaDevices.json +*.json.backup \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..08d2fe2 --- /dev/null +++ b/README.md @@ -0,0 +1,96 @@ +# TasmotaManager + +A Python utility for discovering, monitoring, and managing Tasmota devices on a network using UniFi Controller. + +## Features + +- Discovers Tasmota devices on the network via UniFi Controller API +- Tracks device changes over time (new, moved, deprecated devices) +- Checks and updates MQTT settings on Tasmota devices +- Generates detailed device information including firmware versions + +## Requirements + +- Python 3.6+ +- UniFi Controller with API access +- Network with Tasmota devices + +## Dependencies + +- requests +- urllib3 +- Standard library modules (json, logging, os, sys, datetime, re, time, argparse) + +## Installation + +1. Clone this repository +2. Install required packages: + ```bash + pip install requests urllib3 + ``` +3. Create a configuration file (see below) + +## Configuration + +Create a `network_configuration.json` file with the following structure: + +```json +{ + "unifi": { + "host": "https://your-unifi-controller.local", + "username": "your-username", + "password": "your-password", + "site": "default", + "network_filter": { + "network_name": { + "name": "Human-readable name", + "subnet": "192.168.1", + "exclude_patterns": [ + "device-to-exclude*" + ], + "unknown_device_patterns": [ + "tasmota*", + "ESP-*" + ] + } + } + }, + "mqtt": { + "Host": "mqtt-broker.local", + "Port": 1883, + "User": "mqtt-user", + "Password": "mqtt-password", + "Topic": "%hostname_base%", + "FullTopic": "%prefix%/%topic%/", + "NoRetain": false + } +} +``` + +## Usage + +Basic usage: +```bash +python TasmotaManager.py +``` + +With options: +```bash +python TasmotaManager.py --config custom_config.json --debug --skip-unifi +``` + +Command-line options: +- `--config`: Path to configuration file (default: network_configuration.json) +- `--debug`: Enable debug logging +- `--skip-unifi`: Skip UniFi discovery and use existing current.json + +## Output Files + +The script generates several output files: +- `current.json`: List of currently active Tasmota devices +- `deprecated.json`: Devices that were previously active but are no longer present +- `TasmotaDevices.json`: Detailed information about each device + +## License + +This project is licensed under the MIT License - see the LICENSE file for details. \ No newline at end of file diff --git a/TasmotaManager.py b/TasmotaManager.py index 206f445..92081c4 100644 --- a/TasmotaManager.py +++ b/TasmotaManager.py @@ -7,7 +7,6 @@ from typing import Optional import requests from urllib3.exceptions import InsecureRequestWarning import re # Import the regular expression module -import telnetlib import time import argparse From b57a0ffa88808414bdec4f03ecc30d543ca0436a Mon Sep 17 00:00:00 2001 From: Mike Geppert Date: Sun, 3 Aug 2025 22:22:24 -0500 Subject: [PATCH 7/7] saving files --- GITLAB_MIGRATION.md | 108 +++++++++++++++++++++++++++++++++++++ network_configuration.json | 2 +- 2 files changed, 109 insertions(+), 1 deletion(-) create mode 100644 GITLAB_MIGRATION.md diff --git a/GITLAB_MIGRATION.md b/GITLAB_MIGRATION.md new file mode 100644 index 0000000..6bb1686 --- /dev/null +++ b/GITLAB_MIGRATION.md @@ -0,0 +1,108 @@ +# GitLab Migration Instructions + +This document provides instructions for completing the migration of the TasmotaManager project to GitLab at 192.168.5.10. + +## Completed Steps + +The following steps have already been completed: + +1. Git repository initialized in the TasmotaManager directory +2. `.gitignore` file created to exclude sensitive files: + - `network_configuration.json` (contains credentials) + - Data files with network information (`current.json`, `deprecated.json`, `TasmotaDevices.json`) + - Backup files +3. `README.md` file created with project documentation +4. Non-sensitive files added and committed to the local Git repository: + - `TasmotaManager.py` + - `.gitignore` + - `README.md` + +## Remaining Steps + +To complete the migration to GitLab, follow these steps: + +### Option 1: Create a new repository in GitLab + +1. Log in to GitLab at http://192.168.5.10 +2. Navigate to the Manager project/group +3. Create a new project named "TasmotaManager" +4. Follow the instructions provided by GitLab to push an existing repository: + +```bash +# If using HTTP +git remote add gitlab http://192.168.5.10/Manager/TasmotaManager.git +git push -u gitlab inital + +# If using SSH +git remote add gitlab git@192.168.5.10:Manager/TasmotaManager.git +git push -u gitlab inital +``` + +### Option 2: Add as a subproject to the existing Manager project + +If TasmotaManager should be a subproject or subdirectory within the Manager project: + +1. Clone the Manager project: +```bash +git clone http://192.168.5.10/Manager.git +``` + +2. Create a TasmotaManager directory within the Manager project: +```bash +cd Manager +mkdir TasmotaManager +``` + +3. Copy the files from the local TasmotaManager repository: +```bash +cp /home/mgeppert/git_work/scripts/TasmotaManager/TasmotaManager.py TasmotaManager/ +cp /home/mgeppert/git_work/scripts/TasmotaManager/.gitignore TasmotaManager/ +cp /home/mgeppert/git_work/scripts/TasmotaManager/README.md TasmotaManager/ +``` + +4. Add, commit, and push the changes: +```bash +git add TasmotaManager +git commit -m "Add TasmotaManager as a subproject" +git push +``` + +### Option 3: Add as a Git submodule + +If TasmotaManager should be maintained as a separate repository but included in the Manager project: + +1. First, create the TasmotaManager repository in GitLab (see Option 1) +2. Clone the Manager project: +```bash +git clone http://192.168.5.10/Manager.git +``` + +3. Add the TasmotaManager as a submodule: +```bash +cd Manager +git submodule add http://192.168.5.10/TasmotaManager.git TasmotaManager +git commit -m "Add TasmotaManager as a submodule" +git push +``` + +## Configuration Files + +Remember that sensitive configuration files are excluded from version control. After cloning or setting up the repository, you'll need to: + +1. Create a `network_configuration.json` file with your UniFi Controller and MQTT settings +2. Run the script to generate the data files: +```bash +python TasmotaManager.py +``` + +## Troubleshooting + +If you encounter authentication issues when pushing to GitLab: + +1. Ensure you have the correct access rights to the repository +2. Try using a personal access token instead of password authentication: +```bash +git remote set-url gitlab http://username:token@192.168.5.10/Manager/TasmotaManager.git +``` + +3. If using SSH, ensure your SSH key is added to your GitLab account \ No newline at end of file diff --git a/network_configuration.json b/network_configuration.json index c5484f1..fe267e8 100644 --- a/network_configuration.json +++ b/network_configuration.json @@ -20,7 +20,7 @@ } }, "mqtt": { - "Host": "homeassistant.Not.mgeppert.com", + "Host": "homeassistant.NoT.mgeppert.com", "Port": 1883, "User": "mgeppert", "Password": "mgeppert",