1
0
mirror of https://github.com/spl0k/supysonic.git synced 2024-09-19 19:01:03 +00:00

All managers get a pony

This commit is contained in:
spl0k 2017-12-16 17:42:53 +01:00
parent 577f607f13
commit a4b9a97271
5 changed files with 235 additions and 269 deletions

View File

@ -21,6 +21,9 @@
import os.path
import uuid
from pony.orm import db_session, select
from pony.orm import ObjectNotFound
from ..db import Folder, Artist, Album, Track, StarredFolder, RatingFolder
from ..scanner import Scanner
@ -34,7 +37,8 @@ class FolderManager:
SUBPATH_EXISTS = 6
@staticmethod
def get(store, uid):
@db_session
def get(uid):
if isinstance(uid, basestring):
try:
uid = uuid.UUID(uid)
@ -45,65 +49,56 @@ class FolderManager:
else:
return FolderManager.INVALID_ID, None
folder = store.get(Folder, uid)
if not folder:
try:
folder = Folder[uid]
return FolderManager.SUCCESS, folder
except ObjectNotFound:
return FolderManager.NO_SUCH_FOLDER, None
return FolderManager.SUCCESS, folder
@staticmethod
def add(store, name, path):
if not store.find(Folder, Folder.name == name, Folder.root == True).is_empty():
@db_session
def add(name, path):
if Folder.get(name = name, root = True) is not None:
return FolderManager.NAME_EXISTS
path = unicode(os.path.abspath(path))
if not os.path.isdir(path):
return FolderManager.INVALID_PATH
if not store.find(Folder, Folder.path == path).is_empty():
if Folder.get(path = path) is not None:
return FolderManager.PATH_EXISTS
if any(path.startswith(p) for p in store.find(Folder).values(Folder.path)):
if any(path.startswith(p) for p in select(f.path for f in Folder)):
return FolderManager.PATH_EXISTS
if not store.find(Folder, Folder.path.startswith(path)).is_empty():
if Folder.exists(lambda f: f.path.startswith(path)):
return FolderManager.SUBPATH_EXISTS
folder = Folder()
folder.root = True
folder.name = name
folder.path = path
store.add(folder)
store.commit()
folder = Folder(root = True, name = name, path = path)
return FolderManager.SUCCESS
@staticmethod
def delete(store, uid):
status, folder = FolderManager.get(store, uid)
@db_session
def delete(uid):
status, folder = FolderManager.get(uid)
if status != FolderManager.SUCCESS:
return status
if not folder.root:
return FolderManager.NO_SUCH_FOLDER
scanner = Scanner(store)
for track in store.find(Track, Track.root_folder_id == folder.id):
scanner = Scanner()
for track in Track.select(lambda t: t.root_folder == folder):
scanner.remove_file(track.path)
scanner.finish()
store.find(StarredFolder, StarredFolder.starred_id == uid).remove()
store.find(RatingFolder, RatingFolder.rated_id == uid).remove()
store.remove(folder)
store.commit()
folder.delete()
return FolderManager.SUCCESS
@staticmethod
def delete_by_name(store, name):
folder = store.find(Folder, Folder.name == name, Folder.root == True).one()
@db_session
def delete_by_name(name):
folder = Folder.get(name = name, root = True)
if not folder:
return FolderManager.NO_SUCH_FOLDER
return FolderManager.delete(store, folder.id)
return FolderManager.delete(folder.id)
@staticmethod
def error_str(err):

View File

