doorlockd-mirror/doorlockd

566 lines
17 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Doorlockd -- Binary Kitchen's smart door opener
Copyright (c) Binary Kitchen e.V., 2018
Author:
Ralf Ramsauer <ralf@binary-kitchen.de>
This work is licensed under the terms of the GNU GPL, version 2. See
the LICENSE file in the top-level directory.
This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
details.
"""
import hashlib
import hmac
import logging
import sys
from enum import Enum
from os.path import join
from random import sample
from subprocess import Popen
from threading import Thread
from time import sleep

import ldap
from flask import abort, Flask, jsonify, render_template, request
from flask_socketio import SocketIO
from flask_wtf import FlaskForm
from ldap.dn import escape_dn_chars
from serial import Serial
from wtforms import PasswordField, StringField, SubmitField
from wtforms.validators import DataRequired, Length
# Installation layout. Both values are '.', so every path derived below is
# relative to the current working directory.
SYSCONFDIR = '.'
PREFIX = '.'

# Locations of the bundled resources below the installation prefix.
root_prefix = join(PREFIX, 'share', 'doorlockd')
sounds_prefix = join(root_prefix, 'sounds')
static_folder = join(root_prefix, 'static')
template_folder = join(root_prefix, 'templates')
scripts_prefix = join(root_prefix, 'scripts')

# Path of the Flask configuration file.
flask_config = join(SYSCONFDIR, 'doorlockd.cfg')

# Module metadata. '__copyright__' used to be spelled '__copyright', which
# did not match the other metadata dunders.
__author__ = 'Ralf Ramsauer'
__copyright__ = 'Copyright (c) Ralf Ramsauer, 2018'
__license__ = 'GPLv2'
__email__ = 'ralf@binary-kitchen.de'
__status__ = 'Development'
__maintainer__ = 'Ralf Ramsauer'
__version__ = '2.0-rc1'

# Logging setup; 'log' is the root logger.
log_level = logging.DEBUG
date_fmt = '%Y-%m-%d %H:%M:%S'
log_fmt = '%(asctime)-15s %(levelname)-8s %(message)s'
log = logging.getLogger()

# Default LDAP servers (space separated URIs) and bind DN pattern; '%s' in
# the pattern is the place of the user name.
default_ldap_uri = ('ldaps://ldap1.binary.kitchen/ '
                    'ldaps://ldap2.binary.kitchen/ '
                    'ldaps://ldapm.binary.kitchen/')
default_binddn = 'cn=%s,ou=people,dc=binary-kitchen,dc=de'
# Web application and its Socket.IO layer. The configuration file is loaded
# immediately because the settings below are read from it.
webapp = Flask(__name__,
               static_folder=static_folder,
               template_folder=template_folder)
webapp.config.from_pyfile(flask_config)
socketio = SocketIO(webapp, async_mode='threading')

# Settings taken from the configuration; a missing key yields None.
serial_port = webapp.config.get('SERIAL_PORT')
simulate_ldap = webapp.config.get('SIMULATE_LDAP')
simulate_serial = webapp.config.get('SIMULATE_SERIAL')
run_hooks = webapp.config.get('RUN_HOOKS')
sounds = webapp.config.get('SOUNDS')
room = webapp.config.get('ROOM')
title = webapp.config.get('TITLE')
welcome = webapp.config.get('WELCOME')
file_local_db = webapp.config.get('LOCAL_USER_DB')
ldap_uri = webapp.config.get('LDAP_URI')
ldap_binddn = webapp.config.get('LDAP_BINDDN')

html_title = '%s (%s - v%s)' % (title, __status__, __version__)

# Global LDAP options: demand a valid server certificate, referrals off.
ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_DEMAND)
ldap.set_option(ldap.OPT_REFERRALS, 0)

# File names of the sounds, relative to the sounds directory. The "present"
# sound is the same file with and without a button press.
wave_emergency = 'emergency_unlock.wav'
wave_lock = 'lock.wav'
wave_lock_button = 'lock_button.wav'
wave_present = 'present.wav'
wave_present_button = 'present.wav'
wave_unlock = 'unlock.wav'
wave_unlock_button = 'unlock_button.wav'
wave_zonk = 'zonk.wav'

# Listen on all interfaces only when DEBUG is set, otherwise on localhost.
host = '0.0.0.0' if webapp.config.get('DEBUG') else 'localhost'
# copied from sudo
eperm_insults = {
    'Wrong! You cheating scum!',
    'And you call yourself a Rocket Scientist!',
    'No soap, honkie-lips.',
    'Where did you learn to type?',
    'Are you on drugs?',
    'My pet ferret can type better than you!',
    'You type like i drive.',
    'Do you think like you type?',
    'Your mind just hasn\'t been the same since the electro-shock, has it?',
    'Maybe if you used more than just two fingers...',
    'BOB says: You seem to have forgotten your passwd, enter another!',
    'stty: unknown mode: doofus',
    'I can\'t hear you -- I\'m using the scrambler.',
    'The more you drive -- the dumber you get.',
    'Listen, broccoli brains, I don\'t have time to listen to this trash.',
    'I\'ve seen penguins that can type better than that.',
    'Have you considered trying to match wits with a rutabaga?',
    'You speak an infinite deal of nothing',
}


def choose_insult():
    """Return one randomly chosen entry of ``eperm_insults``.

    ``random.sample`` needs a sequence: passing a set was deprecated in
    Python 3.9 and raises ``TypeError`` since 3.11. The set is therefore
    converted to a sorted list before sampling.
    """
    return sample(sorted(eperm_insults), 1)[0]
def playsound(filename):
    """Start playing a file from the sounds directory.

    Does nothing when sounds are disabled in the configuration. The player
    ('aplay', wrapped in 'nohup') is started as a separate process and is
    not waited for.
    """
    if sounds:
        wave_path = join(sounds_prefix, filename)
        Popen(['nohup', 'aplay', wave_path])
def start_hook(script):
    """Start a hook script from the scripts directory.

    When hooks are disabled in the configuration this is only logged.
    Otherwise the script is started through 'nohup' as a separate process
    that is not waited for.
    """
    if run_hooks:
        log.info('Starting hook %s', script)
        Popen(['nohup', join(scripts_prefix, script)])
    else:
        log.info('Hooks disabled: not starting %s', script)
def sound_helper(old_state, new_state, button):
    """Play the sound that belongs to a handled door request.

    old_state -- door state before the request was handled
    new_state -- door state after the request was handled
    button    -- True selects the "button" variants of the sounds

    A request that did not change the state only plays the "zonk" sound.
    """
    if old_state == new_state:
        # Nothing changed. Return here: falling through would start the
        # regular state sound as well, so that two players run at once.
        playsound(wave_zonk)
        return

    if button:
        wave_by_state = {
            DoorState.Open: wave_unlock_button,
            DoorState.Present: wave_present_button,
            DoorState.Closed: wave_lock_button,
        }
    else:
        wave_by_state = {
            DoorState.Open: wave_unlock,
            DoorState.Present: wave_present,
            DoorState.Closed: wave_lock,
        }

    wave = wave_by_state.get(new_state)
    if wave is not None:
        playsound(wave)
class AuthMethod(Enum):
    """Ways in which a user's credentials can be checked."""

    # User name and password are checked against LDAP.
    LDAP_USER_PW = 1
    # User name and password are checked against the local user database.
    LOCAL_USER_DB = 2
