1
0
mirror of https://github.com/spl0k/supysonic.git synced 2024-12-23 01:16:18 +00:00

Reworked config handling

This commit is contained in:
spl0k 2017-11-27 22:30:13 +01:00
parent 4ca48fd31c
commit a62976ba9d
32 changed files with 393 additions and 390 deletions

View File

@ -20,227 +20,230 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
import sys, cmd, argparse, getpass, time import sys, cmd, argparse, getpass, time
from supysonic import config
from supysonic.config import IniConfig
from supysonic.db import get_store, Folder, User from supysonic.db import get_store, Folder, User
from supysonic.managers.folder import FolderManager from supysonic.managers.folder import FolderManager
from supysonic.managers.user import UserManager from supysonic.managers.user import UserManager
from supysonic.scanner import Scanner from supysonic.scanner import Scanner
class CLIParser(argparse.ArgumentParser): class CLIParser(argparse.ArgumentParser):
def error(self, message): def error(self, message):
self.print_usage(sys.stderr) self.print_usage(sys.stderr)
raise RuntimeError(message) raise RuntimeError(message)
class CLI(cmd.Cmd): class CLI(cmd.Cmd):
prompt = "supysonic> " prompt = "supysonic> "
def _make_do(self, command): def _make_do(self, command):
def method(obj, line): def method(obj, line):
try: try:
args = getattr(obj, command + '_parser').parse_args(line.split()) args = getattr(obj, command + '_parser').parse_args(line.split())
except RuntimeError, e: except RuntimeError, e:
print >>sys.stderr, e print >>sys.stderr, e
return return
if hasattr(obj.__class__, command + '_subparsers'): if hasattr(obj.__class__, command + '_subparsers'):
try: try:
func = getattr(obj, '{}_{}'.format(command, args.action)) func = getattr(obj, '{}_{}'.format(command, args.action))
except AttributeError: except AttributeError:
return obj.default(line) return obj.default(line)
return func(** { key: vars(args)[key] for key in vars(args) if key != 'action' }) return func(** { key: vars(args)[key] for key in vars(args) if key != 'action' })
else: else:
try: try:
func = getattr(obj, command) func = getattr(obj, command)
except AttributeError: except AttributeError:
return obj.default(line) return obj.default(line)
return func(**vars(args)) return func(**vars(args))
return method return method
def __init__(self, store): def __init__(self, config):
cmd.Cmd.__init__(self) cmd.Cmd.__init__(self)
self.__config = config
# Generate do_* and help_* methods # Generate do_* and help_* methods
for parser_name in filter(lambda attr: attr.endswith('_parser') and '_' not in attr[:-7], dir(self.__class__)): for parser_name in filter(lambda attr: attr.endswith('_parser') and '_' not in attr[:-7], dir(self.__class__)):
command = parser_name[:-7] command = parser_name[:-7]
if not hasattr(self.__class__, 'do_' + command): if not hasattr(self.__class__, 'do_' + command):
setattr(self.__class__, 'do_' + command, self._make_do(command)) setattr(self.__class__, 'do_' + command, self._make_do(command))
if hasattr(self.__class__, 'do_' + command) and not hasattr(self.__class__, 'help_' + command): if hasattr(self.__class__, 'do_' + command) and not hasattr(self.__class__, 'help_' + command):
setattr(self.__class__, 'help_' + command, getattr(self.__class__, parser_name).print_help) setattr(self.__class__, 'help_' + command, getattr(self.__class__, parser_name).print_help)
if hasattr(self.__class__, command + '_subparsers'): if hasattr(self.__class__, command + '_subparsers'):
for action, subparser in getattr(self.__class__, command + '_subparsers').choices.iteritems(): for action, subparser in getattr(self.__class__, command + '_subparsers').choices.iteritems():
setattr(self, 'help_{} {}'.format(command, action), subparser.print_help) setattr(self, 'help_{} {}'.format(command, action), subparser.print_help)
self.__store = store self.__store = get_store(config.BASE['database_uri'])
def do_EOF(self, line): def do_EOF(self, line):
return True return True
do_exit = do_EOF do_exit = do_EOF
def default(self, line): def default(self, line):
print 'Unknown command %s' % line.split()[0] print 'Unknown command %s' % line.split()[0]
self.do_help(None) self.do_help(None)
def postloop(self): def postloop(self):
print print
def completedefault(self, text, line, begidx, endidx): def completedefault(self, text, line, begidx, endidx):
command = line.split()[0] command = line.split()[0]
parsers = getattr(self.__class__, command + '_subparsers', None) parsers = getattr(self.__class__, command + '_subparsers', None)
if not parsers: if not parsers:
return [] return []
num_words = len(line[len(command):begidx].split()) num_words = len(line[len(command):begidx].split())
if num_words == 0: if num_words == 0:
return [ a for a in parsers.choices.keys() if a.startswith(text) ] return [ a for a in parsers.choices.keys() if a.startswith(text) ]
return [] return []
folder_parser = CLIParser(prog = 'folder', add_help = False) folder_parser = CLIParser(prog = 'folder', add_help = False)
folder_subparsers = folder_parser.add_subparsers(dest = 'action') folder_subparsers = folder_parser.add_subparsers(dest = 'action')
folder_subparsers.add_parser('list', help = 'Lists folders', add_help = False) folder_subparsers.add_parser('list', help = 'Lists folders', add_help = False)
folder_add_parser = folder_subparsers.add_parser('add', help = 'Adds a folder', add_help = False) folder_add_parser = folder_subparsers.add_parser('add', help = 'Adds a folder', add_help = False)
folder_add_parser.add_argument('name', help = 'Name of the folder to add') folder_add_parser.add_argument('name', help = 'Name of the folder to add')
folder_add_parser.add_argument('path', help = 'Path to the directory pointed by the folder') folder_add_parser.add_argument('path', help = 'Path to the directory pointed by the folder')
folder_del_parser = folder_subparsers.add_parser('delete', help = 'Deletes a folder', add_help = False) folder_del_parser = folder_subparsers.add_parser('delete', help = 'Deletes a folder', add_help = False)
folder_del_parser.add_argument('name', help = 'Name of the folder to delete') folder_del_parser.add_argument('name', help = 'Name of the folder to delete')
folder_scan_parser = folder_subparsers.add_parser('scan', help = 'Run a scan on specified folders', add_help = False) folder_scan_parser = folder_subparsers.add_parser('scan', help = 'Run a scan on specified folders', add_help = False)
folder_scan_parser.add_argument('folders', metavar = 'folder', nargs = '*', help = 'Folder(s) to be scanned. If ommitted, all folders are scanned') folder_scan_parser.add_argument('folders', metavar = 'folder', nargs = '*', help = 'Folder(s) to be scanned. If ommitted, all folders are scanned')
folder_scan_parser.add_argument('-f', '--force', action = 'store_true', help = "Force scan of already know files even if they haven't changed") folder_scan_parser.add_argument('-f', '--force', action = 'store_true', help = "Force scan of already know files even if they haven't changed")
def folder_list(self): def folder_list(self):
print 'Name\t\tPath\n----\t\t----' print 'Name\t\tPath\n----\t\t----'
print '\n'.join('{0: <16}{1}'.format(f.name, f.path) for f in self.__store.find(Folder, Folder.root == True)) print '\n'.join('{0: <16}{1}'.format(f.name, f.path) for f in self.__store.find(Folder, Folder.root == True))
def folder_add(self, name, path): def folder_add(self, name, path):
ret = FolderManager.add(self.__store, name, path) ret = FolderManager.add(self.__store, name, path)
if ret != FolderManager.SUCCESS: if ret != FolderManager.SUCCESS:
print FolderManager.error_str(ret) print FolderManager.error_str(ret)
else: else:
print "Folder '{}' added".format(name) print "Folder '{}' added".format(name)
def folder_delete(self, name): def folder_delete(self, name):
ret = FolderManager.delete_by_name(self.__store, name) ret = FolderManager.delete_by_name(self.__store, name)
if ret != FolderManager.SUCCESS: if ret != FolderManager.SUCCESS:
print FolderManager.error_str(ret) print FolderManager.error_str(ret)
else: else:
print "Deleted folder '{}'".format(name) print "Deleted folder '{}'".format(name)
def folder_scan(self, folders, force): def folder_scan(self, folders, force):
class TimedProgressDisplay: class TimedProgressDisplay:
def __init__(self, name, interval = 5): def __init__(self, name, interval = 5):
self.__name = name self.__name = name
self.__interval = interval self.__interval = interval
self.__last_display = 0 self.__last_display = 0
self.__last_len = 0 self.__last_len = 0
def __call__(self, scanned, total): def __call__(self, scanned, total):
if time.time() - self.__last_display > self.__interval or scanned == total: if time.time() - self.__last_display > self.__interval or scanned == total:
if not self.__last_len: if not self.__last_len:
sys.stdout.write("Scanning '{0}': ".format(self.__name)) sys.stdout.write("Scanning '{0}': ".format(self.__name))
progress = "{0}% ({1}/{2})".format((scanned * 100) / total, scanned, total) progress = "{0}% ({1}/{2})".format((scanned * 100) / total, scanned, total)
sys.stdout.write('\b' * self.__last_len) sys.stdout.write('\b' * self.__last_len)
sys.stdout.write(progress) sys.stdout.write(progress)
sys.stdout.flush() sys.stdout.flush()
self.__last_len = len(progress) self.__last_len = len(progress)
self.__last_display = time.time() self.__last_display = time.time()
scanner = Scanner(self.__store, force) extensions = self.__config.BASE['scanner_extensions']
if folders: if extensions:
folders = map(lambda n: self.__store.find(Folder, Folder.name == n, Folder.root == True).one() or n, folders) extensions = extensions.split(' ')
if any(map(lambda f: isinstance(f, basestring), folders)): scanner = Scanner(self.__store, force = force, extensions = extensions)
print "No such folder(s): " + ' '.join(f for f in folders if isinstance(f, basestring)) if folders:
for folder in filter(lambda f: isinstance(f, Folder), folders): folders = map(lambda n: self.__store.find(Folder, Folder.name == n, Folder.root == True).one() or n, folders)
scanner.scan(folder, TimedProgressDisplay(folder.name)) if any(map(lambda f: isinstance(f, basestring), folders)):
else: print "No such folder(s): " + ' '.join(f for f in folders if isinstance(f, basestring))
for folder in self.__store.find(Folder, Folder.root == True): for folder in filter(lambda f: isinstance(f, Folder), folders):
scanner.scan(folder, TimedProgressDisplay(folder.name)) scanner.scan(folder, TimedProgressDisplay(folder.name))
else:
for folder in self.__store.find(Folder, Folder.root == True):
scanner.scan(folder, TimedProgressDisplay(folder.name))
scanner.finish() scanner.finish()
added, deleted = scanner.stats() added, deleted = scanner.stats()
self.__store.commit() self.__store.commit()
print print
print "Scanning done" print "Scanning done"
print 'Added: %i artists, %i albums, %i tracks' % (added[0], added[1], added[2]) print 'Added: %i artists, %i albums, %i tracks' % (added[0], added[1], added[2])
print 'Deleted: %i artists, %i albums, %i tracks' % (deleted[0], deleted[1], deleted[2]) print 'Deleted: %i artists, %i albums, %i tracks' % (deleted[0], deleted[1], deleted[2])
user_parser = CLIParser(prog = 'user', add_help = False) user_parser = CLIParser(prog = 'user', add_help = False)
user_subparsers = user_parser.add_subparsers(dest = 'action') user_subparsers = user_parser.add_subparsers(dest = 'action')
user_subparsers.add_parser('list', help = 'List users', add_help = False) user_subparsers.add_parser('list', help = 'List users', add_help = False)
user_add_parser = user_subparsers.add_parser('add', help = 'Adds a user', add_help = False) user_add_parser = user_subparsers.add_parser('add', help = 'Adds a user', add_help = False)
user_add_parser.add_argument('name', help = 'Name/login of the user to add') user_add_parser.add_argument('name', help = 'Name/login of the user to add')
user_add_parser.add_argument('-a', '--admin', action = 'store_true', help = 'Give admin rights to the new user') user_add_parser.add_argument('-a', '--admin', action = 'store_true', help = 'Give admin rights to the new user')
user_add_parser.add_argument('-p', '--password', help = "Specifies the user's password") user_add_parser.add_argument('-p', '--password', help = "Specifies the user's password")
user_add_parser.add_argument('-e', '--email', default = '', help = "Sets the user's email address") user_add_parser.add_argument('-e', '--email', default = '', help = "Sets the user's email address")
user_del_parser = user_subparsers.add_parser('delete', help = 'Deletes a user', add_help = False) user_del_parser = user_subparsers.add_parser('delete', help = 'Deletes a user', add_help = False)
user_del_parser.add_argument('name', help = 'Name/login of the user to delete') user_del_parser.add_argument('name', help = 'Name/login of the user to delete')
user_admin_parser = user_subparsers.add_parser('setadmin', help = 'Enable/disable admin rights for a user', add_help = False) user_admin_parser = user_subparsers.add_parser('setadmin', help = 'Enable/disable admin rights for a user', add_help = False)
user_admin_parser.add_argument('name', help = 'Name/login of the user to grant/revoke admin rights') user_admin_parser.add_argument('name', help = 'Name/login of the user to grant/revoke admin rights')
user_admin_parser.add_argument('--off', action = 'store_true', help = 'Revoke admin rights if present, grant them otherwise') user_admin_parser.add_argument('--off', action = 'store_true', help = 'Revoke admin rights if present, grant them otherwise')
user_pass_parser = user_subparsers.add_parser('changepass', help = "Changes a user's password", add_help = False) user_pass_parser = user_subparsers.add_parser('changepass', help = "Changes a user's password", add_help = False)
user_pass_parser.add_argument('name', help = 'Name/login of the user to which change the password') user_pass_parser.add_argument('name', help = 'Name/login of the user to which change the password')
user_pass_parser.add_argument('password', nargs = '?', help = 'New password') user_pass_parser.add_argument('password', nargs = '?', help = 'New password')
def user_list(self): def user_list(self):
print 'Name\t\tAdmin\tEmail\n----\t\t-----\t-----' print 'Name\t\tAdmin\tEmail\n----\t\t-----\t-----'
print '\n'.join('{0: <16}{1}\t{2}'.format(u.name, '*' if u.admin else '', u.mail) for u in self.__store.find(User)) print '\n'.join('{0: <16}{1}\t{2}'.format(u.name, '*' if u.admin else '', u.mail) for u in self.__store.find(User))
def user_add(self, name, admin, password, email): def user_add(self, name, admin, password, email):
if not password: if not password:
password = getpass.getpass() password = getpass.getpass()
confirm = getpass.getpass('Confirm password: ') confirm = getpass.getpass('Confirm password: ')
if password != confirm: if password != confirm:
print >>sys.stderr, "Passwords don't match" print >>sys.stderr, "Passwords don't match"
return return
status = UserManager.add(self.__store, name, password, email, admin) status = UserManager.add(self.__store, name, password, email, admin)
if status != UserManager.SUCCESS: if status != UserManager.SUCCESS:
print >>sys.stderr, UserManager.error_str(status) print >>sys.stderr, UserManager.error_str(status)
def user_delete(self, name): def user_delete(self, name):
user = self.__store.find(User, User.name == name).one() user = self.__store.find(User, User.name == name).one()
if not user: if not user:
print >>sys.stderr, 'No such user' print >>sys.stderr, 'No such user'
else: else:
self.__store.remove(user) self.__store.remove(user)
self.__store.commit() self.__store.commit()
print "User '{}' deleted".format(name) print "User '{}' deleted".format(name)
def user_setadmin(self, name, off): def user_setadmin(self, name, off):
user = self.__store.find(User, User.name == name).one() user = self.__store.find(User, User.name == name).one()
if not user: if not user:
print >>sys.stderr, 'No such user' print >>sys.stderr, 'No such user'
else: else:
user.admin = not off user.admin = not off
self.__store.commit() self.__store.commit()
print "{0} '{1}' admin rights".format('Revoked' if off else 'Granted', name) print "{0} '{1}' admin rights".format('Revoked' if off else 'Granted', name)
def user_changepass(self, name, password): def user_changepass(self, name, password):
if not password: if not password:
password = getpass.getpass() password = getpass.getpass()
confirm = getpass.getpass('Confirm password: ') confirm = getpass.getpass('Confirm password: ')
if password != confirm: if password != confirm:
print >>sys.stderr, "Passwords don't match" print >>sys.stderr, "Passwords don't match"
return return
status = UserManager.change_password2(self.__store, name, password) status = UserManager.change_password2(self.__store, name, password)
if status != UserManager.SUCCESS: if status != UserManager.SUCCESS:
print >>sys.stderr, UserManager.error_str(status) print >>sys.stderr, UserManager.error_str(status)
else: else:
print "Successfully changed '{}' password".format(name) print "Successfully changed '{}' password".format(name)
if __name__ == "__main__": if __name__ == "__main__":
if not config.check(): config = IniConfig.from_common_locations()
sys.exit(1)
cli = CLI(get_store(config.get('base', 'database_uri'))) cli = CLI(config)
if len(sys.argv) > 1: if len(sys.argv) > 1:
cli.onecmd(' '.join(sys.argv[1:])) cli.onecmd(' '.join(sys.argv[1:]))
else: else:
cli.cmdloop() cli.cmdloop()

