From d3271e7174f246b5f2fb03085aa0b9b1caab3442 Mon Sep 17 00:00:00 2001 From: spl0k Date: Fri, 8 Dec 2017 23:36:25 +0100 Subject: [PATCH] CLI: added ability to set stdin/stdout/stderr --- supysonic/cli.py | 119 +++++++++++++++++++++++++++-------------------- 1 file changed, 68 insertions(+), 51 deletions(-) diff --git a/supysonic/cli.py b/supysonic/cli.py index 0165132..75c2db6 100755 --- a/supysonic/cli.py +++ b/supysonic/cli.py @@ -19,13 +19,38 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . -import sys, cmd, argparse, getpass, time +import argparse +import cmd +import getpass +import sys +import time from supysonic.db import get_store, Folder, User from supysonic.managers.folder import FolderManager from supysonic.managers.user import UserManager from supysonic.scanner import Scanner +class TimedProgressDisplay: + def __init__(self, name, stdout, interval = 5): + self.__name = name + self.__stdout = stdout + self.__interval = interval + self.__last_display = 0 + self.__last_len = 0 + + def __call__(self, scanned, total): + if time.time() - self.__last_display > self.__interval or scanned == total: + if not self.__last_len: + self.__stdout.write("Scanning '{0}': ".format(self.__name)) + + progress = "{0}% ({1}/{2})".format((scanned * 100) / total, scanned, total) + self.__stdout.write('\b' * self.__last_len) + self.__stdout.write(progress) + self.__stdout.flush() + + self.__last_len = len(progress) + self.__last_display = time.time() + class CLIParser(argparse.ArgumentParser): def error(self, message): self.print_usage(sys.stderr) @@ -39,7 +64,7 @@ class SupysonicCLI(cmd.Cmd): try: args = getattr(obj, command + '_parser').parse_args(line.split()) except RuntimeError, e: - print >>sys.stderr, e + self.write_error_line(str(e)) return if hasattr(obj.__class__, command + '_subparsers'): @@ -57,8 +82,14 @@ class SupysonicCLI(cmd.Cmd): return method - def __init__(self, config): - cmd.Cmd.__init__(self) + def __init__(self, config, stderr=None, *args, **kwargs): + cmd.Cmd.__init__(self, *args, **kwargs) + + if stderr is not None: + self.stderr = stderr + else: + self.stderr = sys.stderr + self.__config = config # Generate do_* and help_* methods @@ -76,17 +107,23 @@ class SupysonicCLI(cmd.Cmd): self.__store = get_store(config.BASE['database_uri']) + def write_line(self, line = ''): + self.stdout.write(line + '\n') + + def write_error_line(self, line = ''): + self.stderr.write(line + '\n') + def do_EOF(self, line): return True do_exit = do_EOF def default(self, line): - print 'Unknown command %s' % line.split()[0] + self.write_line('Unknown command %s' % line.split()[0]) self.do_help(None) def postloop(self): - print + self.write_line() def completedefault(self, text, line, begidx, endidx): command = line.split()[0] @@ -112,45 +149,24 @@ class SupysonicCLI(cmd.Cmd): folder_scan_parser.add_argument('-f', '--force', action = 'store_true', help = "Force scan of already know files even if they haven't changed") def folder_list(self): - print 'Name\t\tPath\n----\t\t----' - print '\n'.join('{0: <16}{1}'.format(f.name, f.path) for f in self.__store.find(Folder, Folder.root == True)) + self.write_line('Name\t\tPath\n----\t\t----') + self.write_line('\n'.join('{0: <16}{1}'.format(f.name, f.path) for f in self.__store.find(Folder, Folder.root == True))) def folder_add(self, name, path): ret = FolderManager.add(self.__store, name, path) if ret != FolderManager.SUCCESS: - print FolderManager.error_str(ret) + self.write_error_line(FolderManager.error_str(ret)) else: - print "Folder '{}' added".format(name) + self.write_line("Folder '{}' added".format(name)) def folder_delete(self, name): ret = FolderManager.delete_by_name(self.__store, name) if ret != FolderManager.SUCCESS: - print FolderManager.error_str(ret) + self.write_error_line(FolderManager.error_str(ret)) else: - print "Deleted folder '{}'".format(name) + self.write_line("Deleted folder '{}'".format(name)) def folder_scan(self, folders, force): - - class TimedProgressDisplay: - def __init__(self, name, interval = 5): - self.__name = name - self.__interval = interval - self.__last_display = 0 - self.__last_len = 0 - - def __call__(self, scanned, total): - if time.time() - self.__last_display > self.__interval or scanned == total: - if not self.__last_len: - sys.stdout.write("Scanning '{0}': ".format(self.__name)) - - progress = "{0}% ({1}/{2})".format((scanned * 100) / total, scanned, total) - sys.stdout.write('\b' * self.__last_len) - sys.stdout.write(progress) - sys.stdout.flush() - - self.__last_len = len(progress) - self.__last_display = time.time() - extensions = self.__config.BASE['scanner_extensions'] if extensions: extensions = extensions.split(' ') @@ -158,21 +174,22 @@ class SupysonicCLI(cmd.Cmd): if folders: folders = map(lambda n: self.__store.find(Folder, Folder.name == n, Folder.root == True).one() or n, folders) if any(map(lambda f: isinstance(f, basestring), folders)): - print "No such folder(s): " + ' '.join(f for f in folders if isinstance(f, basestring)) + self.write_line("No such folder(s): " + ' '.join(f for f in folders if isinstance(f, basestring))) for folder in filter(lambda f: isinstance(f, Folder), folders): - scanner.scan(folder, TimedProgressDisplay(folder.name)) + scanner.scan(folder, TimedProgressDisplay(folder.name, self.stdout)) + self.write_line() else: for folder in self.__store.find(Folder, Folder.root == True): - scanner.scan(folder, TimedProgressDisplay(folder.name)) + scanner.scan(folder, TimedProgressDisplay(folder.name, self.stdout)) + self.write_line() scanner.finish() added, deleted = scanner.stats() self.__store.commit() - print - print "Scanning done" - print 'Added: %i artists, %i albums, %i tracks' % (added[0], added[1], added[2]) - print 'Deleted: %i artists, %i albums, %i tracks' % (deleted[0], deleted[1], deleted[2]) + self.write_line("Scanning done") + self.write_line('Added: %i artists, %i albums, %i tracks' % (added[0], added[1], added[2])) + self.write_line('Deleted: %i artists, %i albums, %i tracks' % (deleted[0], deleted[1], deleted[2])) user_parser = CLIParser(prog = 'user', add_help = False) user_subparsers = user_parser.add_subparsers(dest = 'action') @@ -192,48 +209,48 @@ class SupysonicCLI(cmd.Cmd): user_pass_parser.add_argument('password', nargs = '?', help = 'New password') def user_list(self): - print 'Name\t\tAdmin\tEmail\n----\t\t-----\t-----' - print '\n'.join('{0: <16}{1}\t{2}'.format(u.name, '*' if u.admin else '', u.mail) for u in self.__store.find(User)) + self.write_line('Name\t\tAdmin\tEmail\n----\t\t-----\t-----') + self.write_line('\n'.join('{0: <16}{1}\t{2}'.format(u.name, '*' if u.admin else '', u.mail) for u in self.__store.find(User))) def user_add(self, name, admin, password, email): if not password: password = getpass.getpass() confirm = getpass.getpass('Confirm password: ') if password != confirm: - print >>sys.stderr, "Passwords don't match" + self.write_error_line("Passwords don't match") return status = UserManager.add(self.__store, name, password, email, admin) if status != UserManager.SUCCESS: - print >>sys.stderr, UserManager.error_str(status) + self.write_error_line(UserManager.error_str(status)) def user_delete(self, name): user = self.__store.find(User, User.name == name).one() if not user: - print >>sys.stderr, 'No such user' + self.write_error_line('No such user') else: self.__store.remove(user) self.__store.commit() - print "User '{}' deleted".format(name) + self.write_line("User '{}' deleted".format(name)) def user_setadmin(self, name, off): user = self.__store.find(User, User.name == name).one() if not user: - print >>sys.stderr, 'No such user' + self.write_error_line('No such user') else: user.admin = not off self.__store.commit() - print "{0} '{1}' admin rights".format('Revoked' if off else 'Granted', name) + self.write_line("{0} '{1}' admin rights".format('Revoked' if off else 'Granted', name)) def user_changepass(self, name, password): if not password: password = getpass.getpass() confirm = getpass.getpass('Confirm password: ') if password != confirm: - print >>sys.stderr, "Passwords don't match" + self.write_error_line("Passwords don't match") return status = UserManager.change_password2(self.__store, name, password) if status != UserManager.SUCCESS: - print >>sys.stderr, UserManager.error_str(status) + self.write_error_line(UserManager.error_str(status)) else: - print "Successfully changed '{}' password".format(name) + self.write_line("Successfully changed '{}' password".format(name))