class DoorState(Enum):
    """State of the door."""

    # These numbers are used by the App since version 3.0, do NOT change them
    Open = 0
    Present = 1
    Closed = 2

    @staticmethod
    def from_string(string):
        """Translate a command string into a door state.

        'lock' maps to Closed, 'unlock' to Open and 'present' to Present.
        Returns None for anything else.

        Declared as a static method so that it can be called on the class
        as well as on a member; without the decorator a call on a member
        fails because the member is passed as an extra argument.
        """
        if string == 'lock':
            return DoorState.Closed
        if string == 'unlock':
            return DoorState.Open
        if string == 'present':
            return DoorState.Present
        return None

    def is_open(self):
        """Return True for every state except Closed."""
        return self != DoorState.Closed

    def to_img(self):
        """Return the path of the LED image that represents this state."""
        if self == DoorState.Open:
            led = 'green'
        elif self == DoorState.Present:
            led = 'yellow'
        else:
            led = 'red'
        return 'static/led-%s.png' % led

    def to_html(self):
        """Return the text shown to the user for this state."""
        if self == DoorState.Open:
            return 'Offen'
        if self == DoorState.Present:
            return 'Jemand da'
        return 'Geschlossen'
class LogicResponse(Enum):
    """Result of a request, or an event reported by the door handler."""

    Success = 0
    Perm = 1
    AlreadyActive = 2
    # don't break old apps, value 3 is reserved now
    RESERVED = 3
    Inval = 4
    LDAP = 5

    # Plain integers: a trailing comma after the number would turn the
    # member value into a one-element tuple such as (10,).
    EmergencyUnlock = 10
    ButtonLock = 11
    ButtonUnlock = 12
    ButtonPresent = 13

    def to_html(self):
        """Return the message shown to the user for this response.

        Perm yields a randomly chosen insult on every call. Responses
        without a message of their own yield a generic text.
        """
        if self == LogicResponse.Perm:
            return choose_insult()

        messages = {
            LogicResponse.Success: 'Yo, passt.',
            LogicResponse.AlreadyActive: 'Zustand bereits aktiv',
            LogicResponse.Inval: 'Das was du willst geht nicht.',
            LogicResponse.LDAP: 'Moep! Geh LDAP fixen!',
            LogicResponse.EmergencyUnlock: '!!! Emergency Unlock !!!',
            LogicResponse.ButtonLock: 'Closed by button',
            LogicResponse.ButtonUnlock: 'Opened by button',
            LogicResponse.ButtonPresent: 'Present by button',
        }
        return messages.get(self, 'Bitte spezifizieren Sie.')
