#!/usr/bin/env python3

"""
Doorlockd -- Binary Kitchen's smart door opener

Copyright (c) Binary Kitchen e.V., 2018

Author:
  Ralf Ramsauer

This work is licensed under the terms of the GNU GPL, version 2.  See
the LICENSE file in the top-level directory.

This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
details.
"""

# Standard library
import hashlib
import logging
import sys

from enum import Enum
from os.path import join
from random import sample
from subprocess import Popen
from threading import Thread
from time import sleep

# Third party
import ldap

from serial import Serial

from flask import abort, Flask, jsonify, render_template, request
from flask_socketio import SocketIO
from flask_wtf import FlaskForm
from wtforms import PasswordField, StringField, SubmitField
from wtforms.validators import DataRequired, Length

# Base directories; every data path below is derived from these two.
# NOTE(review): presumably rewritten at install time -- confirm.
SYSCONFDIR = '.'
PREFIX = '.'

root_prefix = join(PREFIX, 'share', 'doorlockd')
sounds_prefix = join(root_prefix, 'sounds')
static_folder = join(root_prefix, 'static')
template_folder = join(root_prefix, 'templates')
scripts_prefix = join(root_prefix, 'scripts')

# Flask configuration file, loaded right after the app object is created.
flask_config = join(SYSCONFDIR, 'doorlockd.cfg')

__author__ = 'Ralf Ramsauer'
__copyright__ = 'Copyright (c) Ralf Ramsauer, 2018'
# The original spelling lacked the trailing underscores; keep it as an alias
# so that anything reading the old name continues to work.
__copyright = __copyright__
__license__ = 'GPLv2'
__email__ = 'ralf@binary-kitchen.de'
__status__ = 'Development'
__maintainer__ = 'Ralf Ramsauer'
__version__ = '0.01a'

# Logging parameters; they are handed to logging.basicConfig() by the
# script entry point.
log_level = logging.DEBUG
date_fmt = '%Y-%m-%d %H:%M:%S'
log_fmt = '%(asctime)-15s %(levelname)-8s %(message)s'
log = logging.getLogger()

# Built-in LDAP defaults: a space-separated list of server URIs and a bind DN
# template with one '%s' placeholder for the user name.
default_ldap_uri = 'ldaps://ldap1.binary.kitchen/ ' \
                   'ldaps://ldap2.binary.kitchen/ ' \
                   'ldaps://ldapm.binary.kitchen/'
default_binddn = 'cn=%s,ou=people,dc=binary-kitchen,dc=de'

# Web application and its Socket.IO layer (threading async mode).
webapp = Flask(__name__,
               template_folder=template_folder,
               static_folder=static_folder)
webapp.config.from_pyfile(flask_config)
socketio = SocketIO(webapp, async_mode='threading')
# ---------------------------------------------------------------------------
# Runtime configuration, read once from the Flask config at import time.
# ---------------------------------------------------------------------------
serial_port = webapp.config.get('SERIAL_PORT')
simulate_ldap = webapp.config.get('SIMULATE_LDAP')
simulate_serial = webapp.config.get('SIMULATE_SERIAL')
run_hooks = webapp.config.get('RUN_HOOKS')
room = webapp.config.get('ROOM')
title = webapp.config.get('TITLE')
welcome = webapp.config.get('WELCOME')
file_local_db = webapp.config.get('LOCAL_USER_DB')

html_title = '%s (%s - v%s)' % (title, __status__, __version__)

# Always demand a valid server certificate and never chase referrals.
ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_DEMAND)
ldap.set_option(ldap.OPT_REFERRALS, 0)
if 'LDAP_CA' in webapp.config:
    ldap.set_option(ldap.OPT_X_TLS_CACERTFILE, webapp.config.get('LDAP_CA'))

# Fall back to the built-in defaults when the config leaves these unset.
# Without the fallback an unset LDAP_BINDDN made every LDAP login fail with a
# TypeError ('None % user').
ldap_uri = webapp.config.get('LDAP_URI') or default_ldap_uri
ldap_binddn = webapp.config.get('LDAP_BINDDN') or default_binddn

# Sound files, relative to sounds_prefix.
wave_emergency = 'emergency_unlock.wav'
wave_lock = 'lock.wav'
wave_lock_button = 'lock_button.wav'
wave_unlock = 'unlock.wav'
wave_unlock_button = 'unlock_button.wav'
wave_zonk = 'zonk.wav'

sounds = webapp.config.get('SOUNDS')

