add mqtt interface and rework code to support manual and automatic mode

Signed-off-by: Thomas Schmid <tom@lfence.de>
This commit is contained in:
Thomas 2021-07-28 19:47:13 +02:00
parent 9640e28b48
commit 838a739b7c
2 changed files with 154 additions and 86 deletions

View File

@ -1,4 +1,6 @@
target: 192.168.0.99
mqtt_base_topic: kitchen/laser/
mqtt_broker: bklaser
universe: 0
max_sleep: 10.0
min_sleep: 3.0

238
main.py
View File

@ -1,47 +1,70 @@
import stupid_artnet.StupidArtnet as artnet
import time
import yaml
import argparse
from voluptuous import Schema, Required
import random
from dataclasses import dataclass
import paho.mqtt.client as mqtt
from threading import Thread, Event, Condition
import json
CONFIG_SCHEME = Schema(
{
Required('target'): str,
Required('universe'): int,
Required('min_sleep'): float,
Required('max_sleep'): float,
Required('positions'): list
}
)
CONFIG = None
COLORS = {'red': 40, 'green': 80, 'yellow': 1, "pattern": 120, "magic": 160}
current_mode = "automatic"
laser_commanded_state = None
mode_change_evt = Event()
state_changes_cv = Condition()
def clamp(value, minv, maxv):
return max(min(maxv, value), minv)
class Laser:
def __init__(self):
self.channels = [0, 0, 0, 0, 0, 0, 0, 0]
self.connected = False
self.an = None
def to_json(self):
return json.dumps({
"channels": self.channels
})
def connect(self, targetIP, universe):
self.an = artnet.StupidArtnet(targetIP=targetIP, universe=universe)
self.an.start()
self.connected = True
def _send_channels(self, channels):
if not channels:
channels = range(1,9)
for chn in channels:
self.an.set_single_value(chn, self.channels[chn - 1])
def _set_channel(self, channel, value):
self.channels[channel - 1] = value
def set_pos(self, x, y):
x = clamp(x, 1, 127)
y = clamp(x, 1, 127)
self._set_channel(5, x)
self._set_channel(6, y)
self._send_channels([5,6])
def set_color(self, color):
pass
def set_mode(self, mode):
self._set_channel(1, mode)
self._send_channels([1])
@dataclass(frozen=True)
class Cloud:
x: int
y: int
color: str
def laser_on(artnet):
artnet.set_single_value(10, 0)
artnet.show()
def laser_off(artnet):
artnet.set_single_value(10, 17)
artnet.show()
def set_mode(artnet, mode):
artnet.set_single_value(1, mode)
def set_color(artnet, color):
artnet.set_single_value(8, color)
def set_pos(artnet, pos):
artnet.set_single_value(5, pos[0])
artnet.set_single_value(6, pos[1])
def intTryParse(value):
try:
return int(value), True
@ -51,8 +74,6 @@ def intTryParse(value):
def load_config(config_path):
with open(config_path, "r") as f:
data = yaml.safe_load(f)
# validate config
CONFIG_SCHEME(data)
positions = list()
for p in data['positions']:
@ -61,79 +82,124 @@ def load_config(config_path):
data['positions'] = positions
return data
def interactive(an, config):
import keyboard
def on_mqtt_state_cmd(client, user_data, message):
global current_mode, laser_commanded_state
data = json.loads(message.payload.decode("ascii"))
x = 0
y = 0
color_keys = list(COLORS.keys())
c = 0
set_color(an, COLORS[color_keys[c]])
print_pos = lambda x,y: print(f"x:{x} y:{y}")
while True:
if keyboard.is_pressed("a"):
x = x + 1
print_pos(x,y)
if keyboard.is_pressed("d"):
x = x - 1
print_pos(x,y)
if keyboard.is_pressed("w"):
y = y - 1
print_pos(x,y)
if keyboard.is_pressed("s"):
y = y + 1
print_pos(x,y)
if keyboard.is_pressed(" "):
pass
#save point in config file
if keyboard.is_pressed("c"):
c = c + 1
if c >= len(color_keys):
c = 0
print(f"color: {color_keys[c]}")
set_color(an, COLORS[color_keys[c]])
set_pos(an, (x, y))
time.sleep(0.05)
def main(args):
config = load_config(args.config)
if "mode" not in data.keys() and "laser" not in data.keys():
return
# setup laser
an = artnet.StupidArtnet(targetIP=config['target'], universe = config['universe'])
an.start()
set_mode(an, 0xFF)
with state_changes_cv:
if "mode" in data:
new_mode = data['mode']
if args.interactive:
interactive(an, config)
if current_mode != new_mode:
current_mode = new_mode
mode_change_evt.set()
positions = config['positions']
if "laser" in data:
laser_commanded_state = data['laser']
state_changes_cv.notify_all()
def on_mqtt_connect(client, user_data, flags, rc):
base_topic = CONFIG['mqtt_base_topic']
client.publish(base_topic + "status", payload = "online", retain = True)
# state cmd
state_cmd_topic = base_topic + "state/cmd"
client.subscribe(state_cmd_topic)
client.message_callback_add(state_cmd_topic, on_mqtt_state_cmd)
def mqtt_send_laser_state(client, laser):
client.publish(topic = CONFIG["mqtt_base_topic"] + "state", payload = laser.to_json())
def auto_mode(laser, mqtt_client, stop_evt):
print("start automatic mode")
# loop through colors
positions = CONFIG['positions']
while True:
p = random.choice(positions)
# exclude current position from next choises
positions = list(set(config['positions']) - set([p]))
positions = list(set(CONFIG['positions']) - set([p]))
# set position and choose color
set_pos(an, (p.x, p.y))
laser.set_pos(p.x, p.y)
if p.color == 'random':
c = random.choice(list(COLORS.values()))
else:
c = COLORS.get(p.color)
set_color(an, c)
laser.set_color(c)
# wait random amount of times
sleep_time = config['min_sleep'] + (config['max_sleep'] - config['min_sleep']) * random.uniform(0, 1)
time.sleep(sleep_time)
mqtt_send_laser_state(mqtt_client, laser)
# wait random amount of time
sleep_time = CONFIG['min_sleep'] + (CONFIG['max_sleep'] - CONFIG['min_sleep']) * random.uniform(0, 1)
if stop_evt.wait(sleep_time):
# abort signaled
return
def manual_mode(laser, mqtt_client, stop_evt):
print("start manual mode")
while True:
with state_changes_cv:
if not laser_commanded_state:
continue
pos = laser_commanded_state.get("pos")
if pos:
print(pos)
laser.set_pos(pos["x"], pos["y"])
color = laser_commanded_state.get("color")
if color:
laser.set_color(color)
mode = laser_commanded_state.get("mode")
if mode:
laser.set_mode(mode)
mqtt_send_laser_state(mqtt_client, laser)
state_changes_cv.wait()
if stop_evt.is_set():
return
def main(args):
global CONFIG, current_mode
CONFIG = load_config(args.config)
# setup laser
laser = Laser()
laser.connect(targetIP = CONFIG['target'], universe = CONFIG['universe'])
laser.set_mode(0xFF)
# setup mqtt
client = mqtt.Client()
client.on_connect = on_mqtt_connect
client.will_set(CONFIG['mqtt_base_topic'] + "status", payload = "offline", retain = True)
client.connect(CONFIG['mqtt_broker'])
client.loop_start()
MODES = {
"automatic": auto_mode,
"manual": manual_mode
}
while True:
mode_change_evt.clear()
if current_mode not in MODES:
current_mode = "automatic"
mode_worker_fnc = MODES[current_mode]
mode_worker_fnc(laser, client, mode_change_evt)
print("change mode")
if __name__ == "__main__":
parser = argparse.ArgumentParser()