update nuki state handling

update state handling so that manual state changes of the nuki (key or
button press) are reflected to the internal doorlock state. Refactor
general state handling to support this

Signed-off-by: Thomas Schmid <tom@lfence.de>
This commit is contained in:
Thomas 2024-01-03 21:10:07 +01:00
parent 5fd4a5c8c1
commit 03e27930ce
7 changed files with 131 additions and 97 deletions

View File

@ -25,7 +25,8 @@ from os.path import abspath, join
from pydoorlock.Authenticator import Authenticator
from pydoorlock.WebApp import webapp_run, emit_doorstate
from pydoorlock.Doorlock import DoorlockResponse, DoorHandler
from pydoorlock.Doorlock import DoorHandler
from pydoorlock.DoorlockResponse import DoorlockResponse
from pydoorlock.Config import Config, root_prefix, sounds_prefix
__author__ = 'Ralf Ramsauer'

View File

@ -21,7 +21,7 @@ import logging
from enum import Enum
from .Doorlock import DoorlockResponse
from .DoorlockResponse import DoorlockResponse
log = logging.getLogger()

View File

@ -17,8 +17,6 @@ details.
import logging
from enum import Enum
from random import sample
from subprocess import run
from threading import Thread
from time import sleep
@ -28,83 +26,17 @@ from pydoorlock.NukiBridge import NukiBridge
from pydoorlock.SimulationBackend import SimulationBackend
from .Config import Config
from .DoorlockResponse import DoorlockResponse
from .Door import DoorState
from .Protocol import Protocol
log = logging.getLogger()
# copied from sudo
eperm_insults = {
'Wrong! You cheating scum!',
'And you call yourself a Rocket Scientist!',
'No soap, honkie-lips.',
'Where did you learn to type?',
'Are you on drugs?',
'My pet ferret can type better than you!',
'You type like i drive.',
'Do you think like you type?',
'Your mind just hasn\'t been the same since the electro-shock, has it?',
'Maybe if you used more than just two fingers...',
'BOB says: You seem to have forgotten your passwd, enter another!',
'stty: unknown mode: doofus',
'I can\'t hear you -- I\'m using the scrambler.',
'The more you drive -- the dumber you get.',
'Listen, broccoli brains, I don\'t have time to listen to this trash.',
'I\'ve seen penguins that can type better than that.',
'Have you considered trying to match wits with a rutabaga?',
'You speak an infinite deal of nothing',
}
def choose_insult():
return sample(eperm_insults, 1)[0]
def run_background(cmd):
run('%s &' % cmd, shell=True)
class DoorlockResponse(Enum):
Success = 0
Perm = 1
AlreadyActive = 2
# don't break old apps, value 3 is reserved now
RESERVED = 3
Inval = 4
BackendError = 5
EmergencyUnlock = 10,
ButtonLock = 11,
ButtonUnlock = 12,
ButtonPresent = 13,
def __str__(self):
if self == DoorlockResponse.Success:
return 'Yo, passt.'
elif self == DoorlockResponse.Perm:
return choose_insult()
elif self == DoorlockResponse.AlreadyActive:
return 'Zustand bereits aktiv'
elif self == DoorlockResponse.Inval:
return 'Das was du willst geht nicht.'
elif self == DoorlockResponse.EmergencyUnlock:
return '!!! Emergency Unlock !!!'
elif self == DoorlockResponse.ButtonLock:
return 'Closed by button'
elif self == DoorlockResponse.ButtonUnlock:
return 'Opened by button'
elif self == DoorlockResponse.ButtonPresent:
return 'Present by button'
elif self == DoorlockResponse.BackendError:
return "Backend Error"
return 'Error'
class DoorHandler:
state = DoorState.Closed
do_close = False
wave_lock = 'lock.wav'
wave_lock_button = 'lock_button.wav'
@ -144,18 +76,11 @@ class DoorHandler:
log.error(f"Unknown backend {backend_type}")
raise RuntimeError
def state_changed(self, new_state):
if new_state == DoorState.Open:
self.run_hook('post_unlock')
elif new_state == DoorState.Present:
self.run_hook('post_present')
elif new_state == DoorState.Closed:
self.run_hook('post_lock')
else:
return
self.state = new_state
self.backend.set_state(self.state)
self.backend.register_state_changed_handler(self.update_state)
def backend_state_change_handler(self, new_state):
self.update_state(self.state, new_state, DoorlockResponse.Success)
def open(self):
if self.state == DoorState.Open:
@ -163,7 +88,6 @@ class DoorHandler:
if self.backend.set_state(DoorState.Open):
self.state = DoorState.Open
self.run_hook('post_unlock')
return DoorlockResponse.Success
return DoorlockResponse.BackendError
@ -174,7 +98,6 @@ class DoorHandler:
if self.backend.set_state(DoorState.Present):
self.state = DoorState.Present
self.run_hook('post_present')
return DoorlockResponse.Success
return DoorlockResponse.BackendError
@ -185,7 +108,6 @@ class DoorHandler:
if self.backend.set_state(DoorState.Closed):
self.state = DoorState.Closed
self.run_hook('post_lock')
return DoorlockResponse.Success
return DoorlockResponse.BackendError
@ -199,10 +121,23 @@ class DoorHandler:
elif state == DoorState.Open:
err = self.open()
self.sound_helper(old_state, self.state, False)
self.invoke_callback(err)
self.update_state(old_state, self.state, err)
return err
def update_state(self, old_state, new_state, response = DoorlockResponse.Success):
if (old_state != new_state) and response == DoorlockResponse.Success:
if new_state == DoorState.Open:
self.run_hook('post_unlock')
elif new_state == DoorState.Closed:
self.run_hook('post_lock')
elif new_state == DoorState.Present:
self.run_hook('post_present')
self.state = new_state
self.sound_helper(old_state, new_state, False)
self.invoke_callback(response)
def sound_helper(self, old_state, new_state, button):
if not self.sounds:
return

