from threading import Event, Condition import paho.mqtt.client as mqtt import argparse import json import urwid import sys import signal from urwid import signals from urwid.util import _tagmarkup_recurse client = None CONFIG = dict() x = 0 y = 0 c = 0 def labled_widget(label, widget): return urwid.Columns([ urwid.Text(label), widget ]) class GridFlowMenu(urwid.WidgetWrap): def __init__(self, choices, callback): self.buttons = dict() for choice in choices: b = urwid.Button(choice) self.buttons[choice] = b signals.connect_signal(b, "click", callback, choice) super().__init__(urwid.GridFlow(self.buttons.values(), 12, 1, 1, "center")) class Slider(urwid.Widget): _selectable=True _sizing = frozenset(["flow"]) def __init__(self, value = 0, max = 127, min = 0, step = 1): signals.register_signal(self.__class__, ["value_changed"]) if max <= min or min >= max: raise ValueError() self.max = max self.min = min self.step = step self.last_focus = False self.input_digits = "" self.set_value(value) def set_value(self, value, emit=False): self.value = min(max(value, self.min), self.max) self.ratio = float(value-self.min)/float(self.max - self.min) self.ratio = min(max(self.ratio, 0.0), 1.0) if emit: self._emit("value_changed", self.value) self._invalidate() def rows(self, size, focus=False): return 1 def keypress(self, size, key): if key == "left": self.set_value(self.value - self.step, True) elif key == "right": self.set_value(self.value + self.step, True) elif key.isdigit(): self.input_digits += key self.set_value(int(self.input_digits), True) else: return key def lost_focus(self): self.input_digits = "" def render(self, size, focus=False): if not focus and self.last_focus: self.lost_focus() if focus != self.last_focus: self.last_focus = focus (maxcol,) = size col1 = int(float(maxcol) * self.ratio) col2 = int(maxcol - col1) return urwid.CanvasJoin([ (urwid.SolidCanvas(chr(0x2593) if not focus else chr(9608), col1, 1), None, False, col1), (urwid.SolidCanvas(chr(9472) if not focus else '>', col2, 1), None, False, col2) ]) class LabelSlider(urwid.WidgetWrap): def __init__(self, min = 0, max = 100, value = 0, step = 1, value_format="{:3d}"): self.slider = Slider(value = value, max = max, min = min, step = step) self.value_format = value_format self.text = urwid.Text(value_format.format(value)) signals.connect_signal(self.slider, "value_changed", self.slider_changed) super().__init__(urwid.Columns([self.slider, ('pack', self.text)], focus_column=0, dividechars = 2)) def render_text(self, value): self.text.set_text(self.value_format.format(value)) def slider_changed(self, slider, value): self.render_text(value) def set_value(self, value, emit=False): self.slider.set_value(value, emit=False) self.render_text(value) def get_value(self): return self.slider.value def get_slider(self): return self.slider class myFrame(urwid.Frame): def keypress(self, size, key): if key == "tab": next_focus = "footer" if self.focus_position == "body" else "body" self.set_focus(next_focus) else: return super().keypress(size, key) class View: def __init__(self, laser): self.channel_view = self.build_channel_box() self.main_view = self.build_main_box() self.frame = myFrame(body=self.channel_view, footer=GridFlowMenu(choices=["Main", "Channels"], callback= self.change_view)) #fill = urwid.Filler(frame, "top") self.loop = urwid.MainLoop(self.frame) def change_view(self, button, choice): if choice == "Main": self.frame.set_body(self.main_view) elif choice == "Channels": self.frame.set_body(self.channel_view) def mode_select_callback(self, radiobutton, state): if radiobutton.state: pass def build_main_box(self): fill = urwid.Filler(urwid.Text("Hello World", "left")) self.mode_rb = [] mode = labled_widget("Mode", urwid.Pile([ urwid.RadioButton(self.mode_rb, "Manual"), urwid.RadioButton(self.mode_rb, "Automatic") ])) for e in self.mode_rb: signals.connect_signal(e, "change", self.mode_select_callback) self.effect_rb =[] effect = labled_widget("Effect:", urwid.Pile([ urwid.RadioButton(self.effect_rb, "Effect 1"), urwid.RadioButton(self.effect_rb, "Effect 2"), urwid.RadioButton(self.effect_rb, "Effect 3"), urwid.RadioButton(self.effect_rb, "Effect 4") ])) fill = urwid.Filler(urwid.Pile([mode, urwid.Divider(), effect])) return urwid.LineBox(fill, title="Main") def build_channel_box(self): self.mode = LabelSlider(min = 0, max = 0xFF) self.animation = LabelSlider(min = 0, max=0xFF) self.strobe = LabelSlider(min = 0, max = 0xFF) self.point_speed = LabelSlider(min = 0, max = 0xFF) self.x = LabelSlider(min = 0, max = 0xFF) self.y = LabelSlider(min = 0, max = 0xFF) self.zoom = LabelSlider(min = 0, max = 0xFF) self.color = LabelSlider(min = 0, max = 0xFF) self.reset = LabelSlider(min = 0, max = 0xFF) self.color_effect = LabelSlider(min = 0, max = 0xFF) signals.connect_signal(self.mode.get_slider(), "value_changed", self.value_changed_callback) signals.connect_signal(self.animation.get_slider(), "value_changed", self.value_changed_callback) signals.connect_signal(self.strobe.get_slider(), "value_changed", self.value_changed_callback) signals.connect_signal(self.point_speed.get_slider(), "value_changed", self.value_changed_callback) signals.connect_signal(self.x.get_slider(), "value_changed", self.value_changed_callback) signals.connect_signal(self.y.get_slider(), "value_changed", self.value_changed_callback) signals.connect_signal(self.zoom.get_slider(), "value_changed", self.value_changed_callback) signals.connect_signal(self.color.get_slider(), "value_changed", self.value_changed_callback) signals.connect_signal(self.reset.get_slider(), "value_changed", self.value_changed_callback) signals.connect_signal(self.color_effect.get_slider(), "value_changed", self.value_changed_callback) self.labels = urwid.Pile([ urwid.Text("Mode:"), urwid.Text("Animation:"), urwid.Text("Strobe:"), urwid.Text("Effect:"), urwid.Text("X:"), urwid.Text("Y:"), urwid.Text("Zoom:"), urwid.Text("Color:"), urwid.Text("Reset:"), urwid.Text("Color-Effect:")] ) self.values = urwid.Pile([ self.mode, self.animation, self.strobe, self.point_speed, self.x, self.y, self.zoom, self.color, self.reset, self.color_effect ]) grid = urwid.Columns([('weight', 0.25, self.labels), self.values]) return urwid.LineBox(urwid.Filler(grid), title="Channels") def value_changed_callback(self, slider, value): if slider == self.x or slider == self.y: move_laser(client, (self.x.get_value(), self.y.get_value())) elif slider == self.mode: laser_set("mode", self.mode.get_value()) elif slider == self.animation: laser_set("animation", self.animation.get_value()) elif slider == self.strobe: laser_set("strobe", self.strobe.get_value()) elif slider == self.point_speed: laser_set("point_speed", self.point_speed.get_value()) elif slider == self.zoom: laser_set("zoom", self.zoom.get_value()) elif slider == self.color: laser_set("color", self.color.get_value()) elif slider == self.reset: laser_set("reset", self.reset.get_value()) elif slider == self.color_effect: laser_set("color_effect", self.color_effect.get_value()) def update_channels(self, channels): self.animation.set_value(channels[1]) self.strobe.set_value(channels[2]) self.point_speed.set_value(channels[3]) self.x.set_value(channels[4]) self.y.set_value(channels[5]) self.zoom.set_value(channels[6]) self.color.set_value(channels[7]) self.reset.set_value(channels[8]) self.color_effect.set_value(channels[9]) self.loop.draw_screen() view = View(client) def move_laser(client, pos): cmd = {"laser": { "pos": {"x": pos[0], "y": pos[1]} }} client.publish("kitchen/laser/state/cmd", payload = json.dumps(cmd)) def laser_set(attribute, value): cmd = {"laser": { attribute: value}} client.publish("kitchen/laser/state/cmd", payload = json.dumps(cmd)) def on_state_changed(client, user_data, message): data = json.loads(message.payload) if "channels" in data: view.update_channels(data["channels"]) def unhandled_input(input): global x, y if input == "up": y = y + 1 move_laser(client, (x, y)) return True if input == "down": y = y - 1 move_laser(client, (x, y)) return True if input == "left": x = x - 1 move_laser(client, (x, y)) return True if input == "right": x = x + 1 move_laser(client, (x, y)) return True if input == "c": return True return False def exit_app(sig, frame): client.publish("kitchen/laser/state/cmd", payload=json.dumps({"mode": "automatic"})) client.loop_stop() client.disconnect() sys.exit(0) def main(args): global client client = mqtt.Client() try: client.connect(args.host) except: print("unable to connect to mqtt broker") sys.exit(-1) client.subscribe("kitchen/laser/state") client.message_callback_add("kitchen/laser/state", on_state_changed) client.loop_start() client.publish("kitchen/laser/state/cmd", payload=json.dumps({"mode": "manual"})) view.loop.unhandled_input = unhandled_input signal.signal(signal.SIGINT, exit_app) try: view.loop.run() except KeyboardInterrupt: print("test") if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument("--host", default="hasspi") args = parser.parse_args() main(args)