cloud_laser/interactive.py

330 lines
11 KiB
Python

from threading import Event, Condition
import paho.mqtt.client as mqtt
import argparse
import json
import urwid
import sys
import signal
from urwid import signals
from urwid.util import _tagmarkup_recurse
# Shared MQTT client handle; main() replaces this placeholder with a
# connected client before the UI loop starts.
client = None

# Not referenced anywhere else in the visible code.
CONFIG = {}

# x / y: laser position stepped by the arrow-key handler (unhandled_input).
# c: not referenced anywhere else in the visible code.
x = y = c = 0
def labled_widget(label, widget):
    """Return a two-column row: a text caption on the left, *widget* on the right.

    Args:
        label: Text markup shown in the left column.
        widget: Any urwid widget to place in the right column.

    Returns:
        An ``urwid.Columns`` holding the caption and the widget.
    """
    caption = urwid.Text(label)
    return urwid.Columns([caption, widget])
class GridFlowMenu(urwid.WidgetWrap):
    """A centered row of buttons, one per choice.

    Clicking a button invokes ``callback(button, choice)``. The created
    buttons are kept in ``self.buttons`` keyed by their choice label.
    """

    def __init__(self, choices, callback):
        """Create one button per entry in *choices* and lay them out in a grid.

        Args:
            choices: Iterable of button labels.
            callback: Called with ``(button, choice)`` on a "click" signal.
        """
        self.buttons = {}
        for label in choices:
            button = urwid.Button(label)
            # The label is forwarded as the extra user argument of the signal.
            signals.connect_signal(button, "click", callback, label)
            self.buttons[label] = button

        # Positional GridFlow arguments: cells, cell width 12, horizontal and
        # vertical separation 1, alignment "center".
        grid = urwid.GridFlow(self.buttons.values(), 12, 1, 1, "center")
        super().__init__(grid)
class Slider(urwid.Widget):
    """Single-row, keyboard-driven slider rendered as a filled bar.

    Left/right arrows change the value by ``step``; typing decimal digits
    enters a number directly. Every keyboard change emits the
    ``"value_changed"`` signal with the new (clamped) value.
    """

    _selectable = True
    _sizing = frozenset(["flow"])

    def __init__(self, value=0, max=127, min=0, step=1):
        """Create a slider over the closed range [min, max].

        Args:
            value: Initial value; clamped into the range.
            max: Upper bound (must be greater than ``min``).
            min: Lower bound.
            step: Amount added/subtracted by the arrow keys.

        Raises:
            ValueError: If ``max`` is not strictly greater than ``min``.
        """
        signals.register_signal(self.__class__, ["value_changed"])
        if max <= min:
            raise ValueError("max must be greater than min")
        self.max = max
        self.min = min
        self.step = step
        self.last_focus = False
        # Digits typed so far while the widget has focus.
        self.input_digits = ""
        self.set_value(value)

    def set_value(self, value, emit=False):
        """Store *value* clamped to the range and schedule a redraw.

        Args:
            value: Requested value.
            emit: When true, emit ``"value_changed"`` with the clamped value.
        """
        self.value = min(max(value, self.min), self.max)
        # Fraction of the bar to fill; in [0.0, 1.0] because value is clamped.
        self.ratio = float(self.value - self.min) / float(self.max - self.min)
        if emit:
            self._emit("value_changed", self.value)
        self._invalidate()

    def rows(self, size, focus=False):
        """The slider always occupies exactly one row."""
        return 1

    def keypress(self, size, key):
        """Handle arrow keys and digit entry; return unhandled keys.

        Returns:
            ``None`` when the key was consumed, otherwise *key* unchanged.
        """
        if key == "left":
            self.set_value(self.value - self.step, True)
        elif key == "right":
            self.set_value(self.value + self.step, True)
        elif key.isdecimal():
            # isdecimal() rather than isdigit(): characters such as "²"
            # satisfy isdigit() but make int() raise ValueError.
            self.input_digits += key
            self.set_value(int(self.input_digits), True)
        else:
            return key
        return None

    def lost_focus(self):
        """Discard any partially typed number."""
        self.input_digits = ""

    def render(self, size, focus=False):
        """Draw the bar: a filled part proportional to the value, then a track.

        Focus loss is detected here by comparing *focus* with the value seen
        on the previous render call.
        """
        if not focus and self.last_focus:
            self.lost_focus()
        if focus != self.last_focus:
            self.last_focus = focus
        (maxcol,) = size
        filled_cols = int(float(maxcol) * self.ratio)
        track_cols = int(maxcol - filled_cols)
        filled_char = "\u2588" if focus else "\u2593"
        track_char = ">" if focus else "\u2500"
        return urwid.CanvasJoin([
            (urwid.SolidCanvas(filled_char, filled_cols, 1), None, False, filled_cols),
            (urwid.SolidCanvas(track_char, track_cols, 1), None, False, track_cols),
        ])
