1
0
mirror of https://github.com/spl0k/supysonic.git synced 2024-12-23 01:16:18 +00:00

CLI: added ability to set stdin/stdout/stderr

This commit is contained in:
spl0k 2017-12-08 23:36:25 +01:00
parent 5236abf6ff
commit d3271e7174

View File

@ -19,13 +19,38 @@
# You should have received a copy of the GNU Affero General Public License # You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
import sys, cmd, argparse, getpass, time import argparse
import cmd
import getpass
import sys
import time
from supysonic.db import get_store, Folder, User from supysonic.db import get_store, Folder, User
from supysonic.managers.folder import FolderManager from supysonic.managers.folder import FolderManager
from supysonic.managers.user import UserManager from supysonic.managers.user import UserManager
from supysonic.scanner import Scanner from supysonic.scanner import Scanner
class TimedProgressDisplay:
def __init__(self, name, stdout, interval = 5):
self.__name = name
self.__stdout = stdout
self.__interval = interval
self.__last_display = 0
self.__last_len = 0
def __call__(self, scanned, total):
if time.time() - self.__last_display > self.__interval or scanned == total:
if not self.__last_len:
self.__stdout.write("Scanning '{0}': ".format(self.__name))
progress = "{0}% ({1}/{2})".format((scanned * 100) / total, scanned, total)
self.__stdout.write('\b' * self.__last_len)
self.__stdout.write(progress)
self.__stdout.flush()
self.__last_len = len(progress)
self.__last_display = time.time()
class CLIParser(argparse.ArgumentParser): class CLIParser(argparse.ArgumentParser):
def error(self, message): def error(self, message):
self.print_usage(sys.stderr) self.print_usage(sys.stderr)
@ -39,7 +64,7 @@ class SupysonicCLI(cmd.Cmd):
try: try:
args = getattr(obj, command + '_parser').parse_args(line.split()) args = getattr(obj, command + '_parser').parse_args(line.split())
except RuntimeError, e: except RuntimeError, e:
print >>sys.stderr, e self.write_error_line(str(e))
return return
if hasattr(obj.__class__, command + '_subparsers'): if hasattr(obj.__class__, command + '_subparsers'):
@ -57,8 +82,14 @@ class SupysonicCLI(cmd.Cmd):
return method return method
def __init__(self, config): def __init__(self, config, stderr=None, *args, **kwargs):
cmd.Cmd.__init__(self) cmd.Cmd.__init__(self, *args, **kwargs)
if stderr is not None:
self.stderr = stderr
else:
self.stderr = sys.stderr
self.__config = config self.__config = config
# Generate do_* and help_* methods # Generate do_* and help_* methods
@ -76,17 +107,23 @@ class SupysonicCLI(cmd.Cmd):
self.__store = get_store(config.BASE['database_uri']) self.__store = get_store(config.BASE['database_uri'])
def write_line(self, line = ''):
self.stdout.write(line + '\n')
def write_error_line(self, line = ''):
self.stderr.write(line + '\n')
def do_EOF(self, line): def do_EOF(self, line):
return True return True
do_exit = do_EOF do_exit = do_EOF
def default(self, line): def default(self, line):
print 'Unknown command %s' % line.split()[0] self.write_line('Unknown command %s' % line.split()[0])
self.do_help(None) self.do_help(None)
def postloop(self): def postloop(self):
print self.write_line()
def completedefault(self, text, line, begidx, endidx): def completedefault(self, text, line, begidx, endidx):
command = line.split()[0] command = line.split()[0]
@ -112,45 +149,24 @@ class SupysonicCLI(cmd.Cmd):
folder_scan_parser.add_argument('-f', '--force', action = 'store_true', help = "Force scan of already know files even if they haven't changed") folder_scan_parser.add_argument('-f', '--force', action = 'store_true', help = "Force scan of already know files even if they haven't changed")
def folder_list(self): def folder_list(self):
print 'Name\t\tPath\n----\t\t----' self.write_line('Name\t\tPath\n----\t\t----')
print '\n'.join('{0: <16}{1}'.format(f.name, f.path) for f in self.__store.find(Folder, Folder.root == True)) self.write_line('\n'.join('{0: <16}{1}'.format(f.name, f.path) for f in self.__store.find(Folder, Folder.root == True)))
def folder_add(self, name, path): def folder_add(self, name, path):
ret = FolderManager.add(self.__store, name, path) ret = FolderManager.add(self.__store, name, path)
if ret != FolderManager.SUCCESS: if ret != FolderManager.SUCCESS:
print FolderManager.error_str(ret) self.write_error_line(FolderManager.error_str(ret))
else: else:
print "Folder '{}' added".format(name) self.write_line("Folder '{}' added".format(name))
def folder_delete(self, name): def folder_delete(self, name):
ret = FolderManager.delete_by_name(self.__store, name) ret = FolderManager.delete_by_name(self.__store, name)
if ret != FolderManager.SUCCESS: if ret != FolderManager.SUCCESS:
print FolderManager.error_str(ret) self.write_error_line(FolderManager.error_str(ret))
else: else:
print "Deleted folder '{}'".format(name) self.write_line("Deleted folder '{}'".format(name))
def folder_scan(self, folders, force): def folder_scan(self, folders, force):
class TimedProgressDisplay:
def __init__(self, name, interval = 5):
self.__name = name
self.__interval = interval
self.__last_display = 0
self.__last_len = 0
def __call__(self, scanned, total):
if time.time() - self.__last_display > self.__interval or scanned == total:
if not self.__last_len:
sys.stdout.write("Scanning '{0}': ".format(self.__name))
progress = "{0}% ({1}/{2})".format((scanned * 100) / total, scanned, total)
sys.stdout.write('\b' * self.__last_len)
sys.stdout.write(progress)
sys.stdout.flush()
self.__last_len = len(progress)
self.__last_display = time.time()
extensions = self.__config.BASE['scanner_extensions'] extensions = self.__config.BASE['scanner_extensions']
if extensions: if extensions:
extensions = extensions.split(' ') extensions = extensions.split(' ')
@ -158,21 +174,22 @@ class SupysonicCLI(cmd.Cmd):
if folders: if folders:
folders = map(lambda n: self.__store.find(Folder, Folder.name == n, Folder.root == True).one() or n, folders) folders = map(lambda n: self.__store.find(Folder, Folder.name == n, Folder.root == True).one() or n, folders)
if any(map(lambda f: isinstance(f, basestring), folders)): if any(map(lambda f: isinstance(f, basestring), folders)):
print "No such folder(s): " + ' '.join(f for f in folders if isinstance(f, basestring)) self.write_line("No such folder(s): " + ' '.join(f for f in folders if isinstance(f, basestring)))
for folder in filter(lambda f: isinstance(f, Folder), folders): for folder in filter(lambda f: isinstance(f, Folder), folders):
scanner.scan(folder, TimedProgressDisplay(folder.name)) scanner.scan(folder, TimedProgressDisplay(folder.name, self.stdout))
self.write_line()
else: else:
for folder in self.__store.find(Folder, Folder.root == True): for folder in self.__store.find(Folder, Folder.root == True):
scanner.scan(folder, TimedProgressDisplay(folder.name)) scanner.scan(folder, TimedProgressDisplay(folder.name, self.stdout))
self.write_line()
scanner.finish() scanner.finish()
added, deleted = scanner.stats() added, deleted = scanner.stats()
self.__store.commit() self.__store.commit()
print self.write_line("Scanning done")
print "Scanning done" self.write_line('Added: %i artists, %i albums, %i tracks' % (added[0], added[1], added[2]))
print 'Added: %i artists, %i albums, %i tracks' % (added[0], added[1], added[2]) self.write_line('Deleted: %i artists, %i albums, %i tracks' % (deleted[0], deleted[1], deleted[2]))
print 'Deleted: %i artists, %i albums, %i tracks' % (deleted[0], deleted[1], deleted[2])
user_parser = CLIParser(prog = 'user', add_help = False) user_parser = CLIParser(prog = 'user', add_help = False)
user_subparsers = user_parser.add_subparsers(dest = 'action') user_subparsers = user_parser.add_subparsers(dest = 'action')
@ -192,48 +209,48 @@ class SupysonicCLI(cmd.Cmd):
user_pass_parser.add_argument('password', nargs = '?', help = 'New password') user_pass_parser.add_argument('password', nargs = '?', help = 'New password')
def user_list(self): def user_list(self):
print 'Name\t\tAdmin\tEmail\n----\t\t-----\t-----' self.write_line('Name\t\tAdmin\tEmail\n----\t\t-----\t-----')
print '\n'.join('{0: <16}{1}\t{2}'.format(u.name, '*' if u.admin else '', u.mail) for u in self.__store.find(User)) self.write_line('\n'.join('{0: <16}{1}\t{2}'.format(u.name, '*' if u.admin else '', u.mail) for u in self.__store.find(User)))
def user_add(self, name, admin, password, email): def user_add(self, name, admin, password, email):
if not password: if not password:
password = getpass.getpass() password = getpass.getpass()
confirm = getpass.getpass('Confirm password: ') confirm = getpass.getpass('Confirm password: ')
if password != confirm: if password != confirm:
print >>sys.stderr, "Passwords don't match" self.write_error_line("Passwords don't match")
return return
status = UserManager.add(self.__store, name, password, email, admin) status = UserManager.add(self.__store, name, password, email, admin)
if status != UserManager.SUCCESS: if status != UserManager.SUCCESS:
print >>sys.stderr, UserManager.error_str(status) self.write_error_line(UserManager.error_str(status))
def user_delete(self, name): def user_delete(self, name):
user = self.__store.find(User, User.name == name).one() user = self.__store.find(User, User.name == name).one()
if not user: if not user:
print >>sys.stderr, 'No such user' self.write_error_line('No such user')
else: else:
self.__store.remove(user) self.__store.remove(user)
self.__store.commit() self.__store.commit()
print "User '{}' deleted".format(name) self.write_line("User '{}' deleted".format(name))
def user_setadmin(self, name, off): def user_setadmin(self, name, off):
user = self.__store.find(User, User.name == name).one() user = self.__store.find(User, User.name == name).one()
if not user: if not user:
print >>sys.stderr, 'No such user' self.write_error_line('No such user')
else: else:
user.admin = not off user.admin = not off
self.__store.commit() self.__store.commit()
print "{0} '{1}' admin rights".format('Revoked' if off else 'Granted', name) self.write_line("{0} '{1}' admin rights".format('Revoked' if off else 'Granted', name))
def user_changepass(self, name, password): def user_changepass(self, name, password):
if not password: if not password:
password = getpass.getpass() password = getpass.getpass()
confirm = getpass.getpass('Confirm password: ') confirm = getpass.getpass('Confirm password: ')
if password != confirm: if password != confirm:
print >>sys.stderr, "Passwords don't match" self.write_error_line("Passwords don't match")
return return
status = UserManager.change_password2(self.__store, name, password) status = UserManager.change_password2(self.__store, name, password)
if status != UserManager.SUCCESS: if status != UserManager.SUCCESS:
print >>sys.stderr, UserManager.error_str(status) self.write_error_line(UserManager.error_str(status))
else: else:
print "Successfully changed '{}' password".format(name) self.write_line("Successfully changed '{}' password".format(name))