class DoorHandler:
    """Keep the door state and exchange single-byte messages over serial.

    Unless serial simulation is enabled, a worker thread polls the serial
    port for button and emergency messages and writes the command that
    belongs to the current state in every polling round.
    """

    state = DoorState.Closed
    # Set by close(); makes the worker thread send CMD_CLOSE once.
    do_close = False

    # Bytes written to the serial port.
    CMD_PRESENT = b'y'
    CMD_OPEN = b'g'
    CMD_CLOSE = b'r'

    # Bytes read from the serial port.
    BUTTON_PRESENT = b'Y'
    BUTTON_OPEN = b'G'
    BUTTON_CLOSE = b'R'
    CMD_EMERGENCY_SWITCH = b'E'
    # TBD DOOR NOT CLOSED

    def __init__(self, device):
        """Open the serial device and start the worker thread.

        In serial simulation mode neither the port nor the thread is
        created.
        """
        if simulate_serial:
            return

        self.serial = Serial(device, baudrate=9600, bytesize=8, parity='N',
                             stopbits=1, timeout=0)
        self.thread = Thread(target=self.thread_worker)
        log.debug('Spawning RS232 Thread')
        self.thread.start()

    def thread_worker(self):
        """Poll the serial port forever; body of the worker thread."""
        while True:
            sleep(0.4)

            # Handle everything that arrived since the last round, one byte
            # at a time. An empty read means nothing is pending.
            rx = self.serial.read(1)
            while rx:
                self._handle_message(rx)
                rx = self.serial.read(1)

            tx = self._pending_command()
            if tx is not None:
                self.serial.write(tx)
                self.serial.flush()

    def _handle_message(self, rx):
        """React to one received byte and play the matching sound."""
        previous_state = self.state
        buttons = {
            DoorHandler.BUTTON_CLOSE: (self.close,
                                       'Closed due to Button press',
                                       LogicResponse.ButtonLock),
            DoorHandler.BUTTON_OPEN: (self.open,
                                      'Opened due to Button press',
                                      LogicResponse.ButtonUnlock),
            DoorHandler.BUTTON_PRESENT: (self.present,
                                         'Present due to Button press',
                                         LogicResponse.ButtonPresent),
        }

        if rx in buttons:
            action, note, response = buttons[rx]
            action()
            log.info(note)
            logic.emit_status(response)
        elif rx == DoorHandler.CMD_EMERGENCY_SWITCH:
            log.warning('Emergency unlock')
            logic.emit_status(LogicResponse.EmergencyUnlock)
        else:
            log.error('Received unknown message "%s" from AVR', rx)

        sound_helper(previous_state, self.state, True)

    def _pending_command(self):
        """Return the byte to send in this round, or None to send nothing.

        A pending close request wins and is cleared. Otherwise the command
        for the Present or Open state is returned; the Closed state sends
        nothing.
        """
        if self.do_close:
            self.do_close = False
            return DoorHandler.CMD_CLOSE
        if self.state == DoorState.Present:
            return DoorHandler.CMD_PRESENT
        if self.state == DoorState.Open:
            return DoorHandler.CMD_OPEN
        return None

    def open(self):
        """Switch to Open and run the 'post_unlock' hook.

        Returns AlreadyActive if the door is already open, else Success.
        """
        if self.state != DoorState.Open:
            self.state = DoorState.Open
            start_hook('post_unlock')
            return LogicResponse.Success
        return LogicResponse.AlreadyActive

    def present(self):
        """Switch to Present and run the 'post_present' hook.

        Returns AlreadyActive if that state is already set, else Success.
        """
        if self.state != DoorState.Present:
            self.state = DoorState.Present
            start_hook('post_present')
            return LogicResponse.Success
        return LogicResponse.AlreadyActive

    def close(self):
        """Switch to Closed, request a close command, run 'post_lock'.

        Returns AlreadyActive if the door is already closed, else Success.
        """
        if self.state != DoorState.Closed:
            self.do_close = True
            self.state = DoorState.Closed
            start_hook('post_lock')
            return LogicResponse.Success
        return LogicResponse.AlreadyActive

    def request(self, state):
        """Switch to the given state and return the result.

        Returns None when the state is none of Closed, Present or Open.
        """
        if state == DoorState.Closed:
            return self.close()
        if state == DoorState.Present:
            return self.present()
        if state == DoorState.Open:
            return self.open()
        return None
