add a simulation of the low level avr door control

Simulates the logic of the avr and provides the data over tcp/ip

Signed-off-by: Thomas Schmid <tom@binary-kitchen.de>
This commit is contained in:
Thomas 2019-03-09 13:22:39 +01:00
parent 8bc75bc0f6
commit bf83df5151
3 changed files with 183 additions and 10 deletions

146
doorlockd-serialsim Executable file
View File

@ -0,0 +1,146 @@
#!/usr/bin/env python3
"""
Doorlockd -- Binary Kitchen's smart door opener
Copyright (c) Binary Kitchen e.V., 2018-2019
Author:
Thomas Schmid <tom@binary-kitchen.de>
This work is licensed under the terms of the GNU GPL, version 2. See
the LICENSE file in the top-level directory.
This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
details.
"""
import sys
import argparse
import socket
import curses
from enum import Enum, auto
from threading import Thread,Timer
from pydoorlock.Config import Config
from pydoorlock.Protocol import Protocol
cfg = Config('doorlockd')
class State(Enum):
RED = auto()
GREEN = auto()
YELLOW = auto()
class Source(Enum):
BUTTON = auto()
EMERGENCY = auto()
TIMEOUT = auto()
COMM = auto()
class NetworkWorker(Thread):
def __init__(self, client, logic):
Thread.__init__(self)
self.client = client
self.logic = logic
def run(self):
while True:
try:
rec = self.client.recv(1).decode()
if rec == Protocol.STATE_SWITCH_RED.value:
self.logic.update_state(State.RED, Source.COMM)
elif rec == Protocol.STATE_SWITCH_GREEN.value:
self.logic.update_state(State.GREEN, Source.COMM)
elif rec == Protocol.STATE_SWITCH_YELLOW.value:
self.logic.update_state(State.YELLOW, Source.COMM)
except socket.error:
print("socket error")
return
class Watchdog:
def __init__(self, timeout, timeoutHandler=None):
self.timeout = timeout
self.handler = timeoutHandler if timeoutHandler is not None else self.defaultHandler
self.timer = Timer(self.timeout, self.handler)
def defaultHandler(self):
pass
def start(self):
self.timer.start()
def stop(self):
self.timer.cancel()
def reset(self):
self.timer.cancel()
self.timer = Timer(self.timeout, self.handler)
self.timer.start()
class DoorlockLogic:
def __init__(self, client, timeout, initialState=State.RED):
self.state = initialState
self.client = client
self.watchdog = Watchdog(timeout, self.watchdog_handler)
self.watchdog.start()
def update_state(self,new_state,source):
self.watchdog.reset()
if new_state == self.state:
return
self.state = new_state
choices = {State.RED: Protocol.STATE_SWITCH_RED.value,
State.GREEN: Protocol.STATE_SWITCH_GREEN.value,
State.YELLOW: Protocol.STATE_SWITCH_YELLOW.value}
ret = choices.get(self.state)
if source == Source.BUTTON:
self.client.send(ret.upper())
elif source == Source.EMERGENCY:
self.client.send(Protocol.EMERGENCY.value)
elif source == Source.TIMEOUT:
self.client.send(ret)
def watchdog_handler(self):
self.update_state(State.RED, Source.TIMEOUT)
def main(args):
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(('localhost', cfg.int("SIMULATE_SERIAL_PORT")))
server.listen(1)
client, addr = server.accept()
try:
while True:
logic = DoorlockLogic(client, args.timeout)
NetworkWorker(client, logic).start()
while True:
rec = input("input desired state (l=lock, u=unlock, p=present, e=emergency unlock)...")
if rec=='l':
logic.update_state(State.RED, Source.BUTTON)
elif rec=='u':
logic.update_state(State.GREEN, Source.BUTTON)
elif rec=='p':
logic.update_state(State.YELLOW, Source.BUTTON)
elif rec=='e':
logic.update_state(State.GREEN, Source.EMERGENCY)
else:
print("Invalid command ""{}""".format(rec))
finally:
print("closing")
client.close()
server.close()
if __name__ == '__main__':
parser = argparse.ArgumentParser(description = 'Provide a simulated doorlock serial connection over tcp/ip')
parser.add_argument('-t', '--timeout', help='specify watchdog timeout interval', type=float, default=5.0)
args = parser.parse_args()
try:
main(args)
except KeyboardInterrupt as e:
sys.exit(0)

View File

@ -1,7 +1,7 @@
[doorlockd]
DEBUG = False
SIMULATE_SERIAL = False
SIMULATE_SERIAL = True
SIMULATE_AUTH = False
RUN_HOOKS = True
SOUNDS = True
@ -22,3 +22,5 @@ WELCOME = Willkommen in der Binary Kitchen
SERIAL_PORT = /dev/ttyAMA0
SECRET_KEY = foobar
SIMULATE_SERIAL_PORT = 5000

View File

@ -16,6 +16,7 @@ details.
"""
import logging
import socket
from enum import Enum
from random import sample
@ -121,22 +122,47 @@ class DoorHandler:
self.run_hooks = cfg.boolean('RUN_HOOKS')
if cfg.boolean('SIMULATE_SERIAL'):
return
log.info('Trying to connect to simulated serial port')
self.socket = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
try:
self.socket.connect(('localhost',cfg.int("SIMULATE_SERIAL_PORT")))
self.socket.setblocking(0)
log.info('Using simulated serial port')
except socket.error:
log.info('Unable to connect to simulated serial port')
return
else:
device = cfg.str('SERIAL_PORT')
log.info('Using serial port: %s' % device)
self.serial = Serial(device, baudrate=9600, bytesize=8, parity='N',
stopbits=1, timeout=0)
device = cfg.str('SERIAL_PORT')
log.info('Using serial port: %s' % device)
self.serial = Serial(device, baudrate=9600, bytesize=8, parity='N',
stopbits=1, timeout=0)
self.thread = Thread(target=self.thread_worker)
log.debug('Spawning RS232 Thread')
self.thread.start()
def read(self):
try:
return self.serial.read(1)
except AttributeError:
try:
return self.socket.recv(1)
except socket.error:
return ""
def write(self,c):
try:
self.serial.write(c)
self.serial.flush()
except AttributeError:
self.socket.send(c)
self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
def thread_worker(self):
while True:
sleep(0.4)
while True:
rx = self.serial.read(1)
rx = self.read()
if len(rx) == 0:
break
@ -171,8 +197,7 @@ class DoorHandler:
else:
continue
self.serial.write(tx)
self.serial.flush()
self.write(tx)
def open(self):
if self.state == DoorState.Open: