mirror of
https://github.com/binary-kitchen/doorlockd
synced 2024-12-21 10:04:26 +01:00
540 lines
16 KiB
Python
Executable File
540 lines
16 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
|
|
"""
|
|
Doorlockd -- Binary Kitchen's smart door opener
|
|
|
|
Copyright (c) Binary Kitchen e.V., 2018
|
|
|
|
Author:
|
|
Ralf Ramsauer <ralf@binary-kitchen.de>
|
|
|
|
This work is licensed under the terms of the GNU GPL, version 2. See
|
|
the LICENSE file in the top-level directory.
|
|
|
|
This program is distributed in the hope that it will be useful, but WITHOUT
|
|
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
|
|
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
|
|
details.
|
|
"""
|
|
|
|
import hashlib
|
|
import ldap
|
|
import logging
|
|
import sys
|
|
|
|
from enum import Enum
|
|
from os.path import join
|
|
from random import sample
|
|
from serial import Serial
|
|
from subprocess import Popen
|
|
from threading import Thread
|
|
from time import sleep
|
|
|
|
from flask import abort, Flask, jsonify, render_template, request
|
|
from flask_socketio import SocketIO
|
|
from flask_wtf import FlaskForm
|
|
from wtforms import PasswordField, StringField, SubmitField
|
|
from wtforms.validators import DataRequired, Length
|
|
|
|
SYSCONFDIR = '.'
|
|
PREFIX = '.'
|
|
|
|
root_prefix = join(PREFIX, 'share', 'doorlockd')
|
|
sounds_prefix = join(root_prefix, 'sounds')
|
|
static_folder = join(root_prefix, 'static')
|
|
template_folder = join(root_prefix, 'templates')
|
|
scripts_prefix = join(root_prefix, 'scripts')
|
|
|
|
flask_config = join(SYSCONFDIR, 'doorlockd.cfg')
|
|
|
|
__author__ = 'Ralf Ramsauer'
|
|
__copyright = 'Copyright (c) Ralf Ramsauer, 2018'
|
|
__license__ = 'GPLv2'
|
|
__email__ = 'ralf@binary-kitchen.de'
|
|
__status__ = 'Development'
|
|
__maintainer__ = 'Ralf Ramsauer'
|
|
__version__ = '2.0-rc1'
|
|
|
|
log_level = logging.DEBUG
|
|
date_fmt = '%Y-%m-%d %H:%M:%S'
|
|
log_fmt = '%(asctime)-15s %(levelname)-8s %(message)s'
|
|
log = logging.getLogger()
|
|
|
|
default_ldap_uri = 'ldaps://ldap1.binary.kitchen/ ' \
|
|
'ldaps://ldap2.binary.kitchen/ ' \
|
|
'ldaps://ldapm.binary.kitchen/'
|
|
default_binddn = 'cn=%s,ou=people,dc=binary-kitchen,dc=de'
|
|
|
|
webapp = Flask(__name__,
|
|
template_folder=template_folder,
|
|
static_folder=static_folder)
|
|
webapp.config.from_pyfile(flask_config)
|
|
socketio = SocketIO(webapp, async_mode='threading')
|
|
serial_port = webapp.config.get('SERIAL_PORT')
|
|
simulate_ldap = webapp.config.get('SIMULATE_LDAP')
|
|
simulate_serial = webapp.config.get('SIMULATE_SERIAL')
|
|
run_hooks = webapp.config.get('RUN_HOOKS')
|
|
room = webapp.config.get('ROOM')
|
|
title = webapp.config.get('TITLE')
|
|
welcome = webapp.config.get('WELCOME')
|
|
file_local_db = webapp.config.get('LOCAL_USER_DB')
|
|
|
|
html_title = '%s (%s - v%s)' % (title, __status__, __version__)
|
|
|
|
ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_DEMAND)
|
|
ldap.set_option(ldap.OPT_REFERRALS, 0)
|
|
|
|
ldap_uri = webapp.config.get('LDAP_URI')
|
|
ldap_binddn = webapp.config.get('LDAP_BINDDN')
|
|
|
|
wave_emergency = 'emergency_unlock.wav'
|
|
wave_lock = 'lock.wav'
|
|
wave_lock_button = 'lock_button.wav'
|
|
wave_unlock = 'unlock.wav'
|
|
wave_unlock_button = 'unlock_button.wav'
|
|
wave_zonk = 'zonk.wav'
|
|
|
|
sounds = webapp.config.get('SOUNDS')
|
|
|
|
host = 'localhost'
|
|
if webapp.config.get('DEBUG'):
|
|
host = '0.0.0.0'
|
|
|
|
# copied from sudo
|
|
eperm_insults = {
|
|
'Wrong! You cheating scum!',
|
|
'And you call yourself a Rocket Scientist!',
|
|
'No soap, honkie-lips.',
|
|
'Where did you learn to type?',
|
|
'Are you on drugs?',
|
|
'My pet ferret can type better than you!',
|
|
'You type like i drive.',
|
|
'Do you think like you type?',
|
|
'Your mind just hasn\'t been the same since the electro-shock, has it?',
|
|
'Maybe if you used more than just two fingers...',
|
|
'BOB says: You seem to have forgotten your passwd, enter another!',
|
|
'stty: unknown mode: doofus',
|
|
'I can\'t hear you -- I\'m using the scrambler.',
|
|
'The more you drive -- the dumber you get.',
|
|
'Listen, broccoli brains, I don\'t have time to listen to this trash.',
|
|
'I\'ve seen penguins that can type better than that.',
|
|
'Have you considered trying to match wits with a rutabaga?',
|
|
'You speak an infinite deal of nothing',
|
|
}
|
|
|
|
|
|
def choose_insult():
|
|
return(sample(eperm_insults, 1)[0])
|
|
|
|
|
|
def playsound(filename):
|
|
if not sounds:
|
|
return
|
|
Popen(['nohup', 'aplay', join(sounds_prefix, filename)])
|
|
|
|
|
|
def start_hook(script):
|
|
if not run_hooks:
|
|
log.info('Hooks disabled: not starting %s' % script)
|
|
return
|
|
log.info('Starting hook %s' % script)
|
|
Popen(['nohup', join(scripts_prefix, script)])
|
|
|
|
|
|
class AuthMethod(Enum):
|
|
LDAP_USER_PW = 1
|
|
LOCAL_USER_DB = 2
|
|
|
|
|
|
class DoorState(Enum):
|
|
# These numbers are used by the App since version 3.0, do NOT change them
|
|
Open = 0
|
|
Present = 1
|
|
Closed = 2
|
|
|
|
def from_string(string):
|
|
if string == 'lock':
|
|
return DoorState.Closed
|
|
elif string == 'unlock':
|
|
return DoorState.Open
|
|
elif string == 'present':
|
|
return DoorState.Present
|
|
|
|
return None
|
|
|
|
def is_open(self):
|
|
if self != DoorState.Closed:
|
|
return True
|
|
return False
|
|
|
|
def to_img(self):
|
|
led = 'red'
|
|
if self == DoorState.Present:
|
|
led = 'yellow'
|
|
elif self == DoorState.Open:
|
|
led = 'green'
|
|
return 'static/led-%s.png' % led
|
|
|
|
def to_html(self):
|
|
if self == DoorState.Open:
|
|
return 'Offen'
|
|
elif self == DoorState.Present:
|
|
return 'Jemand da'
|
|
return 'Geschlossen'
|
|
|
|
|
|
class LogicResponse(Enum):
|
|
Success = 0
|
|
Perm = 1
|
|
AlreadyActive = 2
|
|
# don't break old apps, value 3 is reserved now
|
|
RESERVED = 3
|
|
Inval = 4
|
|
LDAP = 5
|
|
|
|
EmergencyUnlock = 10,
|
|
ButtonLock = 11,
|
|
ButtonUnlock = 12,
|
|
ButtonPresent = 13,
|
|
|
|
def to_html(self):
|
|
if self == LogicResponse.Success:
|
|
return 'Yo, passt.'
|
|
elif self == LogicResponse.Perm:
|
|
return choose_insult()
|
|
elif self == LogicResponse.AlreadyActive:
|
|
return 'Zustand bereits aktiv'
|
|
elif self == LogicResponse.Inval:
|
|
return 'Das was du willst geht nicht.'
|
|
elif self == LogicResponse.LDAP:
|
|
return 'Moep! Geh LDAP fixen!'
|
|
elif self == LogicResponse.EmergencyUnlock:
|
|
return '!!! Emergency Unlock !!!'
|
|
elif self == LogicResponse.ButtonLock:
|
|
return 'Closed by button'
|
|
elif self == LogicResponse.ButtonUnlock:
|
|
return 'Opened by button'
|
|
elif self == LogicResponse.ButtonPresent:
|
|
return 'Present by button'
|
|
|
|
return 'Bitte spezifizieren Sie.'
|
|
|
|
|
|
class DoorHandler:
|
|
state = DoorState.Closed
|
|
do_close = False
|
|
|
|
CMD_PRESENT = b'y'
|
|
CMD_OPEN = b'g'
|
|
CMD_CLOSE = b'r'
|
|
|
|
BUTTON_PRESENT = b'Y'
|
|
BUTTON_OPEN = b'G'
|
|
BUTTON_CLOSE = b'R'
|
|
|
|
CMD_EMERGENCY_SWITCH = b'E'
|
|
# TBD DOOR NOT CLOSED
|
|
|
|
def __init__(self, device):
|
|
if simulate_serial:
|
|
return
|
|
|
|
self.serial = Serial(device, baudrate=9600, bytesize=8, parity='N',
|
|
stopbits=1, timeout=0)
|
|
self.thread = Thread(target=self.thread_worker)
|
|
log.debug('Spawning RS232 Thread')
|
|
self.thread.start()
|
|
|
|
def thread_worker(self):
|
|
while True:
|
|
sleep(0.4)
|
|
while True:
|
|
rx = self.serial.read(1)
|
|
if len(rx) == 0:
|
|
break
|
|
if rx == DoorHandler.BUTTON_CLOSE:
|
|
self.close()
|
|
log.info('Closed due to Button press')
|
|
logic.emit_status(LogicResponse.ButtonLock)
|
|
elif rx == DoorHandler.BUTTON_OPEN:
|
|
self.open()
|
|
log.info('Opened due to Button press')
|
|
logic.emit_status(LogicResponse.ButtonUnlock)
|
|
elif rx == DoorHandler.BUTTON_PRESENT:
|
|
self.present()
|
|
log.info('Present due to Button press')
|
|
logic.emit_status(LogicResponse.ButtonPresent)
|
|
elif rx == DoorHandler.CMD_EMERGENCY_SWITCH:
|
|
log.warning('Emergency unlock')
|
|
logic.emit_status(LogicResponse.EmergencyUnlock)
|
|
else:
|
|
log.error('Received unknown message "%s" from AVR' % rx)
|
|
|
|
if self.do_close:
|
|
tx = DoorHandler.CMD_CLOSE
|
|
self.do_close = False
|
|
elif self.state == DoorState.Present:
|
|
tx = DoorHandler.CMD_PRESENT
|
|
elif self.state == DoorState.Open:
|
|
tx = DoorHandler.CMD_OPEN
|
|
else:
|
|
continue
|
|
|
|
self.serial.write(tx)
|
|
self.serial.flush()
|
|
|
|
def open(self):
|
|
if self.state == DoorState.Open:
|
|
return LogicResponse.AlreadyActive
|
|
|
|
self.state = DoorState.Open
|
|
start_hook('post_unlock')
|
|
return LogicResponse.Success
|
|
|
|
def present(self):
|
|
if self.state == DoorState.Present:
|
|
return LogicResponse.AlreadyActive
|
|
|
|
self.state = DoorState.Present
|
|
start_hook('post_present')
|
|
return LogicResponse.Success
|
|
|
|
def close(self):
|
|
if self.state == DoorState.Closed:
|
|
return LogicResponse.AlreadyActive
|
|
|
|
self.do_close = True
|
|
self.state = DoorState.Closed
|
|
start_hook('post_lock')
|
|
return LogicResponse.Success
|
|
|
|
def request(self, state):
|
|
if state == DoorState.Closed:
|
|
return self.close()
|
|
elif state == DoorState.Present:
|
|
return self.present()
|
|
elif state == DoorState.Open:
|
|
return self.open()
|
|
|
|
|
|
class Logic:
|
|
def __init__(self):
|
|
self.door_handler = DoorHandler(serial_port)
|
|
self.local_db = dict()
|
|
|
|
if not file_local_db:
|
|
return
|
|
|
|
with open(file_local_db, 'r') as f:
|
|
for line in f:
|
|
line = line.split()
|
|
user = line[0]
|
|
pwd = line[1].split(':')
|
|
self.local_db[user] = pwd
|
|
|
|
def _try_auth_local(self, user, password):
|
|
if user not in self.local_db:
|
|
return LogicResponse.Perm
|
|
|
|
stored_pw = self.local_db[user][0]
|
|
stored_salt = self.local_db[user][1]
|
|
if stored_pw == hashlib.sha256(stored_salt.encode() + password.encode()).hexdigest():
|
|
return LogicResponse.Success
|
|
|
|
return LogicResponse.Perm
|
|
|
|
def _try_auth_ldap(self, user, password):
|
|
if simulate_ldap:
|
|
log.info('SIMULATION MODE! ACCEPTING ANYTHING!')
|
|
return LogicResponse.Success
|
|
|
|
log.info(' Trying to LDAP auth (user, password) as user %s', user)
|
|
ldap_username = ldap_binddn % user
|
|
try:
|
|
l = ldap.initialize(ldap_uri)
|
|
l.simple_bind_s(ldap_username, password)
|
|
l.unbind_s()
|
|
except ldap.INVALID_CREDENTIALS:
|
|
log.info(' Invalid credentials')
|
|
return LogicResponse.Perm
|
|
except ldap.LDAPError as e:
|
|
log.info(' LDAP Error: %s' % e)
|
|
return LogicResponse.LDAP
|
|
return LogicResponse.Success
|
|
|
|
def try_auth(self, credentials):
|
|
method = credentials[0]
|
|
if method == AuthMethod.LDAP_USER_PW:
|
|
return self._try_auth_ldap(credentials[1], credentials[2])
|
|
elif method == AuthMethod.LOCAL_USER_DB:
|
|
return self._try_auth_local(credentials[1], credentials[2])
|
|
|
|
return LogicResponse.Inval
|
|
|
|
def _request(self, state, credentials):
|
|
err = self.try_auth(credentials)
|
|
if err != LogicResponse.Success:
|
|
return err
|
|
return self.door_handler.request(state)
|
|
|
|
def request(self, state, credentials):
|
|
err = self._request(state, credentials)
|
|
if err == LogicResponse.Success:
|
|
if self.door_handler.state == DoorState.Open:
|
|
playsound(wave_unlock)
|
|
if self.door_handler.state == DoorState.Closed:
|
|
playsound(wave_lock)
|
|
elif err == LogicResponse.AlreadyActive:
|
|
playsound(wave_zonk)
|
|
self.emit_status(err)
|
|
return err
|
|
|
|
def emit_status(self, message=None):
|
|
led = self.state.to_img()
|
|
if message is None:
|
|
message = self.state.to_html()
|
|
else:
|
|
message = message.to_html()
|
|
|
|
socketio.emit('status', {'led': led, 'message': message})
|
|
|
|
@property
|
|
def state(self):
|
|
return self.door_handler.state
|
|
|
|
|
|
class AuthenticationForm(FlaskForm):
|
|
username = StringField('Username', [Length(min=3, max=25)])
|
|
password = PasswordField('Password', [DataRequired()])
|
|
method = StringField('Method', [DataRequired()])
|
|
open = SubmitField('Open')
|
|
present = SubmitField('Present')
|
|
close = SubmitField('Close')
|
|
|
|
def __init__(self, *args, **kwargs):
|
|
FlaskForm.__init__(self, *args, **kwargs)
|
|
self.desired_state = DoorState.Closed
|
|
|
|
def validate(self):
|
|
if not FlaskForm.validate(self):
|
|
return False
|
|
|
|
if self.open.data:
|
|
self.desired_state = DoorState.Open
|
|
elif self.present.data:
|
|
self.desired_state = DoorState.Present
|
|
|
|
if self.method.data == 'Local':
|
|
self.method = AuthMethod.LOCAL_USER_DB
|
|
else: # default: use LDAP
|
|
self.method = AuthMethod.LDAP_USER_PW
|
|
|
|
return True
|
|
|
|
|
|
@socketio.on('request_status')
|
|
@socketio.on('connect')
|
|
def on_connect():
|
|
logic.emit_status()
|
|
|
|
|
|
@webapp.route('/display')
|
|
def display():
|
|
return render_template('display.html',
|
|
room=room,
|
|
title=title,
|
|
welcome=welcome)
|
|
|
|
|
|
@webapp.route('/api', methods=['POST'])
|
|
def api():
|
|
def json_response(response, msg=None):
|
|
json = dict()
|
|
json['err'] = response.value
|
|
json['msg'] = response.to_html() if msg is None else msg
|
|
if response == LogicResponse.Success or \
|
|
response == LogicResponse.AlreadyActive:
|
|
# TBD: Remove 'open'. No more users. Still used in App Version 2.1.1!
|
|
json['open'] = logic.state.is_open()
|
|
json['status'] = logic.state.value
|
|
return jsonify(json)
|
|
|
|
method = request.form.get('method')
|
|
user = request.form.get('user')
|
|
password = request.form.get('pass')
|
|
command = request.form.get('command')
|
|
|
|
if method == 'local':
|
|
method = AuthMethod.LOCAL_USER_DB
|
|
else: # 'ldap' or default
|
|
method = AuthMethod.LDAP_USER_PW
|
|
|
|
if any(v is None for v in [user, password, command]):
|
|
log.warning('Incomplete API request')
|
|
abort(400)
|
|
|
|
if not user or not password:
|
|
log.warning('Invalid username or password format')
|
|
return json_response(LogicResponse.Inval,
|
|
'Invalid username or password format')
|
|
|
|
credentials = method, user, password
|
|
|
|
if command == 'status':
|
|
return json_response(logic.try_auth(credentials))
|
|
|
|
desired_state = DoorState.from_string(command)
|
|
if not desired_state:
|
|
return json_response(LogicResponse.Inval, "Invalid command requested")
|
|
|
|
log.info('Incoming API request from %s' % user.encode('utf-8'))
|
|
log.info(' desired state: %s' % desired_state)
|
|
log.info(' current state: %s' % logic.state)
|
|
|
|
response = logic.request(desired_state, credentials)
|
|
|
|
return json_response(response)
|
|
|
|
|
|
@webapp.route('/', methods=['GET', 'POST'])
|
|
def home():
|
|
authentication_form = AuthenticationForm()
|
|
response = None
|
|
|
|
if request.method == 'POST' and authentication_form.validate():
|
|
user = authentication_form.username.data
|
|
password = authentication_form.password.data
|
|
method = authentication_form.method
|
|
credentials = method, user, password
|
|
|
|
log.info('Incoming request from %s' % user.encode('utf-8'))
|
|
log.info(' authentication method: %s' % method)
|
|
desired_state = authentication_form.desired_state
|
|
log.info(' desired state: %s' % desired_state)
|
|
log.info(' current state: %s' % logic.state)
|
|
response = logic.request(desired_state, credentials)
|
|
log.info(' response: %s' % response)
|
|
|
|
# Don't trust python, zero credentials
|
|
user = password = credentials = None
|
|
|
|
return render_template('index.html',
|
|
authentication_form=authentication_form,
|
|
response=response,
|
|
state_text=logic.state.to_html(),
|
|
led=logic.state.to_img(),
|
|
banner='%s - %s' % (title, room))
|
|
|
|
|
|
if __name__ == '__main__':
|
|
logging.basicConfig(level=log_level, stream=sys.stdout,
|
|
format=log_fmt, datefmt=date_fmt)
|
|
log.info('Starting doorlockd')
|
|
log.info('Using serial port: %s' % webapp.config.get('SERIAL_PORT'))
|
|
|
|
logic = Logic()
|
|
|
|
socketio.run(webapp, host=host, port=8080, use_reloader=False)
|
|
|
|
sys.exit(0)
|