mirror of
https://github.com/binary-kitchen/doorlockd
synced 2024-12-22 02:14:26 +01:00
262 lines
7.6 KiB
Python
Executable File
262 lines
7.6 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
|
|
"""
|
|
Doorlockd -- Binary Kitchen's smart door opener
|
|
|
|
Copyright (c) Binary Kitchen e.V., 2018
|
|
|
|
Author:
|
|
Ralf Ramsauer <ralf@binary-kitchen.de>
|
|
|
|
This work is licensed under the terms of the GNU GPL, version 2. See
|
|
the LICENSE file in the top-level directory.
|
|
|
|
This program is distributed in the hope that it will be useful, but WITHOUT
|
|
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
|
|
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
|
|
details.
|
|
"""
|
|
|
|
import logging
|
|
import sys
|
|
|
|
from configparser import ConfigParser
|
|
from os.path import abspath, join
|
|
from serial import Serial
|
|
from subprocess import Popen
|
|
from threading import Thread
|
|
from time import sleep
|
|
|
|
from pydoorlock.Authenticator import Authenticator
|
|
from pydoorlock.WebApp import webapp_run, emit_doorstate
|
|
from pydoorlock.Door import DoorState
|
|
from pydoorlock.Doorlock import DoorlockResponse
|
|
|
|
# Directory that holds the configuration files; handed to Config below.
SYSCONFDIR = '.'
# Installation prefix; shared data is looked up in PREFIX/share/doorlockd.
PREFIX = '.'

root_prefix = join(PREFIX, 'share', 'doorlockd')
scripts_prefix = join(root_prefix, 'scripts')

__author__ = 'Ralf Ramsauer'
# Was spelled `__copyright` (missing trailing underscores), unlike every
# other metadata dunder in this module.
__copyright__ = 'Copyright (c) Ralf Ramsauer, 2018'
__license__ = 'GPLv2'
__email__ = 'ralf@binary-kitchen.de'
__status__ = 'Development'
__maintainer__ = 'Ralf Ramsauer'
__version__ = '2.0-rc2'

# Logging setup; applied by logging.basicConfig() in the __main__ section.
log_level = logging.DEBUG
date_fmt = '%Y-%m-%d %H:%M:%S'
log_fmt = '%(asctime)-15s %(levelname)-8s %(message)s'
# Root logger, used by every function and class in this module.
log = logging.getLogger()
|
|
|
|
class Config:
    """Typed accessors for the ``doorlock`` section of the configuration.

    Two files are parsed from the given directory, in this order:
    ``doorlockd.default.cfg`` and ``doorlockd.cfg``. Both are handed to a
    single ``ConfigParser.read()`` call, so options from the second file
    take precedence over the first.
    """

    # Name of the section every lookup goes to.
    config_topic = 'doorlock'

    def __init__(self, sysconfdir):
        """Parse the configuration files located in *sysconfdir*."""
        candidates = [join(sysconfdir, name)
                      for name in ('doorlockd.default.cfg', 'doorlockd.cfg')]
        parser = ConfigParser()
        parser.read(candidates)
        self.config = parser

    def boolean(self, key):
        """Return option *key* of the ``doorlock`` section as a bool."""
        section = self.config_topic
        return self.config.getboolean(section, key)

    def str(self, key):
        """Return option *key* of the ``doorlock`` section as a string."""
        section = self.config_topic
        return self.config.get(section, key)
|
|
|
|
# The configuration is parsed once, when this module is loaded, and the
# resulting object is shared by everything below.
cfg = Config(SYSCONFDIR)

# Boolean options
# Consulted by start_hook() on every call to decide whether to run scripts.
run_hooks = cfg.boolean('RUN_HOOKS')
|
|
|
|
|
|
def start_hook(script):
    """Start a hook script from the scripts directory, if hooks are enabled.

    When the module-level ``run_hooks`` flag is false the call only logs
    that the hook was skipped. Otherwise the script is launched through
    ``nohup`` and is not waited for.

    :param script: file name of the hook, relative to ``scripts_prefix``.
    """
    if not run_hooks:
        # Lazy logging arguments: formatting only happens if the record is
        # actually emitted.
        log.info('Hooks disabled: not starting %s', script)
        return

    log.info('Starting hook %s', script)
    # Fire and forget: the exit status of the hook is never inspected.
    Popen(['nohup', join(scripts_prefix, script)])
