#!/usr/bin/env python
# coding: utf-8

# This file is part of Supysonic.
#
# Supysonic is a Python implementation of the Subsonic server API.
# Copyright (C) 2013, 2014  Alban 'spl0k' Féron
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from __future__ import print_function

import argparse
import cmd
import getpass
import shlex
import sys
import time

try:
    from shlex import quote as _quote_arg  # Python 3.3+
except ImportError:
    from pipes import quote as _quote_arg  # Python 2

from supysonic import config
from supysonic.db import get_store, Folder, User
from supysonic.managers.folder import FolderManager
from supysonic.managers.user import UserManager
from supysonic.scanner import Scanner


def _split_args(line):
    """Split a command line into arguments, honouring quotes.

    Single and double quotes group words so that values containing spaces
    can be given. Backslashes are kept literally (escaping is disabled) so
    that unquoted Windows-style paths are not altered.

    Raises ValueError when a quote is left unclosed.
    """
    lexer = shlex.shlex(line, posix=True)
    lexer.whitespace_split = True
    lexer.commenters = ''
    lexer.escape = ''
    return list(lexer)


def _prompt_password():
    """Interactively ask for a password twice.

    Returns the password, or None (after printing an error on stderr) when
    the two entries differ.
    """
    password = getpass.getpass()
    confirm = getpass.getpass('Confirm password: ')
    if password != confirm:
        print("Passwords don't match", file=sys.stderr)
        return None
    return password


class CLIParser(argparse.ArgumentParser):
    """Argument parser that raises instead of terminating the process.

    argparse's default error() exits the interpreter, which would kill the
    interactive shell on the first typo. Here the usage is printed on stderr
    and a RuntimeError carrying the message is raised for the caller.
    """

    def error(self, message):
        self.print_usage(sys.stderr)
        raise RuntimeError(message)