@ -14,6 +14,9 @@ import random
import string
import uuid
from pony.orm import db_session
from pony.orm import ObjectNotFound
from ..db import User, ChatMessage, Playlist
from ..db import StarredFolder, StarredArtist, StarredAlbum, StarredTrack
from ..db import RatingFolder, RatingTrack
@ -26,7 +29,8 @@ class UserManager:
WRONG_PASS = 4
@staticmethod
def get(store, uid):
@db_session
def get(uid):
if type(uid) in (str, unicode):
try:
uid = uuid.UUID(uid)
@ -37,63 +41,53 @@ class UserManager:
else:
return UserManager.INVALID_ID, None
user = store.get(User, uid)
if user is None:
try:
user = User[uid]
return UserManager.SUCCESS, user
except ObjectNotFound:
return UserManager.NO_SUCH_USER, None
return UserManager.SUCCESS, user
@staticmethod
def add(store, name, password, mail, admin):
if store.find(User, User.name == name).one():
@db_session
def add(name, password, mail, admin):
if User.get(name = name) is not None:
return UserManager.NAME_EXISTS
crypt, salt = UserManager.__encrypt_password(password)
user = User()
user.name = name
user.mail = mail
user.password = crypt
user.salt = salt
user.admin = admin
store.add(user)
store.commit()
user = User(
name = name,
mail = mail,
password = crypt,
salt = salt,
admin = admin
)
return UserManager.SUCCESS
@staticmethod
def delete(store, uid):
status, user = UserManager.get(store, uid)
@db_session
def delete(uid):
status, user = UserManager.get(uid)
if status != UserManager.SUCCESS:
return status
store.find(StarredFolder, StarredFolder.user_id == user.id).remove()
store.find(StarredArtist, StarredArtist.user_id == user.id).remove()
store.find(StarredAlbum, StarredAlbum.user_id == user.id).remove()
store.find(StarredTrack, StarredTrack.user_id == user.id).remove()
store.find(RatingFolder, RatingFolder.user_id == user.id).remove()
store.find(RatingTrack, RatingTrack.user_id == user.id).remove()
store.find(ChatMessage, ChatMessage.user_id == user.id).remove()
for playlist in store.find(Playlist, Playlist.user_id == user.id):
store.remove(playlist)
store.remove(user)
store.commit()
user.delete()
return UserManager.SUCCESS
@staticmethod
def delete_by_name(store, name):
user = store.find(User, User.name == name).one()
if not user:
@db_session
def delete_by_name(name):
user = User.get(name = name)
if user is None:
return UserManager.NO_SUCH_USER
return UserManager.delete(store, user.id)
return UserManager.delete(user.id)
@staticmethod
def try_auth(store, name, password):
user = store.find(User, User.name == name).one()
if not user:
@db_session
def try_auth(name, password):
user = User.get(name = name)
if user is None:
return UserManager.NO_SUCH_USER, None
elif UserManager.__encrypt_password(password, user.salt)[0] != user.password:
return UserManager.WRONG_PASS, None
@ -101,8 +95,9 @@ class UserManager:
return UserManager.SUCCESS, user
@staticmethod
def change_password(store, uid, old_pass, new_pass):
status, user = UserManager.get(store, uid)
@db_session
def change_password(uid, old_pass, new_pass):
status, user = UserManager.get(uid)
if status != UserManager.SUCCESS:
return status
@ -110,17 +105,16 @@ class UserManager:
return UserManager.WRONG_PASS
user.password = UserManager.__encrypt_password(new_pass, user.salt)[0]
store.commit()
return UserManager.SUCCESS
@staticmethod
def change_password2(store, name, new_pass):
user = store.find(User, User.name == name).one()
if not user:
@db_session
def change_password2(name, new_pass):
user = User.get(name = name)
if user is None:
return UserManager.NO_SUCH_USER
user.password = UserManager.__encrypt_password(new_pass, user.salt)[0]
store.commit()
return UserManager.SUCCESS
@staticmethod

View File

