1
0
mirror of https://github.com/spl0k/supysonic.git synced 2024-11-09 11:42:16 +00:00

Rewriting the CLI using click rather than cmd+argparse

This commit is contained in:
Alban Féron 2021-11-11 16:47:37 +01:00
parent 8e33b374fe
commit f4bfc735e8
No known key found for this signature in database
GPG Key ID: 8CE0313646D16165
6 changed files with 355 additions and 432 deletions

View File

@ -13,6 +13,7 @@ Supysonic folder management commands
Synopsis Synopsis
======== ========
| ``supysonic-cli folder --help``
| ``supysonic-cli folder list`` | ``supysonic-cli folder list``
| ``supysonic-cli folder add`` `name` `path` | ``supysonic-cli folder add`` `name` `path`
| ``supysonic-cli folder delete`` `name` | ``supysonic-cli folder delete`` `name`
@ -43,6 +44,10 @@ audio files are located. This allows to list, add, delete and scan the folders.
Options Options
======= =======
-h, --help
Shows help and exits. Depending on where this option appears it will either list the
available commands or display help for a specific command.
-f, --force -f, --force
Force scan of already known files even if they haven't changed. Might be Force scan of already known files even if they haven't changed. Might be
useful if an update to Supysonic adds new metadata to audio files. useful if an update to Supysonic adds new metadata to audio files.

View File

@ -13,10 +13,11 @@ Supysonic user management commands
Synopsis Synopsis
======== ========
| ``supysonic-cli user --help``
| ``supysonic-cli user list`` | ``supysonic-cli user list``
| ``supysonic-cli user add`` `user` [``--password`` `password`] [``--email`` `email`] | ``supysonic-cli user add`` `user` [``--password`` `password`] [``--email`` `email`]
| ``supysonic-cli user delete`` `user` | ``supysonic-cli user delete`` `user`
| ``supysonic-cli user changepass`` `user` `password` | ``supysonic-cli user changepass`` `user` [``--password`` `password`]
| ``supysonic-cli user setroles`` [``--admin``\|\ ``--noadmin``] [``--jukebox``\|\ ``--nojukebox``] `user` | ``supysonic-cli user setroles`` [``--admin``\|\ ``--noadmin``] [``--jukebox``\|\ ``--nojukebox``] `user`
| ``supysonic-cli user rename`` `user` `newname` | ``supysonic-cli user rename`` `user` `newname`
@ -36,7 +37,7 @@ a new user, delete an existing user, and change their password or roles.
``supysonic-cli user delete`` `user` ``supysonic-cli user delete`` `user`
Delete the user `user`. Delete the user `user`.
``supysonic-cli user changepass`` `user` [`password`] ``supysonic-cli user changepass`` `user` [``--password`` `password`]
Change the password of user `user`. Will prompt for the new password if not Change the password of user `user`. Will prompt for the new password if not
provided. provided.
@ -49,6 +50,10 @@ a new user, delete an existing user, and change their password or roles.
Options Options
======= =======
-h, --help
Shows help and exits. Depending on where this option appears it will either list the
available commands or display help for a specific command.
-p password, --password password -p password, --password password
Specify the user's password upon creation. Specify the user's password upon creation.

View File

@ -13,8 +13,8 @@ Supysonic management command line interface
Synopsis Synopsis
======== ========
| ``supysonic-cli --help``
| ``supysonic-cli`` [`subcommand`] | ``supysonic-cli`` [`subcommand`]
| ``supysonic-cli help`` [`subcommand`]
Description Description
=========== ===========
@ -35,18 +35,20 @@ The "Subsonic API" is a set of adhoc standards to browse, stream or download a
music collection over HTTP. music collection over HTTP.
The command-line interface is an interface allowing administration operations The command-line interface is an interface allowing administration operations
without the use of the web interface. If ran without arguments, without the use of the web interface.
``supysonic-cli`` will open an interactive prompt, with arguments it will run
a single command and exit. Options
=======
-h, --help
Shows the help and exits. At top level it only lists the subcommands. To
display the help of a specific subcommand, add the ``--help`` flag *after*
the said subcommand name.
Subcommands Subcommands
=========== ===========
``supysonic-cli`` has three different subcommands: ``supysonic-cli`` has two different subcommands:
``help`` [`subcommand`]
When used without argument, displays the list of available subcommands. With
an argument, shows the help and arguments for the given subcommand.
``user`` `args` ... ``user`` `args` ...
User management commands User management commands

View File

@ -5,15 +5,12 @@
# #
# Distributed under terms of the GNU AGPLv3 license. # Distributed under terms of the GNU AGPLv3 license.
import argparse import click
import cmd
import getpass
import shlex
import sys
import time import time
from click.exceptions import ClickException
from pony.orm import db_session, select from pony.orm import db_session, select
from pony.orm import ObjectNotFound from pony.orm.core import ObjectNotFound
from .config import IniConfig from .config import IniConfig
from .daemon.client import DaemonClient from .daemon.client import DaemonClient
@ -25,8 +22,8 @@ from .scanner import Scanner
class TimedProgressDisplay: class TimedProgressDisplay:
def __init__(self, stdout, interval=5): def __init__(self, interval=5):
self.__stdout = stdout self.__stdout = click.get_text_stream("stdout")
self.__interval = interval self.__interval = interval
self.__last_display = 0 self.__last_display = 0
self.__last_len = 0 self.__last_len = 0
@ -42,415 +39,333 @@ class TimedProgressDisplay:
self.__last_display = time.time() self.__last_display = time.time()
class CLIParser(argparse.ArgumentParser): @click.group()
def error(self, message): def cli():
self.print_usage(sys.stderr) """Supysonic management command line interface"""
raise RuntimeError(message) pass
class SupysonicCLI(cmd.Cmd): @cli.group()
prompt = "supysonic> " def folder():
"""Folder management commands"""
pass
def _make_do(self, command):
def method(obj, line):
try:
args = getattr(obj, command + "_parser").parse_args(shlex.split(line))
except RuntimeError as e:
self.write_error_line(str(e))
return
if hasattr(obj.__class__, command + "_subparsers"): @folder.command("list")
try: @db_session
func = getattr(obj, "{}_{}".format(command, args.action)) def folder_list():
except AttributeError: """Lists folders."""
return obj.default(line)
return func(
**{key: vars(args)[key] for key in vars(args) if key != "action"}
)
else:
try:
func = getattr(obj, command)
except AttributeError:
return obj.default(line)
return func(**vars(args))
return method click.echo("Name\t\tPath\n----\t\t----")
for f in Folder.select(lambda f: f.root):
click.echo("{: <16}{}".format(f.name, f.path))
def __init__(self, config, stderr=None, *args, **kwargs):
cmd.Cmd.__init__(self, *args, **kwargs)
if stderr is not None: @folder.command("add")
self.stderr = stderr @click.argument("name")
else: @click.argument(
self.stderr = sys.stderr "path",
type=click.Path(exists=True, file_okay=False, dir_okay=True, resolve_path=True),
)
@db_session
def folder_add(name, path):
"""Adds a folder.
self.__config = config NAME can be anything but must be unique.
self.__daemon = DaemonClient(config.DAEMON["socket"]) PATH must point to an existing readable directory on the filesystem.
# Generate do_* and help_* methods If the daemon is running it will start to listen for changes in this folder but will
for parser_name in filter( not scan files already present in the folder.
lambda attr: attr.endswith("_parser") and "_" not in attr[:-7], """
dir(self.__class__),
):
command = parser_name[:-7]
if not hasattr(self.__class__, "do_" + command): try:
setattr(self.__class__, "do_" + command, self._make_do(command)) FolderManager.add(name, path)
click.echo("Folder '{}' added".format(name))
except ValueError as e:
raise ClickException(str(e)) from e
if hasattr(self.__class__, "do_" + command) and not hasattr(
self.__class__, "help_" + command
):
setattr(
self.__class__,
"help_" + command,
getattr(self.__class__, parser_name).print_help,
)
if hasattr(self.__class__, command + "_subparsers"):
for action, subparser in getattr(
self.__class__, command + "_subparsers"
).choices.items():
setattr(
self, "help_{} {}".format(command, action), subparser.print_help
)
def write_line(self, line=""): @folder.command("delete")
self.stdout.write(line + "\n") @click.argument("name")
@db_session
def folder_delete(name):
"""Deletes a folder.
def write_error_line(self, line=""): NAME is the name of the folder to delete.
self.stderr.write(line + "\n") """
def do_EOF(self, line): try:
return True FolderManager.delete_by_name(name)
click.echo("Deleted folder '{}'".format(name))
except ObjectNotFound as e:
raise ClickException("Folder '{}' does not exist.".format(name)) from e
do_exit = do_EOF
def default(self, line): @folder.command("scan")
self.write_line("Unknown command %s" % line.split()[0]) @click.argument(
self.do_help(None) "folder",
nargs=-1,
)
@click.option(
"-f",
"--force",
is_flag=True,
default=False,
help="Force scan of already known files even if they haven't changed",
)
@click.option(
"--background",
"mode",
flag_value="background",
help="Scan the folder(s) in the background. Requires the daemon to be running.",
)
@click.option(
"--foreground",
"mode",
flag_value="foreground",
help="Scan the folder(s) in the foreground, blocking the processus while the scan is running.",
)
@click.pass_obj
def folder_scan(config, folder, force, mode):
"""Run a scan on specified folders.
def postloop(self): FOLDER is the name of the folder to scan. Multiple can be specified. If ommitted,
self.write_line() all folders are scanned.
"""
def completedefault(self, text, line, begidx, endidx): daemon = DaemonClient(config.DAEMON["socket"])
command = line.split()[0]
parsers = getattr(self.__class__, command + "_subparsers", None)
if not parsers:
return []
num_words = len(line[len(command) : begidx].split()) # quick and dirty shorthand calls
if num_words == 0: scan_bg = lambda: daemon.scan(folder, force)
return [a for a in parsers.choices if a.startswith(text)] scan_fg = lambda: _folder_scan_foreground(config, daemon, folder, force)
return []
folder_parser = CLIParser(prog="folder", add_help=False) auto = not mode
folder_subparsers = folder_parser.add_subparsers(dest="action") if auto:
folder_subparsers.add_parser("list", help="Lists folders", add_help=False) try:
folder_add_parser = folder_subparsers.add_parser( scan_bg()
"add", help="Adds a folder", add_help=False except DaemonUnavailableError:
) click.echo(
folder_add_parser.add_argument("name", help="Name of the folder to add") "Couldn't connect to the daemon, scanning in foreground", err=True
folder_add_parser.add_argument(
"path", help="Path to the directory pointed by the folder"
)
folder_del_parser = folder_subparsers.add_parser(
"delete", help="Deletes a folder", add_help=False
)
folder_del_parser.add_argument("name", help="Name of the folder to delete")
folder_scan_parser = folder_subparsers.add_parser(
"scan", help="Run a scan on specified folders", add_help=False
)
folder_scan_parser.add_argument(
"folders",
metavar="folder",
nargs="*",
help="Folder(s) to be scanned. If ommitted, all folders are scanned",
)
folder_scan_parser.add_argument(
"-f",
"--force",
action="store_true",
help="Force scan of already know files even if they haven't changed",
)
folder_scan_target_group = folder_scan_parser.add_mutually_exclusive_group()
folder_scan_target_group.add_argument(
"--background",
action="store_true",
help="Scan the folder(s) in the background. Requires the daemon to be running.",
)
folder_scan_target_group.add_argument(
"--foreground",
action="store_true",
help="Scan the folder(s) in the foreground, blocking the processus while the scan is running.",
)
@db_session
def folder_list(self):
self.write_line("Name\t\tPath\n----\t\t----")
self.write_line(
"\n".join(
"{: <16}{}".format(f.name, f.path)
for f in Folder.select(lambda f: f.root)
) )
) scan_fg()
elif mode == "background":
@db_session
def folder_add(self, name, path):
try: try:
FolderManager.add(name, path) scan_bg()
self.write_line("Folder '{}' added".format(name)) except DaemonUnavailableError as e:
except ValueError as e: raise ClickException(
self.write_error_line(str(e)) "Couldn't connect to the daemon, please use the '--foreground' option",
) from e
elif mode == "foreground":
scan_fg()
@db_session
def folder_delete(self, name): def _folder_scan_foreground(config, daemon, folders, force):
try:
progress = daemon.get_scanning_progress()
if progress is not None:
raise ClickException(
"The daemon is currently scanning, can't start a scan now"
)
except DaemonUnavailableError:
pass
extensions = config.BASE["scanner_extensions"]
if extensions:
extensions = extensions.split(" ")
def unwatch_folder(folder):
try: try:
FolderManager.delete_by_name(name) daemon.remove_watched_folder(folder.path)
self.write_line("Deleted folder '{}'".format(name))
except ObjectNotFound as e:
self.write_error_line(str(e))
def folder_scan(self, folders, force, background, foreground):
auto = not background and not foreground
if auto:
try:
self.__folder_scan_background(folders, force)
except DaemonUnavailableError:
self.write_error_line(
"Couldn't connect to the daemon, scanning in foreground"
)
self.__folder_scan_foreground(folders, force)
elif background:
try:
self.__folder_scan_background(folders, force)
except DaemonUnavailableError:
self.write_error_line(
"Couldn't connect to the daemon, please use the '--foreground' option"
)
elif foreground:
self.__folder_scan_foreground(folders, force)
def __folder_scan_background(self, folders, force):
self.__daemon.scan(folders, force)
def __folder_scan_foreground(self, folders, force):
try:
progress = self.__daemon.get_scanning_progress()
if progress is not None:
self.write_error_line(
"The daemon is currently scanning, can't start a scan now"
)
return
except DaemonUnavailableError: except DaemonUnavailableError:
pass pass
extensions = self.__config.BASE["scanner_extensions"] def watch_folder(folder):
if extensions: try:
extensions = extensions.split(" ") daemon.add_watched_folder(folder.path)
except DaemonUnavailableError:
pass
scanner = Scanner( scanner = Scanner(
force=force, force=force,
extensions=extensions, extensions=extensions,
follow_symlinks=self.__config.BASE["follow_symlinks"], follow_symlinks=config.BASE["follow_symlinks"],
progress=TimedProgressDisplay(self.stdout), progress=TimedProgressDisplay(),
on_folder_start=self.__unwatch_folder, on_folder_start=unwatch_folder,
on_folder_end=self.__watch_folder, on_folder_end=watch_folder,
) )
if folders: if folders:
fstrs = folders fstrs = folders
with db_session: with db_session:
folders = select(f.name for f in Folder if f.root and f.name in fstrs)[ folders = select(f.name for f in Folder if f.root and f.name in fstrs)[:]
: notfound = set(fstrs) - set(folders)
] if notfound:
notfound = set(fstrs) - set(folders) click.echo("No such folder(s): " + " ".join(notfound))
if notfound: for folder in folders:
self.write_line("No such folder(s): " + " ".join(notfound)) scanner.queue_folder(folder)
for folder in folders: else:
with db_session:
for folder in select(f.name for f in Folder if f.root):
scanner.queue_folder(folder) scanner.queue_folder(folder)
else:
with db_session:
for folder in select(f.name for f in Folder if f.root):
scanner.queue_folder(folder)
scanner.run() scanner.run()
stats = scanner.stats() stats = scanner.stats()
self.write_line("\nScanning done") click.echo("\nScanning done")
self.write_line( click.echo(
"Added: {0.artists} artists, {0.albums} albums, {0.tracks} tracks".format( "Added: {0.artists} artists, {0.albums} albums, {0.tracks} tracks".format(
stats.added stats.added
)
) )
self.write_line( )
"Deleted: {0.artists} artists, {0.albums} albums, {0.tracks} tracks".format( click.echo(
stats.deleted "Deleted: {0.artists} artists, {0.albums} albums, {0.tracks} tracks".format(
) stats.deleted
) )
if stats.errors: )
self.write_line("Errors in:") if stats.errors:
for err in stats.errors: click.echo("Errors in:")
self.write_line("- " + err) for err in stats.errors:
click.echo("- " + err)
def __unwatch_folder(self, folder):
try:
self.__daemon.remove_watched_folder(folder.path)
except DaemonUnavailableError:
pass
def __watch_folder(self, folder): @cli.group("user")
try: def user():
self.__daemon.add_watched_folder(folder.path) """User management commands"""
except DaemonUnavailableError: pass
pass
user_parser = CLIParser(prog="user", add_help=False)
user_subparsers = user_parser.add_subparsers(dest="action")
user_subparsers.add_parser("list", help="List users", add_help=False)
user_add_parser = user_subparsers.add_parser(
"add", help="Adds a user", add_help=False
)
user_add_parser.add_argument("name", help="Name/login of the user to add")
user_add_parser.add_argument(
"-p", "--password", help="Specifies the user's password"
)
user_add_parser.add_argument(
"-e", "--email", default="", help="Sets the user's email address"
)
user_del_parser = user_subparsers.add_parser(
"delete", help="Deletes a user", add_help=False
)
user_del_parser.add_argument("name", help="Name/login of the user to delete")
user_roles_parser = user_subparsers.add_parser(
"setroles", help="Enable/disable rights for a user", add_help=False
)
user_roles_parser.add_argument(
"name", help="Name/login of the user to grant/revoke admin rights"
)
user_roles_admin_group = user_roles_parser.add_mutually_exclusive_group()
user_roles_admin_group.add_argument(
"-A", "--admin", action="store_true", help="Grant admin rights"
)
user_roles_admin_group.add_argument(
"-a", "--noadmin", action="store_true", help="Revoke admin rights"
)
user_roles_jukebox_group = user_roles_parser.add_mutually_exclusive_group()
user_roles_jukebox_group.add_argument(
"-J", "--jukebox", action="store_true", help="Grant jukebox rights"
)
user_roles_jukebox_group.add_argument(
"-j", "--nojukebox", action="store_true", help="Revoke jukebox rights"
)
user_pass_parser = user_subparsers.add_parser(
"changepass", help="Changes a user's password", add_help=False
)
user_pass_parser.add_argument(
"name", help="Name/login of the user to which change the password"
)
user_pass_parser.add_argument("password", nargs="?", help="New password")
user_rename_parser = user_subparsers.add_parser(
"rename", help="Rename a user", add_help=False
)
user_rename_parser.add_argument("name", help="Name of the user to rename")
user_rename_parser.add_argument("newname", help="New name for the user")
@db_session @user.command("list")
def user_list(self): @db_session
self.write_line("Name\t\tAdmin\tJukebox\tEmail") def user_list():
self.write_line("----\t\t-----\t-------\t-----") """Lists users."""
self.write_line(
"\n".join( click.echo("Name\t\tAdmin\tJukebox\tEmail")
"{: <16}{}\t{}\t{}".format( click.echo("----\t\t-----\t-------\t-----")
u.name, "*" if u.admin else "", "*" if u.jukebox else "", u.mail for u in User.select():
) click.echo(
for u in User.select() "{: <16}{}\t{}\t{}".format(
u.name, "*" if u.admin else "", "*" if u.jukebox else "", u.mail
) )
) )
def _ask_password(self): # pragma: nocover
password = getpass.getpass()
confirm = getpass.getpass("Confirm password: ")
if password != confirm:
raise ValueError("Passwords don't match")
return password
@db_session @user.command("add")
def user_add(self, name, password, email): @click.argument("name")
try: @click.password_option("-p", "--password", help="Specifies the user's password")
if not password: @click.option("-e", "--email", default="", help="Sets the user's email address")
password = self._ask_password() # pragma: nocover @db_session
UserManager.add(name, password, mail=email) def user_add(name, password, email):
except ValueError as e: """Adds a new user.
self.write_error_line(str(e))
@db_session NAME is the name (or login) of the new user.
def user_delete(self, name): """
try:
UserManager.delete_by_name(name)
self.write_line("Deleted user '{}'".format(name))
except ObjectNotFound as e:
self.write_error_line(str(e))
@db_session try:
def user_setroles(self, name, admin, noadmin, jukebox, nojukebox): UserManager.add(name, password, mail=email)
user = User.get(name=name) except ValueError as e:
if user is None: raise ClickException(str(e)) from e
self.write_error_line("No such user")
else:
if admin:
user.admin = True
self.write_line("Granted '{}' admin rights".format(name))
elif noadmin:
user.admin = False
self.write_line("Revoked '{}' admin rights".format(name))
if jukebox:
user.jukebox = True
self.write_line("Granted '{}' jukebox rights".format(name))
elif nojukebox:
user.jukebox = False
self.write_line("Revoked '{}' jukebox rights".format(name))
@db_session
def user_changepass(self, name, password):
try:
if not password:
password = self._ask_password() # pragma: nocover
UserManager.change_password2(name, password)
self.write_line("Successfully changed '{}' password".format(name))
except ObjectNotFound as e:
self.write_error_line(str(e))
@db_session @user.command("delete")
def user_rename(self, name, newname): @click.argument("name")
if not name or not newname: @db_session
self.write_error_line("Missing user current name or new name") def user_delete(name):
return """Deletes a user.
if name == newname: NAME is the name of the user to delete.
return """
user = User.get(name=name) try:
if user is None: UserManager.delete_by_name(name)
self.write_error_line("No such user") click.echo("Deleted user '{}'".format(name))
return except ObjectNotFound as e:
raise ClickException("User '{}' does not exist.".format(name)) from e
if User.get(name=newname) is not None:
self.write_error_line("This name is already taken")
return
user.name = newname def _echo_role_change(username, name, value):
self.write_line("User '{}' renamed to '{}'".format(name, newname)) click.echo(
"{} '{}' {} rights".format("Granted" if value else "Revoked", username, name)
)
@user.command("setroles")
@click.argument("name")
@click.option(
"-A/-a", "--admin/--noadmin", default=None, help="Grant or revoke admin rights"
)
@click.option(
"-J/-j",
"--jukebox/--nojukebox",
default=None,
help="Grant or revoke jukebox rights",
)
@db_session
def user_roles(name, admin, jukebox):
"""Enable/disable rights for a user.
NAME is the login of the user to which grant or revoke rights.
"""
user = User.get(name=name)
if user is None:
raise ClickException("No such user")
if admin is not None:
user.admin = admin
_echo_role_change(name, "admin", admin)
if jukebox is not None:
user.jukebox = jukebox
_echo_role_change(name, "jukebox", jukebox)
@user.command("changepass")
@click.argument("name")
@click.password_option("-p", "--password", help="New password")
@db_session
def user_changepass(name, password):
"""Changes a user's password.
NAME is the login of the user to which change the password.
"""
try:
UserManager.change_password2(name, password)
click.echo("Successfully changed '{}' password".format(name))
except ObjectNotFound as e:
raise ClickException("User '{}' does not exist.".format(name)) from e
@user.command("rename")
@click.argument("name")
@click.argument("newname")
@db_session
def user_rename(name, newname):
"""Renames a user.
User NAME will then be known as NEWNAME.
"""
if not name or not newname:
raise ClickException("Missing user current name or new name")
if name == newname:
return
user = User.get(name=name)
if user is None:
raise ClickException("No such user")
if User.get(name=newname) is not None:
raise ClickException("This name is already taken")
user.name = newname
click.echo("User '{}' renamed to '{}'".format(name, newname))
def main(): def main():
config = IniConfig.from_common_locations() config = IniConfig.from_common_locations()
init_database(config.BASE["database_uri"]) init_database(config.BASE["database_uri"])
cli.main(obj=config)
cli = SupysonicCLI(config)
if len(sys.argv) > 1:
cli.onecmd(" ".join(shlex.quote(arg) for arg in sys.argv[1:]))
else:
cli.cmdloop()
release_database() release_database()

