1
0
mirror of https://github.com/spl0k/supysonic.git synced 2024-12-22 08:56:17 +00:00

Porting supysonic.cli

This commit is contained in:
Alban Féron 2022-12-11 15:12:03 +01:00
parent 83ba85aaf1
commit cd369f6c7f
No known key found for this signature in database
GPG Key ID: 8CE0313646D16165
3 changed files with 54 additions and 63 deletions

View File

@ -1,7 +1,7 @@
# This file is part of Supysonic. # This file is part of Supysonic.
# Supysonic is a Python implementation of the Subsonic server API. # Supysonic is a Python implementation of the Subsonic server API.
# #
# Copyright (C) 2013-2021 Alban 'spl0k' Féron # Copyright (C) 2013-2022 Alban 'spl0k' Féron
# #
# Distributed under terms of the GNU AGPLv3 license. # Distributed under terms of the GNU AGPLv3 license.
@ -9,8 +9,6 @@ import click
import time import time
from click.exceptions import ClickException from click.exceptions import ClickException
from pony.orm import db_session, select
from pony.orm.core import ObjectNotFound
from .config import IniConfig from .config import IniConfig
from .daemon.client import DaemonClient from .daemon.client import DaemonClient
@ -52,12 +50,11 @@ def folder():
@folder.command("list") @folder.command("list")
@db_session
def folder_list(): def folder_list():
"""Lists folders.""" """Lists folders."""
click.echo("Name\t\tPath\n----\t\t----") click.echo("Name\t\tPath\n----\t\t----")
for f in Folder.select(lambda f: f.root): for f in Folder.select().where(Folder.root):
click.echo("{: <16}{}".format(f.name, f.path)) click.echo("{: <16}{}".format(f.name, f.path))
@ -67,7 +64,6 @@ def folder_list():
"path", "path",
type=click.Path(exists=True, file_okay=False, dir_okay=True, resolve_path=True), type=click.Path(exists=True, file_okay=False, dir_okay=True, resolve_path=True),
) )
@db_session
def folder_add(name, path): def folder_add(name, path):
"""Adds a folder. """Adds a folder.
@ -87,7 +83,6 @@ def folder_add(name, path):
@folder.command("delete") @folder.command("delete")
@click.argument("name") @click.argument("name")
@db_session
def folder_delete(name): def folder_delete(name):
"""Deletes a folder. """Deletes a folder.
@ -97,7 +92,7 @@ def folder_delete(name):
try: try:
FolderManager.delete_by_name(name) FolderManager.delete_by_name(name)
click.echo("Deleted folder '{}'".format(name)) click.echo("Deleted folder '{}'".format(name))
except ObjectNotFound as e: except Folder.DoesNotExist as e:
raise ClickException("Folder '{}' does not exist.".format(name)) from e raise ClickException("Folder '{}' does not exist.".format(name)) from e
@ -196,17 +191,20 @@ def _folder_scan_foreground(config, daemon, folders, force):
if folders: if folders:
fstrs = folders fstrs = folders
with db_session: folders = [
folders = select(f.name for f in Folder if f.root and f.name in fstrs)[:] f
for f, in Folder.select(Folder.name)
.where(Folder.root, Folder.name.in_(fstrs))
.tuples()
]
notfound = set(fstrs) - set(folders) notfound = set(fstrs) - set(folders)
if notfound: if notfound:
click.echo("No such folder(s): " + " ".join(notfound)) click.echo("No such folder(s): " + " ".join(notfound))
for folder in folders: for folder in folders:
scanner.queue_folder(folder) scanner.queue_folder(folder)
else: else:
with db_session: for (folder,) in Folder.select(Folder.name).where(Folder.root).tuples():
for folder in select(f.name for f in Folder if f.root): scanner.queue_folder(folder)
scanner.queue_folder(folder)
scanner.run() scanner.run()
stats = scanner.stats() stats = scanner.stats()
@ -235,7 +233,6 @@ def user():
@user.command("list") @user.command("list")
@db_session
def user_list(): def user_list():
"""Lists users.""" """Lists users."""
@ -253,7 +250,6 @@ def user_list():
@click.argument("name") @click.argument("name")
@click.password_option("-p", "--password", help="Specifies the user's password") @click.password_option("-p", "--password", help="Specifies the user's password")
@click.option("-e", "--email", default="", help="Sets the user's email address") @click.option("-e", "--email", default="", help="Sets the user's email address")
@db_session
def user_add(name, password, email): def user_add(name, password, email):
"""Adds a new user. """Adds a new user.
@ -268,7 +264,6 @@ def user_add(name, password, email):
@user.command("delete") @user.command("delete")
@click.argument("name") @click.argument("name")
@db_session
def user_delete(name): def user_delete(name):
"""Deletes a user. """Deletes a user.
@ -278,7 +273,7 @@ def user_delete(name):
try: try:
UserManager.delete_by_name(name) UserManager.delete_by_name(name)
click.echo("Deleted user '{}'".format(name)) click.echo("Deleted user '{}'".format(name))
except ObjectNotFound as e: except User.DoesNotExist as e:
raise ClickException("User '{}' does not exist.".format(name)) from e raise ClickException("User '{}' does not exist.".format(name)) from e
@ -299,16 +294,16 @@ def _echo_role_change(username, name, value):
default=None, default=None,
help="Grant or revoke jukebox rights", help="Grant or revoke jukebox rights",
) )
@db_session
def user_roles(name, admin, jukebox): def user_roles(name, admin, jukebox):
"""Enable/disable rights for a user. """Enable/disable rights for a user.
NAME is the login of the user to which grant or revoke rights. NAME is the login of the user to which grant or revoke rights.
""" """
user = User.get(name=name) try:
if user is None: user = User.get(name=name)
raise ClickException("No such user") except User.DoesNotExist as e:
raise ClickException("No such user") from e
if admin is not None: if admin is not None:
user.admin = admin user.admin = admin
@ -316,12 +311,12 @@ def user_roles(name, admin, jukebox):
if jukebox is not None: if jukebox is not None:
user.jukebox = jukebox user.jukebox = jukebox
_echo_role_change(name, "jukebox", jukebox) _echo_role_change(name, "jukebox", jukebox)
user.save()
@user.command("changepass") @user.command("changepass")
@click.argument("name") @click.argument("name")
@click.password_option("-p", "--password", help="New password") @click.password_option("-p", "--password", help="New password")
@db_session
def user_changepass(name, password): def user_changepass(name, password):
"""Changes a user's password. """Changes a user's password.
@ -331,14 +326,13 @@ def user_changepass(name, password):
try: try:
UserManager.change_password2(name, password) UserManager.change_password2(name, password)
click.echo("Successfully changed '{}' password".format(name)) click.echo("Successfully changed '{}' password".format(name))
except ObjectNotFound as e: except User.DoesNotExist as e:
raise ClickException("User '{}' does not exist.".format(name)) from e raise ClickException("User '{}' does not exist.".format(name)) from e
@user.command("rename") @user.command("rename")
@click.argument("name") @click.argument("name")
@click.argument("newname") @click.argument("newname")
@db_session
def user_rename(name, newname): def user_rename(name, newname):
"""Renames a user. """Renames a user.
@ -351,14 +345,19 @@ def user_rename(name, newname):
if name == newname: if name == newname:
return return
user = User.get(name=name) try:
if user is None: user = User.get(name=name)
raise ClickException("No such user") except User.DoesNotExist as e:
raise ClickException("No such user") from e
if User.get(name=newname) is not None: try:
User.get(name=newname)
raise ClickException("This name is already taken") raise ClickException("This name is already taken")
except User.DoesNotExist:
pass
user.name = newname user.name = newname
user.save()
click.echo("User '{}' renamed to '{}'".format(name, newname)) click.echo("User '{}' renamed to '{}'".format(name, newname))

