1
0
mirror of https://github.com/spl0k/supysonic.git synced 2024-12-22 17:06:17 +00:00

Added tests on API auth, format and such

This commit is contained in:
spl0k 2017-10-29 16:08:00 +01:00
parent 7de57cb680
commit 8a14ef496d
5 changed files with 199 additions and 67 deletions

View File

@ -11,11 +11,13 @@
import unittest
from .test_response_helper import suite as rh_suite
from .test_api_setup import ApiSetupTestCase
def suite():
suite = unittest.TestSuite()
suite.addTest(rh_suite())
suite.addTest(unittest.makeSuite(ApiSetupTestCase))
return suite

28
tests/api/appmock.py Normal file
View File

@ -0,0 +1,28 @@
# -*- coding: utf-8 -*-
# vim:fenc=utf-8
#
# This file is part of Supysonic.
# Supysonic is a Python implementation of the Subsonic server API.
#
# Copyright (C) 2017 Alban 'spl0k' Féron
#
# Distributed under terms of the GNU AGPLv3 license.
import io
from flask import Flask
from supysonic.db import get_store
class AppMock(object):
def __init__(self, with_store = True):
self.app = Flask(__name__)
self.app.testing = True
if with_store:
self.store = get_store('sqlite:')
with io.open('schema/sqlite.sql', 'r') as sql:
schema = sql.read()
for statement in schema.split(';'):
self.store.execute(statement)
else:
self.store = None

162
tests/api/test_api_setup.py Normal file
View File

@ -0,0 +1,162 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# vim:fenc=utf-8
#
# This file is part of Supysonic.
# Supysonic is a Python implementation of the Subsonic server API.
#
# Copyright (C) 2017 Alban 'spl0k' Féron
# 2017 Óscar García Amor
#
# Distributed under terms of the GNU AGPLv3 license.
import base64
import binascii
import io
import simplejson
import sys
import unittest
from flask import request
from xml.etree import ElementTree
from supysonic.managers.user import UserManager
from .appmock import AppMock
class ApiSetupTestCase(unittest.TestCase):
def setUp(self):
app_mock = AppMock()
self.app = app_mock.app
self.store = app_mock.store
self.client = self.app.test_client()
sys.modules['supysonic.web'] = app_mock
import supysonic.api
UserManager.add(self.store, 'alice', 'Alic3', 'test@example.com', True)
def tearDown(self):
self.store.close()
to_unload = [ m for m in sys.modules if m.startswith('supysonic') ]
for m in to_unload:
del sys.modules[m]
def __basic_auth_get(self, username, password):
hashed = base64.b64encode('{}:{}'.format(username, password))
headers = { 'Authorization': 'Basic ' + hashed }
return self.client.get('/rest/ping.view', headers = headers, query_string = { 'c': 'tests' })
def __query_params_auth_get(self, username, password):
return self.client.get('/rest/ping.view', query_string = { 'c': 'tests', 'u': username, 'p': password })
def __query_params_auth_enc_get(self, username, password):
return self.__query_params_auth_get(username, 'enc:' + binascii.hexlify(password))
def __form_auth_post(self, username, password):
return self.client.post('/rest/ping.view', data = { 'c': 'tests', 'u': username, 'p': password })
def __form_auth_enc_post(self, username, password):
return self.__form_auth_post(username, 'enc:' + binascii.hexlify(password))
def __test_auth(self, method):
# non-existent user
rv = method('null', 'null')
self.assertEqual(rv.status_code, 401)
self.assertIn('status="failed"', rv.data)
self.assertIn('code="40"', rv.data)
# user request with bad password
rv = method('alice', 'wrong password')
self.assertEqual(rv.status_code, 401)
self.assertIn('status="failed"', rv.data)
self.assertIn('code="40"', rv.data)
# user request
rv = method('alice', 'Alic3')
self.assertEqual(rv.status_code, 200)
self.assertIn('status="ok"', rv.data)
def test_auth_basic(self):
# No auth info
rv = self.client.get('/rest/ping.view?c=tests')
self.assertEqual(rv.status_code, 401)
self.assertIn('status="failed"', rv.data)
self.assertIn('code="40"', rv.data)
self.__test_auth(self.__basic_auth_get)
# Shouldn't accept 'enc:' passwords
rv = self.__basic_auth_get('alice', 'enc:' + binascii.hexlify('Alic3'))
self.assertEqual(rv.status_code, 401)
self.assertIn('status="failed"', rv.data)
self.assertIn('code="40"', rv.data)
def test_auth_query_params(self):
self.__test_auth(self.__query_params_auth_get)
self.__test_auth(self.__query_params_auth_enc_get)
def test_auth_post(self):
self.__test_auth(self.__form_auth_post)
self.__test_auth(self.__form_auth_enc_post)
def test_required_client(self):
rv = self.client.get('/rest/ping.view', query_string = { 'u': 'alice', 'p': 'Alic3' })
self.assertIn('status="failed"', rv.data)
self.assertIn('code="10"', rv.data)
rv = self.client.get('/rest/ping.view', query_string = { 'u': 'alice', 'p': 'Alic3', 'c': 'tests' })
self.assertIn('status="ok"', rv.data)
def test_format(self):
args = { 'u': 'alice', 'p': 'Alic3', 'c': 'tests' }
rv = self.client.get('/rest/getLicense.view', query_string = args)
self.assertEqual(rv.status_code, 200)
self.assertTrue(rv.mimetype.endswith('/xml')) # application/xml or text/xml
self.assertIn('status="ok"', rv.data)
xml = ElementTree.fromstring(rv.data)
self.assertIsNotNone(xml.find('./{http://subsonic.org/restapi}license'))
args.update({ 'f': 'json' })
rv = self.client.get('/rest/getLicense.view', query_string = args)
self.assertEqual(rv.status_code, 200)
self.assertEqual(rv.mimetype, 'application/json')
json = simplejson.loads(rv.data)
self.assertIn('subsonic-response', json)
self.assertEqual(json['subsonic-response']['status'], 'ok')
self.assertIn('license', json['subsonic-response'])
args.update({ 'f': 'jsonp' })
rv = self.client.get('/rest/getLicense.view', query_string = args)
self.assertEqual(rv.mimetype, 'application/javascript')
json = simplejson.loads(rv.data)
self.assertIn('subsonic-response', json)
self.assertEqual(json['subsonic-response']['status'], 'failed')
self.assertEqual(json['subsonic-response']['error']['code'], 10)
args.update({ 'callback': 'dummy_cb' })
rv = self.client.get('/rest/getLicense.view', query_string = args)
self.assertEqual(rv.status_code, 200)
self.assertEqual(rv.mimetype, 'application/javascript')
self.assertTrue(rv.data.startswith('dummy_cb({'))
self.assertTrue(rv.data.endswith('})'))
json = simplejson.loads(rv.data[9:-1])
self.assertIn('subsonic-response', json)
self.assertEqual(json['subsonic-response']['status'], 'ok')
self.assertIn('license', json['subsonic-response'])
def test_not_implemented(self):
# Access to not implemented endpoint
rv = self.client.get('/rest/not-implemented', query_string = { 'u': 'alice', 'p': 'Alic3', 'c': 'tests' })
self.assertEqual(rv.status_code, 501)
self.assertIn('status="failed"', rv.data)
self.assertIn('code="0"', rv.data)
rv = self.client.post('/rest/not-implemented', data = { 'u': 'alice', 'p': 'Alic3', 'c': 'tests' })
self.assertEqual(rv.status_code, 501)
self.assertIn('status="failed"', rv.data)
self.assertIn('code="0"', rv.data)
if __name__ == '__main__':
unittest.main()

View File

@ -10,21 +10,23 @@
# Distributed under terms of the GNU AGPLv3 license.
import unittest, sys
from flask import Flask
import simplejson
from xml.etree import ElementTree
class AppMock(object):
app = Flask(__name__)
store = None
from .appmock import AppMock
class ResponseHelperBaseCase(unittest.TestCase):
def setUp(self):
sys.modules[u'supysonic.web'] = AppMock()
sys.modules[u'supysonic.web'] = AppMock(with_store = False)
from supysonic.api import ResponseHelper
self.helper = ResponseHelper
def tearDown(self):
to_unload = [ m for m in sys.modules if m.startswith('supysonic') ]
for m in to_unload:
del sys.modules[m]
class ResponseHelperJsonTestCase(ResponseHelperBaseCase):
def serialize_and_deserialize(self, d, error = False):
if not isinstance(d, dict):

View File

@ -45,68 +45,6 @@ class ApiTestCase(unittest.TestCase):
def setUp(self):
self.app = app.test_client()
def test_ping(self):
# GET non-existent user
rv = self.app.get('/rest/ping.view?u=null&p=null&c=test')
self.assertIn('status="failed"', rv.data)
self.assertIn('message="Unauthorized"', rv.data)
# POST non-existent user
rv = self.app.post('/rest/ping.view', data=dict(u='null', p='null', c='test'))
self.assertIn('status="failed"', rv.data)
self.assertIn('message="Unauthorized"', rv.data)
# GET user request
rv = self.app.get('/rest/ping.view?u=alice&p=alice&c=test')
self.assertIn('status="ok"', rv.data)
# POST user request
rv = self.app.post('/rest/ping.view', data=dict(u='alice', p='alice', c='test'))
self.assertIn('status="ok"', rv.data)
# GET user request with old enc:
rv = self.app.get('/rest/ping.view?u=alice&p=enc:616c696365&c=test')
self.assertIn('status="ok"', rv.data)
# POST user request with old enc:
rv = self.app.post('/rest/ping.view', data=dict(u='alice', p='enc:616c696365', c='test'))
self.assertIn('status="ok"', rv.data)
# GET user request with bad password
rv = self.app.get('/rest/ping.view?u=alice&p=bad&c=test')
self.assertIn('status="failed"', rv.data)
self.assertIn('message="Unauthorized"', rv.data)
# POST user request with bad password
rv = self.app.post('/rest/ping.view', data=dict(u='alice', p='bad', c='test'))
self.assertIn('status="failed"', rv.data)
self.assertIn('message="Unauthorized"', rv.data)
def test_ping_in_jsonp(self):
# If ping in jsonp works all other endpoints must work OK
# GET non-existent user
rv = self.app.get('/rest/ping.view?u=null&p=null&c=test&f=jsonp&callback=test')
self.assertIn('"status": "failed"', rv.data)
self.assertIn('"message": "Unauthorized"', rv.data)
# POST non-existent user
rv = self.app.post('/rest/ping.view', data=dict(u='null', p='null', c='test', f='jsonp', callback='test'))
self.assertIn('"status": "failed"', rv.data)
self.assertIn('"message": "Unauthorized"', rv.data)
# GET user request
rv = self.app.get('/rest/ping.view?u=alice&p=alice&c=test&f=jsonp&callback=test')
self.assertIn('"status": "ok"', rv.data)
# POST user request
rv = self.app.post('/rest/ping.view', data=dict(u='alice', p='alice', c='test', f='jsonp', callback='test'))
self.assertIn('"status": "ok"', rv.data)
# GET user request with bad password
rv = self.app.get('/rest/ping.view?u=alice&p=bad&c=test&f=jsonp&callback=test')
self.assertIn('"status": "failed"', rv.data)
self.assertIn('"message": "Unauthorized"', rv.data)
# POST user request with bad password
rv = self.app.post('/rest/ping.view', data=dict(u='alice', p='bad', c='test', f='jsonp', callback='test'))
self.assertIn('"status": "failed"', rv.data)
self.assertIn('"message": "Unauthorized"', rv.data)
def test_not_implemented(self):
# Access to not implemented endpoint
rv = self.app.get('/rest/not-implemented?u=alice&p=alice&c=test')
self.assertIn('message="Not implemented"', rv.data)
rv = self.app.post('/rest/not-implemented', data=dict(u='alice', p='alice', c='test'))
self.assertIn('message="Not implemented"', rv.data)
def test_get_license(self):
# GET user request
rv = self.app.get('/rest/getLicense.view?u=alice&p=alice&c=test')