diff --git a/supysonic/cli.py b/supysonic/cli.py index d22b148..2eacb5f 100644 --- a/supysonic/cli.py +++ b/supysonic/cli.py @@ -1,7 +1,7 @@ # This file is part of Supysonic. # Supysonic is a Python implementation of the Subsonic server API. # -# Copyright (C) 2013-2021 Alban 'spl0k' Féron +# Copyright (C) 2013-2022 Alban 'spl0k' Féron # # Distributed under terms of the GNU AGPLv3 license. @@ -9,8 +9,6 @@ import click import time from click.exceptions import ClickException -from pony.orm import db_session, select -from pony.orm.core import ObjectNotFound from .config import IniConfig from .daemon.client import DaemonClient @@ -52,12 +50,11 @@ def folder(): @folder.command("list") -@db_session def folder_list(): """Lists folders.""" click.echo("Name\t\tPath\n----\t\t----") - for f in Folder.select(lambda f: f.root): + for f in Folder.select().where(Folder.root): click.echo("{: <16}{}".format(f.name, f.path)) @@ -67,7 +64,6 @@ def folder_list(): "path", type=click.Path(exists=True, file_okay=False, dir_okay=True, resolve_path=True), ) -@db_session def folder_add(name, path): """Adds a folder. @@ -87,7 +83,6 @@ def folder_add(name, path): @folder.command("delete") @click.argument("name") -@db_session def folder_delete(name): """Deletes a folder. @@ -97,7 +92,7 @@ def folder_delete(name): try: FolderManager.delete_by_name(name) click.echo("Deleted folder '{}'".format(name)) - except ObjectNotFound as e: + except Folder.DoesNotExist as e: raise ClickException("Folder '{}' does not exist.".format(name)) from e @@ -196,17 +191,20 @@ def _folder_scan_foreground(config, daemon, folders, force): if folders: fstrs = folders - with db_session: - folders = select(f.name for f in Folder if f.root and f.name in fstrs)[:] + folders = [ + f + for f, in Folder.select(Folder.name) + .where(Folder.root, Folder.name.in_(fstrs)) + .tuples() + ] notfound = set(fstrs) - set(folders) if notfound: click.echo("No such folder(s): " + " ".join(notfound)) for folder in folders: scanner.queue_folder(folder) else: - with db_session: - for folder in select(f.name for f in Folder if f.root): - scanner.queue_folder(folder) + for (folder,) in Folder.select(Folder.name).where(Folder.root).tuples(): + scanner.queue_folder(folder) scanner.run() stats = scanner.stats() @@ -235,7 +233,6 @@ def user(): @user.command("list") -@db_session def user_list(): """Lists users.""" @@ -253,7 +250,6 @@ def user_list(): @click.argument("name") @click.password_option("-p", "--password", help="Specifies the user's password") @click.option("-e", "--email", default="", help="Sets the user's email address") -@db_session def user_add(name, password, email): """Adds a new user. @@ -268,7 +264,6 @@ def user_add(name, password, email): @user.command("delete") @click.argument("name") -@db_session def user_delete(name): """Deletes a user. @@ -278,7 +273,7 @@ def user_delete(name): try: UserManager.delete_by_name(name) click.echo("Deleted user '{}'".format(name)) - except ObjectNotFound as e: + except User.DoesNotExist as e: raise ClickException("User '{}' does not exist.".format(name)) from e @@ -299,16 +294,16 @@ def _echo_role_change(username, name, value): default=None, help="Grant or revoke jukebox rights", ) -@db_session def user_roles(name, admin, jukebox): """Enable/disable rights for a user. NAME is the login of the user to which grant or revoke rights. """ - user = User.get(name=name) - if user is None: - raise ClickException("No such user") + try: + user = User.get(name=name) + except User.DoesNotExist as e: + raise ClickException("No such user") from e if admin is not None: user.admin = admin @@ -316,12 +311,12 @@ def user_roles(name, admin, jukebox): if jukebox is not None: user.jukebox = jukebox _echo_role_change(name, "jukebox", jukebox) + user.save() @user.command("changepass") @click.argument("name") @click.password_option("-p", "--password", help="New password") -@db_session def user_changepass(name, password): """Changes a user's password. @@ -331,14 +326,13 @@ def user_changepass(name, password): try: UserManager.change_password2(name, password) click.echo("Successfully changed '{}' password".format(name)) - except ObjectNotFound as e: + except User.DoesNotExist as e: raise ClickException("User '{}' does not exist.".format(name)) from e @user.command("rename") @click.argument("name") @click.argument("newname") -@db_session def user_rename(name, newname): """Renames a user. @@ -351,14 +345,19 @@ def user_rename(name, newname): if name == newname: return - user = User.get(name=name) - if user is None: - raise ClickException("No such user") + try: + user = User.get(name=name) + except User.DoesNotExist as e: + raise ClickException("No such user") from e - if User.get(name=newname) is not None: + try: + User.get(name=newname) raise ClickException("This name is already taken") + except User.DoesNotExist: + pass user.name = newname + user.save() click.echo("User '{}' renamed to '{}'".format(name, newname)) diff --git a/supysonic/db.py b/supysonic/db.py index d83fae4..ef99907 100644 --- a/supysonic/db.py +++ b/supysonic/db.py @@ -60,8 +60,9 @@ class PathMixin: return db.Model.get.__func__(cls, *args, **kwargs) def __init__(self, *args, **kwargs): - path = kwargs["path"] - kwargs["_path_hash"] = sha1(path.encode("utf-8")).digest() + if "path" in kwargs: + path = kwargs["path"] + kwargs["_path_hash"] = sha1(path.encode("utf-8")).digest() db.Model.__init__(self, *args, **kwargs) def __setattr__(self, attr, value): @@ -191,10 +192,14 @@ class Artist(db.Model): @classmethod def prune(cls): - return cls.delete().where( - cls.id.not_in(Album.select(Album.artist)), - cls.id.not_in(Track.select(Track.artist)), - ).execute() + return ( + cls.delete() + .where( + cls.id.not_in(Album.select(Album.artist)), + cls.id.not_in(Track.select(Track.artist)), + ) + .execute() + ) class Album(db.Model): diff --git a/tests/base/test_cli.py b/tests/base/test_cli.py index 51e055f..27dfbff 100644 --- a/tests/base/test_cli.py +++ b/tests/base/test_cli.py @@ -1,7 +1,7 @@ # This file is part of Supysonic. # Supysonic is a Python implementation of the Subsonic server API. # -# Copyright (C) 2017-2021 Alban 'spl0k' Féron +# Copyright (C) 2017-2022 Alban 'spl0k' Féron # # Distributed under terms of the GNU AGPLv3 license. @@ -11,7 +11,6 @@ import shlex import unittest from click.testing import CliRunner -from pony.orm import db_session from supysonic.db import Folder, User, init_database, release_database from supysonic.cli import cli @@ -48,10 +47,9 @@ class CLITestCase(unittest.TestCase): with tempfile.TemporaryDirectory() as d: self.__add_folder("tmpfolder", d) - with db_session: - f = Folder.select().first() - self.assertIsNotNone(f) - self.assertEqual(f.path, d) + f = Folder.select().first() + self.assertIsNotNone(f) + self.assertEqual(f.path, d) def test_folder_add_errors(self): with tempfile.TemporaryDirectory() as d: @@ -61,8 +59,7 @@ class CLITestCase(unittest.TestCase): self.__add_folder("f1", d, True) self.__invoke("folder add f3 /invalid/path", True) - with db_session: - self.assertEqual(Folder.select().count(), 1) + self.assertEqual(Folder.select().count(), 1) def test_folder_delete(self): with tempfile.TemporaryDirectory() as d: @@ -70,8 +67,7 @@ class CLITestCase(unittest.TestCase): self.__invoke("folder delete randomfolder", True) self.__invoke("folder delete tmpfolder") - with db_session: - self.assertEqual(Folder.select().count(), 0) + self.assertEqual(Folder.select().count(), 0) def test_folder_list(self): with tempfile.TemporaryDirectory() as d: @@ -91,16 +87,14 @@ class CLITestCase(unittest.TestCase): self.__invoke("user add -p Alic3 alice") self.__invoke("user add -p alice alice", True) - with db_session: - self.assertEqual(User.select().count(), 1) + self.assertEqual(User.select().count(), 1) def test_user_delete(self): self.__invoke("user add -p Alic3 alice") self.__invoke("user delete alice") self.__invoke("user delete bob", True) - with db_session: - self.assertEqual(User.select().count(), 0) + self.assertEqual(User.select().count(), 0) def test_user_list(self): self.__invoke("user add -p Alic3 alice") @@ -111,28 +105,24 @@ class CLITestCase(unittest.TestCase): self.__invoke("user add -p Alic3 alice") self.__invoke("user setroles -A alice") self.__invoke("user setroles -A bob", True) - with db_session: - self.assertTrue(User.get(name="alice").admin) + self.assertTrue(User.get(name="alice").admin) def test_user_unsetadmin(self): self.__invoke("user add -p Alic3 alice") self.__invoke("user setroles -A alice") self.__invoke("user setroles -a alice") - with db_session: - self.assertFalse(User.get(name="alice").admin) + self.assertFalse(User.get(name="alice").admin) def test_user_setjukebox(self): self.__invoke("user add -p Alic3 alice") self.__invoke("user setroles -J alice") - with db_session: - self.assertTrue(User.get(name="alice").jukebox) + self.assertTrue(User.get(name="alice").jukebox) def test_user_unsetjukebox(self): self.__invoke("user add -p Alic3 alice") self.__invoke("user setroles -J alice") self.__invoke("user setroles -j alice") - with db_session: - self.assertFalse(User.get(name="alice").jukebox) + self.assertFalse(User.get(name="alice").jukebox) def test_user_changepass(self): self.__invoke("user add -p Alic3 alice") @@ -145,18 +135,15 @@ class CLITestCase(unittest.TestCase): self.__invoke("user rename bob charles", True) self.__invoke("user rename alice ''", True) - with db_session: - self.assertEqual(User.select().first().name, "alice") + self.assertEqual(User.select().first().name, "alice") self.__invoke("user rename alice bob") - with db_session: - self.assertEqual(User.select().first().name, "bob") + self.assertEqual(User.select().first().name, "bob") self.__invoke("user add -p Ch4rl3s charles") self.__invoke("user rename bob charles", True) - with db_session: - self.assertEqual(User.select(lambda u: u.name == "bob").count(), 1) - self.assertEqual(User.select(lambda u: u.name == "charles").count(), 1) + self.assertEqual(User.select().where(User.name == "bob").count(), 1) + self.assertEqual(User.select().where(User.name == "charles").count(), 1) if __name__ == "__main__":