View File

@ -19,9 +19,11 @@
# You should have received a copy of the GNU Affero General Public License # You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
from supysonic.config import IniConfig
from supysonic.watcher import SupysonicWatcher from supysonic.watcher import SupysonicWatcher
if __name__ == "__main__": if __name__ == "__main__":
watcher = SupysonicWatcher() config = IniConfig.from_common_locations()
watcher = SupysonicWatcher(config)
watcher.run() watcher.run()

View File

@ -18,14 +18,14 @@
# You should have received a copy of the GNU Affero General Public License # You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
from flask import request from flask import request, current_app as app
from xml.etree import ElementTree from xml.etree import ElementTree
from xml.dom import minidom from xml.dom import minidom
import simplejson import simplejson
import uuid import uuid
import binascii import binascii
from supysonic.web import app, store from supysonic.web import store
from supysonic.managers.user import UserManager from supysonic.managers.user import UserManager
@app.before_request @app.before_request

View File

@ -18,14 +18,14 @@
# You should have received a copy of the GNU Affero General Public License # You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
from flask import request from flask import request, current_app as app
from storm.expr import Desc, Avg, Min, Max from storm.expr import Desc, Avg, Min, Max
from storm.info import ClassAlias from storm.info import ClassAlias
from datetime import timedelta from datetime import timedelta
import random import random
import uuid import uuid
from supysonic.web import app, store from supysonic.web import store
from supysonic.db import Folder, Artist, Album, Track, RatingFolder, StarredFolder, StarredArtist, StarredAlbum, StarredTrack, User from supysonic.db import Folder, Artist, Album, Track, RatingFolder, StarredFolder, StarredArtist, StarredAlbum, StarredTrack, User
from supysonic.db import now from supysonic.db import now

View File

@ -20,8 +20,9 @@
import time import time
import uuid import uuid
from flask import request from flask import request, current_app as app
from supysonic.web import app, store
from supysonic.web import store
from . import get_entity from . import get_entity
from supysonic.lastfm import LastFm from supysonic.lastfm import LastFm
from supysonic.db import Track, Album, Artist, Folder from supysonic.db import Track, Album, Artist, Folder
@ -187,7 +188,7 @@ def scrobble():
else: else:
t = int(time.time()) t = int(time.time())
lfm = LastFm(request.user, app.logger) lfm = LastFm(app.config['LASTFM'], request.user, app.logger)
if submission in (None, '', True, 'true', 'True', 1, '1'): if submission in (None, '', True, 'true', 'True', 1, '1'):
lfm.scrobble(res, t) lfm.scrobble(res, t)

View File

@ -18,8 +18,8 @@
# You should have received a copy of the GNU Affero General Public License # You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
from flask import request from flask import request, current_app as app
from supysonic.web import app, store from supysonic.web import store
from supysonic.db import Folder, Artist, Album, Track from supysonic.db import Folder, Artist, Album, Track
from . import get_entity from . import get_entity
import uuid, string import uuid, string

View File

@ -18,8 +18,8 @@
# You should have received a copy of the GNU Affero General Public License # You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
from flask import request from flask import request, current_app as app
from supysonic.web import app, store from supysonic.web import store
from supysonic.db import ChatMessage from supysonic.db import ChatMessage
@app.route('/rest/getChatMessages.view', methods = [ 'GET', 'POST' ]) @app.route('/rest/getChatMessages.view', methods = [ 'GET', 'POST' ])

View File

@ -18,16 +18,18 @@
# You should have received a copy of the GNU Affero General Public License # You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
from flask import request, send_file, Response
import requests
import os.path
from PIL import Image
import subprocess
import codecs import codecs
import mimetypes
import os.path
import requests
import subprocess
from flask import request, send_file, Response, current_app as app
from PIL import Image
from xml.etree import ElementTree from xml.etree import ElementTree
from supysonic import config, scanner from supysonic import scanner
from supysonic.web import app, store from supysonic.web import store
from supysonic.db import Track, Album, Artist, Folder, User, ClientPrefs, now from supysonic.db import Track, Album, Artist, Folder, User, ClientPrefs, now
from . import get_entity from . import get_entity
@ -70,14 +72,15 @@ def stream_media():
if format and format != 'raw' and format != src_suffix: if format and format != 'raw' and format != src_suffix:
dst_suffix = format dst_suffix = format
dst_mimetype = config.get_mime(dst_suffix) dst_mimetype = mimetypes.guess_type('dummyname.' + dst_suffix, False)[0] or 'application/octet-stream'
if format != 'raw' and (dst_suffix != src_suffix or dst_bitrate != res.bitrate): if format != 'raw' and (dst_suffix != src_suffix or dst_bitrate != res.bitrate):
transcoder = config.get('transcoding', 'transcoder_{}_{}'.format(src_suffix, dst_suffix)) config = app.config['TRANSCODING']
decoder = config.get('transcoding', 'decoder_' + src_suffix) or config.get('transcoding', 'decoder') transcoder = config.get('transcoder_{}_{}'.format(src_suffix, dst_suffix))
encoder = config.get('transcoding', 'encoder_' + dst_suffix) or config.get('transcoding', 'encoder') decoder = config.get('decoder_' + src_suffix) or config.get('decoder')
encoder = config.get('encoder_' + dst_suffix) or config.get('encoder')
if not transcoder and (not decoder or not encoder): if not transcoder and (not decoder or not encoder):
transcoder = config.get('transcoding', 'transcoder') transcoder = config.get('transcoder')
if not transcoder: if not transcoder:
message = 'No way to transcode from {} to {}'.format(src_suffix, dst_suffix) message = 'No way to transcode from {} to {}'.format(src_suffix, dst_suffix)
app.logger.info(message) app.logger.info(message)
@ -153,7 +156,7 @@ def cover_art():
if size > im.size[0] and size > im.size[1]: if size > im.size[0] and size > im.size[1]:
return send_file(os.path.join(res.path, 'cover.jpg')) return send_file(os.path.join(res.path, 'cover.jpg'))
size_path = os.path.join(config.get('webapp', 'cache_dir'), str(size)) size_path = os.path.join(app.config['WEBAPP']['cache_dir'], str(size))
path = os.path.abspath(os.path.join(size_path, str(res.id))) path = os.path.abspath(os.path.join(size_path, str(res.id)))
if os.path.exists(path): if os.path.exists(path):
return send_file(path) return send_file(path)

View File

@ -18,10 +18,10 @@
# You should have received a copy of the GNU Affero General Public License # You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
from flask import request from flask import request, current_app as app
from storm.expr import Or from storm.expr import Or
import uuid import uuid
from supysonic.web import app, store from supysonic.web import store
from supysonic.db import Playlist, User, Track from supysonic.db import Playlist, User, Track
from . import get_entity from . import get_entity

View File

@ -19,9 +19,9 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
from datetime import datetime from datetime import datetime
from flask import request from flask import request, current_app as app
from storm.info import ClassAlias from storm.info import ClassAlias
from supysonic.web import app, store from supysonic.web import store
from supysonic.db import Folder, Track, Artist, Album from supysonic.db import Folder, Track, Artist, Album
@app.route('/rest/search.view', methods = [ 'GET', 'POST' ]) @app.route('/rest/search.view', methods = [ 'GET', 'POST' ])

View File

@ -18,8 +18,7 @@
# You should have received a copy of the GNU Affero General Public License # You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
from flask import request from flask import request, current_app as app
from supysonic.web import app
@app.route('/rest/ping.view', methods = [ 'GET', 'POST' ]) @app.route('/rest/ping.view', methods = [ 'GET', 'POST' ])
def ping(): def ping():

View File

@ -18,8 +18,8 @@
# You should have received a copy of the GNU Affero General Public License # You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
from flask import request from flask import request, current_app as app
from supysonic.web import app, store from supysonic.web import store
from supysonic.db import User from supysonic.db import User
from supysonic.managers.user import UserManager from supysonic.managers.user import UserManager
from . import decode_password from . import decode_password

View File

