From a4b9a972716224a9b5867c5c4bb9e22cfa4f2305 Mon Sep 17 00:00:00 2001 From: spl0k Date: Sat, 16 Dec 2017 17:42:53 +0100 Subject: [PATCH] All managers get a pony --- supysonic/managers/folder.py | 57 ++++----- supysonic/managers/user.py | 84 ++++++------- supysonic/scanner.py | 49 +------- tests/managers/test_manager_folder.py | 140 +++++++++++---------- tests/managers/test_manager_user.py | 174 ++++++++++++++------------ 5 files changed, 235 insertions(+), 269 deletions(-) diff --git a/supysonic/managers/folder.py b/supysonic/managers/folder.py index 547a711..f5f04e6 100644 --- a/supysonic/managers/folder.py +++ b/supysonic/managers/folder.py @@ -21,6 +21,9 @@ import os.path import uuid +from pony.orm import db_session, select +from pony.orm import ObjectNotFound + from ..db import Folder, Artist, Album, Track, StarredFolder, RatingFolder from ..scanner import Scanner @@ -34,7 +37,8 @@ class FolderManager: SUBPATH_EXISTS = 6 @staticmethod - def get(store, uid): + @db_session + def get(uid): if isinstance(uid, basestring): try: uid = uuid.UUID(uid) @@ -45,65 +49,56 @@ class FolderManager: else: return FolderManager.INVALID_ID, None - folder = store.get(Folder, uid) - if not folder: + try: + folder = Folder[uid] + return FolderManager.SUCCESS, folder + except ObjectNotFound: return FolderManager.NO_SUCH_FOLDER, None - return FolderManager.SUCCESS, folder - @staticmethod - def add(store, name, path): - if not store.find(Folder, Folder.name == name, Folder.root == True).is_empty(): + @db_session + def add(name, path): + if Folder.get(name = name, root = True) is not None: return FolderManager.NAME_EXISTS path = unicode(os.path.abspath(path)) if not os.path.isdir(path): return FolderManager.INVALID_PATH - if not store.find(Folder, Folder.path == path).is_empty(): + if Folder.get(path = path) is not None: return FolderManager.PATH_EXISTS - if any(path.startswith(p) for p in store.find(Folder).values(Folder.path)): + if any(path.startswith(p) for p in select(f.path for f in Folder)): return FolderManager.PATH_EXISTS - if not store.find(Folder, Folder.path.startswith(path)).is_empty(): + if Folder.exists(lambda f: f.path.startswith(path)): return FolderManager.SUBPATH_EXISTS - folder = Folder() - folder.root = True - folder.name = name - folder.path = path - - store.add(folder) - store.commit() - + folder = Folder(root = True, name = name, path = path) return FolderManager.SUCCESS @staticmethod - def delete(store, uid): - status, folder = FolderManager.get(store, uid) + @db_session + def delete(uid): + status, folder = FolderManager.get(uid) if status != FolderManager.SUCCESS: return status if not folder.root: return FolderManager.NO_SUCH_FOLDER - scanner = Scanner(store) - for track in store.find(Track, Track.root_folder_id == folder.id): + scanner = Scanner() + for track in Track.select(lambda t: t.root_folder == folder): scanner.remove_file(track.path) scanner.finish() - store.find(StarredFolder, StarredFolder.starred_id == uid).remove() - store.find(RatingFolder, RatingFolder.rated_id == uid).remove() - - store.remove(folder) - store.commit() - + folder.delete() return FolderManager.SUCCESS @staticmethod - def delete_by_name(store, name): - folder = store.find(Folder, Folder.name == name, Folder.root == True).one() + @db_session + def delete_by_name(name): + folder = Folder.get(name = name, root = True) if not folder: return FolderManager.NO_SUCH_FOLDER - return FolderManager.delete(store, folder.id) + return FolderManager.delete(folder.id) @staticmethod def error_str(err): diff --git a/supysonic/managers/user.py b/supysonic/managers/user.py index 2431c68..d36d5c0 100644 --- a/supysonic/managers/user.py +++ b/supysonic/managers/user.py @@ -14,6 +14,9 @@ import random import string import uuid +from pony.orm import db_session +from pony.orm import ObjectNotFound + from ..db import User, ChatMessage, Playlist from ..db import StarredFolder, StarredArtist, StarredAlbum, StarredTrack from ..db import RatingFolder, RatingTrack @@ -26,7 +29,8 @@ class UserManager: WRONG_PASS = 4 @staticmethod - def get(store, uid): + @db_session + def get(uid): if type(uid) in (str, unicode): try: uid = uuid.UUID(uid) @@ -37,63 +41,53 @@ class UserManager: else: return UserManager.INVALID_ID, None - user = store.get(User, uid) - if user is None: + try: + user = User[uid] + return UserManager.SUCCESS, user + except ObjectNotFound: return UserManager.NO_SUCH_USER, None - return UserManager.SUCCESS, user - @staticmethod - def add(store, name, password, mail, admin): - if store.find(User, User.name == name).one(): + @db_session + def add(name, password, mail, admin): + if User.get(name = name) is not None: return UserManager.NAME_EXISTS crypt, salt = UserManager.__encrypt_password(password) - user = User() - user.name = name - user.mail = mail - user.password = crypt - user.salt = salt - user.admin = admin - - store.add(user) - store.commit() + user = User( + name = name, + mail = mail, + password = crypt, + salt = salt, + admin = admin + ) return UserManager.SUCCESS @staticmethod - def delete(store, uid): - status, user = UserManager.get(store, uid) + @db_session + def delete(uid): + status, user = UserManager.get(uid) if status != UserManager.SUCCESS: return status - store.find(StarredFolder, StarredFolder.user_id == user.id).remove() - store.find(StarredArtist, StarredArtist.user_id == user.id).remove() - store.find(StarredAlbum, StarredAlbum.user_id == user.id).remove() - store.find(StarredTrack, StarredTrack.user_id == user.id).remove() - store.find(RatingFolder, RatingFolder.user_id == user.id).remove() - store.find(RatingTrack, RatingTrack.user_id == user.id).remove() - store.find(ChatMessage, ChatMessage.user_id == user.id).remove() - for playlist in store.find(Playlist, Playlist.user_id == user.id): - store.remove(playlist) - - store.remove(user) - store.commit() - + user.delete() return UserManager.SUCCESS @staticmethod - def delete_by_name(store, name): - user = store.find(User, User.name == name).one() - if not user: + @db_session + def delete_by_name(name): + user = User.get(name = name) + if user is None: return UserManager.NO_SUCH_USER - return UserManager.delete(store, user.id) + return UserManager.delete(user.id) @staticmethod - def try_auth(store, name, password): - user = store.find(User, User.name == name).one() - if not user: + @db_session + def try_auth(name, password): + user = User.get(name = name) + if user is None: return UserManager.NO_SUCH_USER, None elif UserManager.__encrypt_password(password, user.salt)[0] != user.password: return UserManager.WRONG_PASS, None @@ -101,8 +95,9 @@ class UserManager: return UserManager.SUCCESS, user @staticmethod - def change_password(store, uid, old_pass, new_pass): - status, user = UserManager.get(store, uid) + @db_session + def change_password(uid, old_pass, new_pass): + status, user = UserManager.get(uid) if status != UserManager.SUCCESS: return status @@ -110,17 +105,16 @@ class UserManager: return UserManager.WRONG_PASS user.password = UserManager.__encrypt_password(new_pass, user.salt)[0] - store.commit() return UserManager.SUCCESS @staticmethod - def change_password2(store, name, new_pass): - user = store.find(User, User.name == name).one() - if not user: + @db_session + def change_password2(name, new_pass): + user = User.get(name = name) + if user is None: return UserManager.NO_SUCH_USER user.password = UserManager.__encrypt_password(new_pass, user.salt)[0] - store.commit() return UserManager.SUCCESS @staticmethod diff --git a/supysonic/scanner.py b/supysonic/scanner.py index 9edff22..aedccd1 100644 --- a/supysonic/scanner.py +++ b/supysonic/scanner.py @@ -23,40 +23,15 @@ import mimetypes import mutagen import time -from storm.expr import ComparableExpr, compile, Like -from storm.exceptions import NotSupportedError - from .db import Folder, Artist, Album, Track, User from .db import StarredFolder, StarredArtist, StarredAlbum, StarredTrack from .db import RatingFolder, RatingTrack -# Hacking in support for a concatenation expression -class Concat(ComparableExpr): - __slots__ = ("left", "right", "db") - - def __init__(self, left, right, db): - self.left = left - self.right = right - self.db = db - -@compile.when(Concat) -def compile_concat(compile, concat, state): - left = compile(concat.left, state) - right = compile(concat.right, state) - if concat.db in ('sqlite', 'postgres'): - statement = "%s||%s" - elif concat.db == 'mysql': - statement = "CONCAT(%s, %s)" - else: - raise NotSupportedError("Unspported database (%s)" % concat.db) - return statement % (left, right) - class Scanner: - def __init__(self, store, force = False, extensions = None): + def __init__(self, force = False, extensions = None): if extensions is not None and not isinstance(extensions, list): raise TypeError('Invalid extensions type') - self.__store = store self.__force = force self.__added_artists = 0 @@ -106,18 +81,14 @@ class Scanner: def finish(self): for album in [ a for a in self.__albums_to_check if not a.tracks.count() ]: - self.__store.find(StarredAlbum, StarredAlbum.starred_id == album.id).remove() - self.__artists_to_check.add(album.artist) - self.__store.remove(album) self.__deleted_albums += 1 + album.delete() self.__albums_to_check.clear() for artist in [ a for a in self.__artists_to_check if not a.albums.count() and not a.tracks.count() ]: - self.__store.find(StarredArtist, StarredArtist.starred_id == artist.id).remove() - - self.__store.remove(artist) self.__deleted_artists += 1 + artist.delete() self.__artists_to_check.clear() while self.__folders_to_check: @@ -126,11 +97,8 @@ class Scanner: continue if not folder.tracks.count() and not folder.children.count(): - self.__store.find(StarredFolder, StarredFolder.starred_id == folder.id).remove() - self.__store.find(RatingFolder, RatingFolder.rated_id == folder.id).remove() - self.__folders_to_check.add(folder.parent) - self.__store.remove(folder) + folder.delete() def __is_valid_path(self, path): if not os.path.exists(path): @@ -206,20 +174,15 @@ class Scanner: if not isinstance(path, basestring): raise TypeError('Expecting string, got ' + str(type(path))) - tr = self.__store.find(Track, Track.path == path).one() + tr = Track.get(path = path) if not tr: return - self.__store.find(StarredTrack, StarredTrack.starred_id == tr.id).remove() - self.__store.find(RatingTrack, RatingTrack.rated_id == tr.id).remove() - # Playlist autofix themselves - self.__store.find(User, User.last_play_id == tr.id).set(last_play_id = None) - self.__folders_to_check.add(tr.folder) self.__albums_to_check.add(tr.album) self.__artists_to_check.add(tr.artist) - self.__store.remove(tr) self.__deleted_tracks += 1 + tr.delete() def move_file(self, src_path, dst_path): if not isinstance(src_path, basestring): diff --git a/tests/managers/test_manager_folder.py b/tests/managers/test_manager_folder.py index 6a2e47b..97e721d 100644 --- a/tests/managers/test_manager_folder.py +++ b/tests/managers/test_manager_folder.py @@ -20,129 +20,133 @@ import tempfile import unittest import uuid +from pony.orm import db_session, ObjectNotFound + class FolderManagerTestCase(unittest.TestCase): def setUp(self): # Create an empty sqlite database in memory - self.store = db.get_store("sqlite:") - # Read schema from file - with io.open('schema/sqlite.sql', 'r') as sql: - schema = sql.read() - # Create tables on memory database - for command in schema.split(';'): - self.store.execute(command) + self.store = db.get_database('sqlite:', True) # Create some temporary directories self.media_dir = tempfile.mkdtemp() self.music_dir = tempfile.mkdtemp() + + @db_session + def create_folders(self): # Add test folders - self.assertEqual(FolderManager.add(self.store, 'media', self.media_dir), FolderManager.SUCCESS) - self.assertEqual(FolderManager.add(self.store, 'music', self.music_dir), FolderManager.SUCCESS) + self.assertEqual(FolderManager.add('media', self.media_dir), FolderManager.SUCCESS) + self.assertEqual(FolderManager.add('music', self.music_dir), FolderManager.SUCCESS) - folder = db.Folder() - folder.root = False - folder.name = 'non-root' - folder.path = os.path.join(self.music_dir, 'subfolder') - self.store.add(folder) + folder = db.Folder( + root = False, + name = 'non-root', + path = os.path.join(self.music_dir, 'subfolder') + ) - artist = db.Artist() - artist.name = 'Artist' + artist = db.Artist(name = 'Artist') + album = db.Album(name = 'Album', artist = artist) - album = db.Album() - album.name = 'Album' - album.artist = artist - - root = self.store.find(db.Folder, db.Folder.name == 'media').one() - track = db.Track() - track.title = 'Track' - track.artist = artist - track.album = album - track.disc = 1 - track.number = 1 - track.path = os.path.join(self.media_dir, 'somefile') - track.folder = root - track.root_folder = root - track.duration = 2 - track.content_type = 'audio/mpeg' - track.bitrate = 320 - track.last_modification = 0 - self.store.add(track) - - self.store.commit() + root = db.Folder.get(name = 'media') + track = db.Track( + title = 'Track', + artist = artist, + album = album, + disc = 1, + number = 1, + path = os.path.join(self.media_dir, 'somefile'), + folder = root, + root_folder = root, + duration = 2, + content_type = 'audio/mpeg', + bitrate = 320, + last_modification = 0 + ) def tearDown(self): + db.release_database(self.store) shutil.rmtree(self.media_dir) shutil.rmtree(self.music_dir) + @db_session def test_get_folder(self): + self.create_folders() + # Get existing folders for name in ['media', 'music']: - folder = self.store.find(db.Folder, db.Folder.name == name, db.Folder.root == True).one() - self.assertEqual(FolderManager.get(self.store, folder.id), (FolderManager.SUCCESS, folder)) + folder = db.Folder.get(name = name, root = True) + self.assertEqual(FolderManager.get(folder.id), (FolderManager.SUCCESS, folder)) # Get with invalid UUID - self.assertEqual(FolderManager.get(self.store, 'invalid-uuid'), (FolderManager.INVALID_ID, None)) - self.assertEqual(FolderManager.get(self.store, 0xdeadbeef), (FolderManager.INVALID_ID, None)) + self.assertEqual(FolderManager.get('invalid-uuid'), (FolderManager.INVALID_ID, None)) + self.assertEqual(FolderManager.get(0xdeadbeef), (FolderManager.INVALID_ID, None)) # Non-existent folder - self.assertEqual(FolderManager.get(self.store, uuid.uuid4()), (FolderManager.NO_SUCH_FOLDER, None)) + self.assertEqual(FolderManager.get(uuid.uuid4()), (FolderManager.NO_SUCH_FOLDER, None)) + @db_session def test_add_folder(self): - # Added in setUp() - self.assertEqual(self.store.find(db.Folder).count(), 3) + self.create_folders() + self.assertEqual(db.Folder.select().count(), 3) # Create duplicate - self.assertEqual(FolderManager.add(self.store,'media', self.media_dir), FolderManager.NAME_EXISTS) - self.assertEqual(self.store.find(db.Folder, db.Folder.name == 'media').count(), 1) + self.assertEqual(FolderManager.add('media', self.media_dir), FolderManager.NAME_EXISTS) + self.assertEqual(db.Folder.select(lambda f: f.name == 'media').count(), 1) # Duplicate path - self.assertEqual(FolderManager.add(self.store,'new-folder', self.media_dir), FolderManager.PATH_EXISTS) - self.assertEqual(self.store.find(db.Folder, db.Folder.path == self.media_dir).count(), 1) + self.assertEqual(FolderManager.add('new-folder', self.media_dir), FolderManager.PATH_EXISTS) + self.assertEqual(db.Folder.select(lambda f: f.path == self.media_dir).count(), 1) # Invalid path path = os.path.abspath('/this/not/is/valid') - self.assertEqual(FolderManager.add(self.store,'invalid-path', path), FolderManager.INVALID_PATH) - self.assertEqual(self.store.find(db.Folder, db.Folder.path == path).count(), 0) + self.assertEqual(FolderManager.add('invalid-path', path), FolderManager.INVALID_PATH) + self.assertFalse(db.Folder.exists(path = path)) # Subfolder of already added path path = os.path.join(self.media_dir, 'subfolder') os.mkdir(path) - self.assertEqual(FolderManager.add(self.store,'subfolder', path), FolderManager.PATH_EXISTS) - self.assertEqual(self.store.find(db.Folder).count(), 3) + self.assertEqual(FolderManager.add('subfolder', path), FolderManager.PATH_EXISTS) + self.assertEqual(db.Folder.select().count(), 3) # Parent folder of an already added path path = os.path.join(self.media_dir, '..') - self.assertEqual(FolderManager.add(self.store, 'parent', path), FolderManager.SUBPATH_EXISTS) - self.assertEqual(self.store.find(db.Folder).count(), 3) + self.assertEqual(FolderManager.add('parent', path), FolderManager.SUBPATH_EXISTS) + self.assertEqual(db.Folder.select().count(), 3) + @db_session def test_delete_folder(self): + self.create_folders() + # Delete existing folders for name in ['media', 'music']: - folder = self.store.find(db.Folder, db.Folder.name == name, db.Folder.root == True).one() - self.assertEqual(FolderManager.delete(self.store, folder.id), FolderManager.SUCCESS) - self.assertIsNone(self.store.get(db.Folder, folder.id)) + folder = db.Folder.get(name = name, root = True) + self.assertEqual(FolderManager.delete(folder.id), FolderManager.SUCCESS) + self.assertRaises(ObjectNotFound, db.Folder.__getitem__, folder.id) # Delete invalid UUID - self.assertEqual(FolderManager.delete(self.store, 'invalid-uuid'), FolderManager.INVALID_ID) - self.assertEqual(self.store.find(db.Folder).count(), 1) # 'non-root' remaining + self.assertEqual(FolderManager.delete('invalid-uuid'), FolderManager.INVALID_ID) + self.assertEqual(db.Folder.select().count(), 1) # 'non-root' remaining # Delete non-existent folder - self.assertEqual(FolderManager.delete(self.store, uuid.uuid4()), FolderManager.NO_SUCH_FOLDER) - self.assertEqual(self.store.find(db.Folder).count(), 1) # 'non-root' remaining + self.assertEqual(FolderManager.delete(uuid.uuid4()), FolderManager.NO_SUCH_FOLDER) + self.assertEqual(db.Folder.select().count(), 1) # 'non-root' remaining # Delete non-root folder - folder = self.store.find(db.Folder, db.Folder.name == 'non-root').one() - self.assertEqual(FolderManager.delete(self.store, folder.id), FolderManager.NO_SUCH_FOLDER) - self.assertEqual(self.store.find(db.Folder).count(), 1) # 'non-root' remaining + folder = db.Folder.get(name = 'non-root') + self.assertEqual(FolderManager.delete(folder.id), FolderManager.NO_SUCH_FOLDER) + self.assertEqual(db.Folder.select().count(), 1) # 'non-root' remaining + @db_session def test_delete_by_name(self): + self.create_folders() + # Delete existing folders for name in ['media', 'music']: - self.assertEqual(FolderManager.delete_by_name(self.store, name), FolderManager.SUCCESS) - self.assertEqual(self.store.find(db.Folder, db.Folder.name == name).count(), 0) + self.assertEqual(FolderManager.delete_by_name(name), FolderManager.SUCCESS) + self.assertFalse(db.Folder.exists(name = name)) # Delete non-existent folder - self.assertEqual(FolderManager.delete_by_name(self.store, 'null'), FolderManager.NO_SUCH_FOLDER) - self.assertEqual(self.store.find(db.Folder).count(), 1) # 'non-root' remaining + self.assertEqual(FolderManager.delete_by_name('null'), FolderManager.NO_SUCH_FOLDER) + self.assertEqual(db.Folder.select().count(), 1) # 'non-root' remaining def test_human_readable_error(self): values = [ FolderManager.SUCCESS, FolderManager.INVALID_ID, FolderManager.NAME_EXISTS, diff --git a/tests/managers/test_manager_user.py b/tests/managers/test_manager_user.py index daf282c..9cf2cba 100644 --- a/tests/managers/test_manager_user.py +++ b/tests/managers/test_manager_user.py @@ -13,61 +13,51 @@ from supysonic import db from supysonic.managers.user import UserManager +import io import unittest import uuid -import io + +from pony.orm import db_session +from pony.orm import ObjectNotFound class UserManagerTestCase(unittest.TestCase): def setUp(self): # Create an empty sqlite database in memory - self.store = db.get_store("sqlite:") - # Read schema from file - with io.open('schema/sqlite.sql', 'r') as sql: - schema = sql.read() - # Create tables on memory database - for command in schema.split(';'): - self.store.execute(command) + self.store = db.get_database('sqlite:', True) + @db_session + def create_data(self): # Create some users - self.assertEqual(UserManager.add(self.store, 'alice', 'ALICE', 'test@example.com', True), UserManager.SUCCESS) - self.assertEqual(UserManager.add(self.store, 'bob', 'BOB', 'bob@example.com', False), UserManager.SUCCESS) - self.assertEqual(UserManager.add(self.store, 'charlie', 'CHARLIE', 'charlie@example.com', False), UserManager.SUCCESS) + self.assertEqual(UserManager.add('alice', 'ALICE', 'test@example.com', True), UserManager.SUCCESS) + self.assertEqual(UserManager.add('bob', 'BOB', 'bob@example.com', False), UserManager.SUCCESS) + self.assertEqual(UserManager.add('charlie', 'CHARLIE', 'charlie@example.com', False), UserManager.SUCCESS) - folder = db.Folder() - folder.name = 'Root' - folder.path = 'tests/assets' - folder.root = True + folder = db.Folder(name = 'Root', path = 'tests/assets', root = True) + artist = db.Artist(name = 'Artist') + album = db.Album(name = 'Album', artist = artist) + track = db.Track( + title = 'Track', + disc = 1, + number = 1, + duration = 1, + artist = artist, + album = album, + path = 'tests/assets/empty', + folder = folder, + root_folder = folder, + content_type = 'audio/mpeg', + bitrate = 320, + last_modification = 0 + ) - artist = db.Artist() - artist.name = 'Artist' - - album = db.Album() - album.name = 'Album' - album.artist = artist - - track = db.Track() - track.title = 'Track' - track.disc = 1 - track.number = 1 - track.duration = 1 - track.artist = artist - track.album = album - track.path = 'tests/assets/empty' - track.folder = folder - track.root_folder = folder - track.duration = 2 - track.content_type = 'audio/mpeg' - track.bitrate = 320 - track.last_modification = 0 - self.store.add(track) - self.store.commit() - - playlist = db.Playlist() - playlist.name = 'Playlist' - playlist.user = self.store.find(db.User, db.User.name == 'alice').one() + playlist = db.Playlist( + name = 'Playlist', + user = db.User.get(name = 'alice') + ) playlist.add(track) - self.store.add(playlist) - self.store.commit() + + def tearDown(self): + db.release_database(self.store) def test_encrypt_password(self): func = UserManager._UserManager__encrypt_password @@ -75,96 +65,116 @@ class UserManagerTestCase(unittest.TestCase): self.assertEqual(func(u'pass-word',u'pepper'), (u'd68c95a91ed7773aa57c7c044d2309a5bf1da2e7', u'pepper')) self.assertEqual(func(u'éèàïô', u'ABC+'), (u'b639ba5217b89c906019d89d5816b407d8730898', u'ABC+')) + @db_session def test_get_user(self): + self.create_data() + # Get existing users for name in ['alice', 'bob', 'charlie']: - user = self.store.find(db.User, db.User.name == name).one() - self.assertEqual(UserManager.get(self.store, user.id), (UserManager.SUCCESS, user)) + user = db.User.get(name = name) + self.assertEqual(UserManager.get(user.id), (UserManager.SUCCESS, user)) # Get with invalid UUID - self.assertEqual(UserManager.get(self.store, 'invalid-uuid'), (UserManager.INVALID_ID, None)) - self.assertEqual(UserManager.get(self.store, 0xfee1bad), (UserManager.INVALID_ID, None)) + self.assertEqual(UserManager.get('invalid-uuid'), (UserManager.INVALID_ID, None)) + self.assertEqual(UserManager.get(0xfee1bad), (UserManager.INVALID_ID, None)) # Non-existent user - self.assertEqual(UserManager.get(self.store, uuid.uuid4()), (UserManager.NO_SUCH_USER, None)) + self.assertEqual(UserManager.get(uuid.uuid4()), (UserManager.NO_SUCH_USER, None)) + @db_session def test_add_user(self): - # Added in setUp() - self.assertEqual(self.store.find(db.User).count(), 3) + self.create_data() + self.assertEqual(db.User.select().count(), 3) # Create duplicate - self.assertEqual(UserManager.add(self.store, 'alice', 'Alic3', 'alice@example.com', True), UserManager.NAME_EXISTS) + self.assertEqual(UserManager.add('alice', 'Alic3', 'alice@example.com', True), UserManager.NAME_EXISTS) + @db_session def test_delete_user(self): + self.create_data() + # Delete invalid UUID - self.assertEqual(UserManager.delete(self.store, 'invalid-uuid'), UserManager.INVALID_ID) - self.assertEqual(UserManager.delete(self.store, 0xfee1b4d), UserManager.INVALID_ID) - self.assertEqual(self.store.find(db.User).count(), 3) + self.assertEqual(UserManager.delete('invalid-uuid'), UserManager.INVALID_ID) + self.assertEqual(UserManager.delete(0xfee1b4d), UserManager.INVALID_ID) + self.assertEqual(db.User.select().count(), 3) # Delete non-existent user - self.assertEqual(UserManager.delete(self.store, uuid.uuid4()), UserManager.NO_SUCH_USER) - self.assertEqual(self.store.find(db.User).count(), 3) + self.assertEqual(UserManager.delete(uuid.uuid4()), UserManager.NO_SUCH_USER) + self.assertEqual(db.User.select().count(), 3) # Delete existing users for name in ['alice', 'bob', 'charlie']: - user = self.store.find(db.User, db.User.name == name).one() - self.assertEqual(UserManager.delete(self.store, user.id), UserManager.SUCCESS) - self.assertIsNone(self.store.get(db.User, user.id)) - self.assertEqual(self.store.find(db.User).count(), 0) + user = db.User.get(name = name) + self.assertEqual(UserManager.delete(user.id), UserManager.SUCCESS) + self.assertRaises(ObjectNotFound, db.User.__getitem__, user.id) + self.store.commit() + self.assertEqual(db.User.select().count(), 0) + @db_session def test_delete_by_name(self): + self.create_data() + # Delete existing users for name in ['alice', 'bob', 'charlie']: - self.assertEqual(UserManager.delete_by_name(self.store, name), UserManager.SUCCESS) - self.assertEqual(self.store.find(db.User, db.User.name == name).count(), 0) + self.assertEqual(UserManager.delete_by_name(name), UserManager.SUCCESS) + self.assertFalse(db.User.exists(name = name)) # Delete non-existent user - self.assertEqual(UserManager.delete_by_name(self.store, 'null'), UserManager.NO_SUCH_USER) + self.assertEqual(UserManager.delete_by_name('null'), UserManager.NO_SUCH_USER) + @db_session def test_try_auth(self): + self.create_data() + # Test authentication for name in ['alice', 'bob', 'charlie']: - user = self.store.find(db.User, db.User.name == name).one() - self.assertEqual(UserManager.try_auth(self.store, name, name.upper()), (UserManager.SUCCESS, user)) + user = db.User.get(name = name) + self.assertEqual(UserManager.try_auth(name, name.upper()), (UserManager.SUCCESS, user)) # Wrong password - self.assertEqual(UserManager.try_auth(self.store, 'alice', 'bad'), (UserManager.WRONG_PASS, None)) - self.assertEqual(UserManager.try_auth(self.store, 'alice', 'alice'), (UserManager.WRONG_PASS, None)) + self.assertEqual(UserManager.try_auth('alice', 'bad'), (UserManager.WRONG_PASS, None)) + self.assertEqual(UserManager.try_auth('alice', 'alice'), (UserManager.WRONG_PASS, None)) # Non-existent user - self.assertEqual(UserManager.try_auth(self.store, 'null', 'null'), (UserManager.NO_SUCH_USER, None)) + self.assertEqual(UserManager.try_auth('null', 'null'), (UserManager.NO_SUCH_USER, None)) + @db_session def test_change_password(self): + self.create_data() + # With existing users for name in ['alice', 'bob', 'charlie']: - user = self.store.find(db.User, db.User.name == name).one() + user = db.User.get(name = name) # Good password - self.assertEqual(UserManager.change_password(self.store, user.id, name.upper(), 'newpass'), UserManager.SUCCESS) - self.assertEqual(UserManager.try_auth(self.store, name, 'newpass'), (UserManager.SUCCESS, user)) + self.assertEqual(UserManager.change_password(user.id, name.upper(), 'newpass'), UserManager.SUCCESS) + self.assertEqual(UserManager.try_auth(name, 'newpass'), (UserManager.SUCCESS, user)) # Old password - self.assertEqual(UserManager.try_auth(self.store, name, name.upper()), (UserManager.WRONG_PASS, None)) + self.assertEqual(UserManager.try_auth(name, name.upper()), (UserManager.WRONG_PASS, None)) # Wrong password - self.assertEqual(UserManager.change_password(self.store, user.id, 'badpass', 'newpass'), UserManager.WRONG_PASS) + self.assertEqual(UserManager.change_password(user.id, 'badpass', 'newpass'), UserManager.WRONG_PASS) # Ensure we still got the same number of users - self.assertEqual(self.store.find(db.User).count(), 3) + self.assertEqual(db.User.select().count(), 3) # With invalid UUID - self.assertEqual(UserManager.change_password(self.store, 'invalid-uuid', 'oldpass', 'newpass'), UserManager.INVALID_ID) + self.assertEqual(UserManager.change_password('invalid-uuid', 'oldpass', 'newpass'), UserManager.INVALID_ID) # Non-existent user - self.assertEqual(UserManager.change_password(self.store, uuid.uuid4(), 'oldpass', 'newpass'), UserManager.NO_SUCH_USER) + self.assertEqual(UserManager.change_password(uuid.uuid4(), 'oldpass', 'newpass'), UserManager.NO_SUCH_USER) + @db_session def test_change_password2(self): + self.create_data() + # With existing users for name in ['alice', 'bob', 'charlie']: - self.assertEqual(UserManager.change_password2(self.store, name, 'newpass'), UserManager.SUCCESS) - user = self.store.find(db.User, db.User.name == name).one() - self.assertEqual(UserManager.try_auth(self.store, name, 'newpass'), (UserManager.SUCCESS, user)) - self.assertEqual(UserManager.try_auth(self.store, name, name.upper()), (UserManager.WRONG_PASS, None)) + self.assertEqual(UserManager.change_password2(name, 'newpass'), UserManager.SUCCESS) + user = db.User.get(name = name) + self.assertEqual(UserManager.try_auth(name, 'newpass'), (UserManager.SUCCESS, user)) + self.assertEqual(UserManager.try_auth(name, name.upper()), (UserManager.WRONG_PASS, None)) # Non-existent user - self.assertEqual(UserManager.change_password2(self.store, 'null', 'newpass'), UserManager.NO_SUCH_USER) + self.assertEqual(UserManager.change_password2('null', 'newpass'), UserManager.NO_SUCH_USER) def test_human_readable_error(self): values = [ UserManager.SUCCESS, UserManager.INVALID_ID, UserManager.NO_SUCH_USER, UserManager.NAME_EXISTS,