1
0
mirror of https://github.com/binary-kitchen/doorlockd synced 2024-12-22 18:34:25 +01:00

Refactor pydoorlock to support multiple backends

Signed-off-by: Thomas Schmid <tom@lfence.de>
This commit is contained in:
Thomas 2022-11-16 20:58:32 +01:00
parent b189fe7982
commit 4825c7346e
5 changed files with 150 additions and 62 deletions

View File

@ -4,6 +4,7 @@ DEBUG = False
SIMULATE_SERIAL = False SIMULATE_SERIAL = False
SIMULATE_AUTH = False SIMULATE_AUTH = False
RUN_HOOKS = True RUN_HOOKS = True
SIMULATE_BACKEND = True
SOUNDS = True SOUNDS = True
# LDAP # LDAP
@ -19,7 +20,11 @@ TITLE = Binary Kitchen Doorlock
ROOM = Hauptraum ROOM = Hauptraum
WELCOME = Willkommen in der Binary Kitchen WELCOME = Willkommen in der Binary Kitchen
[backend]
BACKEND_TYPE = avr
SERIAL_PORT = /dev/ttyAMA0 SERIAL_PORT = /dev/ttyAMA0
# Simulation Backend for testing
#BACKEND_TYPE = simulation
SECRET_KEY = foobar SECRET_KEY = foobar

74
pydoorlock/AvrDoorlock.py Normal file
View File

@ -0,0 +1,74 @@
from .DoorlockBackend import DoorlockBackend
from .Protocol import Protocol
from .Door import DoorState
from serial import Serial
from time import sleep
import logging
import threading
log = logging.getLogger(__name__)
class AvrDoorlockBackend(DoorlockBackend):
def __init__(self, device):
self.serial = Serial(device, baudrate=9600, bytesize=8, parity='N',
stopbits=1, timeout=0)
self.do_close = False
self.do_open = False
self.do_present = False
threading.Thread(target=self.thread_worker).start()
def get_capabilities(self):
return [DoorState.Closed, DoorState.Open, DoorState.Present]
def set_state(self, state):
if state == DoorState.Closed:
self.do_close = True
elif state == DoorState.Open:
self.do_open = True
elif state == DoorState.Present:
self.do_present = True
def thread_worker(self):
while True:
sleep(0.4)
while True:
rx = self.serial.read(1)
if len(rx) == 0:
break
old_state = self.state
if rx == Protocol.STATE_SWITCH_RED.value.upper():
self.door_handler.close()
log.info('Closed due to Button press')
self.door_handler.invoke_callback(DoorlockResponse.ButtonLock)
elif rx == Protocol.STATE_SWITCH_GREEN.value.upper():
self.door_handler.open()
log.info('Opened due to Button press')
self.door_handler.invoke_callback(DoorlockResponse.ButtonUnlock)
elif rx == Protocol.STATE_SWITCH_YELLOW.value.upper():
self.door_handler.present()
log.info('Present due to Button press')
self.door_handler.invoke_callback(DoorlockResponse.ButtonPresent)
elif rx == Protocol.EMERGENCY.value:
log.warning('Emergency unlock')
self.door_handler.invoke_callback(DoorlockResponse.EmergencyUnlock)
else:
log.error('Received unknown message "%s" from AVR' % rx)
self.sound_helper(old_state, self.state, True)
if self.do_close:
tx = Protocol.STATE_SWITCH_RED.value
self.do_close = False
elif self.do_present:
tx = Protocol.STATE_SWITCH_YELLOW.value
self.do_present = False
elif self.do_open:
tx = Protocol.STATE_SWITCH_GREEN.value
self.do_open = False
else:
continue
self.serial.write(tx)
self.serial.flush()

View File

