1
0
mirror of https://github.com/spl0k/supysonic.git synced 2024-09-19 10:51:04 +00:00

More formatter refactoring

This commit is contained in:
spl0k 2018-02-11 12:40:10 +01:00
parent 27b9c232c2
commit 005ae4803b
12 changed files with 305 additions and 302 deletions

View File

@ -30,8 +30,7 @@ from pony.orm import db_session, ObjectNotFound
from ..managers.user import UserManager
from ..py23 import dict
from .formatters import make_json_response, make_jsonp_response, make_xml_response
from .formatters import make_error_response_func
from .formatters import JSONFormatter, JSONPFormatter, XMLFormatter
api = Blueprint('api', __name__)
@ -40,13 +39,11 @@ def set_formatter():
"""Return a function to create the response."""
f, callback = map(request.values.get, ['f', 'callback'])
if f == 'jsonp':
request.formatter = lambda x, **kwargs: make_jsonp_response(x, callback, kwargs)
request.formatter = JSONPFormatter(callback)
elif f == 'json':
request.formatter = make_json_response
request.formatter = JSONFormatter()
else:
request.formatter = make_xml_response
request.error_formatter = make_error_response_func(request.formatter)
request.formatter = XMLFormatter()
def decode_password(password):
if not password.startswith('enc:'):
@ -59,7 +56,7 @@ def decode_password(password):
@api.before_request
def authorize():
error = request.error_formatter(40, 'Unauthorized'), 401
error = request.formatter.error(40, 'Unauthorized'), 401
if request.authorization:
status, user = UserManager.try_auth(request.authorization.username, request.authorization.password)
@ -83,7 +80,7 @@ def authorize():
@api.before_request
def get_client_prefs():
if 'c' not in request.values:
return request.error_formatter(10, 'Missing required parameter')
return request.formatter.error(10, 'Missing required parameter')
client = request.values.get('c')
with db_session:
@ -97,21 +94,21 @@ def get_client_prefs():
#@api.errorhandler(404)
@api.route('/<path:invalid>', methods = [ 'GET', 'POST' ]) # blueprint 404 workaround
def not_found(*args, **kwargs):
return request.error_formatter(0, 'Not implemented'), 501
return request.formatter.error(0, 'Not implemented'), 501
def get_entity(cls, param = 'id'):
eid = request.values.get(param)
if not eid:
return False, request.error_formatter(10, 'Missing %s id' % cls.__name__)
return False, request.formatter.error(10, 'Missing %s id' % cls.__name__)
try:
eid = uuid.UUID(eid)
entity = cls[eid]
return True, entity
except ValueError:
return False, request.error_formatter(0, 'Invalid %s id' % cls.__name__)
return False, request.formatter.error(0, 'Invalid %s id' % cls.__name__)
except ObjectNotFound:
return False, (request.error_formatter(70, '%s not found' % cls.__name__), 404)
return False, (request.formatter.error(70, '%s not found' % cls.__name__), 404)
from .system import *
from .browse import *

View File

