1
0
mirror of https://github.com/spl0k/supysonic.git synced 2024-12-22 17:06:17 +00:00

Refactored UserManager to raise exceptions

rather than returning status codes
This commit is contained in:
spl0k 2018-03-02 22:51:49 +01:00
parent 8bf488fab2
commit ef9e7af026
13 changed files with 152 additions and 199 deletions

View File

@ -58,8 +58,8 @@ def decode_password(password):
@api.before_request @api.before_request
def authorize(): def authorize():
if request.authorization: if request.authorization:
status, user = UserManager.try_auth(request.authorization.username, request.authorization.password) user = UserManager.try_auth(request.authorization.username, request.authorization.password)
if status == UserManager.SUCCESS: if user is not None:
request.user = user request.user = user
return return
raise Unauthorized() raise Unauthorized()
@ -68,8 +68,8 @@ def authorize():
password = request.values['p'] password = request.values['p']
password = decode_password(password) password = decode_password(password)
status, user = UserManager.try_auth(username, password) user = UserManager.try_auth(username, password)
if status != UserManager.SUCCESS: if user is None:
raise Unauthorized() raise Unauthorized()
request.user = user request.user = user

View File

@ -30,13 +30,10 @@ def not_found(e):
rollback() rollback()
return NotFound(e.entity.__name__) return NotFound(e.entity.__name__)
@api.errorhandler(Exception) @api.errorhandler(500)
def generic_error(e): # pragma: nocover def generic_error(e): # pragma: nocover
rollback() rollback()
if not current_app.testing: return ServerError(e)
return ServerError(e)
else:
raise e
#@api.errorhandler(404) #@api.errorhandler(404)
@api.route('/<path:invalid>', methods = [ 'GET', 'POST' ]) # blueprint 404 workaround @api.route('/<path:invalid>', methods = [ 'GET', 'POST' ]) # blueprint 404 workaround

View File

@ -64,9 +64,7 @@ def user_add():
admin = True if admin in (True, 'True', 'true', 1, '1') else False admin = True if admin in (True, 'True', 'true', 1, '1') else False
password = decode_password(password) password = decode_password(password)
status = UserManager.add(username, password, email, admin) UserManager.add(username, password, email, admin)
if status == UserManager.NAME_EXISTS:
raise GenericError('There is already a user with that username')
return request.formatter.empty return request.formatter.empty
@ -74,14 +72,7 @@ def user_add():
@admin_only @admin_only
def user_del(): def user_del():
username = request.values['username'] username = request.values['username']
UserManager.delete_by_name(username)
user = User.get(name = username)
if user is None:
raise NotFound('User')
status = UserManager.delete(user.id)
if status != UserManager.SUCCESS:
raise GenericError(UserManager.error_str(status))
return request.formatter.empty return request.formatter.empty
@ -94,11 +85,7 @@ def user_changepass():
raise Forbidden() raise Forbidden()
password = decode_password(password) password = decode_password(password)
status = UserManager.change_password2(username, password) UserManager.change_password2(username, password)
if status == UserManager.NO_SUCH_USER:
raise NotFound('User')
elif status != UserManager.SUCCESS:
raise GenericError(UserManager.error_str(status))
return request.formatter.empty return request.formatter.empty

View File

