diff --git a/bin/supysonic-cli b/bin/supysonic-cli
index 13df019..0cce62b 100755
--- a/bin/supysonic-cli
+++ b/bin/supysonic-cli
@@ -2,246 +2,21 @@
# coding: utf-8
# This file is part of Supysonic.
-#
# Supysonic is a Python implementation of the Subsonic server API.
-# Copyright (C) 2013, 2014 Alban 'spl0k' Féron
#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Affero General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
+# Copyright (C) 2017 Alban 'spl0k' Féron
#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU Affero General Public License for more details.
-#
-# You should have received a copy of the GNU Affero General Public License
-# along with this program. If not, see .
+# Distributed under terms of the GNU AGPLv3 license.
-import sys, cmd, argparse, getpass, time
+import sys
+from supysonic.cli import SupysonicCLI
from supysonic.config import IniConfig
-from supysonic.db import get_store, Folder, User
-from supysonic.managers.folder import FolderManager
-from supysonic.managers.user import UserManager
-from supysonic.scanner import Scanner
-
-class CLIParser(argparse.ArgumentParser):
- def error(self, message):
- self.print_usage(sys.stderr)
- raise RuntimeError(message)
-
-class CLI(cmd.Cmd):
- prompt = "supysonic> "
-
- def _make_do(self, command):
- def method(obj, line):
- try:
- args = getattr(obj, command + '_parser').parse_args(line.split())
- except RuntimeError, e:
- print >>sys.stderr, e
- return
-
- if hasattr(obj.__class__, command + '_subparsers'):
- try:
- func = getattr(obj, '{}_{}'.format(command, args.action))
- except AttributeError:
- return obj.default(line)
- return func(** { key: vars(args)[key] for key in vars(args) if key != 'action' })
- else:
- try:
- func = getattr(obj, command)
- except AttributeError:
- return obj.default(line)
- return func(**vars(args))
-
- return method
-
- def __init__(self, config):
- cmd.Cmd.__init__(self)
- self.__config = config
-
- # Generate do_* and help_* methods
- for parser_name in filter(lambda attr: attr.endswith('_parser') and '_' not in attr[:-7], dir(self.__class__)):
- command = parser_name[:-7]
-
- if not hasattr(self.__class__, 'do_' + command):
- setattr(self.__class__, 'do_' + command, self._make_do(command))
-
- if hasattr(self.__class__, 'do_' + command) and not hasattr(self.__class__, 'help_' + command):
- setattr(self.__class__, 'help_' + command, getattr(self.__class__, parser_name).print_help)
- if hasattr(self.__class__, command + '_subparsers'):
- for action, subparser in getattr(self.__class__, command + '_subparsers').choices.iteritems():
- setattr(self, 'help_{} {}'.format(command, action), subparser.print_help)
-
- self.__store = get_store(config.BASE['database_uri'])
-
- def do_EOF(self, line):
- return True
-
- do_exit = do_EOF
-
- def default(self, line):
- print 'Unknown command %s' % line.split()[0]
- self.do_help(None)
-
- def postloop(self):
- print
-
- def completedefault(self, text, line, begidx, endidx):
- command = line.split()[0]
- parsers = getattr(self.__class__, command + '_subparsers', None)
- if not parsers:
- return []
-
- num_words = len(line[len(command):begidx].split())
- if num_words == 0:
- return [ a for a in parsers.choices.keys() if a.startswith(text) ]
- return []
-
- folder_parser = CLIParser(prog = 'folder', add_help = False)
- folder_subparsers = folder_parser.add_subparsers(dest = 'action')
- folder_subparsers.add_parser('list', help = 'Lists folders', add_help = False)
- folder_add_parser = folder_subparsers.add_parser('add', help = 'Adds a folder', add_help = False)
- folder_add_parser.add_argument('name', help = 'Name of the folder to add')
- folder_add_parser.add_argument('path', help = 'Path to the directory pointed by the folder')
- folder_del_parser = folder_subparsers.add_parser('delete', help = 'Deletes a folder', add_help = False)
- folder_del_parser.add_argument('name', help = 'Name of the folder to delete')
- folder_scan_parser = folder_subparsers.add_parser('scan', help = 'Run a scan on specified folders', add_help = False)
- folder_scan_parser.add_argument('folders', metavar = 'folder', nargs = '*', help = 'Folder(s) to be scanned. If ommitted, all folders are scanned')
- folder_scan_parser.add_argument('-f', '--force', action = 'store_true', help = "Force scan of already know files even if they haven't changed")
-
- def folder_list(self):
- print 'Name\t\tPath\n----\t\t----'
- print '\n'.join('{0: <16}{1}'.format(f.name, f.path) for f in self.__store.find(Folder, Folder.root == True))
-
- def folder_add(self, name, path):
- ret = FolderManager.add(self.__store, name, path)
- if ret != FolderManager.SUCCESS:
- print FolderManager.error_str(ret)
- else:
- print "Folder '{}' added".format(name)
-
- def folder_delete(self, name):
- ret = FolderManager.delete_by_name(self.__store, name)
- if ret != FolderManager.SUCCESS:
- print FolderManager.error_str(ret)
- else:
- print "Deleted folder '{}'".format(name)
-
- def folder_scan(self, folders, force):
-
- class TimedProgressDisplay:
- def __init__(self, name, interval = 5):
- self.__name = name
- self.__interval = interval
- self.__last_display = 0
- self.__last_len = 0
-
- def __call__(self, scanned, total):
- if time.time() - self.__last_display > self.__interval or scanned == total:
- if not self.__last_len:
- sys.stdout.write("Scanning '{0}': ".format(self.__name))
-
- progress = "{0}% ({1}/{2})".format((scanned * 100) / total, scanned, total)
- sys.stdout.write('\b' * self.__last_len)
- sys.stdout.write(progress)
- sys.stdout.flush()
-
- self.__last_len = len(progress)
- self.__last_display = time.time()
-
- extensions = self.__config.BASE['scanner_extensions']
- if extensions:
- extensions = extensions.split(' ')
- scanner = Scanner(self.__store, force = force, extensions = extensions)
- if folders:
- folders = map(lambda n: self.__store.find(Folder, Folder.name == n, Folder.root == True).one() or n, folders)
- if any(map(lambda f: isinstance(f, basestring), folders)):
- print "No such folder(s): " + ' '.join(f for f in folders if isinstance(f, basestring))
- for folder in filter(lambda f: isinstance(f, Folder), folders):
- scanner.scan(folder, TimedProgressDisplay(folder.name))
- else:
- for folder in self.__store.find(Folder, Folder.root == True):
- scanner.scan(folder, TimedProgressDisplay(folder.name))
-
- scanner.finish()
- added, deleted = scanner.stats()
- self.__store.commit()
-
- print
- print "Scanning done"
- print 'Added: %i artists, %i albums, %i tracks' % (added[0], added[1], added[2])
- print 'Deleted: %i artists, %i albums, %i tracks' % (deleted[0], deleted[1], deleted[2])
-
- user_parser = CLIParser(prog = 'user', add_help = False)
- user_subparsers = user_parser.add_subparsers(dest = 'action')
- user_subparsers.add_parser('list', help = 'List users', add_help = False)
- user_add_parser = user_subparsers.add_parser('add', help = 'Adds a user', add_help = False)
- user_add_parser.add_argument('name', help = 'Name/login of the user to add')
- user_add_parser.add_argument('-a', '--admin', action = 'store_true', help = 'Give admin rights to the new user')
- user_add_parser.add_argument('-p', '--password', help = "Specifies the user's password")
- user_add_parser.add_argument('-e', '--email', default = '', help = "Sets the user's email address")
- user_del_parser = user_subparsers.add_parser('delete', help = 'Deletes a user', add_help = False)
- user_del_parser.add_argument('name', help = 'Name/login of the user to delete')
- user_admin_parser = user_subparsers.add_parser('setadmin', help = 'Enable/disable admin rights for a user', add_help = False)
- user_admin_parser.add_argument('name', help = 'Name/login of the user to grant/revoke admin rights')
- user_admin_parser.add_argument('--off', action = 'store_true', help = 'Revoke admin rights if present, grant them otherwise')
- user_pass_parser = user_subparsers.add_parser('changepass', help = "Changes a user's password", add_help = False)
- user_pass_parser.add_argument('name', help = 'Name/login of the user to which change the password')
- user_pass_parser.add_argument('password', nargs = '?', help = 'New password')
-
- def user_list(self):
- print 'Name\t\tAdmin\tEmail\n----\t\t-----\t-----'
- print '\n'.join('{0: <16}{1}\t{2}'.format(u.name, '*' if u.admin else '', u.mail) for u in self.__store.find(User))
-
- def user_add(self, name, admin, password, email):
- if not password:
- password = getpass.getpass()
- confirm = getpass.getpass('Confirm password: ')
- if password != confirm:
- print >>sys.stderr, "Passwords don't match"
- return
- status = UserManager.add(self.__store, name, password, email, admin)
- if status != UserManager.SUCCESS:
- print >>sys.stderr, UserManager.error_str(status)
-
- def user_delete(self, name):
- user = self.__store.find(User, User.name == name).one()
- if not user:
- print >>sys.stderr, 'No such user'
- else:
- self.__store.remove(user)
- self.__store.commit()
- print "User '{}' deleted".format(name)
-
- def user_setadmin(self, name, off):
- user = self.__store.find(User, User.name == name).one()
- if not user:
- print >>sys.stderr, 'No such user'
- else:
- user.admin = not off
- self.__store.commit()
- print "{0} '{1}' admin rights".format('Revoked' if off else 'Granted', name)
-
- def user_changepass(self, name, password):
- if not password:
- password = getpass.getpass()
- confirm = getpass.getpass('Confirm password: ')
- if password != confirm:
- print >>sys.stderr, "Passwords don't match"
- return
- status = UserManager.change_password2(self.__store, name, password)
- if status != UserManager.SUCCESS:
- print >>sys.stderr, UserManager.error_str(status)
- else:
- print "Successfully changed '{}' password".format(name)
if __name__ == "__main__":
config = IniConfig.from_common_locations()
- cli = CLI(config)
+ cli = SupysonicCLI(config)
if len(sys.argv) > 1:
cli.onecmd(' '.join(sys.argv[1:]))
else:
diff --git a/supysonic/cli.py b/supysonic/cli.py
new file mode 100755
index 0000000..0165132
--- /dev/null
+++ b/supysonic/cli.py
@@ -0,0 +1,239 @@
+#!/usr/bin/env python
+# coding: utf-8
+
+# This file is part of Supysonic.
+#
+# Supysonic is a Python implementation of the Subsonic server API.
+# Copyright (C) 2013-2017 Alban 'spl0k' Féron
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see .
+
+import sys, cmd, argparse, getpass, time
+
+from supysonic.db import get_store, Folder, User
+from supysonic.managers.folder import FolderManager
+from supysonic.managers.user import UserManager
+from supysonic.scanner import Scanner
+
+class CLIParser(argparse.ArgumentParser):
+ def error(self, message):
+ self.print_usage(sys.stderr)
+ raise RuntimeError(message)
+
+class SupysonicCLI(cmd.Cmd):
+ prompt = "supysonic> "
+
+ def _make_do(self, command):
+ def method(obj, line):
+ try:
+ args = getattr(obj, command + '_parser').parse_args(line.split())
+ except RuntimeError, e:
+ print >>sys.stderr, e
+ return
+
+ if hasattr(obj.__class__, command + '_subparsers'):
+ try:
+ func = getattr(obj, '{}_{}'.format(command, args.action))
+ except AttributeError:
+ return obj.default(line)
+ return func(** { key: vars(args)[key] for key in vars(args) if key != 'action' })
+ else:
+ try:
+ func = getattr(obj, command)
+ except AttributeError:
+ return obj.default(line)
+ return func(**vars(args))
+
+ return method
+
+ def __init__(self, config):
+ cmd.Cmd.__init__(self)
+ self.__config = config
+
+ # Generate do_* and help_* methods
+ for parser_name in filter(lambda attr: attr.endswith('_parser') and '_' not in attr[:-7], dir(self.__class__)):
+ command = parser_name[:-7]
+
+ if not hasattr(self.__class__, 'do_' + command):
+ setattr(self.__class__, 'do_' + command, self._make_do(command))
+
+ if hasattr(self.__class__, 'do_' + command) and not hasattr(self.__class__, 'help_' + command):
+ setattr(self.__class__, 'help_' + command, getattr(self.__class__, parser_name).print_help)
+ if hasattr(self.__class__, command + '_subparsers'):
+ for action, subparser in getattr(self.__class__, command + '_subparsers').choices.iteritems():
+ setattr(self, 'help_{} {}'.format(command, action), subparser.print_help)
+
+ self.__store = get_store(config.BASE['database_uri'])
+
+ def do_EOF(self, line):
+ return True
+
+ do_exit = do_EOF
+
+ def default(self, line):
+ print 'Unknown command %s' % line.split()[0]
+ self.do_help(None)
+
+ def postloop(self):
+ print
+
+ def completedefault(self, text, line, begidx, endidx):
+ command = line.split()[0]
+ parsers = getattr(self.__class__, command + '_subparsers', None)
+ if not parsers:
+ return []
+
+ num_words = len(line[len(command):begidx].split())
+ if num_words == 0:
+ return [ a for a in parsers.choices.keys() if a.startswith(text) ]
+ return []
+
+ folder_parser = CLIParser(prog = 'folder', add_help = False)
+ folder_subparsers = folder_parser.add_subparsers(dest = 'action')
+ folder_subparsers.add_parser('list', help = 'Lists folders', add_help = False)
+ folder_add_parser = folder_subparsers.add_parser('add', help = 'Adds a folder', add_help = False)
+ folder_add_parser.add_argument('name', help = 'Name of the folder to add')
+ folder_add_parser.add_argument('path', help = 'Path to the directory pointed by the folder')
+ folder_del_parser = folder_subparsers.add_parser('delete', help = 'Deletes a folder', add_help = False)
+ folder_del_parser.add_argument('name', help = 'Name of the folder to delete')
+ folder_scan_parser = folder_subparsers.add_parser('scan', help = 'Run a scan on specified folders', add_help = False)
+ folder_scan_parser.add_argument('folders', metavar = 'folder', nargs = '*', help = 'Folder(s) to be scanned. If ommitted, all folders are scanned')
+ folder_scan_parser.add_argument('-f', '--force', action = 'store_true', help = "Force scan of already know files even if they haven't changed")
+
+ def folder_list(self):
+ print 'Name\t\tPath\n----\t\t----'
+ print '\n'.join('{0: <16}{1}'.format(f.name, f.path) for f in self.__store.find(Folder, Folder.root == True))
+
+ def folder_add(self, name, path):
+ ret = FolderManager.add(self.__store, name, path)
+ if ret != FolderManager.SUCCESS:
+ print FolderManager.error_str(ret)
+ else:
+ print "Folder '{}' added".format(name)
+
+ def folder_delete(self, name):
+ ret = FolderManager.delete_by_name(self.__store, name)
+ if ret != FolderManager.SUCCESS:
+ print FolderManager.error_str(ret)
+ else:
+ print "Deleted folder '{}'".format(name)
+
+ def folder_scan(self, folders, force):
+
+ class TimedProgressDisplay:
+ def __init__(self, name, interval = 5):
+ self.__name = name
+ self.__interval = interval
+ self.__last_display = 0
+ self.__last_len = 0
+
+ def __call__(self, scanned, total):
+ if time.time() - self.__last_display > self.__interval or scanned == total:
+ if not self.__last_len:
+ sys.stdout.write("Scanning '{0}': ".format(self.__name))
+
+ progress = "{0}% ({1}/{2})".format((scanned * 100) / total, scanned, total)
+ sys.stdout.write('\b' * self.__last_len)
+ sys.stdout.write(progress)
+ sys.stdout.flush()
+
+ self.__last_len = len(progress)
+ self.__last_display = time.time()
+
+ extensions = self.__config.BASE['scanner_extensions']
+ if extensions:
+ extensions = extensions.split(' ')
+ scanner = Scanner(self.__store, force = force, extensions = extensions)
+ if folders:
+ folders = map(lambda n: self.__store.find(Folder, Folder.name == n, Folder.root == True).one() or n, folders)
+ if any(map(lambda f: isinstance(f, basestring), folders)):
+ print "No such folder(s): " + ' '.join(f for f in folders if isinstance(f, basestring))
+ for folder in filter(lambda f: isinstance(f, Folder), folders):
+ scanner.scan(folder, TimedProgressDisplay(folder.name))
+ else:
+ for folder in self.__store.find(Folder, Folder.root == True):
+ scanner.scan(folder, TimedProgressDisplay(folder.name))
+
+ scanner.finish()
+ added, deleted = scanner.stats()
+ self.__store.commit()
+
+ print
+ print "Scanning done"
+ print 'Added: %i artists, %i albums, %i tracks' % (added[0], added[1], added[2])
+ print 'Deleted: %i artists, %i albums, %i tracks' % (deleted[0], deleted[1], deleted[2])
+
+ user_parser = CLIParser(prog = 'user', add_help = False)
+ user_subparsers = user_parser.add_subparsers(dest = 'action')
+ user_subparsers.add_parser('list', help = 'List users', add_help = False)
+ user_add_parser = user_subparsers.add_parser('add', help = 'Adds a user', add_help = False)
+ user_add_parser.add_argument('name', help = 'Name/login of the user to add')
+ user_add_parser.add_argument('-a', '--admin', action = 'store_true', help = 'Give admin rights to the new user')
+ user_add_parser.add_argument('-p', '--password', help = "Specifies the user's password")
+ user_add_parser.add_argument('-e', '--email', default = '', help = "Sets the user's email address")
+ user_del_parser = user_subparsers.add_parser('delete', help = 'Deletes a user', add_help = False)
+ user_del_parser.add_argument('name', help = 'Name/login of the user to delete')
+ user_admin_parser = user_subparsers.add_parser('setadmin', help = 'Enable/disable admin rights for a user', add_help = False)
+ user_admin_parser.add_argument('name', help = 'Name/login of the user to grant/revoke admin rights')
+ user_admin_parser.add_argument('--off', action = 'store_true', help = 'Revoke admin rights if present, grant them otherwise')
+ user_pass_parser = user_subparsers.add_parser('changepass', help = "Changes a user's password", add_help = False)
+ user_pass_parser.add_argument('name', help = 'Name/login of the user to which change the password')
+ user_pass_parser.add_argument('password', nargs = '?', help = 'New password')
+
+ def user_list(self):
+ print 'Name\t\tAdmin\tEmail\n----\t\t-----\t-----'
+ print '\n'.join('{0: <16}{1}\t{2}'.format(u.name, '*' if u.admin else '', u.mail) for u in self.__store.find(User))
+
+ def user_add(self, name, admin, password, email):
+ if not password:
+ password = getpass.getpass()
+ confirm = getpass.getpass('Confirm password: ')
+ if password != confirm:
+ print >>sys.stderr, "Passwords don't match"
+ return
+ status = UserManager.add(self.__store, name, password, email, admin)
+ if status != UserManager.SUCCESS:
+ print >>sys.stderr, UserManager.error_str(status)
+
+ def user_delete(self, name):
+ user = self.__store.find(User, User.name == name).one()
+ if not user:
+ print >>sys.stderr, 'No such user'
+ else:
+ self.__store.remove(user)
+ self.__store.commit()
+ print "User '{}' deleted".format(name)
+
+ def user_setadmin(self, name, off):
+ user = self.__store.find(User, User.name == name).one()
+ if not user:
+ print >>sys.stderr, 'No such user'
+ else:
+ user.admin = not off
+ self.__store.commit()
+ print "{0} '{1}' admin rights".format('Revoked' if off else 'Granted', name)
+
+ def user_changepass(self, name, password):
+ if not password:
+ password = getpass.getpass()
+ confirm = getpass.getpass('Confirm password: ')
+ if password != confirm:
+ print >>sys.stderr, "Passwords don't match"
+ return
+ status = UserManager.change_password2(self.__store, name, password)
+ if status != UserManager.SUCCESS:
+ print >>sys.stderr, UserManager.error_str(status)
+ else:
+ print "Successfully changed '{}' password".format(name)
+