cloud_laser/main.py

255 lines
7.0 KiB
Python
Raw Permalink Normal View History

import stupid_artnet.StupidArtnet as artnet
import yaml
import argparse
import random
from dataclasses import dataclass
import paho.mqtt.client as mqtt
from threading import Thread, Event, Condition
import json
CONFIG = None
COLORS = {'red': 40, 'green': 80, 'yellow': 1, "pattern": 120, "magic": 160}
current_mode = "automatic"
laser_commanded_state = None
mode_change_evt = Event()
state_changes_cv = Condition()
def clamp(value, minv, maxv):
return max(min(maxv, value), minv)
class Laser:
def __init__(self):
self.channels = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
self.connected = False
self.an = None
def to_json(self):
return json.dumps({
"channels": self.channels
})
def connect(self, targetIP, universe):
self.an = artnet.StupidArtnet(targetIP=targetIP, universe=universe)
self.an.start()
self.connected = True
def _send_channels(self, channels):
if not channels:
channels = range(1,9)
for chn in channels:
self.an.set_single_value(chn, self.channels[chn - 1])
def _set_channel(self, channel, value):
self.channels[channel - 1] = value
def _set_and_send_channel(self, channel, value):
self._set_channel(channel, value)
self._send_channels([channel])
def set_pos(self, x, y):
x = clamp(x, 1, 127)
y = clamp(y, 1, 127)
self._set_channel(5, x)
self._set_channel(6, y)
self._send_channels([5,6])
def set_color(self, color):
self._set_and_send_channel(8, color)
def set_strobe(self, strobe):
self._set_and_send_channel(3, strobe)
def set_point_speed(self, point_speed):
self._set_and_send_channel(4, point_speed)
def set_mode(self, mode):
self._set_and_send_channel(1, mode)
def set_animation(self, animation):
self._set_and_send_channel(2, animation)
def set_zoom(self, zoom):
self._set_and_send_channel(7, zoom)
def set_reset(self, reset):
self._set_and_send_channel(9, reset)
def set_color_effect(self, color_effect):
self._set_and_send_channel(10, color_effect)
@dataclass(frozen=True)
class Cloud:
x: int
y: int
color: str
def intTryParse(value):
try:
return int(value), True
except ValueError:
return value, False
def load_config(config_path):
with open(config_path, "r") as f:
data = yaml.safe_load(f)
positions = list()
for p in data['positions']:
positions.append(Cloud(x = p['x'], y = p['y'], color=p['color']))
data['positions'] = positions
return data
def on_mqtt_state_cmd(client, user_data, message):
global current_mode, laser_commanded_state
data = json.loads(message.payload.decode("ascii"))
if "mode" not in data.keys() and "laser" not in data.keys():
return
with state_changes_cv:
if "mode" in data:
new_mode = data['mode']
if current_mode != new_mode:
current_mode = new_mode
mode_change_evt.set()
if "laser" in data:
laser_commanded_state = data['laser']
state_changes_cv.notify_all()
def on_mqtt_connect(client, user_data, flags, rc):
base_topic = CONFIG['mqtt_base_topic']
client.publish(base_topic + "status", payload = "online", retain = True)
# state cmd
state_cmd_topic = base_topic + "state/cmd"
client.subscribe(state_cmd_topic)
client.message_callback_add(state_cmd_topic, on_mqtt_state_cmd)
def mqtt_send_laser_state(client, laser):
client.publish(topic = CONFIG["mqtt_base_topic"] + "state", payload = laser.to_json())
def auto_mode(laser, mqtt_client, stop_evt):
print("start automatic mode")
# loop through colors
positions = CONFIG['positions']
while True:
p = random.choice(positions)
# exclude current position from next choises
positions = list(set(CONFIG['positions']) - set([p]))
# set position and choose color
laser.set_pos(p.x, p.y)
if p.color == 'random':
c = random.choice(list(COLORS.values()))
else:
c = COLORS.get(p.color)
laser.set_color(c)
mqtt_send_laser_state(mqtt_client, laser)
# wait random amount of time
sleep_time = CONFIG['min_sleep'] + (CONFIG['max_sleep'] - CONFIG['min_sleep']) * random.uniform(0, 1)
if stop_evt.wait(sleep_time):
# abort signaled
return
def manual_mode(laser, mqtt_client, stop_evt):
print("start manual mode")
while True:
with state_changes_cv:
if not laser_commanded_state:
continue
pos = laser_commanded_state.get("pos")
if pos:
print(pos)
laser.set_pos(pos["x"], pos["y"])
color = laser_commanded_state.get("color")
if color:
laser.set_color(color)
mode = laser_commanded_state.get("mode")
if mode:
laser.set_mode(mode)
strobe = laser_commanded_state.get("strobe")
if strobe:
laser.set_strobe(strobe)
zoom = laser_commanded_state.get("zoom")
if zoom:
laser.set_point_speed(zoom)
animation = laser_commanded_state.get("animation")
if animation:
laser.set_point_speed(animation)
point_speed = laser_commanded_state.get("point_speed")
if point_speed:
laser.set_point_speed(point_speed)
color = laser_commanded_state.get("color")
if color:
laser.set_color(color)
color_effect = laser_commanded_state.get("color_effect")
if color_effect:
laser.set_color_effect(color_effect)
mqtt_send_laser_state(mqtt_client, laser)
state_changes_cv.wait()
if stop_evt.is_set():
return
def main(args):
global CONFIG, current_mode
CONFIG = load_config(args.config)
# setup laser
laser = Laser()
laser.connect(targetIP = CONFIG['target'], universe = CONFIG['universe'])
laser.set_mode(0xFF)
# setup mqtt
client = mqtt.Client()
client.on_connect = on_mqtt_connect
client.will_set(CONFIG['mqtt_base_topic'] + "status", payload = "offline", retain = True)
client.connect(CONFIG['mqtt_broker'])
client.loop_start()
MODES = {
"automatic": auto_mode,
"manual": manual_mode
}
while True:
mode_change_evt.clear()
if current_mode not in MODES:
current_mode = "automatic"
mode_worker_fnc = MODES[current_mode]
mode_worker_fnc(laser, client, mode_change_evt)
print("change mode")
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--config")
parser.add_argument("--interactive", action="store_true")
args = parser.parse_args()
main(args)