@ -9,63 +9,60 @@
# #
# Distributed under terms of the GNU AGPLv3 license. # Distributed under terms of the GNU AGPLv3 license.
from ConfigParser import ConfigParser, NoOptionError, NoSectionError from ConfigParser import SafeConfigParser
import mimetypes
import os import os
import tempfile import tempfile
# Seek for standard locations class DefaultConfig(object):
config_file = [ DEBUG = False
'supysonic.conf', SECRET_KEY = os.urandom(128)
os.path.expanduser('~/.config/supysonic/supysonic.conf'),
tempdir = os.path.join(tempfile.gettempdir(), 'supysonic')
BASE = {
'database_uri': 'sqlite://' + os.path.join(tempdir, 'supysonic.db'),
'scanner_extensions': None
}
WEBAPP = {
'cache_dir': tempdir,
'log_file': None,
'log_level': 'WARNING',
'mount_webui': True,
'mount_api': True
}
DAEMON = {
'log_file': None,
'log_level': 'WARNING'
}
LASTFM = {
'api_key': None,
'secret': None
}
TRANSCODING = {}
MIMETYPES = {}
class IniConfig(DefaultConfig):
common_paths = [
'/etc/supysonic',
os.path.expanduser('~/.supysonic'), os.path.expanduser('~/.supysonic'),
'/etc/supysonic' os.path.expanduser('~/.config/supysonic/supysonic.conf'),
] 'supysonic.conf'
]
config = ConfigParser({ 'cache_dir': os.path.join(tempfile.gettempdir(), 'supysonic') }) def __init__(self, paths):
parser = SafeConfigParser()
parser.read(paths)
for section in parser.sections():
options = { k: v for k, v in parser.items(section) }
section = section.upper()
if hasattr(self, section):
getattr(self, section).update(options)
else:
setattr(self, section, options)
def check(): @classmethod
""" def from_common_locations(cls):
Checks the config file and mandatory fields return IniConfig(cls.common_paths)
"""
try:
config.read(config_file)
except Exception as e:
err = 'Config file is corrupted.\n{0}'.format(e)
raise SystemExit(err)
try:
config.get('base', 'database_uri')
except (NoSectionError, NoOptionError):
raise SystemExit('No database URI set')
return True
def get(section, option):
"""
Returns a config option value from config file
:param section: section where the option is stored
:param option: option name
:return: a config option value
:rtype: string
"""
try:
return config.get(section, option)
except (NoSectionError, NoOptionError):
return None
def get_mime(extension):
"""
Returns mimetype of an extension based on config file
:param extension: extension string
:return: mimetype
:rtype: string
"""
guessed_mime = mimetypes.guess_type('dummy.' + extension, False)[0]
config_mime = get('mimetypes', extension)
default_mime = 'application/octet-stream'
return guessed_mime or config_mime or default_mime

View File

@ -25,10 +25,9 @@ from storm.store import Store
from storm.variables import Variable from storm.variables import Variable
import uuid, datetime, time import uuid, datetime, time
import mimetypes
import os.path import os.path
from supysonic import config
def now(): def now():
return datetime.datetime.now().replace(microsecond = 0) return datetime.datetime.now().replace(microsecond = 0)
@ -213,7 +212,7 @@ class Track(object):
if prefs and prefs.format and prefs.format != self.suffix(): if prefs and prefs.format and prefs.format != self.suffix():
info['transcodedSuffix'] = prefs.format info['transcodedSuffix'] = prefs.format
info['transcodedContentType'] = config.get_mime(prefs.format) info['transcodedContentType'] = mimetypes.guess_type('dummyname.' + prefs.format, False)[0] or 'application/octet-stream'
return info return info

View File

@ -9,8 +9,8 @@
# #
# Distributed under terms of the GNU AGPLv3 license. # Distributed under terms of the GNU AGPLv3 license.
from flask import session, request, redirect, url_for from flask import session, request, redirect, url_for, current_app as app
from supysonic.web import app, store from supysonic.web import store
from supysonic.db import Artist, Album, Track from supysonic.db import Artist, Album, Track
from supysonic.managers.user import UserManager from supysonic.managers.user import UserManager
from functools import wraps from functools import wraps

View File

@ -18,11 +18,11 @@
# You should have received a copy of the GNU Affero General Public License # You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
from flask import request, flash, render_template, redirect, url_for from flask import request, flash, render_template, redirect, url_for, current_app as app
import os.path import os.path
import uuid import uuid
from supysonic.web import app, store from supysonic.web import store
from supysonic.db import Folder from supysonic.db import Folder
from supysonic.scanner import Scanner from supysonic.scanner import Scanner
from supysonic.managers.user import UserManager from supysonic.managers.user import UserManager
@ -84,7 +84,10 @@ def del_folder(id):
@app.route('/folder/scan/<id>') @app.route('/folder/scan/<id>')
@admin_only @admin_only
def scan_folder(id = None): def scan_folder(id = None):
scanner = Scanner(store) extensions = app.config['BASE']['scanner_extensions']
if extensions:
extensions = extensions.split(' ')
scanner = Scanner(store, extensions = extensions)
if id is None: if id is None:
for folder in store.find(Folder, Folder.root == True): for folder in store.find(Folder, Folder.root == True):
scanner.scan(folder) scanner.scan(folder)

View File

@ -18,9 +18,9 @@
# You should have received a copy of the GNU Affero General Public License # You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
from flask import request, flash, render_template, redirect, url_for from flask import request, flash, render_template, redirect, url_for, current_app as app
import uuid import uuid
from supysonic.web import app, store from supysonic.web import store
from supysonic.db import Playlist from supysonic.db import Playlist
from supysonic.managers.user import UserManager from supysonic.managers.user import UserManager

View File

@ -18,15 +18,12 @@
# You should have received a copy of the GNU Affero General Public License # You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
import uuid, csv from flask import request, session, flash, render_template, redirect, url_for, current_app as app
from flask import request, session, flash, render_template, redirect, url_for, make_response
from functools import wraps from functools import wraps
from supysonic.web import app, store from supysonic.web import store
from supysonic.managers.user import UserManager from supysonic.managers.user import UserManager
from supysonic.db import User, ClientPrefs from supysonic.db import User, ClientPrefs
from supysonic import config
from supysonic.lastfm import LastFm from supysonic.lastfm import LastFm
from . import admin_only from . import admin_only
@ -67,7 +64,7 @@ def user_index():
@me_or_uuid @me_or_uuid
def user_profile(uid, user): def user_profile(uid, user):
prefs = store.find(ClientPrefs, ClientPrefs.user_id == user.id) prefs = store.find(ClientPrefs, ClientPrefs.user_id == user.id)
return render_template('profile.html', user = user, has_lastfm = config.get('lastfm', 'api_key') != None, clients = prefs) return render_template('profile.html', user = user, has_lastfm = app.config['LASTFM']['api_key'] != None, clients = prefs)
@app.route('/user/<uid>', methods = [ 'POST' ]) @app.route('/user/<uid>', methods = [ 'POST' ])
@me_or_uuid @me_or_uuid
@ -251,7 +248,7 @@ def lastfm_reg(uid, user):
flash('Missing LastFM auth token') flash('Missing LastFM auth token')
return redirect(url_for('user_profile', uid = uid)) return redirect(url_for('user_profile', uid = uid))
lfm = LastFm(user, app.logger) lfm = LastFm(app.config['LASTFM'], user, app.logger)
status, error = lfm.link_account(token) status, error = lfm.link_account(token)
store.commit() store.commit()
flash(error if not status else 'Successfully linked LastFM account') flash(error if not status else 'Successfully linked LastFM account')
@ -261,7 +258,7 @@ def lastfm_reg(uid, user):
@app.route('/user/<uid>/lastfm/unlink') @app.route('/user/<uid>/lastfm/unlink')
@me_or_uuid @me_or_uuid
def lastfm_unreg(uid, user): def lastfm_unreg(uid, user):
lfm = LastFm(user, app.logger) lfm = LastFm(app.config['LASTFM'], user, app.logger)
lfm.unlink_account() lfm.unlink_account()
store.commit() store.commit()
flash('Unlinked LastFM account') flash('Unlinked LastFM account')

