mirror of
https://github.com/spl0k/supysonic.git
synced 2024-12-23 01:16:18 +00:00
5b0b5ff29b
The watcher now keeps running even if there's nothing to watch initially so we can add watched folder later on
350 lines
9.8 KiB
Python
350 lines
9.8 KiB
Python
#!/usr/bin/env python
|
|
# coding: utf-8
|
|
#
|
|
# This file is part of Supysonic.
|
|
# Supysonic is a Python implementation of the Subsonic server API.
|
|
#
|
|
# Copyright (C) 2017-2018 Alban 'spl0k' Féron
|
|
#
|
|
# Distributed under terms of the GNU AGPLv3 license.
|
|
|
|
import io
|
|
import mutagen
|
|
import os
|
|
import shutil
|
|
import tempfile
|
|
import time
|
|
import unittest
|
|
|
|
from hashlib import sha1
|
|
from pony.orm import db_session
|
|
from threading import Thread
|
|
|
|
from supysonic.db import init_database, release_database, Track, Artist, Folder
|
|
from supysonic.managers.folder import FolderManager
|
|
from supysonic.watcher import SupysonicWatcher
|
|
|
|
from ..testbase import TestConfig
|
|
|
|
class WatcherTestConfig(TestConfig):
|
|
DAEMON = {
|
|
'wait_delay': 0.5,
|
|
'log_file': '/dev/null',
|
|
'log_level': 'DEBUG'
|
|
}
|
|
|
|
def __init__(self, db_uri):
|
|
super(WatcherTestConfig, self).__init__(False, False)
|
|
self.BASE['database_uri'] = db_uri
|
|
|
|
class WatcherTestBase(unittest.TestCase):
|
|
def setUp(self):
|
|
self.__dbfile = tempfile.mkstemp()[1]
|
|
dburi = 'sqlite:///' + self.__dbfile
|
|
init_database(dburi)
|
|
|
|
conf = WatcherTestConfig(dburi)
|
|
self.__sleep_time = conf.DAEMON['wait_delay'] + 1
|
|
|
|
self.__watcher = SupysonicWatcher(conf)
|
|
|
|
def tearDown(self):
|
|
release_database()
|
|
os.unlink(self.__dbfile)
|
|
|
|
def _start(self):
|
|
self.__watcher.start()
|
|
time.sleep(0.2)
|
|
|
|
def _stop(self):
|
|
self.__watcher.stop()
|
|
|
|
def _is_alive(self):
|
|
return self.__watcher.running
|
|
|
|
def _sleep(self):
|
|
time.sleep(self.__sleep_time)
|
|
|
|
class WatcherTestCase(WatcherTestBase):
|
|
def setUp(self):
|
|
super(WatcherTestCase, self).setUp()
|
|
self.__dir = tempfile.mkdtemp()
|
|
with db_session:
|
|
FolderManager.add('Folder', self.__dir)
|
|
self._start()
|
|
|
|
def tearDown(self):
|
|
self._stop()
|
|
shutil.rmtree(self.__dir)
|
|
super(WatcherTestCase, self).tearDown()
|
|
|
|
@staticmethod
|
|
def _tempname():
|
|
with tempfile.NamedTemporaryFile() as f:
|
|
return os.path.basename(f.name)
|
|
|
|
def _temppath(self, suffix, depth = 0):
|
|
if depth > 0:
|
|
dirpath = os.path.join(self.__dir, *(self._tempname() for _ in range(depth)))
|
|
os.makedirs(dirpath)
|
|
else:
|
|
dirpath = self.__dir
|
|
return os.path.join(dirpath, self._tempname() + suffix)
|
|
|
|
def _addfile(self, depth = 0):
|
|
path = self._temppath('.mp3', depth)
|
|
shutil.copyfile('tests/assets/folder/silence.mp3', path)
|
|
return path
|
|
|
|
def _addcover(self, suffix = None, depth = 0):
|
|
suffix = '.jpg' if suffix is None else (suffix + '.jpg')
|
|
path = self._temppath(suffix, depth)
|
|
shutil.copyfile('tests/assets/cover.jpg', path)
|
|
return path
|
|
|
|
class AudioWatcherTestCase(WatcherTestCase):
|
|
@db_session
|
|
def assertTrackCountEqual(self, expected):
|
|
self.assertEqual(Track.select().count(), expected)
|
|
|
|
def test_add(self):
|
|
self._addfile()
|
|
self.assertTrackCountEqual(0)
|
|
self._sleep()
|
|
self.assertTrackCountEqual(1)
|
|
|
|
# This test now fails and I don't understand why
|
|
#def test_add_nowait_stop(self):
|
|
# self._addfile()
|
|
# self._stop()
|
|
# self.assertTrackCountEqual(1)
|
|
|
|
def test_add_multiple(self):
|
|
self._addfile()
|
|
self._addfile()
|
|
self._addfile()
|
|
self.assertTrackCountEqual(0)
|
|
self._sleep()
|
|
with db_session:
|
|
self.assertEqual(Track.select().count(), 3)
|
|
self.assertEqual(Artist.select().count(), 1)
|
|
|
|
def test_change(self):
|
|
path = self._addfile()
|
|
self._sleep()
|
|
|
|
trackid = None
|
|
with db_session:
|
|
self.assertEqual(Track.select().count(), 1)
|
|
self.assertEqual(Artist.select(lambda a: a.name == 'Some artist').count(), 1)
|
|
trackid = Track.select().first().id
|
|
|
|
tags = mutagen.File(path, easy = True)
|
|
tags['artist'] = 'Renamed'
|
|
tags.save()
|
|
self._sleep()
|
|
|
|
with db_session:
|
|
self.assertEqual(Track.select().count(), 1)
|
|
self.assertEqual(Artist.select(lambda a: a.name == 'Some artist').count(), 0)
|
|
self.assertEqual(Artist.select(lambda a: a.name == 'Renamed').count(), 1)
|
|
self.assertEqual(Track.select().first().id, trackid)
|
|
|
|
def test_rename(self):
|
|
path = self._addfile()
|
|
self._sleep()
|
|
|
|
trackid = None
|
|
with db_session:
|
|
self.assertEqual(Track.select().count(), 1)
|
|
trackid = Track.select().first().id
|
|
|
|
newpath = self._temppath('.mp3')
|
|
shutil.move(path, newpath)
|
|
self._sleep()
|
|
|
|
with db_session:
|
|
track = Track.select().first()
|
|
self.assertIsNotNone(track)
|
|
self.assertNotEqual(track.path, path)
|
|
self.assertEqual(track.path, newpath)
|
|
self.assertEqual(track._path_hash, memoryview(sha1(newpath.encode('utf-8')).digest()))
|
|
self.assertEqual(track.id, trackid)
|
|
|
|
def test_move_in(self):
|
|
filename = self._tempname() + '.mp3'
|
|
initialpath = os.path.join(tempfile.gettempdir(), filename)
|
|
shutil.copyfile('tests/assets/folder/silence.mp3', initialpath)
|
|
shutil.move(initialpath, self._temppath('.mp3'))
|
|
self._sleep()
|
|
self.assertTrackCountEqual(1)
|
|
|
|
def test_move_out(self):
|
|
initialpath = self._addfile()
|
|
self._sleep()
|
|
self.assertTrackCountEqual(1)
|
|
|
|
newpath = os.path.join(tempfile.gettempdir(), os.path.basename(initialpath))
|
|
shutil.move(initialpath, newpath)
|
|
self._sleep()
|
|
self.assertTrackCountEqual(0)
|
|
|
|
os.unlink(newpath)
|
|
|
|
def test_delete(self):
|
|
path = self._addfile()
|
|
self._sleep()
|
|
self.assertTrackCountEqual(1)
|
|
|
|
os.unlink(path)
|
|
self._sleep()
|
|
self.assertTrackCountEqual(0)
|
|
|
|
def test_add_delete(self):
|
|
path = self._addfile()
|
|
os.unlink(path)
|
|
self._sleep()
|
|
self.assertTrackCountEqual(0)
|
|
|
|
def test_add_rename(self):
|
|
path = self._addfile()
|
|
shutil.move(path, self._temppath('.mp3'))
|
|
self._sleep()
|
|
self.assertTrackCountEqual(1)
|
|
|
|
def test_rename_delete(self):
|
|
path = self._addfile()
|
|
self._sleep()
|
|
self.assertTrackCountEqual(1)
|
|
|
|
newpath = self._temppath('.mp3')
|
|
shutil.move(path, newpath)
|
|
os.unlink(newpath)
|
|
self._sleep()
|
|
self.assertTrackCountEqual(0)
|
|
|
|
def test_add_rename_delete(self):
|
|
path = self._addfile()
|
|
newpath = self._temppath('.mp3')
|
|
shutil.move(path, newpath)
|
|
os.unlink(newpath)
|
|
self._sleep()
|
|
self.assertTrackCountEqual(0)
|
|
|
|
def test_rename_rename(self):
|
|
path = self._addfile()
|
|
self._sleep()
|
|
self.assertTrackCountEqual(1)
|
|
|
|
newpath = self._temppath('.mp3')
|
|
finalpath = self._temppath('.mp3')
|
|
shutil.move(path, newpath)
|
|
shutil.move(newpath, finalpath)
|
|
self._sleep()
|
|
self.assertTrackCountEqual(1)
|
|
|
|
class CoverWatcherTestCase(WatcherTestCase):
|
|
def test_add_file_then_cover(self):
|
|
self._addfile()
|
|
path = self._addcover()
|
|
self._sleep()
|
|
|
|
with db_session:
|
|
self.assertEqual(Folder.select().first().cover_art, os.path.basename(path))
|
|
|
|
def test_add_cover_then_file(self):
|
|
path = self._addcover()
|
|
self._addfile()
|
|
self._sleep()
|
|
|
|
with db_session:
|
|
self.assertEqual(Folder.select().first().cover_art, os.path.basename(path))
|
|
|
|
def test_remove_cover(self):
|
|
self._addfile()
|
|
path = self._addcover()
|
|
self._sleep()
|
|
|
|
os.unlink(path)
|
|
self._sleep()
|
|
|
|
with db_session:
|
|
self.assertIsNone(Folder.select().first().cover_art)
|
|
|
|
def test_naming_add_good(self):
|
|
bad = os.path.basename(self._addcover())
|
|
self._sleep()
|
|
good = os.path.basename(self._addcover('cover'))
|
|
self._sleep()
|
|
|
|
with db_session:
|
|
self.assertEqual(Folder.select().first().cover_art, good)
|
|
|
|
def test_naming_add_bad(self):
|
|
good = os.path.basename(self._addcover('cover'))
|
|
self._sleep()
|
|
bad = os.path.basename(self._addcover())
|
|
self._sleep()
|
|
|
|
with db_session:
|
|
self.assertEqual(Folder.select().first().cover_art, good)
|
|
|
|
def test_naming_remove_good(self):
|
|
bad = self._addcover()
|
|
good = self._addcover('cover')
|
|
self._sleep()
|
|
os.unlink(good)
|
|
self._sleep()
|
|
|
|
with db_session:
|
|
self.assertEqual(Folder.select().first().cover_art, os.path.basename(bad))
|
|
|
|
def test_naming_remove_bad(self):
|
|
bad = self._addcover()
|
|
good = self._addcover('cover')
|
|
self._sleep()
|
|
os.unlink(bad)
|
|
self._sleep()
|
|
|
|
with db_session:
|
|
self.assertEqual(Folder.select().first().cover_art, os.path.basename(good))
|
|
|
|
def test_rename(self):
|
|
path = self._addcover()
|
|
self._sleep()
|
|
newpath = self._temppath('.jpg')
|
|
shutil.move(path, newpath)
|
|
self._sleep()
|
|
|
|
with db_session:
|
|
self.assertEqual(Folder.select().first().cover_art, os.path.basename(newpath))
|
|
|
|
def test_add_to_folder_without_track(self):
|
|
path = self._addcover(depth = 1)
|
|
self._sleep()
|
|
|
|
with db_session:
|
|
self.assertFalse(Folder.exists(cover_art = os.path.basename(path)))
|
|
|
|
def test_remove_from_folder_without_track(self):
|
|
path = self._addcover(depth = 1)
|
|
self._sleep()
|
|
os.unlink(path)
|
|
self._sleep()
|
|
|
|
def test_add_track_to_empty_folder(self):
|
|
self._addfile(1)
|
|
self._sleep()
|
|
|
|
def suite():
|
|
suite = unittest.TestSuite()
|
|
|
|
suite.addTest(unittest.makeSuite(AudioWatcherTestCase))
|
|
suite.addTest(unittest.makeSuite(CoverWatcherTestCase))
|
|
|
|
return suite
|
|
|
|
if __name__ == '__main__':
|
|
unittest.main()
|
|
|