1
0
mirror of https://github.com/spl0k/supysonic.git synced 2025-01-12 11:16:18 +00:00
supysonic/tests/base/test_watcher.py

330 lines
9.1 KiB
Python
Raw Normal View History

2017-12-05 22:18:39 +00:00
# This file is part of Supysonic.
# Supysonic is a Python implementation of the Subsonic server API.
#
2022-01-30 16:00:32 +00:00
# Copyright (C) 2017-2022 Alban 'spl0k' Féron
2017-12-05 22:18:39 +00:00
#
# Distributed under terms of the GNU AGPLv3 license.
2017-12-06 22:19:16 +00:00
import mutagen
2017-12-05 22:18:39 +00:00
import os
import shutil
import tempfile
import time
import unittest
from hashlib import sha1
2017-12-05 22:18:39 +00:00
from supysonic.db import init_database, release_database, Track, Artist, Folder
2017-12-05 22:18:39 +00:00
from supysonic.managers.folder import FolderManager
from supysonic.watcher import SupysonicWatcher
from ..testbase import TestConfig
2019-06-29 15:25:44 +00:00
2017-12-05 22:18:39 +00:00
class WatcherTestConfig(TestConfig):
2019-06-29 15:25:44 +00:00
DAEMON = {"wait_delay": 0.5, "log_file": "/dev/null", "log_level": "DEBUG"}
2017-12-05 22:18:39 +00:00
def __init__(self, db_uri):
2020-11-22 15:12:14 +00:00
super().__init__(False, False)
2019-06-29 15:25:44 +00:00
self.BASE["database_uri"] = db_uri
2017-12-05 22:18:39 +00:00
class WatcherTestBase(unittest.TestCase):
def setUp(self):
self.__db = tempfile.mkstemp()
dburi = "sqlite:///" + self.__db[1]
init_database(dburi)
2017-12-05 22:18:39 +00:00
2017-12-17 22:25:34 +00:00
conf = WatcherTestConfig(dburi)
2019-06-29 15:25:44 +00:00
self.__sleep_time = conf.DAEMON["wait_delay"] + 1
2017-12-05 22:18:39 +00:00
self.__watcher = SupysonicWatcher(conf)
def tearDown(self):
2019-03-31 12:43:32 +00:00
release_database()
os.close(self.__db[0])
os.remove(self.__db[1])
2017-12-05 22:18:39 +00:00
def _start(self):
2019-03-31 12:43:32 +00:00
self.__watcher.start()
2017-12-05 22:18:39 +00:00
time.sleep(0.2)
def _stop(self):
self.__watcher.stop()
def _is_alive(self):
2019-03-31 12:43:32 +00:00
return self.__watcher.running
2017-12-05 22:18:39 +00:00
def _sleep(self):
time.sleep(self.__sleep_time)
2019-06-29 15:25:44 +00:00
2017-12-05 22:18:39 +00:00
class WatcherTestCase(WatcherTestBase):
def setUp(self):
2020-11-22 15:12:14 +00:00
super().setUp()
2017-12-05 22:18:39 +00:00
self.__dir = tempfile.mkdtemp()
FolderManager.add("Folder", self.__dir)
2017-12-05 22:18:39 +00:00
self._start()
def tearDown(self):
self._stop()
shutil.rmtree(self.__dir)
2020-11-22 15:12:14 +00:00
super().tearDown()
2017-12-05 22:18:39 +00:00
@staticmethod
def _tempname():
with tempfile.NamedTemporaryFile() as f:
return os.path.basename(f.name)
2019-06-29 15:25:44 +00:00
def _temppath(self, suffix, depth=0):
if depth > 0:
2019-06-29 15:25:44 +00:00
dirpath = os.path.join(
self.__dir, *(self._tempname() for _ in range(depth))
)
os.makedirs(dirpath)
else:
dirpath = self.__dir
return os.path.join(dirpath, self._tempname() + suffix)
2017-12-06 22:19:16 +00:00
2019-06-29 15:25:44 +00:00
def _addfile(self, depth=0):
path = self._temppath(".mp3", depth)
shutil.copyfile("tests/assets/folder/silence.mp3", path)
2017-12-06 22:19:16 +00:00
return path
2019-06-29 15:25:44 +00:00
def _addcover(self, suffix=None, depth=0):
suffix = ".jpg" if suffix is None else (suffix + ".jpg")
path = self._temppath(suffix, depth)
2019-06-29 15:25:44 +00:00
shutil.copyfile("tests/assets/cover.jpg", path)
return path
2019-06-29 15:25:44 +00:00
class AudioWatcherTestCase(WatcherTestCase):
2017-12-05 22:18:39 +00:00
def assertTrackCountEqual(self, expected):
2017-12-17 22:25:34 +00:00
self.assertEqual(Track.select().count(), expected)
2017-12-05 22:18:39 +00:00
def test_add(self):
2017-12-06 22:19:16 +00:00
self._addfile()
2017-12-05 22:18:39 +00:00
self.assertTrackCountEqual(0)
self._sleep()
self.assertTrackCountEqual(1)
2021-11-21 11:20:48 +00:00
def test_add_nowait_stop(self):
self._addfile()
2022-01-30 16:00:32 +00:00
# Add a small delay (< wait_delay) so wathdog can pick up that a file was added
time.sleep(0.1)
2021-11-21 11:20:48 +00:00
self._stop()
self.assertTrackCountEqual(1)
2017-12-05 22:18:39 +00:00
def test_add_multiple(self):
2017-12-06 22:19:16 +00:00
self._addfile()
self._addfile()
self._addfile()
2017-12-05 22:18:39 +00:00
self.assertTrackCountEqual(0)
self._sleep()
self.assertEqual(Track.select().count(), 3)
self.assertEqual(Artist.select().count(), 1)
2017-12-05 22:18:39 +00:00
2017-12-06 22:19:16 +00:00
def test_change(self):
path = self._addfile()
self._sleep()
trackid = None
self.assertEqual(Track.select().count(), 1)
self.assertEqual(Artist.select().where(Artist.name == "Some artist").count(), 1)
trackid = Track.select().first().id
2017-12-06 22:19:16 +00:00
2019-06-29 15:25:44 +00:00
tags = mutagen.File(path, easy=True)
tags["artist"] = "Renamed"
2017-12-06 22:19:16 +00:00
tags.save()
self._sleep()
self.assertEqual(Track.select().count(), 1)
self.assertEqual(Artist.select().where(Artist.name == "Some artist").count(), 0)
self.assertEqual(Artist.select().where(Artist.name == "Renamed").count(), 1)
self.assertEqual(Track.select().first().id, trackid)
2017-12-06 22:19:16 +00:00
def test_rename(self):
path = self._addfile()
self._sleep()
trackid = None
self.assertEqual(Track.select().count(), 1)
trackid = Track.select().first().id
2017-12-06 22:19:16 +00:00
2019-06-29 15:25:44 +00:00
newpath = self._temppath(".mp3")
2017-12-06 22:19:16 +00:00
shutil.move(path, newpath)
self._sleep()
track = Track.select().first()
self.assertIsNotNone(track)
self.assertNotEqual(track.path, path)
self.assertEqual(track.path, newpath)
self.assertEqual(
track._path_hash, memoryview(sha1(newpath.encode("utf-8")).digest())
)
self.assertEqual(track.id, trackid)
2017-12-06 22:19:16 +00:00
def test_move_in(self):
2019-06-29 15:25:44 +00:00
filename = self._tempname() + ".mp3"
2017-12-06 22:19:16 +00:00
initialpath = os.path.join(tempfile.gettempdir(), filename)
2019-06-29 15:25:44 +00:00
shutil.copyfile("tests/assets/folder/silence.mp3", initialpath)
shutil.move(initialpath, self._temppath(".mp3"))
2017-12-06 22:19:16 +00:00
self._sleep()
self.assertTrackCountEqual(1)
def test_move_out(self):
initialpath = self._addfile()
self._sleep()
self.assertTrackCountEqual(1)
newpath = os.path.join(tempfile.gettempdir(), os.path.basename(initialpath))
shutil.move(initialpath, newpath)
self._sleep()
self.assertTrackCountEqual(0)
os.unlink(newpath)
def test_delete(self):
path = self._addfile()
self._sleep()
self.assertTrackCountEqual(1)
os.unlink(path)
self._sleep()
self.assertTrackCountEqual(0)
def test_add_delete(self):
path = self._addfile()
os.unlink(path)
self._sleep()
self.assertTrackCountEqual(0)
def test_add_rename(self):
path = self._addfile()
2019-06-29 15:25:44 +00:00
shutil.move(path, self._temppath(".mp3"))
2017-12-06 22:19:16 +00:00
self._sleep()
self.assertTrackCountEqual(1)
def test_rename_delete(self):
path = self._addfile()
self._sleep()
self.assertTrackCountEqual(1)
2019-06-29 15:25:44 +00:00
newpath = self._temppath(".mp3")
2017-12-06 22:19:16 +00:00
shutil.move(path, newpath)
os.unlink(newpath)
self._sleep()
self.assertTrackCountEqual(0)
def test_add_rename_delete(self):
path = self._addfile()
2019-06-29 15:25:44 +00:00
newpath = self._temppath(".mp3")
2017-12-06 22:19:16 +00:00
shutil.move(path, newpath)
os.unlink(newpath)
self._sleep()
self.assertTrackCountEqual(0)
def test_rename_rename(self):
path = self._addfile()
self._sleep()
self.assertTrackCountEqual(1)
2019-06-29 15:25:44 +00:00
newpath = self._temppath(".mp3")
finalpath = self._temppath(".mp3")
2017-12-06 22:19:16 +00:00
shutil.move(path, newpath)
shutil.move(newpath, finalpath)
self._sleep()
self.assertTrackCountEqual(1)
2019-06-29 15:25:44 +00:00
class CoverWatcherTestCase(WatcherTestCase):
def test_add_file_then_cover(self):
self._addfile()
path = self._addcover()
self._sleep()
self.assertEqual(Folder.select().first().cover_art, os.path.basename(path))
def test_add_cover_then_file(self):
path = self._addcover()
self._addfile()
self._sleep()
self.assertEqual(Folder.select().first().cover_art, os.path.basename(path))
def test_remove_cover(self):
self._addfile()
path = self._addcover()
self._sleep()
os.unlink(path)
self._sleep()
self.assertIsNone(Folder.select().first().cover_art)
def test_naming_add_good(self):
self._addcover()
self._sleep()
2019-06-29 15:25:44 +00:00
good = os.path.basename(self._addcover("cover"))
self._sleep()
self.assertEqual(Folder.select().first().cover_art, good)
def test_naming_add_bad(self):
2019-06-29 15:25:44 +00:00
good = os.path.basename(self._addcover("cover"))
self._sleep()
self._addcover()
self._sleep()
self.assertEqual(Folder.select().first().cover_art, good)
def test_naming_remove_good(self):
bad = self._addcover()
2019-06-29 15:25:44 +00:00
good = self._addcover("cover")
self._sleep()
os.unlink(good)
self._sleep()
self.assertEqual(Folder.select().first().cover_art, os.path.basename(bad))
def test_naming_remove_bad(self):
bad = self._addcover()
2019-06-29 15:25:44 +00:00
good = self._addcover("cover")
self._sleep()
os.unlink(bad)
self._sleep()
self.assertEqual(Folder.select().first().cover_art, os.path.basename(good))
def test_rename(self):
path = self._addcover()
self._sleep()
2019-06-29 15:25:44 +00:00
newpath = self._temppath(".jpg")
shutil.move(path, newpath)
self._sleep()
self.assertEqual(Folder.select().first().cover_art, os.path.basename(newpath))
def test_add_to_folder_without_track(self):
2019-06-29 15:25:44 +00:00
path = self._addcover(depth=1)
self._sleep()
self.assertFalse(
Folder.select().where(Folder.cover_art == os.path.basename(path)).exists()
)
def test_remove_from_folder_without_track(self):
2019-06-29 15:25:44 +00:00
path = self._addcover(depth=1)
self._sleep()
os.unlink(path)
self._sleep()
def test_add_track_to_empty_folder(self):
self._addfile(1)
self._sleep()
2019-06-29 15:25:44 +00:00
if __name__ == "__main__":
unittest.main()