View File

@ -3,7 +3,7 @@
# This file is part of Supysonic. # This file is part of Supysonic.
# #
# Supysonic is a Python implementation of the Subsonic server API. # Supysonic is a Python implementation of the Subsonic server API.
# Copyright (C) 2013 Alban 'spl0k' Féron # Copyright (C) 2013-2017 Alban 'spl0k' Féron
# #
# This program is free software: you can redistribute it and/or modify # This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by # it under the terms of the GNU Affero General Public License as published by
@ -19,13 +19,12 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
import requests, hashlib import requests, hashlib
from supysonic import config
class LastFm: class LastFm:
def __init__(self, user, logger): def __init__(self, config, user, logger):
self.__api_key = config['api_key']
self.__api_secret = config['secret']
self.__user = user self.__user = user
self.__api_key = config.get('lastfm', 'api_key')
self.__api_secret = config.get('lastfm', 'secret')
self.__enabled = self.__api_key is not None and self.__api_secret is not None self.__enabled = self.__api_key is not None and self.__api_secret is not None
self.__logger = logger self.__logger = logger

View File

@ -19,13 +19,13 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
import os, os.path import os, os.path
import time import mimetypes
import mutagen import mutagen
import time
from storm.expr import ComparableExpr, compile, Like from storm.expr import ComparableExpr, compile, Like
from storm.exceptions import NotSupportedError from storm.exceptions import NotSupportedError
from supysonic import config
from supysonic.db import Folder, Artist, Album, Track, User from supysonic.db import Folder, Artist, Album, Track, User
from supysonic.db import StarredFolder, StarredArtist, StarredAlbum, StarredTrack from supysonic.db import StarredFolder, StarredArtist, StarredAlbum, StarredTrack
from supysonic.db import RatingFolder, RatingTrack from supysonic.db import RatingFolder, RatingTrack
@ -52,7 +52,10 @@ def compile_concat(compile, concat, state):
return statement % (left, right) return statement % (left, right)
class Scanner: class Scanner:
def __init__(self, store, force = False): def __init__(self, store, force = False, extensions = None):
if extensions is not None and not isinstance(extensions, list):
raise TypeError('Invalid extensions type')
self.__store = store self.__store = store
self.__force = force self.__force = force
@ -63,8 +66,7 @@ class Scanner:
self.__deleted_albums = 0 self.__deleted_albums = 0
self.__deleted_tracks = 0 self.__deleted_tracks = 0
extensions = config.get('base', 'scanner_extensions') self.__extensions = extensions
self.__extensions = map(str.lower, extensions.split()) if extensions else None
self.__folders_to_check = set() self.__folders_to_check = set()
self.__artists_to_check = set() self.__artists_to_check = set()
@ -172,7 +174,7 @@ class Scanner:
tr.duration = int(tag.info.length) tr.duration = int(tag.info.length)
tr.bitrate = (tag.info.bitrate if hasattr(tag.info, 'bitrate') else int(os.path.getsize(path) * 8 / tag.info.length)) / 1000 tr.bitrate = (tag.info.bitrate if hasattr(tag.info, 'bitrate') else int(os.path.getsize(path) * 8 / tag.info.length)) / 1000
tr.content_type = config.get_mime(os.path.splitext(path)[1][1:]) tr.content_type = mimetypes.guess_type(path, False)[0] or 'application/octet-stream'
tr.last_modification = os.path.getmtime(path) tr.last_modification = os.path.getmtime(path)
tralbum = self.__find_album(albumartist, album) tralbum = self.__find_album(albumartist, album)

View File

@ -26,7 +26,7 @@ from logging.handlers import TimedRotatingFileHandler
from watchdog.observers import Observer from watchdog.observers import Observer
from watchdog.events import PatternMatchingEventHandler from watchdog.events import PatternMatchingEventHandler
from supysonic import config, db from supysonic import db
from supysonic.scanner import Scanner from supysonic.scanner import Scanner
OP_SCAN = 1 OP_SCAN = 1
@ -35,8 +35,7 @@ OP_MOVE = 4
FLAG_CREATE = 8 FLAG_CREATE = 8
class SupysonicWatcherEventHandler(PatternMatchingEventHandler): class SupysonicWatcherEventHandler(PatternMatchingEventHandler):
def __init__(self, queue, logger): def __init__(self, extensions, queue, logger):
extensions = config.get('base', 'scanner_extensions')
patterns = map(lambda e: "*." + e.lower(), extensions.split()) if extensions else None patterns = map(lambda e: "*." + e.lower(), extensions.split()) if extensions else None
super(SupysonicWatcherEventHandler, self).__init__(patterns = patterns, ignore_directories = True) super(SupysonicWatcherEventHandler, self).__init__(patterns = patterns, ignore_directories = True)
@ -109,10 +108,11 @@ class Event(object):
return self.__src return self.__src
class ScannerProcessingQueue(Thread): class ScannerProcessingQueue(Thread):
def __init__(self, logger): def __init__(self, database_uri, logger):
super(ScannerProcessingQueue, self).__init__() super(ScannerProcessingQueue, self).__init__()
self.__logger = logger self.__logger = logger
self.__database_uri = database_uri
self.__cond = Condition() self.__cond = Condition()
self.__timer = None self.__timer = None
self.__queue = {} self.__queue = {}
@ -135,7 +135,7 @@ class ScannerProcessingQueue(Thread):
continue continue
self.__logger.debug("Instantiating scanner") self.__logger.debug("Instantiating scanner")
store = db.get_store(config.get('base', 'database_uri')) store = db.get_store(self.__database_uri)
scanner = Scanner(store) scanner = Scanner(store)
item = self.__next_item() item = self.__next_item()
@ -202,18 +202,18 @@ class ScannerProcessingQueue(Thread):
return None return None
class SupysonicWatcher(object): class SupysonicWatcher(object):
def run(self): def __init__(self, config):
if not config.check(): self.__config = config
return
def run(self):
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
if config.get('daemon', 'log_file'): if self.__config.DAEMON['log_file']:
log_handler = TimedRotatingFileHandler(config.get('daemon', 'log_file'), when = 'midnight') log_handler = TimedRotatingFileHandler(self.__config.DAEMON['log_file'], when = 'midnight')
else: else:
log_handler = logging.NullHandler() log_handler = logging.NullHandler()
log_handler.setFormatter(logging.Formatter("%(asctime)s [%(levelname)s] %(message)s")) log_handler.setFormatter(logging.Formatter("%(asctime)s [%(levelname)s] %(message)s"))
logger.addHandler(log_handler) logger.addHandler(log_handler)
if config.get('daemon', 'log_level'): if self.__config.DAEMON['log_level']:
mapping = { mapping = {
'DEBUG': logging.DEBUG, 'DEBUG': logging.DEBUG,
'INFO': logging.INFO, 'INFO': logging.INFO,
@ -221,9 +221,9 @@ class SupysonicWatcher(object):
'ERROR': logging.ERROR, 'ERROR': logging.ERROR,
'CRTICAL': logging.CRITICAL 'CRTICAL': logging.CRITICAL
} }
logger.setLevel(mapping.get(config.get('daemon', 'log_level').upper(), logging.NOTSET)) logger.setLevel(mapping.get(self.__config.DAEMON['log_level'].upper(), logging.NOTSET))
store = db.get_store(config.get('base', 'database_uri')) store = db.get_store(self.__config.BASE['database_uri'])
folders = store.find(db.Folder, db.Folder.root == True) folders = store.find(db.Folder, db.Folder.root == True)
if not folders.count(): if not folders.count():
@ -231,8 +231,8 @@ class SupysonicWatcher(object):
store.close() store.close()
return return
queue = ScannerProcessingQueue(logger) queue = ScannerProcessingQueue(self.__config.BASE['database_uri'], logger)
handler = SupysonicWatcherEventHandler(queue, logger) handler = SupysonicWatcherEventHandler(self.__config.BASE['scanner_extensions'], queue, logger)
observer = Observer() observer = Observer()
for folder in folders: for folder in folders:

View File

@ -9,17 +9,19 @@
# #
# Distributed under terms of the GNU AGPLv3 license. # Distributed under terms of the GNU AGPLv3 license.
from flask import Flask, g import mimetypes
from flask import Flask, g, current_app
from os import makedirs, path from os import makedirs, path
from werkzeug.local import LocalProxy from werkzeug.local import LocalProxy
from supysonic import config from supysonic.config import IniConfig
from supysonic.db import get_store from supysonic.db import get_store
# Supysonic database open # Supysonic database open
def get_db(): def get_db():
if not hasattr(g, 'database'): if not hasattr(g, 'database'):
g.database = get_store(config.get('base', 'database_uri')) g.database = get_store(current_app.config['BASE']['database_uri'])
return g.database return g.database
# Supysonic database close # Supysonic database close
@ -29,36 +31,28 @@ def close_db(error):
store = LocalProxy(get_db) store = LocalProxy(get_db)
def create_application(): def create_application(config = None):
global app global app
# Check config for mandatory fields
config.check()
# Test for the cache directory
if not path.exists(config.get('webapp', 'cache_dir')):
makedirs(config.get('webapp', 'cache_dir'))
# Flask! # Flask!
app = Flask(__name__) app = Flask(__name__)
app.config.from_object('supysonic.config.DefaultConfig')
# Set a secret key for sessions if not config:
secret_key = config.get('base', 'secret_key') config = IniConfig.from_common_locations()
# If secret key is not defined in config, set develop key app.config.from_object(config)
if secret_key is None:
app.secret_key = 'd3v3l0p'
else:
app.secret_key = secret_key
# Close database connection on teardown # Close database connection on teardown
app.teardown_appcontext(close_db) app.teardown_appcontext(close_db)
# Set loglevel # Set loglevel
if config.get('webapp', 'log_file'): logfile = app.config['WEBAPP']['log_file']
if logfile:
import logging import logging
from logging.handlers import TimedRotatingFileHandler from logging.handlers import TimedRotatingFileHandler
handler = TimedRotatingFileHandler(config.get('webapp', 'log_file'), when = 'midnight') handler = TimedRotatingFileHandler(logfile, when = 'midnight')
if config.get('webapp', 'log_level'): loglevel = app.config['WEBAPP']['log_level']
if loglevel:
mapping = { mapping = {
'DEBUG': logging.DEBUG, 'DEBUG': logging.DEBUG,
'INFO': logging.INFO, 'INFO': logging.INFO,
@ -66,11 +60,26 @@ def create_application():
'ERROR': logging.ERROR, 'ERROR': logging.ERROR,
'CRTICAL': logging.CRITICAL 'CRTICAL': logging.CRITICAL
} }
handler.setLevel(mapping.get(config.get('webapp', 'log_level').upper(), logging.NOTSET)) handler.setLevel(mapping.get(loglevel.upper(), logging.NOTSET))
app.logger.addHandler(handler) app.logger.addHandler(handler)
# Insert unknown mimetypes
for k, v in app.config['MIMETYPES'].iteritems():
extension = '.' + k.lower()
if extension not in mimetypes.types_map:
mimetypes.add_type(v, extension, False)
# Test for the cache directory
cache_path = app.config['WEBAPP']['cache_dir']
if not path.exists(cache_path):
makedirs(cache_path)
# Import app sections # Import app sections
from supysonic import frontend with app.app_context():
from supysonic import api if app.config['WEBAPP']['mount_webui']:
from supysonic import frontend
if app.config['WEBAPP']['mount_api']:
from supysonic import api
return app return app

View File

@ -21,7 +21,7 @@ NS = 'http://subsonic.org/restapi'
NSMAP = { 'sub': NS } NSMAP = { 'sub': NS }
class ApiTestBase(TestBase): class ApiTestBase(TestBase):
__module_to_test__ = 'supysonic.api' __with_api__ = True
def setUp(self): def setUp(self):
super(ApiTestBase, self).setUp() super(ApiTestBase, self).setUp()

View File

@ -19,7 +19,7 @@ from xml.etree import ElementTree
from ..testbase import TestBase from ..testbase import TestBase
class ApiSetupTestCase(TestBase): class ApiSetupTestCase(TestBase):
__module_to_test__ = 'supysonic.api' __with_api__ = True
def __basic_auth_get(self, username, password): def __basic_auth_get(self, username, password):
hashed = base64.b64encode('{}:{}'.format(username, password)) hashed = base64.b64encode('{}:{}'.format(username, password))

View File

