mirror of
https://github.com/spl0k/supysonic.git
synced 2024-12-22 17:06:17 +00:00
Py3: str/bytes, iterators, etc.
It seems to work on Python 3 now! Ref #75
This commit is contained in:
parent
1a79fe3d70
commit
7edb246b1e
@ -61,7 +61,7 @@ def decode_password(password):
|
||||
return password
|
||||
|
||||
try:
|
||||
return binascii.unhexlify(password[4:]).decode('utf-8')
|
||||
return binascii.unhexlify(password[4:].encode('utf-8')).decode('utf-8')
|
||||
except:
|
||||
return password
|
||||
|
||||
@ -141,14 +141,19 @@ class ResponseHelper:
|
||||
if not isinstance(d, dict):
|
||||
raise TypeError('Expecting a dict')
|
||||
|
||||
keys_to_remove = []
|
||||
for key, value in d.items():
|
||||
if isinstance(value, dict):
|
||||
d[key] = ResponseHelper.remove_empty_lists(value)
|
||||
elif isinstance(value, list):
|
||||
if len(value) == 0:
|
||||
del d[key]
|
||||
keys_to_remove.append(key)
|
||||
else:
|
||||
d[key] = [ ResponseHelper.remove_empty_lists(item) if isinstance(item, dict) else item for item in value ]
|
||||
|
||||
for key in keys_to_remove:
|
||||
del d[key]
|
||||
|
||||
return d
|
||||
|
||||
@staticmethod
|
||||
@ -178,7 +183,7 @@ class ResponseHelper:
|
||||
elem = ElementTree.Element('subsonic-response')
|
||||
ResponseHelper.dict2xml(elem, ret)
|
||||
|
||||
return minidom.parseString(ElementTree.tostring(elem)).toprettyxml(indent = ' ', encoding = 'UTF-8')
|
||||
return minidom.parseString(ElementTree.tostring(elem)).toprettyxml(indent = ' ')
|
||||
|
||||
@staticmethod
|
||||
def dict2xml(elem, dictionary):
|
||||
|
@ -82,7 +82,7 @@ def try_unstar(starred_cls, eid):
|
||||
|
||||
def merge_errors(errors):
|
||||
error = None
|
||||
errors = filter(None, errors)
|
||||
errors = [ e for e in errors if e ]
|
||||
if len(errors) == 1:
|
||||
error = errors[0]
|
||||
elif len(errors) > 1:
|
||||
@ -149,7 +149,7 @@ def rate():
|
||||
except:
|
||||
return request.error_formatter(0, 'Invalid parameter')
|
||||
|
||||
if not rating in xrange(6):
|
||||
if not 0 <= rating <= 5:
|
||||
return request.error_formatter(0, 'rating must be between 0 and 5 (inclusive)')
|
||||
|
||||
with db_session:
|
||||
|
@ -84,9 +84,9 @@ def list_indexes():
|
||||
indexes = dict()
|
||||
for artist in artists:
|
||||
index = artist.name[0].upper()
|
||||
if index in map(str, xrange(10)):
|
||||
if index in string.digits:
|
||||
index = '#'
|
||||
elif index not in string.letters:
|
||||
elif index not in string.ascii_letters:
|
||||
index = '?'
|
||||
|
||||
if index not in indexes:
|
||||
@ -132,9 +132,9 @@ def list_artists():
|
||||
indexes = dict()
|
||||
for artist in Artist.select():
|
||||
index = artist.name[0].upper() if artist.name else '?'
|
||||
if index in map(str, xrange(10)):
|
||||
if index in string.digits:
|
||||
index = '#'
|
||||
elif index not in string.letters:
|
||||
elif index not in string.ascii_letters:
|
||||
index = '?'
|
||||
|
||||
if index not in indexes:
|
||||
|
@ -40,8 +40,10 @@ def prepare_transcoding_cmdline(base_cmdline, input_file, input_format, output_f
|
||||
if not base_cmdline:
|
||||
return None
|
||||
ret = base_cmdline.split()
|
||||
for i in xrange(len(ret)):
|
||||
ret[i] = ret[i].replace('%srcpath', input_file).replace('%srcfmt', input_format).replace('%outfmt', output_format).replace('%outrate', str(output_bitrate))
|
||||
ret = [
|
||||
part.replace('%srcpath', input_file).replace('%srcfmt', input_format).replace('%outfmt', output_format).replace('%outrate', str(output_bitrate))
|
||||
for part in ret
|
||||
]
|
||||
return ret
|
||||
|
||||
@app.route('/rest/stream.view', methods = [ 'GET', 'POST' ])
|
||||
|
@ -71,9 +71,8 @@ def create_playlist():
|
||||
songs = request.values.getlist('songId')
|
||||
try:
|
||||
playlist_id = uuid.UUID(playlist_id) if playlist_id else None
|
||||
songs = map(uuid.UUID, songs)
|
||||
except:
|
||||
return request.error_formatter(0, 'Invalid parameter')
|
||||
return request.error_formatter(0, 'Invalid playlist id')
|
||||
|
||||
if playlist_id:
|
||||
try:
|
||||
@ -92,14 +91,17 @@ def create_playlist():
|
||||
else:
|
||||
return request.error_formatter(10, 'Missing playlist id or name')
|
||||
|
||||
for sid in songs:
|
||||
try:
|
||||
try:
|
||||
songs = map(uuid.UUID, songs)
|
||||
for sid in songs:
|
||||
track = Track[sid]
|
||||
except ObjectNotFound:
|
||||
rollback()
|
||||
return request.error_formatter(70, 'Unknown song')
|
||||
|
||||
playlist.add(track)
|
||||
playlist.add(track)
|
||||
except ValueError:
|
||||
rollback()
|
||||
return request.error_formatter(0, 'Invalid song id')
|
||||
except ObjectNotFound:
|
||||
rollback()
|
||||
return request.error_formatter(70, 'Unknown song')
|
||||
|
||||
return request.formatter(dict())
|
||||
|
||||
@ -129,11 +131,6 @@ def update_playlist():
|
||||
playlist = res
|
||||
name, comment, public = map(request.values.get, [ 'name', 'comment', 'public' ])
|
||||
to_add, to_remove = map(request.values.getlist, [ 'songIdToAdd', 'songIndexToRemove' ])
|
||||
try:
|
||||
to_add = map(uuid.UUID, to_add)
|
||||
to_remove = map(int, to_remove)
|
||||
except:
|
||||
return request.error_formatter(0, 'Invalid parameter')
|
||||
|
||||
if name:
|
||||
playlist.name = name
|
||||
@ -142,14 +139,19 @@ def update_playlist():
|
||||
if public:
|
||||
playlist.public = public in (True, 'True', 'true', 1, '1')
|
||||
|
||||
for sid in to_add:
|
||||
try:
|
||||
track = Track[sid]
|
||||
except ObjectNotFound:
|
||||
return request.error_formatter(70, 'Unknown song')
|
||||
playlist.add(track)
|
||||
try:
|
||||
to_add = map(uuid.UUID, to_add)
|
||||
to_remove = map(int, to_remove)
|
||||
|
||||
playlist.remove_at_indexes(to_remove)
|
||||
for sid in to_add:
|
||||
track = Track[sid]
|
||||
playlist.add(track)
|
||||
|
||||
playlist.remove_at_indexes(to_remove)
|
||||
except ValueError:
|
||||
return request.error_formatter(0, 'Invalid parameter')
|
||||
except ObjectNotFound:
|
||||
return request.error_formatter(70, 'Unknown song')
|
||||
|
||||
return request.formatter(dict())
|
||||
|
||||
|
@ -94,11 +94,11 @@ def new_search():
|
||||
albums = select(t.folder for t in Track if query in t.folder.name).limit(album_count, album_offset)
|
||||
songs = Track.select(lambda t: query in t.title).limit(song_count, song_offset)
|
||||
|
||||
return request.formatter(dict(searchResult2 = OrderedDict(
|
||||
artist = [ dict(id = str(a.id), name = a.name) for a in artists ],
|
||||
album = [ f.as_subsonic_child(request.user) for f in albums ],
|
||||
song = [ t.as_subsonic_child(request.user, request.client) for t in songs ]
|
||||
)))
|
||||
return request.formatter(dict(searchResult2 = OrderedDict((
|
||||
('artist', [ dict(id = str(a.id), name = a.name) for a in artists ]),
|
||||
('album', [ f.as_subsonic_child(request.user) for f in albums ]),
|
||||
('song', [ t.as_subsonic_child(request.user, request.client) for t in songs ])
|
||||
))))
|
||||
|
||||
@app.route('/rest/search3.view', methods = [ 'GET', 'POST' ])
|
||||
def search_id3():
|
||||
@ -123,9 +123,9 @@ def search_id3():
|
||||
albums = Album.select(lambda a: query in a.name).limit(album_count, album_offset)
|
||||
songs = Track.select(lambda t: query in t.title).limit(song_count, song_offset)
|
||||
|
||||
return request.formatter(dict(searchResult3 = OrderedDict(
|
||||
artist = [ a.as_subsonic_artist(request.user) for a in artists ],
|
||||
album = [ a.as_subsonic_album(request.user) for a in albums ],
|
||||
song = [ t.as_subsonic_child(request.user, request.client) for t in songs ]
|
||||
)))
|
||||
return request.formatter(dict(searchResult3 = OrderedDict((
|
||||
('artist', [ a.as_subsonic_artist(request.user) for a in artists ]),
|
||||
('album', [ a.as_subsonic_album(request.user) for a in albums ]),
|
||||
('song', [ t.as_subsonic_child(request.user, request.client) for t in songs ])
|
||||
))))
|
||||
|
||||
|
@ -24,10 +24,13 @@ from .py23 import strtype
|
||||
|
||||
class LastFm:
|
||||
def __init__(self, config, user, logger):
|
||||
self.__api_key = config['api_key']
|
||||
self.__api_secret = config['secret']
|
||||
if config['api_key'] is not None and config['secret'] is not None:
|
||||
self.__api_key = config['api_key']
|
||||
self.__api_secret = config['secret'].encode('utf-8')
|
||||
self.__enabled = True
|
||||
else:
|
||||
self.__enabled = False
|
||||
self.__user = user
|
||||
self.__enabled = self.__api_key is not None and self.__api_secret is not None
|
||||
self.__logger = logger
|
||||
|
||||
def link_account(self, token):
|
||||
@ -75,10 +78,9 @@ class LastFm:
|
||||
|
||||
sig_str = b''
|
||||
for k, v in sorted(kwargs.items()):
|
||||
if isinstance(v, strtype):
|
||||
sig_str += k + v.encode('utf-8')
|
||||
else:
|
||||
sig_str += k + str(v)
|
||||
k = k.encode('utf-8')
|
||||
v = v.encode('utf-8') if isinstance(v, strtype) else str(v).encode('utf-8')
|
||||
sig_str += k + v
|
||||
sig = hashlib.md5(sig_str + self.__api_secret).hexdigest()
|
||||
|
||||
kwargs['api_sig'] = sig
|
||||
|
@ -136,6 +136,6 @@ class UserManager:
|
||||
@staticmethod
|
||||
def __encrypt_password(password, salt = None):
|
||||
if salt is None:
|
||||
salt = ''.join(random.choice(string.printable.strip()) for i in xrange(6))
|
||||
salt = ''.join(random.choice(string.printable.strip()) for _ in range(6))
|
||||
return hashlib.sha1(salt.encode('utf-8') + password.encode('utf-8')).hexdigest(), salt
|
||||
|
||||
|
@ -154,7 +154,7 @@ class Scanner:
|
||||
trdict['genre'] = self.__try_read_tag(tag, 'genre')
|
||||
trdict['duration'] = int(tag.info.length)
|
||||
|
||||
trdict['bitrate'] = (tag.info.bitrate if hasattr(tag.info, 'bitrate') else int(os.path.getsize(path) * 8 / tag.info.length)) / 1000
|
||||
trdict['bitrate'] = (tag.info.bitrate if hasattr(tag.info, 'bitrate') else int(os.path.getsize(path) * 8 / tag.info.length)) // 1000
|
||||
trdict['content_type'] = mimetypes.guess_type(path, False)[0] or 'application/octet-stream'
|
||||
trdict['last_modification'] = int(os.path.getmtime(path))
|
||||
|
||||
|
@ -5,38 +5,42 @@
|
||||
# This file is part of Supysonic.
|
||||
# Supysonic is a Python implementation of the Subsonic server API.
|
||||
#
|
||||
# Copyright (C) 2017 Alban 'spl0k' Féron
|
||||
# 2017 Óscar García Amor
|
||||
# Copyright (C) 2017-2018 Alban 'spl0k' Féron
|
||||
# 2017 Óscar García Amor
|
||||
#
|
||||
# Distributed under terms of the GNU AGPLv3 license.
|
||||
|
||||
import base64
|
||||
import binascii
|
||||
import simplejson
|
||||
|
||||
from xml.etree import ElementTree
|
||||
|
||||
from ..testbase import TestBase
|
||||
from ..utils import hexlify
|
||||
|
||||
class ApiSetupTestCase(TestBase):
|
||||
__with_api__ = True
|
||||
|
||||
def setUp(self):
|
||||
super(ApiSetupTestCase, self).setUp()
|
||||
self._patch_client()
|
||||
|
||||
def __basic_auth_get(self, username, password):
|
||||
hashed = base64.b64encode('{}:{}'.format(username, password))
|
||||
headers = { 'Authorization': 'Basic ' + hashed }
|
||||
hashed = base64.b64encode('{}:{}'.format(username, password).encode('utf-8'))
|
||||
headers = { 'Authorization': 'Basic ' + hashed.decode('utf-8') }
|
||||
return self.client.get('/rest/ping.view', headers = headers, query_string = { 'c': 'tests' })
|
||||
|
||||
def __query_params_auth_get(self, username, password):
|
||||
return self.client.get('/rest/ping.view', query_string = { 'c': 'tests', 'u': username, 'p': password })
|
||||
|
||||
def __query_params_auth_enc_get(self, username, password):
|
||||
return self.__query_params_auth_get(username, 'enc:' + binascii.hexlify(password))
|
||||
return self.__query_params_auth_get(username, 'enc:' + hexlify(password))
|
||||
|
||||
def __form_auth_post(self, username, password):
|
||||
return self.client.post('/rest/ping.view', data = { 'c': 'tests', 'u': username, 'p': password })
|
||||
|
||||
def __form_auth_enc_post(self, username, password):
|
||||
return self.__form_auth_post(username, 'enc:' + binascii.hexlify(password))
|
||||
return self.__form_auth_post(username, 'enc:' + hexlify(password))
|
||||
|
||||
def __test_auth(self, method):
|
||||
# non-existent user
|
||||
@ -66,7 +70,7 @@ class ApiSetupTestCase(TestBase):
|
||||
self.__test_auth(self.__basic_auth_get)
|
||||
|
||||
# Shouldn't accept 'enc:' passwords
|
||||
rv = self.__basic_auth_get('alice', 'enc:' + binascii.hexlify('Alic3'))
|
||||
rv = self.__basic_auth_get('alice', 'enc:' + hexlify('Alic3'))
|
||||
self.assertEqual(rv.status_code, 401)
|
||||
self.assertIn('status="failed"', rv.data)
|
||||
self.assertIn('code="40"', rv.data)
|
||||
|
@ -5,7 +5,7 @@
|
||||
# This file is part of Supysonic.
|
||||
# Supysonic is a Python implementation of the Subsonic server API.
|
||||
#
|
||||
# Copyright (C) 2017 Alban 'spl0k' Féron
|
||||
# Copyright (C) 2017-2018 Alban 'spl0k' Féron
|
||||
#
|
||||
# Distributed under terms of the GNU AGPLv3 license.
|
||||
|
||||
@ -140,7 +140,7 @@ class PlaylistTestCase(ApiTestBase):
|
||||
# create more useful playlist
|
||||
with db_session:
|
||||
songs = { s.title: str(s.id) for s in Track.select() }
|
||||
self._make_request('createPlaylist', { 'name': 'songs', 'songId': map(lambda s: songs[s], [ 'Three', 'One', 'Two' ]) }, skip_post = True)
|
||||
self._make_request('createPlaylist', { 'name': 'songs', 'songId': list(map(lambda s: songs[s], [ 'Three', 'One', 'Two' ])) }, skip_post = True)
|
||||
with db_session:
|
||||
playlist = Playlist.get(name = 'songs')
|
||||
self.assertIsNotNone(playlist)
|
||||
|
@ -5,13 +5,12 @@
|
||||
# This file is part of Supysonic.
|
||||
# Supysonic is a Python implementation of the Subsonic server API.
|
||||
#
|
||||
# Copyright (C) 2017 Alban 'spl0k' Féron
|
||||
# 2017 Óscar García Amor
|
||||
# Copyright (C) 2017-2018 Alban 'spl0k' Féron
|
||||
# 2017 Óscar García Amor
|
||||
#
|
||||
# Distributed under terms of the GNU AGPLv3 license.
|
||||
|
||||
import binascii
|
||||
|
||||
from ..utils import hexlify
|
||||
from .apitestbase import ApiTestBase
|
||||
|
||||
class UserTestCase(ApiTestBase):
|
||||
@ -141,7 +140,7 @@ class UserTestCase(ApiTestBase):
|
||||
self._make_request('changePassword', { 'username': 'alice', 'password': 'Alic3', 'u': 'alice', 'p': 'новыйпароль' }, skip_post = True)
|
||||
|
||||
# non ASCII in hex encoded password
|
||||
self._make_request('changePassword', { 'username': 'alice', 'password': 'enc:' + binascii.hexlify('новыйпароль') }, skip_post = True)
|
||||
self._make_request('changePassword', { 'username': 'alice', 'password': 'enc:' + hexlify(u'новыйпароль') }, skip_post = True)
|
||||
self._make_request('ping', { 'u': 'alice', 'p': 'новыйпароль' })
|
||||
|
||||
# new password starting with 'enc:' followed by non hex chars
|
||||
|
@ -5,7 +5,7 @@
|
||||
# This file is part of Supysonic.
|
||||
# Supysonic is a Python implementation of the Subsonic server API.
|
||||
#
|
||||
# Copyright (C) 2013-2017 Alban 'spl0k' Féron
|
||||
# Copyright (C) 2017-2018 Alban 'spl0k' Féron
|
||||
#
|
||||
# Distributed under terms of the GNU AGPLv3 license.
|
||||
|
||||
@ -147,7 +147,7 @@ class ScannerTestCase(unittest.TestCase):
|
||||
self.assertEqual(db.Track.select().count(), 2)
|
||||
|
||||
tf.seek(0, 0)
|
||||
tf.write('\x00' * 4096)
|
||||
tf.write(b'\x00' * 4096)
|
||||
tf.truncate()
|
||||
|
||||
self.scanner.scan(db.Folder[self.folderid])
|
||||
|
@ -4,7 +4,7 @@
|
||||
# This file is part of Supysonic.
|
||||
# Supysonic is a Python implementation of the Subsonic server API.
|
||||
#
|
||||
# Copyright (C) 2017 Alban 'spl0k' Féron
|
||||
# Copyright (C) 2017-2018 Alban 'spl0k' Féron
|
||||
#
|
||||
# Distributed under terms of the GNU AGPLv3 license.
|
||||
|
||||
@ -13,6 +13,10 @@ from ..testbase import TestBase
|
||||
class FrontendTestBase(TestBase):
|
||||
__with_webui__ = True
|
||||
|
||||
def setUp(self):
|
||||
super(FrontendTestBase, self).setUp()
|
||||
self._patch_client()
|
||||
|
||||
def _login(self, username, password):
|
||||
return self.client.post('/user/login', data = { 'user': username, 'password': password }, follow_redirects = True)
|
||||
|
||||
|
@ -5,7 +5,7 @@
|
||||
# This file is part of Supysonic.
|
||||
# Supysonic is a Python implementation of the Subsonic server API.
|
||||
#
|
||||
# Copyright (C) 2013-2017 Alban 'spl0k' Féron
|
||||
# Copyright (C) 2017 Alban 'spl0k' Féron
|
||||
#
|
||||
# Distributed under terms of the GNU AGPLv3 license.
|
||||
|
||||
|
@ -53,6 +53,32 @@ class TestConfig(DefaultConfig):
|
||||
'mount_api': with_api
|
||||
})
|
||||
|
||||
class MockResponse(object):
|
||||
def __init__(self, response):
|
||||
self.__status_code = response.status_code
|
||||
self.__data = response.get_data(as_text = True)
|
||||
self.__mimetype = response.mimetype
|
||||
|
||||
@property
|
||||
def status_code(self):
|
||||
return self.__status_code
|
||||
|
||||
@property
|
||||
def data(self):
|
||||
return self.__data
|
||||
|
||||
@property
|
||||
def mimetype(self):
|
||||
return self.__mimetype
|
||||
|
||||
def patch_method(f):
|
||||
original = f
|
||||
def patched(*args, **kwargs):
|
||||
rv = original(*args, **kwargs)
|
||||
return MockResponse(rv)
|
||||
|
||||
return patched
|
||||
|
||||
class TestBase(unittest.TestCase):
|
||||
__with_webui__ = False
|
||||
__with_api__ = False
|
||||
@ -76,6 +102,10 @@ class TestBase(unittest.TestCase):
|
||||
UserManager.add('alice', 'Alic3', 'test@example.com', True)
|
||||
UserManager.add('bob', 'B0b', 'bob@example.com', False)
|
||||
|
||||
def _patch_client(self):
|
||||
self.client.get = patch_method(self.client.get)
|
||||
self.client.post = patch_method(self.client.post)
|
||||
|
||||
@staticmethod
|
||||
def __should_unload_module(module):
|
||||
if module.startswith('supysonic'):
|
||||
|
14
tests/utils.py
Normal file
14
tests/utils.py
Normal file
@ -0,0 +1,14 @@
|
||||
# coding: utf-8
|
||||
#
|
||||
# This file is part of Supysonic.
|
||||
# Supysonic is a Python implementation of the Subsonic server API.
|
||||
#
|
||||
# Copyright (C) 2018 Alban 'spl0k' Féron
|
||||
#
|
||||
# Distributed under terms of the GNU AGPLv3 license.
|
||||
|
||||
import binascii
|
||||
|
||||
def hexlify(s):
|
||||
return binascii.hexlify(s.encode('utf-8')).decode('utf-8')
|
||||
|
Loading…
Reference in New Issue
Block a user