# Listen on all interfaces only when DEBUG is set, otherwise on localhost.
host = 'localhost'
if webapp.config.get('DEBUG'):
    host = '0.0.0.0'

# copied from sudo
# Kept as a tuple (not a set): random.sample() rejects sets on Python 3.11+.
eperm_insults = (
    'Wrong! You cheating scum!',
    'And you call yourself a Rocket Scientist!',
    'No soap, honkie-lips.',
    'Where did you learn to type?',
    'Are you on drugs?',
    'My pet ferret can type better than you!',
    'You type like i drive.',
    'Do you think like you type?',
    'Your mind just hasn\'t been the same since the electro-shock, has it?',
    'Maybe if you used more than just two fingers...',
    'BOB says: You seem to have forgotten your passwd, enter another!',
    'stty: unknown mode: doofus',
    'I can\'t hear you -- I\'m using the scrambler.',
    'The more you drive -- the dumber you get.',
    'Listen, broccoli brains, I don\'t have time to listen to this trash.',
    'I\'ve seen penguins that can type better than that.',
    'Have you considered trying to match wits with a rutabaga?',
    'You speak an infinite deal of nothing',
)


def choose_insult():
    """Return one randomly chosen entry of ``eperm_insults``."""
    return sample(eperm_insults, 1)[0]


def playsound(filename):
    """Play ``filename`` from the sounds directory via ``aplay``.

    Does nothing when sounds are disabled in the configuration.  The player
    is started in the background and not waited for.
    """
    if not sounds:
        return
    Popen(['nohup', 'aplay', join(sounds_prefix, filename)])


def start_hook(script):
    """Start the hook ``script`` from the scripts directory.

    Hooks are only started when RUN_HOOKS is enabled; the process is started
    in the background and not waited for.
    """
    if not run_hooks:
        log.info('Hooks disabled: not starting %s', script)
        return
    log.info('Starting hook %s', script)
    Popen(['nohup', join(scripts_prefix, script)])


class AuthMethod(Enum):
    """Available authentication backends."""

    LDAP_USER_PW = 1
    LOCAL_USER_DB = 2


class DoorState(Enum):
    """State of the door as tracked by the daemon."""

    # These numbers are used by the App since version 3.0, do NOT change them
    Open = 0
    Present = 1
    Closed = 2

    @staticmethod
    def from_string(string):
        """Map an API command to a state.

        Returns the matching ``DoorState`` for 'lock', 'unlock' and
        'present', and ``None`` for anything else.
        """
        if string == 'lock':
            return DoorState.Closed
        elif string == 'unlock':
            return DoorState.Open
        elif string == 'present':
            return DoorState.Present
        return None

    def is_open(self):
        """Return True for every state except ``Closed``."""
        return self != DoorState.Closed

    def to_img(self):
        """Return the path of the LED image that represents this state."""
        led = 'red'
        if self == DoorState.Present:
            led = 'yellow'
        elif self == DoorState.Open:
            led = 'green'
        return 'static/led-%s.png' % led

    def to_html(self):
        """Return the user-facing text for this state."""
        if self == DoorState.Open:
            return 'Offen'
        elif self == DoorState.Present:
            return 'Jemand da'
        return 'Geschlossen'


class LogicResponse(Enum):
    """Result codes of door requests and events reported by the hardware."""

    Success = 0
    Perm = 1
    AlreadyActive = 2
    # don't break old apps, value 3 is reserved now
    RESERVED = 3
    Inval = 4
    LDAP = 5

    # Plain integers: the original trailing commas turned these four values
    # into one-element tuples such as (10,).
    EmergencyUnlock = 10
    ButtonLock = 11
    ButtonUnlock = 12
    ButtonPresent = 13

    def to_html(self):
        """Return the user-facing message for this response."""
        # Evaluated separately so that a fresh insult is drawn on every call.
        if self == LogicResponse.Perm:
            return choose_insult()

        static_messages = {
            LogicResponse.Success: 'Yo, passt.',
            LogicResponse.AlreadyActive: 'Zustand bereits aktiv',
            LogicResponse.Inval: 'Das was du willst geht nicht.',
            LogicResponse.LDAP: 'Moep! Geh LDAP fixen!',
            LogicResponse.EmergencyUnlock: '!!! Emergency Unlock !!!',
            LogicResponse.ButtonLock: 'Closed by button',
            LogicResponse.ButtonUnlock: 'Opened by button',
            LogicResponse.ButtonPresent: 'Present by button',
        }
        return static_messages.get(self, 'Bitte spezifizieren Sie.')