class CLI(cmd.Cmd):
    """Interactive / one-shot command interpreter.

    Commands are declared through class attributes named '<command>_parser'
    (and optionally '<command>_subparsers'). The constructor generates the
    matching do_<command> and help_<command> methods; the actual work is done
    by methods named '<command>' or '<command>_<action>' which receive the
    parsed arguments as keyword arguments.
    """

    prompt = "supysonic> "

    def _make_do(self, command):
        """Build the do_<command> handler dispatching to the command methods."""

        # NOTE: deliberately no docstring on the generated handler, cmd.Cmd
        # uses do_* docstrings as help text.
        def method(obj, line):
            try:
                argv = _split_args(line)
            except ValueError as e:
                # Unbalanced quotes
                print(e, file=sys.stderr)
                return

            try:
                args = getattr(obj, command + '_parser').parse_args(argv)
            except RuntimeError as e:
                # Raised by CLIParser.error(), usage has already been printed
                print(e, file=sys.stderr)
                return

            options = vars(args)
            if hasattr(obj.__class__, command + '_subparsers'):
                try:
                    func = getattr(obj, '{}_{}'.format(command, args.action))
                except AttributeError:
                    return obj.default(line)
                # 'action' only selects the method, it isn't one of its parameters
                return func(**{key: value for key, value in options.items() if key != 'action'})

            try:
                func = getattr(obj, command)
            except AttributeError:
                return obj.default(line)
            return func(**options)

        return method

    def __init__(self, store):
        """Create the interpreter.

        store -- database store, as returned by supysonic.db.get_store(),
                 used by all the commands.
        """
        cmd.Cmd.__init__(self)

        cls = self.__class__

        # Generate do_* and help_* methods for each top-level '<command>_parser'.
        # Names such as 'folder_add_parser' are sub-command parsers and skipped.
        parser_names = [attr for attr in dir(cls) if attr.endswith('_parser') and '_' not in attr[:-7]]
        for parser_name in parser_names:
            command = parser_name[:-7]

            if not hasattr(cls, 'do_' + command):
                setattr(cls, 'do_' + command, self._make_do(command))

            if not hasattr(cls, 'help_' + command):
                setattr(cls, 'help_' + command, getattr(cls, parser_name).print_help)

            if hasattr(cls, command + '_subparsers'):
                # cmd.Cmd looks up 'help_' + <everything after "help ">, so an
                # attribute with a space in its name makes 'help folder add' work
                for action, subparser in getattr(cls, command + '_subparsers').choices.items():
                    setattr(self, 'help_{} {}'.format(command, action), subparser.print_help)

        self.__store = store

    # Returning True stops cmd.Cmd's command loop.
    def do_EOF(self, line):
        return True

    do_exit = do_EOF

    def default(self, line):
        """Report an unknown command and list the available ones."""
        words = line.split()
        if words:
            print('Unknown command %s' % words[0])
        self.do_help(None)

    def postloop(self):
        """Print a final newline when leaving the command loop."""
        print()

    def completedefault(self, text, line, begidx, endidx):
        """Complete the action name of commands that have sub-commands."""
        command = line.split()[0]
        parsers = getattr(self.__class__, command + '_subparsers', None)
        if not parsers:
            return []

        # Only the first word following the command is an action name
        num_words = len(line[len(command):begidx].split())
        if num_words == 0:
            return [a for a in parsers.choices.keys() if a.startswith(text)]
        return []

    folder_parser = CLIParser(prog = 'folder', add_help = False)
    folder_subparsers = folder_parser.add_subparsers(dest = 'action')
    folder_subparsers.add_parser('list', help = 'Lists folders', add_help = False)
    folder_add_parser = folder_subparsers.add_parser('add', help = 'Adds a folder', add_help = False)
    folder_add_parser.add_argument('name', help = 'Name of the folder to add')
    folder_add_parser.add_argument('path', help = 'Path to the directory pointed by the folder')
    folder_del_parser = folder_subparsers.add_parser('delete', help = 'Deletes a folder', add_help = False)
    folder_del_parser.add_argument('name', help = 'Name of the folder to delete')
    folder_scan_parser = folder_subparsers.add_parser('scan', help = 'Run a scan on specified folders', add_help = False)
    folder_scan_parser.add_argument('folders', metavar = 'folder', nargs = '*', help = 'Folder(s) to be scanned. If ommitted, all folders are scanned')
    folder_scan_parser.add_argument('-f', '--force', action = 'store_true', help = "Force scan of already know files even if they haven't changed")

    def folder_list(self):
        """Print the name and path of every root folder."""
        print('Name\t\tPath\n----\t\t----')
        # '== True' builds a query expression, it must not be simplified
        print('\n'.join('{0: <16}{1}'.format(f.name, f.path) for f in self.__store.find(Folder, Folder.root == True)))

    def folder_add(self, name, path):
        """Add a folder called `name` pointing to `path`, reporting the outcome."""
        ret = FolderManager.add(self.__store, name, path)
        if ret != FolderManager.SUCCESS:
            print(FolderManager.error_str(ret))
        else:
            print("Folder '{}' added".format(name))

    def folder_delete(self, name):
        """Delete the folder called `name`, reporting the outcome."""
        ret = FolderManager.delete_by_name(self.__store, name)
        if ret != FolderManager.SUCCESS:
            print(FolderManager.error_str(ret))
        else:
            print("Deleted folder '{}'".format(name))

    def folder_scan(self, folders, force):
        """Scan folders and print how many entries were added and deleted.

        folders -- names of the root folders to scan. When empty, all root
                   folders are scanned. Unknown names are reported and skipped.
        force   -- passed to the Scanner; per the command-line help, forces
                   the scan of files that haven't changed.
        """

        class TimedProgressDisplay:
            """Progress callback rewriting its output at most every `interval` seconds."""

            def __init__(self, name, interval = 5):
                self.__name = name
                self.__interval = interval
                self.__last_display = 0
                self.__last_len = 0

            def __call__(self, scanned, total):
                if time.time() - self.__last_display > self.__interval or scanned == total:
                    if not self.__last_len:
                        sys.stdout.write("Scanning '{0}': ".format(self.__name))

                    # Guard against an empty folder (total == 0)
                    percent = (scanned * 100) // total if total else 100
                    progress = "{0}% ({1}/{2})".format(percent, scanned, total)
                    # Backspace over the previous progress text before rewriting it
                    sys.stdout.write('\b' * self.__last_len)
                    sys.stdout.write(progress)
                    sys.stdout.flush()

                    self.__last_len = len(progress)
                    self.__last_display = time.time()

        scanner = Scanner(self.__store, force)
        if folders:
            found = []
            missing = []
            for name in folders:
                folder = self.__store.find(Folder, Folder.name == name, Folder.root == True).one()
                if folder:
                    found.append(folder)
                else:
                    missing.append(name)

            if missing:
                print("No such folder(s): " + ' '.join(missing))
            for folder in found:
                scanner.scan(folder, TimedProgressDisplay(folder.name))
        else:
            for folder in self.__store.find(Folder, Folder.root == True):
                scanner.scan(folder, TimedProgressDisplay(folder.name))

        scanner.finish()
        added, deleted = scanner.stats()
        self.__store.commit()

        print()
        print("Scanning done")
        print('Added: %i artists, %i albums, %i tracks' % (added[0], added[1], added[2]))
        print('Deleted: %i artists, %i albums, %i tracks' % (deleted[0], deleted[1], deleted[2]))

    user_parser = CLIParser(prog = 'user', add_help = False)
    user_subparsers = user_parser.add_subparsers(dest = 'action')
    user_subparsers.add_parser('list', help = 'List users', add_help = False)
    user_add_parser = user_subparsers.add_parser('add', help = 'Adds a user', add_help = False)
    user_add_parser.add_argument('name', help = 'Name/login of the user to add')
    user_add_parser.add_argument('-a', '--admin', action = 'store_true', help = 'Give admin rights to the new user')
    user_add_parser.add_argument('-p', '--password', help = "Specifies the user's password")
    user_add_parser.add_argument('-e', '--email', default = '', help = "Sets the user's email address")
    user_del_parser = user_subparsers.add_parser('delete', help = 'Deletes a user', add_help = False)
    user_del_parser.add_argument('name', help = 'Name/login of the user to delete')
    user_admin_parser = user_subparsers.add_parser('setadmin', help = 'Enable/disable admin rights for a user', add_help = False)
    user_admin_parser.add_argument('name', help = 'Name/login of the user to grant/revoke admin rights')
    user_admin_parser.add_argument('--off', action = 'store_true', help = 'Revoke admin rights if present, grant them otherwise')
    user_pass_parser = user_subparsers.add_parser('changepass', help = "Changes a user's password", add_help = False)
    user_pass_parser.add_argument('name', help = 'Name/login of the user to which change the password')
    user_pass_parser.add_argument('password', nargs = '?', help = 'New password')

    def user_list(self):
        """Print the name, admin flag and email of every user."""
        print('Name\t\tAdmin\tEmail\n----\t\t-----\t-----')
        print('\n'.join('{0: <16}{1}\t{2}'.format(u.name, '*' if u.admin else '', u.mail) for u in self.__store.find(User)))

    def user_add(self, name, admin, password, email):
        """Create a user, prompting for the password when none is given."""
        if not password:
            password = _prompt_password()
            if password is None:
                return

        status = UserManager.add(self.__store, name, password, email, admin)
        if status != UserManager.SUCCESS:
            print(UserManager.error_str(status), file=sys.stderr)
        else:
            # Confirm success like the other commands do
            print("User '{}' added".format(name))

    def user_delete(self, name):
        """Delete the user called `name`."""
        user = self.__store.find(User, User.name == name).one()
        if not user:
            print('No such user', file=sys.stderr)
        else:
            self.__store.remove(user)
            self.__store.commit()
            print("User '{}' deleted".format(name))

    def user_setadmin(self, name, off):
        """Grant admin rights to the user `name`, or revoke them if `off` is set."""
        user = self.__store.find(User, User.name == name).one()
        if not user:
            print('No such user', file=sys.stderr)
        else:
            user.admin = not off
            self.__store.commit()
            print("{0} '{1}' admin rights".format('Revoked' if off else 'Granted', name))

    def user_changepass(self, name, password):
        """Change a user's password, prompting for it when none is given."""
        if not password:
            password = _prompt_password()
            if password is None:
                return

        status = UserManager.change_password2(self.__store, name, password)
        if status != UserManager.SUCCESS:
            print(UserManager.error_str(status), file=sys.stderr)
        else:
            print("Successfully changed '{}' password".format(name))


if __name__ == "__main__":
    if not config.check():
        sys.exit(1)

    cli = CLI(get_store(config.get('base', 'database_uri')))
    if len(sys.argv) > 1:
        # The shell already split and unquoted the arguments: re-quote them so
        # values containing spaces survive the interpreter's own splitting.
        # The command name itself is parsed by cmd.Cmd and is left untouched.
        cli.onecmd(' '.join(sys.argv[1:2] + [_quote_arg(arg) for arg in sys.argv[2:]]))
    else:
        cli.cmdloop()