View File

@ -23,8 +23,8 @@ class DoorlockBackend(ABC):
self.callbacks = list()
self.current_state = None
def update_state(self):
self.state_change_callback(self.current_state)
def register_state_changed_handler(self, callback):
self.state_change_callback = callback
@abstractmethod
def set_state(self, state):

View File

@ -0,0 +1,65 @@
from enum import Enum
from random import sample
# copied from sudo
eperm_insults = {
'Wrong! You cheating scum!',
'And you call yourself a Rocket Scientist!',
'No soap, honkie-lips.',
'Where did you learn to type?',
'Are you on drugs?',
'My pet ferret can type better than you!',
'You type like i drive.',
'Do you think like you type?',
'Your mind just hasn\'t been the same since the electro-shock, has it?',
'Maybe if you used more than just two fingers...',
'BOB says: You seem to have forgotten your passwd, enter another!',
'stty: unknown mode: doofus',
'I can\'t hear you -- I\'m using the scrambler.',
'The more you drive -- the dumber you get.',
'Listen, broccoli brains, I don\'t have time to listen to this trash.',
'I\'ve seen penguins that can type better than that.',
'Have you considered trying to match wits with a rutabaga?',
'You speak an infinite deal of nothing',
}
def choose_insult():
return sample(eperm_insults, 1)[0]
class DoorlockResponse(Enum):
Success = 0
Perm = 1
AlreadyActive = 2
# don't break old apps, value 3 is reserved now
RESERVED = 3
Inval = 4
BackendError = 5
EmergencyUnlock = 10,
ButtonLock = 11,
ButtonUnlock = 12,
ButtonPresent = 13,
def __str__(self):
if self == DoorlockResponse.Success:
return 'Yo, passt.'
elif self == DoorlockResponse.Perm:
return choose_insult()
elif self == DoorlockResponse.AlreadyActive:
return 'Zustand bereits aktiv'
elif self == DoorlockResponse.Inval:
return 'Das was du willst geht nicht.'
elif self == DoorlockResponse.EmergencyUnlock:
return '!!! Emergency Unlock !!!'
elif self == DoorlockResponse.ButtonLock:
return 'Closed by button'
elif self == DoorlockResponse.ButtonUnlock:
return 'Opened by button'
elif self == DoorlockResponse.ButtonPresent:
return 'Present by button'
elif self == DoorlockResponse.BackendError:
return "Backend Error"
return 'Error'

View File

@ -25,8 +25,17 @@ import requests
from .Door import DoorState
from .DoorlockBackend import DoorlockBackend
from .DoorlockResponse import DoorlockResponse
log = logging.getLogger(__name__)
reqlog = logging.getLogger('urllib3')
reqlog.setLevel(logging.WARNING)
def compare_dicts(d1, d2, ignore_keys):
ka = set(d1).difference(ignore_keys)
kb = set(d2).difference(ignore_keys)
return ka == kb and all(d1[k] == d2[k] for k in ka)
class NukiBridgeDevice():
"""
@ -80,6 +89,9 @@ class NukiBridgeDevice():
if info["nukiId"] == self.get_device_id():
return info["lastKnownState"]
def compare_device_state(state1, state2):
return compare_dicts(state1, state2, ["timestamp"])
def lock(self, device_name: str = None):
if device_name == None:
nukiId = self.device_id
@ -121,25 +133,46 @@ class NukiBridge(DoorlockBackend):
self.device = NukiBridgeDevice(endpoint, api_key, device_name)
self.poll_thread = threading.Thread(target=self.poll_worker)
self.poll_thread.start()
self.current_state = DoorState.Closed
def poll_worker(self):
while True:
state = self.device.get_device_state()
last_dev_state = self.device.get_device_state()
if state is None:
while True:
dev_state = self.device.get_device_state()
if dev_state is None:
continue
self.current_state = state
log.debug(f"Nuki reported state: {state}")
if not NukiBridgeDevice.compare_device_state(dev_state, last_dev_state):
log.debug(f"Nuki changed state: {dev_state}")
if self.current_state != DoorState.Closed and dev_state["stateName"] == "locked":
self.current_state = DoorState.Closed
self.state_change_callback(self.current_state, DoorlockResponse.Success)
if self.current_state != DoorState.Open and dev_state["stateName"] == "unlocked":
self.current_state = DoorState.Open
self.state_change_callback(self.current_state, DoorlockResponse.Success)
last_dev_state = dev_state
time.sleep(10)
def set_state(self, state):
success = False
if state == DoorState.Open:
log.info("open nuki")
return self.device.unlock()
if self.device.unlock():
self.current_state = DoorState.Open
success = True
elif state == DoorState.Closed:
log.info("close nuki")
return self.device.lock()
if self.device.lock():
self.current_state = DoorState.Closed
success = True
return success
def get_state(self, state):
return self.current_state

View File

@ -25,7 +25,7 @@ from wtforms import PasswordField, StringField, SubmitField
from wtforms.validators import DataRequired, Length
from .Door import DoorState
from .Doorlock import DoorlockResponse
from .DoorlockResponse import DoorlockResponse
log = logging.getLogger()
webapp = Flask(__name__)