class LabelSlider(urwid.WidgetWrap):
    """A ``Slider`` with a text field to its right showing the current value."""

    def __init__(self, min=0, max=100, value=0, step=1, value_format="{:3d}"):
        """Create the slider and its value label.

        Args:
            min: Lower bound passed to the slider.
            max: Upper bound passed to the slider.
            value: Initial value (the slider clamps it into range).
            step: Arrow-key step passed to the slider.
            value_format: ``str.format`` template used to render the value.
        """
        self.slider = Slider(value=value, max=max, min=min, step=step)
        self.value_format = value_format
        # Show the value the slider actually holds (after clamping).
        self.text = urwid.Text(value_format.format(self.slider.value))
        signals.connect_signal(self.slider, "value_changed", self.slider_changed)
        super().__init__(
            urwid.Columns(
                [self.slider, ("pack", self.text)],
                focus_column=0,
                dividechars=2,
            )
        )

    def render_text(self, value):
        """Update the label to show *value* using ``value_format``."""
        self.text.set_text(self.value_format.format(value))

    def slider_changed(self, slider, value):
        """Signal handler: keep the label in sync with the slider."""
        self.render_text(value)

    def set_value(self, value, emit=False):
        """Set the slider value and refresh the label.

        Args:
            value: Requested value; the slider clamps it into range.
            emit: Forwarded to the slider; when true the slider emits
                ``"value_changed"``. Previously this flag was ignored.
        """
        self.slider.set_value(value, emit=emit)
        # Render the clamped value so label and bar never disagree.
        self.render_text(self.slider.value)

    def get_value(self):
        """Return the slider's current value."""
        return self.slider.value

    def get_slider(self):
        """Return the wrapped ``Slider`` (the object that emits signals)."""
        return self.slider
class myFrame(urwid.Frame):
    """Frame in which the Tab key toggles focus between body and footer."""

    def keypress(self, size, key):
        """Consume "tab" to switch focus; delegate every other key.

        Returns:
            ``None`` for "tab", otherwise whatever ``urwid.Frame.keypress``
            returns.
        """
        if key != "tab":
            return super().keypress(size, key)

        # Anything other than the body being focused sends focus to the body.
        if self.focus_position == "body":
            self.set_focus("footer")
        else:
            self.set_focus("body")
        return None
