diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..277dfc2 --- /dev/null +++ b/.gitignore @@ -0,0 +1,35 @@ +# Python bytecode files +__pycache__/ +*.py[cod] +*$py.class + +# Distribution / packaging +dist/ +build/ +*.egg-info/ + +# Virtual environments +venv/ +env/ +ENV/ + +# IDE files +.idea/ +.vscode/ +*.swp +*.swo + +# Logs +*.log + +# Local configuration that might contain sensitive information +network_configuration.json + +# Backup files +*.backup + +# Generated data files with sensitive network information +current.json +deprecated.json +TasmotaDevices.json +*.json.backup \ No newline at end of file diff --git a/GITLAB_MIGRATION.md b/GITLAB_MIGRATION.md new file mode 100644 index 0000000..6bb1686 --- /dev/null +++ b/GITLAB_MIGRATION.md @@ -0,0 +1,108 @@ +# GitLab Migration Instructions + +This document provides instructions for completing the migration of the TasmotaManager project to GitLab at 192.168.5.10. + +## Completed Steps + +The following steps have already been completed: + +1. Git repository initialized in the TasmotaManager directory +2. `.gitignore` file created to exclude sensitive files: + - `network_configuration.json` (contains credentials) + - Data files with network information (`current.json`, `deprecated.json`, `TasmotaDevices.json`) + - Backup files +3. `README.md` file created with project documentation +4. Non-sensitive files added and committed to the local Git repository: + - `TasmotaManager.py` + - `.gitignore` + - `README.md` + +## Remaining Steps + +To complete the migration to GitLab, follow these steps: + +### Option 1: Create a new repository in GitLab + +1. Log in to GitLab at http://192.168.5.10 +2. Navigate to the Manager project/group +3. Create a new project named "TasmotaManager" +4. Follow the instructions provided by GitLab to push an existing repository: + +```bash +# If using HTTP +git remote add gitlab http://192.168.5.10/Manager/TasmotaManager.git +git push -u gitlab inital + +# If using SSH +git remote add gitlab git@192.168.5.10:Manager/TasmotaManager.git +git push -u gitlab inital +``` + +### Option 2: Add as a subproject to the existing Manager project + +If TasmotaManager should be a subproject or subdirectory within the Manager project: + +1. Clone the Manager project: +```bash +git clone http://192.168.5.10/Manager.git +``` + +2. Create a TasmotaManager directory within the Manager project: +```bash +cd Manager +mkdir TasmotaManager +``` + +3. Copy the files from the local TasmotaManager repository: +```bash +cp /home/mgeppert/git_work/scripts/TasmotaManager/TasmotaManager.py TasmotaManager/ +cp /home/mgeppert/git_work/scripts/TasmotaManager/.gitignore TasmotaManager/ +cp /home/mgeppert/git_work/scripts/TasmotaManager/README.md TasmotaManager/ +``` + +4. Add, commit, and push the changes: +```bash +git add TasmotaManager +git commit -m "Add TasmotaManager as a subproject" +git push +``` + +### Option 3: Add as a Git submodule + +If TasmotaManager should be maintained as a separate repository but included in the Manager project: + +1. First, create the TasmotaManager repository in GitLab (see Option 1) +2. Clone the Manager project: +```bash +git clone http://192.168.5.10/Manager.git +``` + +3. Add the TasmotaManager as a submodule: +```bash +cd Manager +git submodule add http://192.168.5.10/TasmotaManager.git TasmotaManager +git commit -m "Add TasmotaManager as a submodule" +git push +``` + +## Configuration Files + +Remember that sensitive configuration files are excluded from version control. After cloning or setting up the repository, you'll need to: + +1. Create a `network_configuration.json` file with your UniFi Controller and MQTT settings +2. Run the script to generate the data files: +```bash +python TasmotaManager.py +``` + +## Troubleshooting + +If you encounter authentication issues when pushing to GitLab: + +1. Ensure you have the correct access rights to the repository +2. Try using a personal access token instead of password authentication: +```bash +git remote set-url gitlab http://username:token@192.168.5.10/Manager/TasmotaManager.git +``` + +3. If using SSH, ensure your SSH key is added to your GitLab account \ No newline at end of file diff --git a/README.md b/README.md index 047477f..08d2fe2 100644 --- a/README.md +++ b/README.md @@ -1,8 +1,96 @@ -# Sample GitLab Project +# TasmotaManager -This sample project shows how a project in GitLab looks for demonstration purposes. It contains issues, merge requests and Markdown files in many branches, -named and filled with lorem ipsum. +A Python utility for discovering, monitoring, and managing Tasmota devices on a network using UniFi Controller. -You can look around to get an idea how to structure your project and, when done, you can safely delete this project. +## Features -[Learn more about creating GitLab projects.](https://docs.gitlab.com/ee/gitlab-basics/create-project.html) +- Discovers Tasmota devices on the network via UniFi Controller API +- Tracks device changes over time (new, moved, deprecated devices) +- Checks and updates MQTT settings on Tasmota devices +- Generates detailed device information including firmware versions + +## Requirements + +- Python 3.6+ +- UniFi Controller with API access +- Network with Tasmota devices + +## Dependencies + +- requests +- urllib3 +- Standard library modules (json, logging, os, sys, datetime, re, time, argparse) + +## Installation + +1. Clone this repository +2. Install required packages: + ```bash + pip install requests urllib3 + ``` +3. Create a configuration file (see below) + +## Configuration + +Create a `network_configuration.json` file with the following structure: + +```json +{ + "unifi": { + "host": "https://your-unifi-controller.local", + "username": "your-username", + "password": "your-password", + "site": "default", + "network_filter": { + "network_name": { + "name": "Human-readable name", + "subnet": "192.168.1", + "exclude_patterns": [ + "device-to-exclude*" + ], + "unknown_device_patterns": [ + "tasmota*", + "ESP-*" + ] + } + } + }, + "mqtt": { + "Host": "mqtt-broker.local", + "Port": 1883, + "User": "mqtt-user", + "Password": "mqtt-password", + "Topic": "%hostname_base%", + "FullTopic": "%prefix%/%topic%/", + "NoRetain": false + } +} +``` + +## Usage + +Basic usage: +```bash +python TasmotaManager.py +``` + +With options: +```bash +python TasmotaManager.py --config custom_config.json --debug --skip-unifi +``` + +Command-line options: +- `--config`: Path to configuration file (default: network_configuration.json) +- `--debug`: Enable debug logging +- `--skip-unifi`: Skip UniFi discovery and use existing current.json + +## Output Files + +The script generates several output files: +- `current.json`: List of currently active Tasmota devices +- `deprecated.json`: Devices that were previously active but are no longer present +- `TasmotaDevices.json`: Detailed information about each device + +## License + +This project is licensed under the MIT License - see the LICENSE file for details. \ No newline at end of file diff --git a/TasmotaManager.py b/TasmotaManager.py new file mode 100644 index 0000000..92081c4 --- /dev/null +++ b/TasmotaManager.py @@ -0,0 +1,662 @@ +import json +import logging +import os +import sys +from datetime import datetime +from typing import Optional +import requests +from urllib3.exceptions import InsecureRequestWarning +import re # Import the regular expression module +import time +import argparse + +# Disable SSL warnings +requests.packages.urllib3.disable_warnings(category=InsecureRequestWarning) + +class UnifiClient: + def __init__(self, base_url, username, password, site_id, verify_ssl=True): + self.base_url = base_url.rstrip('/') + self.username = username + self.password = password + self.site_id = site_id + self.session = requests.Session() + self.session.verify = verify_ssl + + # Initialize cookie jar + self.session.cookies.clear() + + def _login(self) -> requests.Response: # Changed return type annotation + """Authenticate with the UniFi Controller.""" + login_url = f"{self.base_url}/api/auth/login" + headers = { + 'Content-Type': 'application/json', + 'Accept': 'application/json' + } + payload = { + "username": self.username, + "password": self.password, + "remember": False + } + try: + response = self.session.post( + login_url, + json=payload, + headers=headers + ) + response.raise_for_status() + if 'X-CSRF-Token' in response.headers: + self.session.headers['X-CSRF-Token'] = response.headers['X-CSRF-Token'] + return response # Return the response object + except requests.exceptions.RequestException as e: + if hasattr(e, 'response') and e.response.status_code == 401: + raise Exception("Authentication failed. Please verify your username and password.") from e + raise + + def get_clients(self) -> list: + """Get all clients from the UniFi Controller.""" + # Try the newer API endpoint first + url = f"{self.base_url}/proxy/network/api/s/{self.site_id}/stat/sta" + try: + response = self.session.get(url) + response.raise_for_status() + return response.json().get('data', []) + except requests.exceptions.RequestException as e: + # If the newer endpoint fails, try the legacy endpoint + url = f"{self.base_url}/api/s/{self.site_id}/stat/sta" + try: + response = self.session.get(url) + response.raise_for_status() + return response.json().get('data', []) + except requests.exceptions.RequestException as e: + # If both fail, try the v2 API endpoint + url = f"{self.base_url}/v2/api/site/{self.site_id}/clients" + response = self.session.get(url) + response.raise_for_status() + return response.json().get('data', []) + +class TasmotaDiscovery: + def __init__(self, debug: bool = False): + """Initialize the TasmotaDiscovery with optional debug mode.""" + log_level = logging.DEBUG if debug else logging.INFO + logging.basicConfig( + level=log_level, + format='%(asctime)s - %(levelname)s - %(message)s', + datefmt='%Y-%m-%d %H:%M:%S' + ) + self.logger = logging.getLogger(__name__) + self.config = None + self.unifi_client = None + + def load_config(self, config_path: Optional[str] = None) -> dict: + """Load configuration from JSON file.""" + if config_path is None: + config_path = os.path.join(os.path.dirname(__file__), 'config.json') + + self.logger.debug(f"Loading configuration from: {config_path}") + try: + with open(config_path, 'r') as config_file: + self.config = json.load(config_file) + self.logger.debug("Configuration loaded successfully from %s", config_path) + return self.config + except FileNotFoundError: + self.logger.error(f"Configuration file not found at {config_path}") + sys.exit(1) + except json.JSONDecodeError: + self.logger.error("Invalid JSON in configuration file") + sys.exit(1) + + def setup_unifi_client(self): + """Set up the UniFi client with better error handling""" + self.logger.debug("Setting up UniFi client") + + if not self.config or 'unifi' not in self.config: + raise ValueError("Missing UniFi configuration") + + unifi_config = self.config['unifi'] + required_fields = ['host', 'username', 'password', 'site'] + missing_fields = [field for field in required_fields if field not in unifi_config] + + if missing_fields: + raise ValueError(f"Missing required UniFi configuration fields: {', '.join(missing_fields)}") + + try: + self.logger.debug(f"Connecting to UniFi Controller at {unifi_config['host']}") + self.unifi_client = UnifiClient( + base_url=unifi_config['host'], + username=unifi_config['username'], + password=unifi_config['password'], + site_id=unifi_config['site'], + verify_ssl=False # Add this if using self-signed certificates + ) + + # Test the connection by making a simple request + response = self.unifi_client._login() + if not response: + raise ConnectionError(f"Failed to connect to UniFi controller: No response") + + self.logger.debug("UniFi client setup successful") + + except Exception as e: + self.logger.error(f"Error setting up UniFi client: {str(e)}") + raise ConnectionError(f"Failed to connect to UniFi controller: {str(e)}") + + def is_tasmota_device(self, device: dict) -> bool: + """Determine if a device is a Tasmota device.""" + name = device.get('name', '').lower() + hostname = device.get('hostname', '').lower() + ip = device.get('ip', '') + + # Check if device is in the configured NoT network + network_filters = self.config['unifi'].get('network_filter', {}) + for network in network_filters.values(): + if ip.startswith(network['subnet']): + self.logger.debug(f"Checking device in NoT network: {name} ({hostname}) IP: {ip}") + + # First check exclusion patterns + exclude_patterns = network.get('exclude_patterns', []) + for pattern in exclude_patterns: + pattern = pattern.lower() + # Convert glob pattern to regex pattern + pattern = pattern.replace('.', r'\.').replace('*', '.*') + if re.match(f"^{pattern}$", name) or re.match(f"^{pattern}$", hostname): + self.logger.debug(f"Excluding device due to pattern '{pattern}': {name} ({hostname})") + return False + + # If not excluded, check if it's a Tasmota device + matches = any([ + name.startswith('tasmota'), + name.startswith('sonoff'), + name.endswith('-ts'), + hostname.startswith('tasmota'), + hostname.startswith('sonoff'), + hostname.startswith('esp-'), + any(hostname.endswith(suffix) for suffix in ['-fan', '-lamp', '-light', '-switch']) + ]) + if matches: + self.logger.debug(f"Found Tasmota device: {name}") + return True # Consider all non-excluded devices in NoT network as potential Tasmota devices + + return False + + def get_tasmota_devices(self) -> list: + """Query UniFi controller and filter Tasmota devices.""" + devices = [] + self.logger.debug("Querying UniFi controller for devices") + try: + all_clients = self.unifi_client.get_clients() + self.logger.debug(f"Found {len(all_clients)} total devices") + + for device in all_clients: + if self.is_tasmota_device(device): + device_info = { + "name": device.get('name', device.get('hostname', 'Unknown')), + "ip": device.get('ip', ''), + "mac": device.get('mac', ''), + "last_seen": device.get('last_seen', ''), + "hostname": device.get('hostname', ''), + "notes": device.get('note', ''), + } + devices.append(device_info) + + self.logger.debug(f"Found {len(devices)} Tasmota devices") + return devices + except Exception as e: + self.logger.error(f"Error getting devices from UniFi controller: {e}") + return [] + + def save_tasmota_config(self, devices: list) -> None: + """Save Tasmota device information to a JSON file with device tracking.""" + filename = "current.json" + self.logger.debug(f"Saving Tasmota configuration to {filename}") + deprecated_filename = "deprecated.json" + + current_devices = [] + deprecated_devices = [] + + # Load existing devices if file exists + if os.path.exists(filename): + try: + with open(filename, 'r') as f: + existing_config = json.load(f) + current_devices = existing_config.get('tasmota', {}).get('devices', []) + except json.JSONDecodeError: + self.logger.error(f"Error reading {filename}, treating as empty") + current_devices = [] + + # Load deprecated devices if file exists + if os.path.exists(deprecated_filename): + try: + with open(deprecated_filename, 'r') as f: + deprecated_config = json.load(f) + deprecated_devices = deprecated_config.get('tasmota', {}).get('devices', []) + except json.JSONDecodeError: + self.logger.error(f"Error reading {deprecated_filename}, treating as empty") + deprecated_devices = [] + + # Create new config + new_devices = [] + moved_to_deprecated = [] + restored_from_deprecated = [] + removed_from_deprecated = [] + excluded_devices = [] + + # Check for excluded devices in current and deprecated lists + network_filters = self.config['unifi'].get('network_filter', {}) + exclude_patterns = [] + for network in network_filters.values(): + exclude_patterns.extend(network.get('exclude_patterns', [])) + + # Function to check if device is excluded + def is_device_excluded(device_name: str, hostname: str = '') -> bool: + name = device_name.lower() + hostname = hostname.lower() + for pattern in exclude_patterns: + pattern = pattern.lower() + pattern = pattern.replace('.', r'\.').replace('*', '.*') + if re.match(f"^{pattern}$", name) or re.match(f"^{pattern}$", hostname): + return True + return False + + # Process current devices + for device in devices: + device_name = device['name'] + device_hostname = device.get('hostname', '') + device_ip = device['ip'] + device_mac = device['mac'] + + # Check if device should be excluded + if is_device_excluded(device_name, device_hostname): + print(f"Device {device_name} excluded by pattern - skipping") + excluded_devices.append(device_name) + continue + + # Check in current devices + existing_device = next((d for d in current_devices + if d['name'] == device_name), None) + + if existing_device: + # Device exists, check if IP or MAC changed + if existing_device['ip'] != device_ip or existing_device['mac'] != device_mac: + moved_to_deprecated.append(existing_device) + new_devices.append(device) + print(f"Device {device_name} moved to deprecated (IP/MAC changed)") + else: + new_devices.append(existing_device) # Keep existing device + else: + # New device, check if it was in deprecated + deprecated_device = next((d for d in deprecated_devices + if d['name'] == device_name), None) + if deprecated_device: + removed_from_deprecated.append(device_name) + print(f"Device {device_name} removed from deprecated (restored)") + new_devices.append(device) + print(f"Device {device_name} added to output file") + + # Find devices that are no longer present + current_names = {d['name'] for d in devices} + for existing_device in current_devices: + if existing_device['name'] not in current_names: + if not is_device_excluded(existing_device['name'], existing_device.get('hostname', '')): + moved_to_deprecated.append(existing_device) + print(f"Device {existing_device['name']} moved to deprecated (no longer present)") + + # Update deprecated devices list, excluding any excluded devices + final_deprecated = [] + for device in deprecated_devices: + if device['name'] not in removed_from_deprecated and not is_device_excluded(device['name'], device.get('hostname', '')): + final_deprecated.append(device) + elif is_device_excluded(device['name'], device.get('hostname', '')): + print(f"Device {device['name']} removed from deprecated (excluded by pattern)") + + final_deprecated.extend(moved_to_deprecated) + + # Save new configuration + config = { + "tasmota": { + "devices": new_devices, + "generated_at": datetime.now().isoformat(), + "total_devices": len(new_devices) + } + } + + # Save deprecated configuration + deprecated_config = { + "tasmota": { + "devices": final_deprecated, + "generated_at": datetime.now().isoformat(), + "total_devices": len(final_deprecated) + } + } + + # Backup existing file if it exists + if os.path.exists(filename): + try: + backup_name = f"{filename}.backup" + os.rename(filename, backup_name) + self.logger.info(f"Created backup of existing configuration as {backup_name}") + except Exception as e: + self.logger.error(f"Error creating backup: {e}") + + # Save files + try: + with open(filename, 'w') as f: + json.dump(config, f, indent=4) + with open(deprecated_filename, 'w') as f: + json.dump(deprecated_config, f, indent=4) + + self.logger.info(f"Successfully saved {len(new_devices)} Tasmota devices to {filename}") + self.logger.info(f"Successfully saved {len(final_deprecated)} deprecated devices to {deprecated_filename}") + + print("\nDevice Status Summary:") + if excluded_devices: + print("\nExcluded Devices:") + for name in excluded_devices: + print(f"- {name}") + + if moved_to_deprecated: + print("\nMoved to deprecated:") + for device in moved_to_deprecated: + print(f"- {device['name']}") + + if removed_from_deprecated: + print("\nRestored from deprecated:") + for name in removed_from_deprecated: + print(f"- {name}") + + print("\nCurrent Tasmota Devices:") + for device in new_devices: + print(f"Name: {device['name']:<20} IP: {device['ip']:<15} MAC: {device['mac']}") + + except Exception as e: + self.logger.error(f"Error saving configuration: {e}") + + def get_device_details(self, use_current_json=True): + """Connect to each Tasmota device via HTTP, gather details and validate MQTT settings""" + self.logger.info("Starting to gather detailed device information...") + device_details = [] + + try: + source_file = 'current.json' if use_current_json else 'tasmota.json' + with open(source_file, 'r') as f: + data = json.load(f) + devices = data.get('tasmota', {}).get('devices', []) + self.logger.debug(f"Loaded {len(devices)} devices from {source_file}") + except FileNotFoundError: + self.logger.error(f"{source_file} not found. Run discovery first.") + return + except json.JSONDecodeError: + self.logger.error(f"Invalid JSON format in {source_file}") + return + + mqtt_config = self.config.get('mqtt', {}) + if not mqtt_config: + self.logger.error("MQTT configuration missing from config file") + return + + def check_mqtt_settings(ip, name, mqtt_status): + """Check and update MQTT settings if they don't match config""" + # Get the base hostname (everything before the dash) + hostname_base = name.split('-')[0] if '-' in name else name + + mqtt_fields = { + "Host": mqtt_config.get('Host', ''), + "Port": mqtt_config.get('Port', 1883), + "User": mqtt_config.get('User', ''), + "Password": mqtt_config.get('Password', ''), + "Topic": hostname_base if mqtt_config.get('Topic') == '%hostname_base%' else mqtt_config.get('Topic', ''), + "FullTopic": mqtt_config.get('FullTopic', '%prefix%/%topic%/'), + } + + device_mqtt = mqtt_status.get('MqttHost', {}) + changes_needed = [] + force_password_update = False + + # Check each MQTT setting + if device_mqtt.get('Host') != mqtt_fields['Host']: + changes_needed.append(('MqttHost', mqtt_fields['Host'])) + self.logger.debug(f"{name}: MQTT Host mismatch - Device: {device_mqtt.get('Host')}, Config: {mqtt_fields['Host']}") + force_password_update = True + + if device_mqtt.get('Port') != mqtt_fields['Port']: + changes_needed.append(('MqttPort', mqtt_fields['Port'])) + self.logger.debug(f"{name}: MQTT Port mismatch - Device: {device_mqtt.get('Port')}, Config: {mqtt_fields['Port']}") + force_password_update = True + + if device_mqtt.get('User') != mqtt_fields['User']: + changes_needed.append(('MqttUser', mqtt_fields['User'])) + self.logger.debug(f"{name}: MQTT User mismatch - Device: {device_mqtt.get('User')}, Config: {mqtt_fields['User']}") + force_password_update = True + + if device_mqtt.get('Topic') != mqtt_fields['Topic']: + changes_needed.append(('Topic', mqtt_fields['Topic'])) + self.logger.debug(f"{name}: MQTT Topic mismatch - Device: {device_mqtt.get('Topic')}, Config: {mqtt_fields['Topic']}") + force_password_update = True + + if device_mqtt.get('FullTopic') != mqtt_fields['FullTopic']: + changes_needed.append(('FullTopic', mqtt_fields['FullTopic'])) + self.logger.debug(f"{name}: MQTT FullTopic mismatch - Device: {device_mqtt.get('FullTopic')}, Config: {mqtt_fields['FullTopic']}") + force_password_update = True + + # Add password update if any MQTT setting changed or user was updated + if force_password_update: + changes_needed.append(('MqttPassword', mqtt_fields['Password'])) + self.logger.debug(f"{name}: MQTT Password will be updated") + + # Check NoRetain setting + if mqtt_config.get('NoRetain', True): + changes_needed.append(('SetOption62', '1')) # 1 = No Retain + + # Apply changes if needed + for setting, value in changes_needed: + try: + url = f"http://{ip}/cm?cmnd={setting}%20{value}" + response = requests.get(url, timeout=5) + if response.status_code == 200: + if setting != 'MqttPassword': + self.logger.debug(f"{name}: Updated {setting} to {value}") + else: + self.logger.debug(f"{name}: Updated MQTT Password") + else: + self.logger.error(f"{name}: Failed to update {setting}") + except requests.exceptions.RequestException as e: + self.logger.error(f"{name}: Error updating {setting}: {str(e)}") + + return len(changes_needed) > 0 + + for device in devices: + if not isinstance(device, dict): + self.logger.warning(f"Skipping invalid device entry: {device}") + continue + + name = device.get('name', 'Unknown') + ip = device.get('ip') + mac = device.get('mac') + + if not ip: + self.logger.warning(f"Skipping device {name} - no IP address") + continue + + self.logger.info(f"Checking device: {name} at {ip}") + + try: + # Get Status 2 for firmware version + url_status = f"http://{ip}/cm?cmnd=Status%202" + response = requests.get(url_status, timeout=5) + status_data = response.json() + + # Get Status 5 for network info + url_network = f"http://{ip}/cm?cmnd=Status%205" + response = requests.get(url_network, timeout=5) + network_data = response.json() + + # Get Status 6 for MQTT info + url_mqtt = f"http://{ip}/cm?cmnd=Status%206" + response = requests.get(url_mqtt, timeout=5) + mqtt_data = response.json() + + # Check and update MQTT settings if needed + mqtt_updated = check_mqtt_settings(ip, name, mqtt_data) + + device_detail = { + "name": name, + "ip": ip, + "mac": mac, + "version": status_data.get("StatusFWR", {}).get("Version", "Unknown"), + "hostname": network_data.get("StatusNET", {}).get("Hostname", "Unknown"), + "mqtt_status": "Updated" if mqtt_updated else "Verified", + "last_checked": time.strftime("%Y-%m-%d %H:%M:%S"), + "status": "online" + } + self.logger.info(f"Successfully got version for {name}: {device_detail['version']}") + + except requests.exceptions.RequestException as e: + self.logger.error(f"Error connecting to {name} at {ip}: {str(e)}") + device_detail = { + "name": name, + "ip": ip, + "mac": mac, + "version": "Unknown", + "status": "offline", + "error": str(e) + } + + device_details.append(device_detail) + time.sleep(0.5) + + # Save all device details at once + try: + with open('TasmotaDevices.json', 'w') as f: + json.dump(device_details, f, indent=2) + self.logger.info(f"Device details saved to TasmotaDevices.json ({len(device_details)} devices)") + except Exception as e: + self.logger.error(f"Error saving device details: {e}") + + def check_mqtt_settings(ip, name, mqtt_status): + """Check and update MQTT settings if they don't match config""" + # Get the base hostname (everything before the dash) + hostname_base = name.split('-')[0] if '-' in name else name + + mqtt_fields = { + "Host": mqtt_config.get('Host', ''), + "Port": mqtt_config.get('Port', 1883), + "User": mqtt_config.get('User', ''), + "Password": mqtt_config.get('Password', ''), + "Topic": hostname_base if mqtt_config.get('Topic') == '%hostname_base%' else mqtt_config.get('Topic', ''), + "FullTopic": mqtt_config.get('FullTopic', '%prefix%/%topic%/'), + } + + device_mqtt = mqtt_status.get('MqttHost', {}) + changes_needed = [] + force_password_update = False + + # Check each MQTT setting + if device_mqtt.get('Host') != mqtt_fields['Host']: + changes_needed.append(('MqttHost', mqtt_fields['Host'])) + self.logger.debug(f"{name}: MQTT Host mismatch - Device: {device_mqtt.get('Host')}, Config: {mqtt_fields['Host']}") + force_password_update = True + + if device_mqtt.get('Port') != mqtt_fields['Port']: + changes_needed.append(('MqttPort', mqtt_fields['Port'])) + self.logger.debug(f"{name}: MQTT Port mismatch - Device: {device_mqtt.get('Port')}, Config: {mqtt_fields['Port']}") + force_password_update = True + + if device_mqtt.get('User') != mqtt_fields['User']: + changes_needed.append(('MqttUser', mqtt_fields['User'])) + self.logger.debug(f"{name}: MQTT User mismatch - Device: {device_mqtt.get('User')}, Config: {mqtt_fields['User']}") + force_password_update = True + + if device_mqtt.get('Topic') != mqtt_fields['Topic']: + changes_needed.append(('Topic', mqtt_fields['Topic'])) + self.logger.debug(f"{name}: MQTT Topic mismatch - Device: {device_mqtt.get('Topic')}, Config: {mqtt_fields['Topic']}") + force_password_update = True + + if device_mqtt.get('FullTopic') != mqtt_fields['FullTopic']: + changes_needed.append(('FullTopic', mqtt_fields['FullTopic'])) + self.logger.debug(f"{name}: MQTT FullTopic mismatch - Device: {device_mqtt.get('FullTopic')}, Config: {mqtt_fields['FullTopic']}") + force_password_update = True + + # Add password update if any MQTT setting changed or user was updated + if force_password_update: + changes_needed.append(('MqttPassword', mqtt_fields['Password'])) + self.logger.debug(f"{name}: MQTT Password will be updated") + + # Check NoRetain setting + if mqtt_config.get('NoRetain', True): + changes_needed.append(('SetOption62', '1')) # 1 = No Retain + + # Apply changes if needed + for setting, value in changes_needed: + try: + url = f"http://{ip}/cm?cmnd={setting}%20{value}" + response = requests.get(url, timeout=5) + if response.status_code == 200: + if setting != 'MqttPassword': + self.logger.debug(f"{name}: Updated {setting} to {value}") + else: + self.logger.debug(f"{name}: Updated MQTT Password") + else: + self.logger.error(f"{name}: Failed to update {setting}") + except requests.exceptions.RequestException as e: + self.logger.error(f"{name}: Error updating {setting}: {str(e)}") + + return len(changes_needed) > 0 + +def main(): + parser = argparse.ArgumentParser(description='Tasmota Device Manager') + parser.add_argument('--config', default='network_configuration.json', + help='Path to configuration file') + parser.add_argument('--debug', action='store_true', + help='Enable debug logging') + parser.add_argument('--skip-unifi', action='store_true', + help='Skip UniFi discovery and use existing current.json') + + args = parser.parse_args() + + # Set up logging + log_level = logging.DEBUG if args.debug else logging.INFO + logging.basicConfig(level=log_level, + format='%(asctime)s - %(levelname)s - %(message)s', + datefmt='%Y-%m-%d %H:%M:%S') + + print("Starting Tasmota Device Discovery and Version Check...") + + # Create TasmotaDiscovery instance + discovery = TasmotaDiscovery(debug=args.debug) + discovery.load_config(args.config) + + try: + if not args.skip_unifi: + print("Step 1: Discovering Tasmota devices...") + discovery.setup_unifi_client() + tasmota_devices = discovery.get_tasmota_devices() + discovery.save_tasmota_config(tasmota_devices) + else: + print("Skipping UniFi discovery, using existing current.json...") + + print("\nStep 2: Getting detailed version information...") + discovery.get_device_details(use_current_json=True) + + print("\nProcess completed successfully!") + print("- Device list saved to: current.json") + print("- Detailed information saved to: TasmotaDevices.json") + + except ConnectionError as e: + print(f"Connection Error: {str(e)}") + print("\nTrying to proceed with existing current.json...") + try: + discovery.get_device_details(use_current_json=True) + print("\nSuccessfully retrieved device details from existing current.json") + except Exception as inner_e: + print(f"Error processing existing devices: {str(inner_e)}") + return 1 + except Exception as e: + print(f"Error: {str(e)}") + if args.debug: + import traceback + traceback.print_exc() + return 1 + + return 0 + +if __name__ == '__main__': + main() \ No newline at end of file diff --git a/network_configuration.json b/network_configuration.json new file mode 100644 index 0000000..fe267e8 --- /dev/null +++ b/network_configuration.json @@ -0,0 +1,31 @@ +{ + "unifi": { + "host": "https://192.168.6.1", + "username": "Tasmota", + "password": "TasmotaManager12!@", + "site": "default", + "network_filter": { + "NoT_network": { + "name": "NoT", + "subnet": "192.168.8", + "exclude_patterns": [ + "homeassistant*", + "*sonos*" + ], + "unknown_device_patterns": [ + "tasmota*", + "ESP-*" + ] + } + } + }, + "mqtt": { + "Host": "homeassistant.NoT.mgeppert.com", + "Port": 1883, + "User": "mgeppert", + "Password": "mgeppert", + "Topic": "%hostname_base%", + "FullTopic": "%prefix%/%topic%/", + "NoRetain": false + } +} \ No newline at end of file