class Logic:
    """Authenticate door requests and pass them on to the door handler."""

    def __init__(self):
        """Create the door handler and load the local user database.

        The database is only read when a file is configured; otherwise the
        local database stays empty.
        """
        self.door_handler = DoorHandler(serial_port)
        self.local_db = dict()

        if not file_local_db:
            return

        with open(file_local_db, 'r') as f:
            self.local_db = self._parse_local_db(f)

    @staticmethod
    def _parse_local_db(lines):
        """Build the user database from the lines of the database file.

        Each entry is '<user> <hash>:<salt>', separated by whitespace. The
        result maps the user name to the list obtained by splitting the
        second field at ':'. Blank lines are skipped. Lines without a
        '<hash>:<salt>' field are skipped with a warning instead of failing
        with an IndexError here or later during authentication.
        """
        local_db = dict()
        for lineno, line in enumerate(lines, start=1):
            fields = line.split()
            if not fields:
                continue

            entry = fields[1].split(':') if len(fields) > 1 else []
            if len(entry) < 2:
                # Only the line number is logged, never the hash itself.
                log.warning('Ignoring malformed line %d in local user db',
                            lineno)
                continue

            local_db[fields[0]] = entry
        return local_db

    @staticmethod
    def _hash_matches(stored_pw, stored_salt, password):
        """Check a password against a stored salted SHA-256 hex digest.

        The digest is computed over salt + password. The comparison is done
        in constant time so that it does not reveal how many leading
        characters of the digest match.
        """
        salted = stored_salt.encode() + password.encode()
        digest = hashlib.sha256(salted).hexdigest()
        return hmac.compare_digest(stored_pw.encode(), digest.encode())

    def _try_auth_local(self, user, password):
        """Authenticate against the local user database.

        Returns Success when the password matches, Perm for an unknown user
        or a wrong password.
        """
        entry = self.local_db.get(user)
        if entry is None:
            return LogicResponse.Perm

        if self._hash_matches(entry[0], entry[1], password):
            return LogicResponse.Success
        return LogicResponse.Perm

    def _try_auth_ldap(self, user, password):
        """Authenticate by a simple bind against LDAP.

        Returns Success if the bind works, Perm for invalid credentials and
        LDAP for any other LDAP error. In LDAP simulation mode everything
        is accepted.
        """
        if simulate_ldap:
            log.info('SIMULATION MODE! ACCEPTING ANYTHING!')
            return LogicResponse.Success

        if not password:
            # A simple bind with an empty password may be treated by the
            # server as an unauthenticated bind and succeed.
            log.info(' Refusing LDAP auth with empty password')
            return LogicResponse.Perm

        log.info(' Trying to LDAP auth (user, password) as user %s', user)
        # Escape the user name so that it cannot change the structure of
        # the bind DN.
        ldap_username = ldap_binddn % escape_dn_chars(user)
        try:
            conn = ldap.initialize(ldap_uri)
            conn.simple_bind_s(ldap_username, password)
            conn.unbind_s()
        except ldap.INVALID_CREDENTIALS:
            log.info(' Invalid credentials')
            return LogicResponse.Perm
        except ldap.LDAPError as e:
            log.info(' LDAP Error: %s', e)
            return LogicResponse.LDAP
        return LogicResponse.Success

    def try_auth(self, credentials):
        """Check credentials given as (method, user, password).

        Returns the result of the selected authentication method, or Inval
        for an unknown method.
        """
        method = credentials[0]
        if method == AuthMethod.LDAP_USER_PW:
            return self._try_auth_ldap(credentials[1], credentials[2])
        if method == AuthMethod.LOCAL_USER_DB:
            return self._try_auth_local(credentials[1], credentials[2])
        return LogicResponse.Inval

    def _request(self, state, credentials):
        """Authenticate and, on success, ask the door handler for state."""
        err = self.try_auth(credentials)
        if err != LogicResponse.Success:
            return err
        return self.door_handler.request(state)

    def request(self, state, credentials):
        """Handle a request for a door state.

        Plays a sound when the request was accepted (Success or
        AlreadyActive), emits the result as status and returns it.
        """
        old_state = self.door_handler.state
        err = self._request(state, credentials)
        if err in (LogicResponse.Success, LogicResponse.AlreadyActive):
            sound_helper(old_state, self.door_handler.state, False)
        self.emit_status(err)
        return err

    def emit_status(self, message=None):
        """Emit a 'status' event with the LED image and a message.

        Without a message the text of the current door state is used.
        """
        led = self.state.to_img()
        if message is None:
            text = self.state.to_html()
        else:
            text = message.to_html()
        socketio.emit('status', {'led': led, 'message': text})

    @property
    def state(self):
        """Current state of the door handler."""
        return self.door_handler.state
