Merge branch 'inital' into 'master'

Inital

See merge request tasmota/manager!31
This commit is contained in:
Mike Geppert 2025-08-17 21:58:31 +00:00
commit 8b42e7435e
5 changed files with 929 additions and 5 deletions

35
.gitignore vendored Normal file
View File

@ -0,0 +1,35 @@
# Python bytecode files
__pycache__/
*.py[cod]
*$py.class
# Distribution / packaging
dist/
build/
*.egg-info/
# Virtual environments
venv/
env/
ENV/
# IDE files
.idea/
.vscode/
*.swp
*.swo
# Logs
*.log
# Local configuration that might contain sensitive information
network_configuration.json
# Backup files
*.backup
# Generated data files with sensitive network information
current.json
deprecated.json
TasmotaDevices.json
*.json.backup

108
GITLAB_MIGRATION.md Normal file
View File

@ -0,0 +1,108 @@
# GitLab Migration Instructions
This document provides instructions for completing the migration of the TasmotaManager project to GitLab at 192.168.5.10.
## Completed Steps
The following steps have already been completed:
1. Git repository initialized in the TasmotaManager directory
2. `.gitignore` file created to exclude sensitive files:
- `network_configuration.json` (contains credentials)
- Data files with network information (`current.json`, `deprecated.json`, `TasmotaDevices.json`)
- Backup files
3. `README.md` file created with project documentation
4. Non-sensitive files added and committed to the local Git repository:
- `TasmotaManager.py`
- `.gitignore`
- `README.md`
## Remaining Steps
To complete the migration to GitLab, follow these steps:
### Option 1: Create a new repository in GitLab
1. Log in to GitLab at http://192.168.5.10
2. Navigate to the Manager project/group
3. Create a new project named "TasmotaManager"
4. Follow the instructions provided by GitLab to push an existing repository:
```bash
# If using HTTP
git remote add gitlab http://192.168.5.10/Manager/TasmotaManager.git
git push -u gitlab inital
# If using SSH
git remote add gitlab git@192.168.5.10:Manager/TasmotaManager.git
git push -u gitlab inital
```
### Option 2: Add as a subproject to the existing Manager project
If TasmotaManager should be a subproject or subdirectory within the Manager project:
1. Clone the Manager project:
```bash
git clone http://192.168.5.10/Manager.git
```
2. Create a TasmotaManager directory within the Manager project:
```bash
cd Manager
mkdir TasmotaManager
```
3. Copy the files from the local TasmotaManager repository:
```bash
cp /home/mgeppert/git_work/scripts/TasmotaManager/TasmotaManager.py TasmotaManager/
cp /home/mgeppert/git_work/scripts/TasmotaManager/.gitignore TasmotaManager/
cp /home/mgeppert/git_work/scripts/TasmotaManager/README.md TasmotaManager/
```
4. Add, commit, and push the changes:
```bash
git add TasmotaManager
git commit -m "Add TasmotaManager as a subproject"
git push
```
### Option 3: Add as a Git submodule
If TasmotaManager should be maintained as a separate repository but included in the Manager project:
1. First, create the TasmotaManager repository in GitLab (see Option 1)
2. Clone the Manager project:
```bash
git clone http://192.168.5.10/Manager.git
```
3. Add the TasmotaManager as a submodule:
```bash
cd Manager
git submodule add http://192.168.5.10/TasmotaManager.git TasmotaManager
git commit -m "Add TasmotaManager as a submodule"
git push
```
## Configuration Files
Remember that sensitive configuration files are excluded from version control. After cloning or setting up the repository, you'll need to:
1. Create a `network_configuration.json` file with your UniFi Controller and MQTT settings
2. Run the script to generate the data files:
```bash
python TasmotaManager.py
```
## Troubleshooting
If you encounter authentication issues when pushing to GitLab:
1. Ensure you have the correct access rights to the repository
2. Try using a personal access token instead of password authentication:
```bash
git remote set-url gitlab http://username:token@192.168.5.10/Manager/TasmotaManager.git
```
3. If using SSH, ensure your SSH key is added to your GitLab account

