import stupid_artnet.StupidArtnet as artnet import yaml import argparse import random from dataclasses import dataclass import paho.mqtt.client as mqtt from threading import Thread, Event, Condition import json CONFIG = None COLORS = {'red': 40, 'green': 80, 'yellow': 1, "pattern": 120, "magic": 160} current_mode = "automatic" laser_commanded_state = None mode_change_evt = Event() state_changes_cv = Condition() def clamp(value, minv, maxv): return max(min(maxv, value), minv) class Laser: def __init__(self): self.channels = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0] self.connected = False self.an = None def to_json(self): return json.dumps({ "channels": self.channels }) def connect(self, targetIP, universe): self.an = artnet.StupidArtnet(targetIP=targetIP, universe=universe) self.an.start() self.connected = True def _send_channels(self, channels): if not channels: channels = range(1,9) for chn in channels: self.an.set_single_value(chn, self.channels[chn - 1]) def _set_channel(self, channel, value): self.channels[channel - 1] = value def _set_and_send_channel(self, channel, value): self._set_channel(channel, value) self._send_channels([channel]) def set_pos(self, x, y): x = clamp(x, 1, 127) y = clamp(y, 1, 127) self._set_channel(5, x) self._set_channel(6, y) self._send_channels([5,6]) def set_color(self, color): self._set_and_send_channel(8, color) def set_strobe(self, strobe): self._set_and_send_channel(3, strobe) def set_point_speed(self, point_speed): self._set_and_send_channel(4, point_speed) def set_mode(self, mode): self._set_and_send_channel(1, mode) def set_animation(self, animation): self._set_and_send_channel(2, animation) def set_zoom(self, zoom): self._set_and_send_channel(7, zoom) def set_reset(self, reset): self._set_and_send_channel(9, reset) def set_color_effect(self, color_effect): self._set_and_send_channel(10, color_effect) @dataclass(frozen=True) class Cloud: x: int y: int color: str def intTryParse(value): try: return int(value), True except ValueError: return value, False def load_config(config_path): with open(config_path, "r") as f: data = yaml.safe_load(f) positions = list() for p in data['positions']: positions.append(Cloud(x = p['x'], y = p['y'], color=p['color'])) data['positions'] = positions return data def on_mqtt_state_cmd(client, user_data, message): global current_mode, laser_commanded_state data = json.loads(message.payload.decode("ascii")) if "mode" not in data.keys() and "laser" not in data.keys(): return with state_changes_cv: if "mode" in data: new_mode = data['mode'] if current_mode != new_mode: current_mode = new_mode mode_change_evt.set() if "laser" in data: laser_commanded_state = data['laser'] state_changes_cv.notify_all() def on_mqtt_connect(client, user_data, flags, rc): base_topic = CONFIG['mqtt_base_topic'] client.publish(base_topic + "status", payload = "online", retain = True) # state cmd state_cmd_topic = base_topic + "state/cmd" client.subscribe(state_cmd_topic) client.message_callback_add(state_cmd_topic, on_mqtt_state_cmd) def mqtt_send_laser_state(client, laser): client.publish(topic = CONFIG["mqtt_base_topic"] + "state", payload = laser.to_json()) def auto_mode(laser, mqtt_client, stop_evt): print("start automatic mode") # loop through colors positions = CONFIG['positions'] while True: p = random.choice(positions) # exclude current position from next choises positions = list(set(CONFIG['positions']) - set([p])) # set position and choose color laser.set_pos(p.x, p.y) if p.color == 'random': c = random.choice(list(COLORS.values())) else: c = COLORS.get(p.color) laser.set_color(c) mqtt_send_laser_state(mqtt_client, laser) # wait random amount of time sleep_time = CONFIG['min_sleep'] + (CONFIG['max_sleep'] - CONFIG['min_sleep']) * random.uniform(0, 1) if stop_evt.wait(sleep_time): # abort signaled return def manual_mode(laser, mqtt_client, stop_evt): print("start manual mode") while True: with state_changes_cv: if not laser_commanded_state: continue pos = laser_commanded_state.get("pos") if pos: print(pos) laser.set_pos(pos["x"], pos["y"]) color = laser_commanded_state.get("color") if color: laser.set_color(color) mode = laser_commanded_state.get("mode") if mode: laser.set_mode(mode) strobe = laser_commanded_state.get("strobe") if strobe: laser.set_strobe(strobe) zoom = laser_commanded_state.get("zoom") if zoom: laser.set_point_speed(zoom) animation = laser_commanded_state.get("animation") if animation: laser.set_point_speed(animation) point_speed = laser_commanded_state.get("point_speed") if point_speed: laser.set_point_speed(point_speed) color = laser_commanded_state.get("color") if color: laser.set_color(color) color_effect = laser_commanded_state.get("color_effect") if color_effect: laser.set_color_effect(color_effect) mqtt_send_laser_state(mqtt_client, laser) state_changes_cv.wait() if stop_evt.is_set(): return def main(args): global CONFIG, current_mode CONFIG = load_config(args.config) # setup laser laser = Laser() laser.connect(targetIP = CONFIG['target'], universe = CONFIG['universe']) laser.set_mode(0xFF) # setup mqtt client = mqtt.Client() client.on_connect = on_mqtt_connect client.will_set(CONFIG['mqtt_base_topic'] + "status", payload = "offline", retain = True) client.connect(CONFIG['mqtt_broker']) client.loop_start() MODES = { "automatic": auto_mode, "manual": manual_mode } while True: mode_change_evt.clear() if current_mode not in MODES: current_mode = "automatic" mode_worker_fnc = MODES[current_mode] mode_worker_fnc(laser, client, mode_change_evt) print("change mode") if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument("--config") parser.add_argument("--interactive", action="store_true") args = parser.parse_args() main(args)