1
0
mirror of https://github.com/binary-kitchen/doorlockd synced 2024-12-22 02:14:26 +01:00
doorlockd-mirror/doorlockd
Ralf Ramsauer 00f74d69f9 pydoorlock: Refactor LogicResponse to DoorlockResponse
Signed-off-by: Ralf Ramsauer <ralf@binary-kitchen.de>
2018-10-16 21:25:23 +02:00

265 lines
7.3 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Doorlockd -- Binary Kitchen's smart door opener
Copyright (c) Binary Kitchen e.V., 2018
Author:
Ralf Ramsauer <ralf@binary-kitchen.de>
This work is licensed under the terms of the GNU GPL, version 2. See
the LICENSE file in the top-level directory.
This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
details.
"""
import logging
import sys
from configparser import ConfigParser
from enum import Enum
from os.path import abspath, join
from serial import Serial
from subprocess import Popen
from threading import Thread
from time import sleep
from pydoorlock.Authenticator import Authenticator, AuthMethod
from pydoorlock.WebApp import webapp_run, emit_doorstate
from pydoorlock.Door import DoorState
from pydoorlock.Doorlock import DoorlockResponse
# Base directories. Both point at the current working directory here.
SYSCONFDIR = '.'
PREFIX = '.'

# Resource directories, all derived from PREFIX.
root_prefix = join(PREFIX, 'share', 'doorlockd')
sounds_prefix = join(root_prefix, 'sounds')
scripts_prefix = join(root_prefix, 'scripts')
# The static and template folders are stored as absolute paths.
static_folder = abspath(join(root_prefix, 'static'))
template_folder = abspath(join(root_prefix, 'templates'))

# Module metadata.
__author__ = 'Ralf Ramsauer'
__copyright__ = 'Copyright (c) Ralf Ramsauer, 2018'
# Backward-compatible alias: this name used to be spelled without the
# trailing underscores, unlike the other metadata dunders.
__copyright = __copyright__
__license__ = 'GPLv2'
__email__ = 'ralf@binary-kitchen.de'
__status__ = 'Development'
__maintainer__ = 'Ralf Ramsauer'
__version__ = '2.0-rc2'

# Logging setup values; they are applied by logging.basicConfig() when the
# file is run as a script.
log_level = logging.DEBUG
date_fmt = '%Y-%m-%d %H:%M:%S'
log_fmt = '%(asctime)-15s %(levelname)-8s %(message)s'
# The root logger is used for all messages of this file.
log = logging.getLogger()
class Config:
    """Accessor for the ``doorlock`` section of the doorlockd config files."""

    # Name of the config section every lookup is made in.
    config_topic = 'doorlock'

    def __init__(self, sysconfdir):
        """Parse the configuration files found in *sysconfdir*.

        ``doorlockd.default.cfg`` is handed to the parser first and
        ``doorlockd.cfg`` second, so per ``ConfigParser.read`` semantics
        values from the second file take precedence. Files that do not
        exist are skipped by the parser.
        """
        candidates = [join(sysconfdir, name)
                      for name in ('doorlockd.default.cfg', 'doorlockd.cfg')]
        parser = ConfigParser()
        parser.read(candidates)
        self.config = parser

    def boolean(self, key):
        """Return option *key* of the config section parsed as a boolean."""
        section = self.config_topic
        return self.config.getboolean(section, key)

    def str(self, key):
        """Return option *key* of the config section as a string."""
        section = self.config_topic
        return self.config.get(section, key)
# Module-wide configuration object; the config files below SYSCONFDIR are
# parsed right here, when this file is loaded.
cfg = Config(SYSCONFDIR)
# Booleans
# RUN_HOOKS gates start_hook(), SOUNDS gates playsound().
run_hooks = cfg.boolean('RUN_HOOKS')
sounds = cfg.boolean('SOUNDS')
# File names of the sound samples; playsound() resolves them relative to
# sounds_prefix.
# wave_emergency is not referenced by the code visible in this file.
wave_emergency = 'emergency_unlock.wav'
wave_lock = 'lock.wav'
wave_lock_button = 'lock_button.wav'
wave_present = 'present.wav'
# NOTE(review): same file as wave_present, whereas lock/unlock have separate
# *_button.wav samples -- confirm this is intended.
wave_present_button = 'present.wav'
wave_unlock = 'unlock.wav'
wave_unlock_button = 'unlock_button.wav'
# Played by sound_helper() when a request does not change the door state.
wave_zonk = 'zonk.wav'
def playsound(filename):
    """Play a sound sample from the sounds directory in the background.

    Does nothing when sounds are disabled (module-level ``sounds`` flag).
    Otherwise ``aplay`` is started through ``nohup`` on the file
    *filename* inside ``sounds_prefix``; the child process is not waited
    for and its result is not checked.
    """
    if sounds:
        sample = join(sounds_prefix, filename)
        Popen(['nohup', 'aplay', sample])
def start_hook(script):
    """Launch a hook script from the scripts directory in the background.

    When hooks are disabled (module-level ``run_hooks`` flag) this only
    logs that the hook was skipped. Otherwise the file *script* inside
    ``scripts_prefix`` is started through ``nohup``; the child process is
    not waited for and its exit status is not checked.

    :param script: file name of the hook, relative to ``scripts_prefix``.
    """
    if not run_hooks:
        # Lazy logger arguments: the message is only formatted when the
        # record is actually emitted.
        log.info('Hooks disabled: not starting %s', script)
        return

    log.info('Starting hook %s', script)
    Popen(['nohup', join(scripts_prefix, script)])
def sound_helper(old_state, new_state, button):
    """Play the acoustic feedback for a door state request.

    :param old_state: door state before the request was handled.
    :param new_state: door state after the request was handled.
    :param button: truthy when the request came from a hardware button;
        selects the ``*_button`` samples instead of the regular ones.

    If the state did not change, the "zonk" sample is played. If the new
    state is none of Open, Present or Closed, nothing is played.
    """
    if old_state == new_state:
        playsound(wave_zonk)
        return

    # Pick the sample set first, then match the target state against it.
    if button:
        on_open, on_present, on_closed = (
            wave_unlock_button, wave_present_button, wave_lock_button)
    else:
        on_open, on_present, on_closed = (
            wave_unlock, wave_present, wave_lock)

    transitions = (
        (DoorState.Open, on_open),
        (DoorState.Present, on_present),
        (DoorState.Closed, on_closed),
    )
    for target, wave in transitions:
        if new_state == target:
            playsound(wave)
            break
class DoorHandler:
    """Door state holder that talks to the door hardware over a serial line.

    The handler keeps the requested door state in ``state``. Unless serial
    simulation is enabled, a worker thread polls the serial port for
    single-byte button events and periodically writes a single-byte
    command reflecting the current state.

    NOTE(review): ``state`` and ``do_close`` are modified both by the
    worker thread (button events) and by callers of ``request()`` without
    any locking.
    """

    # Class-level defaults; assignments through ``self`` shadow them per
    # instance.
    state = DoorState.Closed
    do_close = False

    # Bytes written to the serial port by the worker thread.
    CMD_PRESENT = b'y'
    CMD_OPEN = b'g'
    CMD_CLOSE = b'r'

    # Bytes received from the serial port.
    BUTTON_PRESENT = b'Y'
    BUTTON_OPEN = b'G'
    BUTTON_CLOSE = b'R'

    CMD_EMERGENCY_SWITCH = b'E'
    # TBD DOOR NOT CLOSED

    def __init__(self, device):
        """Open the serial port and start the worker thread.

        Nothing is set up when the ``SIMULATE_SERIAL`` option is true; in
        that case neither ``self.serial`` nor ``self.thread`` exists.

        :param device: kept for interface compatibility.
            NOTE(review): the value is not used -- the port always comes
            from the ``SERIAL_PORT`` option of the module-level ``cfg``,
            and ``Logic`` passes its config object in this position.
        """
        if cfg.boolean('SIMULATE_SERIAL'):
            return

        port = cfg.str('SERIAL_PORT')
        log.info('Using serial port: %s', port)

        # timeout=0 makes read() return immediately, so the worker can
        # drain pending bytes without blocking.
        self.serial = Serial(port, baudrate=9600, bytesize=8, parity='N',
                             stopbits=1, timeout=0)
        # NOTE(review): the thread is not a daemon and its loop never
        # ends, so it keeps the interpreter alive once started.
        self.thread = Thread(target=self.thread_worker)
        log.debug('Spawning RS232 Thread')
        self.thread.start()

    def thread_worker(self):
        """Worker loop: handle button events and send the state command.

        Every 0.4 seconds all pending bytes are read one at a time and
        dispatched, then one command byte is written: ``CMD_CLOSE`` once
        after a close was requested, ``CMD_PRESENT`` / ``CMD_OPEN`` on
        every cycle while in the respective state, and nothing while the
        door is closed.
        """
        while True:
            sleep(0.4)

            # Drain everything that arrived since the last cycle.
            while True:
                rx = self.serial.read(1)
                if not rx:
                    break

                old_state = self.state
                # TODO: report button and emergency events to the web
                # clients; the former emit_status() calls were disabled.
                if rx == DoorHandler.BUTTON_CLOSE:
                    self.close()
                    log.info('Closed due to Button press')
                elif rx == DoorHandler.BUTTON_OPEN:
                    self.open()
                    log.info('Opened due to Button press')
                elif rx == DoorHandler.BUTTON_PRESENT:
                    self.present()
                    log.info('Present due to Button press')
                elif rx == DoorHandler.CMD_EMERGENCY_SWITCH:
                    log.warning('Emergency unlock')
                else:
                    log.error('Received unknown message "%s" from AVR', rx)

                # NOTE(review): emergency and unknown bytes leave the
                # state untouched, so this plays the "zonk" sample for
                # them -- confirm that is intended.
                sound_helper(old_state, self.state, True)

            if self.do_close:
                tx = DoorHandler.CMD_CLOSE
                self.do_close = False
            elif self.state == DoorState.Present:
                tx = DoorHandler.CMD_PRESENT
            elif self.state == DoorState.Open:
                tx = DoorHandler.CMD_OPEN
            else:
                # Closed and nothing pending: do not write anything.
                continue

            self.serial.write(tx)
            self.serial.flush()

    def open(self):
        """Switch to the Open state and run the ``post_unlock`` hook.

        :return: ``DoorlockResponse.AlreadyActive`` if the door is already
            open, ``DoorlockResponse.Success`` otherwise.
        """
        if self.state == DoorState.Open:
            return DoorlockResponse.AlreadyActive

        self.state = DoorState.Open
        start_hook('post_unlock')
        return DoorlockResponse.Success

    def present(self):
        """Switch to the Present state and run the ``post_present`` hook.

        :return: ``DoorlockResponse.AlreadyActive`` if the state is
            already Present, ``DoorlockResponse.Success`` otherwise.
        """
        if self.state == DoorState.Present:
            return DoorlockResponse.AlreadyActive

        self.state = DoorState.Present
        start_hook('post_present')
        return DoorlockResponse.Success

    def close(self):
        """Switch to the Closed state and run the ``post_lock`` hook.

        Also sets ``do_close`` so the worker thread writes ``CMD_CLOSE``
        once.

        :return: ``DoorlockResponse.AlreadyActive`` if the door is already
            closed, ``DoorlockResponse.Success`` otherwise.
        """
        if self.state == DoorState.Closed:
            return DoorlockResponse.AlreadyActive

        self.do_close = True
        self.state = DoorState.Closed
        start_hook('post_lock')
        return DoorlockResponse.Success

    def request(self, state):
        """Request a door state, play the feedback sound and notify clients.

        :param state: the requested ``DoorState`` (Closed, Present or Open).
        :return: the ``DoorlockResponse`` of the matching transition method.
        :raises ValueError: if *state* is none of the supported states.
            This is raised before any sound is played or any notification
            is sent.
        """
        if state == DoorState.Closed:
            transition = self.close
        elif state == DoorState.Present:
            transition = self.present
        elif state == DoorState.Open:
            transition = self.open
        else:
            # Fail explicitly instead of running into an unbound result
            # variable after the side effects have already happened.
            raise ValueError('Unsupported door state: %r' % (state,))

        old_state = self.state
        err = transition()

        sound_helper(old_state, self.state, False)
        emit_doorstate()
        return err
class Logic:
    """Combine credential checking with the door handler."""

    def __init__(self, cfg):
        """Create the authenticator and the door handler from *cfg*.

        The same object is passed to both constructors, authenticator
        first.
        """
        self.auth = Authenticator(cfg)
        self.door_handler = DoorHandler(cfg)

    def request(self, state, credentials):
        """Authenticate *credentials* and, on success, request *state*.

        :return: the authentication result if it is anything other than
            ``DoorlockResponse.Success``; otherwise the result of the
            door handler's ``request(state)``.
        """
        verdict = self.auth.try_auth(credentials)
        denied = verdict != DoorlockResponse.Success
        if denied:
            # The door handler is never touched on failed authentication.
            return verdict
        return self.door_handler.request(state)

    @property
    def state(self):
        """Current door state as held by the door handler."""
        handler = self.door_handler
        return handler.state
if __name__ == '__main__':
    # Script entry point: configure logging to stdout, build the logic
    # object and hand control to the web application.
    logging.basicConfig(
        level=log_level,
        stream=sys.stdout,
        format=log_fmt,
        datefmt=date_fmt,
    )
    log.info('Starting doorlockd')

    logic = Logic(cfg)
    webapp_run(
        cfg,
        logic,
        __status__,
        __version__,
        template_folder,
        static_folder,
    )

    # Reached once webapp_run() returns.
    sys.exit(0)