1
0
mirror of https://github.com/spl0k/supysonic.git synced 2024-12-22 08:56:17 +00:00

Moved CLI code to package

This commit is contained in:
spl0k 2017-12-08 22:02:12 +01:00
parent 4a99e52caa
commit 5236abf6ff
2 changed files with 244 additions and 230 deletions

View File

@ -2,246 +2,21 @@
# coding: utf-8
# This file is part of Supysonic.
#
# Supysonic is a Python implementation of the Subsonic server API.
# Copyright (C) 2013, 2014 Alban 'spl0k' Féron
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# Copyright (C) 2017 Alban 'spl0k' Féron
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
# Distributed under terms of the GNU AGPLv3 license.
import sys, cmd, argparse, getpass, time
import sys
from supysonic.cli import SupysonicCLI
from supysonic.config import IniConfig
from supysonic.db import get_store, Folder, User
from supysonic.managers.folder import FolderManager
from supysonic.managers.user import UserManager
from supysonic.scanner import Scanner
class CLIParser(argparse.ArgumentParser):
def error(self, message):
self.print_usage(sys.stderr)
raise RuntimeError(message)
class CLI(cmd.Cmd):
prompt = "supysonic> "
def _make_do(self, command):
def method(obj, line):
try:
args = getattr(obj, command + '_parser').parse_args(line.split())
except RuntimeError, e:
print >>sys.stderr, e
return
if hasattr(obj.__class__, command + '_subparsers'):
try:
func = getattr(obj, '{}_{}'.format(command, args.action))
except AttributeError:
return obj.default(line)
return func(** { key: vars(args)[key] for key in vars(args) if key != 'action' })
else:
try:
func = getattr(obj, command)
except AttributeError:
return obj.default(line)
return func(**vars(args))
return method
def __init__(self, config):
cmd.Cmd.__init__(self)
self.__config = config
# Generate do_* and help_* methods
for parser_name in filter(lambda attr: attr.endswith('_parser') and '_' not in attr[:-7], dir(self.__class__)):
command = parser_name[:-7]
if not hasattr(self.__class__, 'do_' + command):
setattr(self.__class__, 'do_' + command, self._make_do(command))
if hasattr(self.__class__, 'do_' + command) and not hasattr(self.__class__, 'help_' + command):
setattr(self.__class__, 'help_' + command, getattr(self.__class__, parser_name).print_help)
if hasattr(self.__class__, command + '_subparsers'):
for action, subparser in getattr(self.__class__, command + '_subparsers').choices.iteritems():
setattr(self, 'help_{} {}'.format(command, action), subparser.print_help)
self.__store = get_store(config.BASE['database_uri'])
def do_EOF(self, line):
return True
do_exit = do_EOF
def default(self, line):
print 'Unknown command %s' % line.split()[0]
self.do_help(None)
def postloop(self):
print
def completedefault(self, text, line, begidx, endidx):
command = line.split()[0]
parsers = getattr(self.__class__, command + '_subparsers', None)
if not parsers:
return []
num_words = len(line[len(command):begidx].split())
if num_words == 0:
return [ a for a in parsers.choices.keys() if a.startswith(text) ]
return []
folder_parser = CLIParser(prog = 'folder', add_help = False)
folder_subparsers = folder_parser.add_subparsers(dest = 'action')
folder_subparsers.add_parser('list', help = 'Lists folders', add_help = False)
folder_add_parser = folder_subparsers.add_parser('add', help = 'Adds a folder', add_help = False)
folder_add_parser.add_argument('name', help = 'Name of the folder to add')
folder_add_parser.add_argument('path', help = 'Path to the directory pointed by the folder')
folder_del_parser = folder_subparsers.add_parser('delete', help = 'Deletes a folder', add_help = False)
folder_del_parser.add_argument('name', help = 'Name of the folder to delete')
folder_scan_parser = folder_subparsers.add_parser('scan', help = 'Run a scan on specified folders', add_help = False)
folder_scan_parser.add_argument('folders', metavar = 'folder', nargs = '*', help = 'Folder(s) to be scanned. If ommitted, all folders are scanned')
folder_scan_parser.add_argument('-f', '--force', action = 'store_true', help = "Force scan of already know files even if they haven't changed")
def folder_list(self):
print 'Name\t\tPath\n----\t\t----'
print '\n'.join('{0: <16}{1}'.format(f.name, f.path) for f in self.__store.find(Folder, Folder.root == True))
def folder_add(self, name, path):
ret = FolderManager.add(self.__store, name, path)
if ret != FolderManager.SUCCESS:
print FolderManager.error_str(ret)
else:
print "Folder '{}' added".format(name)
def folder_delete(self, name):
ret = FolderManager.delete_by_name(self.__store, name)
if ret != FolderManager.SUCCESS:
print FolderManager.error_str(ret)
else:
print "Deleted folder '{}'".format(name)
def folder_scan(self, folders, force):
class TimedProgressDisplay:
def __init__(self, name, interval = 5):
self.__name = name
self.__interval = interval
self.__last_display = 0
self.__last_len = 0
def __call__(self, scanned, total):
if time.time() - self.__last_display > self.__interval or scanned == total:
if not self.__last_len:
sys.stdout.write("Scanning '{0}': ".format(self.__name))
progress = "{0}% ({1}/{2})".format((scanned * 100) / total, scanned, total)
sys.stdout.write('\b' * self.__last_len)
sys.stdout.write(progress)
sys.stdout.flush()
self.__last_len = len(progress)
self.__last_display = time.time()
extensions = self.__config.BASE['scanner_extensions']
if extensions:
extensions = extensions.split(' ')
scanner = Scanner(self.__store, force = force, extensions = extensions)
if folders:
folders = map(lambda n: self.__store.find(Folder, Folder.name == n, Folder.root == True).one() or n, folders)
if any(map(lambda f: isinstance(f, basestring), folders)):
print "No such folder(s): " + ' '.join(f for f in folders if isinstance(f, basestring))
for folder in filter(lambda f: isinstance(f, Folder), folders):
scanner.scan(folder, TimedProgressDisplay(folder.name))
else:
for folder in self.__store.find(Folder, Folder.root == True):
scanner.scan(folder, TimedProgressDisplay(folder.name))
scanner.finish()
added, deleted = scanner.stats()
self.__store.commit()
print
print "Scanning done"
print 'Added: %i artists, %i albums, %i tracks' % (added[0], added[1], added[2])
print 'Deleted: %i artists, %i albums, %i tracks' % (deleted[0], deleted[1], deleted[2])
user_parser = CLIParser(prog = 'user', add_help = False)
user_subparsers = user_parser.add_subparsers(dest = 'action')
user_subparsers.add_parser('list', help = 'List users', add_help = False)
user_add_parser = user_subparsers.add_parser('add', help = 'Adds a user', add_help = False)
user_add_parser.add_argument('name', help = 'Name/login of the user to add')
user_add_parser.add_argument('-a', '--admin', action = 'store_true', help = 'Give admin rights to the new user')
user_add_parser.add_argument('-p', '--password', help = "Specifies the user's password")
user_add_parser.add_argument('-e', '--email', default = '', help = "Sets the user's email address")
user_del_parser = user_subparsers.add_parser('delete', help = 'Deletes a user', add_help = False)
user_del_parser.add_argument('name', help = 'Name/login of the user to delete')
user_admin_parser = user_subparsers.add_parser('setadmin', help = 'Enable/disable admin rights for a user', add_help = False)
user_admin_parser.add_argument('name', help = 'Name/login of the user to grant/revoke admin rights')
user_admin_parser.add_argument('--off', action = 'store_true', help = 'Revoke admin rights if present, grant them otherwise')
user_pass_parser = user_subparsers.add_parser('changepass', help = "Changes a user's password", add_help = False)
user_pass_parser.add_argument('name', help = 'Name/login of the user to which change the password')
user_pass_parser.add_argument('password', nargs = '?', help = 'New password')
def user_list(self):
print 'Name\t\tAdmin\tEmail\n----\t\t-----\t-----'
print '\n'.join('{0: <16}{1}\t{2}'.format(u.name, '*' if u.admin else '', u.mail) for u in self.__store.find(User))
def user_add(self, name, admin, password, email):
if not password:
password = getpass.getpass()
confirm = getpass.getpass('Confirm password: ')
if password != confirm:
print >>sys.stderr, "Passwords don't match"
return
status = UserManager.add(self.__store, name, password, email, admin)
if status != UserManager.SUCCESS:
print >>sys.stderr, UserManager.error_str(status)
def user_delete(self, name):
user = self.__store.find(User, User.name == name).one()
if not user:
print >>sys.stderr, 'No such user'
else:
self.__store.remove(user)
self.__store.commit()
print "User '{}' deleted".format(name)
def user_setadmin(self, name, off):
user = self.__store.find(User, User.name == name).one()
if not user:
print >>sys.stderr, 'No such user'
else:
user.admin = not off
self.__store.commit()
print "{0} '{1}' admin rights".format('Revoked' if off else 'Granted', name)
def user_changepass(self, name, password):
if not password:
password = getpass.getpass()
confirm = getpass.getpass('Confirm password: ')
if password != confirm:
print >>sys.stderr, "Passwords don't match"
return
status = UserManager.change_password2(self.__store, name, password)
if status != UserManager.SUCCESS:
print >>sys.stderr, UserManager.error_str(status)
else:
print "Successfully changed '{}' password".format(name)
if __name__ == "__main__":
config = IniConfig.from_common_locations()
cli = CLI(config)
cli = SupysonicCLI(config)
if len(sys.argv) > 1:
cli.onecmd(' '.join(sys.argv[1:]))
else:

239
supysonic/cli.py Executable file
View File

@ -0,0 +1,239 @@
#!/usr/bin/env python
# coding: utf-8
# This file is part of Supysonic.
#
# Supysonic is a Python implementation of the Subsonic server API.
# Copyright (C) 2013-2017 Alban 'spl0k' Féron
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import sys, cmd, argparse, getpass, time
from supysonic.db import get_store, Folder, User
from supysonic.managers.folder import FolderManager
from supysonic.managers.user import UserManager
from supysonic.scanner import Scanner
class CLIParser(argparse.ArgumentParser):
def error(self, message):
self.print_usage(sys.stderr)
raise RuntimeError(message)
class SupysonicCLI(cmd.Cmd):
prompt = "supysonic> "
def _make_do(self, command):
def method(obj, line):
try:
args = getattr(obj, command + '_parser').parse_args(line.split())
except RuntimeError, e:
print >>sys.stderr, e
return
if hasattr(obj.__class__, command + '_subparsers'):
try:
func = getattr(obj, '{}_{}'.format(command, args.action))
except AttributeError:
return obj.default(line)
return func(** { key: vars(args)[key] for key in vars(args) if key != 'action' })
else:
try:
func = getattr(obj, command)
except AttributeError:
return obj.default(line)
return func(**vars(args))
return method
def __init__(self, config):
cmd.Cmd.__init__(self)
self.__config = config
# Generate do_* and help_* methods
for parser_name in filter(lambda attr: attr.endswith('_parser') and '_' not in attr[:-7], dir(self.__class__)):
command = parser_name[:-7]
if not hasattr(self.__class__, 'do_' + command):
setattr(self.__class__, 'do_' + command, self._make_do(command))
if hasattr(self.__class__, 'do_' + command) and not hasattr(self.__class__, 'help_' + command):
setattr(self.__class__, 'help_' + command, getattr(self.__class__, parser_name).print_help)
if hasattr(self.__class__, command + '_subparsers'):
for action, subparser in getattr(self.__class__, command + '_subparsers').choices.iteritems():
setattr(self, 'help_{} {}'.format(command, action), subparser.print_help)
self.__store = get_store(config.BASE['database_uri'])
def do_EOF(self, line):
return True
do_exit = do_EOF
def default(self, line):
print 'Unknown command %s' % line.split()[0]
self.do_help(None)
def postloop(self):
print
def completedefault(self, text, line, begidx, endidx):
command = line.split()[0]
parsers = getattr(self.__class__, command + '_subparsers', None)
if not parsers:
return []
num_words = len(line[len(command):begidx].split())
if num_words == 0:
return [ a for a in parsers.choices.keys() if a.startswith(text) ]
return []
folder_parser = CLIParser(prog = 'folder', add_help = False)
folder_subparsers = folder_parser.add_subparsers(dest = 'action')
folder_subparsers.add_parser('list', help = 'Lists folders', add_help = False)
folder_add_parser = folder_subparsers.add_parser('add', help = 'Adds a folder', add_help = False)
folder_add_parser.add_argument('name', help = 'Name of the folder to add')
folder_add_parser.add_argument('path', help = 'Path to the directory pointed by the folder')
folder_del_parser = folder_subparsers.add_parser('delete', help = 'Deletes a folder', add_help = False)
folder_del_parser.add_argument('name', help = 'Name of the folder to delete')
folder_scan_parser = folder_subparsers.add_parser('scan', help = 'Run a scan on specified folders', add_help = False)
folder_scan_parser.add_argument('folders', metavar = 'folder', nargs = '*', help = 'Folder(s) to be scanned. If ommitted, all folders are scanned')
folder_scan_parser.add_argument('-f', '--force', action = 'store_true', help = "Force scan of already know files even if they haven't changed")
def folder_list(self):
print 'Name\t\tPath\n----\t\t----'
print '\n'.join('{0: <16}{1}'.format(f.name, f.path) for f in self.__store.find(Folder, Folder.root == True))
def folder_add(self, name, path):
ret = FolderManager.add(self.__store, name, path)
if ret != FolderManager.SUCCESS:
print FolderManager.error_str(ret)
else:
print "Folder '{}' added".format(name)
def folder_delete(self, name):
ret = FolderManager.delete_by_name(self.__store, name)
if ret != FolderManager.SUCCESS:
print FolderManager.error_str(ret)
else:
print "Deleted folder '{}'".format(name)
def folder_scan(self, folders, force):
class TimedProgressDisplay:
def __init__(self, name, interval = 5):
self.__name = name
self.__interval = interval
self.__last_display = 0
self.__last_len = 0
def __call__(self, scanned, total):
if time.time() - self.__last_display > self.__interval or scanned == total:
if not self.__last_len:
sys.stdout.write("Scanning '{0}': ".format(self.__name))
progress = "{0}% ({1}/{2})".format((scanned * 100) / total, scanned, total)
sys.stdout.write('\b' * self.__last_len)
sys.stdout.write(progress)
sys.stdout.flush()
self.__last_len = len(progress)
self.__last_display = time.time()
extensions = self.__config.BASE['scanner_extensions']
if extensions:
extensions = extensions.split(' ')
scanner = Scanner(self.__store, force = force, extensions = extensions)
if folders:
folders = map(lambda n: self.__store.find(Folder, Folder.name == n, Folder.root == True).one() or n, folders)
if any(map(lambda f: isinstance(f, basestring), folders)):
print "No such folder(s): " + ' '.join(f for f in folders if isinstance(f, basestring))
for folder in filter(lambda f: isinstance(f, Folder), folders):
scanner.scan(folder, TimedProgressDisplay(folder.name))
else:
for folder in self.__store.find(Folder, Folder.root == True):
scanner.scan(folder, TimedProgressDisplay(folder.name))
scanner.finish()
added, deleted = scanner.stats()
self.__store.commit()
print
print "Scanning done"
print 'Added: %i artists, %i albums, %i tracks' % (added[0], added[1], added[2])
print 'Deleted: %i artists, %i albums, %i tracks' % (deleted[0], deleted[1], deleted[2])
user_parser = CLIParser(prog = 'user', add_help = False)
user_subparsers = user_parser.add_subparsers(dest = 'action')
user_subparsers.add_parser('list', help = 'List users', add_help = False)
user_add_parser = user_subparsers.add_parser('add', help = 'Adds a user', add_help = False)
user_add_parser.add_argument('name', help = 'Name/login of the user to add')
user_add_parser.add_argument('-a', '--admin', action = 'store_true', help = 'Give admin rights to the new user')
user_add_parser.add_argument('-p', '--password', help = "Specifies the user's password")
user_add_parser.add_argument('-e', '--email', default = '', help = "Sets the user's email address")
user_del_parser = user_subparsers.add_parser('delete', help = 'Deletes a user', add_help = False)
user_del_parser.add_argument('name', help = 'Name/login of the user to delete')
user_admin_parser = user_subparsers.add_parser('setadmin', help = 'Enable/disable admin rights for a user', add_help = False)
user_admin_parser.add_argument('name', help = 'Name/login of the user to grant/revoke admin rights')
user_admin_parser.add_argument('--off', action = 'store_true', help = 'Revoke admin rights if present, grant them otherwise')
user_pass_parser = user_subparsers.add_parser('changepass', help = "Changes a user's password", add_help = False)
user_pass_parser.add_argument('name', help = 'Name/login of the user to which change the password')
user_pass_parser.add_argument('password', nargs = '?', help = 'New password')
def user_list(self):
print 'Name\t\tAdmin\tEmail\n----\t\t-----\t-----'
print '\n'.join('{0: <16}{1}\t{2}'.format(u.name, '*' if u.admin else '', u.mail) for u in self.__store.find(User))
def user_add(self, name, admin, password, email):
if not password:
password = getpass.getpass()
confirm = getpass.getpass('Confirm password: ')
if password != confirm:
print >>sys.stderr, "Passwords don't match"
return
status = UserManager.add(self.__store, name, password, email, admin)
if status != UserManager.SUCCESS:
print >>sys.stderr, UserManager.error_str(status)
def user_delete(self, name):
user = self.__store.find(User, User.name == name).one()
if not user:
print >>sys.stderr, 'No such user'
else:
self.__store.remove(user)
self.__store.commit()
print "User '{}' deleted".format(name)
def user_setadmin(self, name, off):
user = self.__store.find(User, User.name == name).one()
if not user:
print >>sys.stderr, 'No such user'
else:
user.admin = not off
self.__store.commit()
print "{0} '{1}' admin rights".format('Revoked' if off else 'Granted', name)
def user_changepass(self, name, password):
if not password:
password = getpass.getpass()
confirm = getpass.getpass('Confirm password: ')
if password != confirm:
print >>sys.stderr, "Passwords don't match"
return
status = UserManager.change_password2(self.__store, name, password)
if status != UserManager.SUCCESS:
print >>sys.stderr, UserManager.error_str(status)
else:
print "Successfully changed '{}' password".format(name)