switched from queue to shared mem

Signed-off-by: Ebbe Baß <ebbe.bass>
main
Ebbe Baß 2024-02-18 00:59:18 +01:00
parent dd5d64eed3
commit 2c264ac6d6
1 changed files with 8 additions and 8 deletions

View File

@ -7,7 +7,7 @@ import os
from getmac import get_mac_address from getmac import get_mac_address
import time import time
import sys import sys
from multiprocessing import Process, Queue from multiprocessing import Process, shared_memory
import asyncio import asyncio
app = Flask(__name__) app = Flask(__name__)
@ -109,13 +109,13 @@ def connect_mqtt():
client.connect("localhost", 1883) client.connect("localhost", 1883)
return client return client
def mqtt_publisher(data_queue): def mqtt_publisher(shared_mem):
# Create and start a thread for each universe # Create and start a thread for each universe
mqtt_client = connect_mqtt() mqtt_client = connect_mqtt()
artnetBindIp = get_eth0_ip() artnetBindIp = get_eth0_ip()
artNet = Artnet.Artnet(BINDIP = artnetBindIp, DEBUG = True, SHORTNAME = "PiXelTubeMaster", LONGNAME = "PiXelTubeMaster", PORT = 6454) artNet = Artnet.Artnet(BINDIP = artnetBindIp, DEBUG = True, SHORTNAME = "PiXelTubeMaster", LONGNAME = "PiXelTubeMaster", PORT = 6454)
while True: while True:
tube_index = data_queue.get() tube_index = list(bytes(shared_mem.buf[:24]).decode())
try: try:
# Gets whatever the last Art-Net packet we received is # Gets whatever the last Art-Net packet we received is
artNetPacket = artNet.readPacket() artNetPacket = artNet.readPacket()
@ -140,24 +140,24 @@ def mqtt_publisher(data_queue):
artNet.close() artNet.close()
sys.exit() sys.exit()
def tube_index_updater(data_queue): def tube_index_updater(shared_mem):
while True: while True:
try: try:
cur = db.cursor() cur = db.cursor()
cur.execute("SELECT mac_address, universe, dmx_address FROM tubes") cur.execute("SELECT mac_address, universe, dmx_address FROM tubes")
tube_index = cur.fetchall() tube_index = cur.fetchall()
cur.close() cur.close()
data_queue.put(tube_index) shared_mem.buf[:24] = str(tube_index).encode()
print("Updated tube index with values: "+str(tube_index)) print("Updated tube index with values: "+str(tube_index))
except Exception as e: except Exception as e:
print(e) print(e)
time.sleep(5) time.sleep(5)
if __name__ == "__main__": if __name__ == "__main__":
data_queue = Queue() shared_mem = shared_memory.SharedMemory(name='tube_index', size=1024, create=True)
ti_updater_thread = Process(target=tube_index_updater, args=(data_queue, )) ti_updater_thread = Process(target=tube_index_updater, args=(shared_mem, ))
ti_updater_thread.start() ti_updater_thread.start()
publisher_thread = Process(target=mqtt_publisher, args=(data_queue, )) publisher_thread = Process(target=mqtt_publisher, args=(shared_mem, ))
publisher_thread.start() publisher_thread.start()
flask_thread = Process(target=flask_api) flask_thread = Process(target=flask_api)
flask_thread.start() flask_thread.start()