updated to new artnet_python packet

Signed-off-by: Ebbe Baß <ebbe.bass>
main
Ebbe Baß 2024-05-12 21:13:10 +02:00
parent d892eed7f6
commit afcd109946
3 changed files with 56 additions and 34 deletions

View File

@ -6,23 +6,24 @@ import paho.mqtt.client as mqtt
from getmac import get_mac_address
import board
# var's for web api server. Change them if you use another IP scheme for the DHCP.
SERVER_IP = '192.168.0.1'
SERVER_PORT = 5000
# Dynamically obtain the MAC address of the WLAN interface
# get the current MAC address from the wlan0 interface to use it as the unique id in the DB
wlan_mac_address = str(get_mac_address(interface="wlan0"))
# Replace with the GPIO pin connected to the data input of the WS2812B LED strip
# var's for pin and led count to use them in the neopixel module
LED_STRIP_PIN = board.D18
global LED_COUNT
LED_COUNT = 30
# Global variables for LED strip control
# global variables for LED strip control
global strip
strip = neopixel.NeoPixel(pin = board.D18, n = LED_COUNT, auto_write = True)
def register_tube():
# Register or reauthenticate the tube with the server
# register or reauthenticate the tube with the server
try:
response = requests.post(f'http://{SERVER_IP}:{SERVER_PORT}/register_tube', data={'mac_address': wlan_mac_address})
print(response)
@ -35,6 +36,7 @@ def register_tube():
print(f'Registration failed: {e}')
def get_assigned_params():
# request the own settings via the unique ID
try:
response = requests.get(f'http://{SERVER_IP}:{SERVER_PORT}/get_assigned_params/{wlan_mac_address}')
data = response.json()
@ -48,52 +50,59 @@ def get_assigned_params():
return None, None
def is_connected_to_wifi():
# check via subprocess command if you wifi is connected to the AP
output = subprocess.check_output(['iwgetid']).decode()
return output.split('"')[1]
def update_led_strip(rgb_values, pixel, strip):
# set retraived RGB values to the spicified pixel
strip[int(pixel)] = rgb_values
def on_message(mqttc, obj, msg):
# get the rgb value list on new mqtt message from the subscribed topic
global rgb_values_list
rgb_values_list = eval(msg.payload.decode())
if __name__ == "__main__":
# Connect to Wi-Fi
# run wifi connection check
if is_connected_to_wifi() is not None:
# Register/reauthenticate the tube
# if connected successfully run registration/reauthentication process
register_tube()
time.sleep(1)
# open MQTT client
mqttc = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
mqttc.connect("192.168.0.1", 1883, 60)
mqttc.on_message = on_message
mqttc.subscribe("tube-"+str(wlan_mac_address)+"/pixel_colors", 0)
# create a default rgb value list to get a black output
global rgb_values_list
rgb_values_list = eval("['[0, 0, 0]', '[0, 0, 0]', '[0, 0, 0]', '[0, 0, 0]', '[0, 0, 0]', '[0, 0, 0]']")
# start MQTT server in background loop
mqttc.loop_start()
# while True:
# try:
# for pixel in range(5):
# update_led_strip(tuple(eval(rgb_values_list[0])), pixel, strip)
while True:
# try updating the single pixels
try:
for pixel in range(5):
update_led_strip(tuple(eval(rgb_values_list[0])), pixel, strip)
# for pixel in range(5, 10):
# update_led_strip(tuple(eval(rgb_values_list[1])), pixel, strip)
for pixel in range(5, 10):
update_led_strip(tuple(eval(rgb_values_list[1])), pixel, strip)
# for pixel in range(10, 15):
# update_led_strip(tuple(eval(rgb_values_list[2])), pixel, strip)
for pixel in range(10, 15):
update_led_strip(tuple(eval(rgb_values_list[2])), pixel, strip)
# for pixel in range(15, 20):
# update_led_strip(tuple(eval(rgb_values_list[3])), pixel, strip)
for pixel in range(15, 20):
update_led_strip(tuple(eval(rgb_values_list[3])), pixel, strip)
# for pixel in range(20, 25):
# update_led_strip(tuple(eval(rgb_values_list[4])), pixel, strip)
for pixel in range(20, 25):
update_led_strip(tuple(eval(rgb_values_list[4])), pixel, strip)
# for pixel in range(25, 30):
# update_led_strip(tuple(eval(rgb_values_list[5])), pixel, strip)
# except KeyboardInterrupt:
# for led in range(LED_COUNT):
# update_led_strip((0, 0, 0), led, strip)
update_led_strip((255, 255, 255), 1, strip)
for pixel in range(25, 30):
update_led_strip(tuple(eval(rgb_values_list[5])), pixel, strip)
except KeyboardInterrupt:
# if code stop via cntrl+c blackout
for led in range(LED_COUNT):
update_led_strip((0, 0, 0), led, strip)

View File