@ -26,6 +26,7 @@ import sys
import time import time
from pony.orm import db_session from pony.orm import db_session
from pony.orm import ObjectNotFound
from .db import Folder, User from .db import Folder, User
from .managers.folder import FolderManager from .managers.folder import FolderManager
@ -122,6 +123,8 @@ class SupysonicCLI(cmd.Cmd):
self.write_line('Unknown command %s' % line.split()[0]) self.write_line('Unknown command %s' % line.split()[0])
self.do_help(None) self.do_help(None)
onecmd = db_session(cmd.Cmd.onecmd)
def postloop(self): def postloop(self):
self.write_line() self.write_line()
@ -148,7 +151,6 @@ class SupysonicCLI(cmd.Cmd):
folder_scan_parser.add_argument('folders', metavar = 'folder', nargs = '*', help = 'Folder(s) to be scanned. If ommitted, all folders are scanned') folder_scan_parser.add_argument('folders', metavar = 'folder', nargs = '*', help = 'Folder(s) to be scanned. If ommitted, all folders are scanned')
folder_scan_parser.add_argument('-f', '--force', action = 'store_true', help = "Force scan of already know files even if they haven't changed") folder_scan_parser.add_argument('-f', '--force', action = 'store_true', help = "Force scan of already know files even if they haven't changed")
@db_session
def folder_list(self): def folder_list(self):
self.write_line('Name\t\tPath\n----\t\t----') self.write_line('Name\t\tPath\n----\t\t----')
self.write_line('\n'.join('{0: <16}{1}'.format(f.name, f.path) for f in Folder.select(lambda f: f.root))) self.write_line('\n'.join('{0: <16}{1}'.format(f.name, f.path) for f in Folder.select(lambda f: f.root)))
@ -167,7 +169,6 @@ class SupysonicCLI(cmd.Cmd):
else: else:
self.write_line("Deleted folder '{}'".format(name)) self.write_line("Deleted folder '{}'".format(name))
@db_session
def folder_scan(self, folders, force): def folder_scan(self, folders, force):
extensions = self.__config.BASE['scanner_extensions'] extensions = self.__config.BASE['scanner_extensions']
if extensions: if extensions:
@ -217,30 +218,32 @@ class SupysonicCLI(cmd.Cmd):
user_pass_parser.add_argument('name', help = 'Name/login of the user to which change the password') user_pass_parser.add_argument('name', help = 'Name/login of the user to which change the password')
user_pass_parser.add_argument('password', nargs = '?', help = 'New password') user_pass_parser.add_argument('password', nargs = '?', help = 'New password')
@db_session
def user_list(self): def user_list(self):
self.write_line('Name\t\tAdmin\tEmail\n----\t\t-----\t-----') self.write_line('Name\t\tAdmin\tEmail\n----\t\t-----\t-----')
self.write_line('\n'.join('{0: <16}{1}\t{2}'.format(u.name, '*' if u.admin else '', u.mail) for u in User.select())) self.write_line('\n'.join('{0: <16}{1}\t{2}'.format(u.name, '*' if u.admin else '', u.mail) for u in User.select()))
def _ask_password(self): # pragma: nocover
password = getpass.getpass()
confirm = getpass.getpass('Confirm password: ')
if password != confirm:
raise ValueError("Passwords don't match")
return password
def user_add(self, name, admin, password, email): def user_add(self, name, admin, password, email):
if not password: try:
password = getpass.getpass() if not password:
confirm = getpass.getpass('Confirm password: ') password = self._ask_password() # pragma: nocover
if password != confirm: UserManager.add(name, password, email, admin)
self.write_error_line("Passwords don't match") except ValueError as e:
return self.write_error_line(str(e))
status = UserManager.add(name, password, email, admin)
if status != UserManager.SUCCESS:
self.write_error_line(UserManager.error_str(status))
def user_delete(self, name): def user_delete(self, name):
ret = UserManager.delete_by_name(name) try:
if ret != UserManager.SUCCESS: UserManager.delete_by_name(name)
self.write_error_line(UserManager.error_str(ret))
else:
self.write_line("Deleted user '{}'".format(name)) self.write_line("Deleted user '{}'".format(name))
except ObjectNotFound as e:
self.write_error_line(str(e))
@db_session
def user_setadmin(self, name, off): def user_setadmin(self, name, off):
user = User.get(name = name) user = User.get(name = name)
if user is None: if user is None:
@ -250,15 +253,11 @@ class SupysonicCLI(cmd.Cmd):
self.write_line("{0} '{1}' admin rights".format('Revoked' if off else 'Granted', name)) self.write_line("{0} '{1}' admin rights".format('Revoked' if off else 'Granted', name))
def user_changepass(self, name, password): def user_changepass(self, name, password):
if not password: try:
password = getpass.getpass() if not password:
confirm = getpass.getpass('Confirm password: ') password = self._ask_password() # pragma: nocover
if password != confirm: UserManager.change_password2(name, password)
self.write_error_line("Passwords don't match")
return
status = UserManager.change_password2(name, password)
if status != UserManager.SUCCESS:
self.write_error_line(UserManager.error_str(status))
else:
self.write_line("Successfully changed '{}' password".format(name)) self.write_line("Successfully changed '{}' password".format(name))
except ObjectNotFound as e:
self.write_error_line(str(e))

View File

@ -12,6 +12,7 @@
from flask import redirect, request, session, url_for from flask import redirect, request, session, url_for
from flask import Blueprint from flask import Blueprint
from functools import wraps from functools import wraps
from pony.orm import ObjectNotFound
from ..db import Artist, Album, Track from ..db import Artist, Album, Track
from ..managers.user import UserManager from ..managers.user import UserManager
@ -23,12 +24,12 @@ def login_check():
request.user = None request.user = None
should_login = True should_login = True
if session.get('userid'): if session.get('userid'):
code, user = UserManager.get(session.get('userid')) try:
if code != UserManager.SUCCESS: user = UserManager.get(session.get('userid'))
session.clear()
else:
request.user = user request.user = user
should_login = False should_login = False
except (ValueError, ObjectNotFound):
session.clear()
if should_login and request.endpoint != 'frontend.login': if should_login and request.endpoint != 'frontend.login':
flash('Please login') flash('Please login')