@ -23,40 +23,15 @@ import mimetypes
import mutagen
import time
from storm.expr import ComparableExpr, compile, Like
from storm.exceptions import NotSupportedError
from .db import Folder, Artist, Album, Track, User
from .db import StarredFolder, StarredArtist, StarredAlbum, StarredTrack
from .db import RatingFolder, RatingTrack
# Hacking in support for a concatenation expression
class Concat(ComparableExpr):
__slots__ = ("left", "right", "db")
def __init__(self, left, right, db):
self.left = left
self.right = right
self.db = db
@compile.when(Concat)
def compile_concat(compile, concat, state):
left = compile(concat.left, state)
right = compile(concat.right, state)
if concat.db in ('sqlite', 'postgres'):
statement = "%s||%s"
elif concat.db == 'mysql':
statement = "CONCAT(%s, %s)"
else:
raise NotSupportedError("Unspported database (%s)" % concat.db)
return statement % (left, right)
class Scanner:
def __init__(self, store, force = False, extensions = None):
def __init__(self, force = False, extensions = None):
if extensions is not None and not isinstance(extensions, list):
raise TypeError('Invalid extensions type')
self.__store = store
self.__force = force
self.__added_artists = 0
@ -106,18 +81,14 @@ class Scanner:
def finish(self):
for album in [ a for a in self.__albums_to_check if not a.tracks.count() ]:
self.__store.find(StarredAlbum, StarredAlbum.starred_id == album.id).remove()
self.__artists_to_check.add(album.artist)
self.__store.remove(album)
self.__deleted_albums += 1
album.delete()
self.__albums_to_check.clear()
for artist in [ a for a in self.__artists_to_check if not a.albums.count() and not a.tracks.count() ]:
self.__store.find(StarredArtist, StarredArtist.starred_id == artist.id).remove()
self.__store.remove(artist)
self.__deleted_artists += 1
artist.delete()
self.__artists_to_check.clear()
while self.__folders_to_check:
@ -126,11 +97,8 @@ class Scanner:
continue
if not folder.tracks.count() and not folder.children.count():
self.__store.find(StarredFolder, StarredFolder.starred_id == folder.id).remove()
self.__store.find(RatingFolder, RatingFolder.rated_id == folder.id).remove()
self.__folders_to_check.add(folder.parent)
self.__store.remove(folder)
folder.delete()
def __is_valid_path(self, path):
if not os.path.exists(path):
@ -206,20 +174,15 @@ class Scanner:
if not isinstance(path, basestring):
raise TypeError('Expecting string, got ' + str(type(path)))
tr = self.__store.find(Track, Track.path == path).one()
tr = Track.get(path = path)
if not tr:
return
self.__store.find(StarredTrack, StarredTrack.starred_id == tr.id).remove()
self.__store.find(RatingTrack, RatingTrack.rated_id == tr.id).remove()
# Playlist autofix themselves
self.__store.find(User, User.last_play_id == tr.id).set(last_play_id = None)
self.__folders_to_check.add(tr.folder)
self.__albums_to_check.add(tr.album)
self.__artists_to_check.add(tr.artist)
self.__store.remove(tr)
self.__deleted_tracks += 1
tr.delete()
def move_file(self, src_path, dst_path):
if not isinstance(src_path, basestring):

View File

