commit 43aa5630c91fa6aa146b476c9d5ca5ced315e912 Author: Thomas Schmid Date: Tue Jul 27 21:56:18 2021 +0200 Initial Commit Signed-off-by: Thomas Schmid diff --git a/cloudlaser.service b/cloudlaser.service new file mode 100644 index 0000000..0dd0e06 --- /dev/null +++ b/cloudlaser.service @@ -0,0 +1,8 @@ +[Unit] +Description=cloudlaser + +[Service] +ExecStart=/usr/bin/python3 /home/root/laser_cloud/main.py --config /home/root/laser_cloud/config.yaml + +[Install] +WantedBy=multi-user.target \ No newline at end of file diff --git a/config.yaml b/config.yaml new file mode 100644 index 0000000..4f5e2f0 --- /dev/null +++ b/config.yaml @@ -0,0 +1,11 @@ +target: 192.168.0.99 +universe: 0 +max_sleep: 10.0 +min_sleep: 3.0 +positions: + - x: 10 + y: 10 + color: "pattern" + - x: 50 + y: 20 + color: "yellow" \ No newline at end of file diff --git a/main.py b/main.py new file mode 100644 index 0000000..a772568 --- /dev/null +++ b/main.py @@ -0,0 +1,124 @@ +import stupid_artnet.StupidArtnet as artnet +import time +import yaml +import argparse +from voluptuous import Schema, Required +import random +from dataclasses import dataclass + +CONFIG_SCHEME = Schema( + { + Required('target'): str, + Required('universe'): int, + Required('min_sleep'): float, + Required('max_sleep'): float, + Required('positions'): list + } +) + +COLORS = {'red': 40, 'green': 80, 'yellow': 1, "pattern": 120, "magic": 160} + +@dataclass(frozen=True) +class Cloud: + x: int + y: int + color: str + +def laser_on(artnet): + artnet.set_single_value(10, 0) + artnet.show() + +def laser_off(artnet): + artnet.set_single_value(10, 17) + artnet.show() + +def set_mode(artnet, mode): + artnet.set_single_value(1, mode) + +def set_color(artnet, color): + artnet.set_single_value(8, color) + +def set_pos(artnet, pos): + artnet.set_single_value(5, pos[0]) + artnet.set_single_value(6, pos[1]) + +def intTryParse(value): + try: + return int(value), True + except ValueError: + return value, False + +def load_config(config_path): + with open(config_path, "r") as f: + data = yaml.safe_load(f) + # validate config + CONFIG_SCHEME(data) + + positions = list() + for p in data['positions']: + positions.append(Cloud(x = p['x'], y = p['y'], color=p['color'])) + + data['positions'] = positions + return data + +def interactive(an, config): + import keyboard + + x = 0 + y = 0 + + set_color(an, 0x100) + + while True: + if keyboard.is_pressed("w"): + x = x + 1 + if keyboard.is_pressed("s"): + x = x - 1 + if keyboard.is_pressed("a"): + y = y - 1 + if keyboard.is_pressed("d"): + y = y + 1 + + set_pos(an, (x, y)) + time.sleep(0.05) + +def main(args): + config = load_config(args.config) + + # setup laser + an = artnet.StupidArtnet(targetIP=config['target'], universe = config['universe']) + an.start() + set_mode(an, 0xFF) + + if args.interactive: + interactive(an, config) + + positions = config['positions'] + + # loop through colors + while True: + p = random.choice(positions) + + # exclude current position from next choises + positions = list(set(config['positions']) - set([p])) + + # set position and choose color + set_pos(an, (p.x, p.y)) + + if p.color == 'random': + c = random.choice(list(COLORS.values())) + else: + c = COLORS.get(p.color) + + set_color(an, c) + + # wait random amount of times + sleep_time = config['min_sleep'] + (config['max_sleep'] - config['min_sleep']) * random.uniform(0, 1) + time.sleep(sleep_time) + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument("--config") + parser.add_argument("--interactive", action="store_true") + args = parser.parse_args() + main(args) diff --git a/stupid_artnet/StupidArtnet.py b/stupid_artnet/StupidArtnet.py new file mode 100644 index 0000000..28677a4 --- /dev/null +++ b/stupid_artnet/StupidArtnet.py @@ -0,0 +1,364 @@ +"""(Very) Simple Implementation of Artnet. + +Python Version: 3.6 +Source: http://artisticlicence.com/WebSiteMaster/User%20Guides/art-net.pdf + http://art-net.org.uk/wordpress/structure/streaming-packets/artdmx-packet-definition/ + +NOTES +- For simplicity: NET and SUBNET not used by default but optional + +""" + +import socket +from threading import Timer + + +class StupidArtnet(): + """(Very) simple implementation of Artnet.""" + + UDP_PORT = 6454 + + def __init__(self, targetIP='127.0.0.1', universe=0, packet_size=512, fps=30, receiver_needs_even_packet_size=True, broadcast=False): + """Class Initialization.""" + # Instance variables + self.TARGET_IP = targetIP + self.SEQUENCE = 0 + self.PHYSICAL = 0 + self.UNIVERSE = universe + self.SUB = 0 + self.NET = 0 + self.PACKET_SIZE = self.put_in_range(packet_size, 2, 512, receiver_needs_even_packet_size) + self.HEADER = bytearray() + self.BUFFER = bytearray(self.PACKET_SIZE) + + self.bMakeEven = receiver_needs_even_packet_size + + self.bIsSimplified = True # simplify use of universe, net and subnet + + # UDP SOCKET + self.s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + + if broadcast: + self.s.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) + + # Timer + self.fps = fps + + self.make_header() + + def __del__(self): + self.stop() + self.close() + + def __str__(self): + """Printable object state.""" + s = "===================================\n" + s += "Stupid Artnet initialized\n" + s += "Target IP: %s:%i \n" % (self.TARGET_IP, self.UDP_PORT) + s += "Universe: %i \n" % self.UNIVERSE + if not (self.bIsSimplified): + s += "Subnet: %i \n" % self.SUB + s += "Net: %i \n" % self.NET + s += "Packet Size: %i \n" % self.PACKET_SIZE + s += "===================================" + + return s + + def make_header(self): + """Make packet header.""" + # 0 - id (7 x bytes + Null) + self.HEADER = bytearray() + self.HEADER.extend(bytearray('Art-Net', 'utf8')) + self.HEADER.append(0x0) + # 8 - opcode (2 x 8 low byte first) + self.HEADER.append(0x00) + self.HEADER.append(0x50) # ArtDmx data packet + # 10 - prototocol version (2 x 8 high byte first) + self.HEADER.append(0x0) + self.HEADER.append(14) + # 12 - sequence (int 8), NULL for not implemented + self.HEADER.append(self.SEQUENCE) + # 13 - physical port (int 8) + self.HEADER.append(0x00) + # 14 - universe, (2 x 8 low byte first) + if (self.bIsSimplified): + # not quite correct but good enough for most cases: + # the whole net subnet is simplified + # by transforming a single uint16 into its 8 bit parts + # you will most likely not see any differences in small networks + v = self.shift_this(self.UNIVERSE) # convert to MSB / LSB + self.HEADER.append(v[1]) + self.HEADER.append(v[0]) + # 14 - universe, subnet (2 x 4 bits each) + # 15 - net (7 bit value) + else: + # as specified in Artnet 4 (remember to set the value manually after): + # Bit 3 - 0 = Universe (1-16) + # Bit 7 - 4 = Subnet (1-16) + # Bit 14 - 8 = Net (1-128) + # Bit 15 = 0 + # this means 16 * 16 * 128 = 32768 universes per port + # a subnet is a group of 16 Universes + # 16 subnets will make a net, there are 128 of them + self.HEADER.append(self.SUB << 4 | self.UNIVERSE) + self.HEADER.append(self.NET & 0xFF) + # 16 - packet size (2 x 8 high byte first) + v = self.shift_this(self.PACKET_SIZE) # convert to MSB / LSB + self.HEADER.append(v[0]) + self.HEADER.append(v[1]) + + def show(self): + """Finally send data.""" + packet = bytearray() + packet.extend(self.HEADER) + packet.extend(self.BUFFER) + try: + self.s.sendto(packet, (self.TARGET_IP, self.UDP_PORT)) + except Exception as e: + print("ERROR: Socket error with exception: %s" % e) + finally: + self.SEQUENCE = (self.SEQUENCE + 1) % 256 + + def close(self): + """Close UDP socket.""" + self.s.close() + + ## + # THREADING + ## + + def start(self): + """Starts thread clock.""" + self.show() + self.__clock = Timer((1000.0 / self.fps) / 1000.0, self.start) + self.__clock.daemon = True + self.__clock.start() + + def stop(self): + """Stops thread clock.""" + try: + self.__clock.cancel() + except: + pass + + ## + # SETTERS - HEADER + ## + + def set_universe(self, universe): + """Setter for universe (0 - 15 / 256). + + Mind if protocol has been simplified + """ + # This is ugly, trying to keep interface easy + # With simplified mode the universe will be split into two + # values, (uni and sub) which is correct anyway. Net will always be 0 + if (self.bIsSimplified): + self.UNIVERSE = self.put_in_range(universe, 0, 255, False) + else: + self.UNIVERSE = self.put_in_range(universe, 0, 15, False) + self.make_header() + + def set_subnet(self, sub): + """Setter for subnet address (0 - 15). + + Set simplify to false to use + """ + self.SUB = self.put_in_range(sub, 0, 15, False) + self.make_header() + + def set_net(self, net): + """Setter for net address (0 - 127). + + Set simplify to false to use + """ + self.NET = self.put_in_range(net, 0, 127, False) + self.make_header() + + def set_packet_size(self, packet_size): + """Setter for packet size (2 - 512, even only).""" + self.PACKET_SIZE = self.put_in_range(packet_size, 2, 512, self.bMakeEven) + self.make_header() + + ## + # SETTERS - DATA + ## + + def clear(self): + """Clear DMX buffer.""" + self.BUFFER = bytearray(self.PACKET_SIZE) + + def set(self, p): + """Set buffer.""" + if len(self.BUFFER) != self.PACKET_SIZE: + print("ERROR: packet does not match declared packet size") + return + self.BUFFER = p + + def set_16bit(self, address, value, high_first=False): + """Set single 16bit value in DMX buffer.""" + if address > self.PACKET_SIZE: + print("ERROR: Address given greater than defined packet size") + return + if address < 1 or address > 512 - 1: + print("ERROR: Address out of range") + return + value = self.put_in_range(value, 0, 65535, False) + + # Check for endianess + if (high_first): + self.BUFFER[address - 1] = (value >> 8) & 0xFF # high + self.BUFFER[address] = (value) & 0xFF # low + else: + self.BUFFER[address - 1] = (value) & 0xFF # low + self.BUFFER[address] = (value >> 8) & 0xFF # high + + def set_single_value(self, address, value): + """Set single value in DMX buffer.""" + if address > self.PACKET_SIZE: + print("ERROR: Address given greater than defined packet size") + return + if address < 1 or address > 512: + print("ERROR: Address out of range") + return + self.BUFFER[address - 1] = self.put_in_range(value, 0, 255, False) + + def set_single_rem(self, address, value): + """Set single value while blacking out others.""" + if address > self.PACKET_SIZE: + print("ERROR: Address given greater than defined packet size") + return + if address < 1 or address > 512: + print("ERROR: Address out of range") + return + self.clear() + self.BUFFER[address - 1] = self.put_in_range(value, 0, 255, False) + + def set_rgb(self, address, r, g, b): + """Set RGB from start address.""" + if address > self.PACKET_SIZE: + print("ERROR: Address given greater than defined packet size") + return + if address < 1 or address > 510: + print("ERROR: Address out of range") + return + + self.BUFFER[address - 1] = self.put_in_range(r, 0, 255, False) + self.BUFFER[address] = self.put_in_range(g, 0, 255, False) + self.BUFFER[address + 1] = self.put_in_range(b, 0, 255, False) + + ## + # AUX + ## + + def set_simplified(self, bDoSimplify): + """Builds Header accordingly. + + True - Header sends 16 bit universe value (OK but incorrect) + False - Headers sends Universe - Net and Subnet values as protocol + It is recommended that you set these values with .set_net() and set_physical + """ + if (bDoSimplify == self.bIsSimplified): + return + self.bIsSimplified = bDoSimplify + self.make_header() + + def see_header(self): + """Show header values.""" + print(self.HEADER) + + def see_buffer(self): + """Show buffer values.""" + print(self.BUFFER) + + def blackout(self): + """Sends 0's all across.""" + self.clear() + self.show() + + def flash_all(self): + """Sends 255's all across.""" + packet = bytearray(self.PACKET_SIZE) + [255 for i in packet] + # for i in range(self.PACKET_SIZE): + # packet[i] = 255 + self.set(packet) + self.show() + + ## + # UTILS + ## + + @staticmethod + def shift_this(number, high_first=True): + """Utility method: extracts MSB and LSB from number. + + Args: + number - number to shift + high_first - MSB or LSB first (true / false) + + Returns: + (high, low) - tuple with shifted values + + """ + low = (number & 0xFF) + high = ((number >> 8) & 0xFF) + if (high_first): + return((high, low)) + else: + return((low, high)) + print("Something went wrong") + return False + + @staticmethod + def put_in_range(number, range_min, range_max, make_even=True): + """Utility method: sets number in defined range. + + Args: + number - number to use + range_min - lowest possible number + range_max - highest possible number + make_even - should number be made even + + Returns: + number - number in correct range + + """ + if (number < range_min): + number = range_min + if (number > range_max): + number = range_max + if (make_even and number % 2 != 0): + number += 1 + return number + + +if __name__ == '__main__': + print("===================================") + print("Namespace run") + target_ip = '127.0.0.1' # typically in 2.x or 10.x range + universe = 15 # see docs + packet_size = 20 # it is not necessary to send whole universe + packet = bytearray(packet_size) + + a = StupidArtnet(target_ip, universe, packet_size) + a.set_simplified(False) + a.set_net(129) + a.set_subnet(16) + + print(a) + + a.set_single_value(13, 255) + a.set_single_value(14, 100) + a.set_single_value(15, 200) + + print("Sending values") + a.show() + a.see_buffer() + a.flash_all() + a.see_buffer() + a.show() + + print("Values sent") + + del a diff --git a/stupid_artnet/__init__.py b/stupid_artnet/__init__.py new file mode 100644 index 0000000..d8f773c --- /dev/null +++ b/stupid_artnet/__init__.py @@ -0,0 +1 @@ +# nothing yet diff --git a/stupid_artnet/__pycache__/StupidArtnet.cpython-39.pyc b/stupid_artnet/__pycache__/StupidArtnet.cpython-39.pyc new file mode 100644 index 0000000..800a23d Binary files /dev/null and b/stupid_artnet/__pycache__/StupidArtnet.cpython-39.pyc differ diff --git a/stupid_artnet/__pycache__/__init__.cpython-39.pyc b/stupid_artnet/__pycache__/__init__.cpython-39.pyc new file mode 100644 index 0000000..b56ba84 Binary files /dev/null and b/stupid_artnet/__pycache__/__init__.cpython-39.pyc differ