wip: add control script
Signed-off-by: Thomas Schmid <tom@lfence.de>
This commit is contained in:
parent
ade567081c
commit
ebca53d6cf
326
interactive.py
Normal file
326
interactive.py
Normal file
@ -0,0 +1,326 @@
|
||||
from threading import Event, Condition
|
||||
import paho.mqtt.client as mqtt
|
||||
import argparse
|
||||
import json
|
||||
import urwid
|
||||
import sys
|
||||
import signal
|
||||
|
||||
from urwid import signals
|
||||
from urwid.util import _tagmarkup_recurse
|
||||
|
||||
client = None
|
||||
CONFIG = dict()
|
||||
x = 0
|
||||
y = 0
|
||||
c = 0
|
||||
|
||||
def labled_widget(label, widget):
|
||||
return urwid.Columns([
|
||||
urwid.Text(label),
|
||||
widget
|
||||
])
|
||||
|
||||
class GridFlowMenu(urwid.WidgetWrap):
|
||||
def __init__(self, choices, callback):
|
||||
self.buttons = dict()
|
||||
for choice in choices:
|
||||
b = urwid.Button(choice)
|
||||
self.buttons[choice] = b
|
||||
signals.connect_signal(b, "click", callback, choice)
|
||||
super().__init__(urwid.GridFlow(self.buttons.values(), 12, 1, 1, "center"))
|
||||
|
||||
class Slider(urwid.Widget):
|
||||
_selectable=True
|
||||
_sizing = frozenset(["flow"])
|
||||
|
||||
def __init__(self, value = 0, max = 127, min = 0, step = 1):
|
||||
signals.register_signal(self.__class__, ["value_changed"])
|
||||
if max <= min or min >= max:
|
||||
raise ValueError()
|
||||
|
||||
self.max = max
|
||||
self.min = min
|
||||
self.step = step
|
||||
self.last_focus = False
|
||||
self.input_digits = ""
|
||||
self.set_value(value)
|
||||
|
||||
def set_value(self, value, emit=False):
|
||||
self.value = min(max(value, self.min), self.max)
|
||||
self.ratio = float(value-self.min)/float(self.max - self.min)
|
||||
self.ratio = min(max(self.ratio, 0.0), 1.0)
|
||||
if emit:
|
||||
self._emit("value_changed", self.value)
|
||||
self._invalidate()
|
||||
|
||||
def rows(self, size, focus=False):
|
||||
return 1
|
||||
|
||||
def keypress(self, size, key):
|
||||
if key == "left":
|
||||
self.set_value(self.value - self.step, True)
|
||||
elif key == "right":
|
||||
self.set_value(self.value + self.step, True)
|
||||
elif key.isdigit():
|
||||
self.input_digits += key
|
||||
self.set_value(int(self.input_digits), True)
|
||||
else:
|
||||
return key
|
||||
|
||||
def lost_focus(self):
|
||||
self.input_digits = ""
|
||||
|
||||
def render(self, size, focus=False):
|
||||
if not focus and self.last_focus:
|
||||
self.lost_focus()
|
||||
|
||||
if focus != self.last_focus:
|
||||
self.last_focus = focus
|
||||
|
||||
(maxcol,) = size
|
||||
|
||||
col1 = int(float(maxcol) * self.ratio)
|
||||
col2 = int(maxcol - col1)
|
||||
|
||||
return urwid.CanvasJoin([
|
||||
(urwid.SolidCanvas(chr(0x2593) if not focus else chr(9608), col1, 1), None, False, col1),
|
||||
(urwid.SolidCanvas(chr(9472) if not focus else '>', col2, 1), None, False, col2)
|
||||
])
|
||||
|
||||
class LabelSlider(urwid.WidgetWrap):
|
||||
def __init__(self, min = 0, max = 100, value = 0, step = 1, value_format="{:3d}"):
|
||||
self.slider = Slider(value = value, max = max, min = min, step = step)
|
||||
self.value_format = value_format
|
||||
self.text = urwid.Text(value_format.format(value))
|
||||
signals.connect_signal(self.slider, "value_changed", self.slider_changed)
|
||||
super().__init__(urwid.Columns([self.slider, ('pack', self.text)], focus_column=0, dividechars = 2))
|
||||
|
||||
def render_text(self, value):
|
||||
self.text.set_text(self.value_format.format(value))
|
||||
|
||||
def slider_changed(self, slider, value):
|
||||
self.render_text(value)
|
||||
|
||||
def set_value(self, value, emit=False):
|
||||
self.slider.set_value(value, emit=False)
|
||||
self.text.render_text
|
||||
|
||||
def get_slider(self):
|
||||
return self.slider
|
||||
|
||||
class myFrame(urwid.Frame):
|
||||
def keypress(self, size, key):
|
||||
if key == "tab":
|
||||
next_focus = "footer" if self.focus_position == "body" else "body"
|
||||
self.set_focus(next_focus)
|
||||
else:
|
||||
return super().keypress(size, key)
|
||||
|
||||
class View:
|
||||
def __init__(self, laser):
|
||||
self.channel_view = self.build_channel_box()
|
||||
self.main_view = self.build_main_box()
|
||||
self.frame = myFrame(body=self.channel_view, footer=GridFlowMenu(choices=["Main", "Channels"], callback= self.change_view))
|
||||
#fill = urwid.Filler(frame, "top")
|
||||
self.loop = urwid.MainLoop(self.frame)
|
||||
|
||||
def change_view(self, button, choice):
|
||||
if choice == "Main":
|
||||
self.frame.set_body(self.main_view)
|
||||
elif choice == "Channels":
|
||||
self.frame.set_body(self.channel_view)
|
||||
|
||||
def mode_select_callback(self, radiobutton, state):
|
||||
if radiobutton.state:
|
||||
pass
|
||||
|
||||
def build_main_box(self):
|
||||
fill = urwid.Filler(urwid.Text("Hello World", "left"))
|
||||
|
||||
self.mode_rb = []
|
||||
mode = labled_widget("Mode", urwid.Pile([
|
||||
urwid.RadioButton(self.mode_rb, "Manual"),
|
||||
urwid.RadioButton(self.mode_rb, "Automatic")
|
||||
]))
|
||||
|
||||
for e in self.mode_rb:
|
||||
signals.connect_signal(e, "change", self.mode_select_callback)
|
||||
|
||||
self.effect_rb =[]
|
||||
effect = labled_widget("Effect:", urwid.Pile([
|
||||
urwid.RadioButton(self.effect_rb, "Effect 1"),
|
||||
urwid.RadioButton(self.effect_rb, "Effect 2"),
|
||||
urwid.RadioButton(self.effect_rb, "Effect 3"),
|
||||
urwid.RadioButton(self.effect_rb, "Effect 4")
|
||||
]))
|
||||
|
||||
fill = urwid.Filler(urwid.Pile([mode, urwid.Divider(), effect]))
|
||||
return urwid.LineBox(fill, title="Main")
|
||||
|
||||
def build_channel_box(self):
|
||||
self.mode = LabelSlider(min = 0, max = 0xFF)
|
||||
self.animation = LabelSlider(min = 0, max=0xFF)
|
||||
self.strobe = LabelSlider(min = 0, max = 0xFF)
|
||||
self.effect = LabelSlider(min = 0, max = 0xFF)
|
||||
self.x = LabelSlider(min = 0, max = 0xFF)
|
||||
self.y = LabelSlider(min = 0, max = 0xFF)
|
||||
self.zoom = LabelSlider(min = 0, max = 0xFF)
|
||||
self.color = LabelSlider(min = 0, max = 0xFF)
|
||||
self.reset = LabelSlider(min = 0, max = 0xFF)
|
||||
self.color_effect = LabelSlider(min = 0, max = 0xFF)
|
||||
|
||||
signals.connect_signal(self.mode.get_slider(), "value_changed", self.value_changed_callback)
|
||||
signals.connect_signal(self.animation.get_slider(), "value_changed", self.value_changed_callback)
|
||||
signals.connect_signal(self.strobe.get_slider(), "value_changed", self.value_changed_callback)
|
||||
signals.connect_signal(self.effect.get_slider(), "value_changed", self.value_changed_callback)
|
||||
signals.connect_signal(self.x.get_slider(), "value_changed", self.value_changed_callback)
|
||||
signals.connect_signal(self.y.get_slider(), "value_changed", self.value_changed_callback)
|
||||
signals.connect_signal(self.zoom.get_slider(), "value_changed", self.value_changed_callback)
|
||||
signals.connect_signal(self.color.get_slider(), "value_changed", self.value_changed_callback)
|
||||
signals.connect_signal(self.reset.get_slider(), "value_changed", self.value_changed_callback)
|
||||
signals.connect_signal(self.color_effect.get_slider(), "value_changed", self.value_changed_callback)
|
||||
|
||||
self.labels = urwid.Pile([
|
||||
urwid.Text("Mode:"),
|
||||
urwid.Text("Animation:"),
|
||||
urwid.Text("Strobe:"),
|
||||
urwid.Text("Effect:"),
|
||||
urwid.Text("X:"),
|
||||
urwid.Text("Y:"),
|
||||
urwid.Text("Zoom:"),
|
||||
urwid.Text("Color:"),
|
||||
urwid.Text("Reset:"),
|
||||
urwid.Text("Color-Effect:")]
|
||||
)
|
||||
|
||||
self.values = urwid.Pile([
|
||||
self.mode,
|
||||
self.animation,
|
||||
self.strobe,
|
||||
self.effect,
|
||||
self.x,
|
||||
self.y,
|
||||
self.zoom,
|
||||
self.color,
|
||||
self.reset,
|
||||
self.color_effect
|
||||
])
|
||||
|
||||
grid = urwid.Columns([('weight', 0.25, self.labels), self.values])
|
||||
return urwid.LineBox(urwid.Filler(grid), title="Channels")
|
||||
|
||||
def value_changed_callback(self, slider, value):
|
||||
if slider == self.x or slider == self.y:
|
||||
move_laser(client, (self.x.value, self.y.value))
|
||||
elif slider == self.mode:
|
||||
pass
|
||||
elif slider == self.animation:
|
||||
pass
|
||||
elif slider == self.strobe:
|
||||
pass
|
||||
elif slider == self.effect:
|
||||
pass
|
||||
elif slider == self.zoom:
|
||||
pass
|
||||
elif slider == self.color:
|
||||
pass
|
||||
elif slider == self.reset:
|
||||
pass
|
||||
elif slider == self.color_effect:
|
||||
pass
|
||||
|
||||
def update_channels(self, channels):
|
||||
self.animation.set_value(channels[1])
|
||||
self.strobe.set_value(channels[2])
|
||||
self.effect.set_value(channels[3])
|
||||
self.x.set_value(channels[4])
|
||||
self.y.set_value(channels[5])
|
||||
self.zoom.set_value(channels[6])
|
||||
self.color.set_value(channels[7])
|
||||
self.reset.set_value(channels[8])
|
||||
self.color_effect.set_value(channels[9])
|
||||
|
||||
|
||||
|
||||
|
||||
self.loop.draw_screen()
|
||||
|
||||
view = View(client)
|
||||
|
||||
def move_laser(client, pos):
|
||||
cmd = {"laser": {
|
||||
"pos": {"x": pos[0], "y": pos[1]}
|
||||
}}
|
||||
client.publish("kitchen/laser/state/cmd", payload = json.dumps(cmd))
|
||||
|
||||
def on_state_changed(client, user_data, message):
|
||||
data = json.loads(message.payload)
|
||||
|
||||
if "channels" in data:
|
||||
view.update_channels(data["channels"])
|
||||
|
||||
def unhandled_input(input):
|
||||
global x, y
|
||||
if input == "up":
|
||||
y = y + 1
|
||||
move_laser(client, (x, y))
|
||||
return True
|
||||
|
||||
if input == "down":
|
||||
y = y - 1
|
||||
move_laser(client, (x, y))
|
||||
return True
|
||||
|
||||
if input == "left":
|
||||
x = x - 1
|
||||
move_laser(client, (x, y))
|
||||
return True
|
||||
|
||||
if input == "right":
|
||||
x = x + 1
|
||||
move_laser(client, (x, y))
|
||||
return True
|
||||
|
||||
if input == "c":
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
def exit_app(sig, frame):
|
||||
client.publish("kitchen/laser/state/cmd", payload=json.dumps({"mode": "automatic"}))
|
||||
client.loop_stop()
|
||||
client.disconnect()
|
||||
sys.exit(0)
|
||||
|
||||
def main(args):
|
||||
global client
|
||||
client = mqtt.Client()
|
||||
|
||||
try:
|
||||
client.connect(args.host)
|
||||
except:
|
||||
print("unable to connect to mqtt broker")
|
||||
sys.exit(-1)
|
||||
|
||||
client.subscribe("kitchen/laser/state")
|
||||
client.message_callback_add("kitchen/laser/state", on_state_changed)
|
||||
client.loop_start()
|
||||
client.publish("kitchen/laser/state/cmd", payload=json.dumps({"mode": "manual"}))
|
||||
|
||||
view.loop.unhandled_input = unhandled_input
|
||||
|
||||
signal.signal(signal.SIGINT, exit_app)
|
||||
|
||||
try:
|
||||
view.loop.run()
|
||||
except KeyboardInterrupt:
|
||||
print("test")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument("--host", default="hasspi")
|
||||
|
||||
args = parser.parse_args()
|
||||
main(args)
|
Loading…
x
Reference in New Issue
Block a user