diff --git a/.gitignore b/.gitignore index 57c56ef..5c99f15 100755 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,68 @@ -*.pyc -.*.sw[a-z] -*~ +# ---> Python +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +env/ build/ +develop-eggs/ dist/ -MANIFEST +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +*.egg-info/ +.installed.cfg +*.egg + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*,cover + +# Translations +*.mo +*.pot + +# Django stuff: +*.log + +# Sphinx documentation +docs/_build/ + +# PyBuilder +target/ + +# ---> Vim +[._]*.s[a-w][a-z] +[._]s[a-w][a-z] +*.un~ +Session.vim +.netrwhist +*~ + diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 0000000..759ca25 --- /dev/null +++ b/.travis.yml @@ -0,0 +1,5 @@ +language: python +python: + - "2.7" +install: "python setup.py install" +script: "python setup.py test" diff --git a/MANIFEST.in b/MANIFEST.in index f19a717..238b63e 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -1,3 +1,5 @@ include cgi-bin/* include config.sample include README.md +recursive-include supysonic/templates * +recursive-include supysonic/static * diff --git a/cgi-bin/server.py b/cgi-bin/server.py index 61043f7..61751a0 100755 --- a/cgi-bin/server.py +++ b/cgi-bin/server.py @@ -1,4 +1,4 @@ -#!/usr/bin/python +#!/usr/bin/env python # coding: utf-8 # This file is part of Supysonic. diff --git a/cgi-bin/supysonic.cgi b/cgi-bin/supysonic.cgi index 9cadacc..e2a7138 100755 --- a/cgi-bin/supysonic.cgi +++ b/cgi-bin/supysonic.cgi @@ -1,4 +1,4 @@ -#!/usr/bin/python +#!/usr/bin/env python # coding: utf-8 # This file is part of Supysonic. diff --git a/cgi-bin/supysonic.fcgi b/cgi-bin/supysonic.fcgi index 81fd551..bec9cfc 100755 --- a/cgi-bin/supysonic.fcgi +++ b/cgi-bin/supysonic.fcgi @@ -1,4 +1,4 @@ -#!/usr/bin/python +#!/usr/bin/env python # coding: utf-8 # This file is part of Supysonic. diff --git a/config.sample b/config.sample index 3049c7f..1286a0d 100644 --- a/config.sample +++ b/config.sample @@ -7,6 +7,9 @@ ; Optional, restrict scanner to these extensions ; scanner_extensions = mp3 ogg +; Optional for develop, key for sign the session cookies +; secret_key = verydifficultkeyword + [webapp] ; Optional cache directory cache_dir = /var/supysonic/cache @@ -24,7 +27,7 @@ log_level = INFO [lastfm] ; API and secret key to enable scrobbling. http://www.last.fm/api/accounts -; api_key = +; api_key = ; secret = [transcoding] diff --git a/setup.py b/setup.py index 54e86fa..a154188 100755 --- a/setup.py +++ b/setup.py @@ -1,36 +1,38 @@ -#!/usr/bin/python -# encoding: utf-8 +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# vim:fenc=utf-8 +# +# This file is part of Supysonic. +# Supysonic is a Python implementation of the Subsonic server API. +# +# Copyright (C) 2013-2017 Alban 'spl0k' Féron +# 2017 Óscar García Amor +# +# Distributed under terms of the GNU AGPLv3 license. -from distutils.core import setup +import supysonic as project -setup(name='supysonic', - description='Python implementation of the Subsonic server API.', - keywords='subsonic music', - version='0.1', - url='https://github.com/spl0k/supysonic', - license='AGPLv3', - author='Alban Féron', - author_email='alban.feron@gmail.com', - long_description=""" - Supysonic is a Python implementation of the Subsonic server API. +from setuptools import setup +from setuptools import find_packages +from pip.req import parse_requirements +from pip.download import PipSession - Current supported features are: - * browsing (by folders or tags) - * streaming of various audio file formats - * transcoding - * user or random playlists - * cover arts (cover.jpg files in the same folder as music files) - * starred tracks/albums and ratings - * Last.FM scrobbling - """, - packages=['supysonic', 'supysonic.api', 'supysonic.frontend', - 'supysonic.managers'], - scripts=['bin/supysonic-cli', 'bin/supysonic-watcher'], - package_data={'supysonic': [ - 'templates/*.html', - 'static/css/*', - 'static/fonts/*', - 'static/js/*' - ]} - ) +setup( + name=project.NAME, + version=project.VERSION, + description=project.DESCRIPTION, + keywords=project.KEYWORDS, + long_description=project.LONG_DESCRIPTION, + author=project.AUTHOR_NAME, + author_email=project.AUTHOR_EMAIL, + url=project.URL, + license=project.LICENSE, + packages=find_packages(), + install_requires=[str(x.req) for x in + parse_requirements('requirements.txt', session=PipSession())], + scripts=['bin/supysonic-cli', 'bin/supysonic-watcher'], + zip_safe=False, + include_package_data=True, + test_suite="tests.suite" + ) diff --git a/supysonic/__init__.py b/supysonic/__init__.py index b10767a..a2bab46 100644 --- a/supysonic/__init__.py +++ b/supysonic/__init__.py @@ -1,25 +1,28 @@ -# coding: utf-8 - +# -*- coding: utf-8 -*- +# vim:fenc=utf-8 +# # This file is part of Supysonic. -# # Supysonic is a Python implementation of the Subsonic server API. -# Copyright (C) 2013-2017 Alban 'spl0k' Féron # -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. +# Copyright (C) 2013-2017 Alban 'spl0k' Féron +# 2017 Óscar García Amor # -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Affero General Public License for more details. -# -# You should have received a copy of the GNU Affero General Public License -# along with this program. If not, see . - -import mimetypes - -def get_mime(ext): - return mimetypes.guess_type('dummy.' + ext, False)[0] or config.get('mimetypes', ext) or 'application/octet-stream' +# Distributed under terms of the GNU AGPLv3 license. +NAME = 'supysonic' +VERSION = '0.2' +DESCRIPTION = 'Python implementation of the Subsonic server API.' +KEYWORDS = 'subsonic music api' +AUTHOR_NAME = 'Alban Féron' +AUTHOR_EMAIL = 'alban.feron@gmail.com' +URL = 'https://github.com/spl0k/supysonic' +LICENSE = 'GNU AGPLv3' +LONG_DESCRIPTION = '''Supysonic is a Python implementation of the Subsonic server API. +Current supported features are: +* browsing (by folders or tags) +* streaming of various audio file formats +* transcoding +* user or random playlists +* cover arts (cover.jpg files in the same folder as music files) +* starred tracks/albums and ratings +* Last.FM scrobbling''' diff --git a/supysonic/api/media.py b/supysonic/api/media.py index f308377..efb59b1 100644 --- a/supysonic/api/media.py +++ b/supysonic/api/media.py @@ -70,7 +70,7 @@ def stream_media(): if format and format != 'raw' and format != src_suffix: dst_suffix = format - dst_mimetype = scanner.get_mime(dst_suffix) + dst_mimetype = config.get_mime(dst_suffix) if format != 'raw' and (dst_suffix != src_suffix or dst_bitrate != res.bitrate): transcoder = config.get('transcoding', 'transcoder_{}_{}'.format(src_suffix, dst_suffix)) diff --git a/supysonic/config.py b/supysonic/config.py index 193252d..01013e6 100644 --- a/supysonic/config.py +++ b/supysonic/config.py @@ -1,49 +1,71 @@ -# coding: utf-8 - +# -*- coding: utf-8 -*- +# vim:fenc=utf-8 +# # This file is part of Supysonic. -# # Supysonic is a Python implementation of the Subsonic server API. -# Copyright (C) 2013 Alban 'spl0k' Féron # -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. +# Copyright (C) 2013-2017 Alban 'spl0k' Féron +# 2017 Óscar García Amor # -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Affero General Public License for more details. -# -# You should have received a copy of the GNU Affero General Public License -# along with this program. If not, see . +# Distributed under terms of the GNU AGPLv3 license. -import os, sys, tempfile, ConfigParser +from ConfigParser import ConfigParser, NoOptionError, NoSectionError + +import mimetypes +import os +import tempfile + +# Seek for standard locations +config_file = [ + 'supysonic.conf', + os.path.expanduser('~/.config/supysonic/supysonic.conf'), + os.path.expanduser('~/.supysonic'), + '/etc/supysonic' + ] + +config = ConfigParser({ 'cache_dir': os.path.join(tempfile.gettempdir(), 'supysonic') }) -config = ConfigParser.RawConfigParser({ 'cache_dir': os.path.join(tempfile.gettempdir(), 'supysonic') }) def check(): - try: - ret = config.read([ '/etc/supysonic', os.path.expanduser('~/.supysonic') ]) - except (ConfigParser.MissingSectionHeaderError, ConfigParser.ParsingError), e: - print >>sys.stderr, "Error while parsing the configuration file(s):\n%s" % str(e) - return False + """ + Checks the config file and mandatory fields + """ + try: + config.read(config_file) + except Exception as e: + err = 'Config file is corrupted.\n{0}'.format(e) + raise SystemExit(err) - if not ret: - print >>sys.stderr, "No configuration file found" - return False + try: + config.get('base', 'database_uri') + except (NoSectionError, NoOptionError): + raise SystemExit('No database URI set') - try: - config.get('base', 'database_uri') - except: - print >>sys.stderr, "No database URI set" - return False + return True - return True +def get(section, option): + """ + Returns a config option value from config file -def get(section, name): - try: - return config.get(section, name) - except: - return None + :param section: section where the option is stored + :param option: option name + :return: a config option value + :rtype: string + """ + try: + return config.get(section, option) + except (NoSectionError, NoOptionError): + return None +def get_mime(extension): + """ + Returns mimetype of an extension based on config file + + :param extension: extension string + :return: mimetype + :rtype: string + """ + guessed_mime = mimetypes.guess_type('dummy.' + extension, False)[0] + config_mime = get('mimetypes', extension) + default_mime = 'application/octet-stream' + return guessed_mime or config_mime or default_mime diff --git a/supysonic/db.py b/supysonic/db.py index 34f10e6..4daad71 100644 --- a/supysonic/db.py +++ b/supysonic/db.py @@ -27,7 +27,7 @@ from storm.variables import Variable import uuid, datetime, time import os.path -from supysonic import get_mime +from supysonic import config def now(): return datetime.datetime.now().replace(microsecond = 0) @@ -213,7 +213,7 @@ class Track(object): if prefs and prefs.format and prefs.format != self.suffix(): info['transcodedSuffix'] = prefs.format - info['transcodedContentType'] = get_mime(prefs.format) + info['transcodedContentType'] = config.get_mime(prefs.format) return info diff --git a/supysonic/frontend/__init__.py b/supysonic/frontend/__init__.py index d5e5778..c004021 100644 --- a/supysonic/frontend/__init__.py +++ b/supysonic/frontend/__init__.py @@ -1,22 +1,13 @@ -# coding: utf-8 - +# -*- coding: utf-8 -*- +# vim:fenc=utf-8 +# # This file is part of Supysonic. -# # Supysonic is a Python implementation of the Subsonic server API. -# Copyright (C) 2014 Alban 'spl0k' Féron # -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. +# Copyright (C) 2013-2017 Alban 'spl0k' Féron +# 2017 Óscar García Amor # -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Affero General Public License for more details. -# -# You should have received a copy of the GNU Affero General Public License -# along with this program. If not, see . +# Distributed under terms of the GNU AGPLv3 license. from flask import session from supysonic.web import app, store @@ -27,34 +18,36 @@ app.add_template_filter(str) @app.before_request def login_check(): - if request.path.startswith('/rest/'): - return + if request.path.startswith('/rest/'): + return - if request.path.startswith('/static/'): - return + if request.path.startswith('/static/'): + return - if request.endpoint != 'login': - should_login = False - if not session.get('userid'): - should_login = True - elif UserManager.get(store, session.get('userid'))[0] != UserManager.SUCCESS: - session.clear() - should_login = True + if request.endpoint != 'login': + should_login = False + if not session.get('userid'): + should_login = True + elif UserManager.get(store, session.get('userid'))[0] != UserManager.SUCCESS: + session.clear() + should_login = True + elif UserManager.get(store, session.get('userid'))[1].name != session.get('username'): + session.clear() + should_login = True - if should_login: - flash('Please login') - return redirect(url_for('login', returnUrl = request.script_root + request.url[len(request.url_root)-1:])) + if should_login: + flash('Please login') + return redirect(url_for('login', returnUrl = request.script_root + request.url[len(request.url_root)-1:])) @app.route('/') def index(): - stats = { - 'artists': store.find(Artist).count(), - 'albums': store.find(Album).count(), - 'tracks': store.find(Track).count() - } - return render_template('home.html', stats = stats, admin = UserManager.get(store, session.get('userid'))[1].admin) + stats = { + 'artists': store.find(Artist).count(), + 'albums': store.find(Album).count(), + 'tracks': store.find(Track).count() + } + return render_template('home.html', stats = stats, admin = UserManager.get(store, session.get('userid'))[1].admin) from .user import * from .folder import * from .playlist import * - diff --git a/supysonic/managers/user.py b/supysonic/managers/user.py index 211dfa1..1840520 100644 --- a/supysonic/managers/user.py +++ b/supysonic/managers/user.py @@ -1,24 +1,18 @@ -# coding: utf-8 - +# -*- coding: utf-8 -*- +# vim:fenc=utf-8 +# # This file is part of Supysonic. -# # Supysonic is a Python implementation of the Subsonic server API. -# Copyright (C) 2013 Alban 'spl0k' Féron # -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. +# Copyright (C) 2013-2017 Alban 'spl0k' Féron +# 2017 Óscar García Amor # -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Affero General Public License for more details. -# -# You should have received a copy of the GNU Affero General Public License -# along with this program. If not, see . +# Distributed under terms of the GNU AGPLv3 license. -import string, random, hashlib +import binascii +import string +import random +import hashlib import uuid from supysonic.db import User, ChatMessage, Playlist @@ -26,140 +20,137 @@ from supysonic.db import StarredFolder, StarredArtist, StarredAlbum, StarredTrac from supysonic.db import RatingFolder, RatingTrack class UserManager: - SUCCESS = 0 - INVALID_ID = 1 - NO_SUCH_USER = 2 - NAME_EXISTS = 3 - WRONG_PASS = 4 + SUCCESS = 0 + INVALID_ID = 1 + NO_SUCH_USER = 2 + NAME_EXISTS = 3 + WRONG_PASS = 4 - @staticmethod - def get(store, uid): - if type(uid) in (str, unicode): - try: - uid = uuid.UUID(uid) - except: - return UserManager.INVALID_ID, None - elif type(uid) is uuid.UUID: - pass - else: - return UserManager.INVALID_ID, None + @staticmethod + def get(store, uid): + if type(uid) in (str, unicode): + try: + uid = uuid.UUID(uid) + except: + return UserManager.INVALID_ID, None + elif type(uid) is uuid.UUID: + pass + else: + return UserManager.INVALID_ID, None - user = store.get(User, uid) - if user is None: - return UserManager.NO_SUCH_USER, None + user = store.get(User, uid) + if user is None: + return UserManager.NO_SUCH_USER, None - return UserManager.SUCCESS, user + return UserManager.SUCCESS, user - @staticmethod - def add(store, name, password, mail, admin): - if store.find(User, User.name == name).one(): - return UserManager.NAME_EXISTS + @staticmethod + def add(store, name, password, mail, admin): + if store.find(User, User.name == name).one(): + return UserManager.NAME_EXISTS - password = UserManager.__decode_password(password) - crypt, salt = UserManager.__encrypt_password(password) + password = UserManager.__decode_password(password) + crypt, salt = UserManager.__encrypt_password(password) - user = User() - user.name = name - user.mail = mail - user.password = crypt - user.salt = salt - user.admin = admin + user = User() + user.name = name + user.mail = mail + user.password = crypt + user.salt = salt + user.admin = admin - store.add(user) - store.commit() + store.add(user) + store.commit() - return UserManager.SUCCESS + return UserManager.SUCCESS - @staticmethod - def delete(store, uid): - status, user = UserManager.get(store, uid) - if status != UserManager.SUCCESS: - return status + @staticmethod + def delete(store, uid): + status, user = UserManager.get(store, uid) + if status != UserManager.SUCCESS: + return status - store.find(StarredFolder, StarredFolder.user_id == uid).remove() - store.find(StarredArtist, StarredArtist.user_id == uid).remove() - store.find(StarredAlbum, StarredAlbum.user_id == uid).remove() - store.find(StarredTrack, StarredTrack.user_id == uid).remove() - store.find(RatingFolder, RatingFolder.user_id == uid).remove() - store.find(RatingTrack, RatingTrack.user_id == uid).remove() - store.find(ChatMessage, ChatMessage.user_id == uid).remove() - for playlist in store.find(Playlist, Playlist.user_id == uid): - playlist.tracks.clear() - store.remove(playlist) + store.find(StarredFolder, StarredFolder.user_id == uid).remove() + store.find(StarredArtist, StarredArtist.user_id == uid).remove() + store.find(StarredAlbum, StarredAlbum.user_id == uid).remove() + store.find(StarredTrack, StarredTrack.user_id == uid).remove() + store.find(RatingFolder, RatingFolder.user_id == uid).remove() + store.find(RatingTrack, RatingTrack.user_id == uid).remove() + store.find(ChatMessage, ChatMessage.user_id == uid).remove() + for playlist in store.find(Playlist, Playlist.user_id == uid): + playlist.tracks.clear() + store.remove(playlist) - store.remove(user) - store.commit() + store.remove(user) + store.commit() - return UserManager.SUCCESS + return UserManager.SUCCESS - @staticmethod - def try_auth(store, name, password): - password = UserManager.__decode_password(password) - user = store.find(User, User.name == name).one() - if not user: - return UserManager.NO_SUCH_USER, None - elif UserManager.__encrypt_password(password, user.salt)[0] != user.password: - return UserManager.WRONG_PASS, None - else: - return UserManager.SUCCESS, user + @staticmethod + def try_auth(store, name, password): + password = UserManager.__decode_password(password) + user = store.find(User, User.name == name).one() + if not user: + return UserManager.NO_SUCH_USER, None + elif UserManager.__encrypt_password(password, user.salt)[0] != user.password: + return UserManager.WRONG_PASS, None + else: + return UserManager.SUCCESS, user - @staticmethod - def change_password(store, uid, old_pass, new_pass): - status, user = UserManager.get(store, uid) - if status != UserManager.SUCCESS: - return status + @staticmethod + def change_password(store, uid, old_pass, new_pass): + status, user = UserManager.get(store, uid) + if status != UserManager.SUCCESS: + return status - old_pass = UserManager.__decode_password(old_pass) - new_pass = UserManager.__decode_password(new_pass) + old_pass = UserManager.__decode_password(old_pass) + new_pass = UserManager.__decode_password(new_pass) - if UserManager.__encrypt_password(old_pass, user.salt)[0] != user.password: - return UserManager.WRONG_PASS + if UserManager.__encrypt_password(old_pass, user.salt)[0] != user.password: + return UserManager.WRONG_PASS - user.password = UserManager.__encrypt_password(new_pass, user.salt)[0] - store.commit() - return UserManager.SUCCESS + user.password = UserManager.__encrypt_password(new_pass, user.salt)[0] + store.commit() + return UserManager.SUCCESS - @staticmethod - def change_password2(store, name, new_pass): - user = store.find(User, User.name == name).one() - if not user: - return UserManager.NO_SUCH_USER + @staticmethod + def change_password2(store, name, new_pass): + user = store.find(User, User.name == name).one() + if not user: + return UserManager.NO_SUCH_USER - new_pass = UserManager.__decode_password(new_pass) - user.password = UserManager.__encrypt_password(new_pass, user.salt)[0] - store.commit() - return UserManager.SUCCESS + new_pass = UserManager.__decode_password(new_pass) + user.password = UserManager.__encrypt_password(new_pass, user.salt)[0] + store.commit() + return UserManager.SUCCESS - @staticmethod - def error_str(err): - if err == UserManager.SUCCESS: - return 'No error' - elif err == UserManager.INVALID_ID: - return 'Invalid user id' - elif err == UserManager.NO_SUCH_USER: - return 'No such user' - elif err == UserManager.NAME_EXISTS: - return 'There is already a user with that name' - elif err == UserManager.WRONG_PASS: - return 'Wrong password' - else: - return 'Unkown error' + @staticmethod + def error_str(err): + if err == UserManager.SUCCESS: + return 'No error' + elif err == UserManager.INVALID_ID: + return 'Invalid user id' + elif err == UserManager.NO_SUCH_USER: + return 'No such user' + elif err == UserManager.NAME_EXISTS: + return 'There is already a user with that name' + elif err == UserManager.WRONG_PASS: + return 'Wrong password' + else: + return 'Unkown error' - @staticmethod - def __encrypt_password(password, salt = None): - if salt is None: - salt = ''.join(random.choice(string.printable.strip()) for i in xrange(6)) - return hashlib.sha1(salt + password).hexdigest(), salt + @staticmethod + def __encrypt_password(password, salt = None): + if salt is None: + salt = ''.join(random.choice(string.printable.strip()) for i in xrange(6)) + return hashlib.sha1(salt.encode('utf-8') + password.encode('utf-8')).hexdigest(), salt - @staticmethod - def __decode_password(password): - if not password.startswith('enc:'): - return password - - enc = password[4:] - ret = '' - while enc: - ret = ret + chr(int(enc[:2], 16)) - enc = enc[2:] - return ret + @staticmethod + def __decode_password(password): + if not password.startswith('enc:'): + return password + try: + return binascii.unhexlify(password[4:]).decode('utf-8') + except: + return password diff --git a/supysonic/scanner.py b/supysonic/scanner.py index 3a3683a..30d85f3 100644 --- a/supysonic/scanner.py +++ b/supysonic/scanner.py @@ -25,7 +25,7 @@ import mutagen from storm.expr import ComparableExpr, compile, Like from storm.exceptions import NotSupportedError -from supysonic import config, get_mime +from supysonic import config from supysonic.db import Folder, Artist, Album, Track, User, PlaylistTrack from supysonic.db import StarredFolder, StarredArtist, StarredAlbum, StarredTrack from supysonic.db import RatingFolder, RatingTrack @@ -166,7 +166,7 @@ class Scanner: tr.duration = int(tag.info.length) tr.bitrate = (tag.info.bitrate if hasattr(tag.info, 'bitrate') else int(os.path.getsize(path) * 8 / tag.info.length)) / 1000 - tr.content_type = get_mime(os.path.splitext(path)[1][1:]) + tr.content_type = config.get_mime(os.path.splitext(path)[1][1:]) tr.last_modification = os.path.getmtime(path) tralbum = self.__find_album(albumartist, album) diff --git a/supysonic/web.py b/supysonic/web.py index 8c35aac..1260882 100644 --- a/supysonic/web.py +++ b/supysonic/web.py @@ -1,75 +1,76 @@ -# coding: utf-8 - +# -*- coding: utf-8 -*- +# vim:fenc=utf-8 +# # This file is part of Supysonic. -# # Supysonic is a Python implementation of the Subsonic server API. -# Copyright (C) 2013 Alban 'spl0k' Féron # -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. +# Copyright (C) 2013-2017 Alban 'spl0k' Féron +# 2017 Óscar García Amor # -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Affero General Public License for more details. -# -# You should have received a copy of the GNU Affero General Public License -# along with this program. If not, see . +# Distributed under terms of the GNU AGPLv3 license. -import os.path from flask import Flask, g +from os import makedirs, path from werkzeug.local import LocalProxy from supysonic import config from supysonic.db import get_store -def get_db_store(): - store = getattr(g, 'store', None) - if store: - return store - g.store = get_store(config.get('base', 'database_uri')) - return g.store +# Supysonic database open +def get_db(): + if not hasattr(g, 'database'): + g.database = get_store(config.get('base', 'database_uri')) + return g.database -store = LocalProxy(get_db_store) +# Supysonic database close +def close_db(error): + if hasattr(g, 'database'): + g.database.close() -def teardown_db(exception): - store = getattr(g, 'store', None) - if store: - store.close() +store = LocalProxy(get_db) def create_application(): - global app + global app - if not config.check(): - return None + # Check config for mandatory fields + config.check() - if not os.path.exists(config.get('webapp', 'cache_dir')): - os.makedirs(config.get('webapp', 'cache_dir')) + # Test for the cache directory + if not path.exists(config.get('webapp', 'cache_dir')): + os.makedirs(config.get('webapp', 'cache_dir')) - app = Flask(__name__) - app.secret_key = '?9huDM\\H' + # Flask! + app = Flask(__name__) - app.teardown_appcontext(teardown_db) + # Set a secret key for sessions + secret_key = config.get('base', 'secret_key') + # If secret key is not defined in config, set develop key + if secret_key is None: + app.secret_key = 'd3v3l0p' + else: + app.secret_key = secret_key - if config.get('webapp', 'log_file'): - import logging - from logging.handlers import TimedRotatingFileHandler - handler = TimedRotatingFileHandler(config.get('webapp', 'log_file'), when = 'midnight') - if config.get('webapp', 'log_level'): - mapping = { - 'DEBUG': logging.DEBUG, - 'INFO': logging.INFO, - 'WARNING': logging.WARNING, - 'ERROR': logging.ERROR, - 'CRTICAL': logging.CRITICAL - } - handler.setLevel(mapping.get(config.get('webapp', 'log_level').upper(), logging.NOTSET)) - app.logger.addHandler(handler) + # Close database connection on teardown + app.teardown_appcontext(close_db) - from supysonic import frontend - from supysonic import api + # Set loglevel + if config.get('webapp', 'log_file'): + import logging + from logging.handlers import TimedRotatingFileHandler + handler = TimedRotatingFileHandler(config.get('webapp', 'log_file'), when = 'midnight') + if config.get('webapp', 'log_level'): + mapping = { + 'DEBUG': logging.DEBUG, + 'INFO': logging.INFO, + 'WARNING': logging.WARNING, + 'ERROR': logging.ERROR, + 'CRTICAL': logging.CRITICAL + } + handler.setLevel(mapping.get(config.get('webapp', 'log_level').upper(), logging.NOTSET)) + app.logger.addHandler(handler) - return app + # Import app sections + from supysonic import frontend + from supysonic import api + return app diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..bd00f37 --- /dev/null +++ b/tests/__init__.py @@ -0,0 +1,25 @@ +# -*- coding: utf-8 -*- +# vim:fenc=utf-8 +# +# This file is part of Supysonic. +# Supysonic is a Python implementation of the Subsonic server API. +# +# Copyright (C) 2013-2017 Alban 'spl0k' Féron +# 2017 Óscar García Amor +# +# Distributed under terms of the GNU AGPLv3 license. + +import unittest + +from .test_manager_folder import FolderManagerTestCase +from .test_manager_user import UserManagerTestCase +from .test_api import ApiTestCase +from .test_frontend import FrontendTestCase + +def suite(): + suite = unittest.TestSuite() + suite.addTest(unittest.makeSuite(FolderManagerTestCase)) + suite.addTest(unittest.makeSuite(UserManagerTestCase)) + suite.addTest(unittest.makeSuite(ApiTestCase)) + suite.addTest(unittest.makeSuite(FrontendTestCase)) + return suite diff --git a/tests/test_api.py b/tests/test_api.py new file mode 100644 index 0000000..cffe97e --- /dev/null +++ b/tests/test_api.py @@ -0,0 +1,297 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# vim:fenc=utf-8 +# +# This file is part of Supysonic. +# Supysonic is a Python implementation of the Subsonic server API. +# +# Copyright (C) 2013-2017 Alban 'spl0k' Féron +# 2017 Óscar García Amor +# +# Distributed under terms of the GNU AGPLv3 license. + +from supysonic import db +from supysonic.managers.user import UserManager + +import sys +import unittest +import uuid + +# Create an empty sqlite database in memory +store = db.get_store("sqlite:") +# Read schema from file +with open('schema/sqlite.sql') as sql: + schema = sql.read() +# Create tables on memory database +for command in schema.split(';'): + store.execute(command) +# Create some users +UserManager.add(store, 'alice', 'alice', 'test@example.com', True) +UserManager.add(store, 'bob', 'bob', 'bob@example.com', False) +UserManager.add(store, 'charlie', 'charlie', 'charlie@example.com', False) + +# Create a mockup of web +from flask import Flask +app = Flask(__name__) +class web(): + app = app + store = store +sys.modules['supysonic.web'] = web() + +# Import module and set app in test mode +import supysonic.api + +class ApiTestCase(unittest.TestCase): + def setUp(self): + self.app = app.test_client() + + def test_ping(self): + # GET non-existent user + rv = self.app.get('/rest/ping.view?u=null&p=null&c=test') + self.assertIn('status="failed"', rv.data) + self.assertIn('message="Unauthorized"', rv.data) + # POST non-existent user + rv = self.app.post('/rest/ping.view', data=dict(u='null', p='null', c='test')) + self.assertIn('status="failed"', rv.data) + self.assertIn('message="Unauthorized"', rv.data) + # GET user request + rv = self.app.get('/rest/ping.view?u=alice&p=alice&c=test') + self.assertIn('status="ok"', rv.data) + # POST user request + rv = self.app.post('/rest/ping.view', data=dict(u='alice', p='alice', c='test')) + self.assertIn('status="ok"', rv.data) + # GET user request with old enc: + rv = self.app.get('/rest/ping.view?u=alice&p=enc:616c696365&c=test') + self.assertIn('status="ok"', rv.data) + # POST user request with old enc: + rv = self.app.post('/rest/ping.view', data=dict(u='alice', p='enc:616c696365', c='test')) + self.assertIn('status="ok"', rv.data) + # GET user request with bad password + rv = self.app.get('/rest/ping.view?u=alice&p=bad&c=test') + self.assertIn('status="failed"', rv.data) + self.assertIn('message="Unauthorized"', rv.data) + # POST user request with bad password + rv = self.app.post('/rest/ping.view', data=dict(u='alice', p='bad', c='test')) + self.assertIn('status="failed"', rv.data) + self.assertIn('message="Unauthorized"', rv.data) + + def test_ping_in_jsonp(self): + # If ping in jsonp works all other endpoints must work OK + # GET non-existent user + rv = self.app.get('/rest/ping.view?u=null&p=null&c=test&f=jsonp&callback=test') + self.assertIn('"status": "failed"', rv.data) + self.assertIn('"message": "Unauthorized"', rv.data) + # POST non-existent user + rv = self.app.post('/rest/ping.view', data=dict(u='null', p='null', c='test', f='jsonp', callback='test')) + self.assertIn('"status": "failed"', rv.data) + self.assertIn('"message": "Unauthorized"', rv.data) + # GET user request + rv = self.app.get('/rest/ping.view?u=alice&p=alice&c=test&f=jsonp&callback=test') + self.assertIn('"status": "ok"', rv.data) + # POST user request + rv = self.app.post('/rest/ping.view', data=dict(u='alice', p='alice', c='test', f='jsonp', callback='test')) + self.assertIn('"status": "ok"', rv.data) + # GET user request with bad password + rv = self.app.get('/rest/ping.view?u=alice&p=bad&c=test&f=jsonp&callback=test') + self.assertIn('"status": "failed"', rv.data) + self.assertIn('"message": "Unauthorized"', rv.data) + # POST user request with bad password + rv = self.app.post('/rest/ping.view', data=dict(u='alice', p='bad', c='test', f='jsonp', callback='test')) + self.assertIn('"status": "failed"', rv.data) + self.assertIn('"message": "Unauthorized"', rv.data) + + def test_not_implemented(self): + # Access to not implemented endpoint + rv = self.app.get('/rest/not-implemented?u=alice&p=alice&c=test') + self.assertIn('message="Not implemented"', rv.data) + rv = self.app.post('/rest/not-implemented', data=dict(u='alice', p='alice', c='test')) + self.assertIn('message="Not implemented"', rv.data) + + def test_get_license(self): + # GET user request + rv = self.app.get('/rest/getLicense.view?u=alice&p=alice&c=test') + self.assertIn('status="ok"', rv.data) + self.assertIn('license valid="true"', rv.data) + # POST user request + rv = self.app.post('/rest/getLicense.view', data=dict(u='alice', p='alice', c='test')) + self.assertIn('status="ok"', rv.data) + self.assertIn('license valid="true"', rv.data) + + def test_get_user(self): + # GET missing username + rv = self.app.get('/rest/getUser.view?u=alice&p=alice&c=test') + self.assertIn('message="Missing username"', rv.data) + # POST missing username + rv = self.app.post('/rest/getUser.view', data=dict(u='alice', p='alice', c='test')) + self.assertIn('message="Missing username"', rv.data) + # GET non-admin request for other user + rv = self.app.get('/rest/getUser.view?u=bob&p=bob&c=test&username=alice') + self.assertIn('message="Admin restricted"', rv.data) + # POST non-admin request for other user + rv = self.app.post('/rest/getUser.view', data=dict(u='bob', p='bob', c='test', username='alice')) + self.assertIn('message="Admin restricted"', rv.data) + # GET non-existent user + rv = self.app.get('/rest/getUser.view?u=alice&p=alice&c=test&username=null') + self.assertIn('message="Unknown user"', rv.data) + # POST non-existent user + rv = self.app.post('/rest/getUser.view', data=dict(u='alice', p='alice', c='test', username='null')) + self.assertIn('message="Unknown user"', rv.data) + # GET admin request + rv = self.app.get('/rest/getUser.view?u=alice&p=alice&c=test&username=alice') + self.assertIn('adminRole="true"', rv.data) + self.assertIn('username="alice"', rv.data) + # POST admin request + rv = self.app.post('/rest/getUser.view', data=dict(u='alice', p='alice', c='test', username='alice')) + self.assertIn('adminRole="true"', rv.data) + self.assertIn('username="alice"', rv.data) + # GET admin request for other user + rv = self.app.get('/rest/getUser.view?u=alice&p=alice&c=test&username=bob') + self.assertIn('username="bob"', rv.data) + self.assertIn('adminRole="false"', rv.data) + # POST admin request for other user + rv = self.app.post('/rest/getUser.view', data=dict(u='alice', p='alice', c='test', username='bob')) + self.assertIn('username="bob"', rv.data) + self.assertIn('adminRole="false"', rv.data) + # GET non-admin request + rv = self.app.get('/rest/getUser.view?u=charlie&p=charlie&c=test&username=charlie') + self.assertIn('username="charlie"', rv.data) + self.assertIn('adminRole="false"', rv.data) + # POST non-admin request + rv = self.app.post('/rest/getUser.view', data=dict(u='charlie', p='charlie', c='test', username='charlie')) + self.assertIn('username="charlie"', rv.data) + self.assertIn('adminRole="false"', rv.data) + + def test_get_users(self): + # GET admin request + rv = self.app.get('/rest/getUsers.view?u=alice&p=alice&c=test') + self.assertIn('alice', rv.data) + self.assertIn('bob', rv.data) + self.assertIn('charlie', rv.data) + # POST admin request + rv = self.app.post('/rest/getUsers.view', data=dict(u='alice', p='alice', c='test')) + self.assertIn('alice', rv.data) + self.assertIn('bob', rv.data) + self.assertIn('charlie', rv.data) + # GET non-admin request + rv = self.app.get('/rest/getUsers.view?u=bob&p=bob&c=test') + self.assertIn('message="Admin restricted"', rv.data) + # POST non-admin request + rv = self.app.post('/rest/getUsers.view', data=dict(u='bob', p='bob', c='test')) + self.assertIn('message="Admin restricted"', rv.data) + + def test_create_user(self): + # GET non-admin request + rv = self.app.get('/rest/createUser.view?u=bob&p=bob&c=test') + self.assertIn('message="Admin restricted"', rv.data) + # POST non-admin request + rv = self.app.post('/rest/createUser.view', data=dict(u='bob', p='bob', c='test')) + self.assertIn('message="Admin restricted"', rv.data) + # GET incomplete request + rv = self.app.get('/rest/createUser.view?u=alice&p=alice&c=test') + self.assertIn('message="Missing parameter"', rv.data) + # POST incomplete request + rv = self.app.post('/rest/createUser.view', data=dict(u='alice', p='alice', c='test')) + self.assertIn('message="Missing parameter"', rv.data) + # GET create user and test that user is created + rv = self.app.get('/rest/createUser.view?u=alice&p=alice&c=test&username=david&password=david&email=david%40example.com&adminRole=True') + self.assertIn('status="ok"', rv.data) + rv = self.app.get('/rest/getUser.view?u=david&p=david&c=test&username=david') + self.assertIn('username="david"', rv.data) + self.assertIn('email="david@example.com"', rv.data) + self.assertIn('adminRole="true"', rv.data) + # POST create user and test that user is created + rv = self.app.post('/rest/createUser.view', data=dict(u='alice', p='alice', c='test', username='elanor', password='elanor', email='elanor@example.com', adminRole=True)) + self.assertIn('status="ok"', rv.data) + rv = self.app.post('/rest/getUser.view', data=dict(u='elanor', p='elanor', c='test', username='elanor')) + self.assertIn('username="elanor"', rv.data) + self.assertIn('email="elanor@example.com"', rv.data) + self.assertIn('adminRole="true"', rv.data) + # GET create duplicate + rv = self.app.get('/rest/createUser.view?u=alice&p=alice&c=test&username=david&password=david&email=david%40example.com&adminRole=True') + self.assertIn('message="There is already a user with that username"', rv.data) + # POST create duplicate + rv = self.app.post('/rest/createUser.view', data=dict(u='alice', p='alice', c='test', username='elanor', password='elanor', email='elanor@example.com', adminRole=True)) + self.assertIn('message="There is already a user with that username"', rv.data) + + def test_delete_user(self): + # GET non-admin request + rv = self.app.get('/rest/deleteUser.view?u=bob&p=bob&c=test') + self.assertIn('message="Admin restricted"', rv.data) + # POST non-admin request + rv = self.app.post('/rest/deleteUser.view', data=dict(u='bob', p='bob', c='test')) + self.assertIn('message="Admin restricted"', rv.data) + # GET incomplete request + rv = self.app.get('/rest/deleteUser.view?u=alice&p=alice&c=test') + self.assertIn('message="Unknown user"', rv.data) + # POST incomplete request + rv = self.app.post('/rest/deleteUser.view', data=dict(u='alice', p='alice', c='test')) + self.assertIn('message="Unknown user"', rv.data) + # GET delete non-existent user + rv = self.app.get('/rest/deleteUser.view?u=alice&p=alice&c=test&username=nonexistent') + self.assertIn('message="Unknown user"', rv.data) + # POST delete non-existent user + rv = self.app.post('/rest/deleteUser.view', data=dict(u='alice', p='alice', c='test', username='nonexistent')) + self.assertIn('message="Unknown user"', rv.data) + # GET delete existent user + rv = self.app.get('/rest/deleteUser.view?u=alice&p=alice&c=test&username=elanor') + self.assertIn('status="ok"', rv.data) + rv = self.app.get('/rest/getUser.view?u=alice&p=alice&c=test&username=elanor') + self.assertIn('message="Unknown user"', rv.data) + # POST delete existent user + rv = self.app.post('/rest/deleteUser.view', data=dict(u='alice', p='alice', c='test', username='david')) + self.assertIn('status="ok"', rv.data) + rv = self.app.post('/rest/getUser.view', data=dict(u='alice', p='alice', c='test', username='david')) + self.assertIn('message="Unknown user"', rv.data) + + def test_change_password(self): + # GET incomplete request + rv = self.app.get('/rest/changePassword.view?u=alice&p=alice&c=test') + self.assertIn('message="Missing parameter"', rv.data) + # POST incomplete request + rv = self.app.post('/rest/changePassword.view', data=dict(u='alice', p='alice', c='test')) + self.assertIn('message="Missing parameter"', rv.data) + # GET non-admin change own password + rv = self.app.get('/rest/changePassword.view?u=bob&p=bob&c=test&username=bob&password=newpassword') + self.assertIn('status="ok"', rv.data) + # POST non-admin change own password + rv = self.app.post('/rest/changePassword.view', data=dict(u='bob', p='newpassword', c='test', username='bob', password='bob')) + self.assertIn('status="ok"', rv.data) + # GET non-admin change other user password + rv = self.app.get('/rest/changePassword.view?u=bob&p=bob&c=test&username=alice&password=newpassword') + self.assertIn('message="Admin restricted"', rv.data) + # POST non-admin change other user password + rv = self.app.post('/rest/changePassword.view', data=dict(u='bob', p='bob', c='test', username='alice', password='newpassword')) + self.assertIn('message="Admin restricted"', rv.data) + # GET admin change other user password + rv = self.app.get('/rest/changePassword.view?u=bob&p=bob&c=test&username=bob&password=newpassword') + self.assertIn('status="ok"', rv.data) + # POST admin change other user password + rv = self.app.post('/rest/changePassword.view', data=dict(u='bob', p='newpassword', c='test', username='bob', password='bob')) + self.assertIn('status="ok"', rv.data) + # GET change non-existent user password + rv = self.app.get('/rest/changePassword.view?u=alice&p=alice&c=test&username=nonexistent&password=nonexistent') + self.assertIn('message="No such user"', rv.data) + # POST change non-existent user password + rv = self.app.post('/rest/changePassword.view', data=dict(u='alice', p='alice', c='test', username='nonexistent', password='nonexistent')) + self.assertIn('message="No such user"', rv.data) + # GET non-admin change own password using extended utf-8 characters + rv = self.app.get('/rest/changePassword.view?u=bob&p=bob&c=test&username=bob&password=новыйпароль') + self.assertIn('status="ok"', rv.data) + # POST non-admin change own password using extended utf-8 characters + rv = self.app.post('/rest/changePassword.view', data=dict(u='bob', p='новыйпароль', c='test', username='bob', password='bob')) + self.assertIn('status="ok"', rv.data) + # GET non-admin change own password using extended utf-8 characters with old enc: + rv = self.app.get('/rest/changePassword.view?u=bob&p=enc:626f62&c=test&username=bob&password=новыйпароль') + self.assertIn('status="ok"', rv.data) + # POST non-admin change own password using extended utf-8 characters with old enc: + rv = self.app.post('/rest/changePassword.view', data=dict(u='bob', p='enc:d0bdd0bed0b2d18bd0b9d0bfd0b0d180d0bed0bbd18c', c='test', username='bob', password='bob')) + self.assertIn('status="ok"', rv.data) + # GET non-admin change own password using enc: in password + rv = self.app.get('/rest/changePassword.view?u=bob&p=bob&c=test&username=bob&password=enc:test') + self.assertIn('status="ok"', rv.data) + # POST non-admin change own password using enc: in password + rv = self.app.post('/rest/changePassword.view', data=dict(u='bob', p='enc:test', c='test', username='bob', password='bob')) + self.assertIn('status="ok"', rv.data) + +if __name__ == '__main__': + unittest.main() diff --git a/tests/test_frontend.py b/tests/test_frontend.py new file mode 100644 index 0000000..f7d8d26 --- /dev/null +++ b/tests/test_frontend.py @@ -0,0 +1,105 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# vim:fenc=utf-8 +# +# This file is part of Supysonic. +# Supysonic is a Python implementation of the Subsonic server API. +# +# Copyright (C) 2013-2017 Alban 'spl0k' Féron +# 2017 Óscar García Amor +# +# Distributed under terms of the GNU AGPLv3 license. + +from supysonic import db +from supysonic.managers.user import UserManager + +import sys +import unittest +import uuid + +# Create an empty sqlite database in memory +store = db.get_store("sqlite:") +# Read schema from file +with open('schema/sqlite.sql') as sql: + schema = sql.read() +# Create tables on memory database +for command in schema.split(';'): + store.execute(command) +# Create some users +UserManager.add(store, 'alice', 'alice', 'test@example.com', True) +UserManager.add(store, 'bob', 'bob', 'bob@example.com', False) +UserManager.add(store, 'charlie', 'charlie', 'charlie@example.com', False) + +# Create a mockup of web +from flask import Flask +app = Flask(__name__, template_folder='../supysonic/templates') +class web(): + app = app + store = store +sys.modules['supysonic.web'] = web() + +# Import module and set app in test mode +import supysonic.frontend +app.secret_key = 'test-suite' + +class FrontendTestCase(unittest.TestCase): + def setUp(self): + self.app = app.test_client() + + def test_unauthorized_request(self): + # Unauthorized request + rv = self.app.get('/', follow_redirects=True) + self.assertIn('Please login', rv.data) + + def test_login_with_bad_data(self): + # Login with not blank user or password + rv = self.app.post('/user/login', data=dict(name='', password=''), follow_redirects=True) + self.assertIn('Missing user name', rv.data) + self.assertIn('Missing password', rv.data) + # Login with not valid user or password + rv = self.app.post('/user/login', data=dict(user='nonexistent', password='nonexistent'), follow_redirects=True) + self.assertIn('No such user', rv.data) + rv = self.app.post('/user/login', data=dict(user='alice', password='badpassword'), follow_redirects=True) + self.assertIn('Wrong password', rv.data) + + def test_login_admin(self): + # Login with a valid admin user + rv = self.app.post('/user/login', data=dict(user='alice', password='alice'), follow_redirects=True) + self.assertIn('Logged in', rv.data) + self.assertIn('Users', rv.data) + self.assertIn('Folders', rv.data) + + def test_login_non_admin(self): + # Login with a valid non-admin user + rv = self.app.post('/user/login', data=dict(user='bob', password='bob'), follow_redirects=True) + self.assertIn('Logged in', rv.data) + # Non-admin user cannot acces to users and folders + self.assertNotIn('Users', rv.data) + self.assertNotIn('Folders', rv.data) + + def test_root_with_valid_session(self): + # Root with valid session + with self.app.session_transaction() as sess: + sess['userid'] = store.find(db.User, db.User.name == 'alice').one().id + sess['username'] = 'alice' + rv = self.app.get('/', follow_redirects=True) + self.assertIn('alice', rv.data) + self.assertIn('Log out', rv.data) + self.assertIn('There\'s nothing much to see here.', rv.data) + + def test_root_with_non_valid_session(self): + # Root with a no-valid session + with self.app.session_transaction() as sess: + sess['userid'] = uuid.uuid4() + sess['username'] = 'alice' + rv = self.app.get('/', follow_redirects=True) + self.assertIn('Please login', rv.data) + # Root with a no-valid user + with self.app.session_transaction() as sess: + sess['userid'] = store.find(db.User, db.User.name == 'alice').one().id + sess['username'] = 'nonexistent' + rv = self.app.get('/', follow_redirects=True) + self.assertIn('Please login', rv.data) + +if __name__ == '__main__': + unittest.main() diff --git a/tests/test_manager_folder.py b/tests/test_manager_folder.py new file mode 100644 index 0000000..9cbcea3 --- /dev/null +++ b/tests/test_manager_folder.py @@ -0,0 +1,91 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# vim:fenc=utf-8 +# +# This file is part of Supysonic. +# Supysonic is a Python implementation of the Subsonic server API. +# +# Copyright (C) 2013-2017 Alban 'spl0k' Féron +# 2017 Óscar García Amor +# +# Distributed under terms of the GNU AGPLv3 license. + +from supysonic import db +from supysonic.managers.folder import FolderManager + +import os +import shutil +import tempfile +import unittest +import uuid + +class FolderManagerTestCase(unittest.TestCase): + def setUp(self): + # Create an empty sqlite database in memory + self.store = db.get_store("sqlite:") + # Read schema from file + with open('schema/sqlite.sql') as sql: + schema = sql.read() + # Create tables on memory database + for command in schema.split(';'): + self.store.execute(command) + # Create some temporary directories + self.media_dir = tempfile.mkdtemp() + self.music_dir = tempfile.mkdtemp() + # Add test folders + self.assertEqual(FolderManager.add(self.store, 'media', self.media_dir), FolderManager.SUCCESS) + self.assertEqual(FolderManager.add(self.store, 'music', self.music_dir), FolderManager.SUCCESS) + folder = db.Folder() + folder.root = False + folder.name = 'non-root' + folder.path = os.path.join(self.music_dir, 'subfolder') + self.store.add(folder) + self.store.commit() + + def tearDown(self): + shutil.rmtree(self.media_dir) + shutil.rmtree(self.music_dir) + + def test_get_folder(self): + # Get existing folders + for name in ['media', 'music']: + folder = self.store.find(db.Folder, db.Folder.name == name, db.Folder.root == True).one() + self.assertEqual(FolderManager.get(self.store, folder.id), (FolderManager.SUCCESS, folder)) + # Get with invalid UUID + self.assertEqual(FolderManager.get(self.store, 'invalid-uuid'), (FolderManager.INVALID_ID, None)) + # Non-existent folder + self.assertEqual(FolderManager.get(self.store, uuid.uuid4()), (FolderManager.NO_SUCH_FOLDER, None)) + + def test_add_folder(self): + # Create duplicate + self.assertEqual(FolderManager.add(self.store,'media', self.media_dir), FolderManager.NAME_EXISTS) + # Duplicate path + self.assertEqual(FolderManager.add(self.store,'new-folder', self.media_dir), FolderManager.PATH_EXISTS) + # Invalid path + self.assertEqual(FolderManager.add(self.store,'invalid-path', os.path.abspath('/this/not/is/valid')), FolderManager.INVALID_PATH) + # Subfolder of already added path + os.mkdir(os.path.join(self.media_dir, 'subfolder')) + self.assertEqual(FolderManager.add(self.store,'subfolder', os.path.join(self.media_dir, 'subfolder')), FolderManager.PATH_EXISTS) + + def test_delete_folder(self): + # Delete existing folders + for name in ['media', 'music']: + folder = self.store.find(db.Folder, db.Folder.name == name, db.Folder.root == True).one() + self.assertEqual(FolderManager.delete(self.store, folder.id), FolderManager.SUCCESS) + # Delete invalid UUID + self.assertEqual(FolderManager.delete(self.store, 'invalid-uuid'), FolderManager.INVALID_ID) + # Delete non-existent folder + self.assertEqual(FolderManager.delete(self.store, uuid.uuid4()), FolderManager.NO_SUCH_FOLDER) + # Delete non-root folder + folder = self.store.find(db.Folder, db.Folder.name == 'non-root').one() + self.assertEqual(FolderManager.delete(self.store, folder.id), FolderManager.NO_SUCH_FOLDER) + + def test_delete_by_name(self): + # Delete existing folders + for name in ['media', 'music']: + self.assertEqual(FolderManager.delete_by_name(self.store, name), FolderManager.SUCCESS) + # Delete non-existent folder + self.assertEqual(FolderManager.delete_by_name(self.store, 'null'), FolderManager.NO_SUCH_FOLDER) + +if __name__ == '__main__': + unittest.main() diff --git a/tests/test_manager_user.py b/tests/test_manager_user.py new file mode 100644 index 0000000..3194ddd --- /dev/null +++ b/tests/test_manager_user.py @@ -0,0 +1,96 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# vim:fenc=utf-8 +# +# This file is part of Supysonic. +# Supysonic is a Python implementation of the Subsonic server API. +# +# Copyright (C) 2013-2017 Alban 'spl0k' Féron +# 2017 Óscar García Amor +# +# Distributed under terms of the GNU AGPLv3 license. + +from supysonic import db +from supysonic.managers.user import UserManager + +import unittest +import uuid + +class UserManagerTestCase(unittest.TestCase): + def setUp(self): + # Create an empty sqlite database in memory + self.store = db.get_store("sqlite:") + # Read schema from file + with open('schema/sqlite.sql') as sql: + schema = sql.read() + # Create tables on memory database + for command in schema.split(';'): + self.store.execute(command) + # Create some users + self.assertEqual(UserManager.add(self.store, 'alice', 'alice', 'test@example.com', True), UserManager.SUCCESS) + self.assertEqual(UserManager.add(self.store, 'bob', 'bob', 'bob@example.com', False), UserManager.SUCCESS) + self.assertEqual(UserManager.add(self.store, 'charlie', 'charlie', 'charlie@example.com', False), UserManager.SUCCESS) + + def test_encrypt_password(self): + self.assertEqual(UserManager._UserManager__encrypt_password('password','salt'), ('59b3e8d637cf97edbe2384cf59cb7453dfe30789', 'salt')) + self.assertEqual(UserManager._UserManager__encrypt_password('pass-word','pepper'), ('d68c95a91ed7773aa57c7c044d2309a5bf1da2e7', 'pepper')) + + def test_get_user(self): + # Get existing users + for name in ['alice', 'bob', 'charlie']: + user = self.store.find(db.User, db.User.name == name).one() + self.assertEqual(UserManager.get(self.store, user.id), (UserManager.SUCCESS, user)) + # Get with invalid UUID + self.assertEqual(UserManager.get(self.store, 'invalid-uuid'), (UserManager.INVALID_ID, None)) + # Non-existent user + self.assertEqual(UserManager.get(self.store, uuid.uuid4()), (UserManager.NO_SUCH_USER, None)) + + def test_add_user(self): + # Create duplicate + self.assertEqual(UserManager.add(self.store, 'alice', 'alice', 'test@example.com', True), UserManager.NAME_EXISTS) + + def test_delete_user(self): + # Delete existing users + for name in ['alice', 'bob', 'charlie']: + user = self.store.find(db.User, db.User.name == name).one() + self.assertEqual(UserManager.delete(self.store, user.id), UserManager.SUCCESS) + # Delete invalid UUID + self.assertEqual(UserManager.delete(self.store, 'invalid-uuid'), UserManager.INVALID_ID) + # Delete non-existent user + self.assertEqual(UserManager.delete(self.store, uuid.uuid4()), UserManager.NO_SUCH_USER) + + def test_try_auth(self): + # Test authentication + for name in ['alice', 'bob', 'charlie']: + user = self.store.find(db.User, db.User.name == name).one() + self.assertEqual(UserManager.try_auth(self.store, name, name), (UserManager.SUCCESS, user)) + # Wrong password + self.assertEqual(UserManager.try_auth(self.store, name, 'bad'), (UserManager.WRONG_PASS, None)) + # Non-existent user + self.assertEqual(UserManager.try_auth(self.store, 'null', 'null'), (UserManager.NO_SUCH_USER, None)) + + def test_change_password(self): + # With existing users + for name in ['alice', 'bob', 'charlie']: + user = self.store.find(db.User, db.User.name == name).one() + # God password + self.assertEqual(UserManager.change_password(self.store, user.id, name, 'newpass'), UserManager.SUCCESS) + self.assertEqual(UserManager.try_auth(self.store, name, 'newpass'), (UserManager.SUCCESS, user)) + # Wrong password + self.assertEqual(UserManager.change_password(self.store, user.id, 'badpass', 'newpass'), UserManager.WRONG_PASS) + # With invalid UUID + self.assertEqual(UserManager.change_password(self.store, 'invalid-uuid', 'oldpass', 'newpass'), UserManager.INVALID_ID) + # Non-existent user + self.assertEqual(UserManager.change_password(self.store, uuid.uuid4(), 'oldpass', 'newpass'), UserManager.NO_SUCH_USER) + + def test_change_password2(self): + # With existing users + for name in ['alice', 'bob', 'charlie']: + self.assertEqual(UserManager.change_password2(self.store, name, 'newpass'), UserManager.SUCCESS) + user = self.store.find(db.User, db.User.name == name).one() + self.assertEqual(UserManager.try_auth(self.store, name, 'newpass'), (UserManager.SUCCESS, user)) + # Non-existent user + self.assertEqual(UserManager.change_password2(self.store, 'null', 'newpass'), UserManager.NO_SUCH_USER) + +if __name__ == '__main__': + unittest.main()