View File

@ -1,8 +1,96 @@
# Sample GitLab Project # TasmotaManager
This sample project shows how a project in GitLab looks for demonstration purposes. It contains issues, merge requests and Markdown files in many branches, A Python utility for discovering, monitoring, and managing Tasmota devices on a network using UniFi Controller.
named and filled with lorem ipsum.
You can look around to get an idea how to structure your project and, when done, you can safely delete this project. ## Features
[Learn more about creating GitLab projects.](https://docs.gitlab.com/ee/gitlab-basics/create-project.html) - Discovers Tasmota devices on the network via UniFi Controller API
- Tracks device changes over time (new, moved, deprecated devices)
- Checks and updates MQTT settings on Tasmota devices
- Generates detailed device information including firmware versions
## Requirements
- Python 3.6+
- UniFi Controller with API access
- Network with Tasmota devices
## Dependencies
- requests
- urllib3
- Standard library modules (json, logging, os, sys, datetime, re, time, argparse)
## Installation
1. Clone this repository
2. Install required packages:
```bash
pip install requests urllib3
```
3. Create a configuration file (see below)
## Configuration
Create a `network_configuration.json` file with the following structure:
```json
{
"unifi": {
"host": "https://your-unifi-controller.local",
"username": "your-username",
"password": "your-password",
"site": "default",
"network_filter": {
"network_name": {
"name": "Human-readable name",
"subnet": "192.168.1",
"exclude_patterns": [
"device-to-exclude*"
],
"unknown_device_patterns": [
"tasmota*",
"ESP-*"
]
}
}
},
"mqtt": {
"Host": "mqtt-broker.local",
"Port": 1883,
"User": "mqtt-user",
"Password": "mqtt-password",
"Topic": "%hostname_base%",
"FullTopic": "%prefix%/%topic%/",
"NoRetain": false
}
}
```
## Usage
Basic usage:
```bash
python TasmotaManager.py
```
With options:
```bash
python TasmotaManager.py --config custom_config.json --debug --skip-unifi
```
Command-line options:
- `--config`: Path to configuration file (default: network_configuration.json)
- `--debug`: Enable debug logging
- `--skip-unifi`: Skip UniFi discovery and use existing current.json
## Output Files
The script generates several output files:
- `current.json`: List of currently active Tasmota devices
- `deprecated.json`: Devices that were previously active but are no longer present
- `TasmotaDevices.json`: Detailed information about each device
## License
This project is licensed under the MIT License - see the LICENSE file for details.

662
TasmotaManager.py Normal file
View File

@ -0,0 +1,662 @@
import json
import logging
import os
import sys
from datetime import datetime
from typing import Optional
import requests
from urllib3.exceptions import InsecureRequestWarning
import re # Import the regular expression module
import time
import argparse
# Disable SSL warnings
requests.packages.urllib3.disable_warnings(category=InsecureRequestWarning)
class UnifiClient:
def __init__(self, base_url, username, password, site_id, verify_ssl=True):
self.base_url = base_url.rstrip('/')
self.username = username
self.password = password
self.site_id = site_id
self.session = requests.Session()
self.session.verify = verify_ssl
# Initialize cookie jar
self.session.cookies.clear()
def _login(self) -> requests.Response: # Changed return type annotation
"""Authenticate with the UniFi Controller."""
login_url = f"{self.base_url}/api/auth/login"
headers = {
'Content-Type': 'application/json',
'Accept': 'application/json'
}
payload = {
"username": self.username,
"password": self.password,
"remember": False
}
try:
response = self.session.post(
login_url,
json=payload,
headers=headers
)
response.raise_for_status()
if 'X-CSRF-Token' in response.headers:
self.session.headers['X-CSRF-Token'] = response.headers['X-CSRF-Token']
return response # Return the response object
except requests.exceptions.RequestException as e:
if hasattr(e, 'response') and e.response.status_code == 401:
raise Exception("Authentication failed. Please verify your username and password.") from e
raise
def get_clients(self) -> list:
"""Get all clients from the UniFi Controller."""
# Try the newer API endpoint first
url = f"{self.base_url}/proxy/network/api/s/{self.site_id}/stat/sta"
try:
response = self.session.get(url)
response.raise_for_status()
return response.json().get('data', [])
except requests.exceptions.RequestException as e:
# If the newer endpoint fails, try the legacy endpoint
url = f"{self.base_url}/api/s/{self.site_id}/stat/sta"
try:
response = self.session.get(url)
response.raise_for_status()
return response.json().get('data', [])
except requests.exceptions.RequestException as e:
# If both fail, try the v2 API endpoint
url = f"{self.base_url}/v2/api/site/{self.site_id}/clients"
response = self.session.get(url)
response.raise_for_status()
return response.json().get('data', [])
class TasmotaDiscovery:
def __init__(self, debug: bool = False):
"""Initialize the TasmotaDiscovery with optional debug mode."""
log_level = logging.DEBUG if debug else logging.INFO
logging.basicConfig(
level=log_level,
format='%(asctime)s - %(levelname)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
self.logger = logging.getLogger(__name__)
self.config = None
self.unifi_client = None
def load_config(self, config_path: Optional[str] = None) -> dict:
"""Load configuration from JSON file."""
if config_path is None:
config_path = os.path.join(os.path.dirname(__file__), 'config.json')
self.logger.debug(f"Loading configuration from: {config_path}")
try:
with open(config_path, 'r') as config_file:
self.config = json.load(config_file)
self.logger.debug("Configuration loaded successfully from %s", config_path)
return self.config
except FileNotFoundError:
self.logger.error(f"Configuration file not found at {config_path}")
sys.exit(1)
except json.JSONDecodeError:
self.logger.error("Invalid JSON in configuration file")
sys.exit(1)
def setup_unifi_client(self):
"""Set up the UniFi client with better error handling"""
self.logger.debug("Setting up UniFi client")
if not self.config or 'unifi' not in self.config:
raise ValueError("Missing UniFi configuration")
unifi_config = self.config['unifi']
required_fields = ['host', 'username', 'password', 'site']
missing_fields = [field for field in required_fields if field not in unifi_config]
if missing_fields:
raise ValueError(f"Missing required UniFi configuration fields: {', '.join(missing_fields)}")
try:
self.logger.debug(f"Connecting to UniFi Controller at {unifi_config['host']}")
self.unifi_client = UnifiClient(
base_url=unifi_config['host'],
username=unifi_config['username'],
password=unifi_config['password'],
site_id=unifi_config['site'],
verify_ssl=False # Add this if using self-signed certificates
)
# Test the connection by making a simple request
response = self.unifi_client._login()
if not response:
raise ConnectionError(f"Failed to connect to UniFi controller: No response")
self.logger.debug("UniFi client setup successful")
except Exception as e:
self.logger.error(f"Error setting up UniFi client: {str(e)}")
raise ConnectionError(f"Failed to connect to UniFi controller: {str(e)}")
def is_tasmota_device(self, device: dict) -> bool:
"""Determine if a device is a Tasmota device."""
name = device.get('name', '').lower()
hostname = device.get('hostname', '').lower()
ip = device.get('ip', '')
# Check if device is in the configured NoT network
network_filters = self.config['unifi'].get('network_filter', {})
for network in network_filters.values():
if ip.startswith(network['subnet']):
self.logger.debug(f"Checking device in NoT network: {name} ({hostname}) IP: {ip}")
# First check exclusion patterns
exclude_patterns = network.get('exclude_patterns', [])
for pattern in exclude_patterns:
pattern = pattern.lower()
# Convert glob pattern to regex pattern
pattern = pattern.replace('.', r'\.').replace('*', '.*')
if re.match(f"^{pattern}$", name) or re.match(f"^{pattern}$", hostname):
self.logger.debug(f"Excluding device due to pattern '{pattern}': {name} ({hostname})")
return False
# If not excluded, check if it's a Tasmota device
matches = any([
name.startswith('tasmota'),
name.startswith('sonoff'),
name.endswith('-ts'),
hostname.startswith('tasmota'),
hostname.startswith('sonoff'),
hostname.startswith('esp-'),
any(hostname.endswith(suffix) for suffix in ['-fan', '-lamp', '-light', '-switch'])
])
if matches:
self.logger.debug(f"Found Tasmota device: {name}")
return True # Consider all non-excluded devices in NoT network as potential Tasmota devices
return False
def get_tasmota_devices(self) -> list:
"""Query UniFi controller and filter Tasmota devices."""
devices = []
self.logger.debug("Querying UniFi controller for devices")
try:
all_clients = self.unifi_client.get_clients()
self.logger.debug(f"Found {len(all_clients)} total devices")
for device in all_clients:
if self.is_tasmota_device(device):
device_info = {
"name": device.get('name', device.get('hostname', 'Unknown')),
"ip": device.get('ip', ''),
"mac": device.get('mac', ''),
"last_seen": device.get('last_seen', ''),
"hostname": device.get('hostname', ''),
"notes": device.get('note', ''),
}
devices.append(device_info)
self.logger.debug(f"Found {len(devices)} Tasmota devices")
return devices
except Exception as e:
self.logger.error(f"Error getting devices from UniFi controller: {e}")
return []
def save_tasmota_config(self, devices: list) -> None:
"""Save Tasmota device information to a JSON file with device tracking."""
filename = "current.json"
self.logger.debug(f"Saving Tasmota configuration to {filename}")
deprecated_filename = "deprecated.json"
current_devices = []
deprecated_devices = []
# Load existing devices if file exists
if os.path.exists(filename):
try:
with open(filename, 'r') as f:
existing_config = json.load(f)
current_devices = existing_config.get('tasmota', {}).get('devices', [])
except json.JSONDecodeError:
self.logger.error(f"Error reading {filename}, treating as empty")
current_devices = []
# Load deprecated devices if file exists
if os.path.exists(deprecated_filename):
try:
with open(deprecated_filename, 'r') as f:
deprecated_config = json.load(f)
deprecated_devices = deprecated_config.get('tasmota', {}).get('devices', [])
except json.JSONDecodeError:
self.logger.error(f"Error reading {deprecated_filename}, treating as empty")
deprecated_devices = []
# Create new config
new_devices = []
moved_to_deprecated = []
restored_from_deprecated = []
removed_from_deprecated = []
excluded_devices = []
# Check for excluded devices in current and deprecated lists
network_filters = self.config['unifi'].get('network_filter', {})
exclude_patterns = []
for network in network_filters.values():
exclude_patterns.extend(network.get('exclude_patterns', []))
# Function to check if device is excluded
def is_device_excluded(device_name: str, hostname: str = '') -> bool:
name = device_name.lower()
hostname = hostname.lower()
for pattern in exclude_patterns:
pattern = pattern.lower()
pattern = pattern.replace('.', r'\.').replace('*', '.*')
if re.match(f"^{pattern}$", name) or re.match(f"^{pattern}$", hostname):
return True
return False
# Process current devices
for device in devices:
device_name = device['name']
device_hostname = device.get('hostname', '')
device_ip = device['ip']
device_mac = device['mac']
# Check if device should be excluded
if is_device_excluded(device_name, device_hostname):
print(f"Device {device_name} excluded by pattern - skipping")
excluded_devices.append(device_name)
continue
# Check in current devices
existing_device = next((d for d in current_devices
if d['name'] == device_name), None)
if existing_device:
# Device exists, check if IP or MAC changed
if existing_device['ip'] != device_ip or existing_device['mac'] != device_mac:
moved_to_deprecated.append(existing_device)
new_devices.append(device)
print(f"Device {device_name} moved to deprecated (IP/MAC changed)")
else:
new_devices.append(existing_device) # Keep existing device
else:
# New device, check if it was in deprecated
deprecated_device = next((d for d in deprecated_devices
if d['name'] == device_name), None)
if deprecated_device:
removed_from_deprecated.append(device_name)
print(f"Device {device_name} removed from deprecated (restored)")
new_devices.append(device)
print(f"Device {device_name} added to output file")
# Find devices that are no longer present
current_names = {d['name'] for d in devices}
for existing_device in current_devices:
if existing_device['name'] not in current_names:
if not is_device_excluded(existing_device['name'], existing_device.get('hostname', '')):
moved_to_deprecated.append(existing_device)
print(f"Device {existing_device['name']} moved to deprecated (no longer present)")
# Update deprecated devices list, excluding any excluded devices
final_deprecated = []
for device in deprecated_devices:
if device['name'] not in removed_from_deprecated and not is_device_excluded(device['name'], device.get('hostname', '')):
final_deprecated.append(device)
elif is_device_excluded(device['name'], device.get('hostname', '')):
print(f"Device {device['name']} removed from deprecated (excluded by pattern)")
final_deprecated.extend(moved_to_deprecated)
# Save new configuration
config = {
"tasmota": {
"devices": new_devices,
"generated_at": datetime.now().isoformat(),
"total_devices": len(new_devices)
}
}
# Save deprecated configuration
deprecated_config = {
"tasmota": {
"devices": final_deprecated,
"generated_at": datetime.now().isoformat(),
"total_devices": len(final_deprecated)
}
}
# Backup existing file if it exists
if os.path.exists(filename):
try:
backup_name = f"{filename}.backup"
os.rename(filename, backup_name)
self.logger.info(f"Created backup of existing configuration as {backup_name}")
except Exception as e:
self.logger.error(f"Error creating backup: {e}")
# Save files
try:
with open(filename, 'w') as f:
json.dump(config, f, indent=4)
with open(deprecated_filename, 'w') as f:
json.dump(deprecated_config, f, indent=4)
self.logger.info(f"Successfully saved {len(new_devices)} Tasmota devices to {filename}")
self.logger.info(f"Successfully saved {len(final_deprecated)} deprecated devices to {deprecated_filename}")
print("\nDevice Status Summary:")
if excluded_devices:
print("\nExcluded Devices:")
for name in excluded_devices:
print(f"- {name}")
if moved_to_deprecated:
print("\nMoved to deprecated:")
for device in moved_to_deprecated:
print(f"- {device['name']}")
if removed_from_deprecated:
print("\nRestored from deprecated:")
for name in removed_from_deprecated:
print(f"- {name}")
print("\nCurrent Tasmota Devices:")
for device in new_devices:
print(f"Name: {device['name']:<20} IP: {device['ip']:<15} MAC: {device['mac']}")
except Exception as e:
self.logger.error(f"Error saving configuration: {e}")
def get_device_details(self, use_current_json=True):
"""Connect to each Tasmota device via HTTP, gather details and validate MQTT settings"""
self.logger.info("Starting to gather detailed device information...")
device_details = []
try:
source_file = 'current.json' if use_current_json else 'tasmota.json'
with open(source_file, 'r') as f:
data = json.load(f)
devices = data.get('tasmota', {}).get('devices', [])
self.logger.debug(f"Loaded {len(devices)} devices from {source_file}")
except FileNotFoundError:
self.logger.error(f"{source_file} not found. Run discovery first.")
return
except json.JSONDecodeError:
self.logger.error(f"Invalid JSON format in {source_file}")
return
mqtt_config = self.config.get('mqtt', {})
if not mqtt_config:
self.logger.error("MQTT configuration missing from config file")
return
def check_mqtt_settings(ip, name, mqtt_status):
"""Check and update MQTT settings if they don't match config"""
# Get the base hostname (everything before the dash)
hostname_base = name.split('-')[0] if '-' in name else name
mqtt_fields = {
"Host": mqtt_config.get('Host', ''),
"Port": mqtt_config.get('Port', 1883),
"User": mqtt_config.get('User', ''),
"Password": mqtt_config.get('Password', ''),
"Topic": hostname_base if mqtt_config.get('Topic') == '%hostname_base%' else mqtt_config.get('Topic', ''),
"FullTopic": mqtt_config.get('FullTopic', '%prefix%/%topic%/'),
}
device_mqtt = mqtt_status.get('MqttHost', {})
changes_needed = []
force_password_update = False
# Check each MQTT setting
if device_mqtt.get('Host') != mqtt_fields['Host']:
changes_needed.append(('MqttHost', mqtt_fields['Host']))
self.logger.debug(f"{name}: MQTT Host mismatch - Device: {device_mqtt.get('Host')}, Config: {mqtt_fields['Host']}")
force_password_update = True
if device_mqtt.get('Port') != mqtt_fields['Port']:
changes_needed.append(('MqttPort', mqtt_fields['Port']))
self.logger.debug(f"{name}: MQTT Port mismatch - Device: {device_mqtt.get('Port')}, Config: {mqtt_fields['Port']}")
force_password_update = True
if device_mqtt.get('User') != mqtt_fields['User']:
changes_needed.append(('MqttUser', mqtt_fields['User']))
self.logger.debug(f"{name}: MQTT User mismatch - Device: {device_mqtt.get('User')}, Config: {mqtt_fields['User']}")
force_password_update = True
if device_mqtt.get('Topic') != mqtt_fields['Topic']:
changes_needed.append(('Topic', mqtt_fields['Topic']))
self.logger.debug(f"{name}: MQTT Topic mismatch - Device: {device_mqtt.get('Topic')}, Config: {mqtt_fields['Topic']}")
force_password_update = True
if device_mqtt.get('FullTopic') != mqtt_fields['FullTopic']:
changes_needed.append(('FullTopic', mqtt_fields['FullTopic']))
self.logger.debug(f"{name}: MQTT FullTopic mismatch - Device: {device_mqtt.get('FullTopic')}, Config: {mqtt_fields['FullTopic']}")
force_password_update = True
# Add password update if any MQTT setting changed or user was updated
if force_password_update:
changes_needed.append(('MqttPassword', mqtt_fields['Password']))
self.logger.debug(f"{name}: MQTT Password will be updated")
# Check NoRetain setting
if mqtt_config.get('NoRetain', True):
changes_needed.append(('SetOption62', '1')) # 1 = No Retain
# Apply changes if needed
for setting, value in changes_needed:
try:
url = f"http://{ip}/cm?cmnd={setting}%20{value}"
response = requests.get(url, timeout=5)
if response.status_code == 200:
if setting != 'MqttPassword':
self.logger.debug(f"{name}: Updated {setting} to {value}")
else:
self.logger.debug(f"{name}: Updated MQTT Password")
else:
self.logger.error(f"{name}: Failed to update {setting}")
except requests.exceptions.RequestException as e:
self.logger.error(f"{name}: Error updating {setting}: {str(e)}")
return len(changes_needed) > 0
for device in devices:
if not isinstance(device, dict):
self.logger.warning(f"Skipping invalid device entry: {device}")
continue
name = device.get('name', 'Unknown')
ip = device.get('ip')
mac = device.get('mac')
if not ip:
self.logger.warning(f"Skipping device {name} - no IP address")
continue
self.logger.info(f"Checking device: {name} at {ip}")
try:
# Get Status 2 for firmware version
url_status = f"http://{ip}/cm?cmnd=Status%202"
response = requests.get(url_status, timeout=5)
status_data = response.json()
# Get Status 5 for network info
url_network = f"http://{ip}/cm?cmnd=Status%205"
response = requests.get(url_network, timeout=5)
network_data = response.json()
# Get Status 6 for MQTT info
url_mqtt = f"http://{ip}/cm?cmnd=Status%206"
response = requests.get(url_mqtt, timeout=5)
mqtt_data = response.json()
# Check and update MQTT settings if needed
mqtt_updated = check_mqtt_settings(ip, name, mqtt_data)
device_detail = {
"name": name,
"ip": ip,
"mac": mac,
"version": status_data.get("StatusFWR", {}).get("Version", "Unknown"),
"hostname": network_data.get("StatusNET", {}).get("Hostname", "Unknown"),
"mqtt_status": "Updated" if mqtt_updated else "Verified",
"last_checked": time.strftime("%Y-%m-%d %H:%M:%S"),
"status": "online"
}
self.logger.info(f"Successfully got version for {name}: {device_detail['version']}")
except requests.exceptions.RequestException as e:
self.logger.error(f"Error connecting to {name} at {ip}: {str(e)}")
device_detail = {
"name": name,
"ip": ip,
"mac": mac,
"version": "Unknown",
"status": "offline",
"error": str(e)
}
device_details.append(device_detail)
time.sleep(0.5)
# Save all device details at once
try:
with open('TasmotaDevices.json', 'w') as f:
json.dump(device_details, f, indent=2)
self.logger.info(f"Device details saved to TasmotaDevices.json ({len(device_details)} devices)")
except Exception as e:
self.logger.error(f"Error saving device details: {e}")
def check_mqtt_settings(ip, name, mqtt_status):
"""Check and update MQTT settings if they don't match config"""
# Get the base hostname (everything before the dash)
hostname_base = name.split('-')[0] if '-' in name else name
mqtt_fields = {
"Host": mqtt_config.get('Host', ''),
"Port": mqtt_config.get('Port', 1883),
"User": mqtt_config.get('User', ''),
"Password": mqtt_config.get('Password', ''),
"Topic": hostname_base if mqtt_config.get('Topic') == '%hostname_base%' else mqtt_config.get('Topic', ''),
"FullTopic": mqtt_config.get('FullTopic', '%prefix%/%topic%/'),
}
device_mqtt = mqtt_status.get('MqttHost', {})
changes_needed = []
force_password_update = False
# Check each MQTT setting
if device_mqtt.get('Host') != mqtt_fields['Host']:
changes_needed.append(('MqttHost', mqtt_fields['Host']))
self.logger.debug(f"{name}: MQTT Host mismatch - Device: {device_mqtt.get('Host')}, Config: {mqtt_fields['Host']}")
force_password_update = True
if device_mqtt.get('Port') != mqtt_fields['Port']:
changes_needed.append(('MqttPort', mqtt_fields['Port']))
self.logger.debug(f"{name}: MQTT Port mismatch - Device: {device_mqtt.get('Port')}, Config: {mqtt_fields['Port']}")
force_password_update = True
if device_mqtt.get('User') != mqtt_fields['User']:
changes_needed.append(('MqttUser', mqtt_fields['User']))
self.logger.debug(f"{name}: MQTT User mismatch - Device: {device_mqtt.get('User')}, Config: {mqtt_fields['User']}")
force_password_update = True
if device_mqtt.get('Topic') != mqtt_fields['Topic']:
changes_needed.append(('Topic', mqtt_fields['Topic']))
self.logger.debug(f"{name}: MQTT Topic mismatch - Device: {device_mqtt.get('Topic')}, Config: {mqtt_fields['Topic']}")
force_password_update = True
if device_mqtt.get('FullTopic') != mqtt_fields['FullTopic']:
changes_needed.append(('FullTopic', mqtt_fields['FullTopic']))
self.logger.debug(f"{name}: MQTT FullTopic mismatch - Device: {device_mqtt.get('FullTopic')}, Config: {mqtt_fields['FullTopic']}")
force_password_update = True
# Add password update if any MQTT setting changed or user was updated
if force_password_update:
changes_needed.append(('MqttPassword', mqtt_fields['Password']))
self.logger.debug(f"{name}: MQTT Password will be updated")
# Check NoRetain setting
if mqtt_config.get('NoRetain', True):
changes_needed.append(('SetOption62', '1')) # 1 = No Retain
# Apply changes if needed
for setting, value in changes_needed:
try:
url = f"http://{ip}/cm?cmnd={setting}%20{value}"
response = requests.get(url, timeout=5)
if response.status_code == 200:
if setting != 'MqttPassword':
self.logger.debug(f"{name}: Updated {setting} to {value}")
else:
self.logger.debug(f"{name}: Updated MQTT Password")
else:
self.logger.error(f"{name}: Failed to update {setting}")
except requests.exceptions.RequestException as e:
self.logger.error(f"{name}: Error updating {setting}: {str(e)}")
return len(changes_needed) > 0
def main():
parser = argparse.ArgumentParser(description='Tasmota Device Manager')
parser.add_argument('--config', default='network_configuration.json',
help='Path to configuration file')
parser.add_argument('--debug', action='store_true',
help='Enable debug logging')
parser.add_argument('--skip-unifi', action='store_true',
help='Skip UniFi discovery and use existing current.json')
args = parser.parse_args()
# Set up logging
log_level = logging.DEBUG if args.debug else logging.INFO
logging.basicConfig(level=log_level,
format='%(asctime)s - %(levelname)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S')
print("Starting Tasmota Device Discovery and Version Check...")
# Create TasmotaDiscovery instance
discovery = TasmotaDiscovery(debug=args.debug)
discovery.load_config(args.config)
try:
if not args.skip_unifi:
print("Step 1: Discovering Tasmota devices...")
discovery.setup_unifi_client()
tasmota_devices = discovery.get_tasmota_devices()
discovery.save_tasmota_config(tasmota_devices)
else:
print("Skipping UniFi discovery, using existing current.json...")
print("\nStep 2: Getting detailed version information...")
discovery.get_device_details(use_current_json=True)
print("\nProcess completed successfully!")
print("- Device list saved to: current.json")
print("- Detailed information saved to: TasmotaDevices.json")
except ConnectionError as e:
print(f"Connection Error: {str(e)}")
print("\nTrying to proceed with existing current.json...")
try:
discovery.get_device_details(use_current_json=True)
print("\nSuccessfully retrieved device details from existing current.json")
except Exception as inner_e:
print(f"Error processing existing devices: {str(inner_e)}")
return 1
except Exception as e:
print(f"Error: {str(e)}")
if args.debug:
import traceback
traceback.print_exc()
return 1
return 0
if __name__ == '__main__':
main()

View File

@ -0,0 +1,31 @@
{
"unifi": {
"host": "https://192.168.6.1",
"username": "Tasmota",
"password": "TasmotaManager12!@",
"site": "default",
"network_filter": {
"NoT_network": {
"name": "NoT",
"subnet": "192.168.8",
"exclude_patterns": [
"homeassistant*",
"*sonos*"
],
"unknown_device_patterns": [
"tasmota*",
"ESP-*"
]
}
}
},
"mqtt": {
"Host": "homeassistant.NoT.mgeppert.com",
"Port": 1883,
"User": "mgeppert",
"Password": "mgeppert",
"Topic": "%hostname_base%",
"FullTopic": "%prefix%/%topic%/",
"NoRetain": false
}
}