class AuthenticationForm(FlaskForm):
    """Form with credentials, authentication method and three buttons.

    After a successful validate(), 'desired_state' holds the requested door
    state and 'method' holds an AuthMethod member.
    """

    username = StringField('Username', [Length(min=3, max=25)])
    password = PasswordField('Password', [DataRequired()])
    method = StringField('Method', [DataRequired()])
    open = SubmitField('Open')
    present = SubmitField('Present')
    close = SubmitField('Close')

    def __init__(self, *args, **kwargs):
        """Set up the form; the desired state defaults to Closed."""
        super().__init__(*args, **kwargs)
        self.desired_state = DoorState.Closed

    def validate(self):
        """Run the field validators, then derive state and method.

        Returns False if a field validator fails. Otherwise the pressed
        button selects the desired state (neither 'open' nor 'present'
        leaves it at Closed) and True is returned.
        """
        if not super().validate():
            return False

        if self.open.data:
            self.desired_state = DoorState.Open
        elif self.present.data:
            self.desired_state = DoorState.Present

        # From here on 'method' on this instance is the AuthMethod member,
        # no longer the form field. Everything but 'Local' means LDAP.
        use_local_db = self.method.data == 'Local'
        self.method = (AuthMethod.LOCAL_USER_DB if use_local_db
                       else AuthMethod.LDAP_USER_PW)
        return True