@ -9,24 +9,20 @@
# #
# Distributed under terms of the GNU AGPLv3 license. # Distributed under terms of the GNU AGPLv3 license.
import unittest, sys import unittest
import simplejson import simplejson
from xml.etree import ElementTree from xml.etree import ElementTree
from ..appmock import AppMock from ..testbase import TestBase
class ResponseHelperBaseCase(unittest.TestCase): class ResponseHelperBaseCase(TestBase):
def setUp(self): def setUp(self):
sys.modules[u'supysonic.web'] = AppMock(with_store = False) super(ResponseHelperBaseCase, self).setUp()
from supysonic.api import ResponseHelper from supysonic.api import ResponseHelper
self.helper = ResponseHelper self.helper = ResponseHelper
def tearDown(self):
to_unload = [ m for m in sys.modules if m.startswith('supysonic') ]
for m in to_unload:
del sys.modules[m]
class ResponseHelperJsonTestCase(ResponseHelperBaseCase): class ResponseHelperJsonTestCase(ResponseHelperBaseCase):
def serialize_and_deserialize(self, d, error = False): def serialize_and_deserialize(self, d, error = False):
if not isinstance(d, dict): if not isinstance(d, dict):

View File

@ -1,29 +0,0 @@
# -*- coding: utf-8 -*-
# vim:fenc=utf-8
#
# This file is part of Supysonic.
# Supysonic is a Python implementation of the Subsonic server API.
#
# Copyright (C) 2017 Alban 'spl0k' Féron
#
# Distributed under terms of the GNU AGPLv3 license.
import io
from flask import Flask
from supysonic.db import get_store
class AppMock(object):
def __init__(self, with_store = True):
self.app = Flask(__name__, template_folder = '../supysonic/templates')
self.app.testing = True
self.app.secret_key = 'Testing secret'
if with_store:
self.store = get_store('sqlite:')
with io.open('schema/sqlite.sql', 'r') as sql:
schema = sql.read()
for statement in schema.split(';'):
self.store.execute(statement)
else:
self.store = None

View File

@ -11,6 +11,8 @@
from ..testbase import TestBase from ..testbase import TestBase
class FrontendTestBase(TestBase): class FrontendTestBase(TestBase):
__with_webui__ = True
def _login(self, username, password): def _login(self, username, password):
return self.client.post('/user/login', data = { 'user': username, 'password': password }, follow_redirects = True) return self.client.post('/user/login', data = { 'user': username, 'password': password }, follow_redirects = True)

View File

@ -16,8 +16,6 @@ from supysonic.db import Folder
from .frontendtestbase import FrontendTestBase from .frontendtestbase import FrontendTestBase
class FolderTestCase(FrontendTestBase): class FolderTestCase(FrontendTestBase):
__module_to_test__ = 'supysonic.frontend'
def test_index(self): def test_index(self):
self._login('bob', 'B0b') self._login('bob', 'B0b')
rv = self.client.get('/folder', follow_redirects = True) rv = self.client.get('/folder', follow_redirects = True)

View File

@ -17,8 +17,6 @@ from supysonic.db import User
from .frontendtestbase import FrontendTestBase from .frontendtestbase import FrontendTestBase
class LoginTestCase(FrontendTestBase): class LoginTestCase(FrontendTestBase):
__module_to_test__ = 'supysonic.frontend'
def test_unauthorized_request(self): def test_unauthorized_request(self):
# Unauthorized request # Unauthorized request
rv = self.client.get('/', follow_redirects=True) rv = self.client.get('/', follow_redirects=True)

View File

@ -16,8 +16,6 @@ from supysonic.db import Folder, Artist, Album, Track, Playlist, User
from .frontendtestbase import FrontendTestBase from .frontendtestbase import FrontendTestBase
class PlaylistTestCase(FrontendTestBase): class PlaylistTestCase(FrontendTestBase):
__module_to_test__ = 'supysonic.frontend'
def setUp(self): def setUp(self):
super(PlaylistTestCase, self).setUp() super(PlaylistTestCase, self).setUp()

View File

@ -16,8 +16,6 @@ from supysonic.db import User, ClientPrefs
from .frontendtestbase import FrontendTestBase from .frontendtestbase import FrontendTestBase
class UserTestCase(FrontendTestBase): class UserTestCase(FrontendTestBase):
__module_to_test__ = 'supysonic.frontend'
def setUp(self): def setUp(self):
super(UserTestCase, self).setUp() super(UserTestCase, self).setUp()

View File

@ -8,29 +8,57 @@
# #
# Distributed under terms of the GNU AGPLv3 license. # Distributed under terms of the GNU AGPLv3 license.
import importlib import io
import unittest
import sys import sys
import unittest
from supysonic.config import DefaultConfig
from supysonic.managers.user import UserManager from supysonic.managers.user import UserManager
from supysonic.web import create_application, store
from .appmock import AppMock class TestConfig(DefaultConfig):
TESTING = True
LOGGER_HANDLER_POLICY = 'never'
BASE = {
'database_uri': 'sqlite:',
'scanner_extensions': None
}
MIMETYPES = {
'mp3': 'audio/mpeg',
'weirdextension': 'application/octet-stream'
}
def __init__(self, with_webui, with_api):
super(TestConfig, self).__init__
self.WEBAPP.update({
'mount_webui': with_webui,
'mount_api': with_api
})
class TestBase(unittest.TestCase): class TestBase(unittest.TestCase):
def setUp(self): __with_webui__ = False
app_mock = AppMock() __with_api__ = False
self.app = app_mock.app
self.store = app_mock.store
self.client = self.app.test_client()
sys.modules['supysonic.web'] = app_mock def setUp(self):
importlib.import_module(self.__module_to_test__) app = create_application(TestConfig(self.__with_webui__, self.__with_api__))
self.__ctx = app.app_context()
self.__ctx.push()
self.store = store
with io.open('schema/sqlite.sql', 'r') as sql:
schema = sql.read()
for statement in schema.split(';'):
self.store.execute(statement)
self.client = app.test_client()
UserManager.add(self.store, 'alice', 'Alic3', 'test@example.com', True) UserManager.add(self.store, 'alice', 'Alic3', 'test@example.com', True)
UserManager.add(self.store, 'bob', 'B0b', 'bob@example.com', False) UserManager.add(self.store, 'bob', 'B0b', 'bob@example.com', False)
def tearDown(self): def tearDown(self):
self.store.close() self.__ctx.pop()
to_unload = [ m for m in sys.modules if m.startswith('supysonic') ] to_unload = [ m for m in sys.modules if m.startswith('supysonic') ]
for m in to_unload: for m in to_unload:
del sys.modules[m] del sys.modules[m]