1
0
mirror of https://github.com/spl0k/supysonic.git synced 2025-01-09 01:36:19 +00:00

Merge remote-tracking branch 'ogarcia/config'

This commit is contained in:
spl0k 2017-08-08 19:04:16 +02:00
commit 7f736c240b
21 changed files with 1017 additions and 319 deletions

70
.gitignore vendored
View File

@ -1,6 +1,68 @@
*.pyc # ---> Python
.*.sw[a-z] # Byte-compiled / optimized / DLL files
*~ __pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
env/
build/ build/
develop-eggs/
dist/ dist/
MANIFEST downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
*.egg-info/
.installed.cfg
*.egg
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*,cover
# Translations
*.mo
*.pot
# Django stuff:
*.log
# Sphinx documentation
docs/_build/
# PyBuilder
target/
# ---> Vim
[._]*.s[a-w][a-z]
[._]s[a-w][a-z]
*.un~
Session.vim
.netrwhist
*~

5
.travis.yml Normal file
View File

@ -0,0 +1,5 @@
language: python
python:
- "2.7"
install: "python setup.py install"
script: "python setup.py test"

View File

@ -1,3 +1,5 @@
include cgi-bin/* include cgi-bin/*
include config.sample include config.sample
include README.md include README.md
recursive-include supysonic/templates *
recursive-include supysonic/static *

View File

@ -1,4 +1,4 @@
#!/usr/bin/python #!/usr/bin/env python
# coding: utf-8 # coding: utf-8
# This file is part of Supysonic. # This file is part of Supysonic.

View File

@ -1,4 +1,4 @@
#!/usr/bin/python #!/usr/bin/env python
# coding: utf-8 # coding: utf-8
# This file is part of Supysonic. # This file is part of Supysonic.

View File

@ -1,4 +1,4 @@
#!/usr/bin/python #!/usr/bin/env python
# coding: utf-8 # coding: utf-8
# This file is part of Supysonic. # This file is part of Supysonic.

View File

@ -7,6 +7,9 @@
; Optional, restrict scanner to these extensions ; Optional, restrict scanner to these extensions
; scanner_extensions = mp3 ogg ; scanner_extensions = mp3 ogg
; Optional for develop, key for sign the session cookies
; secret_key = verydifficultkeyword
[webapp] [webapp]
; Optional cache directory ; Optional cache directory
cache_dir = /var/supysonic/cache cache_dir = /var/supysonic/cache

View File

@ -1,36 +1,38 @@
#!/usr/bin/python #!/usr/bin/env python
# encoding: utf-8 # -*- coding: utf-8 -*-
# vim:fenc=utf-8
#
# This file is part of Supysonic.
# Supysonic is a Python implementation of the Subsonic server API.
#
# Copyright (C) 2013-2017 Alban 'spl0k' Féron
# 2017 Óscar García Amor
#
# Distributed under terms of the GNU AGPLv3 license.
from distutils.core import setup import supysonic as project
setup(name='supysonic', from setuptools import setup
description='Python implementation of the Subsonic server API.', from setuptools import find_packages
keywords='subsonic music', from pip.req import parse_requirements
version='0.1', from pip.download import PipSession
url='https://github.com/spl0k/supysonic',
license='AGPLv3',
author='Alban Féron',
author_email='alban.feron@gmail.com',
long_description="""
Supysonic is a Python implementation of the Subsonic server API.
Current supported features are:
* browsing (by folders or tags) setup(
* streaming of various audio file formats name=project.NAME,
* transcoding version=project.VERSION,
* user or random playlists description=project.DESCRIPTION,
* cover arts (cover.jpg files in the same folder as music files) keywords=project.KEYWORDS,
* starred tracks/albums and ratings long_description=project.LONG_DESCRIPTION,
* Last.FM scrobbling author=project.AUTHOR_NAME,
""", author_email=project.AUTHOR_EMAIL,
packages=['supysonic', 'supysonic.api', 'supysonic.frontend', url=project.URL,
'supysonic.managers'], license=project.LICENSE,
scripts=['bin/supysonic-cli', 'bin/supysonic-watcher'], packages=find_packages(),
package_data={'supysonic': [ install_requires=[str(x.req) for x in
'templates/*.html', parse_requirements('requirements.txt', session=PipSession())],
'static/css/*', scripts=['bin/supysonic-cli', 'bin/supysonic-watcher'],
'static/fonts/*', zip_safe=False,
'static/js/*' include_package_data=True,
]} test_suite="tests.suite"
) )

View File

@ -1,25 +1,28 @@
# coding: utf-8 # -*- coding: utf-8 -*-
# vim:fenc=utf-8
#
# This file is part of Supysonic. # This file is part of Supysonic.
#
# Supysonic is a Python implementation of the Subsonic server API. # Supysonic is a Python implementation of the Subsonic server API.
# Copyright (C) 2013-2017 Alban 'spl0k' Féron
# #
# This program is free software: you can redistribute it and/or modify # Copyright (C) 2013-2017 Alban 'spl0k' Féron
# it under the terms of the GNU Affero General Public License as published by # 2017 Óscar García Amor
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# #
# This program is distributed in the hope that it will be useful, # Distributed under terms of the GNU AGPLv3 license.
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import mimetypes
def get_mime(ext):
return mimetypes.guess_type('dummy.' + ext, False)[0] or config.get('mimetypes', ext) or 'application/octet-stream'
NAME = 'supysonic'
VERSION = '0.2'
DESCRIPTION = 'Python implementation of the Subsonic server API.'
KEYWORDS = 'subsonic music api'
AUTHOR_NAME = 'Alban Féron'
AUTHOR_EMAIL = 'alban.feron@gmail.com'
URL = 'https://github.com/spl0k/supysonic'
LICENSE = 'GNU AGPLv3'
LONG_DESCRIPTION = '''Supysonic is a Python implementation of the Subsonic server API.
Current supported features are:
* browsing (by folders or tags)
* streaming of various audio file formats
* transcoding
* user or random playlists
* cover arts (cover.jpg files in the same folder as music files)
* starred tracks/albums and ratings
* Last.FM scrobbling'''

View File

@ -70,7 +70,7 @@ def stream_media():
if format and format != 'raw' and format != src_suffix: if format and format != 'raw' and format != src_suffix:
dst_suffix = format dst_suffix = format
dst_mimetype = scanner.get_mime(dst_suffix) dst_mimetype = config.get_mime(dst_suffix)
if format != 'raw' and (dst_suffix != src_suffix or dst_bitrate != res.bitrate): if format != 'raw' and (dst_suffix != src_suffix or dst_bitrate != res.bitrate):
transcoder = config.get('transcoding', 'transcoder_{}_{}'.format(src_suffix, dst_suffix)) transcoder = config.get('transcoding', 'transcoder_{}_{}'.format(src_suffix, dst_suffix))

View File

@ -1,49 +1,71 @@
# coding: utf-8 # -*- coding: utf-8 -*-
# vim:fenc=utf-8
#
# This file is part of Supysonic. # This file is part of Supysonic.
#
# Supysonic is a Python implementation of the Subsonic server API. # Supysonic is a Python implementation of the Subsonic server API.
# Copyright (C) 2013 Alban 'spl0k' Féron
# #
# This program is free software: you can redistribute it and/or modify # Copyright (C) 2013-2017 Alban 'spl0k' Féron
# it under the terms of the GNU Affero General Public License as published by # 2017 Óscar García Amor
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# #
# This program is distributed in the hope that it will be useful, # Distributed under terms of the GNU AGPLv3 license.
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import os, sys, tempfile, ConfigParser from ConfigParser import ConfigParser, NoOptionError, NoSectionError
import mimetypes
import os
import tempfile
# Seek for standard locations
config_file = [
'supysonic.conf',
os.path.expanduser('~/.config/supysonic/supysonic.conf'),
os.path.expanduser('~/.supysonic'),
'/etc/supysonic'
]
config = ConfigParser({ 'cache_dir': os.path.join(tempfile.gettempdir(), 'supysonic') })
config = ConfigParser.RawConfigParser({ 'cache_dir': os.path.join(tempfile.gettempdir(), 'supysonic') })
def check(): def check():
try: """
ret = config.read([ '/etc/supysonic', os.path.expanduser('~/.supysonic') ]) Checks the config file and mandatory fields
except (ConfigParser.MissingSectionHeaderError, ConfigParser.ParsingError), e: """
print >>sys.stderr, "Error while parsing the configuration file(s):\n%s" % str(e) try:
return False config.read(config_file)
except Exception as e:
err = 'Config file is corrupted.\n{0}'.format(e)
raise SystemExit(err)
if not ret: try:
print >>sys.stderr, "No configuration file found" config.get('base', 'database_uri')
return False except (NoSectionError, NoOptionError):
raise SystemExit('No database URI set')
try: return True
config.get('base', 'database_uri')
except:
print >>sys.stderr, "No database URI set"
return False
return True def get(section, option):
"""
Returns a config option value from config file
def get(section, name): :param section: section where the option is stored
try: :param option: option name
return config.get(section, name) :return: a config option value
except: :rtype: string
return None """
try:
return config.get(section, option)
except (NoSectionError, NoOptionError):
return None
def get_mime(extension):
"""
Returns mimetype of an extension based on config file
:param extension: extension string
:return: mimetype
:rtype: string
"""
guessed_mime = mimetypes.guess_type('dummy.' + extension, False)[0]
config_mime = get('mimetypes', extension)
default_mime = 'application/octet-stream'
return guessed_mime or config_mime or default_mime

View File

@ -27,7 +27,7 @@ from storm.variables import Variable
import uuid, datetime, time import uuid, datetime, time
import os.path import os.path
from supysonic import get_mime from supysonic import config
def now(): def now():
return datetime.datetime.now().replace(microsecond = 0) return datetime.datetime.now().replace(microsecond = 0)
@ -213,7 +213,7 @@ class Track(object):
if prefs and prefs.format and prefs.format != self.suffix(): if prefs and prefs.format and prefs.format != self.suffix():
info['transcodedSuffix'] = prefs.format info['transcodedSuffix'] = prefs.format
info['transcodedContentType'] = get_mime(prefs.format) info['transcodedContentType'] = config.get_mime(prefs.format)
return info return info

View File

@ -1,22 +1,13 @@
# coding: utf-8 # -*- coding: utf-8 -*-
# vim:fenc=utf-8
#
# This file is part of Supysonic. # This file is part of Supysonic.
#
# Supysonic is a Python implementation of the Subsonic server API. # Supysonic is a Python implementation of the Subsonic server API.
# Copyright (C) 2014 Alban 'spl0k' Féron
# #
# This program is free software: you can redistribute it and/or modify # Copyright (C) 2013-2017 Alban 'spl0k' Féron
# it under the terms of the GNU Affero General Public License as published by # 2017 Óscar García Amor
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# #
# This program is distributed in the hope that it will be useful, # Distributed under terms of the GNU AGPLv3 license.
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from flask import session from flask import session
from supysonic.web import app, store from supysonic.web import app, store
@ -27,34 +18,36 @@ app.add_template_filter(str)
@app.before_request @app.before_request
def login_check(): def login_check():
if request.path.startswith('/rest/'): if request.path.startswith('/rest/'):
return return
if request.path.startswith('/static/'): if request.path.startswith('/static/'):
return return
if request.endpoint != 'login': if request.endpoint != 'login':
should_login = False should_login = False
if not session.get('userid'): if not session.get('userid'):
should_login = True should_login = True
elif UserManager.get(store, session.get('userid'))[0] != UserManager.SUCCESS: elif UserManager.get(store, session.get('userid'))[0] != UserManager.SUCCESS:
session.clear() session.clear()
should_login = True should_login = True
elif UserManager.get(store, session.get('userid'))[1].name != session.get('username'):
session.clear()
should_login = True
if should_login: if should_login:
flash('Please login') flash('Please login')
return redirect(url_for('login', returnUrl = request.script_root + request.url[len(request.url_root)-1:])) return redirect(url_for('login', returnUrl = request.script_root + request.url[len(request.url_root)-1:]))
@app.route('/') @app.route('/')
def index(): def index():
stats = { stats = {
'artists': store.find(Artist).count(), 'artists': store.find(Artist).count(),
'albums': store.find(Album).count(), 'albums': store.find(Album).count(),
'tracks': store.find(Track).count() 'tracks': store.find(Track).count()
} }
return render_template('home.html', stats = stats, admin = UserManager.get(store, session.get('userid'))[1].admin) return render_template('home.html', stats = stats, admin = UserManager.get(store, session.get('userid'))[1].admin)
from .user import * from .user import *
from .folder import * from .folder import *
from .playlist import * from .playlist import *

View File

@ -1,24 +1,18 @@
# coding: utf-8 # -*- coding: utf-8 -*-
# vim:fenc=utf-8
#
# This file is part of Supysonic. # This file is part of Supysonic.
#
# Supysonic is a Python implementation of the Subsonic server API. # Supysonic is a Python implementation of the Subsonic server API.
# Copyright (C) 2013 Alban 'spl0k' Féron
# #
# This program is free software: you can redistribute it and/or modify # Copyright (C) 2013-2017 Alban 'spl0k' Féron
# it under the terms of the GNU Affero General Public License as published by # 2017 Óscar García Amor
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# #
# This program is distributed in the hope that it will be useful, # Distributed under terms of the GNU AGPLv3 license.
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import string, random, hashlib import binascii
import string
import random
import hashlib
import uuid import uuid
from supysonic.db import User, ChatMessage, Playlist from supysonic.db import User, ChatMessage, Playlist
@ -26,140 +20,137 @@ from supysonic.db import StarredFolder, StarredArtist, StarredAlbum, StarredTrac
from supysonic.db import RatingFolder, RatingTrack from supysonic.db import RatingFolder, RatingTrack
class UserManager: class UserManager:
SUCCESS = 0 SUCCESS = 0
INVALID_ID = 1 INVALID_ID = 1
NO_SUCH_USER = 2 NO_SUCH_USER = 2
NAME_EXISTS = 3 NAME_EXISTS = 3
WRONG_PASS = 4 WRONG_PASS = 4
@staticmethod @staticmethod
def get(store, uid): def get(store, uid):
if type(uid) in (str, unicode): if type(uid) in (str, unicode):
try: try:
uid = uuid.UUID(uid) uid = uuid.UUID(uid)
except: except:
return UserManager.INVALID_ID, None return UserManager.INVALID_ID, None
elif type(uid) is uuid.UUID: elif type(uid) is uuid.UUID:
pass pass
else: else:
return UserManager.INVALID_ID, None return UserManager.INVALID_ID, None
user = store.get(User, uid) user = store.get(User, uid)
if user is None: if user is None:
return UserManager.NO_SUCH_USER, None return UserManager.NO_SUCH_USER, None
return UserManager.SUCCESS, user return UserManager.SUCCESS, user
@staticmethod @staticmethod
def add(store, name, password, mail, admin): def add(store, name, password, mail, admin):
if store.find(User, User.name == name).one(): if store.find(User, User.name == name).one():
return UserManager.NAME_EXISTS return UserManager.NAME_EXISTS
password = UserManager.__decode_password(password) password = UserManager.__decode_password(password)
crypt, salt = UserManager.__encrypt_password(password) crypt, salt = UserManager.__encrypt_password(password)
user = User() user = User()
user.name = name user.name = name
user.mail = mail user.mail = mail
user.password = crypt user.password = crypt
user.salt = salt user.salt = salt
user.admin = admin user.admin = admin
store.add(user) store.add(user)
store.commit() store.commit()
return UserManager.SUCCESS return UserManager.SUCCESS
@staticmethod @staticmethod
def delete(store, uid): def delete(store, uid):
status, user = UserManager.get(store, uid) status, user = UserManager.get(store, uid)
if status != UserManager.SUCCESS: if status != UserManager.SUCCESS:
return status return status
store.find(StarredFolder, StarredFolder.user_id == uid).remove() store.find(StarredFolder, StarredFolder.user_id == uid).remove()
store.find(StarredArtist, StarredArtist.user_id == uid).remove() store.find(StarredArtist, StarredArtist.user_id == uid).remove()
store.find(StarredAlbum, StarredAlbum.user_id == uid).remove() store.find(StarredAlbum, StarredAlbum.user_id == uid).remove()
store.find(StarredTrack, StarredTrack.user_id == uid).remove() store.find(StarredTrack, StarredTrack.user_id == uid).remove()
store.find(RatingFolder, RatingFolder.user_id == uid).remove() store.find(RatingFolder, RatingFolder.user_id == uid).remove()
store.find(RatingTrack, RatingTrack.user_id == uid).remove() store.find(RatingTrack, RatingTrack.user_id == uid).remove()
store.find(ChatMessage, ChatMessage.user_id == uid).remove() store.find(ChatMessage, ChatMessage.user_id == uid).remove()
for playlist in store.find(Playlist, Playlist.user_id == uid): for playlist in store.find(Playlist, Playlist.user_id == uid):
playlist.tracks.clear() playlist.tracks.clear()
store.remove(playlist) store.remove(playlist)
store.remove(user) store.remove(user)
store.commit() store.commit()
return UserManager.SUCCESS return UserManager.SUCCESS
@staticmethod @staticmethod
def try_auth(store, name, password): def try_auth(store, name, password):
password = UserManager.__decode_password(password) password = UserManager.__decode_password(password)
user = store.find(User, User.name == name).one() user = store.find(User, User.name == name).one()
if not user: if not user:
return UserManager.NO_SUCH_USER, None return UserManager.NO_SUCH_USER, None
elif UserManager.__encrypt_password(password, user.salt)[0] != user.password: elif UserManager.__encrypt_password(password, user.salt)[0] != user.password:
return UserManager.WRONG_PASS, None return UserManager.WRONG_PASS, None
else: else:
return UserManager.SUCCESS, user return UserManager.SUCCESS, user
@staticmethod @staticmethod
def change_password(store, uid, old_pass, new_pass): def change_password(store, uid, old_pass, new_pass):
status, user = UserManager.get(store, uid) status, user = UserManager.get(store, uid)
if status != UserManager.SUCCESS: if status != UserManager.SUCCESS:
return status return status
old_pass = UserManager.__decode_password(old_pass) old_pass = UserManager.__decode_password(old_pass)
new_pass = UserManager.__decode_password(new_pass) new_pass = UserManager.__decode_password(new_pass)
if UserManager.__encrypt_password(old_pass, user.salt)[0] != user.password: if UserManager.__encrypt_password(old_pass, user.salt)[0] != user.password:
return UserManager.WRONG_PASS return UserManager.WRONG_PASS
user.password = UserManager.__encrypt_password(new_pass, user.salt)[0] user.password = UserManager.__encrypt_password(new_pass, user.salt)[0]
store.commit() store.commit()
return UserManager.SUCCESS return UserManager.SUCCESS
@staticmethod @staticmethod
def change_password2(store, name, new_pass): def change_password2(store, name, new_pass):
user = store.find(User, User.name == name).one() user = store.find(User, User.name == name).one()
if not user: if not user:
return UserManager.NO_SUCH_USER return UserManager.NO_SUCH_USER
new_pass = UserManager.__decode_password(new_pass) new_pass = UserManager.__decode_password(new_pass)
user.password = UserManager.__encrypt_password(new_pass, user.salt)[0] user.password = UserManager.__encrypt_password(new_pass, user.salt)[0]
store.commit() store.commit()
return UserManager.SUCCESS return UserManager.SUCCESS
@staticmethod @staticmethod
def error_str(err): def error_str(err):
if err == UserManager.SUCCESS: if err == UserManager.SUCCESS:
return 'No error' return 'No error'
elif err == UserManager.INVALID_ID: elif err == UserManager.INVALID_ID:
return 'Invalid user id' return 'Invalid user id'
elif err == UserManager.NO_SUCH_USER: elif err == UserManager.NO_SUCH_USER:
return 'No such user' return 'No such user'
elif err == UserManager.NAME_EXISTS: elif err == UserManager.NAME_EXISTS:
return 'There is already a user with that name' return 'There is already a user with that name'
elif err == UserManager.WRONG_PASS: elif err == UserManager.WRONG_PASS:
return 'Wrong password' return 'Wrong password'
else: else:
return 'Unkown error' return 'Unkown error'
@staticmethod @staticmethod
def __encrypt_password(password, salt = None): def __encrypt_password(password, salt = None):
if salt is None: if salt is None:
salt = ''.join(random.choice(string.printable.strip()) for i in xrange(6)) salt = ''.join(random.choice(string.printable.strip()) for i in xrange(6))
return hashlib.sha1(salt + password).hexdigest(), salt return hashlib.sha1(salt.encode('utf-8') + password.encode('utf-8')).hexdigest(), salt
@staticmethod @staticmethod
def __decode_password(password): def __decode_password(password):
if not password.startswith('enc:'): if not password.startswith('enc:'):
return password return password
enc = password[4:]
ret = ''
while enc:
ret = ret + chr(int(enc[:2], 16))
enc = enc[2:]
return ret
try:
return binascii.unhexlify(password[4:]).decode('utf-8')
except:
return password

View File

@ -25,7 +25,7 @@ import mutagen
from storm.expr import ComparableExpr, compile, Like from storm.expr import ComparableExpr, compile, Like
from storm.exceptions import NotSupportedError from storm.exceptions import NotSupportedError
from supysonic import config, get_mime from supysonic import config
from supysonic.db import Folder, Artist, Album, Track, User, PlaylistTrack from supysonic.db import Folder, Artist, Album, Track, User, PlaylistTrack
from supysonic.db import StarredFolder, StarredArtist, StarredAlbum, StarredTrack from supysonic.db import StarredFolder, StarredArtist, StarredAlbum, StarredTrack
from supysonic.db import RatingFolder, RatingTrack from supysonic.db import RatingFolder, RatingTrack
@ -166,7 +166,7 @@ class Scanner:
tr.duration = int(tag.info.length) tr.duration = int(tag.info.length)
tr.bitrate = (tag.info.bitrate if hasattr(tag.info, 'bitrate') else int(os.path.getsize(path) * 8 / tag.info.length)) / 1000 tr.bitrate = (tag.info.bitrate if hasattr(tag.info, 'bitrate') else int(os.path.getsize(path) * 8 / tag.info.length)) / 1000
tr.content_type = get_mime(os.path.splitext(path)[1][1:]) tr.content_type = config.get_mime(os.path.splitext(path)[1][1:])
tr.last_modification = os.path.getmtime(path) tr.last_modification = os.path.getmtime(path)
tralbum = self.__find_album(albumartist, album) tralbum = self.__find_album(albumartist, album)

View File

@ -1,75 +1,76 @@
# coding: utf-8 # -*- coding: utf-8 -*-
# vim:fenc=utf-8
#
# This file is part of Supysonic. # This file is part of Supysonic.
#
# Supysonic is a Python implementation of the Subsonic server API. # Supysonic is a Python implementation of the Subsonic server API.
# Copyright (C) 2013 Alban 'spl0k' Féron
# #
# This program is free software: you can redistribute it and/or modify # Copyright (C) 2013-2017 Alban 'spl0k' Féron
# it under the terms of the GNU Affero General Public License as published by # 2017 Óscar García Amor
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# #
# This program is distributed in the hope that it will be useful, # Distributed under terms of the GNU AGPLv3 license.
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import os.path
from flask import Flask, g from flask import Flask, g
from os import makedirs, path
from werkzeug.local import LocalProxy from werkzeug.local import LocalProxy
from supysonic import config from supysonic import config
from supysonic.db import get_store from supysonic.db import get_store
def get_db_store(): # Supysonic database open
store = getattr(g, 'store', None) def get_db():
if store: if not hasattr(g, 'database'):
return store g.database = get_store(config.get('base', 'database_uri'))
g.store = get_store(config.get('base', 'database_uri')) return g.database
return g.store
store = LocalProxy(get_db_store) # Supysonic database close
def close_db(error):
if hasattr(g, 'database'):
g.database.close()
def teardown_db(exception): store = LocalProxy(get_db)
store = getattr(g, 'store', None)
if store:
store.close()
def create_application(): def create_application():
global app global app
if not config.check(): # Check config for mandatory fields
return None config.check()
if not os.path.exists(config.get('webapp', 'cache_dir')): # Test for the cache directory
os.makedirs(config.get('webapp', 'cache_dir')) if not path.exists(config.get('webapp', 'cache_dir')):
os.makedirs(config.get('webapp', 'cache_dir'))
app = Flask(__name__) # Flask!
app.secret_key = '?9huDM\\H' app = Flask(__name__)
app.teardown_appcontext(teardown_db) # Set a secret key for sessions
secret_key = config.get('base', 'secret_key')
# If secret key is not defined in config, set develop key
if secret_key is None:
app.secret_key = 'd3v3l0p'
else:
app.secret_key = secret_key
if config.get('webapp', 'log_file'): # Close database connection on teardown
import logging app.teardown_appcontext(close_db)
from logging.handlers import TimedRotatingFileHandler
handler = TimedRotatingFileHandler(config.get('webapp', 'log_file'), when = 'midnight')
if config.get('webapp', 'log_level'):
mapping = {
'DEBUG': logging.DEBUG,
'INFO': logging.INFO,
'WARNING': logging.WARNING,
'ERROR': logging.ERROR,
'CRTICAL': logging.CRITICAL
}
handler.setLevel(mapping.get(config.get('webapp', 'log_level').upper(), logging.NOTSET))
app.logger.addHandler(handler)
from supysonic import frontend # Set loglevel
from supysonic import api if config.get('webapp', 'log_file'):
import logging
from logging.handlers import TimedRotatingFileHandler
handler = TimedRotatingFileHandler(config.get('webapp', 'log_file'), when = 'midnight')
if config.get('webapp', 'log_level'):
mapping = {
'DEBUG': logging.DEBUG,
'INFO': logging.INFO,
'WARNING': logging.WARNING,
'ERROR': logging.ERROR,
'CRTICAL': logging.CRITICAL
}
handler.setLevel(mapping.get(config.get('webapp', 'log_level').upper(), logging.NOTSET))
app.logger.addHandler(handler)
return app # Import app sections
from supysonic import frontend
from supysonic import api
return app

25
tests/__init__.py Normal file
View File

@ -0,0 +1,25 @@
# -*- coding: utf-8 -*-
# vim:fenc=utf-8
#
# This file is part of Supysonic.
# Supysonic is a Python implementation of the Subsonic server API.
#
# Copyright (C) 2013-2017 Alban 'spl0k' Féron
# 2017 Óscar García Amor
#
# Distributed under terms of the GNU AGPLv3 license.
import unittest
from .test_manager_folder import FolderManagerTestCase
from .test_manager_user import UserManagerTestCase
from .test_api import ApiTestCase
from .test_frontend import FrontendTestCase
def suite():
suite = unittest.TestSuite()
suite.addTest(unittest.makeSuite(FolderManagerTestCase))
suite.addTest(unittest.makeSuite(UserManagerTestCase))
suite.addTest(unittest.makeSuite(ApiTestCase))
suite.addTest(unittest.makeSuite(FrontendTestCase))
return suite

297
tests/test_api.py Normal file
View File

@ -0,0 +1,297 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# vim:fenc=utf-8
#
# This file is part of Supysonic.
# Supysonic is a Python implementation of the Subsonic server API.
#
# Copyright (C) 2013-2017 Alban 'spl0k' Féron
# 2017 Óscar García Amor
#
# Distributed under terms of the GNU AGPLv3 license.
from supysonic import db
from supysonic.managers.user import UserManager
import sys
import unittest
import uuid
# Create an empty sqlite database in memory
store = db.get_store("sqlite:")
# Read schema from file
with open('schema/sqlite.sql') as sql:
schema = sql.read()
# Create tables on memory database
for command in schema.split(';'):
store.execute(command)
# Create some users
UserManager.add(store, 'alice', 'alice', 'test@example.com', True)
UserManager.add(store, 'bob', 'bob', 'bob@example.com', False)
UserManager.add(store, 'charlie', 'charlie', 'charlie@example.com', False)
# Create a mockup of web
from flask import Flask
app = Flask(__name__)
class web():
app = app
store = store
sys.modules['supysonic.web'] = web()
# Import module and set app in test mode
import supysonic.api
class ApiTestCase(unittest.TestCase):
def setUp(self):
self.app = app.test_client()
def test_ping(self):
# GET non-existent user
rv = self.app.get('/rest/ping.view?u=null&p=null&c=test')
self.assertIn('status="failed"', rv.data)
self.assertIn('message="Unauthorized"', rv.data)
# POST non-existent user
rv = self.app.post('/rest/ping.view', data=dict(u='null', p='null', c='test'))
self.assertIn('status="failed"', rv.data)
self.assertIn('message="Unauthorized"', rv.data)
# GET user request
rv = self.app.get('/rest/ping.view?u=alice&p=alice&c=test')
self.assertIn('status="ok"', rv.data)
# POST user request
rv = self.app.post('/rest/ping.view', data=dict(u='alice', p='alice', c='test'))
self.assertIn('status="ok"', rv.data)
# GET user request with old enc:
rv = self.app.get('/rest/ping.view?u=alice&p=enc:616c696365&c=test')
self.assertIn('status="ok"', rv.data)
# POST user request with old enc:
rv = self.app.post('/rest/ping.view', data=dict(u='alice', p='enc:616c696365', c='test'))
self.assertIn('status="ok"', rv.data)
# GET user request with bad password
rv = self.app.get('/rest/ping.view?u=alice&p=bad&c=test')
self.assertIn('status="failed"', rv.data)
self.assertIn('message="Unauthorized"', rv.data)
# POST user request with bad password
rv = self.app.post('/rest/ping.view', data=dict(u='alice', p='bad', c='test'))
self.assertIn('status="failed"', rv.data)
self.assertIn('message="Unauthorized"', rv.data)
def test_ping_in_jsonp(self):
# If ping in jsonp works all other endpoints must work OK
# GET non-existent user
rv = self.app.get('/rest/ping.view?u=null&p=null&c=test&f=jsonp&callback=test')
self.assertIn('"status": "failed"', rv.data)
self.assertIn('"message": "Unauthorized"', rv.data)
# POST non-existent user
rv = self.app.post('/rest/ping.view', data=dict(u='null', p='null', c='test', f='jsonp', callback='test'))
self.assertIn('"status": "failed"', rv.data)
self.assertIn('"message": "Unauthorized"', rv.data)
# GET user request
rv = self.app.get('/rest/ping.view?u=alice&p=alice&c=test&f=jsonp&callback=test')
self.assertIn('"status": "ok"', rv.data)
# POST user request
rv = self.app.post('/rest/ping.view', data=dict(u='alice', p='alice', c='test', f='jsonp', callback='test'))
self.assertIn('"status": "ok"', rv.data)
# GET user request with bad password
rv = self.app.get('/rest/ping.view?u=alice&p=bad&c=test&f=jsonp&callback=test')
self.assertIn('"status": "failed"', rv.data)
self.assertIn('"message": "Unauthorized"', rv.data)
# POST user request with bad password
rv = self.app.post('/rest/ping.view', data=dict(u='alice', p='bad', c='test', f='jsonp', callback='test'))
self.assertIn('"status": "failed"', rv.data)
self.assertIn('"message": "Unauthorized"', rv.data)
def test_not_implemented(self):
# Access to not implemented endpoint
rv = self.app.get('/rest/not-implemented?u=alice&p=alice&c=test')
self.assertIn('message="Not implemented"', rv.data)
rv = self.app.post('/rest/not-implemented', data=dict(u='alice', p='alice', c='test'))
self.assertIn('message="Not implemented"', rv.data)
def test_get_license(self):
# GET user request
rv = self.app.get('/rest/getLicense.view?u=alice&p=alice&c=test')
self.assertIn('status="ok"', rv.data)
self.assertIn('license valid="true"', rv.data)
# POST user request
rv = self.app.post('/rest/getLicense.view', data=dict(u='alice', p='alice', c='test'))
self.assertIn('status="ok"', rv.data)
self.assertIn('license valid="true"', rv.data)
def test_get_user(self):
# GET missing username
rv = self.app.get('/rest/getUser.view?u=alice&p=alice&c=test')
self.assertIn('message="Missing username"', rv.data)
# POST missing username
rv = self.app.post('/rest/getUser.view', data=dict(u='alice', p='alice', c='test'))
self.assertIn('message="Missing username"', rv.data)
# GET non-admin request for other user
rv = self.app.get('/rest/getUser.view?u=bob&p=bob&c=test&username=alice')
self.assertIn('message="Admin restricted"', rv.data)
# POST non-admin request for other user
rv = self.app.post('/rest/getUser.view', data=dict(u='bob', p='bob', c='test', username='alice'))
self.assertIn('message="Admin restricted"', rv.data)
# GET non-existent user
rv = self.app.get('/rest/getUser.view?u=alice&p=alice&c=test&username=null')
self.assertIn('message="Unknown user"', rv.data)
# POST non-existent user
rv = self.app.post('/rest/getUser.view', data=dict(u='alice', p='alice', c='test', username='null'))
self.assertIn('message="Unknown user"', rv.data)
# GET admin request
rv = self.app.get('/rest/getUser.view?u=alice&p=alice&c=test&username=alice')
self.assertIn('adminRole="true"', rv.data)
self.assertIn('username="alice"', rv.data)
# POST admin request
rv = self.app.post('/rest/getUser.view', data=dict(u='alice', p='alice', c='test', username='alice'))
self.assertIn('adminRole="true"', rv.data)
self.assertIn('username="alice"', rv.data)
# GET admin request for other user
rv = self.app.get('/rest/getUser.view?u=alice&p=alice&c=test&username=bob')
self.assertIn('username="bob"', rv.data)
self.assertIn('adminRole="false"', rv.data)
# POST admin request for other user
rv = self.app.post('/rest/getUser.view', data=dict(u='alice', p='alice', c='test', username='bob'))
self.assertIn('username="bob"', rv.data)
self.assertIn('adminRole="false"', rv.data)
# GET non-admin request
rv = self.app.get('/rest/getUser.view?u=charlie&p=charlie&c=test&username=charlie')
self.assertIn('username="charlie"', rv.data)
self.assertIn('adminRole="false"', rv.data)
# POST non-admin request
rv = self.app.post('/rest/getUser.view', data=dict(u='charlie', p='charlie', c='test', username='charlie'))
self.assertIn('username="charlie"', rv.data)
self.assertIn('adminRole="false"', rv.data)
def test_get_users(self):
# GET admin request
rv = self.app.get('/rest/getUsers.view?u=alice&p=alice&c=test')
self.assertIn('alice', rv.data)
self.assertIn('bob', rv.data)
self.assertIn('charlie', rv.data)
# POST admin request
rv = self.app.post('/rest/getUsers.view', data=dict(u='alice', p='alice', c='test'))
self.assertIn('alice', rv.data)
self.assertIn('bob', rv.data)
self.assertIn('charlie', rv.data)
# GET non-admin request
rv = self.app.get('/rest/getUsers.view?u=bob&p=bob&c=test')
self.assertIn('message="Admin restricted"', rv.data)
# POST non-admin request
rv = self.app.post('/rest/getUsers.view', data=dict(u='bob', p='bob', c='test'))
self.assertIn('message="Admin restricted"', rv.data)
def test_create_user(self):
# GET non-admin request
rv = self.app.get('/rest/createUser.view?u=bob&p=bob&c=test')
self.assertIn('message="Admin restricted"', rv.data)
# POST non-admin request
rv = self.app.post('/rest/createUser.view', data=dict(u='bob', p='bob', c='test'))
self.assertIn('message="Admin restricted"', rv.data)
# GET incomplete request
rv = self.app.get('/rest/createUser.view?u=alice&p=alice&c=test')
self.assertIn('message="Missing parameter"', rv.data)
# POST incomplete request
rv = self.app.post('/rest/createUser.view', data=dict(u='alice', p='alice', c='test'))
self.assertIn('message="Missing parameter"', rv.data)
# GET create user and test that user is created
rv = self.app.get('/rest/createUser.view?u=alice&p=alice&c=test&username=david&password=david&email=david%40example.com&adminRole=True')
self.assertIn('status="ok"', rv.data)
rv = self.app.get('/rest/getUser.view?u=david&p=david&c=test&username=david')
self.assertIn('username="david"', rv.data)
self.assertIn('email="david@example.com"', rv.data)
self.assertIn('adminRole="true"', rv.data)
# POST create user and test that user is created
rv = self.app.post('/rest/createUser.view', data=dict(u='alice', p='alice', c='test', username='elanor', password='elanor', email='elanor@example.com', adminRole=True))
self.assertIn('status="ok"', rv.data)
rv = self.app.post('/rest/getUser.view', data=dict(u='elanor', p='elanor', c='test', username='elanor'))
self.assertIn('username="elanor"', rv.data)
self.assertIn('email="elanor@example.com"', rv.data)
self.assertIn('adminRole="true"', rv.data)
# GET create duplicate
rv = self.app.get('/rest/createUser.view?u=alice&p=alice&c=test&username=david&password=david&email=david%40example.com&adminRole=True')
self.assertIn('message="There is already a user with that username"', rv.data)
# POST create duplicate
rv = self.app.post('/rest/createUser.view', data=dict(u='alice', p='alice', c='test', username='elanor', password='elanor', email='elanor@example.com', adminRole=True))
self.assertIn('message="There is already a user with that username"', rv.data)
def test_delete_user(self):
# GET non-admin request
rv = self.app.get('/rest/deleteUser.view?u=bob&p=bob&c=test')
self.assertIn('message="Admin restricted"', rv.data)
# POST non-admin request
rv = self.app.post('/rest/deleteUser.view', data=dict(u='bob', p='bob', c='test'))
self.assertIn('message="Admin restricted"', rv.data)
# GET incomplete request
rv = self.app.get('/rest/deleteUser.view?u=alice&p=alice&c=test')
self.assertIn('message="Unknown user"', rv.data)
# POST incomplete request
rv = self.app.post('/rest/deleteUser.view', data=dict(u='alice', p='alice', c='test'))
self.assertIn('message="Unknown user"', rv.data)
# GET delete non-existent user
rv = self.app.get('/rest/deleteUser.view?u=alice&p=alice&c=test&username=nonexistent')
self.assertIn('message="Unknown user"', rv.data)
# POST delete non-existent user
rv = self.app.post('/rest/deleteUser.view', data=dict(u='alice', p='alice', c='test', username='nonexistent'))
self.assertIn('message="Unknown user"', rv.data)
# GET delete existent user
rv = self.app.get('/rest/deleteUser.view?u=alice&p=alice&c=test&username=elanor')
self.assertIn('status="ok"', rv.data)
rv = self.app.get('/rest/getUser.view?u=alice&p=alice&c=test&username=elanor')
self.assertIn('message="Unknown user"', rv.data)
# POST delete existent user
rv = self.app.post('/rest/deleteUser.view', data=dict(u='alice', p='alice', c='test', username='david'))
self.assertIn('status="ok"', rv.data)
rv = self.app.post('/rest/getUser.view', data=dict(u='alice', p='alice', c='test', username='david'))
self.assertIn('message="Unknown user"', rv.data)
def test_change_password(self):
# GET incomplete request
rv = self.app.get('/rest/changePassword.view?u=alice&p=alice&c=test')
self.assertIn('message="Missing parameter"', rv.data)
# POST incomplete request
rv = self.app.post('/rest/changePassword.view', data=dict(u='alice', p='alice', c='test'))
self.assertIn('message="Missing parameter"', rv.data)
# GET non-admin change own password
rv = self.app.get('/rest/changePassword.view?u=bob&p=bob&c=test&username=bob&password=newpassword')
self.assertIn('status="ok"', rv.data)
# POST non-admin change own password
rv = self.app.post('/rest/changePassword.view', data=dict(u='bob', p='newpassword', c='test', username='bob', password='bob'))
self.assertIn('status="ok"', rv.data)
# GET non-admin change other user password
rv = self.app.get('/rest/changePassword.view?u=bob&p=bob&c=test&username=alice&password=newpassword')
self.assertIn('message="Admin restricted"', rv.data)
# POST non-admin change other user password
rv = self.app.post('/rest/changePassword.view', data=dict(u='bob', p='bob', c='test', username='alice', password='newpassword'))
self.assertIn('message="Admin restricted"', rv.data)
# GET admin change other user password
rv = self.app.get('/rest/changePassword.view?u=bob&p=bob&c=test&username=bob&password=newpassword')
self.assertIn('status="ok"', rv.data)
# POST admin change other user password
rv = self.app.post('/rest/changePassword.view', data=dict(u='bob', p='newpassword', c='test', username='bob', password='bob'))
self.assertIn('status="ok"', rv.data)
# GET change non-existent user password
rv = self.app.get('/rest/changePassword.view?u=alice&p=alice&c=test&username=nonexistent&password=nonexistent')
self.assertIn('message="No such user"', rv.data)
# POST change non-existent user password
rv = self.app.post('/rest/changePassword.view', data=dict(u='alice', p='alice', c='test', username='nonexistent', password='nonexistent'))
self.assertIn('message="No such user"', rv.data)
# GET non-admin change own password using extended utf-8 characters
rv = self.app.get('/rest/changePassword.view?u=bob&p=bob&c=test&username=bob&password=новыйпароль')
self.assertIn('status="ok"', rv.data)
# POST non-admin change own password using extended utf-8 characters
rv = self.app.post('/rest/changePassword.view', data=dict(u='bob', p='новыйпароль', c='test', username='bob', password='bob'))
self.assertIn('status="ok"', rv.data)
# GET non-admin change own password using extended utf-8 characters with old enc:
rv = self.app.get('/rest/changePassword.view?u=bob&p=enc:626f62&c=test&username=bob&password=новыйпароль')
self.assertIn('status="ok"', rv.data)
# POST non-admin change own password using extended utf-8 characters with old enc:
rv = self.app.post('/rest/changePassword.view', data=dict(u='bob', p='enc:d0bdd0bed0b2d18bd0b9d0bfd0b0d180d0bed0bbd18c', c='test', username='bob', password='bob'))
self.assertIn('status="ok"', rv.data)
# GET non-admin change own password using enc: in password
rv = self.app.get('/rest/changePassword.view?u=bob&p=bob&c=test&username=bob&password=enc:test')
self.assertIn('status="ok"', rv.data)
# POST non-admin change own password using enc: in password
rv = self.app.post('/rest/changePassword.view', data=dict(u='bob', p='enc:test', c='test', username='bob', password='bob'))
self.assertIn('status="ok"', rv.data)
if __name__ == '__main__':
unittest.main()

105
tests/test_frontend.py Normal file
View File

@ -0,0 +1,105 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# vim:fenc=utf-8
#
# This file is part of Supysonic.
# Supysonic is a Python implementation of the Subsonic server API.
#
# Copyright (C) 2013-2017 Alban 'spl0k' Féron
# 2017 Óscar García Amor
#
# Distributed under terms of the GNU AGPLv3 license.
from supysonic import db
from supysonic.managers.user import UserManager
import sys
import unittest
import uuid
# Create an empty sqlite database in memory
store = db.get_store("sqlite:")
# Read schema from file
with open('schema/sqlite.sql') as sql:
schema = sql.read()
# Create tables on memory database
for command in schema.split(';'):
store.execute(command)
# Create some users
UserManager.add(store, 'alice', 'alice', 'test@example.com', True)
UserManager.add(store, 'bob', 'bob', 'bob@example.com', False)
UserManager.add(store, 'charlie', 'charlie', 'charlie@example.com', False)
# Create a mockup of web
from flask import Flask
app = Flask(__name__, template_folder='../supysonic/templates')
class web():
app = app
store = store
sys.modules['supysonic.web'] = web()
# Import module and set app in test mode
import supysonic.frontend
app.secret_key = 'test-suite'
class FrontendTestCase(unittest.TestCase):
def setUp(self):
self.app = app.test_client()
def test_unauthorized_request(self):
# Unauthorized request
rv = self.app.get('/', follow_redirects=True)
self.assertIn('Please login', rv.data)
def test_login_with_bad_data(self):
# Login with not blank user or password
rv = self.app.post('/user/login', data=dict(name='', password=''), follow_redirects=True)
self.assertIn('Missing user name', rv.data)
self.assertIn('Missing password', rv.data)
# Login with not valid user or password
rv = self.app.post('/user/login', data=dict(user='nonexistent', password='nonexistent'), follow_redirects=True)
self.assertIn('No such user', rv.data)
rv = self.app.post('/user/login', data=dict(user='alice', password='badpassword'), follow_redirects=True)
self.assertIn('Wrong password', rv.data)
def test_login_admin(self):
# Login with a valid admin user
rv = self.app.post('/user/login', data=dict(user='alice', password='alice'), follow_redirects=True)
self.assertIn('Logged in', rv.data)
self.assertIn('Users', rv.data)
self.assertIn('Folders', rv.data)
def test_login_non_admin(self):
# Login with a valid non-admin user
rv = self.app.post('/user/login', data=dict(user='bob', password='bob'), follow_redirects=True)
self.assertIn('Logged in', rv.data)
# Non-admin user cannot acces to users and folders
self.assertNotIn('Users', rv.data)
self.assertNotIn('Folders', rv.data)
def test_root_with_valid_session(self):
# Root with valid session
with self.app.session_transaction() as sess:
sess['userid'] = store.find(db.User, db.User.name == 'alice').one().id
sess['username'] = 'alice'
rv = self.app.get('/', follow_redirects=True)
self.assertIn('alice', rv.data)
self.assertIn('Log out', rv.data)
self.assertIn('There\'s nothing much to see here.', rv.data)
def test_root_with_non_valid_session(self):
# Root with a no-valid session
with self.app.session_transaction() as sess:
sess['userid'] = uuid.uuid4()
sess['username'] = 'alice'
rv = self.app.get('/', follow_redirects=True)
self.assertIn('Please login', rv.data)
# Root with a no-valid user
with self.app.session_transaction() as sess:
sess['userid'] = store.find(db.User, db.User.name == 'alice').one().id
sess['username'] = 'nonexistent'
rv = self.app.get('/', follow_redirects=True)
self.assertIn('Please login', rv.data)
if __name__ == '__main__':
unittest.main()

View File

@ -0,0 +1,91 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# vim:fenc=utf-8
#
# This file is part of Supysonic.
# Supysonic is a Python implementation of the Subsonic server API.
#
# Copyright (C) 2013-2017 Alban 'spl0k' Féron
# 2017 Óscar García Amor
#
# Distributed under terms of the GNU AGPLv3 license.
from supysonic import db
from supysonic.managers.folder import FolderManager
import os
import shutil
import tempfile
import unittest
import uuid
class FolderManagerTestCase(unittest.TestCase):
def setUp(self):
# Create an empty sqlite database in memory
self.store = db.get_store("sqlite:")
# Read schema from file
with open('schema/sqlite.sql') as sql:
schema = sql.read()
# Create tables on memory database
for command in schema.split(';'):
self.store.execute(command)
# Create some temporary directories
self.media_dir = tempfile.mkdtemp()
self.music_dir = tempfile.mkdtemp()
# Add test folders
self.assertEqual(FolderManager.add(self.store, 'media', self.media_dir), FolderManager.SUCCESS)
self.assertEqual(FolderManager.add(self.store, 'music', self.music_dir), FolderManager.SUCCESS)
folder = db.Folder()
folder.root = False
folder.name = 'non-root'
folder.path = os.path.join(self.music_dir, 'subfolder')
self.store.add(folder)
self.store.commit()
def tearDown(self):
shutil.rmtree(self.media_dir)
shutil.rmtree(self.music_dir)
def test_get_folder(self):
# Get existing folders
for name in ['media', 'music']:
folder = self.store.find(db.Folder, db.Folder.name == name, db.Folder.root == True).one()
self.assertEqual(FolderManager.get(self.store, folder.id), (FolderManager.SUCCESS, folder))
# Get with invalid UUID
self.assertEqual(FolderManager.get(self.store, 'invalid-uuid'), (FolderManager.INVALID_ID, None))
# Non-existent folder
self.assertEqual(FolderManager.get(self.store, uuid.uuid4()), (FolderManager.NO_SUCH_FOLDER, None))
def test_add_folder(self):
# Create duplicate
self.assertEqual(FolderManager.add(self.store,'media', self.media_dir), FolderManager.NAME_EXISTS)
# Duplicate path
self.assertEqual(FolderManager.add(self.store,'new-folder', self.media_dir), FolderManager.PATH_EXISTS)
# Invalid path
self.assertEqual(FolderManager.add(self.store,'invalid-path', os.path.abspath('/this/not/is/valid')), FolderManager.INVALID_PATH)
# Subfolder of already added path
os.mkdir(os.path.join(self.media_dir, 'subfolder'))
self.assertEqual(FolderManager.add(self.store,'subfolder', os.path.join(self.media_dir, 'subfolder')), FolderManager.PATH_EXISTS)
def test_delete_folder(self):
# Delete existing folders
for name in ['media', 'music']:
folder = self.store.find(db.Folder, db.Folder.name == name, db.Folder.root == True).one()
self.assertEqual(FolderManager.delete(self.store, folder.id), FolderManager.SUCCESS)
# Delete invalid UUID
self.assertEqual(FolderManager.delete(self.store, 'invalid-uuid'), FolderManager.INVALID_ID)
# Delete non-existent folder
self.assertEqual(FolderManager.delete(self.store, uuid.uuid4()), FolderManager.NO_SUCH_FOLDER)
# Delete non-root folder
folder = self.store.find(db.Folder, db.Folder.name == 'non-root').one()
self.assertEqual(FolderManager.delete(self.store, folder.id), FolderManager.NO_SUCH_FOLDER)
def test_delete_by_name(self):
# Delete existing folders
for name in ['media', 'music']:
self.assertEqual(FolderManager.delete_by_name(self.store, name), FolderManager.SUCCESS)
# Delete non-existent folder
self.assertEqual(FolderManager.delete_by_name(self.store, 'null'), FolderManager.NO_SUCH_FOLDER)
if __name__ == '__main__':
unittest.main()

View File

@ -0,0 +1,96 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# vim:fenc=utf-8
#
# This file is part of Supysonic.
# Supysonic is a Python implementation of the Subsonic server API.
#
# Copyright (C) 2013-2017 Alban 'spl0k' Féron
# 2017 Óscar García Amor
#
# Distributed under terms of the GNU AGPLv3 license.
from supysonic import db
from supysonic.managers.user import UserManager
import unittest
import uuid
class UserManagerTestCase(unittest.TestCase):
def setUp(self):
# Create an empty sqlite database in memory
self.store = db.get_store("sqlite:")
# Read schema from file
with open('schema/sqlite.sql') as sql:
schema = sql.read()
# Create tables on memory database
for command in schema.split(';'):
self.store.execute(command)
# Create some users
self.assertEqual(UserManager.add(self.store, 'alice', 'alice', 'test@example.com', True), UserManager.SUCCESS)
self.assertEqual(UserManager.add(self.store, 'bob', 'bob', 'bob@example.com', False), UserManager.SUCCESS)
self.assertEqual(UserManager.add(self.store, 'charlie', 'charlie', 'charlie@example.com', False), UserManager.SUCCESS)
def test_encrypt_password(self):
self.assertEqual(UserManager._UserManager__encrypt_password('password','salt'), ('59b3e8d637cf97edbe2384cf59cb7453dfe30789', 'salt'))
self.assertEqual(UserManager._UserManager__encrypt_password('pass-word','pepper'), ('d68c95a91ed7773aa57c7c044d2309a5bf1da2e7', 'pepper'))
def test_get_user(self):
# Get existing users
for name in ['alice', 'bob', 'charlie']:
user = self.store.find(db.User, db.User.name == name).one()
self.assertEqual(UserManager.get(self.store, user.id), (UserManager.SUCCESS, user))
# Get with invalid UUID
self.assertEqual(UserManager.get(self.store, 'invalid-uuid'), (UserManager.INVALID_ID, None))
# Non-existent user
self.assertEqual(UserManager.get(self.store, uuid.uuid4()), (UserManager.NO_SUCH_USER, None))
def test_add_user(self):
# Create duplicate
self.assertEqual(UserManager.add(self.store, 'alice', 'alice', 'test@example.com', True), UserManager.NAME_EXISTS)
def test_delete_user(self):
# Delete existing users
for name in ['alice', 'bob', 'charlie']:
user = self.store.find(db.User, db.User.name == name).one()
self.assertEqual(UserManager.delete(self.store, user.id), UserManager.SUCCESS)
# Delete invalid UUID
self.assertEqual(UserManager.delete(self.store, 'invalid-uuid'), UserManager.INVALID_ID)
# Delete non-existent user
self.assertEqual(UserManager.delete(self.store, uuid.uuid4()), UserManager.NO_SUCH_USER)
def test_try_auth(self):
# Test authentication
for name in ['alice', 'bob', 'charlie']:
user = self.store.find(db.User, db.User.name == name).one()
self.assertEqual(UserManager.try_auth(self.store, name, name), (UserManager.SUCCESS, user))
# Wrong password
self.assertEqual(UserManager.try_auth(self.store, name, 'bad'), (UserManager.WRONG_PASS, None))
# Non-existent user
self.assertEqual(UserManager.try_auth(self.store, 'null', 'null'), (UserManager.NO_SUCH_USER, None))
def test_change_password(self):
# With existing users
for name in ['alice', 'bob', 'charlie']:
user = self.store.find(db.User, db.User.name == name).one()
# God password
self.assertEqual(UserManager.change_password(self.store, user.id, name, 'newpass'), UserManager.SUCCESS)
self.assertEqual(UserManager.try_auth(self.store, name, 'newpass'), (UserManager.SUCCESS, user))
# Wrong password
self.assertEqual(UserManager.change_password(self.store, user.id, 'badpass', 'newpass'), UserManager.WRONG_PASS)
# With invalid UUID
self.assertEqual(UserManager.change_password(self.store, 'invalid-uuid', 'oldpass', 'newpass'), UserManager.INVALID_ID)
# Non-existent user
self.assertEqual(UserManager.change_password(self.store, uuid.uuid4(), 'oldpass', 'newpass'), UserManager.NO_SUCH_USER)
def test_change_password2(self):
# With existing users
for name in ['alice', 'bob', 'charlie']:
self.assertEqual(UserManager.change_password2(self.store, name, 'newpass'), UserManager.SUCCESS)
user = self.store.find(db.User, db.User.name == name).one()
self.assertEqual(UserManager.try_auth(self.store, name, 'newpass'), (UserManager.SUCCESS, user))
# Non-existent user
self.assertEqual(UserManager.change_password2(self.store, 'null', 'newpass'), UserManager.NO_SUCH_USER)
if __name__ == '__main__':
unittest.main()