class View:
    """Top-level urwid UI: a "Main" page and a "Channels" page in one frame.

    The frame footer holds a two-button menu that swaps the frame body
    between the two pages. ``self.loop`` is the ``urwid.MainLoop`` that
    drives the frame.
    """

    # Attribute names of the channel sliders published through laser_set();
    # the attribute name is also used as the command key.
    _SCALAR_CHANNELS = (
        "mode",
        "animation",
        "strobe",
        "point_speed",
        "zoom",
        "color",
        "reset",
        "color_effect",
    )

    def __init__(self, laser):
        """Build both pages, the frame and the main loop.

        Args:
            laser: Accepted for interface compatibility; not used here.
        """
        self.channel_view = self.build_channel_box()
        self.main_view = self.build_main_box()
        self.frame = myFrame(
            body=self.channel_view,
            footer=GridFlowMenu(
                choices=["Main", "Channels"], callback=self.change_view
            ),
        )
        self.loop = urwid.MainLoop(self.frame)

    def change_view(self, button, choice):
        """Menu callback: show the page named by *choice*."""
        if choice == "Main":
            self.frame.set_body(self.main_view)
        elif choice == "Channels":
            self.frame.set_body(self.channel_view)

    def mode_select_callback(self, radiobutton, state):
        """Handler for the mode radio buttons; currently a placeholder."""
        if radiobutton.state:
            pass

    def build_main_box(self):
        """Build the "Main" page: a mode selector and an effect selector.

        Returns:
            An ``urwid.LineBox`` titled "Main".
        """
        self.mode_rb = []
        mode = labled_widget("Mode", urwid.Pile([
            urwid.RadioButton(self.mode_rb, "Manual"),
            urwid.RadioButton(self.mode_rb, "Automatic"),
        ]))
        for radio in self.mode_rb:
            signals.connect_signal(radio, "change", self.mode_select_callback)

        # The effect buttons have no handler connected.
        self.effect_rb = []
        effect = labled_widget("Effect:", urwid.Pile([
            urwid.RadioButton(self.effect_rb, "Effect 1"),
            urwid.RadioButton(self.effect_rb, "Effect 2"),
            urwid.RadioButton(self.effect_rb, "Effect 3"),
            urwid.RadioButton(self.effect_rb, "Effect 4"),
        ]))
        fill = urwid.Filler(urwid.Pile([mode, urwid.Divider(), effect]))
        return urwid.LineBox(fill, title="Main")

    def build_channel_box(self):
        """Build the "Channels" page: ten labelled sliders ranging 0..255.

        Returns:
            An ``urwid.LineBox`` titled "Channels".
        """
        self.mode = LabelSlider(min=0, max=0xFF)
        self.animation = LabelSlider(min=0, max=0xFF)
        self.strobe = LabelSlider(min=0, max=0xFF)
        self.point_speed = LabelSlider(min=0, max=0xFF)
        self.x = LabelSlider(min=0, max=0xFF)
        self.y = LabelSlider(min=0, max=0xFF)
        self.zoom = LabelSlider(min=0, max=0xFF)
        self.color = LabelSlider(min=0, max=0xFF)
        self.reset = LabelSlider(min=0, max=0xFF)
        self.color_effect = LabelSlider(min=0, max=0xFF)

        channel_rows = [
            self.mode,
            self.animation,
            self.strobe,
            self.point_speed,
            self.x,
            self.y,
            self.zoom,
            self.color,
            self.reset,
            self.color_effect,
        ]
        # The signal is emitted by the inner Slider, not by the LabelSlider.
        for row in channel_rows:
            signals.connect_signal(
                row.get_slider(), "value_changed", self.value_changed_callback
            )

        # NOTE(review): "Effect:" captions the point_speed slider -- confirm
        # the caption is intended.
        captions = (
            "Mode:",
            "Animation:",
            "Strobe:",
            "Effect:",
            "X:",
            "Y:",
            "Zoom:",
            "Color:",
            "Reset:",
            "Color-Effect:",
        )
        self.labels = urwid.Pile([urwid.Text(caption) for caption in captions])
        self.values = urwid.Pile(channel_rows)
        grid = urwid.Columns([("weight", 0.25, self.labels), self.values])
        return urwid.LineBox(urwid.Filler(grid), title="Channels")

    @staticmethod
    def _is_source(slider, channel):
        """Return True if *slider* is *channel* or the Slider it wraps."""
        return slider is channel or slider is channel.get_slider()

    def value_changed_callback(self, slider, value):
        """Publish the channel that belongs to the slider that changed.

        The signal delivers the inner ``Slider``, so it is matched against
        each ``LabelSlider``'s wrapped slider. Comparing it with the
        ``LabelSlider`` itself never matches, which meant nothing was
        published. The wrapper is still accepted for direct callers.

        Args:
            slider: The widget that emitted ``"value_changed"``.
            value: The new value (unused; current values are read back).
        """
        if self._is_source(slider, self.x) or self._is_source(slider, self.y):
            # X and Y are always sent together as one position.
            move_laser(client, (self.x.get_value(), self.y.get_value()))
            return
        for attribute in self._SCALAR_CHANNELS:
            channel = getattr(self, attribute)
            if self._is_source(slider, channel):
                laser_set(attribute, channel.get_value())
                return

    def update_channels(self, channels):
        """Copy reported channel values into the sliders and redraw.

        Values are set without emitting, so this does not trigger
        ``value_changed_callback``.

        Args:
            channels: Indexable sequence; indices 1 through 9 are read.
        """
        # NOTE(review): channels[0] is never applied, so the mode slider is
        # not refreshed here -- confirm whether that is intentional.
        self.animation.set_value(channels[1])
        self.strobe.set_value(channels[2])
        self.point_speed.set_value(channels[3])
        self.x.set_value(channels[4])
        self.y.set_value(channels[5])
        self.zoom.set_value(channels[6])
        self.color.set_value(channels[7])
        self.reset.set_value(channels[8])
        self.color_effect.set_value(channels[9])
        self.loop.draw_screen()
