
Indention and imports according to PEP8.

This commit is contained in:
Markus 2019-02-09 13:23:36 +01:00
parent f8ef1ec006
commit 8d7b72bf07

264
dss.py

@ -1,16 +1,15 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
import uuid
import ldap import ldap
import ldap.modlist import ldap.modlist
import uuid
from flask import Flask, render_template, redirect, url_for, session from flask import Flask, render_template, redirect, url_for, session
from flask_wtf import FlaskForm from flask_wtf import FlaskForm
from wtforms.fields import IntegerField, PasswordField, SelectField, StringField, SubmitField
from wtforms.validators import EqualTo, DataRequired
from passlib.hash import ldap_salted_sha1 from passlib.hash import ldap_salted_sha1
from redis import Redis from redis import Redis
from wtforms.fields import IntegerField, PasswordField, StringField, SubmitField
from wtforms.validators import EqualTo, DataRequired
app = Flask(__name__) app = Flask(__name__)
app.config.from_pyfile('config.cfg') app.config.from_pyfile('config.cfg')
@ -22,187 +21,192 @@ rdb = Redis(host=app.config.get('REDIS_HOST', '127.0.0.1'), password=app.config.
ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_DEMAND) ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_DEMAND)
ldap.set_option(ldap.OPT_REFERRALS, 0) ldap.set_option(ldap.OPT_REFERRALS, 0)
if 'LDAP_CA' in app.config.keys(): if 'LDAP_CA' in app.config.keys():
ldap.set_option(ldap.OPT_X_TLS_CACERTFILE, app.config.get('LDAP_CA')) ldap.set_option(ldap.OPT_X_TLS_CACERTFILE, app.config.get('LDAP_CA'))
class ReadOnlyField(StringField): class ReadOnlyField(StringField):
def __call__(self, *args, **kwargs): def __call__(self, *args, **kwargs):
kwargs.setdefault('readonly', True) kwargs.setdefault('readonly', True)
return super(ReadOnlyField, self).__call__(*args, **kwargs) return super(ReadOnlyField, self).__call__(*args, **kwargs)
class CreateForm(FlaskForm): class CreateForm(FlaskForm):
user = StringField('Username', validators = [DataRequired()]) user = StringField('Username', validators=[DataRequired()])
uid = IntegerField('User ID', validators = [DataRequired()]) uid = IntegerField('User ID', validators=[DataRequired()])
gn = StringField('Given Name', validators = [DataRequired()]) gn = StringField('Given Name', validators=[DataRequired()])
sn = StringField('Family Name', validators = [DataRequired()]) sn = StringField('Family Name', validators=[DataRequired()])
pwd1 = PasswordField('Password', validators = [DataRequired()]) pwd1 = PasswordField('Password', validators=[DataRequired()])
pwd2 = PasswordField('Password (repeat)', validators = [DataRequired(), EqualTo('pwd1', "Passwords must match")]) pwd2 = PasswordField('Password (repeat)', validators=[DataRequired(), EqualTo('pwd1', "Passwords must match")])
submit = SubmitField('Submit') submit = SubmitField('Submit')
class EditForm(FlaskForm): class EditForm(FlaskForm):
user = ReadOnlyField('Username') user = ReadOnlyField('Username')
pwd1 = PasswordField('New Password', validators = [DataRequired()]) pwd1 = PasswordField('New Password', validators=[DataRequired()])
pwd2 = PasswordField('New Password (repeat)', validators = [DataRequired(), EqualTo('pwd1', "Passwords must match")]) pwd2 = PasswordField('New Password (repeat)', validators=[DataRequired(), EqualTo('pwd1', "Passwords must match")])
submit = SubmitField('Submit') submit = SubmitField('Submit')
class LoginForm(FlaskForm): class LoginForm(FlaskForm):
user = StringField('Username', validators=[DataRequired()]) user = StringField('Username', validators=[DataRequired()])
pswd = PasswordField('Password', validators=[DataRequired()]) pswd = PasswordField('Password', validators=[DataRequired()])
submit = SubmitField('Login') submit = SubmitField('Login')
def makeSecret(password): def makeSecret(password):
return ldap_salted_sha1.encrypt(password) return ldap_salted_sha1.encrypt(password)
def isAdmin(): def isAdmin():
return isLoggedin() and rdb.hget(session['uuid'], 'user') in app.config.get('ADMINS', []) return isLoggedin() and rdb.hget(session['uuid'], 'user') in app.config.get('ADMINS', [])
def isLoggedin(): def isLoggedin():
return 'uuid' in session and rdb.exists(session['uuid']) return 'uuid' in session and rdb.exists(session['uuid'])
def buildNav(): def buildNav():
nav = [] nav = []
if isLoggedin(): if isLoggedin():
nav.append(('Edit own Account', 'edit')) nav.append(('Edit own Account', 'edit'))
if isAdmin(): if isAdmin():
nav.append(('List Accounts', 'list_users')) nav.append(('List Accounts', 'list_users'))
nav.append(('Create Account', 'create')) nav.append(('Create Account', 'create'))
nav.append(('Logout', 'logout')) nav.append(('Logout', 'logout'))
else: else:
nav.append(('Login', 'login')) nav.append(('Login', 'login'))
return nav return nav
@app.route('/') @app.route('/')
def index(): def index():
return render_template('index.html', nav=buildNav()) return render_template('index.html', nav=buildNav())
@app.route('/create', methods=['GET', 'POST']) @app.route('/create', methods=['GET', 'POST'])
def create(): def create():
if not isLoggedin(): if not isLoggedin():
return render_template('error.html', message="You are not logged in. Please log in first.", nav=buildNav()) return render_template('error.html', message="You are not logged in. Please log in first.", nav=buildNav())
form = CreateForm() form = CreateForm()
if form.validate_on_submit(): if form.validate_on_submit():
l = ldap.initialize(app.config.get('LDAP_URI', 'ldaps://127.0.0.1')) l = ldap.initialize(app.config.get('LDAP_URI', 'ldaps://127.0.0.1'))
try: try:
l.simple_bind_s(rdb.hget(session['uuid'], 'user'), rdb.hget(session['uuid'], 'pswd')) l.simple_bind_s(rdb.hget(session['uuid'], 'user'), rdb.hget(session['uuid'], 'pswd'))
d = { d = {
'user' : form.user.data, 'user': form.user.data,
'uid' : form.uid.data, 'uid': form.uid.data,
'gn' : form.gn.data, 'gn': form.gn.data,
'sn' : form.sn.data, 'sn': form.sn.data,
'pass' : makeSecret(form.pwd1.data) 'pass': makeSecret(form.pwd1.data)
} }
# add user # add user
user_dn = app.config.get('USER_DN').format(**d) user_dn = app.config.get('USER_DN').format(**d)
attrs = {} attrs = {}
for k,v in app.config.get('USER_ATTRS').iteritems(): for k, v in app.config.get('USER_ATTRS').iteritems():
if isinstance(v, str): if isinstance(v, str):
attrs[k] = v.format(**d) attrs[k] = v.format(**d)
elif isinstance(v, list): elif isinstance(v, list):
attrs[k] = [] attrs[k] = []
for e in v: for e in v:
attrs[k].append(e.format(**d)) attrs[k].append(e.format(**d))
l.add_s(user_dn, ldap.modlist.addModlist(attrs)) l.add_s(user_dn, ldap.modlist.addModlist(attrs))
# add user to group # add user to group
group_dn = app.config.get('GROUP_DN').format(**d) group_dn = app.config.get('GROUP_DN').format(**d)
l.modify_s(group_dn, [(ldap.MOD_ADD, 'memberUid', str(form.user.data))]) l.modify_s(group_dn, [(ldap.MOD_ADD, 'memberUid', str(form.user.data))])
except ldap.LDAPError as e: except ldap.LDAPError as e:
l.unbind_s() l.unbind_s()
message = "LDAP Error" message = "LDAP Error"
if 'desc' in e.message: if 'desc' in e.message:
message = message + " " + e.message['desc'] message = message + " " + e.message['desc']
if 'info' in e.message: if 'info' in e.message:
message = message + ": " + e.message['info'] message = message + ": " + e.message['info']
return render_template('error.html', message=message, nav=buildNav()) return render_template('error.html', message=message, nav=buildNav())
else: else:
l.unbind_s() l.unbind_s()
return render_template('success.html', message="User successfully created.", nav=buildNav()) return render_template('success.html', message="User successfully created.", nav=buildNav())
return render_template('create.html', form=form, nav=buildNav()) return render_template('create.html', form=form, nav=buildNav())
@app.route('/edit', methods=['GET', 'POST']) @app.route('/edit', methods=['GET', 'POST'])
def edit(): def edit():
if not isLoggedin(): if not isLoggedin():
return render_template('error.html', message="You are not logged in. Please log in first.", nav=buildNav()) return render_template('error.html', message="You are not logged in. Please log in first.", nav=buildNav())
form = EditForm() form = EditForm()
creds = rdb.hgetall(session['uuid']) creds = rdb.hgetall(session['uuid'])
if form.validate_on_submit(): if form.validate_on_submit():
npwd = form.pwd1.data npwd = form.pwd1.data
l = ldap.initialize(app.config.get('LDAP_URI', 'ldaps://127.0.0.1')) l = ldap.initialize(app.config.get('LDAP_URI', 'ldaps://127.0.0.1'))
try: try:
l.simple_bind_s(creds['user'], creds['pswd']) l.simple_bind_s(creds['user'], creds['pswd'])
l.passwd_s(creds['user'], creds['pswd'], npwd) l.passwd_s(creds['user'], creds['pswd'], npwd)
except ldap.INVALID_CREDENTIALS as e: except ldap.INVALID_CREDENTIALS as e:
form.user.errors.append(e.message['desc']) form.user.errors.append(e.message['desc'])
l.unbind_s() l.unbind_s()
return render_template('edit.html', form=form, nav=buildNav()) return render_template('edit.html', form=form, nav=buildNav())
else: else:
rdb.hset(session['uuid'], 'pswd', npwd) rdb.hset(session['uuid'], 'pswd', npwd)
l.unbind_s() l.unbind_s()
return render_template('success.html', message="User successfully edited.", nav=buildNav()) return render_template('success.html', message="User successfully edited.", nav=buildNav())
form.user.data = creds['user'] form.user.data = creds['user']
return render_template('edit.html', form=form, nav=buildNav()) return render_template('edit.html', form=form, nav=buildNav())
@app.route('/list') @app.route('/list')
def list_users(): def list_users():
if not isLoggedin(): if not isLoggedin():
return render_template('error.html', message="You are not logged in. Please log in first.", nav=buildNav()) return render_template('error.html', message="You are not logged in. Please log in first.", nav=buildNav())
l = ldap.initialize(app.config.get('LDAP_URI', 'ldaps://127.0.0.1')) l = ldap.initialize(app.config.get('LDAP_URI', 'ldaps://127.0.0.1'))
l.simple_bind_s(rdb.hget(session['uuid'], 'user'), rdb.hget(session['uuid'], 'pswd')) l.simple_bind_s(rdb.hget(session['uuid'], 'user'), rdb.hget(session['uuid'], 'pswd'))
sr = l.search_s(app.config.get('LDAP_BASE'), ldap.SCOPE_SUBTREE, '(objectClass=posixAccount)', ['cn']) sr = l.search_s(app.config.get('LDAP_BASE'), ldap.SCOPE_SUBTREE, '(objectClass=posixAccount)', ['cn'])
return render_template('list.html', users=sr, nav=buildNav()) return render_template('list.html', users=sr, nav=buildNav())
@app.route('/login', methods=['GET', 'POST']) @app.route('/login', methods=['GET', 'POST'])
def login(): def login():
form = LoginForm() form = LoginForm()
if form.validate_on_submit(): if form.validate_on_submit():
if form.user.data.endswith(app.config.get('LDAP_BASE','')): if form.user.data.endswith(app.config.get('LDAP_BASE', '')):
user = form.user.data user = form.user.data
else: else:
user = app.config.get('USER_DN').format(user=form.user.data) user = app.config.get('USER_DN').format(user=form.user.data)
pswd = form.pswd.data pswd = form.pswd.data
l = ldap.initialize(app.config.get('LDAP_URI', 'ldaps://127.0.0.1')) l = ldap.initialize(app.config.get('LDAP_URI', 'ldaps://127.0.0.1'))
try: try:
l.simple_bind_s(user, pswd) l.simple_bind_s(user, pswd)
except ldap.INVALID_CREDENTIALS as e: except ldap.INVALID_CREDENTIALS as e:
form.pswd.errors.append('Invalid Credentials') form.pswd.errors.append('Invalid Credentials')
l.unbind_s() l.unbind_s()
return render_template('login.html', form=form, nav=buildNav()) return render_template('login.html', form=form, nav=buildNav())
l.unbind_s() l.unbind_s()
session['uuid'] = str(uuid.uuid4()) session['uuid'] = str(uuid.uuid4())
credentials = { 'user': user, 'pswd': pswd } credentials = {'user': user, 'pswd': pswd}
rdb.hmset(session['uuid'], credentials) rdb.hmset(session['uuid'], credentials)
# TODO refactor this and reuse # TODO refactor this and reuse
rdb.expire(session['uuid'], app.config.get('SESSION_TIMEOUT', 3600)) rdb.expire(session['uuid'], app.config.get('SESSION_TIMEOUT', 3600))
return redirect(url_for('index')) return redirect(url_for('index'))
return render_template('login.html', form=form, nav=buildNav()) return render_template('login.html', form=form, nav=buildNav())
@app.route('/logout') @app.route('/logout')
def logout(): def logout():
if 'uuid' in session: if 'uuid' in session:
rdb.delete(session['uuid']) rdb.delete(session['uuid'])
del session['uuid'] del session['uuid']
return redirect(url_for('index')) return redirect(url_for('index'))
if __name__ == '__main__': if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000) app.run(host='0.0.0.0', port=5000)