|
|
|
|
|
|
class DoorHandler:
    """Keep the door state and talk to the door controller over serial.

    The handler stores the requested door state and, unless the
    ``SIMULATE_SERIAL`` option is set, runs a background thread that polls
    the serial port: button bytes coming from the device change the state,
    and the command byte matching the current state is written back.
    Optionally a sound is played on every state request.
    """

    # Current door state; read by the serial thread, changed by
    # open()/present()/close().
    state = DoorState.Closed
    # Set by close(); makes the serial thread send CMD_CLOSE exactly once.
    do_close = False

    # Bytes written to the serial device for the respective state.
    CMD_PRESENT = b'y'
    CMD_OPEN = b'g'
    CMD_CLOSE = b'r'

    # Bytes received from the serial device, handled as button presses.
    BUTTON_PRESENT = b'Y'
    BUTTON_OPEN = b'G'
    BUTTON_CLOSE = b'R'

    # Byte received from the serial device, logged as an emergency unlock.
    CMD_EMERGENCY_SWITCH = b'E'

    # Sound files, relative to the sounds prefix passed to __init__.
    wave_lock = 'lock.wav'
    wave_lock_button = 'lock_button.wav'

    wave_present = 'present.wav'
    # NOTE(review): same file as wave_present -- confirm this is intended.
    wave_present_button = 'present.wav'

    wave_unlock = 'unlock.wav'
    wave_unlock_button = 'unlock_button.wav'

    wave_zonk = 'zonk.wav'

    def __init__(self, cfg, sounds_prefix):
        """Read the relevant options and start the serial thread.

        :param cfg: configuration object providing ``boolean(key)`` and
            ``str(key)``; the options ``SOUNDS``, ``SIMULATE_SERIAL`` and
            ``SERIAL_PORT`` are read.
        :param sounds_prefix: directory containing the ``*.wav`` files;
            only stored when sounds are enabled.
        """
        self.sounds = cfg.boolean('SOUNDS')
        if self.sounds:
            self.sounds_prefix = sounds_prefix

        # In simulation mode neither the serial port nor the thread exist.
        if cfg.boolean('SIMULATE_SERIAL'):
            return

        device = cfg.str('SERIAL_PORT')
        log.info('Using serial port: %s', device)

        # timeout=0 makes read() return immediately with whatever is
        # available, which thread_worker() relies on to drain the input.
        self.serial = Serial(device, baudrate=9600, bytesize=8, parity='N',
                             stopbits=1, timeout=0)
        # NOTE(review): this is a non-daemon thread running an endless
        # loop, so it keeps the interpreter alive after the main thread
        # finishes -- confirm whether daemon=True is wanted.
        self.thread = Thread(target=self.thread_worker)
        log.debug('Spawning RS232 Thread')
        self.thread.start()

    def thread_worker(self):
        """Poll the serial port forever; never returns.

        Every 0.4 seconds all pending input bytes are processed one by
        one, then at most one command byte is written: ``CMD_CLOSE`` once
        after a close was requested, otherwise ``CMD_PRESENT`` or
        ``CMD_OPEN`` while the door is in that state. Nothing is sent
        while the door is closed and no close is pending.
        """
        while True:
            sleep(0.4)

            # Drain everything the device sent since the last round.
            while True:
                rx = self.serial.read(1)
                if not rx:
                    break

                old_state = self.state
                if rx == DoorHandler.BUTTON_CLOSE:
                    self.close()
                    log.info('Closed due to Button press')
                    # emit_status(LogicResponse.ButtonLock)
                elif rx == DoorHandler.BUTTON_OPEN:
                    self.open()
                    log.info('Opened due to Button press')
                    # emit_status(LogicResponse.ButtonUnlock)
                elif rx == DoorHandler.BUTTON_PRESENT:
                    self.present()
                    log.info('Present due to Button press')
                    # emit_status(LogicResponse.ButtonPresent)
                elif rx == DoorHandler.CMD_EMERGENCY_SWITCH:
                    log.warning('Emergency unlock')
                    # emit_status(LogicResponse.EmergencyUnlock)
                else:
                    log.error('Received unknown message "%s" from AVR', rx)

                # NOTE(review): this also runs for emergency and unknown
                # bytes, where the state is unchanged and the "zonk" sound
                # is selected -- confirm this is intended.
                self.sound_helper(old_state, self.state, True)

            if self.do_close:
                tx = DoorHandler.CMD_CLOSE
                self.do_close = False
            elif self.state == DoorState.Present:
                tx = DoorHandler.CMD_PRESENT
            elif self.state == DoorState.Open:
                tx = DoorHandler.CMD_OPEN
            else:
                continue

            self.serial.write(tx)
            self.serial.flush()

    def open(self):
        """Switch to ``DoorState.Open`` and run the ``post_unlock`` hook.

        :return: ``DoorlockResponse.AlreadyActive`` if the door is already
            open, otherwise ``DoorlockResponse.Success``.
        """
        if self.state == DoorState.Open:
            return DoorlockResponse.AlreadyActive

        self.state = DoorState.Open
        start_hook('post_unlock')
        return DoorlockResponse.Success

    def present(self):
        """Switch to ``DoorState.Present`` and run the ``post_present`` hook.

        :return: ``DoorlockResponse.AlreadyActive`` if the door is already
            in the present state, otherwise ``DoorlockResponse.Success``.
        """
        if self.state == DoorState.Present:
            return DoorlockResponse.AlreadyActive

        self.state = DoorState.Present
        start_hook('post_present')
        return DoorlockResponse.Success

    def close(self):
        """Switch to ``DoorState.Closed`` and run the ``post_lock`` hook.

        Also sets ``do_close`` so that the serial thread sends the close
        command once.

        :return: ``DoorlockResponse.AlreadyActive`` if the door is already
            closed, otherwise ``DoorlockResponse.Success``.
        """
        if self.state == DoorState.Closed:
            return DoorlockResponse.AlreadyActive

        self.do_close = True
        self.state = DoorState.Closed
        start_hook('post_lock')
        return DoorlockResponse.Success

    def request(self, state):
        """Change the door to *state*, play a sound and publish the state.

        :param state: one of ``DoorState.Closed``, ``DoorState.Present``
            or ``DoorState.Open``.
        :return: the ``DoorlockResponse`` of the matching state method.
        :raises ValueError: if *state* is none of the supported states.
            Previously this case surfaced as an ``UnboundLocalError``
            after the sound and state notification had already happened.
        """
        old_state = self.state
        if state == DoorState.Closed:
            err = self.close()
        elif state == DoorState.Present:
            err = self.present()
        elif state == DoorState.Open:
            err = self.open()
        else:
            raise ValueError('Unsupported door state: %r' % (state,))

        self.sound_helper(old_state, self.state, False)
        emit_doorstate()
        return err

    def sound_helper(self, old_state, new_state, button):
        """Play the sound that matches a state transition.

        Does nothing when sounds are disabled. An unchanged state selects
        the "zonk" sound; otherwise the file depends on the new state and
        on whether the change was triggered by a button.

        :param old_state: door state before the request.
        :param new_state: door state after the request.
        :param button: true if the request came from a hardware button.
        """
        if not self.sounds:
            return

        # TBD: Emergency Unlock
        # wave_emergency = 'emergency_unlock.wav'

        if old_state == new_state:
            filename = self.wave_zonk
        elif new_state == DoorState.Open:
            filename = self.wave_unlock_button if button else self.wave_unlock
        elif new_state == DoorState.Present:
            filename = (self.wave_present_button if button
                        else self.wave_present)
        elif new_state == DoorState.Closed:
            filename = self.wave_lock_button if button else self.wave_lock
        else:
            # Without this branch `filename` would be unbound below.
            log.error('No sound available for door state %r', new_state)
            return

        # Fire and forget: the player is started and not waited for.
        Popen(['nohup', 'aplay', join(self.sounds_prefix, filename)])
