1
0
mirror of https://github.com/spl0k/supysonic.git synced 2024-12-22 17:06:17 +00:00

Getting out of the storm on a pony

This commit is contained in:
spl0k 2017-12-14 22:28:49 +01:00
parent 8046457661
commit 6bd61e0388
5 changed files with 401 additions and 381 deletions

View File

@ -2,8 +2,9 @@
Supysonic is a Python implementation of the [Subsonic][] server API.
[![Build Status](https://travis-ci.org/spl0k/supysonic.svg?branch=master)](https://travis-ci.org/spl0k/supysonic)
[![codecov](https://codecov.io/gh/spl0k/supysonic/branch/master/graph/badge.svg)](https://codecov.io/gh/spl0k/supysonic)
[![Build Status](https://travis-ci.org/spl0k/supysonic.svg?branch=pony)](https://travis-ci.org/spl0k/supysonic)
[![codecov](https://codecov.io/gh/spl0k/supysonic/branch/pony/graph/badge.svg)](https://codecov.io/gh/spl0k/supysonic)
![Python](https://img.shields.io/badge/python-2.7-blue.svg)
Current supported features are:
* browsing (by folders or tags)
@ -50,27 +51,21 @@ You'll need these to run Supysonic:
* Python 2.7
* [Flask](http://flask.pocoo.org/) >= 0.9
* [Storm](https://storm.canonical.com/)
* [PonyORM](https://ponyorm.com/)
* [Python Imaging Library](https://github.com/python-pillow/Pillow)
* [simplejson](https://simplejson.readthedocs.io/en/latest/)
* [requests](http://docs.python-requests.org/)
* [mutagen](https://mutagen.readthedocs.io/en/latest/)
* [watchdog](https://github.com/gorakhargosh/watchdog)
On a Debian-like OS (Debian, Ubuntu, Linux Mint, etc.), you can install them
this way:
You can install all of them using `pip`:
$ apt-get install python-flask python-storm python-imaging python-simplesjon python-requests python-mutagen python-watchdog
$ pip install -r requirements.txt
You may also need a database specific package:
* MySQL: `apt install python-mysqldb`
* PostgreSQL: `apt-install python-psycopg2`
Due to a bug in `storm`, `psycopg2` version 2.5 and later does not work
properly. You can either use version 2.4 or [patch storm][storm] yourself.
[storm]: https://bugs.launchpad.net/storm/+bug/1170063
* MySQL: `pip install pymysql` or `pip install mysqlclient`
* PostgreSQL: `pip install psycopg2`
### Configuration
@ -84,7 +79,7 @@ The sample configuration (`config.sample`) looks like this:
```ini
[base]
; A Storm database URI. See the 'schema' folder for schema creation scripts
; A database URI. See the 'schema' folder for schema creation scripts
; Default: sqlite:///tmp/supysonic/supysonic.db
;database_uri = sqlite:////var/supysonic/supysonic.db
;database_uri = mysql://supysonic:supysonic@localhost/supysonic

View File

@ -1,5 +1,5 @@
[base]
; A Storm database URI. See the 'schema' folder for schema creation scripts
; A database URI. See the 'schema' folder for schema creation scripts
; Default: sqlite:///tmp/supysonic/supysonic.db
;database_uri = sqlite:////var/supysonic/supysonic.db
;database_uri = mysql://supysonic:supysonic@localhost/supysonic

View File

@ -1,5 +1,5 @@
flask>=0.9
storm
pony
Pillow
simplejson
requests>=1.0.0

View File

@ -18,45 +18,41 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from storm.properties import *
from storm.references import Reference, ReferenceSet
from storm.database import create_database
from storm.store import Store
from storm.variables import Variable
import uuid, datetime, time
import time
import mimetypes
import os.path
from datetime import datetime
from pony.orm import Database, Required, Optional, Set, PrimaryKey
from pony.orm import ObjectNotFound
from pony.orm import min, max, avg, sum
from urlparse import urlparse
from uuid import UUID, uuid4
def now():
return datetime.datetime.now().replace(microsecond = 0)
return datetime.now().replace(microsecond = 0)
class UnicodeOrStrVariable(Variable):
__slots__ = ()
db = Database()
def parse_set(self, value, from_db):
if isinstance(value, unicode):
return value
elif isinstance(value, str):
return unicode(value)
raise TypeError("Expected unicode, found %r: %r" % (type(value), value))
class Folder(db.Entity):
_table_ = 'folder'
Unicode.variable_class = UnicodeOrStrVariable
id = PrimaryKey(UUID, default = uuid4)
root = Required(bool, default = False)
name = Required(str)
path = Required(str, unique = True)
created = Required(datetime, precision = 0, default = now)
has_cover_art = Required(bool, default = False)
last_scan = Required(int, default = 0)
class Folder(object):
__storm_table__ = 'folder'
parent = Optional(lambda: Folder, reverse = 'children', column = 'parent_id')
children = Set(lambda: Folder, reverse = 'parent')
id = UUID(primary = True, default_factory = uuid.uuid4)
root = Bool(default = False)
name = Unicode()
path = Unicode() # unique
created = DateTime(default_factory = now)
has_cover_art = Bool(default = False)
last_scan = Int(default = 0)
__alltracks = Set(lambda: Track, lazy = True, reverse = 'root_folder') # Never used, hide it. Could be huge, lazy load
tracks = Set(lambda: Track, reverse = 'folder')
parent_id = UUID() # nullable
parent = Reference(parent_id, id)
children = ReferenceSet(id, parent_id)
stars = Set(lambda: StarredFolder)
ratings = Set(lambda: RatingFolder)
def as_subsonic_child(self, user):
info = {
@ -67,29 +63,36 @@ class Folder(object):
'created': self.created.isoformat()
}
if not self.root:
info['parent'] = str(self.parent_id)
info['parent'] = str(self.parent.id)
info['artist'] = self.parent.name
if self.has_cover_art:
info['coverArt'] = str(self.id)
starred = Store.of(self).get(StarredFolder, (user.id, self.id))
if starred:
try:
starred = StarredFolder[user.id, self.id]
info['starred'] = starred.date.isoformat()
except ObjectNotFound: pass
rating = Store.of(self).get(RatingFolder, (user.id, self.id))
if rating:
try:
rating = RatingFolder[user.id, self.id]
info['userRating'] = rating.rating
avgRating = Store.of(self).find(RatingFolder, RatingFolder.rated_id == self.id).avg(RatingFolder.rating)
except ObjectNotFound: pass
avgRating = avg(self.ratings.rating)
if avgRating:
info['averageRating'] = avgRating
return info
class Artist(object):
__storm_table__ = 'artist'
class Artist(db.Entity):
_table_ = 'artist'
id = UUID(primary = True, default_factory = uuid.uuid4)
name = Unicode() # unique
id = PrimaryKey(UUID, default = uuid4)
name = Required(str, unique = True)
albums = Set(lambda: Album)
tracks = Set(lambda: Track)
stars = Set(lambda: StarredArtist)
def as_subsonic_artist(self, user):
info = {
@ -99,38 +102,42 @@ class Artist(object):
'albumCount': self.albums.count()
}
starred = Store.of(self).get(StarredArtist, (user.id, self.id))
if starred:
try:
starred = StarredArtist[user.id, self.id]
info['starred'] = starred.date.isoformat()
except ObjectNotFound: pass
return info
class Album(object):
__storm_table__ = 'album'
class Album(db.Entity):
_table_ = 'album'
id = UUID(primary = True, default_factory = uuid.uuid4)
name = Unicode()
artist_id = UUID()
artist = Reference(artist_id, Artist.id)
id = PrimaryKey(UUID, default = uuid4)
name = Required(str)
artist = Required(Artist, column = 'artist_id')
tracks = Set(lambda: Track)
stars = Set(lambda: StarredAlbum)
def as_subsonic_album(self, user):
info = {
'id': str(self.id),
'name': self.name,
'artist': self.artist.name,
'artistId': str(self.artist_id),
'artistId': str(self.artist.id),
'songCount': self.tracks.count(),
'duration': sum(self.tracks.values(Track.duration)),
'created': min(self.tracks.values(Track.created)).isoformat()
'duration': sum(self.tracks.duration),
'created': min(self.tracks.created).isoformat()
}
track_with_cover = self.tracks.find(Track.folder_id == Folder.id, Folder.has_cover_art).any()
track_with_cover = self.tracks.select(lambda t: t.folder.has_cover_art)[:1][0]
if track_with_cover:
info['coverArt'] = str(track_with_cover.folder_id)
info['coverArt'] = str(track_with_cover.folder.id)
starred = Store.of(self).get(StarredAlbum, (user.id, self.id))
if starred:
try:
starred = StarredAlbum[user.id, self.id]
info['starred'] = starred.date.isoformat()
except ObjectNotFound: pass
return info
@ -138,41 +145,42 @@ class Album(object):
year = min(map(lambda t: t.year if t.year else 9999, self.tracks))
return '%i%s' % (year, self.name.lower())
Artist.albums = ReferenceSet(Artist.id, Album.artist_id)
class Track(db.Entity):
_table_ = 'track'
class Track(object):
__storm_table__ = 'track'
id = PrimaryKey(UUID, default = uuid4)
disc = Required(int)
number = Required(int)
title = Required(str)
year = Optional(int)
genre = Optional(str)
duration = Required(int)
id = UUID(primary = True, default_factory = uuid.uuid4)
disc = Int()
number = Int()
title = Unicode()
year = Int() # nullable
genre = Unicode() # nullable
duration = Int()
album_id = UUID()
album = Reference(album_id, Album.id)
artist_id = UUID()
artist = Reference(artist_id, Artist.id)
bitrate = Int()
album = Required(Album, column = 'album_id')
artist = Required(Artist, column = 'artist_id')
path = Unicode() # unique
content_type = Unicode()
created = DateTime(default_factory = now)
last_modification = Int()
bitrate = Required(int)
play_count = Int(default = 0)
last_play = DateTime() # nullable
path = Required(str, unique = True)
content_type = Required(str)
created = Required(datetime, precision = 0, default = now)
last_modification = Required(int)
root_folder_id = UUID()
root_folder = Reference(root_folder_id, Folder.id)
folder_id = UUID()
folder = Reference(folder_id, Folder.id)
play_count = Required(int, default = 0)
last_play = Optional(datetime, precision = 0)
root_folder = Required(Folder, column = 'root_folder_id')
folder = Required(Folder, column = 'folder_id')
__lastly_played_by = Set(lambda: User) # Never used, hide it
stars = Set(lambda: StarredTrack)
ratings = Set(lambda: RatingTrack)
def as_subsonic_child(self, user, prefs):
info = {
'id': str(self.id),
'parent': str(self.folder_id),
'parent': str(self.folder.id),
'isDir': False,
'title': self.title,
'album': self.album.name,
@ -187,8 +195,8 @@ class Track(object):
'isVideo': False,
'discNumber': self.disc,
'created': self.created.isoformat(),
'albumId': str(self.album_id),
'artistId': str(self.artist_id),
'albumId': str(self.album.id),
'artistId': str(self.artist.id),
'type': 'music'
}
@ -197,16 +205,19 @@ class Track(object):
if self.genre:
info['genre'] = self.genre
if self.folder.has_cover_art:
info['coverArt'] = str(self.folder_id)
info['coverArt'] = str(self.folder.id)
starred = Store.of(self).get(StarredTrack, (user.id, self.id))
if starred:
try:
starred = StarredTrack[user.id, self.id]
info['starred'] = starred.date.isoformat()
except ObjectNotFound: pass
rating = Store.of(self).get(RatingTrack, (user.id, self.id))
if rating:
try:
rating = RatingTrack[user.id, self.id]
info['userRating'] = rating.rating
avgRating = Store.of(self).find(RatingTrack, RatingTrack.rated_id == self.id).avg(RatingTrack.rating)
except ObjectNotFound: pass
avgRating = avg(self.ratings.rating)
if avgRating:
info['averageRating'] = avgRating
@ -228,25 +239,31 @@ class Track(object):
def sort_key(self):
return (self.album.artist.name + self.album.name + ("%02i" % self.disc) + ("%02i" % self.number) + self.title).lower()
Folder.tracks = ReferenceSet(Folder.id, Track.folder_id)
Album.tracks = ReferenceSet(Album.id, Track.album_id)
Artist.tracks = ReferenceSet(Artist.id, Track.artist_id)
class User(db.Entity):
_table_ = 'user'
class User(object):
__storm_table__ = 'user'
id = PrimaryKey(UUID, default = uuid4)
name = Required(str, unique = True)
mail = Optional(str)
password = Required(str)
salt = Required(str)
admin = Required(bool, default = False)
lastfm_session = Optional(str)
lastfm_status = Required(bool, default = True) # True: ok/unlinked, False: invalid session
id = UUID(primary = True, default_factory = uuid.uuid4)
name = Unicode() # unique
mail = Unicode()
password = Unicode()
salt = Unicode()
admin = Bool(default = False)
lastfm_session = Unicode() # nullable
lastfm_status = Bool(default = True) # True: ok/unlinked, False: invalid session
last_play = Optional(Track, column = 'last_play_id')
last_play_date = Optional(datetime, precision = 0)
last_play_id = UUID() # nullable
last_play = Reference(last_play_id, Track.id)
last_play_date = DateTime() # nullable
clients = Set(lambda: ClientPrefs)
playlists = Set(lambda: Playlist)
__messages = Set(lambda: ChatMessage, lazy = True) # Never used, hide it
starred_folders = Set(lambda: StarredFolder, lazy = True)
starred_artists = Set(lambda: StarredArtist, lazy = True)
starred_albums = Set(lambda: StarredAlbum, lazy = True)
starred_tracks = Set(lambda: StarredTrack, lazy = True)
folder_ratings = Set(lambda: RatingFolder, lazy = True)
track_ratings = Set(lambda: RatingTrack, lazy = True)
def as_subsonic_user(self):
return {
@ -266,72 +283,74 @@ class User(object):
'shareRole': False
}
class ClientPrefs(object):
__storm_table__ = 'client_prefs'
__storm_primary__ = 'user_id', 'client_name'
class ClientPrefs(db.Entity):
_table_ = 'client_prefs'
user_id = UUID()
client_name = Unicode()
format = Unicode() # nullable
bitrate = Int() # nullable
user = Required(User, column = 'user_id')
client_name = Required(str)
PrimaryKey(user, client_name)
format = Optional(str)
bitrate = Optional(int)
class BaseStarred(object):
__storm_primary__ = 'user_id', 'starred_id'
class StarredFolder(db.Entity):
_table_ = 'starred_folder'
user_id = UUID()
starred_id = UUID()
date = DateTime(default_factory = now)
user = Required(User, column = 'user_id')
starred = Required(Folder, column = 'starred_id')
date = Required(datetime, precision = 0, default = now)
user = Reference(user_id, User.id)
PrimaryKey(user, starred)
class StarredFolder(BaseStarred):
__storm_table__ = 'starred_folder'
class StarredArtist(db.Entity):
_table_ = 'starred_artist'
starred = Reference(BaseStarred.starred_id, Folder.id)
user = Required(User, column = 'user_id')
starred = Required(Artist, column = 'starred_id')
date = Required(datetime, precision = 0, default = now)
class StarredArtist(BaseStarred):
__storm_table__ = 'starred_artist'
PrimaryKey(user, starred)
starred = Reference(BaseStarred.starred_id, Artist.id)
class StarredAlbum(db.Entity):
_table_ = 'starred_album'
class StarredAlbum(BaseStarred):
__storm_table__ = 'starred_album'
user = Required(User, column = 'user_id')
starred = Required(Album, column = 'starred_id')
date = Required(datetime, precision = 0, default = now)
starred = Reference(BaseStarred.starred_id, Album.id)
PrimaryKey(user, starred)
class StarredTrack(BaseStarred):
__storm_table__ = 'starred_track'
class StarredTrack(db.Entity):
_table_ = 'starred_track'
starred = Reference(BaseStarred.starred_id, Track.id)
user = Required(User, column = 'user_id')
starred = Required(Track, column = 'starred_id')
date = Required(datetime, precision = 0, default = now)
class BaseRating(object):
__storm_primary__ = 'user_id', 'rated_id'
PrimaryKey(user, starred)
user_id = UUID()
rated_id = UUID()
rating = Int()
class RatingFolder(db.Entity):
_table_ = 'rating_folder'
user = Required(User, column = 'user_id')
rated = Required(Folder, column = 'rated_id')
rating = Required(int)
user = Reference(user_id, User.id)
PrimaryKey(user, rated)
class RatingFolder(BaseRating):
__storm_table__ = 'rating_folder'
class RatingTrack(db.Entity):
_table_ = 'rating_track'
user = Required(User, column = 'user_id')
rated = Required(Track, column = 'rated_id')
rating = Required(int)
rated = Reference(BaseRating.rated_id, Folder.id)
PrimaryKey(user, rated)
class RatingTrack(BaseRating):
__storm_table__ = 'rating_track'
class ChatMessage(db.Entity):
_table_ = 'chat_message'
rated = Reference(BaseRating.rated_id, Track.id)
class ChatMessage(object):
__storm_table__ = 'chat_message'
id = UUID(primary = True, default_factory = uuid.uuid4)
user_id = UUID()
time = Int(default_factory = lambda: int(time.time()))
message = Unicode()
user = Reference(user_id, User.id)
id = PrimaryKey(UUID, default = uuid4)
user = Required(User, column = 'user_id')
time = Required(int, default = lambda: int(time.time()))
message = Required(str)
def responsize(self):
return {
@ -340,24 +359,22 @@ class ChatMessage(object):
'message': self.message
}
class Playlist(object):
__storm_table__ = 'playlist'
class Playlist(db.Entity):
_table_ = 'playlist'
id = UUID(primary = True, default_factory = uuid.uuid4)
user_id = UUID()
name = Unicode()
comment = Unicode() # nullable
public = Bool(default = False)
created = DateTime(default_factory = now)
tracks = Unicode()
user = Reference(user_id, User.id)
id = PrimaryKey(UUID, default = uuid4)
user = Required(User, column = 'user_id')
name = Required(str)
comment = Optional(str)
public = Required(bool, default = False)
created = Required(datetime, precision = 0, default = now)
tracks = Optional(str)
def as_subsonic_playlist(self, user):
tracks = self.get_tracks()
info = {
'id': str(self.id),
'name': self.name if self.user_id == user.id else '[%s] %s' % (self.user.name, self.name),
'name': self.name if self.user.id == user.id else '[%s] %s' % (self.user.name, self.name),
'owner': self.user.name,
'public': self.public,
'songCount': len(tracks),
@ -374,38 +391,34 @@ class Playlist(object):
tracks = []
should_fix = False
store = Store.of(self)
for t in self.tracks.split(','):
try:
tid = uuid.UUID(t)
track = store.get(Track, tid)
if track:
tracks.append(track)
else:
should_fix = True
tid = UUID(t)
track = Track[tid]
tracks.append(track)
except:
should_fix = True
if should_fix:
self.tracks = ','.join(map(lambda t: str(t.id), tracks))
store.commit()
db.commit()
return tracks
def clear(self):
self.tracks = ""
self.tracks = ''
def add(self, track):
if isinstance(track, uuid.UUID):
if isinstance(track, UUID):
tid = track
elif isinstance(track, Track):
tid = track.id
elif isinstance(track, basestring):
tid = uuid.UUID(track)
tid = UUID(track)
if self.tracks and len(self.tracks) > 0:
self.tracks = "{},{}".format(self.tracks, tid)
self.tracks = '{},{}'.format(self.tracks, tid)
else:
self.tracks = str(tid)
@ -418,8 +431,35 @@ class Playlist(object):
self.tracks = ','.join(t for t in tracks if t)
def get_store(database_uri):
database = create_database(database_uri)
store = Store(database)
return store
def parse_uri(database_uri):
if not isinstance(database_uri, basestring):
raise TypeError('Expecting a string')
uri = urlparse(database_uri)
if uri.scheme == 'sqlite':
path = uri.path
if not path:
path = ':memory:'
elif path[0] == '/':
path = path[1:]
return dict(provider = 'sqlite', filename = path)
elif uri.scheme in ('postgres', 'postgresql'):
return dict(provider = 'postgres', user = uri.username, password = uri.password, host = uri.hostname, database = uri.path[1:])
elif uri.scheme == 'mysql':
return dict(provider = 'mysql', user = uri.username, passwd = uri.password, host = uri.hostname, db = uri.path[1:])
return dict()
def get_database(database_uri, create_tables = False):
db.bind(**parse_uri(database_uri))
db.generate_mapping(create_tables = create_tables)
return db
def release_database(db):
if not isinstance(db, Database):
raise TypeError('Expecting a pony.orm.Database instance')
db.disconnect()
db.provider = None
db.schema = None

View File

@ -9,41 +9,38 @@
#
# Distributed under terms of the GNU AGPLv3 license.
import re
import unittest
import io, re
from collections import namedtuple
import uuid
from collections import namedtuple
from pony.orm import db_session
from supysonic import db
date_regex = re.compile(r'^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}$')
class DbTestCase(unittest.TestCase):
def setUp(self):
self.store = db.get_store(u'sqlite:')
with io.open(u'schema/sqlite.sql', u'r') as f:
for statement in f.read().split(u';'):
self.store.execute(statement)
self.store = db.get_database('sqlite:', True)
def tearDown(self):
self.store.close()
db.release_database(self.store)
def create_some_folders(self):
root_folder = db.Folder()
root_folder.root = True
root_folder.name = u'Root folder'
root_folder.path = u'tests'
root_folder = db.Folder(
root = True,
name = 'Root folder',
path = 'tests'
)
child_folder = db.Folder()
child_folder.root = False
child_folder.name = u'Child folder'
child_folder.path = u'tests/assets'
child_folder.has_cover_art = True
child_folder.parent = root_folder
self.store.add(root_folder)
self.store.add(child_folder)
self.store.commit()
child_folder = db.Folder(
root = False,
name = 'Child folder',
path = 'tests/assets',
has_cover_art = True,
parent = root_folder
)
return root_folder, child_folder
@ -51,244 +48,230 @@ class DbTestCase(unittest.TestCase):
root, child = self.create_some_folders()
if not artist:
artist = db.Artist()
artist.name = u'Test Artist'
artist = db.Artist(name = 'Test artist')
if not album:
album = db.Album()
album.artist = artist
album.name = u'Test Album'
album = db.Album(artist = artist, name = 'Test Album')
track1 = db.Track()
track1.title = u'Track Title'
track1.album = album
track1.artist = artist
track1.disc = 1
track1.number = 1
track1.duration = 3
track1.bitrate = 320
track1.path = u'tests/assets/empty'
track1.content_type = u'audio/mpeg'
track1.last_modification = 1234
track1.root_folder = root
track1.folder = child
self.store.add(track1)
track1 = db.Track(
title = 'Track Title',
album = album,
artist = artist,
disc = 1,
number = 1,
duration = 3,
bitrate = 320,
path = 'tests/assets/empty',
content_type = 'audio/mpeg',
last_modification = 1234,
root_folder = root,
folder = child
)
track2 = db.Track()
track2.title = u'One Awesome Song'
track2.album = album
track2.artist = artist
track2.disc = 1
track2.number = 2
track2.duration = 5
track2.bitrate = 96
track2.path = u'tests/assets/empty'
track2.content_type = u'audio/mpeg'
track2.last_modification = 1234
track2.root_folder = root
track2.folder = child
self.store.add(track2)
track2 = db.Track(
title = 'One Awesome Song',
album = album,
artist = artist,
disc = 1,
number = 2,
duration = 5,
bitrate = 96,
path = 'tests/assets/23bytes',
content_type = 'audio/mpeg',
last_modification = 1234,
root_folder = root,
folder = child
)
return track1, track2
def create_playlist(self):
user = db.User()
user.name = u'Test User'
user.password = u'secret'
user.salt = u'ABC+'
def create_user(self, name = 'Test User'):
return db.User(
name = name,
password = 'secret',
salt = 'ABC+',
)
playlist = db.Playlist()
playlist.user = user
playlist.name = u'Playlist!'
self.store.add(playlist)
def create_playlist(self):
playlist = db.Playlist(
user = self.create_user(),
name = 'Playlist!'
)
return playlist
@db_session
def test_folder_base(self):
root_folder, child_folder = self.create_some_folders()
self.store.commit()
MockUser = namedtuple(u'User', [ u'id' ])
MockUser = namedtuple('User', [ 'id' ])
user = MockUser(uuid.uuid4())
root = root_folder.as_subsonic_child(user)
self.assertIsInstance(root, dict)
self.assertIn(u'id', root)
self.assertIn(u'isDir', root)
self.assertIn(u'title', root)
self.assertIn(u'album', root)
self.assertIn(u'created', root)
self.assertTrue(root[u'isDir'])
self.assertEqual(root[u'title'], u'Root folder')
self.assertEqual(root[u'album'], u'Root folder')
self.assertIn('id', root)
self.assertIn('isDir', root)
self.assertIn('title', root)
self.assertIn('album', root)
self.assertIn('created', root)
self.assertTrue(root['isDir'])
self.assertEqual(root['title'], 'Root folder')
self.assertEqual(root['album'], 'Root folder')
self.assertRegexpMatches(root['created'], date_regex)
child = child_folder.as_subsonic_child(user)
self.assertIn(u'parent', child)
self.assertIn(u'artist', child)
self.assertIn(u'coverArt', child)
self.assertEqual(child[u'parent'], str(root_folder.id))
self.assertEqual(child[u'artist'], root_folder.name)
self.assertEqual(child[u'coverArt'], child[u'id'])
self.assertIn('parent', child)
self.assertIn('artist', child)
self.assertIn('coverArt', child)
self.assertEqual(child['parent'], str(root_folder.id))
self.assertEqual(child['artist'], root_folder.name)
self.assertEqual(child['coverArt'], child['id'])
@db_session
def test_folder_annotation(self):
root_folder, child_folder = self.create_some_folders()
# Assuming SQLite doesn't enforce foreign key constraints
MockUser = namedtuple(u'User', [ u'id' ])
user = MockUser(uuid.uuid4())
star = db.StarredFolder()
star.user_id = user.id
star.starred_id = root_folder.id
rating_user = db.RatingFolder()
rating_user.user_id = user.id
rating_user.rated_id = root_folder.id
rating_user.rating = 2
rating_other = db.RatingFolder()
rating_other.user_id = uuid.uuid4()
rating_other.rated_id = root_folder.id
rating_other.rating = 5
self.store.add(star)
self.store.add(rating_user)
self.store.add(rating_other)
user = self.create_user()
star = db.StarredFolder(
user = user,
starred = root_folder
)
rating_user = db.RatingFolder(
user = user,
rated = root_folder,
rating = 2
)
other = self.create_user('Other')
rating_other = db.RatingFolder(
user = other,
rated = root_folder,
rating = 5
)
self.store.commit()
root = root_folder.as_subsonic_child(user)
self.assertIn(u'starred', root)
self.assertIn(u'userRating', root)
self.assertIn(u'averageRating', root)
self.assertRegexpMatches(root[u'starred'], date_regex)
self.assertEqual(root[u'userRating'], 2)
self.assertEqual(root[u'averageRating'], 3.5)
self.assertIn('starred', root)
self.assertIn('userRating', root)
self.assertIn('averageRating', root)
self.assertRegexpMatches(root['starred'], date_regex)
self.assertEqual(root['userRating'], 2)
self.assertEqual(root['averageRating'], 3.5)
child = child_folder.as_subsonic_child(user)
self.assertNotIn(u'starred', child)
self.assertNotIn(u'userRating', child)
self.assertNotIn('starred', child)
self.assertNotIn('userRating', child)
@db_session
def test_artist(self):
artist = db.Artist()
artist.name = u'Test Artist'
self.store.add(artist)
artist = db.Artist(name = 'Test Artist')
# Assuming SQLite doesn't enforce foreign key constraints
MockUser = namedtuple(u'User', [ u'id' ])
user = MockUser(uuid.uuid4())
star = db.StarredArtist()
star.user_id = user.id
star.starred_id = artist.id
self.store.add(star)
user = self.create_user()
star = db.StarredArtist(user = user, starred = artist)
self.store.commit()
artist_dict = artist.as_subsonic_artist(user)
self.assertIsInstance(artist_dict, dict)
self.assertIn(u'id', artist_dict)
self.assertIn(u'name', artist_dict)
self.assertIn(u'albumCount', artist_dict)
self.assertIn(u'starred', artist_dict)
self.assertEqual(artist_dict[u'name'], u'Test Artist')
self.assertEqual(artist_dict[u'albumCount'], 0)
self.assertRegexpMatches(artist_dict[u'starred'], date_regex)
self.assertIn('id', artist_dict)
self.assertIn('name', artist_dict)
self.assertIn('albumCount', artist_dict)
self.assertIn('starred', artist_dict)
self.assertEqual(artist_dict['name'], 'Test Artist')
self.assertEqual(artist_dict['albumCount'], 0)
self.assertRegexpMatches(artist_dict['starred'], date_regex)
album = db.Album()
album.name = u'Test Artist' # self-titled
artist.albums.add(album)
album = db.Album()
album.name = u'The Album After The Frist One'
artist.albums.add(album)
db.Album(name = 'Test Artist', artist = artist) # self-titled
db.Album(name = 'The Album After The First One', artist = artist)
self.store.commit()
artist_dict = artist.as_subsonic_artist(user)
self.assertEqual(artist_dict[u'albumCount'], 2)
self.assertEqual(artist_dict['albumCount'], 2)
@db_session
def test_album(self):
artist = db.Artist()
artist.name = u'Test Artist'
artist = db.Artist(name = 'Test Artist')
album = db.Album(artist = artist, name = 'Test Album')
album = db.Album()
album.artist = artist
album.name = u'Test Album'
# Assuming SQLite doesn't enforce foreign key constraints
MockUser = namedtuple(u'User', [ u'id' ])
user = MockUser(uuid.uuid4())
star = db.StarredAlbum()
star.user_id = user.id
star.starred = album
self.store.add(album)
self.store.add(star)
user = self.create_user()
star = db.StarredAlbum(
user = user,
starred = album
)
self.store.commit()
# No tracks, shouldn't be stored under normal circumstances
self.assertRaises(ValueError, album.as_subsonic_album, user)
self.create_some_tracks(artist, album)
self.store.commit()
album_dict = album.as_subsonic_album(user)
self.assertIsInstance(album_dict, dict)
self.assertIn(u'id', album_dict)
self.assertIn(u'name', album_dict)
self.assertIn(u'artist', album_dict)
self.assertIn(u'artistId', album_dict)
self.assertIn(u'songCount', album_dict)
self.assertIn(u'duration', album_dict)
self.assertIn(u'created', album_dict)
self.assertIn(u'starred', album_dict)
self.assertEqual(album_dict[u'name'], album.name)
self.assertEqual(album_dict[u'artist'], artist.name)
self.assertEqual(album_dict[u'artistId'], str(artist.id))
self.assertEqual(album_dict[u'songCount'], 2)
self.assertEqual(album_dict[u'duration'], 8)
self.assertRegexpMatches(album_dict[u'created'], date_regex)
self.assertRegexpMatches(album_dict[u'starred'], date_regex)
self.assertIn('id', album_dict)
self.assertIn('name', album_dict)
self.assertIn('artist', album_dict)
self.assertIn('artistId', album_dict)
self.assertIn('songCount', album_dict)
self.assertIn('duration', album_dict)
self.assertIn('created', album_dict)
self.assertIn('starred', album_dict)
self.assertEqual(album_dict['name'], album.name)
self.assertEqual(album_dict['artist'], artist.name)
self.assertEqual(album_dict['artistId'], str(artist.id))
self.assertEqual(album_dict['songCount'], 2)
self.assertEqual(album_dict['duration'], 8)
self.assertRegexpMatches(album_dict['created'], date_regex)
self.assertRegexpMatches(album_dict['starred'], date_regex)
@db_session
def test_track(self):
track1, track2 = self.create_some_tracks()
self.store.commit()
# Assuming SQLite doesn't enforce foreign key constraints
MockUser = namedtuple(u'User', [ u'id' ])
MockUser = namedtuple('User', [ 'id' ])
user = MockUser(uuid.uuid4())
track1_dict = track1.as_subsonic_child(user, None)
self.assertIsInstance(track1_dict, dict)
self.assertIn(u'id', track1_dict)
self.assertIn(u'parent', track1_dict)
self.assertIn(u'isDir', track1_dict)
self.assertIn(u'title', track1_dict)
self.assertFalse(track1_dict[u'isDir'])
self.assertIn('id', track1_dict)
self.assertIn('parent', track1_dict)
self.assertIn('isDir', track1_dict)
self.assertIn('title', track1_dict)
self.assertFalse(track1_dict['isDir'])
# ... we'll test the rest against the API XSD.
@db_session
def test_user(self):
user = db.User()
user.name = u'Test User'
user.password = u'secret'
user.salt = u'ABC+'
user = self.create_user()
self.store.commit()
user_dict = user.as_subsonic_user()
self.assertIsInstance(user_dict, dict)
@db_session
def test_chat(self):
user = db.User()
user.name = u'Test User'
user.password = u'secret'
user.salt = u'ABC+'
user = self.create_user()
line = db.ChatMessage()
line.user = user
line.message = u'Hello world!'
line = db.ChatMessage(
user = user,
message = 'Hello world!'
)
self.store.commit()
line_dict = line.responsize()
self.assertIsInstance(line_dict, dict)
self.assertIn(u'username', line_dict)
self.assertEqual(line_dict[u'username'], user.name)
self.assertIn('username', line_dict)
self.assertEqual(line_dict['username'], user.name)
@db_session
def test_playlist(self):
playlist = self.create_playlist()
playlist_dict = playlist.as_subsonic_playlist(playlist.user)
self.assertIsInstance(playlist_dict, dict)
@db_session
def test_playlist_tracks(self):
playlist = self.create_playlist()
track1, track2 = self.create_some_tracks()
@ -307,9 +290,10 @@ class DbTestCase(unittest.TestCase):
playlist.add(str(track1.id))
self.assertSequenceEqual(playlist.get_tracks(), [ track1 ])
self.assertRaises(ValueError, playlist.add, u'some string')
self.assertRaises(ValueError, playlist.add, 'some string')
self.assertRaises(NameError, playlist.add, 2345)
@db_session
def test_playlist_remove_tracks(self):
playlist = self.create_playlist()
track1, track2 = self.create_some_tracks()
@ -329,6 +313,7 @@ class DbTestCase(unittest.TestCase):
playlist.remove_at_indexes([ 1, 1 ])
self.assertSequenceEqual(playlist.get_tracks(), [ track2, track1 ])
@db_session
def test_playlist_fixing(self):
playlist = self.create_playlist()
track1, track2 = self.create_some_tracks()
@ -338,10 +323,10 @@ class DbTestCase(unittest.TestCase):
playlist.add(track2)
self.assertSequenceEqual(playlist.get_tracks(), [ track1, track2 ])
self.store.remove(track2)
track2.delete()
self.assertSequenceEqual(playlist.get_tracks(), [ track1 ])
playlist.tracks = u'{0},{0},some random garbage,{0}'.format(track1.id)
playlist.tracks = '{0},{0},some random garbage,{0}'.format(track1.id)
self.assertSequenceEqual(playlist.get_tracks(), [ track1, track1, track1 ])
if __name__ == '__main__':