View File

@ -159,7 +159,7 @@ class DaemonClient:
return c.recv().scanned return c.recv().scanned
def scan(self, folders=[], force=False): def scan(self, folders=[], force=False):
if not isinstance(folders, list): if not isinstance(folders, (list, tuple)):
raise TypeError("Expecting list, got " + str(type(folders))) raise TypeError("Expecting list, got " + str(type(folders)))
with self.__get_connection() as c: with self.__get_connection() as c:
c.send(ScannerStartCommand(folders, force)) c.send(ScannerStartCommand(folders, force))

View File

@ -10,37 +10,39 @@ import tempfile
import shlex import shlex
import unittest import unittest
from io import StringIO from click.testing import CliRunner
from pony.orm import db_session from pony.orm import db_session
from supysonic.db import Folder, User, init_database, release_database from supysonic.db import Folder, User, init_database, release_database
from supysonic.cli import SupysonicCLI from supysonic.cli import cli
from ..testbase import TestConfig from ..testbase import TestConfig
class CLITestCase(unittest.TestCase): class CLITestCase(unittest.TestCase):
""" Really basic tests. Some even don't check anything but are just there for coverage """ """Really basic tests. Some even don't check anything but are just there for coverage"""
def setUp(self): def setUp(self):
conf = TestConfig(False, False) self.__conf = TestConfig(False, False)
self.__db = tempfile.mkstemp() self.__db = tempfile.mkstemp()
conf.BASE["database_uri"] = "sqlite:///" + self.__db[1] self.__conf.BASE["database_uri"] = "sqlite:///" + self.__db[1]
init_database(conf.BASE["database_uri"]) init_database(self.__conf.BASE["database_uri"])
self.__stdout = StringIO() self.__runner = CliRunner()
self.__stderr = StringIO()
self.__cli = SupysonicCLI(conf, stdout=self.__stdout, stderr=self.__stderr)
def tearDown(self): def tearDown(self):
self.__stdout.close()
self.__stderr.close()
release_database() release_database()
os.close(self.__db[0]) os.close(self.__db[0])
os.remove(self.__db[1]) os.remove(self.__db[1])
def __add_folder(self, name, path): def __invoke(self, cmd, expect_fail=False):
self.__cli.onecmd("folder add {} {}".format(name, shlex.quote(path))) rv = self.__runner.invoke(cli, shlex.split(cmd), obj=self.__conf)
func = self.assertNotEqual if expect_fail else self.assertEqual
func(rv.exit_code, 0)
return rv
def __add_folder(self, name, path, expect_fail=False):
self.__invoke("folder add {} {}".format(name, shlex.quote(path)), expect_fail)
def test_folder_add(self): def test_folder_add(self):
with tempfile.TemporaryDirectory() as d: with tempfile.TemporaryDirectory() as d:
@ -54,10 +56,10 @@ class CLITestCase(unittest.TestCase):
def test_folder_add_errors(self): def test_folder_add_errors(self):
with tempfile.TemporaryDirectory() as d: with tempfile.TemporaryDirectory() as d:
self.__add_folder("f1", d) self.__add_folder("f1", d)
self.__add_folder("f2", d) self.__add_folder("f2", d, True)
with tempfile.TemporaryDirectory() as d: with tempfile.TemporaryDirectory() as d:
self.__add_folder("f1", d) self.__add_folder("f1", d, True)
self.__cli.onecmd("folder add f3 /invalid/path") self.__invoke("folder add f3 /invalid/path", True)
with db_session: with db_session:
self.assertEqual(Folder.select().count(), 1) self.assertEqual(Folder.select().count(), 1)
@ -65,8 +67,8 @@ class CLITestCase(unittest.TestCase):
def test_folder_delete(self): def test_folder_delete(self):
with tempfile.TemporaryDirectory() as d: with tempfile.TemporaryDirectory() as d:
self.__add_folder("tmpfolder", d) self.__add_folder("tmpfolder", d)
self.__cli.onecmd("folder delete randomfolder") self.__invoke("folder delete randomfolder", True)
self.__cli.onecmd("folder delete tmpfolder") self.__invoke("folder delete tmpfolder")
with db_session: with db_session:
self.assertEqual(Folder.select().count(), 0) self.assertEqual(Folder.select().count(), 0)
@ -74,94 +76,88 @@ class CLITestCase(unittest.TestCase):
def test_folder_list(self): def test_folder_list(self):
with tempfile.TemporaryDirectory() as d: with tempfile.TemporaryDirectory() as d:
self.__add_folder("tmpfolder", d) self.__add_folder("tmpfolder", d)
self.__cli.onecmd("folder list") rv = self.__invoke("folder list")
self.assertIn("tmpfolder", self.__stdout.getvalue()) self.assertIn("tmpfolder", rv.output)
self.assertIn(d, self.__stdout.getvalue()) self.assertIn(d, rv.output)
def test_folder_scan(self): def test_folder_scan(self):
with tempfile.TemporaryDirectory() as d: with tempfile.TemporaryDirectory() as d:
self.__add_folder("tmpfolder", d) self.__add_folder("tmpfolder", d)
with tempfile.NamedTemporaryFile(dir=d): with tempfile.NamedTemporaryFile(dir=d):
self.__cli.onecmd("folder scan") self.__invoke("folder scan")
self.__cli.onecmd("folder scan tmpfolder nonexistent") self.__invoke("folder scan tmpfolder nonexistent")
def test_user_add(self): def test_user_add(self):
self.__cli.onecmd("user add -p Alic3 alice") self.__invoke("user add -p Alic3 alice")
self.__cli.onecmd("user add -p alice alice") self.__invoke("user add -p alice alice", True)
with db_session: with db_session:
self.assertEqual(User.select().count(), 1) self.assertEqual(User.select().count(), 1)
def test_user_delete(self): def test_user_delete(self):
self.__cli.onecmd("user add -p Alic3 alice") self.__invoke("user add -p Alic3 alice")
self.__cli.onecmd("user delete alice") self.__invoke("user delete alice")
self.__cli.onecmd("user delete bob") self.__invoke("user delete bob", True)
with db_session: with db_session:
self.assertEqual(User.select().count(), 0) self.assertEqual(User.select().count(), 0)
def test_user_list(self): def test_user_list(self):
self.__cli.onecmd("user add -p Alic3 alice") self.__invoke("user add -p Alic3 alice")
self.__cli.onecmd("user list") rv = self.__invoke("user list")
self.assertIn("alice", self.__stdout.getvalue()) self.assertIn("alice", rv.output)
def test_user_setadmin(self): def test_user_setadmin(self):
self.__cli.onecmd("user add -p Alic3 alice") self.__invoke("user add -p Alic3 alice")
self.__cli.onecmd("user setroles -A alice") self.__invoke("user setroles -A alice")
self.__cli.onecmd("user setroles -A bob") self.__invoke("user setroles -A bob", True)
with db_session: with db_session:
self.assertTrue(User.get(name="alice").admin) self.assertTrue(User.get(name="alice").admin)
def test_user_unsetadmin(self): def test_user_unsetadmin(self):
self.__cli.onecmd("user add -p Alic3 alice") self.__invoke("user add -p Alic3 alice")
self.__cli.onecmd("user setroles -A alice") self.__invoke("user setroles -A alice")
self.__cli.onecmd("user setroles -a alice") self.__invoke("user setroles -a alice")
with db_session: with db_session:
self.assertFalse(User.get(name="alice").admin) self.assertFalse(User.get(name="alice").admin)
def test_user_setjukebox(self): def test_user_setjukebox(self):
self.__cli.onecmd("user add -p Alic3 alice") self.__invoke("user add -p Alic3 alice")
self.__cli.onecmd("user setroles -J alice") self.__invoke("user setroles -J alice")
with db_session: with db_session:
self.assertTrue(User.get(name="alice").jukebox) self.assertTrue(User.get(name="alice").jukebox)
def test_user_unsetjukebox(self): def test_user_unsetjukebox(self):
self.__cli.onecmd("user add -p Alic3 alice") self.__invoke("user add -p Alic3 alice")
self.__cli.onecmd("user setroles -J alice") self.__invoke("user setroles -J alice")
self.__cli.onecmd("user setroles -j alice") self.__invoke("user setroles -j alice")
with db_session: with db_session:
self.assertFalse(User.get(name="alice").jukebox) self.assertFalse(User.get(name="alice").jukebox)
def test_user_changepass(self): def test_user_changepass(self):
self.__cli.onecmd("user add -p Alic3 alice") self.__invoke("user add -p Alic3 alice")
self.__cli.onecmd("user changepass alice newpass") self.__invoke("user changepass alice -p newpass")
self.__cli.onecmd("user changepass bob B0b") self.__invoke("user changepass bob -p B0b", True)
def test_user_rename(self): def test_user_rename(self):
self.__cli.onecmd("user add -p Alic3 alice") self.__invoke("user add -p Alic3 alice")
self.__cli.onecmd("user rename alice alice") self.__invoke("user rename alice alice")
self.__cli.onecmd("user rename bob charles") self.__invoke("user rename bob charles", True)
self.__cli.onecmd("user rename alice ''") self.__invoke("user rename alice ''", True)
with db_session: with db_session:
self.assertEqual(User.select().first().name, "alice") self.assertEqual(User.select().first().name, "alice")
self.__cli.onecmd("user rename alice bob") self.__invoke("user rename alice bob")
with db_session: with db_session:
self.assertEqual(User.select().first().name, "bob") self.assertEqual(User.select().first().name, "bob")
self.__cli.onecmd("user add -p Ch4rl3s charles") self.__invoke("user add -p Ch4rl3s charles")
self.__cli.onecmd("user rename bob charles") self.__invoke("user rename bob charles", True)
with db_session: with db_session:
self.assertEqual(User.select(lambda u: u.name == "bob").count(), 1) self.assertEqual(User.select(lambda u: u.name == "bob").count(), 1)
self.assertEqual(User.select(lambda u: u.name == "charles").count(), 1) self.assertEqual(User.select(lambda u: u.name == "charles").count(), 1)
def test_other(self):
self.assertTrue(self.__cli.do_EOF(""))
self.__cli.onecmd("unknown command")
self.__cli.postloop()
self.__cli.completedefault("user", "user", 4, 4)
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()