View File

@ -24,7 +24,6 @@ import uuid
from flask import current_app, flash, redirect, render_template, request, url_for from flask import current_app, flash, redirect, render_template, request, url_for
from ..db import Folder from ..db import Folder
from ..managers.user import UserManager
from ..managers.folder import FolderManager from ..managers.folder import FolderManager
from ..scanner import Scanner from ..scanner import Scanner

View File

@ -24,7 +24,6 @@ from flask import flash, redirect, render_template, request, url_for
from pony.orm import ObjectNotFound from pony.orm import ObjectNotFound
from ..db import Playlist from ..db import Playlist
from ..managers.user import UserManager
from . import frontend from . import frontend

View File

@ -21,6 +21,7 @@
from flask import flash, redirect, render_template, request, session, url_for from flask import flash, redirect, render_template, request, session, url_for
from flask import current_app from flask import current_app
from functools import wraps from functools import wraps
from pony.orm import ObjectNotFound
from ..db import User, ClientPrefs from ..db import User, ClientPrefs
from ..lastfm import LastFm from ..lastfm import LastFm
@ -42,9 +43,13 @@ def me_or_uuid(f, arg = 'uid'):
elif not request.user.admin: elif not request.user.admin:
return redirect(url_for('frontend.index')) return redirect(url_for('frontend.index'))
else: else:
code, user = UserManager.get(uid) try:
if code != UserManager.SUCCESS: user = UserManager.get(uid)
flash(UserManager.error_str(code)) except ValueError as e:
flash(str(e), 'error')
return redirect(url_for('frontend.index'))
except ObjectNotFound:
flash('No such user', 'error')
return redirect(url_for('frontend.index')) return redirect(url_for('frontend.index'))
if kwargs: if kwargs:
@ -104,9 +109,13 @@ def update_clients(uid, user):
@frontend.route('/user/<uid>/changeusername') @frontend.route('/user/<uid>/changeusername')
@admin_only @admin_only
def change_username_form(uid): def change_username_form(uid):
code, user = UserManager.get(uid) try:
if code != UserManager.SUCCESS: user = UserManager.get(uid)
flash(UserManager.error_str(code)) except ValueError as e:
flash(str(e), 'error')
return redirect(url_for('frontend.index'))
except ObjectNotFound:
flash('No such user', 'error')
return redirect(url_for('frontend.index')) return redirect(url_for('frontend.index'))
return render_template('change_username.html', user = user) return render_template('change_username.html', user = user)
@ -114,8 +123,13 @@ def change_username_form(uid):
@frontend.route('/user/<uid>/changeusername', methods = [ 'POST' ]) @frontend.route('/user/<uid>/changeusername', methods = [ 'POST' ])
@admin_only @admin_only
def change_username_post(uid): def change_username_post(uid):
code, user = UserManager.get(uid) try:
if code != UserManager.SUCCESS: user = UserManager.get(uid)
except ValueError as e:
flash(str(e), 'error')
return redirect(url_for('frontend.index'))
except ObjectNotFound:
flash('No such user', 'error')
return redirect(url_for('frontend.index')) return redirect(url_for('frontend.index'))
username = request.form.get('user') username = request.form.get('user')
@ -178,16 +192,16 @@ def change_password_post(uid, user):
error = True error = True
if not error: if not error:
if user.id == request.user.id: try:
status = UserManager.change_password(user.id, current, new) if user.id == request.user.id:
else: UserManager.change_password(user.id, current, new)
status = UserManager.change_password2(user.name, new) else:
UserManager.change_password2(user.name, new)
if status != UserManager.SUCCESS:
flash(UserManager.error_str(status))
else:
flash('Password changed') flash('Password changed')
return redirect(url_for('frontend.user_profile', uid = uid)) return redirect(url_for('frontend.user_profile', uid = uid))
except ValueError as e:
flash(str(e), 'error')
return change_password_form(uid, user) return change_password_form(uid, user)
@ -216,23 +230,25 @@ def add_user_post():
mail = '' mail = ''
if not error: if not error:
status = UserManager.add(name, passwd, mail, admin) try:
if status == UserManager.SUCCESS: UserManager.add(name, passwd, mail, admin)
flash("User '%s' successfully added" % name) flash("User '%s' successfully added" % name)
return redirect(url_for('frontend.user_index')) return redirect(url_for('frontend.user_index'))
else: except ValueError as e:
flash(UserManager.error_str(status)) flash(str(e), 'error')
return add_user_form() return add_user_form()
@frontend.route('/user/del/<uid>') @frontend.route('/user/del/<uid>')
@admin_only @admin_only
def del_user(uid): def del_user(uid):
status = UserManager.delete(uid) try:
if status == UserManager.SUCCESS: UserManager.delete(uid)
flash('Deleted user') flash('Deleted user')
else: except ValueError as e:
flash(UserManager.error_str(status)) flash(str(e), 'error')
except ObjectNotFound:
flash('No such user', 'error')
return redirect(url_for('frontend.user_index')) return redirect(url_for('frontend.user_index'))
@ -240,7 +256,7 @@ def del_user(uid):
@me_or_uuid @me_or_uuid
def lastfm_reg(uid, user): def lastfm_reg(uid, user):
token = request.args.get('token') token = request.args.get('token')
if token in ('', None): if not token:
flash('Missing LastFM auth token') flash('Missing LastFM auth token')
return redirect(url_for('frontend.user_profile', uid = uid)) return redirect(url_for('frontend.user_profile', uid = uid))
@ -270,21 +286,21 @@ def login():
name, password = map(request.form.get, [ 'user', 'password' ]) name, password = map(request.form.get, [ 'user', 'password' ])
error = False error = False
if name in ('', None): if not name:
flash('Missing user name') flash('Missing user name')
error = True error = True
if password in ('', None): if not password:
flash('Missing password') flash('Missing password')
error = True error = True
if not error: if not error:
status, user = UserManager.try_auth(name, password) user = UserManager.try_auth(name, password)
if status == UserManager.SUCCESS: if user:
session['userid'] = str(user.id) session['userid'] = str(user.id)
flash('Logged in!') flash('Logged in!')
return redirect(return_url) return redirect(return_url)
else: else:
flash(UserManager.error_str(status)) flash('Wrong username or password')
return render_template('login.html') return render_template('login.html')

