#!/usr/bin/env python # coding: utf-8 # # This file is part of Supysonic. # Supysonic is a Python implementation of the Subsonic server API. # # Copyright (C) 2017-2018 Alban 'spl0k' FĂ©ron # # Distributed under terms of the GNU AGPLv3 license. import io import mutagen import os import shutil import tempfile import time import unittest from hashlib import sha1 from pony.orm import db_session from threading import Thread from supysonic.db import init_database, release_database, Track, Artist, Folder from supysonic.managers.folder import FolderManager from supysonic.watcher import SupysonicWatcher from ..testbase import TestConfig class WatcherTestConfig(TestConfig): DAEMON = { 'wait_delay': 0.5, 'log_file': '/dev/null', 'log_level': 'DEBUG' } def __init__(self, db_uri): super(WatcherTestConfig, self).__init__(False, False) self.BASE['database_uri'] = db_uri class WatcherTestBase(unittest.TestCase): def setUp(self): self.__dbfile = tempfile.mkstemp()[1] dburi = 'sqlite:///' + self.__dbfile init_database(dburi) conf = WatcherTestConfig(dburi) self.__sleep_time = conf.DAEMON['wait_delay'] + 1 self.__watcher = SupysonicWatcher(conf) def tearDown(self): release_database() os.unlink(self.__dbfile) def _start(self): self.__watcher.start() time.sleep(0.2) def _stop(self): self.__watcher.stop() def _is_alive(self): return self.__watcher.running def _sleep(self): time.sleep(self.__sleep_time) class NothingToWatchTestCase(WatcherTestBase): def test_spawn_useless_watcher(self): self._start() time.sleep(0.2) self.assertFalse(self._is_alive()) self._stop() class WatcherTestCase(WatcherTestBase): def setUp(self): super(WatcherTestCase, self).setUp() self.__dir = tempfile.mkdtemp() with db_session: FolderManager.add('Folder', self.__dir) self._start() def tearDown(self): self._stop() shutil.rmtree(self.__dir) super(WatcherTestCase, self).tearDown() @staticmethod def _tempname(): with tempfile.NamedTemporaryFile() as f: return os.path.basename(f.name) def _temppath(self, suffix, depth = 0): if depth > 0: dirpath = os.path.join(self.__dir, *(self._tempname() for _ in range(depth))) os.makedirs(dirpath) else: dirpath = self.__dir return os.path.join(dirpath, self._tempname() + suffix) def _addfile(self, depth = 0): path = self._temppath('.mp3', depth) shutil.copyfile('tests/assets/folder/silence.mp3', path) return path def _addcover(self, suffix = None, depth = 0): suffix = '.jpg' if suffix is None else (suffix + '.jpg') path = self._temppath(suffix, depth) shutil.copyfile('tests/assets/cover.jpg', path) return path class AudioWatcherTestCase(WatcherTestCase): @db_session def assertTrackCountEqual(self, expected): self.assertEqual(Track.select().count(), expected) def test_add(self): self._addfile() self.assertTrackCountEqual(0) self._sleep() self.assertTrackCountEqual(1) # This test now fails and I don't understand why #def test_add_nowait_stop(self): # self._addfile() # self._stop() # self.assertTrackCountEqual(1) def test_add_multiple(self): self._addfile() self._addfile() self._addfile() self.assertTrackCountEqual(0) self._sleep() with db_session: self.assertEqual(Track.select().count(), 3) self.assertEqual(Artist.select().count(), 1) def test_change(self): path = self._addfile() self._sleep() trackid = None with db_session: self.assertEqual(Track.select().count(), 1) self.assertEqual(Artist.select(lambda a: a.name == 'Some artist').count(), 1) trackid = Track.select().first().id tags = mutagen.File(path, easy = True) tags['artist'] = 'Renamed' tags.save() self._sleep() with db_session: self.assertEqual(Track.select().count(), 1) self.assertEqual(Artist.select(lambda a: a.name == 'Some artist').count(), 0) self.assertEqual(Artist.select(lambda a: a.name == 'Renamed').count(), 1) self.assertEqual(Track.select().first().id, trackid) def test_rename(self): path = self._addfile() self._sleep() trackid = None with db_session: self.assertEqual(Track.select().count(), 1) trackid = Track.select().first().id newpath = self._temppath('.mp3') shutil.move(path, newpath) self._sleep() with db_session: track = Track.select().first() self.assertIsNotNone(track) self.assertNotEqual(track.path, path) self.assertEqual(track.path, newpath) self.assertEqual(track._path_hash, memoryview(sha1(newpath.encode('utf-8')).digest())) self.assertEqual(track.id, trackid) def test_move_in(self): filename = self._tempname() + '.mp3' initialpath = os.path.join(tempfile.gettempdir(), filename) shutil.copyfile('tests/assets/folder/silence.mp3', initialpath) shutil.move(initialpath, self._temppath('.mp3')) self._sleep() self.assertTrackCountEqual(1) def test_move_out(self): initialpath = self._addfile() self._sleep() self.assertTrackCountEqual(1) newpath = os.path.join(tempfile.gettempdir(), os.path.basename(initialpath)) shutil.move(initialpath, newpath) self._sleep() self.assertTrackCountEqual(0) os.unlink(newpath) def test_delete(self): path = self._addfile() self._sleep() self.assertTrackCountEqual(1) os.unlink(path) self._sleep() self.assertTrackCountEqual(0) def test_add_delete(self): path = self._addfile() os.unlink(path) self._sleep() self.assertTrackCountEqual(0) def test_add_rename(self): path = self._addfile() shutil.move(path, self._temppath('.mp3')) self._sleep() self.assertTrackCountEqual(1) def test_rename_delete(self): path = self._addfile() self._sleep() self.assertTrackCountEqual(1) newpath = self._temppath('.mp3') shutil.move(path, newpath) os.unlink(newpath) self._sleep() self.assertTrackCountEqual(0) def test_add_rename_delete(self): path = self._addfile() newpath = self._temppath('.mp3') shutil.move(path, newpath) os.unlink(newpath) self._sleep() self.assertTrackCountEqual(0) def test_rename_rename(self): path = self._addfile() self._sleep() self.assertTrackCountEqual(1) newpath = self._temppath('.mp3') finalpath = self._temppath('.mp3') shutil.move(path, newpath) shutil.move(newpath, finalpath) self._sleep() self.assertTrackCountEqual(1) class CoverWatcherTestCase(WatcherTestCase): def test_add_file_then_cover(self): self._addfile() path = self._addcover() self._sleep() with db_session: self.assertEqual(Folder.select().first().cover_art, os.path.basename(path)) def test_add_cover_then_file(self): path = self._addcover() self._addfile() self._sleep() with db_session: self.assertEqual(Folder.select().first().cover_art, os.path.basename(path)) def test_remove_cover(self): self._addfile() path = self._addcover() self._sleep() os.unlink(path) self._sleep() with db_session: self.assertIsNone(Folder.select().first().cover_art) def test_naming_add_good(self): bad = os.path.basename(self._addcover()) self._sleep() good = os.path.basename(self._addcover('cover')) self._sleep() with db_session: self.assertEqual(Folder.select().first().cover_art, good) def test_naming_add_bad(self): good = os.path.basename(self._addcover('cover')) self._sleep() bad = os.path.basename(self._addcover()) self._sleep() with db_session: self.assertEqual(Folder.select().first().cover_art, good) def test_naming_remove_good(self): bad = self._addcover() good = self._addcover('cover') self._sleep() os.unlink(good) self._sleep() with db_session: self.assertEqual(Folder.select().first().cover_art, os.path.basename(bad)) def test_naming_remove_bad(self): bad = self._addcover() good = self._addcover('cover') self._sleep() os.unlink(bad) self._sleep() with db_session: self.assertEqual(Folder.select().first().cover_art, os.path.basename(good)) def test_rename(self): path = self._addcover() self._sleep() newpath = self._temppath('.jpg') shutil.move(path, newpath) self._sleep() with db_session: self.assertEqual(Folder.select().first().cover_art, os.path.basename(newpath)) def test_add_to_folder_without_track(self): path = self._addcover(depth = 1) self._sleep() with db_session: self.assertFalse(Folder.exists(cover_art = os.path.basename(path))) def test_remove_from_folder_without_track(self): path = self._addcover(depth = 1) self._sleep() os.unlink(path) self._sleep() def test_add_track_to_empty_folder(self): self._addfile(1) self._sleep() def suite(): suite = unittest.TestSuite() suite.addTest(unittest.makeSuite(NothingToWatchTestCase)) suite.addTest(unittest.makeSuite(AudioWatcherTestCase)) suite.addTest(unittest.makeSuite(CoverWatcherTestCase)) return suite if __name__ == '__main__': unittest.main()