class DoorHandler:
    """Tracks the desired door state and talks to the door controller.

    Unless SIMULATE_SERIAL is set, a background thread polls the serial port
    for button events and periodically transmits the current state.
    """

    state = DoorState.Closed
    do_close = False

    # Bytes sent to the controller
    CMD_PRESENT = b'y'
    CMD_OPEN = b'g'
    CMD_CLOSE = b'r'

    # Bytes received from the controller
    BUTTON_PRESENT = b'Y'
    BUTTON_OPEN = b'G'
    BUTTON_CLOSE = b'R'

    CMD_EMERGENCY_SWITCH = b'E'
    # TBD DOOR NOT CLOSED

    def __init__(self, device):
        """Open the serial ``device`` and start the worker thread.

        In serial simulation mode neither the port nor the thread is
        created; only the in-memory state is tracked.
        """
        if simulate_serial:
            return

        # timeout=0 makes read() non-blocking.
        self.serial = Serial(device, baudrate=9600, bytesize=8, parity='N',
                             stopbits=1, timeout=0)
        # NOTE(review): the worker loops forever and is not a daemon thread,
        # so it presumably keeps the process alive on shutdown -- confirm.
        self.thread = Thread(target=self.thread_worker)
        log.debug('Spawning RS232 Thread')
        self.thread.start()

    def thread_worker(self):
        """Serial worker loop; never returns.

        Every 0.4 s it drains all pending bytes from the port, reacts to
        button and emergency events, and then transmits the current state.
        Status updates go through the module-level ``logic`` object, which is
        created by the script entry point.
        """
        while True:
            sleep(0.4)

            # Drain everything the controller has sent since the last round.
            while True:
                rx = self.serial.read(1)
                if not rx:
                    break

                if rx == DoorHandler.BUTTON_CLOSE:
                    self.close()
                    log.info('Closed due to Button press')
                    logic.emit_status(LogicResponse.ButtonLock)
                elif rx == DoorHandler.BUTTON_OPEN:
                    self.open()
                    log.info('Opened due to Button press')
                    logic.emit_status(LogicResponse.ButtonUnlock)
                elif rx == DoorHandler.BUTTON_PRESENT:
                    self.present()
                    log.info('Present due to Button press')
                    logic.emit_status(LogicResponse.ButtonPresent)
                elif rx == DoorHandler.CMD_EMERGENCY_SWITCH:
                    log.warning('Emergency unlock')
                    logic.emit_status(LogicResponse.EmergencyUnlock)
                else:
                    log.error('Received unknown message "%s" from AVR', rx)

            # A pending close is sent exactly once; Open and Present are
            # re-sent every round; a settled Closed state sends nothing.
            if self.do_close:
                tx = DoorHandler.CMD_CLOSE
                self.do_close = False
            elif self.state == DoorState.Present:
                tx = DoorHandler.CMD_PRESENT
            elif self.state == DoorState.Open:
                tx = DoorHandler.CMD_OPEN
            else:
                continue

            self.serial.write(tx)
            self.serial.flush()

    def open(self):
        """Switch to ``Open`` and start the 'post_unlock' hook.

        Returns ``AlreadyActive`` if the door is already open, otherwise
        ``Success``.
        """
        if self.state == DoorState.Open:
            return LogicResponse.AlreadyActive

        self.state = DoorState.Open
        start_hook('post_unlock')
        return LogicResponse.Success

    def present(self):
        """Switch to ``Present`` and start the 'post_present' hook.

        Returns ``AlreadyActive`` if that state is already set, otherwise
        ``Success``.
        """
        if self.state == DoorState.Present:
            return LogicResponse.AlreadyActive

        self.state = DoorState.Present
        start_hook('post_present')
        return LogicResponse.Success

    def close(self):
        """Switch to ``Closed`` and start the 'post_lock' hook.

        Also flags a one-shot close command for the worker thread.  Returns
        ``AlreadyActive`` if the door is already closed, otherwise
        ``Success``.
        """
        if self.state == DoorState.Closed:
            return LogicResponse.AlreadyActive

        self.do_close = True
        self.state = DoorState.Closed
        start_hook('post_lock')
        return LogicResponse.Success

    def request(self, state):
        """Apply the requested ``DoorState`` and return the result.

        Anything that is not a known ``DoorState`` yields
        ``LogicResponse.Inval`` (previously an implicit ``None``).
        """
        if state == DoorState.Closed:
            return self.close()
        elif state == DoorState.Present:
            return self.present()
        elif state == DoorState.Open:
            return self.open()
        return LogicResponse.Inval


