mirror of
https://github.com/spl0k/supysonic.git
synced 2024-11-13 21:52:18 +00:00
235 lines
7.8 KiB
Python
235 lines
7.8 KiB
Python
# This file is part of Supysonic.
|
|
# Supysonic is a Python implementation of the Subsonic server API.
|
|
#
|
|
# Copyright (C) 2017-2023 Alban 'spl0k' Féron
|
|
#
|
|
# Distributed under terms of the GNU AGPLv3 license.
|
|
|
|
import mutagen
|
|
import os
|
|
import os.path
|
|
import shutil
|
|
import tempfile
|
|
import unittest
|
|
|
|
from contextlib import contextmanager
|
|
|
|
from supysonic import db
|
|
from supysonic.managers.folder import FolderManager
|
|
from supysonic.scanner import Scanner
|
|
|
|
|
|
class ScannerTestCase(unittest.TestCase):
|
|
def setUp(self):
|
|
db.init_database("sqlite:")
|
|
|
|
folder = FolderManager.add("folder", os.path.abspath("tests/assets/folder"))
|
|
self.assertIsNotNone(folder)
|
|
|
|
self.folderid = folder.id
|
|
self.__scan()
|
|
|
|
def tearDown(self):
|
|
db.release_database()
|
|
|
|
@contextmanager
|
|
def __temporary_track_copy(self):
|
|
track = db.Track.select().first()
|
|
with tempfile.NamedTemporaryFile(
|
|
dir=os.path.dirname(track.path), delete=False
|
|
) as tf:
|
|
with open(track.path, "rb") as f:
|
|
tf.write(f.read())
|
|
try:
|
|
yield tf.name
|
|
finally:
|
|
os.remove(tf.name)
|
|
|
|
def __scan(self, force=False):
|
|
self.scanner = Scanner(force=force)
|
|
self.scanner.queue_folder("folder")
|
|
self.scanner.run()
|
|
|
|
def test_scan(self):
|
|
self.assertEqual(db.Track.select().count(), 1)
|
|
|
|
self.assertRaises(TypeError, self.scanner.queue_folder, None)
|
|
self.assertRaises(
|
|
TypeError, self.scanner.queue_folder, db.Folder[self.folderid]
|
|
)
|
|
|
|
def test_rescan(self):
|
|
self.__scan()
|
|
self.assertEqual(db.Track.select().count(), 1)
|
|
|
|
def test_force_rescan(self):
|
|
self.__scan(True)
|
|
self.assertEqual(db.Track.select().count(), 1)
|
|
|
|
def test_scan_file(self):
|
|
self.scanner.scan_file("/some/inexistent/path")
|
|
self.assertEqual(db.Track.select().count(), 1)
|
|
|
|
def test_remove_file(self):
|
|
track = db.Track.select().first()
|
|
self.assertRaises(TypeError, self.scanner.remove_file, None)
|
|
self.assertRaises(TypeError, self.scanner.remove_file, track)
|
|
|
|
self.scanner.remove_file("/some/inexistent/path")
|
|
self.assertEqual(db.Track.select().count(), 1)
|
|
|
|
self.scanner.remove_file(track.path)
|
|
self.scanner.prune()
|
|
self.assertEqual(db.Track.select().count(), 0)
|
|
self.assertEqual(db.Album.select().count(), 0)
|
|
self.assertEqual(db.Artist.select().count(), 0)
|
|
|
|
def test_move_file(self):
|
|
track = db.Track.select().first()
|
|
self.assertRaises(TypeError, self.scanner.move_file, None, "string")
|
|
self.assertRaises(TypeError, self.scanner.move_file, track, "string")
|
|
self.assertRaises(TypeError, self.scanner.move_file, "string", None)
|
|
self.assertRaises(TypeError, self.scanner.move_file, "string", track)
|
|
|
|
self.scanner.move_file("/some/inexistent/path", track.path)
|
|
self.assertEqual(db.Track.select().count(), 1)
|
|
|
|
self.scanner.move_file(track.path, track.path)
|
|
self.assertEqual(db.Track.select().count(), 1)
|
|
|
|
self.assertRaises(
|
|
Exception, self.scanner.move_file, track.path, "/some/inexistent/path"
|
|
)
|
|
|
|
with self.__temporary_track_copy() as tf:
|
|
self.__scan()
|
|
self.assertEqual(db.Track.select().count(), 2)
|
|
self.scanner.move_file(tf, track.path)
|
|
self.assertEqual(db.Track.select().count(), 1)
|
|
|
|
track = db.Track.select().first()
|
|
new_path = track.path.replace("silence", "silence_moved")
|
|
self.scanner.move_file(track.path, new_path)
|
|
|
|
track = db.Track.select().first()
|
|
self.assertEqual(db.Track.select().count(), 1)
|
|
self.assertEqual(track.path, new_path)
|
|
|
|
def test_rescan_corrupt_file(self):
|
|
with self.__temporary_track_copy() as tf:
|
|
self.__scan()
|
|
self.assertEqual(db.Track.select().count(), 2)
|
|
|
|
with open(tf, "wb") as f:
|
|
f.seek(0, 0)
|
|
f.write(b"\x00" * 4096)
|
|
f.truncate()
|
|
|
|
self.__scan(True)
|
|
self.assertEqual(db.Track.select().count(), 1)
|
|
|
|
def test_rescan_removed_file(self):
|
|
with self.__temporary_track_copy():
|
|
self.__scan()
|
|
self.assertEqual(db.Track.select().count(), 2)
|
|
|
|
self.__scan()
|
|
self.assertEqual(db.Track.select().count(), 1)
|
|
|
|
def test_scan_tag_change(self):
|
|
with self.__temporary_track_copy() as tf:
|
|
self.__scan()
|
|
copy = db.Track.get(path=tf)
|
|
self.assertEqual(copy.artist.name, "Some artist")
|
|
self.assertEqual(copy.album.name, "Awesome album")
|
|
|
|
tags = mutagen.File(copy.path, easy=True)
|
|
tags["artist"] = "Renamed artist"
|
|
tags["album"] = "Crappy album"
|
|
tags.save()
|
|
|
|
self.__scan(True)
|
|
copy = db.Track.get(path=tf)
|
|
self.assertEqual(copy.artist.name, "Renamed artist")
|
|
self.assertEqual(copy.album.name, "Crappy album")
|
|
self.assertIsNotNone(db.Artist.get(name="Some artist"))
|
|
self.assertIsNotNone(db.Album.get(name="Awesome album"))
|
|
|
|
def test_stats(self):
|
|
stats = self.scanner.stats()
|
|
self.assertEqual(stats.added.artists, 1)
|
|
self.assertEqual(stats.added.albums, 1)
|
|
self.assertEqual(stats.added.tracks, 1)
|
|
self.assertEqual(stats.deleted.artists, 0)
|
|
self.assertEqual(stats.deleted.albums, 0)
|
|
self.assertEqual(stats.deleted.tracks, 0)
|
|
|
|
|
|
class ScannerDeletionsTestCase(unittest.TestCase):
|
|
def setUp(self):
|
|
self.__dir = tempfile.mkdtemp()
|
|
db.init_database("sqlite:")
|
|
FolderManager.add("folder", self.__dir)
|
|
|
|
# Create folder hierarchy
|
|
self._firstsubdir = tempfile.mkdtemp(dir=self.__dir)
|
|
subdir = self._firstsubdir
|
|
for _ in range(4):
|
|
subdir = tempfile.mkdtemp(dir=subdir)
|
|
|
|
# Put a file in the deepest folder
|
|
self._trackpath = os.path.join(subdir, "silence.mp3")
|
|
shutil.copyfile("tests/assets/folder/silence.mp3", self._trackpath)
|
|
|
|
self._scan()
|
|
|
|
# Create annotation data
|
|
track = db.Track.get()
|
|
firstdir = db.Folder.get(path=self._firstsubdir)
|
|
user = db.User.create(
|
|
name="user", password="password", salt="salt", last_play=track
|
|
)
|
|
db.StarredFolder.create(user=user, starred=track.folder_id)
|
|
db.StarredFolder.create(user=user, starred=firstdir)
|
|
db.StarredArtist.create(user=user, starred=track.artist_id)
|
|
db.StarredAlbum.create(user=user, starred=track.album_id)
|
|
db.StarredTrack.create(user=user, starred=track)
|
|
db.RatingFolder.create(user=user, rated=track.folder_id, rating=2)
|
|
db.RatingFolder.create(user=user, rated=firstdir, rating=2)
|
|
db.RatingTrack.create(user=user, rated=track, rating=2)
|
|
|
|
def tearDown(self):
|
|
db.release_database()
|
|
shutil.rmtree(self.__dir)
|
|
|
|
def _scan(self):
|
|
scanner = Scanner()
|
|
scanner.queue_folder("folder")
|
|
scanner.run()
|
|
|
|
return scanner.stats()
|
|
|
|
def _check_assertions(self, stats):
|
|
self.assertEqual(stats.deleted.artists, 1)
|
|
self.assertEqual(stats.deleted.albums, 1)
|
|
self.assertEqual(stats.deleted.tracks, 1)
|
|
self.assertEqual(db.Track.select().count(), 0)
|
|
self.assertEqual(db.Album.select().count(), 0)
|
|
self.assertEqual(db.Artist.select().count(), 0)
|
|
self.assertEqual(db.User.select().count(), 1)
|
|
self.assertEqual(db.Folder.select().count(), 1)
|
|
|
|
def test_parent_folder(self):
|
|
shutil.rmtree(self._firstsubdir)
|
|
stats = self._scan()
|
|
self._check_assertions(stats)
|
|
|
|
def test_track(self):
|
|
os.remove(self._trackpath)
|
|
stats = self._scan()
|
|
self._check_assertions(stats)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
unittest.main()
|