@ -10,8 +10,10 @@ from multiprocessing import Process, Pipe, Queue
from queue import Empty
import json
# create flask webaüüs
app = Flask(__name__)
# get macaddress of wlan0 interface
wlan_mac_address = str(get_mac_address(interface="wlan0"))
# Read configuration from config.json
@ -30,7 +32,8 @@ except FileNotFoundError:
}
with open('config.json', 'w') as config_file:
json.dump(config, config_file, indent=4)
# create DB client from var's
database = config['mysql']['database']
db = connect(
@ -42,6 +45,7 @@ db = connect(
db.autocommit(True)
# set client id via unique id
mqtt_client_id = "PiXelTubeMaster-"+wlan_mac_address
# Function to register a tube in the database
@ -83,6 +87,7 @@ def get_assigned_params(tube_unique_id):
except Exception as e:
return jsonify({'success': False, 'message': f'Error: {e}'})
# run flask api
def flask_api():
app.run(host='192.168.0.1', port=5000)
@ -96,10 +101,11 @@ def get_eth0_ip():
exit
def on_connect(client, userdata, flags, reason_code, properties):
if reason_code == 0:
print("Connected to MQTT Broker!")
else:
print("Failed to connect, return code %d\n", str(reason_code))
# print debug message when mqtt client is connected
if reason_code == 0:
print("Connected to MQTT Broker!")
else:
print("Failed to connect, return code %d\n", str(reason_code))
def connect_mqtt():
# Set Connecting Client ID
@ -110,14 +116,18 @@ def connect_mqtt():
return client
def mqtt_publisher(ti_queue):
# set index and old data index to none as default value
tube_index = None
tube_index_old = None
# Create and start a thread for each universe
mqtt_client = connect_mqtt()
artnetBindIp = get_eth0_ip()
# start arnet listner
artNet = Artnet.Artnet(BINDIP = artnetBindIp, DEBUG = True, SHORTNAME = "PiXelTubeMaster", LONGNAME = "PiXelTubeMaster", PORT = 6454)
# start publishing process
while True:
try:
# try to get new tube index and if it fails use old index
try:
tube_index = ti_queue.get(block=False)
tube_index_old = tube_index
@ -127,12 +137,12 @@ def mqtt_publisher(ti_queue):
else:
tube_index = None
# Gets whatever the last Art-Net packet we received is
artNetPacket = artNet.readPacket()
artNetPacket = artNet.readBuffer()
# Make sure we actually *have* a packet
if artNetPacket is not None:
if tube_index is not None:
for index_row in tube_index:
if artNetPacket.universe == int(index_row[1]):
if artNetPacket[int(index_row[1])]:
dmx_address = int(index_row[2])
#Define RGB values per pixel
p1_g, p1_b, p1_r, p2_g, p2_b, p2_r, p3_g, p3_b, p3_r, p4_g, p4_b, p4_r, p5_g, p5_b, p5_r, p6_g, p6_b, p6_r = artNetPacket.data[dmx_address-1], artNetPacket.data[dmx_address], artNetPacket.data[dmx_address+1], artNetPacket.data[dmx_address+2], artNetPacket.data[dmx_address+3], artNetPacket.data[dmx_address+4], artNetPacket.data[dmx_address+5], artNetPacket.data[dmx_address+6], artNetPacket.data[dmx_address+7], artNetPacket.data[dmx_address+8], artNetPacket.data[dmx_address+9], artNetPacket.data[dmx_address+10], artNetPacket.data[dmx_address+11], artNetPacket.data[dmx_address+12], artNetPacket.data[dmx_address+13], artNetPacket.data[dmx_address+14], artNetPacket.data[dmx_address+15], artNetPacket.data[dmx_address+16]
@ -145,11 +155,13 @@ def mqtt_publisher(ti_queue):
result_str = [str(color) for color in colors]
result = str(result_str)
mqtt_client.publish(p1_topic, result)
# close artnet if KeyboardInterrupt
except KeyboardInterrupt:
artNet.close()
def tube_index_updater(ti_queue):
while True:
# try to create db cursor and get tube index data tu put in into queue
try:
cur = db.cursor()
cur.execute("SELECT mac_address, universe, dmx_address FROM tubes")
@ -161,6 +173,7 @@ def tube_index_updater(ti_queue):
time.sleep(5)
if __name__ == "__main__":
# start pipe as tube index receiver sender
(ti_receiver,ti_sender) = Pipe(True)
ti_queue = Queue()
ti_updater_thread = Process(target=tube_index_updater, args=(ti_queue, ))

View File

@ -27,9 +27,9 @@ artNet = Artnet.Artnet(artnetBindIp, DEBUG=debug)
while True:
try:
# Read latest ArtNet packet
artNetPacket = artNet.readPacket()
artNetPacket = artNet.readBuffer()[artnetUniverse]
# Print out the universe of said packet
print("My univ: "+str(artNetPacket.universe))
print("My univ: "+str(artNetPacket.data))
# Big red stop button just in case
except KeyboardInterrupt: