doorlockd: move DoorHandler to Doorlock.py

Signed-off-by: Ralf Ramsauer <ralf@binary-kitchen.de>
This commit is contained in:
Ralf Ramsauer 2018-10-09 00:35:14 +02:00
parent aa3369baa8
commit d6d90e9f70
2 changed files with 172 additions and 166 deletions

168
doorlockd
View File

@ -22,15 +22,10 @@ import sys
from configparser import ConfigParser
from os.path import abspath, join
from serial import Serial
from subprocess import Popen
from threading import Thread
from time import sleep
from pydoorlock.Authenticator import Authenticator
from pydoorlock.WebApp import webapp_run, emit_doorstate
from pydoorlock.Door import DoorState
from pydoorlock.Doorlock import DoorlockResponse
from pydoorlock.WebApp import webapp_run
from pydoorlock.Doorlock import DoorlockResponse, DoorHandler
SYSCONFDIR = '.'
PREFIX = '.'
@ -67,165 +62,6 @@ class Config:
cfg = Config(SYSCONFDIR)
class DoorHandler:
state = DoorState.Closed
do_close = False
CMD_PRESENT = b'y'
CMD_OPEN = b'g'
CMD_CLOSE = b'r'
BUTTON_PRESENT = b'Y'
BUTTON_OPEN = b'G'
BUTTON_CLOSE = b'R'
CMD_EMERGENCY_SWITCH = b'E'
wave_lock = 'lock.wav'
wave_lock_button = 'lock_button.wav'
wave_present = 'present.wav'
wave_present_button = 'present.wav'
wave_unlock = 'unlock.wav'
wave_unlock_button = 'unlock_button.wav'
wave_zonk = 'zonk.wav'
def __init__(self, cfg, sounds_prefix, scripts_prefix):
self.sounds = cfg.boolean('SOUNDS')
if self.sounds:
self.sounds_prefix = sounds_prefix
self.scripts_prefix = scripts_prefix
self.run_hooks = cfg.boolean('RUN_HOOKS')
if cfg.boolean('SIMULATE_SERIAL'):
return
device = cfg.str('SERIAL_PORT')
log.info('Using serial port: %s' % device)
self.serial = Serial(device, baudrate=9600, bytesize=8, parity='N',
stopbits=1, timeout=0)
self.thread = Thread(target=self.thread_worker)
log.debug('Spawning RS232 Thread')
self.thread.start()
def thread_worker(self):
while True:
sleep(0.4)
while True:
rx = self.serial.read(1)
if len(rx) == 0:
break
old_state = self.state
if rx == DoorHandler.BUTTON_CLOSE:
self.close()
log.info('Closed due to Button press')
#emit_status(LogicResponse.ButtonLock)
elif rx == DoorHandler.BUTTON_OPEN:
self.open()
log.info('Opened due to Button press')
#emit_status(LogicResponse.ButtonUnlock)
elif rx == DoorHandler.BUTTON_PRESENT:
self.present()
log.info('Present due to Button press')
#emit_status(LogicResponse.ButtonPresent)
elif rx == DoorHandler.CMD_EMERGENCY_SWITCH:
log.warning('Emergency unlock')
#emit_status(LogicResponse.EmergencyUnlock)
else:
log.error('Received unknown message "%s" from AVR' % rx)
self.sound_helper(old_state, self.state, True)
if self.do_close:
tx = DoorHandler.CMD_CLOSE
self.do_close = False
elif self.state == DoorState.Present:
tx = DoorHandler.CMD_PRESENT
elif self.state == DoorState.Open:
tx = DoorHandler.CMD_OPEN
else:
continue
self.serial.write(tx)
self.serial.flush()
def open(self):
if self.state == DoorState.Open:
return DoorlockResponse.AlreadyActive
self.state = DoorState.Open
self.run_hook('post_unlock')
return DoorlockResponse.Success
def present(self):
if self.state == DoorState.Present:
return DoorlockResponse.AlreadyActive
self.state = DoorState.Present
self.run_hook('post_present')
return DoorlockResponse.Success
def close(self):
if self.state == DoorState.Closed:
return DoorlockResponse.AlreadyActive
self.do_close = True
self.state = DoorState.Closed
self.run_hook('post_lock')
return DoorlockResponse.Success
def request(self, state):
old_state = self.state
if state == DoorState.Closed:
err = self.close()
elif state == DoorState.Present:
err = self.present()
elif state == DoorState.Open:
err = self.open()
self.sound_helper(old_state, self.state, False)
emit_doorstate()
return err
def sound_helper(self, old_state, new_state, button):
if not self.sounds:
return
# TBD: Emergency Unlock
# wave_emergency = 'emergency_unlock.wav'
if old_state == new_state:
filename = self.wave_zonk
elif button:
if new_state == DoorState.Open:
filename = self.wave_unlock_button
elif new_state == DoorState.Present:
filename = self.wave_present_button
elif new_state == DoorState.Closed:
filename = self.wave_lock_button
else:
if new_state == DoorState.Open:
filename = self.wave_unlock
elif new_state == DoorState.Present:
filename = self.wave_present
elif new_state == DoorState.Closed:
filename = self.wave_lock
Popen(['nohup', 'aplay', join(self.sounds_prefix, filename)])
def run_hook(self, script):
if not self.run_hooks:
log.info('Hooks disabled: not starting %s' % script)
return
log.info('Starting hook %s' % script)
Popen(['nohup', join(self.scripts_prefix, script)])
class Logic:
def __init__(self, cfg, sounds_prefix, scripts_prefix):
self.auth = Authenticator(cfg)

View File

@ -1,5 +1,16 @@
import logging
from enum import Enum
from random import sample
from subprocess import Popen
from serial import Serial
from threading import Thread
from time import sleep
from os.path import join
from .Door import DoorState
log = logging.getLogger()
# copied from sudo
eperm_insults = {
@ -60,3 +71,162 @@ class DoorlockResponse(Enum):
return 'Present by button'
return 'Error'
class DoorHandler:
state = DoorState.Closed
do_close = False
CMD_PRESENT = b'y'
CMD_OPEN = b'g'
CMD_CLOSE = b'r'
BUTTON_PRESENT = b'Y'
BUTTON_OPEN = b'G'
BUTTON_CLOSE = b'R'
CMD_EMERGENCY_SWITCH = b'E'
wave_lock = 'lock.wav'
wave_lock_button = 'lock_button.wav'
wave_present = 'present.wav'
wave_present_button = 'present.wav'
wave_unlock = 'unlock.wav'
wave_unlock_button = 'unlock_button.wav'
wave_zonk = 'zonk.wav'
def __init__(self, cfg, sounds_prefix, scripts_prefix):
self.sounds = cfg.boolean('SOUNDS')
if self.sounds:
self.sounds_prefix = sounds_prefix
self.scripts_prefix = scripts_prefix
self.run_hooks = cfg.boolean('RUN_HOOKS')
if cfg.boolean('SIMULATE_SERIAL'):
return
device = cfg.str('SERIAL_PORT')
log.info('Using serial port: %s' % device)
self.serial = Serial(device, baudrate=9600, bytesize=8, parity='N',
stopbits=1, timeout=0)
self.thread = Thread(target=self.thread_worker)
log.debug('Spawning RS232 Thread')
self.thread.start()
def thread_worker(self):
while True:
sleep(0.4)
while True:
rx = self.serial.read(1)
if len(rx) == 0:
break
old_state = self.state
if rx == DoorHandler.BUTTON_CLOSE:
self.close()
log.info('Closed due to Button press')
#emit_status(LogicResponse.ButtonLock)
elif rx == DoorHandler.BUTTON_OPEN:
self.open()
log.info('Opened due to Button press')
#emit_status(LogicResponse.ButtonUnlock)
elif rx == DoorHandler.BUTTON_PRESENT:
self.present()
log.info('Present due to Button press')
#emit_status(LogicResponse.ButtonPresent)
elif rx == DoorHandler.CMD_EMERGENCY_SWITCH:
log.warning('Emergency unlock')
#emit_status(LogicResponse.EmergencyUnlock)
else:
log.error('Received unknown message "%s" from AVR' % rx)
self.sound_helper(old_state, self.state, True)
if self.do_close:
tx = DoorHandler.CMD_CLOSE
self.do_close = False
elif self.state == DoorState.Present:
tx = DoorHandler.CMD_PRESENT
elif self.state == DoorState.Open:
tx = DoorHandler.CMD_OPEN
else:
continue
self.serial.write(tx)
self.serial.flush()
def open(self):
if self.state == DoorState.Open:
return DoorlockResponse.AlreadyActive
self.state = DoorState.Open
self.run_hook('post_unlock')
return DoorlockResponse.Success
def present(self):
if self.state == DoorState.Present:
return DoorlockResponse.AlreadyActive
self.state = DoorState.Present
self.run_hook('post_present')
return DoorlockResponse.Success
def close(self):
if self.state == DoorState.Closed:
return DoorlockResponse.AlreadyActive
self.do_close = True
self.state = DoorState.Closed
self.run_hook('post_lock')
return DoorlockResponse.Success
def request(self, state):
old_state = self.state
if state == DoorState.Closed:
err = self.close()
elif state == DoorState.Present:
err = self.present()
elif state == DoorState.Open:
err = self.open()
self.sound_helper(old_state, self.state, False)
#emit_doorstate()
return err
def sound_helper(self, old_state, new_state, button):
if not self.sounds:
return
# TBD: Emergency Unlock
# wave_emergency = 'emergency_unlock.wav'
if old_state == new_state:
filename = self.wave_zonk
elif button:
if new_state == DoorState.Open:
filename = self.wave_unlock_button
elif new_state == DoorState.Present:
filename = self.wave_present_button
elif new_state == DoorState.Closed:
filename = self.wave_lock_button
else:
if new_state == DoorState.Open:
filename = self.wave_unlock
elif new_state == DoorState.Present:
filename = self.wave_present
elif new_state == DoorState.Closed:
filename = self.wave_lock
Popen(['nohup', 'aplay', join(self.sounds_prefix, filename)])
def run_hook(self, script):
if not self.run_hooks:
log.info('Hooks disabled: not starting %s' % script)
return
log.info('Starting hook %s' % script)
Popen(['nohup', join(self.scripts_prefix, script)])