Implement unknown device processing with interactive toggle feature and update documentation
This commit is contained in:
parent
e34e25b951
commit
040fbd68d8
23
README.md
23
README.md
@ -8,6 +8,7 @@ A Python utility for discovering, monitoring, and managing Tasmota devices on a
|
||||
- Tracks device changes over time (new, moved, deprecated devices)
|
||||
- Checks and updates MQTT settings on Tasmota devices
|
||||
- Generates detailed device information including firmware versions
|
||||
- Processes unknown devices (matching unknown_device_patterns) to set up names and MQTT
|
||||
|
||||
## Requirements
|
||||
|
||||
@ -83,6 +84,28 @@ Command-line options:
|
||||
- `--config`: Path to configuration file (default: network_configuration.json)
|
||||
- `--debug`: Enable debug logging
|
||||
- `--skip-unifi`: Skip UniFi discovery and use existing current.json
|
||||
- `--process-unknown`: Process unknown devices (matching unknown_device_patterns) to set up names and MQTT
|
||||
|
||||
## Unknown Device Processing
|
||||
|
||||
The script can process devices that match patterns in the `unknown_device_patterns` list (like "tasmota_" or "ESP-" prefixed devices). When using the `--process-unknown` flag, the script will:
|
||||
|
||||
1. Identify devices matching the unknown device patterns
|
||||
2. Check if each device has a toggle button (indicating it's a light switch or power plug)
|
||||
3. Toggle the button at 1/2Hz (on/off every two seconds) to help identify the physical device
|
||||
4. **How to enter the hostname:**
|
||||
- The script will display a clear prompt in the console showing the current device name and IP address
|
||||
- While the device is toggling, you'll see a prompt asking for a new name for the device
|
||||
- Type the new hostname directly in the console and press Enter
|
||||
- All debug messages are completely suppressed during this process to keep the console clear
|
||||
5. Once a hostname is entered, the script will:
|
||||
- Configure the "Friendly Name 1" field with the new hostname
|
||||
- Enable MQTT if not already enabled
|
||||
- Configure MQTT settings from the configuration file
|
||||
- Save the configuration and reboot the device
|
||||
6. Move on to the next unknown device
|
||||
|
||||
This feature helps automate the setup of new Tasmota devices that haven't been properly named yet.
|
||||
|
||||
## Output Files
|
||||
|
||||
|
||||
@ -141,18 +141,18 @@ class TasmotaDiscovery:
|
||||
raise ConnectionError(f"Failed to connect to UniFi controller: {str(e)}")
|
||||
|
||||
def is_tasmota_device(self, device: dict) -> bool:
|
||||
"""Determine if a device is a Tasmota device."""
|
||||
"""Determine if a device is in the network_filter and not in exclude_patterns."""
|
||||
name = device.get('name', '').lower()
|
||||
hostname = device.get('hostname', '').lower()
|
||||
ip = device.get('ip', '')
|
||||
|
||||
# Check if device is in the configured NoT network
|
||||
# Check if device is in the configured network
|
||||
network_filters = self.config['unifi'].get('network_filter', {})
|
||||
for network in network_filters.values():
|
||||
if ip.startswith(network['subnet']):
|
||||
self.logger.debug(f"Checking device in NoT network: {name} ({hostname}) IP: {ip}")
|
||||
self.logger.debug(f"Checking device in network: {name} ({hostname}) IP: {ip}")
|
||||
|
||||
# First check exclusion patterns
|
||||
# Check exclusion patterns
|
||||
exclude_patterns = network.get('exclude_patterns', [])
|
||||
for pattern in exclude_patterns:
|
||||
pattern = pattern.lower()
|
||||
@ -162,19 +162,9 @@ class TasmotaDiscovery:
|
||||
self.logger.debug(f"Excluding device due to pattern '{pattern}': {name} ({hostname})")
|
||||
return False
|
||||
|
||||
# If not excluded, check if it's a Tasmota device
|
||||
matches = any([
|
||||
name.startswith('tasmota'),
|
||||
name.startswith('sonoff'),
|
||||
name.endswith('-ts'),
|
||||
hostname.startswith('tasmota'),
|
||||
hostname.startswith('sonoff'),
|
||||
hostname.startswith('esp-'),
|
||||
any(hostname.endswith(suffix) for suffix in ['-fan', '-lamp', '-light', '-switch'])
|
||||
])
|
||||
if matches:
|
||||
self.logger.debug(f"Found Tasmota device: {name}")
|
||||
return True # Consider all non-excluded devices in NoT network as potential Tasmota devices
|
||||
# Device is in the network and not excluded
|
||||
self.logger.debug(f"Found device in network: {name}")
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
@ -370,8 +360,212 @@ class TasmotaDiscovery:
|
||||
except Exception as e:
|
||||
self.logger.error(f"Error saving configuration: {e}")
|
||||
|
||||
def get_unknown_devices(self, use_current_json=True):
|
||||
"""Identify devices that match unknown_device_patterns from current.json."""
|
||||
self.logger.info("Identifying unknown devices for processing...")
|
||||
unknown_devices = []
|
||||
|
||||
try:
|
||||
source_file = 'current.json' if use_current_json else 'tasmota.json'
|
||||
with open(source_file, 'r') as f:
|
||||
data = json.load(f)
|
||||
all_devices = data.get('tasmota', {}).get('devices', [])
|
||||
self.logger.debug(f"Loaded {len(all_devices)} devices from {source_file}")
|
||||
except FileNotFoundError:
|
||||
self.logger.error(f"{source_file} not found. Run discovery first.")
|
||||
return []
|
||||
except json.JSONDecodeError:
|
||||
self.logger.error(f"Invalid JSON format in {source_file}")
|
||||
return []
|
||||
|
||||
# Identify devices matching unknown_device_patterns
|
||||
network_filters = self.config['unifi'].get('network_filter', {})
|
||||
unknown_patterns = []
|
||||
for network in network_filters.values():
|
||||
unknown_patterns.extend(network.get('unknown_device_patterns', []))
|
||||
|
||||
for device in all_devices:
|
||||
name = device.get('name', '').lower()
|
||||
hostname = device.get('hostname', '').lower()
|
||||
|
||||
for pattern in unknown_patterns:
|
||||
pattern = pattern.lower()
|
||||
pattern = pattern.replace('.', r'\.').replace('*', '.*')
|
||||
if re.match(f"^{pattern}", name) or re.match(f"^{pattern}", hostname):
|
||||
self.logger.debug(f"Found unknown device: {name} ({hostname})")
|
||||
unknown_devices.append(device)
|
||||
break
|
||||
|
||||
self.logger.info(f"Found {len(unknown_devices)} unknown devices to process")
|
||||
return unknown_devices
|
||||
|
||||
def process_unknown_devices(self):
|
||||
"""Process unknown devices by checking for toggle button and configuring them.
|
||||
|
||||
This method:
|
||||
1. Identifies devices matching unknown_device_patterns
|
||||
2. Checks if each device has a toggle button (indicating it's a light/switch)
|
||||
3. Toggles the button at 1/2Hz while checking for hostname changes
|
||||
4. Prompts the user to enter a new name for the device in the console
|
||||
5. Once a name is entered, configures the device with the new hostname
|
||||
"""
|
||||
unknown_devices = self.get_unknown_devices()
|
||||
if not unknown_devices:
|
||||
self.logger.info("No unknown devices found to process")
|
||||
return
|
||||
|
||||
self.logger.info(f"Starting to process {len(unknown_devices)} unknown devices...")
|
||||
|
||||
for device in unknown_devices:
|
||||
name = device.get('name', 'Unknown')
|
||||
ip = device.get('ip')
|
||||
|
||||
if not ip:
|
||||
self.logger.warning(f"Skipping device {name} - no IP address")
|
||||
continue
|
||||
|
||||
self.logger.info(f"Processing unknown device: {name} at {ip}")
|
||||
|
||||
# Check if device has a toggle button
|
||||
try:
|
||||
# Get the main page to check for toggle button
|
||||
url = f"http://{ip}/"
|
||||
response = requests.get(url, timeout=5)
|
||||
|
||||
# Check if there's a toggle button in the response
|
||||
has_toggle = "toggle" in response.text.lower()
|
||||
|
||||
if has_toggle:
|
||||
self.logger.info(f"Device {name} has a toggle button, assuming it's a light switch or power plug")
|
||||
|
||||
# Start toggling at 1/2Hz
|
||||
original_hostname = device.get('hostname', '')
|
||||
toggle_state = False
|
||||
|
||||
# Temporarily disable all logging during toggling
|
||||
logging.disable(logging.CRITICAL)
|
||||
|
||||
try:
|
||||
# Clear console output and show prompt
|
||||
print("\n" + "="*50)
|
||||
print(f"DEVICE: {name} at IP: {ip}")
|
||||
print(f"Current hostname: {original_hostname}")
|
||||
print("="*50)
|
||||
print("The device is now toggling to help you identify it.")
|
||||
|
||||
# Start toggling in background while waiting for input
|
||||
import threading
|
||||
stop_toggle = threading.Event()
|
||||
|
||||
def toggle_device():
|
||||
toggle_state = False
|
||||
while not stop_toggle.is_set():
|
||||
toggle_state = not toggle_state
|
||||
toggle_cmd = "Power On" if toggle_state else "Power Off"
|
||||
toggle_url = f"http://{ip}/cm?cmnd={toggle_cmd}"
|
||||
try:
|
||||
requests.get(toggle_url, timeout=2)
|
||||
except:
|
||||
pass
|
||||
time.sleep(2.0) # 1/2Hz rate
|
||||
|
||||
# Start toggle thread
|
||||
toggle_thread = threading.Thread(target=toggle_device)
|
||||
toggle_thread.daemon = True
|
||||
toggle_thread.start()
|
||||
|
||||
# Prompt for new hostname
|
||||
print("\nPlease enter a new name for this device:")
|
||||
new_hostname = input("> ").strip()
|
||||
|
||||
# Stop toggling
|
||||
stop_toggle.set()
|
||||
toggle_thread.join(timeout=3)
|
||||
|
||||
if new_hostname and new_hostname != original_hostname:
|
||||
print(f"Setting new hostname to: {new_hostname}")
|
||||
else:
|
||||
print("No valid hostname entered, skipping device")
|
||||
new_hostname = ""
|
||||
|
||||
finally:
|
||||
# Re-enable logging
|
||||
logging.disable(logging.NOTSET)
|
||||
|
||||
# If a new hostname was entered, configure the device
|
||||
if new_hostname:
|
||||
self.logger.info(f"Configuring device with new hostname: {new_hostname}")
|
||||
self.configure_unknown_device(ip, new_hostname)
|
||||
else:
|
||||
self.logger.warning(f"No new hostname provided for {name}, skipping configuration")
|
||||
else:
|
||||
self.logger.info(f"Device {name} does not have a toggle button, skipping")
|
||||
|
||||
except requests.exceptions.RequestException as e:
|
||||
self.logger.error(f"Error connecting to {name} at {ip}: {str(e)}")
|
||||
|
||||
def configure_unknown_device(self, ip, hostname):
|
||||
"""Configure an unknown device with the given hostname and MQTT settings."""
|
||||
try:
|
||||
# Set Friendly Name
|
||||
friendly_name_url = f"http://{ip}/cm?cmnd=FriendlyName1%20{hostname}"
|
||||
response = requests.get(friendly_name_url, timeout=5)
|
||||
if response.status_code == 200:
|
||||
self.logger.info(f"Set Friendly Name to {hostname}")
|
||||
else:
|
||||
self.logger.error(f"Failed to set Friendly Name to {hostname}")
|
||||
|
||||
# Enable MQTT if not already enabled
|
||||
mqtt_url = f"http://{ip}/cm?cmnd=SetOption3%20ON" # Enable MQTT
|
||||
response = requests.get(mqtt_url, timeout=5)
|
||||
if response.status_code == 200:
|
||||
self.logger.info(f"Enabled MQTT for {hostname}")
|
||||
else:
|
||||
self.logger.error(f"Failed to enable MQTT for {hostname}")
|
||||
|
||||
# Configure MQTT settings
|
||||
mqtt_config = self.config.get('mqtt', {})
|
||||
if mqtt_config:
|
||||
# Get the base hostname (everything before the dash)
|
||||
hostname_base = hostname.split('-')[0] if '-' in hostname else hostname
|
||||
|
||||
mqtt_fields = {
|
||||
"MqttHost": mqtt_config.get('Host', ''),
|
||||
"MqttPort": mqtt_config.get('Port', 1883),
|
||||
"MqttUser": mqtt_config.get('User', ''),
|
||||
"MqttPassword": mqtt_config.get('Password', ''),
|
||||
"Topic": hostname_base if mqtt_config.get('Topic') == '%hostname_base%' else mqtt_config.get('Topic', ''),
|
||||
"FullTopic": mqtt_config.get('FullTopic', '%prefix%/%topic%/'),
|
||||
}
|
||||
|
||||
for setting, value in mqtt_fields.items():
|
||||
url = f"http://{ip}/cm?cmnd={setting}%20{value}"
|
||||
response = requests.get(url, timeout=5)
|
||||
if response.status_code == 200:
|
||||
if setting != 'MqttPassword':
|
||||
self.logger.info(f"{hostname}: Set {setting} to {value}")
|
||||
else:
|
||||
self.logger.info(f"{hostname}: Set MQTT Password")
|
||||
else:
|
||||
self.logger.error(f"{hostname}: Failed to set {setting}")
|
||||
|
||||
# Save configuration (will reboot the device)
|
||||
save_url = f"http://{ip}/cm?cmnd=Restart%201"
|
||||
response = requests.get(save_url, timeout=5)
|
||||
if response.status_code == 200:
|
||||
self.logger.info(f"Saved configuration and rebooted {hostname}")
|
||||
else:
|
||||
self.logger.error(f"Failed to save configuration for {hostname}")
|
||||
|
||||
return True
|
||||
|
||||
except requests.exceptions.RequestException as e:
|
||||
self.logger.error(f"Error configuring device at {ip}: {str(e)}")
|
||||
return False
|
||||
|
||||
def get_device_details(self, use_current_json=True):
|
||||
"""Connect to each Tasmota device via HTTP, gather details and validate MQTT settings"""
|
||||
"""Connect to each Tasmota device via HTTP, gather details and validate MQTT settings.
|
||||
Filters out devices matching unknown_device_patterns."""
|
||||
self.logger.info("Starting to gather detailed device information...")
|
||||
device_details = []
|
||||
|
||||
@ -379,8 +573,8 @@ class TasmotaDiscovery:
|
||||
source_file = 'current.json' if use_current_json else 'tasmota.json'
|
||||
with open(source_file, 'r') as f:
|
||||
data = json.load(f)
|
||||
devices = data.get('tasmota', {}).get('devices', [])
|
||||
self.logger.debug(f"Loaded {len(devices)} devices from {source_file}")
|
||||
all_devices = data.get('tasmota', {}).get('devices', [])
|
||||
self.logger.debug(f"Loaded {len(all_devices)} devices from {source_file}")
|
||||
except FileNotFoundError:
|
||||
self.logger.error(f"{source_file} not found. Run discovery first.")
|
||||
return
|
||||
@ -388,6 +582,31 @@ class TasmotaDiscovery:
|
||||
self.logger.error(f"Invalid JSON format in {source_file}")
|
||||
return
|
||||
|
||||
# Filter out devices matching unknown_device_patterns
|
||||
devices = []
|
||||
network_filters = self.config['unifi'].get('network_filter', {})
|
||||
unknown_patterns = []
|
||||
for network in network_filters.values():
|
||||
unknown_patterns.extend(network.get('unknown_device_patterns', []))
|
||||
|
||||
for device in all_devices:
|
||||
name = device.get('name', '').lower()
|
||||
hostname = device.get('hostname', '').lower()
|
||||
|
||||
is_unknown = False
|
||||
for pattern in unknown_patterns:
|
||||
pattern = pattern.lower()
|
||||
pattern = pattern.replace('.', r'\.').replace('*', '.*')
|
||||
if re.match(f"^{pattern}", name) or re.match(f"^{pattern}", hostname):
|
||||
self.logger.debug(f"Skipping unknown device: {name} ({hostname})")
|
||||
is_unknown = True
|
||||
break
|
||||
|
||||
if not is_unknown:
|
||||
devices.append(device)
|
||||
|
||||
self.logger.debug(f"Processing {len(devices)} devices after filtering unknown devices")
|
||||
|
||||
mqtt_config = self.config.get('mqtt', {})
|
||||
if not mqtt_config:
|
||||
self.logger.error("MQTT configuration missing from config file")
|
||||
@ -609,6 +828,8 @@ def main():
|
||||
help='Enable debug logging')
|
||||
parser.add_argument('--skip-unifi', action='store_true',
|
||||
help='Skip UniFi discovery and use existing current.json')
|
||||
parser.add_argument('--process-unknown', action='store_true',
|
||||
help='Process unknown devices (matching unknown_device_patterns) to set up names and MQTT')
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
@ -636,6 +857,10 @@ def main():
|
||||
print("\nStep 2: Getting detailed version information...")
|
||||
discovery.get_device_details(use_current_json=True)
|
||||
|
||||
if args.process_unknown:
|
||||
print("\nStep 3: Processing unknown devices...")
|
||||
discovery.process_unknown_devices()
|
||||
|
||||
print("\nProcess completed successfully!")
|
||||
print("- Device list saved to: current.json")
|
||||
print("- Detailed information saved to: TasmotaDevices.json")
|
||||
|
||||
@ -13,8 +13,9 @@
|
||||
"*sonos*"
|
||||
],
|
||||
"unknown_device_patterns": [
|
||||
"tasmota*",
|
||||
"ESP-*"
|
||||
"tasmota_",
|
||||
"esp-",
|
||||
"ESP-"
|
||||
]
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user