From f4bfc735e8d680508a7af7f39080ba034bd41881 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alban=20F=C3=A9ron?= Date: Thu, 11 Nov 2021 16:47:37 +0100 Subject: [PATCH] Rewriting the CLI using click rather than cmd+argparse --- docs/man/supysonic-cli-folder.rst | 5 + docs/man/supysonic-cli-user.rst | 9 +- docs/man/supysonic-cli.rst | 20 +- supysonic/cli.py | 639 +++++++++++++----------------- supysonic/daemon/client.py | 2 +- tests/base/test_cli.py | 112 +++--- 6 files changed, 355 insertions(+), 432 deletions(-) diff --git a/docs/man/supysonic-cli-folder.rst b/docs/man/supysonic-cli-folder.rst index d41dca0..66f68cf 100644 --- a/docs/man/supysonic-cli-folder.rst +++ b/docs/man/supysonic-cli-folder.rst @@ -13,6 +13,7 @@ Supysonic folder management commands Synopsis ======== +| ``supysonic-cli folder --help`` | ``supysonic-cli folder list`` | ``supysonic-cli folder add`` `name` `path` | ``supysonic-cli folder delete`` `name` @@ -43,6 +44,10 @@ audio files are located. This allows to list, add, delete and scan the folders. Options ======= +-h, --help + Shows help and exits. Depending on where this option appears it will either list the + available commands or display help for a specific command. + -f, --force Force scan of already known files even if they haven't changed. Might be useful if an update to Supysonic adds new metadata to audio files. diff --git a/docs/man/supysonic-cli-user.rst b/docs/man/supysonic-cli-user.rst index 3803d09..e08af6f 100644 --- a/docs/man/supysonic-cli-user.rst +++ b/docs/man/supysonic-cli-user.rst @@ -13,10 +13,11 @@ Supysonic user management commands Synopsis ======== +| ``supysonic-cli user --help`` | ``supysonic-cli user list`` | ``supysonic-cli user add`` `user` [``--password`` `password`] [``--email`` `email`] | ``supysonic-cli user delete`` `user` -| ``supysonic-cli user changepass`` `user` `password` +| ``supysonic-cli user changepass`` `user` [``--password`` `password`] | ``supysonic-cli user setroles`` [``--admin``\|\ ``--noadmin``] [``--jukebox``\|\ ``--nojukebox``] `user` | ``supysonic-cli user rename`` `user` `newname` @@ -36,7 +37,7 @@ a new user, delete an existing user, and change their password or roles. ``supysonic-cli user delete`` `user` Delete the user `user`. -``supysonic-cli user changepass`` `user` [`password`] +``supysonic-cli user changepass`` `user` [``--password`` `password`] Change the password of user `user`. Will prompt for the new password if not provided. @@ -49,6 +50,10 @@ a new user, delete an existing user, and change their password or roles. Options ======= +-h, --help + Shows help and exits. Depending on where this option appears it will either list the + available commands or display help for a specific command. + -p password, --password password Specify the user's password upon creation. diff --git a/docs/man/supysonic-cli.rst b/docs/man/supysonic-cli.rst index 05313b1..76cdbf2 100644 --- a/docs/man/supysonic-cli.rst +++ b/docs/man/supysonic-cli.rst @@ -13,8 +13,8 @@ Supysonic management command line interface Synopsis ======== +| ``supysonic-cli --help`` | ``supysonic-cli`` [`subcommand`] -| ``supysonic-cli help`` [`subcommand`] Description =========== @@ -35,18 +35,20 @@ The "Subsonic API" is a set of adhoc standards to browse, stream or download a music collection over HTTP. The command-line interface is an interface allowing administration operations -without the use of the web interface. If ran without arguments, -``supysonic-cli`` will open an interactive prompt, with arguments it will run -a single command and exit. +without the use of the web interface. + +Options +======= + +-h, --help + Shows the help and exits. At top level it only lists the subcommands. To + display the help of a specific subcommand, add the ``--help`` flag *after* + the said subcommand name. Subcommands =========== -``supysonic-cli`` has three different subcommands: - -``help`` [`subcommand`] - When used without argument, displays the list of available subcommands. With - an argument, shows the help and arguments for the given subcommand. +``supysonic-cli`` has two different subcommands: ``user`` `args` ... User management commands diff --git a/supysonic/cli.py b/supysonic/cli.py index 7c8b292..d22b148 100644 --- a/supysonic/cli.py +++ b/supysonic/cli.py @@ -5,15 +5,12 @@ # # Distributed under terms of the GNU AGPLv3 license. -import argparse -import cmd -import getpass -import shlex -import sys +import click import time +from click.exceptions import ClickException from pony.orm import db_session, select -from pony.orm import ObjectNotFound +from pony.orm.core import ObjectNotFound from .config import IniConfig from .daemon.client import DaemonClient @@ -25,8 +22,8 @@ from .scanner import Scanner class TimedProgressDisplay: - def __init__(self, stdout, interval=5): - self.__stdout = stdout + def __init__(self, interval=5): + self.__stdout = click.get_text_stream("stdout") self.__interval = interval self.__last_display = 0 self.__last_len = 0 @@ -42,415 +39,333 @@ class TimedProgressDisplay: self.__last_display = time.time() -class CLIParser(argparse.ArgumentParser): - def error(self, message): - self.print_usage(sys.stderr) - raise RuntimeError(message) +@click.group() +def cli(): + """Supysonic management command line interface""" + pass -class SupysonicCLI(cmd.Cmd): - prompt = "supysonic> " +@cli.group() +def folder(): + """Folder management commands""" + pass - def _make_do(self, command): - def method(obj, line): - try: - args = getattr(obj, command + "_parser").parse_args(shlex.split(line)) - except RuntimeError as e: - self.write_error_line(str(e)) - return - if hasattr(obj.__class__, command + "_subparsers"): - try: - func = getattr(obj, "{}_{}".format(command, args.action)) - except AttributeError: - return obj.default(line) - return func( - **{key: vars(args)[key] for key in vars(args) if key != "action"} - ) - else: - try: - func = getattr(obj, command) - except AttributeError: - return obj.default(line) - return func(**vars(args)) +@folder.command("list") +@db_session +def folder_list(): + """Lists folders.""" - return method + click.echo("Name\t\tPath\n----\t\t----") + for f in Folder.select(lambda f: f.root): + click.echo("{: <16}{}".format(f.name, f.path)) - def __init__(self, config, stderr=None, *args, **kwargs): - cmd.Cmd.__init__(self, *args, **kwargs) - if stderr is not None: - self.stderr = stderr - else: - self.stderr = sys.stderr +@folder.command("add") +@click.argument("name") +@click.argument( + "path", + type=click.Path(exists=True, file_okay=False, dir_okay=True, resolve_path=True), +) +@db_session +def folder_add(name, path): + """Adds a folder. - self.__config = config - self.__daemon = DaemonClient(config.DAEMON["socket"]) + NAME can be anything but must be unique. + PATH must point to an existing readable directory on the filesystem. - # Generate do_* and help_* methods - for parser_name in filter( - lambda attr: attr.endswith("_parser") and "_" not in attr[:-7], - dir(self.__class__), - ): - command = parser_name[:-7] + If the daemon is running it will start to listen for changes in this folder but will + not scan files already present in the folder. + """ - if not hasattr(self.__class__, "do_" + command): - setattr(self.__class__, "do_" + command, self._make_do(command)) + try: + FolderManager.add(name, path) + click.echo("Folder '{}' added".format(name)) + except ValueError as e: + raise ClickException(str(e)) from e - if hasattr(self.__class__, "do_" + command) and not hasattr( - self.__class__, "help_" + command - ): - setattr( - self.__class__, - "help_" + command, - getattr(self.__class__, parser_name).print_help, - ) - if hasattr(self.__class__, command + "_subparsers"): - for action, subparser in getattr( - self.__class__, command + "_subparsers" - ).choices.items(): - setattr( - self, "help_{} {}".format(command, action), subparser.print_help - ) - def write_line(self, line=""): - self.stdout.write(line + "\n") +@folder.command("delete") +@click.argument("name") +@db_session +def folder_delete(name): + """Deletes a folder. - def write_error_line(self, line=""): - self.stderr.write(line + "\n") + NAME is the name of the folder to delete. + """ - def do_EOF(self, line): - return True + try: + FolderManager.delete_by_name(name) + click.echo("Deleted folder '{}'".format(name)) + except ObjectNotFound as e: + raise ClickException("Folder '{}' does not exist.".format(name)) from e - do_exit = do_EOF - def default(self, line): - self.write_line("Unknown command %s" % line.split()[0]) - self.do_help(None) +@folder.command("scan") +@click.argument( + "folder", + nargs=-1, +) +@click.option( + "-f", + "--force", + is_flag=True, + default=False, + help="Force scan of already known files even if they haven't changed", +) +@click.option( + "--background", + "mode", + flag_value="background", + help="Scan the folder(s) in the background. Requires the daemon to be running.", +) +@click.option( + "--foreground", + "mode", + flag_value="foreground", + help="Scan the folder(s) in the foreground, blocking the processus while the scan is running.", +) +@click.pass_obj +def folder_scan(config, folder, force, mode): + """Run a scan on specified folders. - def postloop(self): - self.write_line() + FOLDER is the name of the folder to scan. Multiple can be specified. If ommitted, + all folders are scanned. + """ - def completedefault(self, text, line, begidx, endidx): - command = line.split()[0] - parsers = getattr(self.__class__, command + "_subparsers", None) - if not parsers: - return [] + daemon = DaemonClient(config.DAEMON["socket"]) - num_words = len(line[len(command) : begidx].split()) - if num_words == 0: - return [a for a in parsers.choices if a.startswith(text)] - return [] + # quick and dirty shorthand calls + scan_bg = lambda: daemon.scan(folder, force) + scan_fg = lambda: _folder_scan_foreground(config, daemon, folder, force) - folder_parser = CLIParser(prog="folder", add_help=False) - folder_subparsers = folder_parser.add_subparsers(dest="action") - folder_subparsers.add_parser("list", help="Lists folders", add_help=False) - folder_add_parser = folder_subparsers.add_parser( - "add", help="Adds a folder", add_help=False - ) - folder_add_parser.add_argument("name", help="Name of the folder to add") - folder_add_parser.add_argument( - "path", help="Path to the directory pointed by the folder" - ) - folder_del_parser = folder_subparsers.add_parser( - "delete", help="Deletes a folder", add_help=False - ) - folder_del_parser.add_argument("name", help="Name of the folder to delete") - folder_scan_parser = folder_subparsers.add_parser( - "scan", help="Run a scan on specified folders", add_help=False - ) - folder_scan_parser.add_argument( - "folders", - metavar="folder", - nargs="*", - help="Folder(s) to be scanned. If ommitted, all folders are scanned", - ) - folder_scan_parser.add_argument( - "-f", - "--force", - action="store_true", - help="Force scan of already know files even if they haven't changed", - ) - folder_scan_target_group = folder_scan_parser.add_mutually_exclusive_group() - folder_scan_target_group.add_argument( - "--background", - action="store_true", - help="Scan the folder(s) in the background. Requires the daemon to be running.", - ) - folder_scan_target_group.add_argument( - "--foreground", - action="store_true", - help="Scan the folder(s) in the foreground, blocking the processus while the scan is running.", - ) - - @db_session - def folder_list(self): - self.write_line("Name\t\tPath\n----\t\t----") - self.write_line( - "\n".join( - "{: <16}{}".format(f.name, f.path) - for f in Folder.select(lambda f: f.root) + auto = not mode + if auto: + try: + scan_bg() + except DaemonUnavailableError: + click.echo( + "Couldn't connect to the daemon, scanning in foreground", err=True ) - ) - - @db_session - def folder_add(self, name, path): + scan_fg() + elif mode == "background": try: - FolderManager.add(name, path) - self.write_line("Folder '{}' added".format(name)) - except ValueError as e: - self.write_error_line(str(e)) + scan_bg() + except DaemonUnavailableError as e: + raise ClickException( + "Couldn't connect to the daemon, please use the '--foreground' option", + ) from e + elif mode == "foreground": + scan_fg() - @db_session - def folder_delete(self, name): + +def _folder_scan_foreground(config, daemon, folders, force): + try: + progress = daemon.get_scanning_progress() + if progress is not None: + raise ClickException( + "The daemon is currently scanning, can't start a scan now" + ) + except DaemonUnavailableError: + pass + + extensions = config.BASE["scanner_extensions"] + if extensions: + extensions = extensions.split(" ") + + def unwatch_folder(folder): try: - FolderManager.delete_by_name(name) - self.write_line("Deleted folder '{}'".format(name)) - except ObjectNotFound as e: - self.write_error_line(str(e)) - - def folder_scan(self, folders, force, background, foreground): - auto = not background and not foreground - if auto: - try: - self.__folder_scan_background(folders, force) - except DaemonUnavailableError: - self.write_error_line( - "Couldn't connect to the daemon, scanning in foreground" - ) - self.__folder_scan_foreground(folders, force) - elif background: - try: - self.__folder_scan_background(folders, force) - except DaemonUnavailableError: - self.write_error_line( - "Couldn't connect to the daemon, please use the '--foreground' option" - ) - elif foreground: - self.__folder_scan_foreground(folders, force) - - def __folder_scan_background(self, folders, force): - self.__daemon.scan(folders, force) - - def __folder_scan_foreground(self, folders, force): - try: - progress = self.__daemon.get_scanning_progress() - if progress is not None: - self.write_error_line( - "The daemon is currently scanning, can't start a scan now" - ) - return + daemon.remove_watched_folder(folder.path) except DaemonUnavailableError: pass - extensions = self.__config.BASE["scanner_extensions"] - if extensions: - extensions = extensions.split(" ") + def watch_folder(folder): + try: + daemon.add_watched_folder(folder.path) + except DaemonUnavailableError: + pass - scanner = Scanner( - force=force, - extensions=extensions, - follow_symlinks=self.__config.BASE["follow_symlinks"], - progress=TimedProgressDisplay(self.stdout), - on_folder_start=self.__unwatch_folder, - on_folder_end=self.__watch_folder, - ) + scanner = Scanner( + force=force, + extensions=extensions, + follow_symlinks=config.BASE["follow_symlinks"], + progress=TimedProgressDisplay(), + on_folder_start=unwatch_folder, + on_folder_end=watch_folder, + ) - if folders: - fstrs = folders - with db_session: - folders = select(f.name for f in Folder if f.root and f.name in fstrs)[ - : - ] - notfound = set(fstrs) - set(folders) - if notfound: - self.write_line("No such folder(s): " + " ".join(notfound)) - for folder in folders: + if folders: + fstrs = folders + with db_session: + folders = select(f.name for f in Folder if f.root and f.name in fstrs)[:] + notfound = set(fstrs) - set(folders) + if notfound: + click.echo("No such folder(s): " + " ".join(notfound)) + for folder in folders: + scanner.queue_folder(folder) + else: + with db_session: + for folder in select(f.name for f in Folder if f.root): scanner.queue_folder(folder) - else: - with db_session: - for folder in select(f.name for f in Folder if f.root): - scanner.queue_folder(folder) - scanner.run() - stats = scanner.stats() + scanner.run() + stats = scanner.stats() - self.write_line("\nScanning done") - self.write_line( - "Added: {0.artists} artists, {0.albums} albums, {0.tracks} tracks".format( - stats.added - ) + click.echo("\nScanning done") + click.echo( + "Added: {0.artists} artists, {0.albums} albums, {0.tracks} tracks".format( + stats.added ) - self.write_line( - "Deleted: {0.artists} artists, {0.albums} albums, {0.tracks} tracks".format( - stats.deleted - ) + ) + click.echo( + "Deleted: {0.artists} artists, {0.albums} albums, {0.tracks} tracks".format( + stats.deleted ) - if stats.errors: - self.write_line("Errors in:") - for err in stats.errors: - self.write_line("- " + err) + ) + if stats.errors: + click.echo("Errors in:") + for err in stats.errors: + click.echo("- " + err) - def __unwatch_folder(self, folder): - try: - self.__daemon.remove_watched_folder(folder.path) - except DaemonUnavailableError: - pass - def __watch_folder(self, folder): - try: - self.__daemon.add_watched_folder(folder.path) - except DaemonUnavailableError: - pass +@cli.group("user") +def user(): + """User management commands""" + pass - user_parser = CLIParser(prog="user", add_help=False) - user_subparsers = user_parser.add_subparsers(dest="action") - user_subparsers.add_parser("list", help="List users", add_help=False) - user_add_parser = user_subparsers.add_parser( - "add", help="Adds a user", add_help=False - ) - user_add_parser.add_argument("name", help="Name/login of the user to add") - user_add_parser.add_argument( - "-p", "--password", help="Specifies the user's password" - ) - user_add_parser.add_argument( - "-e", "--email", default="", help="Sets the user's email address" - ) - user_del_parser = user_subparsers.add_parser( - "delete", help="Deletes a user", add_help=False - ) - user_del_parser.add_argument("name", help="Name/login of the user to delete") - user_roles_parser = user_subparsers.add_parser( - "setroles", help="Enable/disable rights for a user", add_help=False - ) - user_roles_parser.add_argument( - "name", help="Name/login of the user to grant/revoke admin rights" - ) - user_roles_admin_group = user_roles_parser.add_mutually_exclusive_group() - user_roles_admin_group.add_argument( - "-A", "--admin", action="store_true", help="Grant admin rights" - ) - user_roles_admin_group.add_argument( - "-a", "--noadmin", action="store_true", help="Revoke admin rights" - ) - user_roles_jukebox_group = user_roles_parser.add_mutually_exclusive_group() - user_roles_jukebox_group.add_argument( - "-J", "--jukebox", action="store_true", help="Grant jukebox rights" - ) - user_roles_jukebox_group.add_argument( - "-j", "--nojukebox", action="store_true", help="Revoke jukebox rights" - ) - user_pass_parser = user_subparsers.add_parser( - "changepass", help="Changes a user's password", add_help=False - ) - user_pass_parser.add_argument( - "name", help="Name/login of the user to which change the password" - ) - user_pass_parser.add_argument("password", nargs="?", help="New password") - user_rename_parser = user_subparsers.add_parser( - "rename", help="Rename a user", add_help=False - ) - user_rename_parser.add_argument("name", help="Name of the user to rename") - user_rename_parser.add_argument("newname", help="New name for the user") - @db_session - def user_list(self): - self.write_line("Name\t\tAdmin\tJukebox\tEmail") - self.write_line("----\t\t-----\t-------\t-----") - self.write_line( - "\n".join( - "{: <16}{}\t{}\t{}".format( - u.name, "*" if u.admin else "", "*" if u.jukebox else "", u.mail - ) - for u in User.select() +@user.command("list") +@db_session +def user_list(): + """Lists users.""" + + click.echo("Name\t\tAdmin\tJukebox\tEmail") + click.echo("----\t\t-----\t-------\t-----") + for u in User.select(): + click.echo( + "{: <16}{}\t{}\t{}".format( + u.name, "*" if u.admin else "", "*" if u.jukebox else "", u.mail ) ) - def _ask_password(self): # pragma: nocover - password = getpass.getpass() - confirm = getpass.getpass("Confirm password: ") - if password != confirm: - raise ValueError("Passwords don't match") - return password - @db_session - def user_add(self, name, password, email): - try: - if not password: - password = self._ask_password() # pragma: nocover - UserManager.add(name, password, mail=email) - except ValueError as e: - self.write_error_line(str(e)) +@user.command("add") +@click.argument("name") +@click.password_option("-p", "--password", help="Specifies the user's password") +@click.option("-e", "--email", default="", help="Sets the user's email address") +@db_session +def user_add(name, password, email): + """Adds a new user. - @db_session - def user_delete(self, name): - try: - UserManager.delete_by_name(name) - self.write_line("Deleted user '{}'".format(name)) - except ObjectNotFound as e: - self.write_error_line(str(e)) + NAME is the name (or login) of the new user. + """ - @db_session - def user_setroles(self, name, admin, noadmin, jukebox, nojukebox): - user = User.get(name=name) - if user is None: - self.write_error_line("No such user") - else: - if admin: - user.admin = True - self.write_line("Granted '{}' admin rights".format(name)) - elif noadmin: - user.admin = False - self.write_line("Revoked '{}' admin rights".format(name)) - if jukebox: - user.jukebox = True - self.write_line("Granted '{}' jukebox rights".format(name)) - elif nojukebox: - user.jukebox = False - self.write_line("Revoked '{}' jukebox rights".format(name)) + try: + UserManager.add(name, password, mail=email) + except ValueError as e: + raise ClickException(str(e)) from e - @db_session - def user_changepass(self, name, password): - try: - if not password: - password = self._ask_password() # pragma: nocover - UserManager.change_password2(name, password) - self.write_line("Successfully changed '{}' password".format(name)) - except ObjectNotFound as e: - self.write_error_line(str(e)) - @db_session - def user_rename(self, name, newname): - if not name or not newname: - self.write_error_line("Missing user current name or new name") - return +@user.command("delete") +@click.argument("name") +@db_session +def user_delete(name): + """Deletes a user. - if name == newname: - return + NAME is the name of the user to delete. + """ - user = User.get(name=name) - if user is None: - self.write_error_line("No such user") - return + try: + UserManager.delete_by_name(name) + click.echo("Deleted user '{}'".format(name)) + except ObjectNotFound as e: + raise ClickException("User '{}' does not exist.".format(name)) from e - if User.get(name=newname) is not None: - self.write_error_line("This name is already taken") - return - user.name = newname - self.write_line("User '{}' renamed to '{}'".format(name, newname)) +def _echo_role_change(username, name, value): + click.echo( + "{} '{}' {} rights".format("Granted" if value else "Revoked", username, name) + ) + + +@user.command("setroles") +@click.argument("name") +@click.option( + "-A/-a", "--admin/--noadmin", default=None, help="Grant or revoke admin rights" +) +@click.option( + "-J/-j", + "--jukebox/--nojukebox", + default=None, + help="Grant or revoke jukebox rights", +) +@db_session +def user_roles(name, admin, jukebox): + """Enable/disable rights for a user. + + NAME is the login of the user to which grant or revoke rights. + """ + + user = User.get(name=name) + if user is None: + raise ClickException("No such user") + + if admin is not None: + user.admin = admin + _echo_role_change(name, "admin", admin) + if jukebox is not None: + user.jukebox = jukebox + _echo_role_change(name, "jukebox", jukebox) + + +@user.command("changepass") +@click.argument("name") +@click.password_option("-p", "--password", help="New password") +@db_session +def user_changepass(name, password): + """Changes a user's password. + + NAME is the login of the user to which change the password. + """ + + try: + UserManager.change_password2(name, password) + click.echo("Successfully changed '{}' password".format(name)) + except ObjectNotFound as e: + raise ClickException("User '{}' does not exist.".format(name)) from e + + +@user.command("rename") +@click.argument("name") +@click.argument("newname") +@db_session +def user_rename(name, newname): + """Renames a user. + + User NAME will then be known as NEWNAME. + """ + + if not name or not newname: + raise ClickException("Missing user current name or new name") + + if name == newname: + return + + user = User.get(name=name) + if user is None: + raise ClickException("No such user") + + if User.get(name=newname) is not None: + raise ClickException("This name is already taken") + + user.name = newname + click.echo("User '{}' renamed to '{}'".format(name, newname)) def main(): config = IniConfig.from_common_locations() init_database(config.BASE["database_uri"]) - - cli = SupysonicCLI(config) - if len(sys.argv) > 1: - cli.onecmd(" ".join(shlex.quote(arg) for arg in sys.argv[1:])) - else: - cli.cmdloop() - + cli.main(obj=config) release_database() diff --git a/supysonic/daemon/client.py b/supysonic/daemon/client.py index 46540b8..666a187 100644 --- a/supysonic/daemon/client.py +++ b/supysonic/daemon/client.py @@ -159,7 +159,7 @@ class DaemonClient: return c.recv().scanned def scan(self, folders=[], force=False): - if not isinstance(folders, list): + if not isinstance(folders, (list, tuple)): raise TypeError("Expecting list, got " + str(type(folders))) with self.__get_connection() as c: c.send(ScannerStartCommand(folders, force)) diff --git a/tests/base/test_cli.py b/tests/base/test_cli.py index f72ca3f..51e055f 100644 --- a/tests/base/test_cli.py +++ b/tests/base/test_cli.py @@ -10,37 +10,39 @@ import tempfile import shlex import unittest -from io import StringIO +from click.testing import CliRunner from pony.orm import db_session from supysonic.db import Folder, User, init_database, release_database -from supysonic.cli import SupysonicCLI +from supysonic.cli import cli from ..testbase import TestConfig class CLITestCase(unittest.TestCase): - """ Really basic tests. Some even don't check anything but are just there for coverage """ + """Really basic tests. Some even don't check anything but are just there for coverage""" def setUp(self): - conf = TestConfig(False, False) + self.__conf = TestConfig(False, False) self.__db = tempfile.mkstemp() - conf.BASE["database_uri"] = "sqlite:///" + self.__db[1] - init_database(conf.BASE["database_uri"]) + self.__conf.BASE["database_uri"] = "sqlite:///" + self.__db[1] + init_database(self.__conf.BASE["database_uri"]) - self.__stdout = StringIO() - self.__stderr = StringIO() - self.__cli = SupysonicCLI(conf, stdout=self.__stdout, stderr=self.__stderr) + self.__runner = CliRunner() def tearDown(self): - self.__stdout.close() - self.__stderr.close() release_database() os.close(self.__db[0]) os.remove(self.__db[1]) - def __add_folder(self, name, path): - self.__cli.onecmd("folder add {} {}".format(name, shlex.quote(path))) + def __invoke(self, cmd, expect_fail=False): + rv = self.__runner.invoke(cli, shlex.split(cmd), obj=self.__conf) + func = self.assertNotEqual if expect_fail else self.assertEqual + func(rv.exit_code, 0) + return rv + + def __add_folder(self, name, path, expect_fail=False): + self.__invoke("folder add {} {}".format(name, shlex.quote(path)), expect_fail) def test_folder_add(self): with tempfile.TemporaryDirectory() as d: @@ -54,10 +56,10 @@ class CLITestCase(unittest.TestCase): def test_folder_add_errors(self): with tempfile.TemporaryDirectory() as d: self.__add_folder("f1", d) - self.__add_folder("f2", d) + self.__add_folder("f2", d, True) with tempfile.TemporaryDirectory() as d: - self.__add_folder("f1", d) - self.__cli.onecmd("folder add f3 /invalid/path") + self.__add_folder("f1", d, True) + self.__invoke("folder add f3 /invalid/path", True) with db_session: self.assertEqual(Folder.select().count(), 1) @@ -65,8 +67,8 @@ class CLITestCase(unittest.TestCase): def test_folder_delete(self): with tempfile.TemporaryDirectory() as d: self.__add_folder("tmpfolder", d) - self.__cli.onecmd("folder delete randomfolder") - self.__cli.onecmd("folder delete tmpfolder") + self.__invoke("folder delete randomfolder", True) + self.__invoke("folder delete tmpfolder") with db_session: self.assertEqual(Folder.select().count(), 0) @@ -74,94 +76,88 @@ class CLITestCase(unittest.TestCase): def test_folder_list(self): with tempfile.TemporaryDirectory() as d: self.__add_folder("tmpfolder", d) - self.__cli.onecmd("folder list") - self.assertIn("tmpfolder", self.__stdout.getvalue()) - self.assertIn(d, self.__stdout.getvalue()) + rv = self.__invoke("folder list") + self.assertIn("tmpfolder", rv.output) + self.assertIn(d, rv.output) def test_folder_scan(self): with tempfile.TemporaryDirectory() as d: self.__add_folder("tmpfolder", d) with tempfile.NamedTemporaryFile(dir=d): - self.__cli.onecmd("folder scan") - self.__cli.onecmd("folder scan tmpfolder nonexistent") + self.__invoke("folder scan") + self.__invoke("folder scan tmpfolder nonexistent") def test_user_add(self): - self.__cli.onecmd("user add -p Alic3 alice") - self.__cli.onecmd("user add -p alice alice") + self.__invoke("user add -p Alic3 alice") + self.__invoke("user add -p alice alice", True) with db_session: self.assertEqual(User.select().count(), 1) def test_user_delete(self): - self.__cli.onecmd("user add -p Alic3 alice") - self.__cli.onecmd("user delete alice") - self.__cli.onecmd("user delete bob") + self.__invoke("user add -p Alic3 alice") + self.__invoke("user delete alice") + self.__invoke("user delete bob", True) with db_session: self.assertEqual(User.select().count(), 0) def test_user_list(self): - self.__cli.onecmd("user add -p Alic3 alice") - self.__cli.onecmd("user list") - self.assertIn("alice", self.__stdout.getvalue()) + self.__invoke("user add -p Alic3 alice") + rv = self.__invoke("user list") + self.assertIn("alice", rv.output) def test_user_setadmin(self): - self.__cli.onecmd("user add -p Alic3 alice") - self.__cli.onecmd("user setroles -A alice") - self.__cli.onecmd("user setroles -A bob") + self.__invoke("user add -p Alic3 alice") + self.__invoke("user setroles -A alice") + self.__invoke("user setroles -A bob", True) with db_session: self.assertTrue(User.get(name="alice").admin) def test_user_unsetadmin(self): - self.__cli.onecmd("user add -p Alic3 alice") - self.__cli.onecmd("user setroles -A alice") - self.__cli.onecmd("user setroles -a alice") + self.__invoke("user add -p Alic3 alice") + self.__invoke("user setroles -A alice") + self.__invoke("user setroles -a alice") with db_session: self.assertFalse(User.get(name="alice").admin) def test_user_setjukebox(self): - self.__cli.onecmd("user add -p Alic3 alice") - self.__cli.onecmd("user setroles -J alice") + self.__invoke("user add -p Alic3 alice") + self.__invoke("user setroles -J alice") with db_session: self.assertTrue(User.get(name="alice").jukebox) def test_user_unsetjukebox(self): - self.__cli.onecmd("user add -p Alic3 alice") - self.__cli.onecmd("user setroles -J alice") - self.__cli.onecmd("user setroles -j alice") + self.__invoke("user add -p Alic3 alice") + self.__invoke("user setroles -J alice") + self.__invoke("user setroles -j alice") with db_session: self.assertFalse(User.get(name="alice").jukebox) def test_user_changepass(self): - self.__cli.onecmd("user add -p Alic3 alice") - self.__cli.onecmd("user changepass alice newpass") - self.__cli.onecmd("user changepass bob B0b") + self.__invoke("user add -p Alic3 alice") + self.__invoke("user changepass alice -p newpass") + self.__invoke("user changepass bob -p B0b", True) def test_user_rename(self): - self.__cli.onecmd("user add -p Alic3 alice") - self.__cli.onecmd("user rename alice alice") - self.__cli.onecmd("user rename bob charles") + self.__invoke("user add -p Alic3 alice") + self.__invoke("user rename alice alice") + self.__invoke("user rename bob charles", True) - self.__cli.onecmd("user rename alice ''") + self.__invoke("user rename alice ''", True) with db_session: self.assertEqual(User.select().first().name, "alice") - self.__cli.onecmd("user rename alice bob") + self.__invoke("user rename alice bob") with db_session: self.assertEqual(User.select().first().name, "bob") - self.__cli.onecmd("user add -p Ch4rl3s charles") - self.__cli.onecmd("user rename bob charles") + self.__invoke("user add -p Ch4rl3s charles") + self.__invoke("user rename bob charles", True) with db_session: self.assertEqual(User.select(lambda u: u.name == "bob").count(), 1) self.assertEqual(User.select(lambda u: u.name == "charles").count(), 1) - def test_other(self): - self.assertTrue(self.__cli.do_EOF("")) - self.__cli.onecmd("unknown command") - self.__cli.postloop() - self.__cli.completedefault("user", "user", 4, 4) - if __name__ == "__main__": unittest.main()