|
|
|
|
|
|
class Logic:
    """Combine authentication with door control.

    A state request is only forwarded to the door handler after the
    supplied credentials were accepted by the authenticator.
    """

    def __init__(self, cfg, sounds_prefix):
        """Create the authenticator and the door handler.

        :param cfg: configuration object passed on to both components.
        :param sounds_prefix: sound directory passed on to the door handler.
        """
        self.auth = Authenticator(cfg)
        self.door_handler = DoorHandler(cfg, sounds_prefix)

    def request(self, state, credentials):
        """Authenticate, then ask the door handler to switch to *state*.

        :param state: requested door state, forwarded unchanged.
        :param credentials: passed to ``Authenticator.try_auth``.
        :return: the authenticator's response if it is anything other than
            ``DoorlockResponse.Success``, else the door handler's response.
        """
        auth_result = self.auth.try_auth(credentials)
        rejected = auth_result != DoorlockResponse.Success
        return auth_result if rejected else self.door_handler.request(state)

    @property
    def state(self):
        """Current door state as tracked by the door handler."""
        handler = self.door_handler
        return handler.state
|
|
|
|
|
|
if __name__ == '__main__':
    # Send all log records to stdout using the module-wide format settings.
    logging.basicConfig(stream=sys.stdout, level=log_level,
                        datefmt=date_fmt, format=log_fmt)
    log.info('Starting doorlockd')

    # Authentication plus door handling; sounds live below the share dir.
    sounds_prefix = join(root_prefix, 'sounds')
    logic = Logic(cfg, sounds_prefix)

    # The web application receives its asset directories as absolute paths.
    template_folder = abspath(join(root_prefix, 'templates'))
    static_folder = abspath(join(root_prefix, 'static'))
    webapp_run(cfg, logic, __status__, __version__, template_folder,
               static_folder)

    # Reached once webapp_run() returns.
    sys.exit(0)
|