mirror of
https://github.com/spl0k/supysonic.git
synced 2024-12-23 01:16:18 +00:00
Manager class to centralize user management
This commit is contained in:
parent
848cfb2814
commit
c8f0a22980
@ -5,8 +5,7 @@ import simplejson
|
|||||||
import cgi
|
import cgi
|
||||||
|
|
||||||
from web import app
|
from web import app
|
||||||
from db import User
|
from user_manager import UserManager
|
||||||
import hashlib
|
|
||||||
|
|
||||||
@app.before_request
|
@app.before_request
|
||||||
def set_formatter():
|
def set_formatter():
|
||||||
@ -43,15 +42,10 @@ def authorize():
|
|||||||
if not username or not decoded_pass:
|
if not username or not decoded_pass:
|
||||||
return error
|
return error
|
||||||
|
|
||||||
user = User.query.filter(User.name == username).first()
|
|
||||||
if not user:
|
|
||||||
return error
|
|
||||||
|
|
||||||
if decoded_pass.startswith('enc:'):
|
if decoded_pass.startswith('enc:'):
|
||||||
decoded_pass = hexdecode(decoded_pass[4:])
|
decoded_pass = hexdecode(decoded_pass[4:])
|
||||||
|
|
||||||
crypt = hashlib.sha1(user.salt + decoded_pass).hexdigest()
|
if UserManager.try_auth(username, decoded_pass)[0] != UserManager.LOGIN_SUCCESS:
|
||||||
if crypt != user.password:
|
|
||||||
return error
|
return error
|
||||||
|
|
||||||
@app.after_request
|
@app.after_request
|
||||||
|
@ -14,7 +14,7 @@
|
|||||||
<div class="page">
|
<div class="page">
|
||||||
<h1>Supysonic</h1>
|
<h1>Supysonic</h1>
|
||||||
|
|
||||||
<p>{% if session.get('userid') %}<a href="{{ url_for('logout') }}">Log out</a>{% else %}<a href="{{ url_for('login') }}">Log in</a>{% endif %}</p>
|
<p><a href="{{ url_for('index') }}">Home</a> | {% if session.get('userid') %}<a href="{{ url_for('logout') }}">Log out</a>{% else %}<a href="{{ url_for('login') }}">Log in</a>{% endif %}</p>
|
||||||
|
|
||||||
{% if get_flashed_messages() %}
|
{% if get_flashed_messages() %}
|
||||||
<div class="flash">
|
<div class="flash">
|
||||||
|
@ -4,7 +4,7 @@
|
|||||||
<table>
|
<table>
|
||||||
<tr><th>Name</th><th>EMail</th><th>Admin</th><th></th></tr>
|
<tr><th>Name</th><th>EMail</th><th>Admin</th><th></th></tr>
|
||||||
{% for user in users %}
|
{% for user in users %}
|
||||||
<tr><td>{{ user.name }}</td><td>{{ user.mail }}</td><td>{{ user.admin }}</td><td><a href="{{ url_for('del_user', id = user.id) }}">X</a></td></tr>
|
<tr><td>{{ user.name }}</td><td>{{ user.mail }}</td><td>{{ user.admin }}</td><td><a href="{{ url_for('del_user', uid = user.id) }}">X</a></td></tr>
|
||||||
{% endfor %}
|
{% endfor %}
|
||||||
</table>
|
</table>
|
||||||
<a href="{{ url_for('add_user') }}">Add</a>
|
<a href="{{ url_for('add_user') }}">Add</a>
|
||||||
|
70
user.py
70
user.py
@ -1,10 +1,9 @@
|
|||||||
# coding: utf-8
|
# coding: utf-8
|
||||||
|
|
||||||
from flask import Flask, request, session, flash, render_template, redirect, url_for
|
from flask import Flask, request, session, flash, render_template, redirect, url_for
|
||||||
import string, random, hashlib
|
|
||||||
import uuid
|
|
||||||
|
|
||||||
from web import app
|
from web import app
|
||||||
|
from user_manager import UserManager
|
||||||
import db
|
import db
|
||||||
|
|
||||||
@app.route('/user')
|
@app.route('/user')
|
||||||
@ -21,47 +20,40 @@ def add_user():
|
|||||||
if name in (None, ''):
|
if name in (None, ''):
|
||||||
flash('The name is required.')
|
flash('The name is required.')
|
||||||
error = True
|
error = True
|
||||||
elif db.User.query.filter(db.User.name == name).first():
|
|
||||||
flash('There is already a user with that name. Please pick another one.')
|
|
||||||
error = True
|
|
||||||
if passwd in (None, ''):
|
if passwd in (None, ''):
|
||||||
flash('Please provide a password.')
|
flash('Please provide a password.')
|
||||||
error = True
|
error = True
|
||||||
elif passwd != passwd_confirm:
|
elif passwd != passwd_confirm:
|
||||||
flash("The passwords don't match.")
|
flash("The passwords don't match.")
|
||||||
error = True
|
error = True
|
||||||
|
|
||||||
if admin is None:
|
if admin is None:
|
||||||
admin = True if db.User.query.filter(db.User.admin == True).count() == 0 else False
|
admin = True if db.User.query.filter(db.User.admin == True).count() == 0 else False
|
||||||
else:
|
else:
|
||||||
admin = True
|
admin = True
|
||||||
if error:
|
|
||||||
return render_template('adduser.html')
|
|
||||||
|
|
||||||
salt = ''.join(random.choice(string.printable.strip()) for i in xrange(6))
|
if not error:
|
||||||
crypt = hashlib.sha1(salt + passwd).hexdigest()
|
status = UserManager.add(name, passwd, mail, admin)
|
||||||
user = db.User(name = name, mail = mail, password = crypt, salt = salt, admin = admin)
|
if status == UserManager.ADD_SUCCESS:
|
||||||
db.session.add(user)
|
flash("User '%s' successfully added" % name)
|
||||||
db.session.commit()
|
return redirect(url_for('user_index'))
|
||||||
flash("User '%s' successfully added" % name)
|
elif status == UserManager.ADD_NAME_EXISTS:
|
||||||
|
flash('There is already a user with that name. Please pick another one.')
|
||||||
|
|
||||||
return redirect(url_for('user_index'))
|
return render_template('adduser.html')
|
||||||
|
|
||||||
@app.route('/user/del/<id>')
|
|
||||||
def del_user(id):
|
@app.route('/user/del/<uid>')
|
||||||
try:
|
def del_user(uid):
|
||||||
idid = uuid.UUID(id)
|
status = UserManager.delete(uid)
|
||||||
except ValueError:
|
if status == UserManager.DEL_SUCCESS:
|
||||||
|
flash('Deleted user')
|
||||||
|
elif status == UserManager.DEL_INVALID_ID:
|
||||||
flash('Invalid user id')
|
flash('Invalid user id')
|
||||||
return redirect(url_for('index'))
|
elif status == UserManager.DEL_NO_SUCH_USER:
|
||||||
|
|
||||||
user = db.User.query.get(idid)
|
|
||||||
if user is None:
|
|
||||||
flash('No such user')
|
flash('No such user')
|
||||||
return redirect(url_for('index'))
|
else:
|
||||||
|
flash('Unknown error')
|
||||||
db.session.delete(user)
|
|
||||||
db.session.commit()
|
|
||||||
flash("Deleted user '%s'" % user.name)
|
|
||||||
|
|
||||||
return redirect(url_for('user_index'))
|
return redirect(url_for('user_index'))
|
||||||
|
|
||||||
@ -75,25 +67,27 @@ def login():
|
|||||||
if request.method == 'GET':
|
if request.method == 'GET':
|
||||||
return render_template('login.html')
|
return render_template('login.html')
|
||||||
|
|
||||||
user, password = map(request.form.get, [ 'user', 'password' ])
|
name, password = map(request.form.get, [ 'user', 'password' ])
|
||||||
error = False
|
error = False
|
||||||
if user in ('', None):
|
if name in ('', None):
|
||||||
flash('Missing user name')
|
flash('Missing user name')
|
||||||
error = True
|
error = True
|
||||||
if password in ('', None):
|
if password in ('', None):
|
||||||
flash('Missing password')
|
flash('Missing password')
|
||||||
error = True
|
error = True
|
||||||
|
|
||||||
if not error:
|
if not error:
|
||||||
dbuser = db.User.query.filter(db.User.name == user).first()
|
status, user = UserManager.try_auth(name, password)
|
||||||
if not dbuser:
|
if status == UserManager.LOGIN_SUCCESS:
|
||||||
flash('Unknown user')
|
session['userid'] = str(user.id)
|
||||||
elif hashlib.sha1(dbuser.salt + password).hexdigest() != dbuser.password:
|
|
||||||
flash('Wrong password')
|
|
||||||
else:
|
|
||||||
session['userid'] = str(dbuser.id)
|
|
||||||
session['admin'] = dbuser.admin
|
|
||||||
flash('Logged in!')
|
flash('Logged in!')
|
||||||
return redirect(return_url)
|
return redirect(return_url)
|
||||||
|
elif status == UserManager.LOGIN_NO_SUCH_USER:
|
||||||
|
flash('Unknown user')
|
||||||
|
elif status == UserManager.LOGIN_WRONG_PASS:
|
||||||
|
flash('Wrong password')
|
||||||
|
else:
|
||||||
|
flash('Unknown error')
|
||||||
|
|
||||||
return render_template('login.html')
|
return render_template('login.html')
|
||||||
|
|
||||||
|
68
user_manager.py
Executable file
68
user_manager.py
Executable file
@ -0,0 +1,68 @@
|
|||||||
|
# coding: utf-8
|
||||||
|
|
||||||
|
import string, random, hashlib
|
||||||
|
import uuid
|
||||||
|
|
||||||
|
from db import User, session
|
||||||
|
|
||||||
|
class UserManager:
|
||||||
|
ADD_SUCCESS = 0
|
||||||
|
ADD_NAME_EXISTS = 1
|
||||||
|
|
||||||
|
DEL_SUCCESS = 0
|
||||||
|
DEL_INVALID_ID = 1
|
||||||
|
DEL_NO_SUCH_USER = 2
|
||||||
|
|
||||||
|
LOGIN_SUCCESS = 0
|
||||||
|
LOGIN_NO_SUCH_USER = 1
|
||||||
|
LOGIN_WRONG_PASS = 2
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def add(name, password, mail, admin):
|
||||||
|
if User.query.filter(User.name == name).first():
|
||||||
|
return UserManager.ADD_NAME_EXISTS
|
||||||
|
|
||||||
|
crypt, salt = UserManager.__encrypt_password(password)
|
||||||
|
user = User(name = name, mail = mail, password = crypt, salt = salt, admin = admin)
|
||||||
|
session.add(user)
|
||||||
|
session.commit()
|
||||||
|
|
||||||
|
return UserManager.ADD_SUCCESS
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def delete(uid):
|
||||||
|
if type(uid) in (str, unicode):
|
||||||
|
try:
|
||||||
|
uid = uuid.UUID(uid)
|
||||||
|
except:
|
||||||
|
return UserManager.DEL_INVALID_ID
|
||||||
|
elif type(uid) is uuid.UUID:
|
||||||
|
pass
|
||||||
|
else:
|
||||||
|
return UserManager.DEL_INVALID_ID
|
||||||
|
|
||||||
|
user = User.query.get(uid)
|
||||||
|
if user is None:
|
||||||
|
return UserManager.DEL_NO_SUCH_USER
|
||||||
|
|
||||||
|
session.delete(user)
|
||||||
|
session.commit()
|
||||||
|
|
||||||
|
return UserManager.DEL_SUCCESS
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def try_auth(name, password):
|
||||||
|
user = User.query.filter(User.name == name).first()
|
||||||
|
if not user:
|
||||||
|
return UserManager.LOGIN_NO_SUCH_USER, None
|
||||||
|
elif UserManager.__encrypt_password(password, user.salt)[0] != user.password:
|
||||||
|
return UserManager.LOGIN_WRONG_PASS, None
|
||||||
|
else:
|
||||||
|
return UserManager.LOGIN_SUCCESS, user
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def __encrypt_password(password, salt = None):
|
||||||
|
if salt is None:
|
||||||
|
salt = ''.join(random.choice(string.printable.strip()) for i in xrange(6))
|
||||||
|
return hashlib.sha1(salt + password).hexdigest(), salt
|
||||||
|
|
Loading…
Reference in New Issue
Block a user