class Logic:
    """Glue between authentication, the door handler and status broadcasts."""

    def __init__(self):
        """Create the door handler and load the optional local user DB.

        Each DB line has the form ``<user> <sha256-hex>:<salt>``.  Blank
        lines are skipped; malformed lines are skipped with a warning instead
        of aborting start-up with an IndexError.
        """
        self.door_handler = DoorHandler(serial_port)
        self.local_db = dict()

        if not file_local_db:
            return

        with open(file_local_db, 'r') as f:
            for lineno, line in enumerate(f, start=1):
                fields = line.split()
                if not fields:
                    continue

                pwd = fields[1].split(':') if len(fields) > 1 else []
                if len(pwd) < 2:
                    # Never log the entry itself: it contains hash material.
                    log.warning('Ignoring malformed entry in local user DB '
                                '(line %d)', lineno)
                    continue

                self.local_db[fields[0]] = pwd

    def _try_auth_local(self, user, password):
        """Check ``user``/``password`` against the local user DB.

        Returns ``Success`` when sha256(salt + password) equals the stored
        hash, ``Perm`` otherwise (including unknown users).
        """
        from hmac import compare_digest

        if user not in self.local_db:
            return LogicResponse.Perm

        stored_pw = self.local_db[user][0]
        stored_salt = self.local_db[user][1]

        computed = hashlib.sha256(stored_salt.encode() +
                                  password.encode()).hexdigest()
        # Constant-time comparison; bytes are used because compare_digest()
        # rejects non-ASCII str arguments.
        if compare_digest(stored_pw.encode(), computed.encode()):
            return LogicResponse.Success

        return LogicResponse.Perm

    def _try_auth_ldap(self, user, password):
        """Authenticate by binding to LDAP as ``user``.

        Returns ``Success`` on a successful bind, ``Perm`` on invalid
        credentials and ``LDAP`` on any other LDAP error.  In LDAP simulation
        mode every request succeeds.
        """
        from ldap.dn import escape_dn_chars

        if simulate_ldap:
            log.info('SIMULATION MODE! ACCEPTING ANYTHING!')
            return LogicResponse.Success

        log.info(' Trying to LDAP auth (user, password) as user %s', user)
        # Escape DN metacharacters so that a crafted user name cannot change
        # the structure of the bind DN.
        ldap_username = ldap_binddn % escape_dn_chars(user)

        try:
            conn = ldap.initialize(ldap_uri)
            conn.simple_bind_s(ldap_username, password)
            conn.unbind_s()
        except ldap.INVALID_CREDENTIALS:
            log.info(' Invalid credentials')
            return LogicResponse.Perm
        except ldap.LDAPError as e:
            log.info(' LDAP Error: %s', e)
            return LogicResponse.LDAP
        return LogicResponse.Success

    def try_auth(self, credentials):
        """Authenticate a ``(method, user, password)`` tuple.

        Dispatches on the ``AuthMethod`` in the first element and returns
        ``Inval`` for an unknown method.
        """
        method = credentials[0]
        if method == AuthMethod.LDAP_USER_PW:
            return self._try_auth_ldap(credentials[1], credentials[2])
        elif method == AuthMethod.LOCAL_USER_DB:
            return self._try_auth_local(credentials[1], credentials[2])

        return LogicResponse.Inval

    def _request(self, state, credentials):
        """Authenticate, then forward the state change to the door handler."""
        err = self.try_auth(credentials)
        if err != LogicResponse.Success:
            return err
        return self.door_handler.request(state)

    def request(self, state, credentials):
        """Handle an authenticated state change request.

        Plays the matching sound, broadcasts the result to all Socket.IO
        clients and returns the ``LogicResponse``.
        """
        err = self._request(state, credentials)
        if err == LogicResponse.Success:
            if self.door_handler.state == DoorState.Open:
                playsound(wave_unlock)
            if self.door_handler.state == DoorState.Closed:
                playsound(wave_lock)
        elif err == LogicResponse.AlreadyActive:
            playsound(wave_zonk)
        self.emit_status(err)
        return err

    def emit_status(self, message=None):
        """Broadcast the LED image and a status message via Socket.IO.

        ``message`` is an optional ``LogicResponse``; without it the text of
        the current door state is sent.
        """
        led = self.state.to_img()
        if message is None:
            message = self.state.to_html()
        else:
            message = message.to_html()

        socketio.emit('status', {'led': led, 'message': message})

    @property
    def state(self):
        """Current ``DoorState`` of the door handler."""
        return self.door_handler.state


