rewrote the artnet check pixel update system with a threading mechanisim to also update the settings

Signed-off-by: Ebbe Baß <ebbe.bass>
main
Ebbe Baß 2024-02-12 01:27:24 +01:00
parent 170f5e1773
commit 6b3c246674
2 changed files with 79 additions and 46 deletions

View File

@ -2,9 +2,10 @@ import os
import wifi
from stupidArtnet import *
from neopixel import *
import threading
import requests
import json
import time
from threading import Thread
# Replace with your server's IP address and port
SERVER_IP = '192.168.0.1' # Change to the actual IP of the PiXelTube Master
@ -15,15 +16,16 @@ wlan_mac_address = ':'.join(['{:02x}'.format((int(os.popen(f'cat /sys/class/net/
# Replace with the GPIO pin connected to the data input of the WS2812B LED strip
LED_STRIP_PIN = 18
LED_COUNT = 60
global LED_COUNT
LED_COUNT = 30
global LEDS_PER_PIXEL
LEDS_PER_PIXEL = 5
# Global variables for LED strip control
global strip
strip = Adafruit_NeoPixel(LED_COUNT, LED_STRIP_PIN, 800000, 10, False)
strip.begin()
def test_callback(data):
print(data)
def register_tube():
# Register or reauthenticate the tube with the server
try:
@ -36,27 +38,6 @@ def register_tube():
except requests.RequestException as e:
print(f'Registration failed: {e}')
def is_connected_to_wifi():
try:
ssid = wifi.current()
return ssid is not None
except wifi.exceptions.InterfaceError:
return False
def listen_to_artnet(universe, dmx_address, strip, pixelCount):
try:
artnet = StupidArtnetServer(universe=universe, callback_function=test_callback)
u_listener = artnet.register_listener(universe)
while True:
data = artnet.get_buffer(listener_id=u_listener)
if len(data) > 0:
print(data)
for i in range(pixelCount):
strip[i] = (data[(3 * i) + 0 + dmx_address], data[(3 * i) + 1 + dmx_address], data[(3 * i) + 2 + dmx_address])
except Exception as e:
print(f"Error: {e}")
def get_assigned_params():
try:
response = requests.get(f'http://{SERVER_IP}:{SERVER_PORT}/get_assigned_params/{wlan_mac_address}')
@ -69,20 +50,72 @@ def get_assigned_params():
except requests.RequestException as e:
print(f'Failed to fetch assigned parameters: {e}')
return None, None
def is_connected_to_wifi():
try:
ssid = wifi.current()
return ssid is not None
except wifi.exceptions.InterfaceError:
return False
def update_led_strip(dmx_values, dmx_address, strip, LED_PER_PIXEL):
for i in range(LED_COUNT):
pixel_index = i // LEDS_PER_PIXEL
dmx_index = dmx_address + (pixel_index * 3)
r = dmx_values[dmx_index]
g = dmx_values[dmx_index + 1]
b = dmx_values[dmx_index + 2]
strip.setPixelColor(i, Color(r, g, b))
# Update the LED strip
strip.show()
def listen_to_artnet(universe, dmx_address, strip, LED_PER_PIXEL):
try:
artnet = StupidArtnet()
artnet.start(universe=universe)
while True:
dmx_values = artnet.listen()
if dmx_values is not None:
update_led_strip(dmx_values, dmx_address, strip, LED_PER_PIXEL)
except Exception as e:
print(f"Error: {e}")
def loopCheckSettingUpdates():
while True:
try:
global universe
global dmx_address
universe, dmx_address = get_assigned_params()
except Exception as e:
print(f"Error: {e}")
def loopUpdatePixels():
while True:
try:
listen_to_artnet(universe, dmx_address, strip, LEDS_PER_PIXEL)
except Exception as e:
print(f"Error: {e}")
if __name__ == "__main__":
# Connect to Wi-Fi
if is_connected_to_wifi():
# Register/reauthenticate the tube
register_tube()
time.sleep(1)
global universe
global dmx_address
universe, dmx_address = get_assigned_params()
listen_to_artnet(universe, dmx_address, strip, LEDS_PER_PIXEL)
# Fetch assigned universe and DMX address
assigned_universe, assigned_dmx_address = get_assigned_params()
settingsUpdateThread = Thread(target=loopCheckSettingUpdates, args=())
pixelUpdateThread = Thread(target=loopUpdatePixels, args=())
if assigned_universe is not None and assigned_dmx_address is not None:
# Start a thread for listening to Art-Net messages
art_net_thread = threading.Thread(target=listen_to_artnet, args=(assigned_universe, assigned_dmx_address))
art_net_thread.start()
# Wait for the thread to finish (you can add more logic here as needed)
art_net_thread.join()
settingsUpdateThread.start()
pixelUpdateThread.start()

View File

@ -1,4 +1,3 @@
import socket
from flask import Flask, render_template, request, jsonify
import json
import MySQLdb
@ -31,6 +30,14 @@ db = MySQLdb.connect(
database=config['mysql']['database'],
)
# Function to retrieve registered tubes from the database
def get_tubes():
cur = db.cursor()
cur.execute("SELECT * FROM tubes")
tubes = cur.fetchall()
cur.close()
return tubes
# Function to register a tube in the database
def register_tube(mac_address):
cur = db.cursor()
@ -55,19 +62,12 @@ def register_tube_route():
register_tube(mac_address)
return jsonify({'success': True, 'message': 'Tube registered successfully.'})
# Function to retrieve registered tubes from the database
def get_tubes():
cur = db.cursor()
cur.execute("SELECT * FROM tubes")
tubes = cur.fetchall()
cur.close()
return tubes
@app.route('/get_assigned_params/<tube_id>', methods=['GET'])
def get_assigned_params(tube_id):
@app.route('/get_assigned_params/<tube_unique_id>', methods=['GET'])
def get_assigned_params(tube_unique_id):
try:
cur = db.cursor()
cur.execute("SELECT universe, dmx_address FROM tubes WHERE mac_address = %s", (tube_id,))
cur.execute("SELECT universe, dmx_address FROM tubes WHERE mac_address = %s", (tube_unique_id,))
result = cur.fetchone()
cur.close()