View File

@ -14,45 +14,27 @@ import random
import string import string
import uuid import uuid
from pony.orm import db_session
from pony.orm import ObjectNotFound from pony.orm import ObjectNotFound
from ..db import User, ChatMessage, Playlist from ..db import User
from ..db import StarredFolder, StarredArtist, StarredAlbum, StarredTrack
from ..db import RatingFolder, RatingTrack
from ..py23 import strtype from ..py23 import strtype
class UserManager: class UserManager:
SUCCESS = 0
INVALID_ID = 1
NO_SUCH_USER = 2
NAME_EXISTS = 3
WRONG_PASS = 4
@staticmethod @staticmethod
@db_session
def get(uid): def get(uid):
if isinstance(uid, strtype): if isinstance(uid, uuid.UUID):
try:
uid = uuid.UUID(uid)
except ValueError:
return UserManager.INVALID_ID, None
elif isinstance(uid, uuid.UUID):
pass pass
elif isinstance(uid, strtype):
uid = uuid.UUID(uid)
else: else:
return UserManager.INVALID_ID, None raise ValueError('Invalid user id')
try: return User[uid]
user = User[uid]
return UserManager.SUCCESS, user
except ObjectNotFound:
return UserManager.NO_SUCH_USER, None
@staticmethod @staticmethod
@db_session
def add(name, password, mail, admin): def add(name, password, mail, admin):
if User.get(name = name) is not None: if User.exists(name = name):
return UserManager.NAME_EXISTS raise ValueError("User '{}' exists".format(name))
crypt, salt = UserManager.__encrypt_password(password) crypt, salt = UserManager.__encrypt_password(password)
@ -64,74 +46,45 @@ class UserManager:
admin = admin admin = admin
) )
return UserManager.SUCCESS return user
@staticmethod @staticmethod
@db_session
def delete(uid): def delete(uid):
status, user = UserManager.get(uid) user = UserManager.get(uid)
if status != UserManager.SUCCESS:
return status
user.delete() user.delete()
return UserManager.SUCCESS
@staticmethod @staticmethod
@db_session
def delete_by_name(name): def delete_by_name(name):
user = User.get(name = name) user = User.get(name = name)
if user is None: if user is None:
return UserManager.NO_SUCH_USER raise ObjectNotFound(User)
return UserManager.delete(user.id) user.delete()
@staticmethod @staticmethod
@db_session
def try_auth(name, password): def try_auth(name, password):
user = User.get(name = name) user = User.get(name = name)
if user is None: if user is None:
return UserManager.NO_SUCH_USER, None return None
elif UserManager.__encrypt_password(password, user.salt)[0] != user.password: elif UserManager.__encrypt_password(password, user.salt)[0] != user.password:
return UserManager.WRONG_PASS, None return None
else: else:
return UserManager.SUCCESS, user return user
@staticmethod @staticmethod
@db_session
def change_password(uid, old_pass, new_pass): def change_password(uid, old_pass, new_pass):
status, user = UserManager.get(uid) user = UserManager.get(uid)
if status != UserManager.SUCCESS:
return status
if UserManager.__encrypt_password(old_pass, user.salt)[0] != user.password: if UserManager.__encrypt_password(old_pass, user.salt)[0] != user.password:
return UserManager.WRONG_PASS raise ValueError('Wrong password')
user.password = UserManager.__encrypt_password(new_pass, user.salt)[0] user.password = UserManager.__encrypt_password(new_pass, user.salt)[0]
return UserManager.SUCCESS
@staticmethod @staticmethod
@db_session
def change_password2(name, new_pass): def change_password2(name, new_pass):
user = User.get(name = name) user = User.get(name = name)
if user is None: if user is None:
return UserManager.NO_SUCH_USER raise ObjectNotFound(User)
user.password = UserManager.__encrypt_password(new_pass, user.salt)[0] user.password = UserManager.__encrypt_password(new_pass, user.salt)[0]
return UserManager.SUCCESS
@staticmethod
def error_str(err):
if err == UserManager.SUCCESS:
return 'No error'
elif err == UserManager.INVALID_ID:
return 'Invalid user id'
elif err == UserManager.NO_SUCH_USER:
return 'No such user'
elif err == UserManager.NAME_EXISTS:
return 'There is already a user with that name'
elif err == UserManager.WRONG_PASS:
return 'Wrong password'
else:
return 'Unkown error'
@staticmethod @staticmethod
def __encrypt_password(password, salt = None): def __encrypt_password(password, salt = None):

