#!/usr/bin/env python # coding: utf-8 # # This file is part of Supysonic. # Supysonic is a Python implementation of the Subsonic server API. # # Copyright (C) 2017-2018 Alban 'spl0k' FĂ©ron # # Distributed under terms of the GNU AGPLv3 license. import io import mutagen import os import shutil import tempfile import time import unittest from hashlib import sha1 from pony.orm import db_session from threading import Thread from supysonic.db import init_database, release_database, Track, Artist, Folder from supysonic.managers.folder import FolderManager from supysonic.watcher import SupysonicWatcher from ..testbase import TestConfig class WatcherTestConfig(TestConfig): DAEMON = {"wait_delay": 0.5, "log_file": "/dev/null", "log_level": "DEBUG"} def __init__(self, db_uri): super(WatcherTestConfig, self).__init__(False, False) self.BASE["database_uri"] = db_uri class WatcherTestBase(unittest.TestCase): def setUp(self): self.__dbfile = tempfile.mkstemp()[1] dburi = "sqlite:///" + self.__dbfile init_database(dburi) conf = WatcherTestConfig(dburi) self.__sleep_time = conf.DAEMON["wait_delay"] + 1 self.__watcher = SupysonicWatcher(conf) def tearDown(self): release_database() os.unlink(self.__dbfile) def _start(self): self.__watcher.start() time.sleep(0.2) def _stop(self): self.__watcher.stop() def _is_alive(self): return self.__watcher.running def _sleep(self): time.sleep(self.__sleep_time) class WatcherTestCase(WatcherTestBase): def setUp(self): super(WatcherTestCase, self).setUp() self.__dir = tempfile.mkdtemp() with db_session: FolderManager.add("Folder", self.__dir) self._start() def tearDown(self): self._stop() shutil.rmtree(self.__dir) super(WatcherTestCase, self).tearDown() @staticmethod def _tempname(): with tempfile.NamedTemporaryFile() as f: return os.path.basename(f.name) def _temppath(self, suffix, depth=0): if depth > 0: dirpath = os.path.join( self.__dir, *(self._tempname() for _ in range(depth)) ) os.makedirs(dirpath) else: dirpath = self.__dir return os.path.join(dirpath, self._tempname() + suffix) def _addfile(self, depth=0): path = self._temppath(".mp3", depth) shutil.copyfile("tests/assets/folder/silence.mp3", path) return path def _addcover(self, suffix=None, depth=0): suffix = ".jpg" if suffix is None else (suffix + ".jpg") path = self._temppath(suffix, depth) shutil.copyfile("tests/assets/cover.jpg", path) return path class AudioWatcherTestCase(WatcherTestCase): @db_session def assertTrackCountEqual(self, expected): self.assertEqual(Track.select().count(), expected) def test_add(self): self._addfile() self.assertTrackCountEqual(0) self._sleep() self.assertTrackCountEqual(1) # This test now fails and I don't understand why # def test_add_nowait_stop(self): # self._addfile() # self._stop() # self.assertTrackCountEqual(1) def test_add_multiple(self): self._addfile() self._addfile() self._addfile() self.assertTrackCountEqual(0) self._sleep() with db_session: self.assertEqual(Track.select().count(), 3) self.assertEqual(Artist.select().count(), 1) def test_change(self): path = self._addfile() self._sleep() trackid = None with db_session: self.assertEqual(Track.select().count(), 1) self.assertEqual( Artist.select(lambda a: a.name == "Some artist").count(), 1 ) trackid = Track.select().first().id tags = mutagen.File(path, easy=True) tags["artist"] = "Renamed" tags.save() self._sleep() with db_session: self.assertEqual(Track.select().count(), 1) self.assertEqual( Artist.select(lambda a: a.name == "Some artist").count(), 0 ) self.assertEqual(Artist.select(lambda a: a.name == "Renamed").count(), 1) self.assertEqual(Track.select().first().id, trackid) def test_rename(self): path = self._addfile() self._sleep() trackid = None with db_session: self.assertEqual(Track.select().count(), 1) trackid = Track.select().first().id newpath = self._temppath(".mp3") shutil.move(path, newpath) self._sleep() with db_session: track = Track.select().first() self.assertIsNotNone(track) self.assertNotEqual(track.path, path) self.assertEqual(track.path, newpath) self.assertEqual( track._path_hash, memoryview(sha1(newpath.encode("utf-8")).digest()) ) self.assertEqual(track.id, trackid) def test_move_in(self): filename = self._tempname() + ".mp3" initialpath = os.path.join(tempfile.gettempdir(), filename) shutil.copyfile("tests/assets/folder/silence.mp3", initialpath) shutil.move(initialpath, self._temppath(".mp3")) self._sleep() self.assertTrackCountEqual(1) def test_move_out(self): initialpath = self._addfile() self._sleep() self.assertTrackCountEqual(1) newpath = os.path.join(tempfile.gettempdir(), os.path.basename(initialpath)) shutil.move(initialpath, newpath) self._sleep() self.assertTrackCountEqual(0) os.unlink(newpath) def test_delete(self): path = self._addfile() self._sleep() self.assertTrackCountEqual(1) os.unlink(path) self._sleep() self.assertTrackCountEqual(0) def test_add_delete(self): path = self._addfile() os.unlink(path) self._sleep() self.assertTrackCountEqual(0) def test_add_rename(self): path = self._addfile() shutil.move(path, self._temppath(".mp3")) self._sleep() self.assertTrackCountEqual(1) def test_rename_delete(self): path = self._addfile() self._sleep() self.assertTrackCountEqual(1) newpath = self._temppath(".mp3") shutil.move(path, newpath) os.unlink(newpath) self._sleep() self.assertTrackCountEqual(0) def test_add_rename_delete(self): path = self._addfile() newpath = self._temppath(".mp3") shutil.move(path, newpath) os.unlink(newpath) self._sleep() self.assertTrackCountEqual(0) def test_rename_rename(self): path = self._addfile() self._sleep() self.assertTrackCountEqual(1) newpath = self._temppath(".mp3") finalpath = self._temppath(".mp3") shutil.move(path, newpath) shutil.move(newpath, finalpath) self._sleep() self.assertTrackCountEqual(1) class CoverWatcherTestCase(WatcherTestCase): def test_add_file_then_cover(self): self._addfile() path = self._addcover() self._sleep() with db_session: self.assertEqual(Folder.select().first().cover_art, os.path.basename(path)) def test_add_cover_then_file(self): path = self._addcover() self._addfile() self._sleep() with db_session: self.assertEqual(Folder.select().first().cover_art, os.path.basename(path)) def test_remove_cover(self): self._addfile() path = self._addcover() self._sleep() os.unlink(path) self._sleep() with db_session: self.assertIsNone(Folder.select().first().cover_art) def test_naming_add_good(self): bad = os.path.basename(self._addcover()) self._sleep() good = os.path.basename(self._addcover("cover")) self._sleep() with db_session: self.assertEqual(Folder.select().first().cover_art, good) def test_naming_add_bad(self): good = os.path.basename(self._addcover("cover")) self._sleep() bad = os.path.basename(self._addcover()) self._sleep() with db_session: self.assertEqual(Folder.select().first().cover_art, good) def test_naming_remove_good(self): bad = self._addcover() good = self._addcover("cover") self._sleep() os.unlink(good) self._sleep() with db_session: self.assertEqual(Folder.select().first().cover_art, os.path.basename(bad)) def test_naming_remove_bad(self): bad = self._addcover() good = self._addcover("cover") self._sleep() os.unlink(bad) self._sleep() with db_session: self.assertEqual(Folder.select().first().cover_art, os.path.basename(good)) def test_rename(self): path = self._addcover() self._sleep() newpath = self._temppath(".jpg") shutil.move(path, newpath) self._sleep() with db_session: self.assertEqual( Folder.select().first().cover_art, os.path.basename(newpath) ) def test_add_to_folder_without_track(self): path = self._addcover(depth=1) self._sleep() with db_session: self.assertFalse(Folder.exists(cover_art=os.path.basename(path))) def test_remove_from_folder_without_track(self): path = self._addcover(depth=1) self._sleep() os.unlink(path) self._sleep() def test_add_track_to_empty_folder(self): self._addfile(1) self._sleep() def suite(): suite = unittest.TestSuite() suite.addTest(unittest.makeSuite(AudioWatcherTestCase)) suite.addTest(unittest.makeSuite(CoverWatcherTestCase)) return suite if __name__ == "__main__": unittest.main()