class AuthenticationForm(FlaskForm):
    """Login form of the web UI with one submit button per door state."""

    username = StringField('Username', [Length(min=3, max=25)])
    password = PasswordField('Password', [DataRequired()])
    open = SubmitField('Open')
    present = SubmitField('Present')
    close = SubmitField('Close')

    def __init__(self, *args, **kwargs):
        FlaskForm.__init__(self, *args, **kwargs)
        # Closed is the default unless 'Open' or 'Present' was pressed.
        self.desired_state = DoorState.Closed

    def validate(self):
        """Validate the fields and derive ``desired_state`` from the button.

        Returns False when field validation fails, True otherwise.
        """
        if not FlaskForm.validate(self):
            return False

        if self.open.data:
            self.desired_state = DoorState.Open
        elif self.present.data:
            self.desired_state = DoorState.Present

        return True


@socketio.on('request_status')
@socketio.on('connect')
def on_connect():
    """Send the current status on connect and on explicit request."""
    logic.emit_status()


@webapp.route('/display')
def display():
    """Render the status display page."""
    return render_template('display.html',
                           room=room,
                           title=title,
                           welcome=welcome)


@webapp.route('/api', methods=['POST'])
def api():
    """JSON API endpoint used by the apps.

    Expects the form fields 'user', 'pass' and 'command' plus an optional
    'method' ('local', otherwise LDAP).  The command 'status' only checks the
    credentials; 'lock', 'unlock' and 'present' request a state change.
    Aborts with HTTP 400 when a mandatory field is missing.
    """
    def json_response(response, msg=None):
        payload = dict()
        payload['err'] = response.value
        payload['msg'] = response.to_html() if msg is None else msg
        if response == LogicResponse.Success or \
           response == LogicResponse.AlreadyActive:
            # TBD: Remove 'open'. No more users. Still used in App Version
            # 2.1.1!
            payload['open'] = logic.state.is_open()
            payload['status'] = logic.state.value
        return jsonify(payload)

    method = request.form.get('method')
    user = request.form.get('user')
    password = request.form.get('pass')
    command = request.form.get('command')

    if method == 'local':
        method = AuthMethod.LOCAL_USER_DB
    else:  # 'ldap' or default
        method = AuthMethod.LDAP_USER_PW

    if any(v is None for v in [user, password, command]):
        log.warning('Incomplete API request')
        abort(400)

    if not user or not password:
        log.warning('Invalid username or password format')
        return json_response(LogicResponse.Inval,
                             'Invalid username or password format')

    credentials = method, user, password

    if command == 'status':
        return json_response(logic.try_auth(credentials))

    desired_state = DoorState.from_string(command)
    if desired_state is None:
        return json_response(LogicResponse.Inval, "Invalid command requested")

    # The user name is logged as bytes, exactly as before.
    log.info('Incoming API request from %s', user.encode('utf-8'))
    log.info(' desired state: %s', desired_state)
    log.info(' current state: %s', logic.state)

    response = logic.request(desired_state, credentials)

    return json_response(response)


@webapp.route('/', methods=['GET', 'POST'])
def home():
    """Render the main page and process submitted login forms.

    Form logins always authenticate against LDAP.
    """
    authentication_form = AuthenticationForm()
    response = None

    if request.method == 'POST' and authentication_form.validate():
        user = authentication_form.username.data
        password = authentication_form.password.data
        credentials = AuthMethod.LDAP_USER_PW, user, password

        log.info('Incoming request from %s', user.encode('utf-8'))
        desired_state = authentication_form.desired_state
        log.info(' desired state: %s', desired_state)
        log.info(' current state: %s', logic.state)
        response = logic.request(desired_state, credentials)
        log.info(' response: %s', response)

        # Don't trust python, zero credentials
        user = password = credentials = None

    return render_template('index.html',
                           authentication_form=authentication_form,
                           response=response,
                           state_text=logic.state.to_html(),
                           led=logic.state.to_img(),
                           banner='%s - %s' % (title, room))


if __name__ == '__main__':
    logging.basicConfig(level=log_level, stream=sys.stdout,
                        format=log_fmt, datefmt=date_fmt)
    log.info('Starting doorlockd')
    log.info('Using serial port: %s', webapp.config.get('SERIAL_PORT'))

    # Module-level singleton used by the request handlers and by the serial
    # worker thread.
    logic = Logic()

    socketio.run(webapp, host=host, port=8080, use_reloader=False)

    sys.exit(0)