View File

@ -60,8 +60,9 @@ class PathMixin:
return db.Model.get.__func__(cls, *args, **kwargs) return db.Model.get.__func__(cls, *args, **kwargs)
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
path = kwargs["path"] if "path" in kwargs:
kwargs["_path_hash"] = sha1(path.encode("utf-8")).digest() path = kwargs["path"]
kwargs["_path_hash"] = sha1(path.encode("utf-8")).digest()
db.Model.__init__(self, *args, **kwargs) db.Model.__init__(self, *args, **kwargs)
def __setattr__(self, attr, value): def __setattr__(self, attr, value):
@ -191,10 +192,14 @@ class Artist(db.Model):
@classmethod @classmethod
def prune(cls): def prune(cls):
return cls.delete().where( return (
cls.id.not_in(Album.select(Album.artist)), cls.delete()
cls.id.not_in(Track.select(Track.artist)), .where(
).execute() cls.id.not_in(Album.select(Album.artist)),
cls.id.not_in(Track.select(Track.artist)),
)
.execute()
)
class Album(db.Model): class Album(db.Model):

View File

@ -1,7 +1,7 @@
# This file is part of Supysonic. # This file is part of Supysonic.
# Supysonic is a Python implementation of the Subsonic server API. # Supysonic is a Python implementation of the Subsonic server API.
# #
# Copyright (C) 2017-2021 Alban 'spl0k' Féron # Copyright (C) 2017-2022 Alban 'spl0k' Féron
# #
# Distributed under terms of the GNU AGPLv3 license. # Distributed under terms of the GNU AGPLv3 license.
@ -11,7 +11,6 @@ import shlex
import unittest import unittest
from click.testing import CliRunner from click.testing import CliRunner
from pony.orm import db_session
from supysonic.db import Folder, User, init_database, release_database from supysonic.db import Folder, User, init_database, release_database
from supysonic.cli import cli from supysonic.cli import cli
@ -48,10 +47,9 @@ class CLITestCase(unittest.TestCase):
with tempfile.TemporaryDirectory() as d: with tempfile.TemporaryDirectory() as d:
self.__add_folder("tmpfolder", d) self.__add_folder("tmpfolder", d)
with db_session: f = Folder.select().first()
f = Folder.select().first() self.assertIsNotNone(f)
self.assertIsNotNone(f) self.assertEqual(f.path, d)
self.assertEqual(f.path, d)
def test_folder_add_errors(self): def test_folder_add_errors(self):
with tempfile.TemporaryDirectory() as d: with tempfile.TemporaryDirectory() as d:
@ -61,8 +59,7 @@ class CLITestCase(unittest.TestCase):
self.__add_folder("f1", d, True) self.__add_folder("f1", d, True)
self.__invoke("folder add f3 /invalid/path", True) self.__invoke("folder add f3 /invalid/path", True)
with db_session: self.assertEqual(Folder.select().count(), 1)
self.assertEqual(Folder.select().count(), 1)
def test_folder_delete(self): def test_folder_delete(self):
with tempfile.TemporaryDirectory() as d: with tempfile.TemporaryDirectory() as d:
@ -70,8 +67,7 @@ class CLITestCase(unittest.TestCase):
self.__invoke("folder delete randomfolder", True) self.__invoke("folder delete randomfolder", True)
self.__invoke("folder delete tmpfolder") self.__invoke("folder delete tmpfolder")
with db_session: self.assertEqual(Folder.select().count(), 0)
self.assertEqual(Folder.select().count(), 0)
def test_folder_list(self): def test_folder_list(self):
with tempfile.TemporaryDirectory() as d: with tempfile.TemporaryDirectory() as d:
@ -91,16 +87,14 @@ class CLITestCase(unittest.TestCase):
self.__invoke("user add -p Alic3 alice") self.__invoke("user add -p Alic3 alice")
self.__invoke("user add -p alice alice", True) self.__invoke("user add -p alice alice", True)
with db_session: self.assertEqual(User.select().count(), 1)
self.assertEqual(User.select().count(), 1)
def test_user_delete(self): def test_user_delete(self):
self.__invoke("user add -p Alic3 alice") self.__invoke("user add -p Alic3 alice")
self.__invoke("user delete alice") self.__invoke("user delete alice")
self.__invoke("user delete bob", True) self.__invoke("user delete bob", True)
with db_session: self.assertEqual(User.select().count(), 0)
self.assertEqual(User.select().count(), 0)
def test_user_list(self): def test_user_list(self):
self.__invoke("user add -p Alic3 alice") self.__invoke("user add -p Alic3 alice")
@ -111,28 +105,24 @@ class CLITestCase(unittest.TestCase):
self.__invoke("user add -p Alic3 alice") self.__invoke("user add -p Alic3 alice")
self.__invoke("user setroles -A alice") self.__invoke("user setroles -A alice")
self.__invoke("user setroles -A bob", True) self.__invoke("user setroles -A bob", True)
with db_session: self.assertTrue(User.get(name="alice").admin)
self.assertTrue(User.get(name="alice").admin)
def test_user_unsetadmin(self): def test_user_unsetadmin(self):
self.__invoke("user add -p Alic3 alice") self.__invoke("user add -p Alic3 alice")
self.__invoke("user setroles -A alice") self.__invoke("user setroles -A alice")
self.__invoke("user setroles -a alice") self.__invoke("user setroles -a alice")
with db_session: self.assertFalse(User.get(name="alice").admin)
self.assertFalse(User.get(name="alice").admin)
def test_user_setjukebox(self): def test_user_setjukebox(self):
self.__invoke("user add -p Alic3 alice") self.__invoke("user add -p Alic3 alice")
self.__invoke("user setroles -J alice") self.__invoke("user setroles -J alice")
with db_session: self.assertTrue(User.get(name="alice").jukebox)
self.assertTrue(User.get(name="alice").jukebox)
def test_user_unsetjukebox(self): def test_user_unsetjukebox(self):
self.__invoke("user add -p Alic3 alice") self.__invoke("user add -p Alic3 alice")
self.__invoke("user setroles -J alice") self.__invoke("user setroles -J alice")
self.__invoke("user setroles -j alice") self.__invoke("user setroles -j alice")
with db_session: self.assertFalse(User.get(name="alice").jukebox)
self.assertFalse(User.get(name="alice").jukebox)
def test_user_changepass(self): def test_user_changepass(self):
self.__invoke("user add -p Alic3 alice") self.__invoke("user add -p Alic3 alice")
@ -145,18 +135,15 @@ class CLITestCase(unittest.TestCase):
self.__invoke("user rename bob charles", True) self.__invoke("user rename bob charles", True)
self.__invoke("user rename alice ''", True) self.__invoke("user rename alice ''", True)
with db_session: self.assertEqual(User.select().first().name, "alice")
self.assertEqual(User.select().first().name, "alice")
self.__invoke("user rename alice bob") self.__invoke("user rename alice bob")
with db_session: self.assertEqual(User.select().first().name, "bob")
self.assertEqual(User.select().first().name, "bob")
self.__invoke("user add -p Ch4rl3s charles") self.__invoke("user add -p Ch4rl3s charles")
self.__invoke("user rename bob charles", True) self.__invoke("user rename bob charles", True)
with db_session: self.assertEqual(User.select().where(User.name == "bob").count(), 1)
self.assertEqual(User.select(lambda u: u.name == "bob").count(), 1) self.assertEqual(User.select().where(User.name == "charles").count(), 1)
self.assertEqual(User.select(lambda u: u.name == "charles").count(), 1)
if __name__ == "__main__": if __name__ == "__main__":