1
0
mirror of https://github.com/spl0k/supysonic.git synced 2024-12-22 17:06:17 +00:00

Look at my pony, my pony is amazing

This commit is contained in:
spl0k 2017-12-23 22:59:04 +01:00
parent 53fd4865cb
commit df63919634
14 changed files with 516 additions and 534 deletions

View File

@ -99,11 +99,11 @@ def get_client_prefs():
client = request.values.get('c')
with db_session:
try:
prefs = ClientPrefs[request.user.id, client]
ClientPrefs[request.user.id, client]
except ObjectNotFound:
prefs = ClientPrefs(user = User[request.user.id], client_name = client)
ClientPrefs(user = User[request.user.id], client_name = client)
request.prefs = prefs
request.client = client
@app.after_request
def set_headers(response):
@ -216,19 +216,20 @@ class ResponseHelper:
return str(value).lower()
return str(value)
def get_entity(req, ent, param = 'id'):
def get_entity(req, cls, param = 'id'):
eid = req.values.get(param)
if not eid:
return False, req.error_formatter(10, 'Missing %s id' % ent.__name__)
return False, req.error_formatter(10, 'Missing %s id' % cls.__name__)
try:
eid = uuid.UUID(eid)
except:
return False, req.error_formatter(0, 'Invalid %s id' % ent.__name__)
return False, req.error_formatter(0, 'Invalid %s id' % cls.__name__)
entity = store.get(ent, eid)
if not entity:
return False, (req.error_formatter(70, '%s not found' % ent.__name__), 404)
try:
entity = cls[eid]
except ObjectNotFound:
return False, (req.error_formatter(70, '%s not found' % cls.__name__), 404)
return True, entity

View File

@ -23,6 +23,7 @@ import uuid
from datetime import timedelta
from flask import request, current_app as app
from pony.orm import db_session, select, desc, avg, max, min, count
from ..db import Folder, Artist, Album, Track, RatingFolder, StarredFolder, StarredArtist, StarredAlbum, StarredTrack, User
from ..db import now
@ -40,31 +41,24 @@ def rand_songs():
except:
return request.error_formatter(0, 'Invalid parameter format')
query = store.find(Track)
query = Track.select()
if fromYear:
query = query.find(Track.year >= fromYear)
query = query.filter(lambda t: t.year >= fromYear)
if toYear:
query = query.find(Track.year <= toYear)
query = query.filter(lambda t: t.year <= toYear)
if genre:
query = query.find(Track.genre == genre)
query = query.filter(lambda t: t.genre == genre)
if fid:
if not store.find(Folder, Folder.id == fid, Folder.root == True).one():
with db_session:
if not Folder.exists(id = fid, root = True):
return request.error_formatter(70, 'Unknown folder')
query = query.find(Track.root_folder_id == fid)
count = query.count()
if not count:
return request.formatter({ 'randomSongs': {} })
tracks = []
for _ in xrange(size):
x = random.choice(xrange(count))
tracks.append(query[x])
query = query.filter(lambda t: t.root_folder.id == fid)
with db_session:
return request.formatter({
'randomSongs': {
'song': [ t.as_subsonic_child(request.user, request.prefs) for t in tracks ]
'song': [ t.as_subsonic_child(request.user, request.client) for t in query.random(size) ]
}
})
@ -79,44 +73,35 @@ def album_list():
except:
return request.error_formatter(0, 'Invalid parameter format')
query = store.find(Folder, Track.folder_id == Folder.id)
query = select(t.folder for t in Track)
if ltype == 'random':
albums = []
count = query.count()
if not count:
return request.formatter({ 'albumList': {} })
for _ in xrange(size):
x = random.choice(xrange(count))
albums.append(query[x])
with db_session:
return request.formatter({
'albumList': {
'album': [ a.as_subsonic_child(request.user) for a in albums ]
'album': [ a.as_subsonic_child(request.user) for a in query.random(size) ]
}
})
elif ltype == 'newest':
query = query.order_by(Desc(Folder.created)).config(distinct = True)
query = query.order_by(desc(Folder.created))
elif ltype == 'highest':
query = query.find(RatingFolder.rated_id == Folder.id).group_by(Folder.id).order_by(Desc(Avg(RatingFolder.rating)))
query = query.order_by(lambda f: desc(avg(f.ratings.rating)))
elif ltype == 'frequent':
query = query.group_by(Folder.id).order_by(Desc(Avg(Track.play_count)))
query = query.order_by(lambda f: desc(avg(f.tracks.play_count)))
elif ltype == 'recent':
query = query.group_by(Folder.id).order_by(Desc(Max(Track.last_play)))
query = query.order_by(lambda f: desc(max(f.tracks.last_play)))
elif ltype == 'starred':
query = query.find(StarredFolder.starred_id == Folder.id, User.id == StarredFolder.user_id, User.name == request.username)
query = select(s.starred for s in StarredFolder if s.user.id == request.user.id and count(s.starred.tracks) > 0)
elif ltype == 'alphabeticalByName':
query = query.order_by(Folder.name).config(distinct = True)
query = query.order_by(Folder.name)
elif ltype == 'alphabeticalByArtist':
parent = ClassAlias(Folder)
query = query.find(Folder.parent_id == parent.id).order_by(parent.name, Folder.name).config(distinct = True)
query = query.order_by(lambda f: f.parent.name + f.name)
else:
return request.error_formatter(0, 'Unknown search type')
with db_session:
return request.formatter({
'albumList': {
'album': [ f.as_subsonic_child(request.user) for f in query[offset:offset+size] ]
'album': [ f.as_subsonic_child(request.user) for f in query.limit(size, offset) ]
}
})
@ -131,76 +116,71 @@ def album_list_id3():
except:
return request.error_formatter(0, 'Invalid parameter format')
query = store.find(Album)
query = Album.select()
if ltype == 'random':
albums = []
count = query.count()
if not count:
return request.formatter({ 'albumList2': {} })
for _ in xrange(size):
x = random.choice(xrange(count))
albums.append(query[x])
with db_session:
return request.formatter({
'albumList2': {
'album': [ a.as_subsonic_album(request.user) for a in albums ]
'album': [ a.as_subsonic_album(request.user) for a in query.random(size) ]
}
})
elif ltype == 'newest':
query = query.find(Track.album_id == Album.id).group_by(Album.id).order_by(Desc(Min(Track.created)))
query = query.order_by(lambda a: desc(min(a.tracks.created)))
elif ltype == 'frequent':
query = query.find(Track.album_id == Album.id).group_by(Album.id).order_by(Desc(Avg(Track.play_count)))
query = query.order_by(lambda a: desc(avg(a.tracks.play_count)))
elif ltype == 'recent':
query = query.find(Track.album_id == Album.id).group_by(Album.id).order_by(Desc(Max(Track.last_play)))
query = query.order_by(lambda a: desc(max(a.tracks.last_play)))
elif ltype == 'starred':
query = query.find(StarredAlbum.starred_id == Album.id, User.id == StarredAlbum.user_id, User.name == request.username)
query = select(s.starred for s in StarredAlbum if s.user.id == request.user.id)
elif ltype == 'alphabeticalByName':
query = query.order_by(Album.name)
elif ltype == 'alphabeticalByArtist':
query = query.find(Artist.id == Album.artist_id).order_by(Artist.name, Album.name)
query = query.order_by(lambda a: a.artist.name + a.name)
else:
return request.error_formatter(0, 'Unknown search type')
with db_session:
return request.formatter({
'albumList2': {
'album': [ f.as_subsonic_album(request.user) for f in query[offset:offset+size] ]
'album': [ f.as_subsonic_album(request.user) for f in query.limit(size, offset) ]
}
})
@app.route('/rest/getNowPlaying.view', methods = [ 'GET', 'POST' ])
@db_session
def now_playing():
query = store.find(User, Track.id == User.last_play_id)
query = User.select(lambda u: u.last_play is not None and u.last_play_date + timedelta(minutes = 3) > now())
return request.formatter({
'nowPlaying': {
'entry': [ dict(
u.last_play.as_subsonic_child(request.user, request.prefs).items() +
u.last_play.as_subsonic_child(request.user, request.client).items() +
{ 'username': u.name, 'minutesAgo': (now() - u.last_play_date).seconds / 60, 'playerId': 0 }.items()
) for u in query if u.last_play_date + timedelta(seconds = u.last_play.duration * 2) > now() ]
) for u in query ]
}
})
@app.route('/rest/getStarred.view', methods = [ 'GET', 'POST' ])
@db_session
def get_starred():
folders = store.find(StarredFolder, StarredFolder.user_id == User.id, User.name == request.username)
folders = select(s.starred for s in StarredFolder if s.user.id == request.user.id)
return request.formatter({
'starred': {
'artist': [ { 'id': str(sf.starred_id), 'name': sf.starred.name } for sf in folders.find(Folder.parent_id == StarredFolder.starred_id, Track.folder_id == Folder.id).config(distinct = True) ],
'album': [ sf.starred.as_subsonic_child(request.user) for sf in folders.find(Track.folder_id == StarredFolder.starred_id).config(distinct = True) ],
'song': [ st.starred.as_subsonic_child(request.user, request.prefs) for st in store.find(StarredTrack, StarredTrack.user_id == User.id, User.name == request.username) ]
'artist': [ { 'id': str(sf.id), 'name': sf.name } for sf in folders.filter(lambda f: count(f.tracks) == 0) ],
'album': [ sf.as_subsonic_child(request.user) for sf in folders.filter(lambda f: count(f.tracks) > 0) ],
'song': [ st.as_subsonic_child(request.user, request.client) for st in select(s.starred for s in StarredTrack if s.user.id == request.user.id) ]
}
})
@app.route('/rest/getStarred2.view', methods = [ 'GET', 'POST' ])
@db_session
def get_starred_id3():
return request.formatter({
'starred2': {
'artist': [ sa.starred.as_subsonic_artist(request.user) for sa in store.find(StarredArtist, StarredArtist.user_id == User.id, User.name == request.username) ],
'album': [ sa.starred.as_subsonic_album(request.user) for sa in store.find(StarredAlbum, StarredAlbum.user_id == User.id, User.name == request.username) ],
'song': [ st.starred.as_subsonic_child(request.user, request.prefs) for st in store.find(StarredTrack, StarredTrack.user_id == User.id, User.name == request.username) ]
'artist': [ sa.as_subsonic_artist(request.user) for sa in select(s.starred for s in StarredArtist if s.user.id == request.user.id) ],
'album': [ sa.as_subsonic_album(request.user) for sa in select(s.starred for s in StarredAlbum if s.user.id == request.user.id) ],
'song': [ st.as_subsonic_child(request.user, request.client) for st in select(s.starred for s in StarredTrack if s.user.id == request.user.id) ]
}
})