@ -20,10 +20,13 @@ import logging
from enum import Enum from enum import Enum
from random import sample from random import sample
from subprocess import run from subprocess import run
from serial import Serial
from threading import Thread from threading import Thread
from time import sleep from time import sleep
from os.path import join from os.path import join
from pydoorlock.AvrDoorlock import AvrDoorlockBackend
from pydoorlock.SimulationBackend import SimulationBackend
from .Config import Config
from .Door import DoorState from .Door import DoorState
from .Protocol import Protocol from .Protocol import Protocol
@ -68,6 +71,7 @@ class DoorlockResponse(Enum):
# don't break old apps, value 3 is reserved now # don't break old apps, value 3 is reserved now
RESERVED = 3 RESERVED = 3
Inval = 4 Inval = 4
BackendError = 5
EmergencyUnlock = 10, EmergencyUnlock = 10,
ButtonLock = 11, ButtonLock = 11,
@ -91,6 +95,8 @@ class DoorlockResponse(Enum):
return 'Opened by button' return 'Opened by button'
elif self == DoorlockResponse.ButtonPresent: elif self == DoorlockResponse.ButtonPresent:
return 'Present by button' return 'Present by button'
elif self == DoorlockResponse.BackendError:
return "Backend Error"
return 'Error' return 'Error'
@ -110,7 +116,7 @@ class DoorHandler:
wave_zonk = 'zonk.wav' wave_zonk = 'zonk.wav'
def __init__(self, cfg, sounds_prefix, scripts_prefix): def __init__(self, cfg: Config, sounds_prefix, scripts_prefix):
self._callback = None self._callback = None
self.sounds = cfg.boolean('SOUNDS') self.sounds = cfg.boolean('SOUNDS')
@ -120,84 +126,65 @@ class DoorHandler:
self.scripts_prefix = scripts_prefix self.scripts_prefix = scripts_prefix
self.run_hooks = cfg.boolean('RUN_HOOKS') self.run_hooks = cfg.boolean('RUN_HOOKS')
if cfg.boolean('SIMULATE_SERIAL'): backend_type = cfg.str("BACKEND_TYPE", "backend")
print(backend_type)
if not backend_type:
log.error("No backend configured")
raise RuntimeError()
if backend_type == "avr":
self.backend = AvrDoorlockBackend(self)
elif backend_type == "simulation":
self.backend = SimulationBackend(self)
else:
log.error(f"Unknown backend {backend_type}")
raise RuntimeError
def state_changed(self, new_state):
if new_state == DoorState.Open:
self.run_hook('post_unlock')
elif new_state == DoorState.Present:
self.run_hook('post_present')
elif new_state == DoorState.Closed:
self.run_hook('post_lock')
else:
return return
device = cfg.str('SERIAL_PORT') self.state = new_state
log.info('Using serial port: %s' % device)
self.serial = Serial(device, baudrate=9600, bytesize=8, parity='N',
stopbits=1, timeout=0)
self.thread = Thread(target=self.thread_worker)
log.debug('Spawning RS232 Thread')
self.thread.start()
def thread_worker(self):
while True:
sleep(0.4)
while True:
rx = self.serial.read(1)
if len(rx) == 0:
break
old_state = self.state
if rx == Protocol.STATE_SWITCH_RED.value.upper():
self.close()
log.info('Closed due to Button press')
self.invoke_callback(DoorlockResponse.ButtonLock)
elif rx == Protocol.STATE_SWITCH_GREEN.value.upper():
self.open()
log.info('Opened due to Button press')
self.invoke_callback(DoorlockResponse.ButtonUnlock)
elif rx == Protocol.STATE_SWITCH_YELLOW.value.upper():
self.present()
log.info('Present due to Button press')
self.invoke_callback(DoorlockResponse.ButtonPresent)
elif rx == Protocol.EMERGENCY.value:
log.warning('Emergency unlock')
self.invoke_callback(DoorlockResponse.EmergencyUnlock)
else:
log.error('Received unknown message "%s" from AVR' % rx)
self.sound_helper(old_state, self.state, True)
if self.do_close:
tx = Protocol.STATE_SWITCH_RED.value
self.do_close = False
elif self.state == DoorState.Present:
tx = Protocol.STATE_SWITCH_YELLOW.value
elif self.state == DoorState.Open:
tx = Protocol.STATE_SWITCH_GREEN.value
else:
continue
self.serial.write(tx)
self.serial.flush()
def open(self): def open(self):
if self.state == DoorState.Open: if self.state == DoorState.Open:
return DoorlockResponse.AlreadyActive return DoorlockResponse.AlreadyActive
self.state = DoorState.Open if self.backend.set_state(DoorState.Open):
self.run_hook('post_unlock') self.state = DoorState.Open
return DoorlockResponse.Success self.run_hook('post_unlock')
return DoorlockResponse.Success
return DoorlockResponse.BackendError
def present(self): def present(self):
if self.state == DoorState.Present: if self.state == DoorState.Present:
return DoorlockResponse.AlreadyActive return DoorlockResponse.AlreadyActive
self.state = DoorState.Present if self.backend.set_state(DoorState.Present):
self.run_hook('post_present') self.state = DoorState.Present
return DoorlockResponse.Success self.run_hook('post_present')
return DoorlockResponse.Success
return DoorlockResponse.BackendError
def close(self): def close(self):
if self.state == DoorState.Closed: if self.state == DoorState.Closed:
return DoorlockResponse.AlreadyActive return DoorlockResponse.AlreadyActive
self.do_close = True if self.backend.set_state(DoorState.Closed):
self.state = DoorState.Closed self.state = DoorState.Closed
self.run_hook('post_lock') self.run_hook('post_lock')
return DoorlockResponse.Success return DoorlockResponse.Success
return DoorlockResponse.BackendError
def request(self, state): def request(self, state):
old_state = self.state old_state = self.state

View File

@ -0,0 +1,17 @@
from abc import ABC, abstractmethod
class DoorlockBackend(ABC):
def __init__(self):
self.callbacks = list()
self.current_state = None
def update_state(self):
self.state_change_callback(self.current_state)
@abstractmethod
def set_state(self, state):
self.current_state = state
@abstractmethod
def get_state(self, state):
return self.current_state

View File

@ -0,0 +1,5 @@
from .DoorlockBackend import DoorlockBackend
class SimulationBackend(DoorlockBackend):
def __init__(self, handler):
super.__init__(handler)