@socketio.on('request_status')
@socketio.on('connect')
def on_connect():
    """Emit the current door status.

    Registered for both the 'connect' and the 'request_status' event.
    """
    logic.emit_status()
@webapp.route('/display')
def display():
    """Render the display page with room, title and welcome text."""
    context = {
        'room': room,
        'title': title,
        'welcome': welcome,
    }
    return render_template('display.html', **context)
@webapp.route('/api', methods=['POST'])
def api():
    """Handle a POST request of the JSON API.

    Form fields: 'method' ('local', anything else means LDAP), 'user',
    'pass' and 'command'. The command 'status' only checks the credentials;
    any other command is translated into a door state and requested.
    A request that lacks user, pass or command is answered with HTTP 400.
    """
    def json_response(response, msg=None):
        """Build the JSON answer for a LogicResponse.

        'msg' replaces the default message of the response. The door state
        is only included for Success and AlreadyActive.
        """
        payload = {
            'err': response.value,
            'msg': response.to_html() if msg is None else msg,
        }
        if response in (LogicResponse.Success, LogicResponse.AlreadyActive):
            # TBD: Remove 'open'. No more users. Still used in App Version 2.1.1!
            payload['open'] = logic.state.is_open()
            payload['status'] = logic.state.value
        return jsonify(payload)

    form = request.form
    method = form.get('method')
    user = form.get('user')
    password = form.get('pass')
    command = form.get('command')

    # 'ldap' or anything else falls back to LDAP.
    if method == 'local':
        method = AuthMethod.LOCAL_USER_DB
    else:
        method = AuthMethod.LDAP_USER_PW

    if user is None or password is None or command is None:
        log.warning('Incomplete API request')
        abort(400)

    if not (user and password):
        log.warning('Invalid username or password format')
        return json_response(LogicResponse.Inval,
                             'Invalid username or password format')

    credentials = method, user, password

    if command == 'status':
        return json_response(logic.try_auth(credentials))

    desired_state = DoorState.from_string(command)
    if not desired_state:
        return json_response(LogicResponse.Inval, "Invalid command requested")

    log.info('Incoming API request from %s', user.encode('utf-8'))
    log.info(' desired state: %s', desired_state)
    log.info(' current state: %s', logic.state)

    return json_response(logic.request(desired_state, credentials))
@webapp.route('/', methods=['GET', 'POST'])
def home():
    """Serve the main page and handle a submitted authentication form.

    A POST with a valid form requests the desired door state. The page is
    rendered in every case; 'response' stays None when nothing was
    requested.
    """
    authentication_form = AuthenticationForm()
    response = None

    # The form is only validated for POST requests.
    if request.method == 'POST' and authentication_form.validate():
        method = authentication_form.method
        user = authentication_form.username.data
        password = authentication_form.password.data
        desired_state = authentication_form.desired_state
        credentials = method, user, password

        log.info('Incoming request from %s', user.encode('utf-8'))
        log.info(' authentication method: %s', method)
        log.info(' desired state: %s', desired_state)
        log.info(' current state: %s', logic.state)

        response = logic.request(desired_state, credentials)
        log.info(' response: %s', response)

    # Don't trust python, zero credentials
    user = password = credentials = None

    page = {
        'authentication_form': authentication_form,
        'response': response,
        'state_text': logic.state.to_html(),
        'led': logic.state.to_img(),
        'banner': '%s - %s' % (title, room),
    }
    return render_template('index.html', **page)
if __name__ == '__main__':
    # Log to stdout with the module-wide level and formats.
    logging.basicConfig(stream=sys.stdout, level=log_level,
                        datefmt=date_fmt, format=log_fmt)
    log.info('Starting doorlockd')
    log.info('Using serial port: %s', webapp.config.get('SERIAL_PORT'))

    # Module-level name used by the request handlers and the serial worker.
    logic = Logic()

    socketio.run(webapp, host=host, port=8080, use_reloader=False)

    sys.exit(0)