View File

@ -22,19 +22,22 @@ import time
import uuid
from flask import request, current_app as app
from pony.orm import db_session, delete
from pony.orm import ObjectNotFound
from ..db import Track, Album, Artist, Folder
from ..db import Track, Album, Artist, Folder, User
from ..db import StarredTrack, StarredAlbum, StarredArtist, StarredFolder
from ..db import RatingTrack, RatingFolder
from ..lastfm import LastFm
from . import get_entity
def try_star(ent, starred_ent, eid):
@db_session
def try_star(cls, starred_cls, eid):
""" Stars an entity
:param ent: entity class, Folder, Artist, Album or Track
:param starred_ent: class used for the db representation of the starring of ent
:param cls: entity class, Folder, Artist, Album or Track
:param starred_cls: class used for the db representation of the starring of ent
:param eid: id of the entity to star
:return error dict, if any. None otherwise
"""
@ -42,26 +45,27 @@ def try_star(ent, starred_ent, eid):
try:
uid = uuid.UUID(eid)
except:
return { 'code': 0, 'message': 'Invalid {} id {}'.format(ent.__name__, eid) }
return { 'code': 0, 'message': 'Invalid {} id {}'.format(cls.__name__, eid) }
if store.get(starred_ent, (request.user.id, uid)):
return { 'code': 0, 'message': '{} {} already starred'.format(ent.__name__, eid) }
try:
e = cls[uid]
except ObjectNotFound:
return { 'code': 70, 'message': 'Unknown {} id {}'.format(cls.__name__, eid) }
e = store.get(ent, uid)
if not e:
return { 'code': 70, 'message': 'Unknown {} id {}'.format(ent.__name__, eid) }
starred = starred_ent()
starred.user_id = request.user.id
starred.starred_id = uid
store.add(starred)
try:
starred_cls[request.user.id, uid]
return { 'code': 0, 'message': '{} {} already starred'.format(cls.__name__, eid) }
except ObjectNotFound:
pass
starred_cls(user = User[request.user.id], starred = e)
return None
def try_unstar(starred_ent, eid):
@db_session
def try_unstar(starred_cls, eid):
""" Unstars an entity
:param starred_ent: class used for the db representation of the starring of the entity
:param starred_cls: class used for the db representation of the starring of the entity
:param eid: id of the entity to unstar
:return error dict, if any. None otherwise
"""
@ -71,7 +75,7 @@ def try_unstar(starred_ent, eid):
except:
return { 'code': 0, 'message': 'Invalid id {}'.format(eid) }
store.find(starred_ent, starred_ent.user_id == request.user.id, starred_ent.starred_id == uid).remove()
delete(s for s in starred_cls if s.user.id == request.user.id and s.starred.id == uid)
return None
def merge_errors(errors):
@ -105,7 +109,6 @@ def star():
for arId in artistId:
errors.append(try_star(Artist, StarredArtist, arId))
store.commit()
error = merge_errors(errors)
return request.formatter({ 'error': error }, error = True) if error else request.formatter({})
@ -129,7 +132,6 @@ def unstar():
for arId in artistId:
errors.append(try_unstar(StarredArtist, arId))
store.commit()
error = merge_errors(errors)
return request.formatter({ 'error': error }, error = True) if error else request.formatter({})
@ -148,32 +150,31 @@ def rate():
if not rating in xrange(6):
return request.error_formatter(0, 'rating must be between 0 and 5 (inclusive)')
with db_session:
if rating == 0:
store.find(RatingTrack, RatingTrack.user_id == request.user.id, RatingTrack.rated_id == uid).remove()
store.find(RatingFolder, RatingFolder.user_id == request.user.id, RatingFolder.rated_id == uid).remove()
delete(r for r in RatingTrack if r.user.id == request.user.id and r.rated.id == uid)
delete(r for r in RatingFolder if r.user.id == request.user.id and r.rated.id == uid)
else:
rated = store.get(Track, uid)
rating_ent = RatingTrack
if not rated:
rated = store.get(Folder, uid)
rating_ent = RatingFolder
if not rated:
try:
rated = Track[uid]
rating_cls = RatingTrack
except ObjectNotFound:
try:
rated = Folder[uid]
rating_cls = RatingFolder
except ObjectNotFound:
return request.error_formatter(70, 'Unknown id')
rating_info = store.get(rating_ent, (request.user.id, uid))
if rating_info:
try:
rating_info = rating_cls[request.user.id, uid]
rating_info.rating = rating
else:
rating_info = rating_ent()
rating_info.user_id = request.user.id
rating_info.rated_id = uid
rating_info.rating = rating
store.add(rating_info)
except ObjectNotFound:
rating_cls(user = User[request.user.id], rated = rated, rating = rating)
store.commit()
return request.formatter({})
@app.route('/rest/scrobble.view', methods = [ 'GET', 'POST' ])
@db_session
def scrobble():
status, res = get_entity(request, Track)
if not status:
@ -189,7 +190,7 @@ def scrobble():
else:
t = int(time.time())
lfm = LastFm(app.config['LASTFM'], request.user, app.logger)
lfm = LastFm(app.config['LASTFM'], User[request.user.id], app.logger)
if submission in (None, '', True, 'true', 'True', 1, '1'):
lfm.scrobble(res, t)

View File

@ -22,23 +22,27 @@ import string
import uuid
from flask import request, current_app as app
from pony.orm import db_session
from pony.orm import ObjectNotFound
from ..db import Folder, Artist, Album, Track
from . import get_entity
@app.route('/rest/getMusicFolders.view', methods = [ 'GET', 'POST' ])
@db_session
def list_folders():
return request.formatter({
'musicFolders': {
'musicFolder': [ {
'id': str(f.id),
'name': f.name
} for f in store.find(Folder, Folder.root == True).order_by(Folder.name) ]
} for f in Folder.select(lambda f: f.root).order_by(Folder.name) ]
}
})
@app.route('/rest/getIndexes.view', methods = [ 'GET', 'POST' ])
@db_session
def list_indexes():
musicFolderId = request.values.get('musicFolderId')
ifModifiedSince = request.values.get('ifModifiedSince')
@ -49,33 +53,31 @@ def list_indexes():
return request.error_formatter(0, 'Invalid timestamp')
if musicFolderId is None:
folder = store.find(Folder, Folder.root == True)
folders = Folder.select(lambda f: f.root)[:]
else:
try:
mfid = uuid.UUID(musicFolderId)
except:
return request.error_formatter(0, 'Invalid id')
folder = store.get(Folder, mfid)
if not folder or (type(folder) is Folder and not folder.root):
try:
folder = Folder[mfid]
except ObjectNotFound:
return request.error_formatter(70, 'Folder not found')
if not folder.root:
return request.error_formatter(70, 'Folder not found')
folders = [ folder ]
last_modif = max(map(lambda f: f.last_scan, folder)) if type(folder) is not Folder else folder.last_scan
if (not ifModifiedSince is None) and last_modif < ifModifiedSince:
last_modif = max(map(lambda f: f.last_scan, folders))
if ifModifiedSince is not None and last_modif < ifModifiedSince:
return request.formatter({ 'indexes': { 'lastModified': last_modif * 1000 } })
# The XSD lies, we don't return artists but a directory structure
if type(folder) is not Folder:
artists = []
childs = []
for f in folder:
artists += f.children
childs += f.tracks
else:
artists = folder.children
childs = folder.tracks
children = []
for f in folders:
artists += f.children.select()[:]
children += f.tracks.select()[:]
indexes = {}
for artist in artists:
@ -100,11 +102,12 @@ def list_indexes():
'name': a.name
} for a in sorted(v, key = lambda a: a.name.lower()) ]
} for k, v in sorted(indexes.iteritems()) ],
'child': [ c.as_subsonic_child(request.user, request.prefs) for c in sorted(childs, key = lambda t: t.sort_key()) ]
'child': [ c.as_subsonic_child(request.user, request.client) for c in sorted(children, key = lambda t: t.sort_key()) ]
}
})
@app.route('/rest/getMusicDirectory.view', methods = [ 'GET', 'POST' ])
@db_session
def show_directory():
status, res = get_entity(request, Folder)
if not status:
@ -113,18 +116,19 @@ def show_directory():
directory = {
'id': str(res.id),
'name': res.name,
'child': [ f.as_subsonic_child(request.user) for f in sorted(res.children, key = lambda c: c.name.lower()) ] + [ t.as_subsonic_child(request.user, request.prefs) for t in sorted(res.tracks, key = lambda t: t.sort_key()) ]
'child': [ f.as_subsonic_child(request.user) for f in res.children.order_by(lambda c: c.name.lower()) ] + [ t.as_subsonic_child(request.user, request.client) for t in sorted(res.tracks, key = lambda t: t.sort_key()) ]
}
if not res.root:
directory['parent'] = str(res.parent_id)
directory['parent'] = str(res.parent.id)
return request.formatter({ 'directory': directory })
@app.route('/rest/getArtists.view', methods = [ 'GET', 'POST' ])
@db_session
def list_artists():
# According to the API page, there are no parameters?
indexes = {}
for artist in store.find(Artist):
for artist in Artist.select():
index = artist.name[0].upper() if artist.name else '?'
if index in map(str, xrange(10)):
index = '#'
@ -146,6 +150,7 @@ def list_artists():
})
@app.route('/rest/getArtist.view', methods = [ 'GET', 'POST' ])
@db_session
def artist_info():
status, res = get_entity(request, Artist)
if not status:
@ -159,23 +164,25 @@ def artist_info():
return request.formatter({ 'artist': info })
@app.route('/rest/getAlbum.view', methods = [ 'GET', 'POST' ])
@db_session
def album_info():
status, res = get_entity(request, Album)
if not status:
return res
info = res.as_subsonic_album(request.user)
info['song'] = [ t.as_subsonic_child(request.user, request.prefs) for t in sorted(res.tracks, key = lambda t: t.sort_key()) ]
info['song'] = [ t.as_subsonic_child(request.user, request.client) for t in sorted(res.tracks, key = lambda t: t.sort_key()) ]
return request.formatter({ 'album': info })
@app.route('/rest/getSong.view', methods = [ 'GET', 'POST' ])
@db_session
def track_info():
status, res = get_entity(request, Track)
if not status:
return res
return request.formatter({ 'song': res.as_subsonic_child(request.user, request.prefs) })
return request.formatter({ 'song': res.as_subsonic_child(request.user, request.client) })
@app.route('/rest/getVideos.view', methods = [ 'GET', 'POST' ])
def list_videos():

View File