View File

@ -31,9 +31,9 @@ class LoginTestCase(FrontendTestBase):
self.assertIn('Missing password', rv.data) self.assertIn('Missing password', rv.data)
# Login with not valid user or password # Login with not valid user or password
rv = self._login('nonexistent', 'nonexistent') rv = self._login('nonexistent', 'nonexistent')
self.assertIn('No such user', rv.data) self.assertIn('Wrong username or password', rv.data)
rv = self._login('alice', 'badpassword') rv = self._login('alice', 'badpassword')
self.assertIn('Wrong password', rv.data) self.assertIn('Wrong username or password', rv.data)
def test_login_admin(self): def test_login_admin(self):
# Login with a valid admin user # Login with a valid admin user

View File

@ -11,6 +11,7 @@
import uuid import uuid
from flask import escape
from pony.orm import db_session from pony.orm import db_session
from supysonic.db import User, ClientPrefs from supysonic.db import User, ClientPrefs
@ -38,7 +39,7 @@ class UserTestCase(FrontendTestBase):
def test_details(self): def test_details(self):
self._login('alice', 'Alic3') self._login('alice', 'Alic3')
rv = self.client.get('/user/string', follow_redirects = True) rv = self.client.get('/user/string', follow_redirects = True)
self.assertIn('Invalid', rv.data) self.assertIn('badly formed', rv.data)
rv = self.client.get('/user/' + str(uuid.uuid4()), follow_redirects = True) rv = self.client.get('/user/' + str(uuid.uuid4()), follow_redirects = True)
self.assertIn('No such user', rv.data) self.assertIn('No such user', rv.data)
rv = self.client.get('/user/' + str(self.users['bob'])) rv = self.client.get('/user/' + str(self.users['bob']))
@ -89,14 +90,17 @@ class UserTestCase(FrontendTestBase):
self._login('alice', 'Alic3') self._login('alice', 'Alic3')
rv = self.client.get('/user/whatever/changeusername', follow_redirects = True) rv = self.client.get('/user/whatever/changeusername', follow_redirects = True)
self.assertIn('Invalid', rv.data) self.assertIn('badly formed', rv.data)
rv = self.client.get('/user/{}/changeusername'.format(uuid.uuid4()), follow_redirects = True) rv = self.client.get('/user/{}/changeusername'.format(uuid.uuid4()), follow_redirects = True)
self.assertIn('No such user', rv.data) self.assertIn('No such user', rv.data)
self.client.get('/user/{}/changeusername'.format(self.users['bob'])) self.client.get('/user/{}/changeusername'.format(self.users['bob']))
def test_change_username_post(self): def test_change_username_post(self):
self._login('alice', 'Alic3') self._login('alice', 'Alic3')
self.client.post('/user/whatever/changeusername') rv = self.client.post('/user/whatever/changeusername', follow_redirects = True)
self.assertIn('badly formed', rv.data)
rv = self.client.post('/user/{}/changeusername'.format(uuid.uuid4()), follow_redirects = True)
self.assertIn('No such user', rv.data)
path = '/user/{}/changeusername'.format(self.users['bob']) path = '/user/{}/changeusername'.format(self.users['bob'])
rv = self.client.post(path, follow_redirects = True) rv = self.client.post(path, follow_redirects = True)
@ -186,7 +190,7 @@ class UserTestCase(FrontendTestBase):
rv = self.client.post('/user/add', data = { 'user': 'name', 'passwd': 'passwd' }) rv = self.client.post('/user/add', data = { 'user': 'name', 'passwd': 'passwd' })
self.assertIn('passwords don', rv.data) self.assertIn('passwords don', rv.data)
rv = self.client.post('/user/add', data = { 'user': 'alice', 'passwd': 'passwd', 'passwd_confirm': 'passwd' }) rv = self.client.post('/user/add', data = { 'user': 'alice', 'passwd': 'passwd', 'passwd_confirm': 'passwd' })
self.assertIn('already a user with that name', rv.data) self.assertIn(escape("User 'alice' exists"), rv.data)
with db_session: with db_session:
self.assertEqual(User.select().count(), 2) self.assertEqual(User.select().count(), 2)
@ -210,7 +214,7 @@ class UserTestCase(FrontendTestBase):
self._login('alice', 'Alic3') self._login('alice', 'Alic3')
rv = self.client.get('/user/del/string', follow_redirects = True) rv = self.client.get('/user/del/string', follow_redirects = True)
self.assertIn('Invalid', rv.data) self.assertIn('badly formed', rv.data)
rv = self.client.get('/user/del/' + str(uuid.uuid4()), follow_redirects = True) rv = self.client.get('/user/del/' + str(uuid.uuid4()), follow_redirects = True)
self.assertIn('No such user', rv.data) self.assertIn('No such user', rv.data)
rv = self.client.get(path, follow_redirects = True) rv = self.client.get(path, follow_redirects = True)
@ -219,7 +223,7 @@ class UserTestCase(FrontendTestBase):
self.assertEqual(User.select().count(), 1) self.assertEqual(User.select().count(), 1)
self._logout() self._logout()
rv = self._login('bob', 'B0b') rv = self._login('bob', 'B0b')
self.assertIn('No such user', rv.data) self.assertIn('Wrong username or password', rv.data)
def test_lastfm_link(self): def test_lastfm_link(self):
self._login('alice', 'Alic3') self._login('alice', 'Alic3')