# The UI is built once, at import time. `client` is still None at this point
# (main() assigns it later); View.__init__ does not use the argument.
# on_state_changed() and main() refer to this module-level instance.
view = View(client)
def move_laser(client, pos):
    """Publish a position command for the laser.

    Args:
        client: Object with a ``publish(topic, payload=...)`` method.
        pos: Indexable pair; ``pos[0]`` is sent as "x", ``pos[1]`` as "y".
    """
    target_x, target_y = pos[0], pos[1]
    message = json.dumps({"laser": {"pos": {"x": target_x, "y": target_y}}})
    client.publish("kitchen/laser/state/cmd", payload=message)
def laser_set(attribute, value):
    """Publish a single ``{attribute: value}`` laser command.

    Uses the module-level ``client``.

    Args:
        attribute: Key placed under "laser" in the command.
        value: JSON-serialisable value for that key.
    """
    message = json.dumps({"laser": {attribute: value}})
    client.publish("kitchen/laser/state/cmd", payload=message)
def on_state_changed(client, user_data, message):
    """MQTT message callback: push reported channel values into the UI.

    Payloads that are not valid JSON, or that do not decode to an object
    containing a "channels" key, are ignored instead of raising from the
    callback.

    Args:
        client: The MQTT client that received the message (unused).
        user_data: User data supplied by the MQTT client (unused).
        message: Object whose ``payload`` attribute holds the JSON text.
    """
    try:
        data = json.loads(message.payload)
    except ValueError:
        # Covers json.JSONDecodeError and undecodable bytes. The payload
        # arrives from the network, so a bad one is skipped, not fatal.
        return
    # A JSON string or list also supports `in` but cannot be indexed by key.
    if isinstance(data, dict) and "channels" in data:
        view.update_channels(data["channels"])
def unhandled_input(input):
    """Fallback key handler: arrow keys nudge the laser by one unit.

    Updates the module-level ``x`` / ``y`` and publishes the new position.
    The "c" key is consumed without any action.

    Args:
        input: The key (or other input event) that no widget handled.

    Returns:
        True if the input was consumed here, False otherwise.
    """
    global x, y

    if input == "c":
        return True

    # Pick the one-unit step for the pressed arrow key.
    if input == "up":
        delta_x, delta_y = 0, 1
    elif input == "down":
        delta_x, delta_y = 0, -1
    elif input == "left":
        delta_x, delta_y = -1, 0
    elif input == "right":
        delta_x, delta_y = 1, 0
    else:
        return False

    x, y = x + delta_x, y + delta_y
    move_laser(client, (x, y))
    return True
def exit_app(sig, frame):
    """Signal handler: send the "automatic" mode command, then exit.

    Publishes the command, stops the MQTT network loop, disconnects and
    terminates the process with status 0.

    Args:
        sig: Signal number (unused).
        frame: Interrupted stack frame (unused).
    """
    release_cmd = json.dumps({"mode": "automatic"})
    client.publish("kitchen/laser/state/cmd", payload=release_cmd)
    client.loop_stop()
    client.disconnect()
    sys.exit(0)
def main(args):
    """Connect to the MQTT broker, switch to manual mode and run the UI.

    Args:
        args: Parsed command-line arguments; only ``args.host`` is read.

    Exits with status -1 when the broker connection cannot be established.
    """
    global client
    client = mqtt.Client()
    try:
        client.connect(args.host)
    except Exception:
        # Not a bare `except:`, so SystemExit and KeyboardInterrupt still
        # propagate rather than being reported as a connection failure.
        print("unable to connect to mqtt broker")
        sys.exit(-1)

    client.subscribe("kitchen/laser/state")
    client.message_callback_add("kitchen/laser/state", on_state_changed)
    client.loop_start()
    client.publish(
        "kitchen/laser/state/cmd", payload=json.dumps({"mode": "manual"})
    )

    view.loop.unhandled_input = unhandled_input
    signal.signal(signal.SIGINT, exit_app)
    try:
        view.loop.run()
    except KeyboardInterrupt:
        # Take the same shutdown path as the SIGINT handler, so the mode
        # command is sent and the client is disconnected here as well.
        exit_app(signal.SIGINT, None)
if __name__ == "__main__":
    # Script entry point: the broker host is the only command-line option.
    parser = argparse.ArgumentParser()
    parser.add_argument("--host", default="hasspi")
    args = parser.parse_args()
    main(args)