@ -26,6 +26,7 @@ import subprocess
from flask import request, send_file, Response, current_app as app
from PIL import Image
from pony.orm import db_session
from xml.etree import ElementTree
from .. import scanner
@ -42,6 +43,7 @@ def prepare_transcoding_cmdline(base_cmdline, input_file, input_format, output_f
return ret
@app.route('/rest/stream.view', methods = [ 'GET', 'POST' ])
@db_session
def stream_media():
status, res = get_entity(request, Track)
if not status:
@ -56,10 +58,11 @@ def stream_media():
dst_bitrate = res.bitrate
dst_mimetype = res.content_type
if request.prefs.format:
dst_suffix = request.prefs.format
if request.prefs.bitrate and request.prefs.bitrate < dst_bitrate:
dst_bitrate = request.prefs.bitrate
prefs = ClientPrefs.get(lambda p: p.user.id == request.user.id and p.client_name == request.client)
if prefs.format:
dst_suffix = prefs.format
if prefs.bitrate and prefs.bitrate < dst_bitrate:
dst_bitrate = prefs.bitrate
if maxBitRate:
try:
@ -120,14 +123,15 @@ def stream_media():
res.play_count = res.play_count + 1
res.last_play = now()
request.user.last_play = res
request.user.last_play_date = now()
store.commit()
user = User[request.user.id]
user.last_play = res
user.last_play_date = now()
return response
@app.route('/rest/download.view', methods = [ 'GET', 'POST' ])
def download_media():
with db_session:
status, res = get_entity(request, Track)
if not status:
return res
@ -136,6 +140,7 @@ def download_media():
@app.route('/rest/getCoverArt.view', methods = [ 'GET', 'POST' ])
def cover_art():
with db_session:
status, res = get_entity(request, Folder)
if not status:
return res
@ -175,7 +180,8 @@ def lyrics():
if not title:
return request.error_formatter(10, 'Missing title parameter')
query = store.find(Track, Album.id == Track.album_id, Artist.id == Album.artist_id, Track.title.like(title), Artist.name.like(artist))
with db_session:
query = Track.select(lambda t: title in t.title and artist in t.artist.name)
for track in query:
lyrics_path = os.path.splitext(track.path)[0] + '.txt'
if os.path.exists(lyrics_path):

View File

@ -21,6 +21,8 @@
import uuid
from flask import request, current_app as app
from pony.orm import db_session, rollback
from pony.orm import ObjectNotFound
from ..db import Playlist, User, Track
@ -28,37 +30,40 @@ from . import get_entity
@app.route('/rest/getPlaylists.view', methods = [ 'GET', 'POST' ])
def list_playlists():
query = store.find(Playlist, Or(Playlist.user_id == request.user.id, Playlist.public == True)).order_by(Playlist.name)
query = Playlist.select(lambda p: p.user.id == request.user.id or p.public).order_by(Playlist.name)
username = request.values.get('username')
if username:
if not request.user.admin:
return request.error_formatter(50, 'Restricted to admins')
user = store.find(User, User.name == username).one()
if not user:
with db_session:
user = User.get(name = username)
if user is None:
return request.error_formatter(70, 'No such user')
query = store.find(Playlist, Playlist.user_id == User.id, User.name == username).order_by(Playlist.name)
query = Playlist.select(lambda p: p.user.name == username).order_by(Playlist.name)
with db_session:
return request.formatter({ 'playlists': { 'playlist': [ p.as_subsonic_playlist(request.user) for p in query ] } })
@app.route('/rest/getPlaylist.view', methods = [ 'GET', 'POST' ])
@db_session
def show_playlist():
status, res = get_entity(request, Playlist)
if not status:
return res
if res.user_id != request.user.id and not request.user.admin:
if res.user.id != request.user.id and not request.user.admin:
return request.error_formatter('50', 'Private playlist')
info = res.as_subsonic_playlist(request.user)
info['entry'] = [ t.as_subsonic_child(request.user, request.prefs) for t in res.get_tracks() ]
info['entry'] = [ t.as_subsonic_child(request.user, request.client) for t in res.get_tracks() ]
return request.formatter({ 'playlist': info })
@app.route('/rest/createPlaylist.view', methods = [ 'GET', 'POST' ])
@db_session
def create_playlist():
# Only(?) method where the android client uses form data rather than GET params
playlist_id, name = map(request.values.get, [ 'playlistId', 'name' ])
# songId actually doesn't seem to be required
songs = request.values.getlist('songId')
@ -69,55 +74,54 @@ def create_playlist():
return request.error_formatter(0, 'Invalid parameter')
if playlist_id:
playlist = store.get(Playlist, playlist_id)
if not playlist:
try:
playlist = Playlist[playlist_id]
except ObjectNotFound:
return request.error_formatter(70, 'Unknwon playlist')
if playlist.user_id != request.user.id and not request.user.admin:
if playlist.user.id != request.user.id and not request.user.admin:
return request.error_formatter(50, "You're not allowed to modify a playlist that isn't yours")
playlist.clear()
if name:
playlist.name = name
elif name:
playlist = Playlist()
playlist.user_id = request.user.id
playlist.name = name
store.add(playlist)
playlist = Playlist(user = User[request.user.id], name = name)
else:
return request.error_formatter(10, 'Missing playlist id or name')
for sid in songs:
track = store.get(Track, sid)
if not track:
store.rollback()
try:
track = Track[sid]
except ObjectNotFound:
rollback()
return request.error_formatter(70, 'Unknown song')
playlist.add(track)
store.commit()
return request.formatter({})
@app.route('/rest/deletePlaylist.view', methods = [ 'GET', 'POST' ])
@db_session
def delete_playlist():
status, res = get_entity(request, Playlist)
if not status:
return res
if res.user_id != request.user.id and not request.user.admin:
if res.user.id != request.user.id and not request.user.admin:
return request.error_formatter(50, "You're not allowed to delete a playlist that isn't yours")
store.remove(res)
store.commit()
res.delete()
return request.formatter({})
@app.route('/rest/updatePlaylist.view', methods = [ 'GET', 'POST' ])
@db_session
def update_playlist():
status, res = get_entity(request, Playlist, 'playlistId')
if not status:
return res
if res.user_id != request.user.id and not request.user.admin:
if res.user.id != request.user.id and not request.user.admin:
return request.error_formatter(50, "You're not allowed to delete a playlist that isn't yours")
playlist = res
@ -137,13 +141,13 @@ def update_playlist():
playlist.public = public in (True, 'True', 'true', 1, '1')
for sid in to_add:
track = store.get(Track, sid)
if not track:
try:
track = Track[sid]
except ObjectNotFound:
return request.error_formatter(70, 'Unknown song')
playlist.add(track)
playlist.remove_at_indexes(to_remove)
store.commit()
return request.formatter({})

View File

@ -56,7 +56,7 @@ def old_search():
return request.formatter({ 'searchResult': {
'totalHits': folders.count() + tracks.count(),
'offset': offset,
'match': [ r.as_subsonic_child(request.user) if isinstance(r, Folder) else r.as_subsonic_child(request.user, request.prefs) for r in res ]
'match': [ r.as_subsonic_child(request.user) if isinstance(r, Folder) else r.as_subsonic_child(request.user, request.client) for r in res ]
}})
else:
return request.error_formatter(10, 'Missing search parameter')
@ -65,7 +65,7 @@ def old_search():
return request.formatter({ 'searchResult': {
'totalHits': query.count(),
'offset': offset,
'match': [ r.as_subsonic_child(request.user) if isinstance(r, Folder) else r.as_subsonic_child(request.user, request.prefs) for r in query[offset : offset + count] ]
'match': [ r.as_subsonic_child(request.user) if isinstance(r, Folder) else r.as_subsonic_child(request.user, request.client) for r in query[offset : offset + count] ]
}})
@app.route('/rest/search2.view', methods = [ 'GET', 'POST' ])
@ -94,7 +94,7 @@ def new_search():
return request.formatter({ 'searchResult2': {
'artist': [ { 'id': str(a.id), 'name': a.name } for a in artists ],
'album': [ f.as_subsonic_child(request.user) for f in albums ],
'song': [ t.as_subsonic_child(request.user, request.prefs) for t in songs ]
'song': [ t.as_subsonic_child(request.user, request.client) for t in songs ]
}})
@app.route('/rest/search3.view', methods = [ 'GET', 'POST' ])
@ -123,6 +123,6 @@ def search_id3():
return request.formatter({ 'searchResult3': {
'artist': [ a.as_subsonic_artist(request.user) for a in artists ],
'album': [ a.as_subsonic_album(request.user) for a in albums ],
'song': [ t.as_subsonic_child(request.user, request.prefs) for t in songs ]
'song': [ t.as_subsonic_child(request.user, request.client) for t in songs ]
}})

View File

@ -177,7 +177,7 @@ class Track(db.Entity):
stars = Set(lambda: StarredTrack)
ratings = Set(lambda: RatingTrack)
def as_subsonic_child(self, user, prefs):
def as_subsonic_child(self, user, client):
info = {
'id': str(self.id),
'parent': str(self.folder.id),
@ -221,7 +221,8 @@ class Track(db.Entity):
if avgRating:
info['averageRating'] = avgRating
if prefs and prefs.format and prefs.format != self.suffix():
prefs = ClientPrefs.get(lambda p: p.user.id == user.id and p.client_name == client)
if prefs is not None and prefs.format is not None and prefs.format != self.suffix():
info['transcodedSuffix'] = prefs.format
info['transcodedContentType'] = mimetypes.guess_type('dummyname.' + prefs.format, False)[0] or 'application/octet-stream'

View File

@ -11,6 +11,8 @@
import uuid
from pony.orm import db_session
from supysonic.db import Folder, Artist, Album, Track
from .apitestbase import ApiTestBase
@ -22,34 +24,25 @@ class AlbumSongsTestCase(ApiTestBase):
def setUp(self):
super(AlbumSongsTestCase, self).setUp()
folder = Folder()
folder.name = 'Root'
folder.root = True
folder.path = 'tests/assets'
with db_session:
folder = Folder(name = 'Root', root = True, path = 'tests/assets')
artist = Artist(name = 'Artist')
album = Album(name = 'Album', artist = artist)
artist = Artist()
artist.name = 'Artist'
album = Album()
album.name = 'Album'
album.artist = artist
track = Track()
track.title = 'Track'
track.album = album
track.artist = artist
track.disc = 1
track.number = 1
track.path = 'tests/assets/empty'
track.folder = folder
track.root_folder = folder
track.duration = 2
track.bitrate = 320
track.content_type = 'audio/mpeg'
track.last_modification = 0
self.store.add(track)
self.store.commit()
track = Track(
title = 'Track',
album = album,
artist = artist,
disc = 1,
number = 1,
path = 'tests/assets/empty',
folder = folder,
root_folder = folder,
duration = 2,
bitrate = 320,
content_type = 'audio/mpeg',
last_modification = 0
)
def test_get_album_list(self):
self._make_request('getAlbumList', error = 10)
@ -63,11 +56,9 @@ class AlbumSongsTestCase(ApiTestBase):
self._make_request('getAlbumList', { 'type': t }, tag = 'albumList', skip_post = True)
rv, child = self._make_request('getAlbumList', { 'type': 'random' }, tag = 'albumList', skip_post = True)
self.assertEqual(len(child), 10)
rv, child = self._make_request('getAlbumList', { 'type': 'random', 'size': 3 }, tag = 'albumList', skip_post = True)
self.assertEqual(len(child), 3)
self.store.remove(self.store.find(Folder).one())
with db_session:
Folder.get().delete()
rv, child = self._make_request('getAlbumList', { 'type': 'random' }, tag = 'albumList')
self.assertEqual(len(child), 0)
@ -82,12 +73,10 @@ class AlbumSongsTestCase(ApiTestBase):
self._make_request('getAlbumList2', { 'type': t }, tag = 'albumList2', skip_post = True)
rv, child = self._make_request('getAlbumList2', { 'type': 'random' }, tag = 'albumList2', skip_post = True)
self.assertEqual(len(child), 10)
rv, child = self._make_request('getAlbumList2', { 'type': 'random', 'size': 3 }, tag = 'albumList2', skip_post = True)
self.assertEqual(len(child), 3)
self.store.remove(self.store.find(Track).one())
self.store.remove(self.store.find(Album).one())
with db_session:
Track.get().delete()
Album.get().delete()
rv, child = self._make_request('getAlbumList2', { 'type': 'random' }, tag = 'albumList2')
self.assertEqual(len(child), 0)
@ -98,12 +87,10 @@ class AlbumSongsTestCase(ApiTestBase):
self._make_request('getRandomSongs', { 'musicFolderId': 'idid' }, error = 0)
self._make_request('getRandomSongs', { 'musicFolderId': uuid.uuid4() }, error = 70)
rv, child = self._make_request('getRandomSongs', tag = 'randomSongs')
self.assertEqual(len(child), 10)
rv, child = self._make_request('getRandomSongs', { 'size': 3 }, tag = 'randomSongs')
self.assertEqual(len(child), 3)
rv, child = self._make_request('getRandomSongs', tag = 'randomSongs', skip_post = True)
fid = self.store.find(Folder).one().id
with db_session:
fid = Folder.get().id
self._make_request('getRandomSongs', { 'fromYear': -52, 'toYear': '1984', 'genre': 'some cryptic subgenre youve never heard of', 'musicFolderId': fid }, tag = 'randomSongs')
def test_now_playing(self):

View File

@ -11,6 +11,8 @@
import uuid
from pony.orm import db_session
from supysonic.db import Folder, Artist, Album, Track, User, ClientPrefs
from .apitestbase import ApiTestBase
@ -19,45 +21,32 @@ class AnnotationTestCase(ApiTestBase):
def setUp(self):
super(AnnotationTestCase, self).setUp()
root = Folder()
root.name = 'Root'
root.root = True
root.path = 'tests/assets'
with db_session:
root = Folder(name = 'Root', root = True, path = 'tests')
folder = Folder(name = 'Folder', path = 'tests/assets', parent = root)
artist = Artist(name = 'Artist')
album = Album(name = 'Album', artist = artist)
folder = Folder()
folder.name = 'Folder'
folder.path = 'tests/assets'
folder.parent = root
track = Track(
title = 'Track',
album = album,
artist = artist,
disc = 1,
number = 1,
path = 'tests/assets/empty',
folder = folder,
root_folder = root,
duration = 2,
bitrate = 320,
content_type = 'audio/mpeg',
last_modification = 0
)
artist = Artist()
artist.name = 'Artist'
album = Album()
album.name = 'Album'
album.artist = artist
track = Track()
track.title = 'Track'
track.album = album
track.artist = artist
track.disc = 1
track.number = 1
track.path = 'tests/assets/empty'
track.folder = folder
track.root_folder = root
track.duration = 2
track.bitrate = 320
track.content_type = 'audio/mpeg'
track.last_modification = 0
self.store.add(track)
self.store.commit()
self.folder = folder
self.artist = artist
self.album = album
self.track = track
self.user = self.store.find(User, User.name == 'alice').one()
self.folderid = folder.id
self.artistid = artist.id
self.albumid = album.id
self.trackid = track.id
self.user = User.get(name = 'alice')
def test_star(self):
self._make_request('star', error = 10)
@ -68,88 +57,101 @@ class AnnotationTestCase(ApiTestBase):
self._make_request('star', { 'albumId': str(uuid.uuid4()) }, error = 70)
self._make_request('star', { 'artistId': str(uuid.uuid4()) }, error = 70)
self._make_request('star', { 'id': str(self.artist.id) }, error = 70, skip_xsd = True)
self._make_request('star', { 'id': str(self.album.id) }, error = 70, skip_xsd = True)
self._make_request('star', { 'id': str(self.track.id) }, skip_post = True)
self.assertIn('starred', self.track.as_subsonic_child(self.user, ClientPrefs()))
self._make_request('star', { 'id': str(self.track.id) }, error = 0, skip_xsd = True)
self._make_request('star', { 'id': str(self.artistid) }, error = 70, skip_xsd = True)
self._make_request('star', { 'id': str(self.albumid) }, error = 70, skip_xsd = True)
self._make_request('star', { 'id': str(self.trackid) }, skip_post = True)
with db_session:
self.assertIn('starred', Track[self.trackid].as_subsonic_child(self.user, 'tests'))
self._make_request('star', { 'id': str(self.trackid) }, error = 0, skip_xsd = True)
self._make_request('star', { 'id': str(self.folder.id) }, skip_post = True)
self.assertIn('starred', self.folder.as_subsonic_child(self.user))
self._make_request('star', { 'id': str(self.folder.id) }, error = 0, skip_xsd = True)
self._make_request('star', { 'id': str(self.folderid) }, skip_post = True)
with db_session:
self.assertIn('starred', Folder[self.folderid].as_subsonic_child(self.user))
self._make_request('star', { 'id': str(self.folderid) }, error = 0, skip_xsd = True)
self._make_request('star', { 'albumId': str(self.folder.id) }, error = 70)
self._make_request('star', { 'albumId': str(self.artist.id) }, error = 70)
self._make_request('star', { 'albumId': str(self.track.id) }, error = 70)
self._make_request('star', { 'albumId': str(self.album.id) }, skip_post = True)
self.assertIn('starred', self.album.as_subsonic_album(self.user))
self._make_request('star', { 'albumId': str(self.album.id) }, error = 0)
self._make_request('star', { 'albumId': str(self.folderid) }, error = 70)
self._make_request('star', { 'albumId': str(self.artistid) }, error = 70)
self._make_request('star', { 'albumId': str(self.trackid) }, error = 70)
self._make_request('star', { 'albumId': str(self.albumid) }, skip_post = True)
with db_session:
self.assertIn('starred', Album[self.albumid].as_subsonic_album(self.user))
self._make_request('star', { 'albumId': str(self.albumid) }, error = 0)
self._make_request('star', { 'artistId': str(self.folder.id) }, error = 70)
self._make_request('star', { 'artistId': str(self.album.id) }, error = 70)
self._make_request('star', { 'artistId': str(self.track.id) }, error = 70)
self._make_request('star', { 'artistId': str(self.artist.id) }, skip_post = True)
self.assertIn('starred', self.artist.as_subsonic_artist(self.user))
self._make_request('star', { 'artistId': str(self.artist.id) }, error = 0)
self._make_request('star', { 'artistId': str(self.folderid) }, error = 70)
self._make_request('star', { 'artistId': str(self.albumid) }, error = 70)
self._make_request('star', { 'artistId': str(self.trackid) }, error = 70)
self._make_request('star', { 'artistId': str(self.artistid) }, skip_post = True)
with db_session:
self.assertIn('starred', Artist[self.artistid].as_subsonic_artist(self.user))
self._make_request('star', { 'artistId': str(self.artistid) }, error = 0)
def test_unstar(self):
self._make_request('star', { 'id': [ str(self.folder.id), str(self.track.id) ], 'artistId': str(self.artist.id), 'albumId': str(self.album.id) }, skip_post = True)
self._make_request('star', { 'id': [ str(self.folderid), str(self.trackid) ], 'artistId': str(self.artistid), 'albumId': str(self.albumid) }, skip_post = True)
self._make_request('unstar', error = 10)
self._make_request('unstar', { 'id': 'unknown' }, error = 0, skip_xsd = True)
self._make_request('unstar', { 'albumId': 'unknown' }, error = 0)
self._make_request('unstar', { 'artistId': 'unknown' }, error = 0)
self._make_request('unstar', { 'id': str(self.track.id) }, skip_post = True)
self.assertNotIn('starred', self.track.as_subsonic_child(self.user, ClientPrefs()))
self._make_request('unstar', { 'id': str(self.trackid) }, skip_post = True)
with db_session:
self.assertNotIn('starred', Track[self.trackid].as_subsonic_child(self.user, 'tests'))
self._make_request('unstar', { 'id': str(self.folder.id) }, skip_post = True)
self.assertNotIn('starred', self.folder.as_subsonic_child(self.user))
self._make_request('unstar', { 'id': str(self.folderid) }, skip_post = True)
with db_session:
self.assertNotIn('starred', Folder[self.folderid].as_subsonic_child(self.user))
self._make_request('unstar', { 'albumId': str(self.album.id) }, skip_post = True)
self.assertNotIn('starred', self.album.as_subsonic_album(self.user))
self._make_request('unstar', { 'albumId': str(self.albumid) }, skip_post = True)
with db_session:
self.assertNotIn('starred', Album[self.albumid].as_subsonic_album(self.user))
self._make_request('unstar', { 'artistId': str(self.artist.id) }, skip_post = True)
self.assertNotIn('starred', self.artist.as_subsonic_artist(self.user))
self._make_request('unstar', { 'artistId': str(self.artistid) }, skip_post = True)
with db_session:
self.assertNotIn('starred', Artist[self.artistid].as_subsonic_artist(self.user))
def test_set_rating(self):
self._make_request('setRating', error = 10)
self._make_request('setRating', { 'id': str(self.track.id) }, error = 10)
self._make_request('setRating', { 'id': str(self.trackid) }, error = 10)
self._make_request('setRating', { 'rating': 3 }, error = 10)
self._make_request('setRating', { 'id': 'string', 'rating': 3 }, error = 0)
self._make_request('setRating', { 'id': str(uuid.uuid4()), 'rating': 3 }, error = 70)
self._make_request('setRating', { 'id': str(self.artist.id), 'rating': 3 }, error = 70)
self._make_request('setRating', { 'id': str(self.album.id), 'rating': 3 }, error = 70)
self._make_request('setRating', { 'id': str(self.track.id), 'rating': 'string' }, error = 0)
self._make_request('setRating', { 'id': str(self.track.id), 'rating': -1 }, error = 0)
self._make_request('setRating', { 'id': str(self.track.id), 'rating': 6 }, error = 0)
self._make_request('setRating', { 'id': str(self.artistid), 'rating': 3 }, error = 70)
self._make_request('setRating', { 'id': str(self.albumid), 'rating': 3 }, error = 70)
self._make_request('setRating', { 'id': str(self.trackid), 'rating': 'string' }, error = 0)
self._make_request('setRating', { 'id': str(self.trackid), 'rating': -1 }, error = 0)
self._make_request('setRating', { 'id': str(self.trackid), 'rating': 6 }, error = 0)
prefs = ClientPrefs()
self.assertNotIn('userRating', self.track.as_subsonic_child(self.user, prefs))
with db_session:
self.assertNotIn('userRating', Track[self.trackid].as_subsonic_child(self.user, 'tests'))
for i in range(1, 6):
self._make_request('setRating', { 'id': str(self.track.id), 'rating': i }, skip_post = True)
self.assertEqual(self.track.as_subsonic_child(self.user, prefs)['userRating'], i)
self._make_request('setRating', { 'id': str(self.track.id), 'rating': 0 }, skip_post = True)
self.assertNotIn('userRating', self.track.as_subsonic_child(self.user, prefs))
self._make_request('setRating', { 'id': str(self.trackid), 'rating': i }, skip_post = True)
with db_session:
self.assertEqual(Track[self.trackid].as_subsonic_child(self.user, 'tests')['userRating'], i)
self.assertNotIn('userRating', self.folder.as_subsonic_child(self.user))
self._make_request('setRating', { 'id': str(self.trackid), 'rating': 0 }, skip_post = True)
with db_session:
self.assertNotIn('userRating', Track[self.trackid].as_subsonic_child(self.user, 'tests'))
self.assertNotIn('userRating', Folder[self.folderid].as_subsonic_child(self.user))
for i in range(1, 6):
self._make_request('setRating', { 'id': str(self.folder.id), 'rating': i }, skip_post = True)
self.assertEqual(self.folder.as_subsonic_child(self.user)['userRating'], i)
self._make_request('setRating', { 'id': str(self.folder.id), 'rating': 0 }, skip_post = True)
self.assertNotIn('userRating', self.folder.as_subsonic_child(self.user))
self._make_request('setRating', { 'id': str(self.folderid), 'rating': i }, skip_post = True)
with db_session:
self.assertEqual(Folder[self.folderid].as_subsonic_child(self.user)['userRating'], i)
self._make_request('setRating', { 'id': str(self.folderid), 'rating': 0 }, skip_post = True)
with db_session:
self.assertNotIn('userRating', Folder[self.folderid].as_subsonic_child(self.user))
def test_scrobble(self):
self._make_request('scrobble', error = 10)
self._make_request('scrobble', { 'id': 'song' }, error = 0)
self._make_request('scrobble', { 'id': str(uuid.uuid4()) }, error = 70)
self._make_request('scrobble', { 'id': str(self.folder.id) }, error = 70)
self._make_request('scrobble', { 'id': str(self.folderid) }, error = 70)
self.skipTest('Weird request context/logger issue at exit')
self._make_request('scrobble', { 'id': str(self.track.id) })
self._make_request('scrobble', { 'id': str(self.track.id), 'submission': True })
self._make_request('scrobble', { 'id': str(self.track.id), 'submission': False })
self._make_request('scrobble', { 'id': str(self.trackid) })
self._make_request('scrobble', { 'id': str(self.trackid), 'submission': True })
self._make_request('scrobble', { 'id': str(self.trackid), 'submission': False })
if __name__ == '__main__':
unittest.main()

View File

@ -9,10 +9,12 @@
#
# Distributed under terms of the GNU AGPLv3 license.
from lxml import etree
import time
import uuid
from lxml import etree
from pony.orm import db_session
from supysonic.db import Folder, Artist, Album, Track
from .apitestbase import ApiTestBase
@ -21,60 +23,49 @@ class BrowseTestCase(ApiTestBase):
def setUp(self):
super(BrowseTestCase, self).setUp()
empty = Folder()
empty.root = True
empty.name = 'Empty root'
empty.path = '/tmp'
self.store.add(empty)
root = Folder()
root.root = True
root.name = 'Root folder'
root.path = 'tests/assets'
self.store.add(root)
with db_session:
Folder(root = True, name = 'Empty root', path = '/tmp')
root = Folder(root = True, name = 'Root folder', path = 'tests/assets')
for letter in 'ABC':
folder = Folder()
folder.name = letter + 'rtist'
folder.path = 'tests/assets/{}rtist'.format(letter)
folder.parent = root
folder = Folder(
name = letter + 'rtist',
path = 'tests/assets/{}rtist'.format(letter),
parent = root
)
artist = Artist()
artist.name = letter + 'rtist'
artist = Artist(name = letter + 'rtist')
for lether in 'AB':
afolder = Folder()
afolder.name = letter + lether + 'lbum'
afolder.path = 'tests/assets/{0}rtist/{0}{1}lbum'.format(letter, lether)
afolder.parent = folder
afolder = Folder(
name = letter + lether + 'lbum',
path = 'tests/assets/{0}rtist/{0}{1}lbum'.format(letter, lether),
parent = folder
)
album = Album()
album.name = letter + lether + 'lbum'
album.artist = artist
album = Album(name = letter + lether + 'lbum', artist = artist)
for num, song in enumerate([ 'One', 'Two', 'Three' ]):
track = Track()
track.disc = 1
track.number = num
track.title = song
track.duration = 2
track.album = album
track.artist = artist
track.bitrate = 320
track.path = 'tests/assets/{0}rtist/{0}{1}lbum/{2}'.format(letter, lether, song)
track.content_type = 'audio/mpeg'
track.last_modification = 0
track.root_folder = root
track.folder = afolder
self.store.add(track)
track = Track(
disc = 1,
number = num,
title = song,
duration = 2,
album = album,
artist = artist,
bitrate = 320,
path = 'tests/assets/{0}rtist/{0}{1}lbum/{2}'.format(letter, lether, song),
content_type = 'audio/mpeg',
last_modification = 0,
root_folder = root,
folder = afolder
)
self.store.commit()
self.assertEqual(self.store.find(Folder).count(), 11)
self.assertEqual(self.store.find(Folder, Folder.root == True).count(), 2)
self.assertEqual(self.store.find(Artist).count(), 3)
self.assertEqual(self.store.find(Album).count(), 6)
self.assertEqual(self.store.find(Track).count(), 18)
self.assertEqual(Folder.select().count(), 11)
self.assertEqual(Folder.select(lambda f: f.root).count(), 2)
self.assertEqual(Artist.select().count(), 3)
self.assertEqual(Album.select().count(), 6)
self.assertEqual(Track.select().count(), 18)
def test_get_music_folders(self):
# Do not validate against the XSD here, this is the only place where the API should return ids as ints
@ -91,7 +82,8 @@ class BrowseTestCase(ApiTestBase):
rv, child = self._make_request('getIndexes', { 'ifModifiedSince': int(time.time() * 1000 + 1000) }, tag = 'indexes')
self.assertEqual(len(child), 0)
fid = self.store.find(Folder, Folder.name == 'Empty root').one().id
with db_session:
fid = Folder.get(name = 'Empty root').id
rv, child = self._make_request('getIndexes', { 'musicFolderId': str(fid) }, tag = 'indexes')
self.assertEqual(len(child), 0)
@ -108,7 +100,8 @@ class BrowseTestCase(ApiTestBase):
self._make_request('getMusicDirectory', { 'id': str(uuid.uuid4()) }, error = 70)
# should test with folders with both children folders and tracks. this code would break in that case
for f in self.store.find(Folder):
with db_session:
for f in Folder.select():
rv, child = self._make_request('getMusicDirectory', { 'id': str(f.id) }, tag = 'directory')
self.assertEqual(child.get('id'), str(f.id))
self.assertEqual(child.get('name'), f.name)
@ -138,7 +131,8 @@ class BrowseTestCase(ApiTestBase):
self._make_request('getArtist', { 'id': 'artist' }, error = 0)
self._make_request('getArtist', { 'id': str(uuid.uuid4()) }, error = 70)
for ar in self.store.find(Artist):
with db_session:
for ar in Artist.select():
rv, child = self._make_request('getArtist', { 'id': str(ar.id) }, tag = 'artist')
self.assertEqual(child.get('id'), str(ar.id))
self.assertEqual(child.get('albumCount'), str(len(child)))
@ -153,7 +147,8 @@ class BrowseTestCase(ApiTestBase):
self._make_request('getAlbum', { 'id': 'nastynasty' }, error = 0)
self._make_request('getAlbum', { 'id': str(uuid.uuid4()) }, error = 70)
a = self.store.find(Album)[0]
with db_session:
a = Album.select().first()
rv, child = self._make_request('getAlbum', { 'id': str(a.id) }, tag = 'album')
self.assertEqual(child.get('id'), str(a.id))
self.assertEqual(child.get('songCount'), str(len(child)))
@ -169,7 +164,8 @@ class BrowseTestCase(ApiTestBase):
self._make_request('getSong', { 'id': 'nastynasty' }, error = 0)
self._make_request('getSong', { 'id': str(uuid.uuid4()) }, error = 70)
s = self.store.find(Track)[0]
with db_session:
s = Track.select().first()
self._make_request('getSong', { 'id': str(s.id) }, tag = 'song')
def test_get_videos(self):

View File

@ -11,8 +11,10 @@
import os.path
import uuid
from io import BytesIO
from PIL import Image
from pony.orm import db_session
from supysonic.db import Folder, Artist, Album, Track
@ -22,69 +24,69 @@ class MediaTestCase(ApiTestBase):
def setUp(self):
super(MediaTestCase, self).setUp()
self.folder = Folder()
self.folder.name = 'Root'
self.folder.path = os.path.abspath('tests/assets')
self.folder.root = True
self.folder.has_cover_art = True # 420x420 PNG
with db_session:
folder = Folder(
name = 'Root',
path = os.path.abspath('tests/assets'),
root = True,
has_cover_art = True # 420x420 PNG
)
self.folderid = folder.id
artist = Artist()
artist.name = 'Artist'
artist = Artist(name = 'Artist')
album = Album(artist = artist, name = 'Album')
album = Album()
album.artist = artist
album.name = 'Album'
self.track = Track()
self.track.title = '23bytes'
self.track.number = 1
self.track.disc = 1
self.track.artist = artist
self.track.album = album
self.track.path = os.path.abspath('tests/assets/23bytes')
self.track.root_folder = self.folder
self.track.folder = self.folder
self.track.duration = 2
self.track.bitrate = 320
self.track.content_type = 'audio/mpeg'
self.track.last_modification = 0
self.store.add(self.track)
self.store.commit()
track = Track(
title = '23bytes',
number = 1,
disc = 1,
artist = artist,
album = album,
path = os.path.abspath('tests/assets/23bytes'),
root_folder = folder,
folder = folder,
duration = 2,
bitrate = 320,
content_type = 'audio/mpeg',
last_modification = 0
)
self.trackid = track.id
def test_stream(self):
self._make_request('stream', error = 10)
self._make_request('stream', { 'id': 'string' }, error = 0)
self._make_request('stream', { 'id': str(uuid.uuid4()) }, error = 70)
self._make_request('stream', { 'id': str(self.folder.id) }, error = 70)
self._make_request('stream', { 'id': str(self.track.id), 'maxBitRate': 'string' }, error = 0)
self._make_request('stream', { 'id': str(self.folderid) }, error = 70)
self._make_request('stream', { 'id': str(self.trackid), 'maxBitRate': 'string' }, error = 0)
rv = self.client.get('/rest/stream.view', query_string = { 'u': 'alice', 'p': 'Alic3', 'c': 'tests', 'id': str(self.track.id) })
rv = self.client.get('/rest/stream.view', query_string = { 'u': 'alice', 'p': 'Alic3', 'c': 'tests', 'id': str(self.trackid) })
self.assertEqual(rv.status_code, 200)
self.assertEqual(rv.mimetype, 'audio/mpeg')
self.assertEqual(len(rv.data), 23)
self.assertEqual(self.track.play_count, 1)
with db_session:
self.assertEqual(Track[self.trackid].play_count, 1)
def test_download(self):
self._make_request('download', error = 10)
self._make_request('download', { 'id': 'string' }, error = 0)
self._make_request('download', { 'id': str(uuid.uuid4()) }, error = 70)
self._make_request('download', { 'id': str(self.folder.id) }, error = 70)
self._make_request('download', { 'id': str(self.folderid) }, error = 70)
rv = self.client.get('/rest/download.view', query_string = { 'u': 'alice', 'p': 'Alic3', 'c': 'tests', 'id': str(self.track.id) })
rv = self.client.get('/rest/download.view', query_string = { 'u': 'alice', 'p': 'Alic3', 'c': 'tests', 'id': str(self.trackid) })
self.assertEqual(rv.status_code, 200)
self.assertEqual(rv.mimetype, 'audio/mpeg')
self.assertEqual(len(rv.data), 23)
self.assertEqual(self.track.play_count, 0)
with db_session:
self.assertEqual(Track[self.trackid].play_count, 0)
def test_get_cover_art(self):
self._make_request('getCoverArt', error = 10)
self._make_request('getCoverArt', { 'id': 'string' }, error = 0)
self._make_request('getCoverArt', { 'id': str(uuid.uuid4()) }, error = 70)
self._make_request('getCoverArt', { 'id': str(self.track.id) }, error = 70)
self._make_request('getCoverArt', { 'id': str(self.folder.id), 'size': 'large' }, error = 0)
self._make_request('getCoverArt', { 'id': str(self.trackid) }, error = 70)
self._make_request('getCoverArt', { 'id': str(self.folderid), 'size': 'large' }, error = 0)
args = { 'u': 'alice', 'p': 'Alic3', 'c': 'tests', 'id': str(self.folder.id) }
args = { 'u': 'alice', 'p': 'Alic3', 'c': 'tests', 'id': str(self.folderid) }
rv = self.client.get('/rest/getCoverArt.view', query_string = args)
self.assertEqual(rv.status_code, 200)
self.assertEqual(rv.mimetype, 'image/jpeg')

View File

@ -11,6 +11,8 @@
import uuid
from pony.orm import db_session
from supysonic.db import Folder, Artist, Album, Track, Playlist, User
from .apitestbase import ApiTestBase
@ -19,63 +21,42 @@ class PlaylistTestCase(ApiTestBase):
def setUp(self):
super(PlaylistTestCase, self).setUp()
root = Folder()
root.root = True
root.name = 'Root folder'
root.path = 'tests/assets'
self.store.add(root)
artist = Artist()
artist.name = 'Artist'
album = Album()
album.name = 'Album'
album.artist = artist
with db_session:
root = Folder(root = True, name = 'Root folder', path = 'tests/assets')
artist = Artist(name = 'Artist')
album = Album(name = 'Album', artist = artist)
songs = {}
for num, song in enumerate([ 'One', 'Two', 'Three', 'Four' ]):
track = Track()
track.disc = 1
track.number = num
track.title = song
track.duration = 2
track.album = album
track.artist = artist
track.bitrate = 320
track.path = 'tests/assets/empty'
track.content_type = 'audio/mpeg'
track.last_modification = 0
track.root_folder = root
track.folder = root
self.store.add(track)
track = Track(
disc = 1,
number = num,
title = song,
duration = 2,
album = album,
artist = artist,
bitrate = 320,
path = 'tests/assets/' + song,
content_type = 'audio/mpeg',
last_modification = 0,
root_folder = root,
folder = root
)
songs[song] = track
users = { u.name: u for u in self.store.find(User) }
users = { u.name: u for u in User.select() }
playlist = Playlist()
playlist.user = users['alice']
playlist.name = "Alice's"
playlist = Playlist(user = users['alice'], name = "Alice's")
playlist.add(songs['One'])
playlist.add(songs['Three'])
self.store.add(playlist)
playlist = Playlist()
playlist.user = users['alice']
playlist.public = True
playlist.name = "Alice's public"
playlist = Playlist(user = users['alice'], public = True, name = "Alice's public")
playlist.add(songs['One'])
playlist.add(songs['Two'])
self.store.add(playlist)
playlist = Playlist()
playlist.user = users['bob']
playlist.name = "Bob's"
playlist = Playlist(user = users['bob'], name = "Bob's")
playlist.add(songs['Two'])
playlist.add(songs['Four'])
self.store.add(playlist)
self.store.commit()
def test_get_playlists(self):
# get own playlists
@ -113,7 +94,8 @@ class PlaylistTestCase(ApiTestBase):
self._make_request('getPlaylist', { 'id': str(uuid.uuid4()) }, error = 70)
# other's private from non admin
playlist = self.store.find(Playlist, Playlist.public == False, Playlist.user_id == User.id, User.name == 'alice').one()
with db_session:
playlist = Playlist.get(lambda p: not p.public == False and p.user.name == 'alice')
self._make_request('getPlaylist', { 'u': 'bob', 'p': 'B0b', 'id': str(playlist.id) }, error = 50)
# standard
@ -156,9 +138,11 @@ class PlaylistTestCase(ApiTestBase):
self._make_request('createPlaylist', { 'u': 'bob', 'p': 'B0b', 'playlistId': playlist.get('id') }, error = 50)
# create more useful playlist
songs = { s.title: str(s.id) for s in self.store.find(Track) }
with db_session:
songs = { s.title: str(s.id) for s in Track.select() }
self._make_request('createPlaylist', { 'name': 'songs', 'songId': map(lambda s: songs[s], [ 'Three', 'One', 'Two' ]) }, skip_post = True)
playlist = self.store.find(Playlist, Playlist.name == 'songs').one()
with db_session:
playlist = Playlist.get(name = 'songs')
self.assertIsNotNone(playlist)
rv, child = self._make_request('getPlaylist', { 'id': str(playlist.id) }, tag = 'playlist')
self.assertEqual(child.get('songCount'), '3')
@ -174,6 +158,10 @@ class PlaylistTestCase(ApiTestBase):
self.assertEqual(self._xpath(child, 'count(./entry)'), 1)
self.assertEqual(child[0].get('title'), 'Two')
@db_session
def assertPlaylistCountEqual(self, count):
self.assertEqual(Playlist.select().count(), count)
def test_delete_playlist(self):
# check params
self._make_request('deletePlaylist', error = 10)
@ -181,27 +169,30 @@ class PlaylistTestCase(ApiTestBase):
self._make_request('deletePlaylist', { 'id': str(uuid.uuid4()) }, error = 70)
# delete unowned when not admin
playlist = self.store.find(Playlist, Playlist.user_id == User.id, User.name == 'alice')[0]
with db_session:
playlist = Playlist.select(lambda p: p.user.name == 'alice').first()
self._make_request('deletePlaylist', { 'u': 'bob', 'p': 'B0b', 'id': str(playlist.id) }, error = 50)
self.assertEqual(self.store.find(Playlist).count(), 3)
self.assertPlaylistCountEqual(3);
# delete owned
self._make_request('deletePlaylist', { 'id': str(playlist.id) }, skip_post = True)
self.assertEqual(self.store.find(Playlist).count(), 2)
self.assertPlaylistCountEqual(2);
self._make_request('deletePlaylist', { 'id': str(playlist.id) }, error = 70)
self.assertEqual(self.store.find(Playlist).count(), 2)
self.assertPlaylistCountEqual(2);
# delete unowned when admin
playlist = self.store.find(Playlist, Playlist.user_id == User.id, User.name == 'bob').one()
with db_session:
playlist = Playlist.get(lambda p: p.user.name == 'bob')
self._make_request('deletePlaylist', { 'id': str(playlist.id) }, skip_post = True)
self.assertEqual(self.store.find(Playlist).count(), 1)
self.assertPlaylistCountEqual(1);
def test_update_playlist(self):
self._make_request('updatePlaylist', error = 10)
self._make_request('updatePlaylist', { 'playlistId': 1234 }, error = 0)
self._make_request('updatePlaylist', { 'playlistId': str(uuid.uuid4()) }, error = 70)
playlist = self.store.find(Playlist, Playlist.user_id == User.id, User.name == 'alice')[0]
with db_session:
playlist = Playlist.select(lambda p: p.user.name == 'alice').order_by(Playlist.created).first()
pid = str(playlist.id)
self._make_request('updatePlaylist', { 'playlistId': pid, 'songIdToAdd': 'string' }, error = 0)
self._make_request('updatePlaylist', { 'playlistId': pid, 'songIndexToRemove': 'string' }, error = 0)
@ -226,7 +217,8 @@ class PlaylistTestCase(ApiTestBase):
self.assertEqual(self._xpath(child, 'count(./entry)'), 1)
self.assertEqual(self._find(child, './entry').get('title'), 'Three')
songs = { s.title: str(s.id) for s in self.store.find(Track) }
with db_session:
songs = { s.title: str(s.id) for s in Track.select() }
self._make_request('updatePlaylist', { 'playlistId': pid, 'songIdToAdd': [ songs['One'], songs['Two'], songs['Two'] ] }, skip_post = True)
rv, child = self._make_request('getPlaylist', { 'id': pid }, tag = 'playlist')

View File

@ -11,6 +11,8 @@
import unittest
from pony.orm import db_session
from supysonic.db import Folder, Track
from supysonic.managers.folder import FolderManager
from supysonic.scanner import Scanner
@ -23,12 +25,13 @@ class TranscodingTestCase(ApiTestBase):
super(TranscodingTestCase, self).setUp()
FolderManager.add(self.store, 'Folder', 'tests/assets/folder')
scanner = Scanner(self.store)
scanner.scan(self.store.find(Folder).one())
FolderManager.add('Folder', 'tests/assets/folder')
scanner = Scanner()
with db_session:
scanner.scan(Folder.get())
scanner.finish()
self.trackid = self.store.find(Track).one().id
self.trackid = Track.get().id
def _stream(self, **kwargs):
kwargs.update({ 'u': 'alice', 'p': 'Alic3', 'c': 'tests', 'v': '1.8.0', 'id': self.trackid })