@ -41,7 +41,7 @@ def rand_songs():
toYear = int(toYear) if toYear else None
fid = uuid.UUID(musicFolderId) if musicFolderId else None
except ValueError:
return request.error_formatter(0, 'Invalid parameter format')
return request.formatter.error(0, 'Invalid parameter format')
query = Track.select()
if fromYear:
@ -53,35 +53,31 @@ def rand_songs():
if fid:
with db_session:
if not Folder.exists(id = fid, root = True):
return request.error_formatter(70, 'Unknown folder')
return request.formatter.error(70, 'Unknown folder')
query = query.filter(lambda t: t.root_folder.id == fid)
with db_session:
return request.formatter(dict(
randomSongs = dict(
song = [ t.as_subsonic_child(request.user, request.client) for t in query.random(size) ]
)
return request.formatter('randomSongs', dict(
song = [ t.as_subsonic_child(request.user, request.client) for t in query.random(size) ]
))
@api.route('/getAlbumList.view', methods = [ 'GET', 'POST' ])
def album_list():
ltype, size, offset = map(request.values.get, [ 'type', 'size', 'offset' ])
if not ltype:
return request.error_formatter(10, 'Missing type')
return request.formatter.error(10, 'Missing type')
try:
size = int(size) if size else 10
offset = int(offset) if offset else 0
except ValueError:
return request.error_formatter(0, 'Invalid parameter format')
return request.formatter.error(0, 'Invalid parameter format')
query = select(t.folder for t in Track)
if ltype == 'random':
with db_session:
return request.formatter(dict(
albumList = dict(
album = [ a.as_subsonic_child(request.user) for a in query.random(size) ]
)
return request.formatter('albumList', dict(
album = [ a.as_subsonic_child(request.user) for a in query.random(size) ]
))
elif ltype == 'newest':
query = query.order_by(desc(Folder.created))
@ -98,33 +94,29 @@ def album_list():
elif ltype == 'alphabeticalByArtist':
query = query.order_by(lambda f: f.parent.name + f.name)
else:
return request.error_formatter(0, 'Unknown search type')
return request.formatter.error(0, 'Unknown search type')
with db_session:
return request.formatter(dict(
albumList = dict(
album = [ f.as_subsonic_child(request.user) for f in query.limit(size, offset) ]
)
return request.formatter('albumList', dict(
album = [ f.as_subsonic_child(request.user) for f in query.limit(size, offset) ]
))
@api.route('/getAlbumList2.view', methods = [ 'GET', 'POST' ])
def album_list_id3():
ltype, size, offset = map(request.values.get, [ 'type', 'size', 'offset' ])
if not ltype:
return request.error_formatter(10, 'Missing type')
return request.formatter.error(10, 'Missing type')
try:
size = int(size) if size else 10
offset = int(offset) if offset else 0
except ValueError:
return request.error_formatter(0, 'Invalid parameter format')
return request.formatter.error(0, 'Invalid parameter format')
query = Album.select()
if ltype == 'random':
with db_session:
return request.formatter(dict(
albumList2 = dict(
album = [ a.as_subsonic_album(request.user) for a in query.random(size) ]
)
return request.formatter('albumList2', dict(
album = [ a.as_subsonic_album(request.user) for a in query.random(size) ]
))
elif ltype == 'newest':
query = query.order_by(lambda a: desc(min(a.tracks.created)))
@ -139,13 +131,11 @@ def album_list_id3():
elif ltype == 'alphabeticalByArtist':
query = query.order_by(lambda a: a.artist.name + a.name)
else:
return request.error_formatter(0, 'Unknown search type')
return request.formatter.error(0, 'Unknown search type')
with db_session:
return request.formatter(dict(
albumList2 = dict(
album = [ f.as_subsonic_album(request.user) for f in query.limit(size, offset) ]
)
return request.formatter('albumList2', dict(
album = [ f.as_subsonic_album(request.user) for f in query.limit(size, offset) ]
))
@api.route('/getNowPlaying.view', methods = [ 'GET', 'POST' ])
@ -153,13 +143,11 @@ def album_list_id3():
def now_playing():
query = User.select(lambda u: u.last_play is not None and u.last_play_date + timedelta(minutes = 3) > now())
return request.formatter(dict(
nowPlaying = dict(
entry = [ dict(
u.last_play.as_subsonic_child(request.user, request.client),
username = u.name, minutesAgo = (now() - u.last_play_date).seconds / 60, playerId = 0
) for u in query ]
)
return request.formatter('nowPlaying', dict(
entry = [ dict(
u.last_play.as_subsonic_child(request.user, request.client),
username = u.name, minutesAgo = (now() - u.last_play_date).seconds / 60, playerId = 0
) for u in query ]
))
@api.route('/getStarred.view', methods = [ 'GET', 'POST' ])
@ -167,22 +155,18 @@ def now_playing():
def get_starred():
folders = select(s.starred for s in StarredFolder if s.user.id == request.user.id)
return request.formatter(dict(
starred = dict(
artist = [ dict(id = str(sf.id), name = sf.name) for sf in folders.filter(lambda f: count(f.tracks) == 0) ],
album = [ sf.as_subsonic_child(request.user) for sf in folders.filter(lambda f: count(f.tracks) > 0) ],
song = [ st.as_subsonic_child(request.user, request.client) for st in select(s.starred for s in StarredTrack if s.user.id == request.user.id) ]
)
return request.formatter('starred', dict(
artist = [ dict(id = str(sf.id), name = sf.name) for sf in folders.filter(lambda f: count(f.tracks) == 0) ],
album = [ sf.as_subsonic_child(request.user) for sf in folders.filter(lambda f: count(f.tracks) > 0) ],
song = [ st.as_subsonic_child(request.user, request.client) for st in select(s.starred for s in StarredTrack if s.user.id == request.user.id) ]
))
@api.route('/getStarred2.view', methods = [ 'GET', 'POST' ])
@db_session
def get_starred_id3():
return request.formatter(dict(
starred2 = dict(
artist = [ sa.as_subsonic_artist(request.user) for sa in select(s.starred for s in StarredArtist if s.user.id == request.user.id) ],
album = [ sa.as_subsonic_album(request.user) for sa in select(s.starred for s in StarredAlbum if s.user.id == request.user.id) ],
song = [ st.as_subsonic_child(request.user, request.client) for st in select(s.starred for s in StarredTrack if s.user.id == request.user.id) ]
)
return request.formatter('starred2', dict(
artist = [ sa.as_subsonic_artist(request.user) for sa in select(s.starred for s in StarredArtist if s.user.id == request.user.id) ],
album = [ sa.as_subsonic_album(request.user) for sa in select(s.starred for s in StarredAlbum if s.user.id == request.user.id) ],
song = [ st.as_subsonic_child(request.user, request.client) for st in select(s.starred for s in StarredTrack if s.user.id == request.user.id) ]
))

View File

@ -93,7 +93,7 @@ def star():
id, albumId, artistId = map(request.values.getlist, [ 'id', 'albumId', 'artistId' ])
if not id and not albumId and not artistId:
return request.error_formatter(10, 'Missing parameter')
return request.formatter.error(10, 'Missing parameter')
errors = []
for eid in id:
@ -109,14 +109,16 @@ def star():
errors.append(try_star(Artist, StarredArtist, arId))
error = merge_errors(errors)
return request.formatter(dict(error = error), error = True) if error else request.formatter(dict())
if error:
return request.formatter('error', error)
return request.formatter.empty
@api.route('/unstar.view', methods = [ 'GET', 'POST' ])
def unstar():
id, albumId, artistId = map(request.values.getlist, [ 'id', 'albumId', 'artistId' ])
if not id and not albumId and not artistId:
return request.error_formatter(10, 'Missing parameter')
return request.formatter.error(10, 'Missing parameter')
errors = []
for eid in id:
@ -132,22 +134,24 @@ def unstar():
errors.append(try_unstar(StarredArtist, arId))
error = merge_errors(errors)
return request.formatter(dict(error = error), error = True) if error else request.formatter(dict())
if error:
return request.formatter('error', error)
return request.formatter.empty
@api.route('/setRating.view', methods = [ 'GET', 'POST' ])
def rate():
id, rating = map(request.values.get, [ 'id', 'rating' ])
if not id or not rating:
return request.error_formatter(10, 'Missing parameter')
return request.formatter.error(10, 'Missing parameter')
try:
uid = uuid.UUID(id)
rating = int(rating)
except ValueError:
return request.error_formatter(0, 'Invalid parameter')
return request.formatter.error(0, 'Invalid parameter')
if not 0 <= rating <= 5:
return request.error_formatter(0, 'rating must be between 0 and 5 (inclusive)')
return request.formatter.error(0, 'rating must be between 0 and 5 (inclusive)')
with db_session:
if rating == 0:
@ -162,7 +166,7 @@ def rate():
rated = Folder[uid]
rating_cls = RatingFolder
except ObjectNotFound:
return request.error_formatter(70, 'Unknown id')
return request.formatter.error(70, 'Unknown id')
try:
rating_info = rating_cls[request.user.id, uid]
@ -170,7 +174,7 @@ def rate():
except ObjectNotFound:
rating_cls(user = User[request.user.id], rated = rated, rating = rating)
return request.formatter(dict())
return request.formatter.empty
@api.route('/scrobble.view', methods = [ 'GET', 'POST' ])
@db_session
@ -185,7 +189,7 @@ def scrobble():
try:
t = int(t) / 1000
except ValueError:
return request.error_formatter(0, 'Invalid time value')
return request.formatter.error(0, 'Invalid time value')
else:
t = int(time.time())
@ -196,5 +200,5 @@ def scrobble():
else:
lfm.now_playing(res)
return request.formatter(dict())
return request.formatter.empty

View File

@ -33,13 +33,11 @@ from . import api, get_entity
@api.route('/getMusicFolders.view', methods = [ 'GET', 'POST' ])
@db_session
def list_folders():
return request.formatter(dict(
musicFolders = dict(
musicFolder = [ dict(
id = str(f.id),
name = f.name
) for f in Folder.select(lambda f: f.root).order_by(Folder.name) ]
)
return request.formatter('musicFolders', dict(
musicFolder = [ dict(
id = str(f.id),
name = f.name
) for f in Folder.select(lambda f: f.root).order_by(Folder.name) ]
))
@api.route('/getIndexes.view', methods = [ 'GET', 'POST' ])
@ -51,7 +49,7 @@ def list_indexes():
try:
ifModifiedSince = int(ifModifiedSince) / 1000
except ValueError:
return request.error_formatter(0, 'Invalid timestamp')
return request.formatter.error(0, 'Invalid timestamp')
if musicFolderId is None:
folders = Folder.select(lambda f: f.root)[:]
@ -60,16 +58,16 @@ def list_indexes():
mfid = uuid.UUID(musicFolderId)
folder = Folder[mfid]
except ValueError:
return request.error_formatter(0, 'Invalid id')
return request.formatter.error(0, 'Invalid id')
except ObjectNotFound:
return request.error_formatter(70, 'Folder not found')
return request.formatter.error(70, 'Folder not found')
if not folder.root:
return request.error_formatter(70, 'Folder not found')
return request.formatter.error(70, 'Folder not found')
folders = [ folder ]
last_modif = max(map(lambda f: f.last_scan, folders))
if ifModifiedSince is not None and last_modif < ifModifiedSince:
return request.formatter(dict(indexes = dict(lastModified = last_modif * 1000)))
return request.formatter('indexes', dict(lastModified = last_modif * 1000))
# The XSD lies, we don't return artists but a directory structure
artists = []
@ -91,18 +89,16 @@ def list_indexes():
indexes[index].append(artist)
return request.formatter(dict(
indexes = dict(
lastModified = last_modif * 1000,
index = [ dict(
name = k,
artist = [ dict(
id = str(a.id),
name = a.name
) for a in sorted(v, key = lambda a: a.name.lower()) ]
) for k, v in sorted(indexes.items()) ],
child = [ c.as_subsonic_child(request.user, request.client) for c in sorted(children, key = lambda t: t.sort_key()) ]
)
return request.formatter('indexes', dict(
lastModified = last_modif * 1000,
index = [ dict(
name = k,
artist = [ dict(
id = str(a.id),
name = a.name
) for a in sorted(v, key = lambda a: a.name.lower()) ]
) for k, v in sorted(indexes.items()) ],
child = [ c.as_subsonic_child(request.user, request.client) for c in sorted(children, key = lambda t: t.sort_key()) ]
))
@api.route('/getMusicDirectory.view', methods = [ 'GET', 'POST' ])
@ -120,7 +116,7 @@ def show_directory():
if not res.root:
directory['parent'] = str(res.parent.id)
return request.formatter(dict(directory = directory))
return request.formatter('directory', directory)
@api.route('/getArtists.view', methods = [ 'GET', 'POST' ])
@db_session
@ -139,13 +135,11 @@ def list_artists():
indexes[index].append(artist)
return request.formatter(dict(
artists = dict(
index = [ dict(
name = k,
artist = [ a.as_subsonic_artist(request.user) for a in sorted(v, key = lambda a: a.name.lower()) ]
) for k, v in sorted(indexes.items()) ]
)
return request.formatter('artists', dict(
index = [ dict(
name = k,
artist = [ a.as_subsonic_artist(request.user) for a in sorted(v, key = lambda a: a.name.lower()) ]
) for k, v in sorted(indexes.items()) ]
))
@api.route('/getArtist.view', methods = [ 'GET', 'POST' ])
@ -160,7 +154,7 @@ def artist_info():
albums |= { t.album for t in res.tracks }
info['album'] = [ a.as_subsonic_album(request.user) for a in sorted(albums, key = lambda a: a.sort_key()) ]
return request.formatter(dict(artist = info))
return request.formatter('artist', info)
@api.route('/getAlbum.view', methods = [ 'GET', 'POST' ])
@db_session
@ -172,7 +166,7 @@ def album_info():
info = res.as_subsonic_album(request.user)
info['song'] = [ t.as_subsonic_child(request.user, request.client) for t in sorted(res.tracks, key = lambda t: t.sort_key()) ]
return request.formatter(dict(album = info))
return request.formatter('album', info)
@api.route('/getSong.view', methods = [ 'GET', 'POST' ])
@db_session
@ -181,9 +175,9 @@ def track_info():
if not status:
return res
return request.formatter(dict(song = res.as_subsonic_child(request.user, request.client)))
return request.formatter('song', res.as_subsonic_child(request.user, request.client))
@api.route('/getVideos.view', methods = [ 'GET', 'POST' ])
def list_videos():
return request.error_formatter(0, 'Video streaming not supported')
return request.formatter.error(0, 'Video streaming not supported')

View File

@ -31,23 +31,23 @@ def get_chat():
try:
since = int(since) / 1000 if since else None
except ValueError:
return request.error_formatter(0, 'Invalid parameter')
return request.formatter.error(0, 'Invalid parameter')
with db_session:
query = ChatMessage.select().order_by(ChatMessage.time)
if since:
query = query.filter(lambda m: m.time > since)
return request.formatter(dict(chatMessages = dict(chatMessage = [ msg.responsize() for msg in query ] )))
return request.formatter('chatMessages', dict(chatMessage = [ msg.responsize() for msg in query ] ))
@api.route('/addChatMessage.view', methods = [ 'GET', 'POST' ])
def add_chat_message():
msg = request.values.get('message')
if not msg:
return request.error_formatter(10, 'Missing message')
return request.formatter.error(10, 'Missing message')
with db_session:
ChatMessage(user = User[request.user.id], message = msg)
return request.formatter(dict())
return request.formatter.empty

View File

@ -14,106 +14,132 @@ from xml.etree import ElementTree
from ..py23 import dict, strtype
from . import API_VERSION
def remove_empty_lists(d):
if not isinstance(d, dict):
raise TypeError('Expecting a dict got ' + type(d).__name__)
class BaseFormatter(object):
def make_response(self, elem, data):
raise NotImplementedError()
keys_to_remove = []
for key, value in d.items():
if isinstance(value, dict):
d[key] = remove_empty_lists(value)
elif isinstance(value, list):
if len(value) == 0:
keys_to_remove.append(key)
else:
d[key] = [ remove_empty_lists(item) if isinstance(item, dict) else item for item in value ]
def make_error(self, code, message):
return self.make_response('error', dict(code = code, message = message))
for key in keys_to_remove:
del d[key]
def make_empty(self):
return self.make_response(None, None)
return d
def __call__(self, *args, **kwargs):
return self.make_response(*args, **kwargs)
def subsonicify(response, error):
rv = remove_empty_lists(response)
error = make_error
empty = property(make_empty)
# add headers to response
rv.update(
status = 'failed' if error else 'ok',
version = API_VERSION
)
return { 'subsonic-response': rv }
class JSONBaseFormatter(BaseFormatter):
def __remove_empty_lists(self, d):
if not isinstance(d, dict):
raise TypeError('Expecting a dict got ' + type(d).__name__)
def dict2xml(elem, dictionary):
"""Convert a json structure to xml. The game is trivial. Nesting uses the [] parenthesis.
ex. { 'musicFolder': {'id': 1234, 'name': "sss" } }
ex. { 'musicFolder': [{'id': 1234, 'name': "sss" }, {'id': 456, 'name': "aaa" }]}
ex. { 'musicFolders': {'musicFolder' : [{'id': 1234, 'name': "sss" }, {'id': 456, 'name': "aaa" }] } }
ex. { 'index': [{'name': 'A', 'artist': [{'id': '517674445', 'name': 'Antonello Venditti'}] }] }
ex. {"subsonic-response": { "musicFolders": {"musicFolder": [{ "id": 0,"name": "Music"}]},
"status": "ok","version": "1.7.0","xmlns": "http://subsonic.org/restapi"}}
"""
if not isinstance(dictionary, dict):
raise TypeError('Expecting a dict')
if not all(map(lambda x: isinstance(x, strtype), dictionary)):
raise TypeError('Dictionary keys must be strings')
for name, value in dictionary.items():
if name == '_value_':
elem.text = value_tostring(value)
elif isinstance(value, dict):
subelem = ElementTree.SubElement(elem, name)
dict2xml(subelem, value)
elif isinstance(value, list):
for v in value:
subelem = ElementTree.SubElement(elem, name)
if isinstance(v, dict):
dict2xml(subelem, v)
keys_to_remove = []
for key, value in d.items():
if isinstance(value, dict):
d[key] = self.__remove_empty_lists(value)
elif isinstance(value, list):
if len(value) == 0:
keys_to_remove.append(key)
else:
subelem.text = value_tostring(v)
else:
elem.set(name, value_tostring(value))
d[key] = [ self.__remove_empty_lists(item) if isinstance(item, dict) else item for item in value ]
def value_tostring(value):
if value is None:
return None
if isinstance(value, strtype):
return value
if isinstance(value, bool):
return str(value).lower()
return str(value)
for key in keys_to_remove:
del d[key]
def make_json_response(response, error = False):
rv = jsonify(subsonicify(response, error))
rv.headers.add('Access-Control-Allow-Origin', '*')
return rv
return d
def make_jsonp_response(response, callback, error = False):
if not callback:
return make_json_response(dict(error = dict(code = 10, message = 'Missing callback')), error = True)
def _subsonicify(self, elem, data):
if (elem is None) != (data is None):
raise ValueError('Expecting both elem and data or neither of them')
rv = subsonicify(response, error)
rv = '{}({})'.format(callback, json.dumps(rv))
rv = make_response(rv)
rv.mimetype = 'application/javascript'
return rv
rv = {
'status': 'failed' if elem is 'error' else 'ok',
'version': API_VERSION
}
if data:
rv[elem] = self.__remove_empty_lists(data)
def make_xml_response(response, error = False):
response.update(
status = 'failed' if error else 'ok',
version = API_VERSION,
xmlns = "http://subsonic.org/restapi"
)
return { 'subsonic-response': rv }
elem = ElementTree.Element('subsonic-response')
dict2xml(elem, response)
class JSONFormatter(JSONBaseFormatter):
def make_response(self, elem, data):
rv = jsonify(self._subsonicify(elem, data))
rv.headers.add('Access-Control-Allow-Origin', '*')
return rv
rv = minidom.parseString(ElementTree.tostring(elem)).toprettyxml(indent = ' ')
rv = make_response(rv)
rv.mimetype = 'text/xml'
return rv
class JSONPFormatter(JSONBaseFormatter):
def __init__(self, callback):
self.__callback = callback
def make_error_response_func(f):
def make_error_response(code, message):
return f(dict(error = dict(code = code, message = message)), error = True)
return make_error_response
def make_response(self, elem, data):
if not self.__callback:
return jsonify(self._subsonicify('error', dict(code = 10, message = 'Missing callback')))
rv = self._subsonicify(elem, data)
rv = '{}({})'.format(self.__callback, json.dumps(rv))
rv = make_response(rv)
rv.mimetype = 'application/javascript'
return rv
class XMLFormatter(BaseFormatter):
def __dict2xml(self, elem, dictionary):
"""Convert a dict structure to xml. The game is trivial. Nesting uses the [] parenthesis.
ex. { 'musicFolder': {'id': 1234, 'name': "sss" } }
ex. { 'musicFolder': [{'id': 1234, 'name': "sss" }, {'id': 456, 'name': "aaa" }]}
ex. { 'musicFolders': {'musicFolder' : [{'id': 1234, 'name': "sss" }, {'id': 456, 'name': "aaa" }] } }
ex. { 'index': [{'name': 'A', 'artist': [{'id': '517674445', 'name': 'Antonello Venditti'}] }] }
ex. {"subsonic-response": { "musicFolders": {"musicFolder": [{ "id": 0,"name": "Music"}]},
"status": "ok","version": "1.7.0","xmlns": "http://subsonic.org/restapi"}}
"""
if not isinstance(dictionary, dict):
raise TypeError('Expecting a dict')
if not all(map(lambda x: isinstance(x, strtype), dictionary)):
raise TypeError('Dictionary keys must be strings')
for name, value in dictionary.items():
if name == '_value_':
elem.text = self.__value_tostring(value)
elif isinstance(value, dict):
subelem = ElementTree.SubElement(elem, name)
self.__dict2xml(subelem, value)
elif isinstance(value, list):
for v in value:
subelem = ElementTree.SubElement(elem, name)
if isinstance(v, dict):
self.__dict2xml(subelem, v)
else:
subelem.text = self.__value_tostring(v)
else:
elem.set(name, self.__value_tostring(value))
def __value_tostring(self, value):
if value is None:
return None
if isinstance(value, strtype):
return value
if isinstance(value, bool):
return str(value).lower()
return str(value)
def make_response(self, elem, data):
if (elem is None) != (data is None):
raise ValueError('Expecting both elem and data or neither of them')
response = {
'status': 'failed' if elem is 'error' else 'ok',
'version': API_VERSION,
'xmlns': "http://subsonic.org/restapi"
}
if elem:
response[elem] = data
root = ElementTree.Element('subsonic-response')
self.__dict2xml(root, response)
rv = minidom.parseString(ElementTree.tostring(root)).toprettyxml(indent = ' ')
rv = make_response(rv)
rv.mimetype = 'text/xml'
return rv

View File

@ -72,7 +72,7 @@ def stream_media():
try:
maxBitRate = int(maxBitRate)
except ValueError:
return request.error_formatter(0, 'Invalid bitrate value')
return request.formatter.error(0, 'Invalid bitrate value')
if dst_bitrate > maxBitRate and maxBitRate != 0:
dst_bitrate = maxBitRate
@ -91,7 +91,7 @@ def stream_media():
if not transcoder:
message = 'No way to transcode from {} to {}'.format(src_suffix, dst_suffix)
current_app.logger.info(message)
return request.error_formatter(0, message)
return request.formatter.error(0, message)
transcoder, decoder, encoder = map(lambda x: prepare_transcoding_cmdline(x, res.path, src_suffix, dst_suffix, dst_bitrate), [ transcoder, decoder, encoder ])
try:
@ -102,7 +102,7 @@ def stream_media():
dec_proc = subprocess.Popen(decoder, stdout = subprocess.PIPE)
proc = subprocess.Popen(encoder, stdin = dec_proc.stdout, stdout = subprocess.PIPE)
except OSError:
return request.error_formatter(0, 'Error while running the transcoding process')
return request.formatter.error(0, 'Error while running the transcoding process')
def transcode():
try:
@ -150,14 +150,14 @@ def cover_art():
return res
if not res.has_cover_art or not os.path.isfile(os.path.join(res.path, 'cover.jpg')):
return request.error_formatter(70, 'Cover art not found')
return request.formatter.error(70, 'Cover art not found')
size = request.values.get('size')
if size:
try:
size = int(size)
except ValueError:
return request.error_formatter(0, 'Invalid size value')
return request.formatter.error(0, 'Invalid size value')
else:
return send_file(os.path.join(res.path, 'cover.jpg'))
@ -180,9 +180,9 @@ def cover_art():
def lyrics():
artist, title = map(request.values.get, [ 'artist', 'title' ])
if not artist:
return request.error_formatter(10, 'Missing artist parameter')
return request.formatter.error(10, 'Missing artist parameter')
if not title:
return request.error_formatter(10, 'Missing title parameter')
return request.formatter.error(10, 'Missing title parameter')
with db_session:
query = Track.select(lambda t: title in t.title and artist in t.artist.name)
@ -199,11 +199,11 @@ def lyrics():
current_app.logger.warning('Unsupported encoding for lyrics file ' + lyrics_path)
continue
return request.formatter(dict(lyrics = dict(
return request.formatter('lyrics', dict(
artist = track.album.artist.name,
title = track.title,
_value_ = lyrics
)))
))
try:
r = requests.get("http://api.chartlyrics.com/apiv1.asmx/SearchLyricDirect",
@ -211,15 +211,15 @@ def lyrics():
root = ElementTree.fromstring(r.content)
ns = { 'cl': 'http://api.chartlyrics.com/' }
return request.formatter(dict(lyrics = dict(
return request.formatter('lyrics', dict(
artist = root.find('cl:LyricArtist', namespaces = ns).text,
title = root.find('cl:LyricSong', namespaces = ns).text,
_value_ = root.find('cl:Lyric', namespaces = ns).text
)))
))
except requests.exceptions.RequestException as e:
current_app.logger.warning('Error while requesting the ChartLyrics API: ' + str(e))
return request.formatter(dict(lyrics = dict()))
return request.formatter('lyrics', dict())
def read_file_as_unicode(path):
""" Opens a file trying with different encodings and returns the contents as a unicode string """

View File

@ -36,17 +36,17 @@ def list_playlists():
username = request.values.get('username')
if username:
if not request.user.admin:
return request.error_formatter(50, 'Restricted to admins')
return request.formatter.error(50, 'Restricted to admins')
with db_session:
user = User.get(name = username)
if user is None:
return request.error_formatter(70, 'No such user')
return request.formatter.error(70, 'No such user')
query = Playlist.select(lambda p: p.user.name == username).order_by(Playlist.name)
with db_session:
return request.formatter(dict(playlists = dict(playlist = [ p.as_subsonic_playlist(request.user) for p in query ] )))
return request.formatter('playlists', dict(playlist = [ p.as_subsonic_playlist(request.user) for p in query ] ))
@api.route('/getPlaylist.view', methods = [ 'GET', 'POST' ])
@db_session
@ -56,11 +56,11 @@ def show_playlist():
return res
if res.user.id != request.user.id and not request.user.admin:
return request.error_formatter('50', 'Private playlist')
return request.formatter.error('50', 'Private playlist')
info = res.as_subsonic_playlist(request.user)
info['entry'] = [ t.as_subsonic_child(request.user, request.client) for t in res.get_tracks() ]
return request.formatter(dict(playlist = info))
return request.formatter('playlist', info)
@api.route('/createPlaylist.view', methods = [ 'GET', 'POST' ])
@db_session
@ -71,16 +71,16 @@ def create_playlist():
try:
playlist_id = uuid.UUID(playlist_id) if playlist_id else None
except ValueError:
return request.error_formatter(0, 'Invalid playlist id')
return request.formatter.error(0, 'Invalid playlist id')
if playlist_id:
try:
playlist = Playlist[playlist_id]
except ObjectNotFound:
return request.error_formatter(70, 'Unknwon playlist')
return request.formatter.error(70, 'Unknwon playlist')
if playlist.user.id != request.user.id and not request.user.admin:
return request.error_formatter(50, "You're not allowed to modify a playlist that isn't yours")
return request.formatter.error(50, "You're not allowed to modify a playlist that isn't yours")
playlist.clear()
if name:
@ -88,7 +88,7 @@ def create_playlist():
elif name:
playlist = Playlist(user = User[request.user.id], name = name)
else:
return request.error_formatter(10, 'Missing playlist id or name')
return request.formatter.error(10, 'Missing playlist id or name')
try:
songs = map(uuid.UUID, songs)
@ -97,12 +97,12 @@ def create_playlist():
playlist.add(track)
except ValueError:
rollback()
return request.error_formatter(0, 'Invalid song id')
return request.formatter.error(0, 'Invalid song id')
except ObjectNotFound:
rollback()
return request.error_formatter(70, 'Unknown song')
return request.formatter.error(70, 'Unknown song')
return request.formatter(dict())
return request.formatter.empty
@api.route('/deletePlaylist.view', methods = [ 'GET', 'POST' ])
@db_session
@ -112,10 +112,10 @@ def delete_playlist():
return res
if res.user.id != request.user.id and not request.user.admin:
return request.error_formatter(50, "You're not allowed to delete a playlist that isn't yours")
return request.formatter.error(50, "You're not allowed to delete a playlist that isn't yours")
res.delete()
return request.formatter(dict())
return request.formatter.empty
@api.route('/updatePlaylist.view', methods = [ 'GET', 'POST' ])
@db_session
@ -125,7 +125,7 @@ def update_playlist():
return res
if res.user.id != request.user.id and not request.user.admin:
return request.error_formatter(50, "You're not allowed to delete a playlist that isn't yours")
return request.formatter.error(50, "You're not allowed to delete a playlist that isn't yours")
playlist = res
name, comment, public = map(request.values.get, [ 'name', 'comment', 'public' ])
@ -148,9 +148,9 @@ def update_playlist():
playlist.remove_at_indexes(to_remove)
except ValueError:
return request.error_formatter(0, 'Invalid parameter')
return request.formatter.error(0, 'Invalid parameter')
except ObjectNotFound:
return request.error_formatter(70, 'Unknown song')
return request.formatter.error(70, 'Unknown song')
return request.formatter(dict())
return request.formatter.empty

View File

@ -35,7 +35,7 @@ def old_search():
offset = int(offset) if offset else 0
newer_than = int(newer_than) / 1000 if newer_than else 0
except ValueError:
return request.error_formatter(0, 'Invalid parameter')
return request.formatter.error(0, 'Invalid parameter')
min_date = datetime.fromtimestamp(newer_than)
@ -56,20 +56,20 @@ def old_search():
tend = offset + count - fcount
res += tracks[toff : tend]
return request.formatter(dict(searchResult = dict(
return request.formatter('searchResult', dict(
totalHits = folders.count() + tracks.count(),
offset = offset,
match = [ r.as_subsonic_child(request.user) if isinstance(r, Folder) else r.as_subsonic_child(request.user, request.client) for r in res ]
)))
))
else:
return request.error_formatter(10, 'Missing search parameter')
return request.formatter.error(10, 'Missing search parameter')
with db_session:
return request.formatter(dict(searchResult = dict(
return request.formatter('searchResult', dict(
totalHits = query.count(),
offset = offset,
match = [ r.as_subsonic_child(request.user) if isinstance(r, Folder) else r.as_subsonic_child(request.user, request.client) for r in query[offset : offset + count] ]
)))
))
@api.route('/search2.view', methods = [ 'GET', 'POST' ])
def new_search():
@ -84,21 +84,21 @@ def new_search():
song_count = int(song_count) if song_count else 20
song_offset = int(song_offset) if song_offset else 0
except ValueError:
return request.error_formatter(0, 'Invalid parameter')
return request.formatter.error(0, 'Invalid parameter')
if not query:
return request.error_formatter(10, 'Missing query parameter')
return request.formatter.error(10, 'Missing query parameter')
with db_session:
artists = select(t.folder.parent for t in Track if query in t.folder.parent.name).limit(artist_count, artist_offset)
albums = select(t.folder for t in Track if query in t.folder.name).limit(album_count, album_offset)
songs = Track.select(lambda t: query in t.title).limit(song_count, song_offset)
return request.formatter(dict(searchResult2 = OrderedDict((
return request.formatter('searchResult2', OrderedDict((
('artist', [ dict(id = str(a.id), name = a.name) for a in artists ]),
('album', [ f.as_subsonic_child(request.user) for f in albums ]),
('song', [ t.as_subsonic_child(request.user, request.client) for t in songs ])
))))
)))
@api.route('/search3.view', methods = [ 'GET', 'POST' ])
def search_id3():
@ -113,19 +113,19 @@ def search_id3():
song_count = int(song_count) if song_count else 20
song_offset = int(song_offset) if song_offset else 0
except ValueError:
return request.error_formatter(0, 'Invalid parameter')
return request.formatter.error(0, 'Invalid parameter')
if not query:
return request.error_formatter(10, 'Missing query parameter')
return request.formatter.error(10, 'Missing query parameter')
with db_session:
artists = Artist.select(lambda a: query in a.name).limit(artist_count, artist_offset)
albums = Album.select(lambda a: query in a.name).limit(album_count, album_offset)
songs = Track.select(lambda t: query in t.title).limit(song_count, song_offset)
return request.formatter(dict(searchResult3 = OrderedDict((
return request.formatter('searchResult3', OrderedDict((
('artist', [ a.as_subsonic_artist(request.user) for a in artists ]),
('album', [ a.as_subsonic_album(request.user) for a in albums ]),
('song', [ t.as_subsonic_child(request.user, request.client) for t in songs ])
))))
)))

View File

@ -25,9 +25,9 @@ from . import api
@api.route('/ping.view', methods = [ 'GET', 'POST' ])
def ping():
return request.formatter(dict())
return request.formatter.empty
@api.route('/getLicense.view', methods = [ 'GET', 'POST' ])
def license():
return request.formatter(dict(license = dict(valid = True )))
return request.formatter('license', dict(valid = True))

View File

@ -31,71 +31,71 @@ from . import api, decode_password
def user_info():
username = request.values.get('username')
if username is None:
return request.error_formatter(10, 'Missing username')
return request.formatter.error(10, 'Missing username')
if username != request.username and not request.user.admin:
return request.error_formatter(50, 'Admin restricted')
return request.formatter.error(50, 'Admin restricted')
with db_session:
user = User.get(name = username)
if user is None:
return request.error_formatter(70, 'Unknown user')
return request.formatter.error(70, 'Unknown user')
return request.formatter(dict(user = user.as_subsonic_user()))
return request.formatter('user', user.as_subsonic_user())
@api.route('/getUsers.view', methods = [ 'GET', 'POST' ])
def users_info():
if not request.user.admin:
return request.error_formatter(50, 'Admin restricted')
return request.formatter.error(50, 'Admin restricted')
with db_session:
return request.formatter(dict(users = dict(user = [ u.as_subsonic_user() for u in User.select() ] )))
return request.formatter('users', dict(user = [ u.as_subsonic_user() for u in User.select() ] ))
@api.route('/createUser.view', methods = [ 'GET', 'POST' ])
def user_add():
if not request.user.admin:
return request.error_formatter(50, 'Admin restricted')
return request.formatter.error(50, 'Admin restricted')
username, password, email, admin = map(request.values.get, [ 'username', 'password', 'email', 'adminRole' ])
if not username or not password or not email:
return request.error_formatter(10, 'Missing parameter')
return request.formatter.error(10, 'Missing parameter')
admin = True if admin in (True, 'True', 'true', 1, '1') else False
password = decode_password(password)
status = UserManager.add(username, password, email, admin)
if status == UserManager.NAME_EXISTS:
return request.error_formatter(0, 'There is already a user with that username')
return request.formatter.error(0, 'There is already a user with that username')
return request.formatter(dict())
return request.formatter.empty
@api.route('/deleteUser.view', methods = [ 'GET', 'POST' ])
def user_del():
if not request.user.admin:
return request.error_formatter(50, 'Admin restricted')
return request.formatter.error(50, 'Admin restricted')
username = request.values.get('username')
if not username:
return request.error_formatter(10, 'Missing parameter')
return request.formatter.error(10, 'Missing parameter')
with db_session:
user = User.get(name = username)
if user is None:
return request.error_formatter(70, 'Unknown user')
return request.formatter.error(70, 'Unknown user')
status = UserManager.delete(user.id)
if status != UserManager.SUCCESS:
return request.error_formatter(0, UserManager.error_str(status))
return request.formatter.error(0, UserManager.error_str(status))
return request.formatter(dict())
return request.formatter.empty
@api.route('/changePassword.view', methods = [ 'GET', 'POST' ])
def user_changepass():
username, password = map(request.values.get, [ 'username', 'password' ])
if not username or not password:
return request.error_formatter(10, 'Missing parameter')
return request.formatter.error(10, 'Missing parameter')
if username != request.username and not request.user.admin:
return request.error_formatter(50, 'Admin restricted')
return request.formatter.error(50, 'Admin restricted')
password = decode_password(password)
status = UserManager.change_password2(username, password)
@ -103,7 +103,7 @@ def user_changepass():
code = 0
if status == UserManager.NO_SUCH_USER:
code = 70
return request.error_formatter(code, UserManager.error_str(status))
return request.formatter.error(code, UserManager.error_str(status))
return request.formatter(dict())
return request.formatter.empty

View File

@ -14,40 +14,33 @@ import flask.json
from xml.etree import ElementTree
from supysonic.api.formatters import JSONFormatter, JSONPFormatter, XMLFormatter
from supysonic.py23 import strtype
from ..testbase import TestBase
class ResponseHelperBaseCase(TestBase):
def setUp(self):
super(ResponseHelperBaseCase, self).setUp()
class UnwrapperMixin(object):
def make_response(self, elem, data):
with self.request_context():
rv = super(UnwrapperMixin, self).make_response(elem, data)
return rv.get_data(as_text = True)
from supysonic.api.formatters import make_json_response, make_jsonp_response, make_xml_response
self.json = self.__response_unwrapper(make_json_response)
self.jsonp = self.__response_unwrapper(make_jsonp_response)
self.xml = self.__response_unwrapper(make_xml_response)
@staticmethod
def create_from(cls):
class Unwrapper(UnwrapperMixin, cls):
pass
return Unwrapper
def __response_unwrapper(self, func):
def execute(*args, **kwargs):
with self.request_context():
rv = func(*args, **kwargs)
return rv.get_data(as_text = True)
return execute
class ResponseHelperJsonTestCase(TestBase, UnwrapperMixin.create_from(JSONFormatter)):
def make_response(self, elem, data):
rv = super(ResponseHelperJsonTestCase, self).make_response(elem, data)
return flask.json.loads(rv)
class ResponseHelperJsonTestCase(ResponseHelperBaseCase):
def serialize_and_deserialize(self, d, error = False):
if not isinstance(d, dict):
raise TypeError('Invalid tested value, expecting a dict')
json = self.json(d, error)
return flask.json.loads(json)
def process_and_extract(self, d, error = False):
# Basically returns d with additional version and status
return self.serialize_and_deserialize(d, error)['subsonic-response']
def process_and_extract(self, d):
return self.make_response('tag', d)['subsonic-response']['tag']
def test_basic(self):
empty = self.serialize_and_deserialize({})
empty = self.empty
self.assertEqual(len(empty), 1)
self.assertIn('subsonic-response', empty)
self.assertIsInstance(empty['subsonic-response'], dict)
@ -58,7 +51,7 @@ class ResponseHelperJsonTestCase(ResponseHelperBaseCase):
self.assertIn('version', resp)
self.assertEqual(resp['status'], 'ok')
resp = self.process_and_extract({}, True)
resp = self.error(0, 'message')['subsonic-response']
self.assertEqual(resp['status'], 'failed')
some_dict = {
@ -104,7 +97,7 @@ class ResponseHelperJsonTestCase(ResponseHelperBaseCase):
]
})
self.assertEqual(len(resp), 4) # dict, list, status and version
self.assertEqual(len(resp), 2)
self.assertIn('dict', resp)
self.assertIn('list', resp)
@ -126,50 +119,55 @@ class ResponseHelperJsonTestCase(ResponseHelperBaseCase):
'final string'
])
class ResponseHelperJsonpTestCase(ResponseHelperBaseCase):
class ResponseHelperJsonpTestCase(TestBase, UnwrapperMixin.create_from(JSONPFormatter)):
def test_basic(self):
result = self.jsonp({}, 'callback')
self._JSONPFormatter__callback = 'callback' # hacky
result = self.empty
self.assertTrue(result.startswith('callback({'))
self.assertTrue(result.endswith('})'))
json = flask.json.loads(result[9:-1])
self.assertIn('subsonic-response', json)
class ResponseHelperXMLTestCase(ResponseHelperBaseCase):
def serialize_and_deserialize(self, d, error = False):
xml = self.xml(d, error)
class ResponseHelperXMLTestCase(TestBase, UnwrapperMixin.create_from(XMLFormatter)):
def make_response(self, elem, data):
xml = super(ResponseHelperXMLTestCase, self).make_response(elem, data)
xml = xml.replace('xmlns="http://subsonic.org/restapi"', '')
root = ElementTree.fromstring(xml)
return root
def process_and_extract(self, d):
rv = self.make_response('tag', d)
return rv.find('tag')
def assertAttributesMatchDict(self, elem, d):
d = { k: str(v) for k, v in d.items() }
self.assertDictEqual(elem.attrib, d)
def test_root(self):
xml = self.xml({ 'tag': {}})
xml = super(ResponseHelperXMLTestCase, self).make_response('tag', {})
self.assertIn('<subsonic-response ', xml)
self.assertIn('xmlns="http://subsonic.org/restapi"', xml)
self.assertTrue(xml.strip().endswith('</subsonic-response>'))
def test_basic(self):
empty = self.serialize_and_deserialize({})
empty = self.empty
self.assertIsNotNone(empty.find('.[@version]'))
self.assertIsNotNone(empty.find(".[@status='ok']"))
resp = self.serialize_and_deserialize({}, True)
resp = self.error(0, 'message')
self.assertIsNotNone(resp.find(".[@status='failed']"))
some_dict = {
'intValue': 2,
'someString': 'Hello world!'
}
resp = self.serialize_and_deserialize(some_dict)
resp = self.process_and_extract(some_dict)
self.assertIsNotNone(resp.find('.[@intValue]'))
self.assertIsNotNone(resp.find('.[@someString]'))
def test_lists(self):
resp = self.serialize_and_deserialize({
resp = self.process_and_extract({
'someList': [ 2, 4, 8, 16 ],
'emptyList': []
})
@ -182,7 +180,7 @@ class ResponseHelperXMLTestCase(ResponseHelperBaseCase):
self.assertEqual(int(e.text), i)
def test_dicts(self):
resp = self.serialize_and_deserialize({
resp = self.process_and_extract({
'dict': { 's': 'Blah', 'i': 20 },
'empty': {}
})
@ -193,7 +191,7 @@ class ResponseHelperXMLTestCase(ResponseHelperBaseCase):
self.assertAttributesMatchDict(d, { 's': 'Blah', 'i': 20 })
def test_nesting(self):
resp = self.serialize_and_deserialize({
resp = self.process_and_extract({
'dict': {
'value': 'hey look! a string',
'list': [ 1, 2, 3 ],