@ -20,129 +20,133 @@ import tempfile
import unittest
import uuid
from pony.orm import db_session, ObjectNotFound
class FolderManagerTestCase(unittest.TestCase):
def setUp(self):
# Create an empty sqlite database in memory
self.store = db.get_store("sqlite:")
# Read schema from file
with io.open('schema/sqlite.sql', 'r') as sql:
schema = sql.read()
# Create tables on memory database
for command in schema.split(';'):
self.store.execute(command)
self.store = db.get_database('sqlite:', True)
# Create some temporary directories
self.media_dir = tempfile.mkdtemp()
self.music_dir = tempfile.mkdtemp()
@db_session
def create_folders(self):
# Add test folders
self.assertEqual(FolderManager.add(self.store, 'media', self.media_dir), FolderManager.SUCCESS)
self.assertEqual(FolderManager.add(self.store, 'music', self.music_dir), FolderManager.SUCCESS)
self.assertEqual(FolderManager.add('media', self.media_dir), FolderManager.SUCCESS)
self.assertEqual(FolderManager.add('music', self.music_dir), FolderManager.SUCCESS)
folder = db.Folder()
folder.root = False
folder.name = 'non-root'
folder.path = os.path.join(self.music_dir, 'subfolder')
self.store.add(folder)
folder = db.Folder(
root = False,
name = 'non-root',
path = os.path.join(self.music_dir, 'subfolder')
)
artist = db.Artist()
artist.name = 'Artist'
artist = db.Artist(name = 'Artist')
album = db.Album(name = 'Album', artist = artist)
album = db.Album()
album.name = 'Album'
album.artist = artist
root = self.store.find(db.Folder, db.Folder.name == 'media').one()
track = db.Track()
track.title = 'Track'
track.artist = artist
track.album = album
track.disc = 1
track.number = 1
track.path = os.path.join(self.media_dir, 'somefile')
track.folder = root
track.root_folder = root
track.duration = 2
track.content_type = 'audio/mpeg'
track.bitrate = 320
track.last_modification = 0
self.store.add(track)
self.store.commit()
root = db.Folder.get(name = 'media')
track = db.Track(
title = 'Track',
artist = artist,
album = album,
disc = 1,
number = 1,
path = os.path.join(self.media_dir, 'somefile'),
folder = root,
root_folder = root,
duration = 2,
content_type = 'audio/mpeg',
bitrate = 320,
last_modification = 0
)
def tearDown(self):
db.release_database(self.store)
shutil.rmtree(self.media_dir)
shutil.rmtree(self.music_dir)
@db_session
def test_get_folder(self):
self.create_folders()
# Get existing folders
for name in ['media', 'music']:
folder = self.store.find(db.Folder, db.Folder.name == name, db.Folder.root == True).one()
self.assertEqual(FolderManager.get(self.store, folder.id), (FolderManager.SUCCESS, folder))
folder = db.Folder.get(name = name, root = True)
self.assertEqual(FolderManager.get(folder.id), (FolderManager.SUCCESS, folder))
# Get with invalid UUID
self.assertEqual(FolderManager.get(self.store, 'invalid-uuid'), (FolderManager.INVALID_ID, None))
self.assertEqual(FolderManager.get(self.store, 0xdeadbeef), (FolderManager.INVALID_ID, None))
self.assertEqual(FolderManager.get('invalid-uuid'), (FolderManager.INVALID_ID, None))
self.assertEqual(FolderManager.get(0xdeadbeef), (FolderManager.INVALID_ID, None))
# Non-existent folder
self.assertEqual(FolderManager.get(self.store, uuid.uuid4()), (FolderManager.NO_SUCH_FOLDER, None))
self.assertEqual(FolderManager.get(uuid.uuid4()), (FolderManager.NO_SUCH_FOLDER, None))
@db_session
def test_add_folder(self):
# Added in setUp()
self.assertEqual(self.store.find(db.Folder).count(), 3)
self.create_folders()
self.assertEqual(db.Folder.select().count(), 3)
# Create duplicate
self.assertEqual(FolderManager.add(self.store,'media', self.media_dir), FolderManager.NAME_EXISTS)
self.assertEqual(self.store.find(db.Folder, db.Folder.name == 'media').count(), 1)
self.assertEqual(FolderManager.add('media', self.media_dir), FolderManager.NAME_EXISTS)
self.assertEqual(db.Folder.select(lambda f: f.name == 'media').count(), 1)
# Duplicate path
self.assertEqual(FolderManager.add(self.store,'new-folder', self.media_dir), FolderManager.PATH_EXISTS)
self.assertEqual(self.store.find(db.Folder, db.Folder.path == self.media_dir).count(), 1)
self.assertEqual(FolderManager.add('new-folder', self.media_dir), FolderManager.PATH_EXISTS)
self.assertEqual(db.Folder.select(lambda f: f.path == self.media_dir).count(), 1)
# Invalid path
path = os.path.abspath('/this/not/is/valid')
self.assertEqual(FolderManager.add(self.store,'invalid-path', path), FolderManager.INVALID_PATH)
self.assertEqual(self.store.find(db.Folder, db.Folder.path == path).count(), 0)
self.assertEqual(FolderManager.add('invalid-path', path), FolderManager.INVALID_PATH)
self.assertFalse(db.Folder.exists(path = path))
# Subfolder of already added path
path = os.path.join(self.media_dir, 'subfolder')
os.mkdir(path)
self.assertEqual(FolderManager.add(self.store,'subfolder', path), FolderManager.PATH_EXISTS)
self.assertEqual(self.store.find(db.Folder).count(), 3)
self.assertEqual(FolderManager.add('subfolder', path), FolderManager.PATH_EXISTS)
self.assertEqual(db.Folder.select().count(), 3)
# Parent folder of an already added path
path = os.path.join(self.media_dir, '..')
self.assertEqual(FolderManager.add(self.store, 'parent', path), FolderManager.SUBPATH_EXISTS)
self.assertEqual(self.store.find(db.Folder).count(), 3)
self.assertEqual(FolderManager.add('parent', path), FolderManager.SUBPATH_EXISTS)
self.assertEqual(db.Folder.select().count(), 3)
@db_session
def test_delete_folder(self):
self.create_folders()
# Delete existing folders
for name in ['media', 'music']:
folder = self.store.find(db.Folder, db.Folder.name == name, db.Folder.root == True).one()
self.assertEqual(FolderManager.delete(self.store, folder.id), FolderManager.SUCCESS)
self.assertIsNone(self.store.get(db.Folder, folder.id))
folder = db.Folder.get(name = name, root = True)
self.assertEqual(FolderManager.delete(folder.id), FolderManager.SUCCESS)
self.assertRaises(ObjectNotFound, db.Folder.__getitem__, folder.id)
# Delete invalid UUID
self.assertEqual(FolderManager.delete(self.store, 'invalid-uuid'), FolderManager.INVALID_ID)
self.assertEqual(self.store.find(db.Folder).count(), 1) # 'non-root' remaining
self.assertEqual(FolderManager.delete('invalid-uuid'), FolderManager.INVALID_ID)
self.assertEqual(db.Folder.select().count(), 1) # 'non-root' remaining
# Delete non-existent folder
self.assertEqual(FolderManager.delete(self.store, uuid.uuid4()), FolderManager.NO_SUCH_FOLDER)
self.assertEqual(self.store.find(db.Folder).count(), 1) # 'non-root' remaining
self.assertEqual(FolderManager.delete(uuid.uuid4()), FolderManager.NO_SUCH_FOLDER)
self.assertEqual(db.Folder.select().count(), 1) # 'non-root' remaining
# Delete non-root folder
folder = self.store.find(db.Folder, db.Folder.name == 'non-root').one()
self.assertEqual(FolderManager.delete(self.store, folder.id), FolderManager.NO_SUCH_FOLDER)
self.assertEqual(self.store.find(db.Folder).count(), 1) # 'non-root' remaining
folder = db.Folder.get(name = 'non-root')
self.assertEqual(FolderManager.delete(folder.id), FolderManager.NO_SUCH_FOLDER)
self.assertEqual(db.Folder.select().count(), 1) # 'non-root' remaining
@db_session
def test_delete_by_name(self):
self.create_folders()
# Delete existing folders
for name in ['media', 'music']:
self.assertEqual(FolderManager.delete_by_name(self.store, name), FolderManager.SUCCESS)
self.assertEqual(self.store.find(db.Folder, db.Folder.name == name).count(), 0)
self.assertEqual(FolderManager.delete_by_name(name), FolderManager.SUCCESS)
self.assertFalse(db.Folder.exists(name = name))
# Delete non-existent folder
self.assertEqual(FolderManager.delete_by_name(self.store, 'null'), FolderManager.NO_SUCH_FOLDER)
self.assertEqual(self.store.find(db.Folder).count(), 1) # 'non-root' remaining
self.assertEqual(FolderManager.delete_by_name('null'), FolderManager.NO_SUCH_FOLDER)
self.assertEqual(db.Folder.select().count(), 1) # 'non-root' remaining
def test_human_readable_error(self):
values = [ FolderManager.SUCCESS, FolderManager.INVALID_ID, FolderManager.NAME_EXISTS,

View File

@ -13,61 +13,51 @@
from supysonic import db
from supysonic.managers.user import UserManager
import io
import unittest
import uuid
import io
from pony.orm import db_session
from pony.orm import ObjectNotFound
class UserManagerTestCase(unittest.TestCase):
def setUp(self):
# Create an empty sqlite database in memory
self.store = db.get_store("sqlite:")
# Read schema from file
with io.open('schema/sqlite.sql', 'r') as sql:
schema = sql.read()
# Create tables on memory database
for command in schema.split(';'):
self.store.execute(command)
self.store = db.get_database('sqlite:', True)
@db_session
def create_data(self):
# Create some users
self.assertEqual(UserManager.add(self.store, 'alice', 'ALICE', 'test@example.com', True), UserManager.SUCCESS)
self.assertEqual(UserManager.add(self.store, 'bob', 'BOB', 'bob@example.com', False), UserManager.SUCCESS)
self.assertEqual(UserManager.add(self.store, 'charlie', 'CHARLIE', 'charlie@example.com', False), UserManager.SUCCESS)
self.assertEqual(UserManager.add('alice', 'ALICE', 'test@example.com', True), UserManager.SUCCESS)
self.assertEqual(UserManager.add('bob', 'BOB', 'bob@example.com', False), UserManager.SUCCESS)
self.assertEqual(UserManager.add('charlie', 'CHARLIE', 'charlie@example.com', False), UserManager.SUCCESS)
folder = db.Folder()
folder.name = 'Root'
folder.path = 'tests/assets'
folder.root = True
folder = db.Folder(name = 'Root', path = 'tests/assets', root = True)
artist = db.Artist(name = 'Artist')
album = db.Album(name = 'Album', artist = artist)
track = db.Track(
title = 'Track',
disc = 1,
number = 1,
duration = 1,
artist = artist,
album = album,
path = 'tests/assets/empty',
folder = folder,
root_folder = folder,
content_type = 'audio/mpeg',
bitrate = 320,
last_modification = 0
)
artist = db.Artist()
artist.name = 'Artist'
album = db.Album()
album.name = 'Album'
album.artist = artist
track = db.Track()
track.title = 'Track'
track.disc = 1
track.number = 1
track.duration = 1
track.artist = artist
track.album = album
track.path = 'tests/assets/empty'
track.folder = folder
track.root_folder = folder
track.duration = 2
track.content_type = 'audio/mpeg'
track.bitrate = 320
track.last_modification = 0
self.store.add(track)
self.store.commit()
playlist = db.Playlist()
playlist.name = 'Playlist'
playlist.user = self.store.find(db.User, db.User.name == 'alice').one()
playlist = db.Playlist(
name = 'Playlist',
user = db.User.get(name = 'alice')
)
playlist.add(track)
self.store.add(playlist)
self.store.commit()
def tearDown(self):
db.release_database(self.store)
def test_encrypt_password(self):
func = UserManager._UserManager__encrypt_password
@ -75,96 +65,116 @@ class UserManagerTestCase(unittest.TestCase):
self.assertEqual(func(u'pass-word',u'pepper'), (u'd68c95a91ed7773aa57c7c044d2309a5bf1da2e7', u'pepper'))
self.assertEqual(func(u'éèàïô', u'ABC+'), (u'b639ba5217b89c906019d89d5816b407d8730898', u'ABC+'))
@db_session
def test_get_user(self):
self.create_data()
# Get existing users
for name in ['alice', 'bob', 'charlie']:
user = self.store.find(db.User, db.User.name == name).one()
self.assertEqual(UserManager.get(self.store, user.id), (UserManager.SUCCESS, user))
user = db.User.get(name = name)
self.assertEqual(UserManager.get(user.id), (UserManager.SUCCESS, user))
# Get with invalid UUID
self.assertEqual(UserManager.get(self.store, 'invalid-uuid'), (UserManager.INVALID_ID, None))
self.assertEqual(UserManager.get(self.store, 0xfee1bad), (UserManager.INVALID_ID, None))
self.assertEqual(UserManager.get('invalid-uuid'), (UserManager.INVALID_ID, None))
self.assertEqual(UserManager.get(0xfee1bad), (UserManager.INVALID_ID, None))
# Non-existent user
self.assertEqual(UserManager.get(self.store, uuid.uuid4()), (UserManager.NO_SUCH_USER, None))
self.assertEqual(UserManager.get(uuid.uuid4()), (UserManager.NO_SUCH_USER, None))
@db_session
def test_add_user(self):
# Added in setUp()
self.assertEqual(self.store.find(db.User).count(), 3)
self.create_data()
self.assertEqual(db.User.select().count(), 3)
# Create duplicate
self.assertEqual(UserManager.add(self.store, 'alice', 'Alic3', 'alice@example.com', True), UserManager.NAME_EXISTS)
self.assertEqual(UserManager.add('alice', 'Alic3', 'alice@example.com', True), UserManager.NAME_EXISTS)
@db_session
def test_delete_user(self):
self.create_data()
# Delete invalid UUID
self.assertEqual(UserManager.delete(self.store, 'invalid-uuid'), UserManager.INVALID_ID)
self.assertEqual(UserManager.delete(self.store, 0xfee1b4d), UserManager.INVALID_ID)
self.assertEqual(self.store.find(db.User).count(), 3)
self.assertEqual(UserManager.delete('invalid-uuid'), UserManager.INVALID_ID)
self.assertEqual(UserManager.delete(0xfee1b4d), UserManager.INVALID_ID)
self.assertEqual(db.User.select().count(), 3)
# Delete non-existent user
self.assertEqual(UserManager.delete(self.store, uuid.uuid4()), UserManager.NO_SUCH_USER)
self.assertEqual(self.store.find(db.User).count(), 3)
self.assertEqual(UserManager.delete(uuid.uuid4()), UserManager.NO_SUCH_USER)
self.assertEqual(db.User.select().count(), 3)
# Delete existing users
for name in ['alice', 'bob', 'charlie']:
user = self.store.find(db.User, db.User.name == name).one()
self.assertEqual(UserManager.delete(self.store, user.id), UserManager.SUCCESS)
self.assertIsNone(self.store.get(db.User, user.id))
self.assertEqual(self.store.find(db.User).count(), 0)
user = db.User.get(name = name)
self.assertEqual(UserManager.delete(user.id), UserManager.SUCCESS)
self.assertRaises(ObjectNotFound, db.User.__getitem__, user.id)
self.store.commit()
self.assertEqual(db.User.select().count(), 0)
@db_session
def test_delete_by_name(self):
self.create_data()
# Delete existing users
for name in ['alice', 'bob', 'charlie']:
self.assertEqual(UserManager.delete_by_name(self.store, name), UserManager.SUCCESS)
self.assertEqual(self.store.find(db.User, db.User.name == name).count(), 0)
self.assertEqual(UserManager.delete_by_name(name), UserManager.SUCCESS)
self.assertFalse(db.User.exists(name = name))
# Delete non-existent user
self.assertEqual(UserManager.delete_by_name(self.store, 'null'), UserManager.NO_SUCH_USER)
self.assertEqual(UserManager.delete_by_name('null'), UserManager.NO_SUCH_USER)
@db_session
def test_try_auth(self):
self.create_data()
# Test authentication
for name in ['alice', 'bob', 'charlie']:
user = self.store.find(db.User, db.User.name == name).one()
self.assertEqual(UserManager.try_auth(self.store, name, name.upper()), (UserManager.SUCCESS, user))
user = db.User.get(name = name)
self.assertEqual(UserManager.try_auth(name, name.upper()), (UserManager.SUCCESS, user))
# Wrong password
self.assertEqual(UserManager.try_auth(self.store, 'alice', 'bad'), (UserManager.WRONG_PASS, None))
self.assertEqual(UserManager.try_auth(self.store, 'alice', 'alice'), (UserManager.WRONG_PASS, None))
self.assertEqual(UserManager.try_auth('alice', 'bad'), (UserManager.WRONG_PASS, None))
self.assertEqual(UserManager.try_auth('alice', 'alice'), (UserManager.WRONG_PASS, None))
# Non-existent user
self.assertEqual(UserManager.try_auth(self.store, 'null', 'null'), (UserManager.NO_SUCH_USER, None))
self.assertEqual(UserManager.try_auth('null', 'null'), (UserManager.NO_SUCH_USER, None))
@db_session
def test_change_password(self):
self.create_data()
# With existing users
for name in ['alice', 'bob', 'charlie']:
user = self.store.find(db.User, db.User.name == name).one()
user = db.User.get(name = name)
# Good password
self.assertEqual(UserManager.change_password(self.store, user.id, name.upper(), 'newpass'), UserManager.SUCCESS)
self.assertEqual(UserManager.try_auth(self.store, name, 'newpass'), (UserManager.SUCCESS, user))
self.assertEqual(UserManager.change_password(user.id, name.upper(), 'newpass'), UserManager.SUCCESS)
self.assertEqual(UserManager.try_auth(name, 'newpass'), (UserManager.SUCCESS, user))
# Old password
self.assertEqual(UserManager.try_auth(self.store, name, name.upper()), (UserManager.WRONG_PASS, None))
self.assertEqual(UserManager.try_auth(name, name.upper()), (UserManager.WRONG_PASS, None))
# Wrong password
self.assertEqual(UserManager.change_password(self.store, user.id, 'badpass', 'newpass'), UserManager.WRONG_PASS)
self.assertEqual(UserManager.change_password(user.id, 'badpass', 'newpass'), UserManager.WRONG_PASS)
# Ensure we still got the same number of users
self.assertEqual(self.store.find(db.User).count(), 3)
self.assertEqual(db.User.select().count(), 3)
# With invalid UUID
self.assertEqual(UserManager.change_password(self.store, 'invalid-uuid', 'oldpass', 'newpass'), UserManager.INVALID_ID)
self.assertEqual(UserManager.change_password('invalid-uuid', 'oldpass', 'newpass'), UserManager.INVALID_ID)
# Non-existent user
self.assertEqual(UserManager.change_password(self.store, uuid.uuid4(), 'oldpass', 'newpass'), UserManager.NO_SUCH_USER)
self.assertEqual(UserManager.change_password(uuid.uuid4(), 'oldpass', 'newpass'), UserManager.NO_SUCH_USER)
@db_session
def test_change_password2(self):
self.create_data()
# With existing users
for name in ['alice', 'bob', 'charlie']:
self.assertEqual(UserManager.change_password2(self.store, name, 'newpass'), UserManager.SUCCESS)
user = self.store.find(db.User, db.User.name == name).one()
self.assertEqual(UserManager.try_auth(self.store, name, 'newpass'), (UserManager.SUCCESS, user))
self.assertEqual(UserManager.try_auth(self.store, name, name.upper()), (UserManager.WRONG_PASS, None))
self.assertEqual(UserManager.change_password2(name, 'newpass'), UserManager.SUCCESS)
user = db.User.get(name = name)
self.assertEqual(UserManager.try_auth(name, 'newpass'), (UserManager.SUCCESS, user))
self.assertEqual(UserManager.try_auth(name, name.upper()), (UserManager.WRONG_PASS, None))
# Non-existent user
self.assertEqual(UserManager.change_password2(self.store, 'null', 'newpass'), UserManager.NO_SUCH_USER)
self.assertEqual(UserManager.change_password2('null', 'newpass'), UserManager.NO_SUCH_USER)
def test_human_readable_error(self):
values = [ UserManager.SUCCESS, UserManager.INVALID_ID, UserManager.NO_SUCH_USER, UserManager.NAME_EXISTS,