View File

@ -32,9 +32,9 @@ class UserManagerTestCase(unittest.TestCase):
@db_session @db_session
def create_data(self): def create_data(self):
# Create some users # Create some users
self.assertEqual(UserManager.add('alice', 'ALICE', 'test@example.com', True), UserManager.SUCCESS) self.assertIsInstance(UserManager.add('alice', 'ALICE', 'test@example.com', True), db.User)
self.assertEqual(UserManager.add('bob', 'BOB', 'bob@example.com', False), UserManager.SUCCESS) self.assertIsInstance(UserManager.add('bob', 'BOB', 'bob@example.com', False), db.User)
self.assertEqual(UserManager.add('charlie', 'CHARLIE', 'charlie@example.com', False), UserManager.SUCCESS) self.assertIsInstance(UserManager.add('charlie', 'CHARLIE', 'charlie@example.com', False), db.User)
folder = db.Folder(name = 'Root', path = 'tests/assets', root = True) folder = db.Folder(name = 'Root', path = 'tests/assets', root = True)
artist = db.Artist(name = 'Artist') artist = db.Artist(name = 'Artist')
@ -62,9 +62,9 @@ class UserManagerTestCase(unittest.TestCase):
def test_encrypt_password(self): def test_encrypt_password(self):
func = UserManager._UserManager__encrypt_password func = UserManager._UserManager__encrypt_password
self.assertEqual(func(u'password',u'salt'), (u'59b3e8d637cf97edbe2384cf59cb7453dfe30789', u'salt')) self.assertEqual(func('password','salt'), ('59b3e8d637cf97edbe2384cf59cb7453dfe30789', 'salt'))
self.assertEqual(func(u'pass-word',u'pepper'), (u'd68c95a91ed7773aa57c7c044d2309a5bf1da2e7', u'pepper')) self.assertEqual(func('pass-word','pepper'), ('d68c95a91ed7773aa57c7c044d2309a5bf1da2e7', 'pepper'))
self.assertEqual(func(u'éèàïô', u'ABC+'), (u'b639ba5217b89c906019d89d5816b407d8730898', u'ABC+')) self.assertEqual(func(u'éèàïô', 'ABC+'), ('b639ba5217b89c906019d89d5816b407d8730898', 'ABC+'))
@db_session @db_session
def test_get_user(self): def test_get_user(self):
@ -73,14 +73,14 @@ class UserManagerTestCase(unittest.TestCase):
# Get existing users # Get existing users
for name in ['alice', 'bob', 'charlie']: for name in ['alice', 'bob', 'charlie']:
user = db.User.get(name = name) user = db.User.get(name = name)
self.assertEqual(UserManager.get(user.id), (UserManager.SUCCESS, user)) self.assertEqual(UserManager.get(user.id), user)
# Get with invalid UUID # Get with invalid UUID
self.assertEqual(UserManager.get('invalid-uuid'), (UserManager.INVALID_ID, None)) self.assertRaises(ValueError, UserManager.get, 'invalid-uuid')
self.assertEqual(UserManager.get(0xfee1bad), (UserManager.INVALID_ID, None)) self.assertRaises(ValueError, UserManager.get, 0xfee1bad)
# Non-existent user # Non-existent user
self.assertEqual(UserManager.get(uuid.uuid4()), (UserManager.NO_SUCH_USER, None)) self.assertRaises(ObjectNotFound, UserManager.get, uuid.uuid4())
@db_session @db_session
def test_add_user(self): def test_add_user(self):
@ -88,25 +88,25 @@ class UserManagerTestCase(unittest.TestCase):
self.assertEqual(db.User.select().count(), 3) self.assertEqual(db.User.select().count(), 3)
# Create duplicate # Create duplicate
self.assertEqual(UserManager.add('alice', 'Alic3', 'alice@example.com', True), UserManager.NAME_EXISTS) self.assertRaises(ValueError, UserManager.add, 'alice', 'Alic3', 'alice@example.com', True)
@db_session @db_session
def test_delete_user(self): def test_delete_user(self):
self.create_data() self.create_data()
# Delete invalid UUID # Delete invalid UUID
self.assertEqual(UserManager.delete('invalid-uuid'), UserManager.INVALID_ID) self.assertRaises(ValueError, UserManager.delete, 'invalid-uuid')
self.assertEqual(UserManager.delete(0xfee1b4d), UserManager.INVALID_ID) self.assertRaises(ValueError, UserManager.delete, 0xfee1b4d)
self.assertEqual(db.User.select().count(), 3) self.assertEqual(db.User.select().count(), 3)
# Delete non-existent user # Delete non-existent user
self.assertEqual(UserManager.delete(uuid.uuid4()), UserManager.NO_SUCH_USER) self.assertRaises(ObjectNotFound, UserManager.delete, uuid.uuid4())
self.assertEqual(db.User.select().count(), 3) self.assertEqual(db.User.select().count(), 3)
# Delete existing users # Delete existing users
for name in ['alice', 'bob', 'charlie']: for name in ['alice', 'bob', 'charlie']:
user = db.User.get(name = name) user = db.User.get(name = name)
self.assertEqual(UserManager.delete(user.id), UserManager.SUCCESS) UserManager.delete(user.id)
self.assertRaises(ObjectNotFound, db.User.__getitem__, user.id) self.assertRaises(ObjectNotFound, db.User.__getitem__, user.id)
commit() commit()
self.assertEqual(db.User.select().count(), 0) self.assertEqual(db.User.select().count(), 0)
@ -117,11 +117,11 @@ class UserManagerTestCase(unittest.TestCase):
# Delete existing users # Delete existing users
for name in ['alice', 'bob', 'charlie']: for name in ['alice', 'bob', 'charlie']:
self.assertEqual(UserManager.delete_by_name(name), UserManager.SUCCESS) UserManager.delete_by_name(name)
self.assertFalse(db.User.exists(name = name)) self.assertFalse(db.User.exists(name = name))
# Delete non-existent user # Delete non-existent user
self.assertEqual(UserManager.delete_by_name('null'), UserManager.NO_SUCH_USER) self.assertRaises(ObjectNotFound, UserManager.delete_by_name, 'null')
@db_session @db_session
def test_try_auth(self): def test_try_auth(self):
@ -130,14 +130,15 @@ class UserManagerTestCase(unittest.TestCase):
# Test authentication # Test authentication
for name in ['alice', 'bob', 'charlie']: for name in ['alice', 'bob', 'charlie']:
user = db.User.get(name = name) user = db.User.get(name = name)
self.assertEqual(UserManager.try_auth(name, name.upper()), (UserManager.SUCCESS, user)) authed = UserManager.try_auth(name, name.upper())
self.assertEqual(authed, user)
# Wrong password # Wrong password
self.assertEqual(UserManager.try_auth('alice', 'bad'), (UserManager.WRONG_PASS, None)) self.assertIsNone(UserManager.try_auth('alice', 'bad'))
self.assertEqual(UserManager.try_auth('alice', 'alice'), (UserManager.WRONG_PASS, None)) self.assertIsNone(UserManager.try_auth('alice', 'alice'))
# Non-existent user # Non-existent user
self.assertEqual(UserManager.try_auth('null', 'null'), (UserManager.NO_SUCH_USER, None)) self.assertIsNone(UserManager.try_auth('null', 'null'))
@db_session @db_session
def test_change_password(self): def test_change_password(self):
@ -147,21 +148,21 @@ class UserManagerTestCase(unittest.TestCase):
for name in ['alice', 'bob', 'charlie']: for name in ['alice', 'bob', 'charlie']:
user = db.User.get(name = name) user = db.User.get(name = name)
# Good password # Good password
self.assertEqual(UserManager.change_password(user.id, name.upper(), 'newpass'), UserManager.SUCCESS) UserManager.change_password(user.id, name.upper(), 'newpass')
self.assertEqual(UserManager.try_auth(name, 'newpass'), (UserManager.SUCCESS, user)) self.assertEqual(UserManager.try_auth(name, 'newpass'), user)
# Old password # Old password
self.assertEqual(UserManager.try_auth(name, name.upper()), (UserManager.WRONG_PASS, None)) self.assertEqual(UserManager.try_auth(name, name.upper()), None)
# Wrong password # Wrong password
self.assertEqual(UserManager.change_password(user.id, 'badpass', 'newpass'), UserManager.WRONG_PASS) self.assertRaises(ValueError, UserManager.change_password, user.id, 'badpass', 'newpass')
# Ensure we still got the same number of users # Ensure we still got the same number of users
self.assertEqual(db.User.select().count(), 3) self.assertEqual(db.User.select().count(), 3)
# With invalid UUID # With invalid UUID
self.assertEqual(UserManager.change_password('invalid-uuid', 'oldpass', 'newpass'), UserManager.INVALID_ID) self.assertRaises(ValueError, UserManager.change_password, 'invalid-uuid', 'oldpass', 'newpass')
# Non-existent user # Non-existent user
self.assertEqual(UserManager.change_password(uuid.uuid4(), 'oldpass', 'newpass'), UserManager.NO_SUCH_USER) self.assertRaises(ObjectNotFound, UserManager.change_password, uuid.uuid4(), 'oldpass', 'newpass')
@db_session @db_session
def test_change_password2(self): def test_change_password2(self):
@ -169,19 +170,13 @@ class UserManagerTestCase(unittest.TestCase):
# With existing users # With existing users
for name in ['alice', 'bob', 'charlie']: for name in ['alice', 'bob', 'charlie']:
self.assertEqual(UserManager.change_password2(name, 'newpass'), UserManager.SUCCESS) UserManager.change_password2(name, 'newpass')
user = db.User.get(name = name) user = db.User.get(name = name)
self.assertEqual(UserManager.try_auth(name, 'newpass'), (UserManager.SUCCESS, user)) self.assertEqual(UserManager.try_auth(name, 'newpass'), user)
self.assertEqual(UserManager.try_auth(name, name.upper()), (UserManager.WRONG_PASS, None)) self.assertEqual(UserManager.try_auth(name, name.upper()), None)
# Non-existent user # Non-existent user
self.assertEqual(UserManager.change_password2('null', 'newpass'), UserManager.NO_SUCH_USER) self.assertRaises(ObjectNotFound, UserManager.change_password2, 'null', 'newpass')
def test_human_readable_error(self):
values = [ UserManager.SUCCESS, UserManager.INVALID_ID, UserManager.NO_SUCH_USER, UserManager.NAME_EXISTS,
UserManager.WRONG_PASS, 1594826, 'string', uuid.uuid4() ]
for value in values:
self.assertIsInstance(UserManager.error_str(value), strtype)
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()

View File

@ -15,6 +15,8 @@ import shutil
import unittest import unittest
import tempfile import tempfile
from pony.orm import db_session
from supysonic.db import init_database, release_database from supysonic.db import init_database, release_database
from supysonic.config import DefaultConfig from supysonic.config import DefaultConfig
from supysonic.managers.user import UserManager from supysonic.managers.user import UserManager
@ -95,8 +97,9 @@ class TestBase(unittest.TestCase):
self.__app = create_application(config) self.__app = create_application(config)
self.client = self.__app.test_client() self.client = self.__app.test_client()
UserManager.add('alice', 'Alic3', 'test@example.com', True) with db_session:
UserManager.add('bob', 'B0b', 'bob@example.com', False) UserManager.add('alice', 'Alic3', 'test@example.com', True)
UserManager.add('bob', 'B0b', 'bob@example.com', False)
def _patch_client(self): def _patch_client(self):
self.client.get = patch_method(self.client.get) self.client.get = patch_method(self.client.get)