1
0
mirror of https://github.com/spl0k/supysonic.git synced 2025-01-22 06:53:59 +00:00

Porting supysonic.watcher

Which mostly means fixing the scanner
This commit is contained in:
Alban Féron 2022-12-11 15:40:23 +01:00
parent cd369f6c7f
commit c5246c74bb
No known key found for this signature in database
GPG Key ID: 8CE0313646D16165
4 changed files with 57 additions and 68 deletions

View File

@ -291,19 +291,20 @@ class Scanner(Thread):
except Track.DoesNotExist:
return
tr_dst = Track.get(path=dst_path)
if tr_dst is not None:
try:
tr_dst = Track.get(path=dst_path)
root = tr_dst.root_folder
folder = tr_dst.folder
self.remove_file(dst_path)
tr.root_folder = root
tr.folder = folder
else:
except Track.DoesNotExist:
root = self.__find_root_folder(dst_path)
folder = self.__find_folder(dst_path)
tr.root_folder = root
tr.folder = folder
tr.path = dst_path
tr.save()
def find_cover(self, dirpath):
if not isinstance(dirpath, str): # pragma: nocover
@ -312,8 +313,9 @@ class Scanner(Thread):
if not os.path.exists(dirpath):
return
folder = Folder.get(path=dirpath)
if folder is None:
try:
folder = Folder.get(path=dirpath)
except Folder.DoesNotExist:
return
album_name = None
@ -323,18 +325,21 @@ class Scanner(Thread):
cover = find_cover_in_folder(folder.path, album_name)
folder.cover_art = cover.name if cover is not None else None
folder.save()
def add_cover(self, path):
if not isinstance(path, str): # pragma: nocover
raise TypeError("Expecting string, got " + str(type(path)))
folder = Folder.get(path=os.path.dirname(path))
if folder is None:
try:
folder = Folder.get(path=os.path.dirname(path))
except Folder.DoesNotExist:
return
cover_name = os.path.basename(path)
if not folder.cover_art:
folder.cover_art = cover_name
folder.save()
elif folder.cover_art != cover_name:
album_name = None
track = folder.tracks.select().first()
@ -345,6 +350,7 @@ class Scanner(Thread):
new_cover = CoverFile(cover_name, album_name)
if new_cover.score > current_cover.score:
folder.cover_art = cover_name
folder.save()
def __find_album(self, artist, album):
ar = self.__find_artist(artist)
@ -379,9 +385,11 @@ class Scanner(Thread):
drive, _ = os.path.splitdrive(path)
path = os.path.dirname(path)
while path not in (drive, "/"):
folder = Folder.get(path=path)
if folder is not None:
try:
folder = Folder.get(path=path)
break
except Folder.DoesNotExist:
pass
created = datetime.fromtimestamp(os.path.getmtime(path))
children.append(
@ -396,7 +404,7 @@ class Scanner(Thread):
assert folder is not None
while children:
folder = Folder(parent=folder, **children.pop())
folder = Folder.create(parent=folder, **children.pop())
return folder

View File

@ -49,9 +49,9 @@ class SupysonicWatcherEventHandler(PatternMatchingEventHandler):
self.queue.put(event.src_path, op)
dirname = os.path.dirname(event.src_path)
with db_session:
folder = Folder.get(path=dirname)
if folder is None:
try:
Folder.get(path=dirname)
except Folder.DoesNotExist:
self.queue.put(dirname, op | FLAG_COVER)
else:
self.queue.put(event.src_path, op | FLAG_COVER)
@ -289,9 +289,8 @@ class SupysonicWatcher:
self.__observer = Observer()
self.__handler.queue = self.__queue
with db_session:
for folder in Folder.select(lambda f: f.root):
self.add_folder(folder)
for folder in Folder.select().where(Folder.root):
self.add_folder(folder)
logger.info("Starting watcher")
self.__queue.start()

View File

@ -109,6 +109,8 @@ class ScannerTestCase(unittest.TestCase):
track = db.Track.select().first()
new_path = track.path.replace("silence", "silence_moved")
self.scanner.move_file(track.path, new_path)
track = db.Track.select().first()
self.assertEqual(db.Track.select().count(), 1)
self.assertEqual(track.path, new_path)

View File

@ -13,7 +13,6 @@ import time
import unittest
from hashlib import sha1
from pony.orm import db_session
from supysonic.db import init_database, release_database, Track, Artist, Folder
from supysonic.managers.folder import FolderManager
@ -64,8 +63,7 @@ class WatcherTestCase(WatcherTestBase):
def setUp(self):
super().setUp()
self.__dir = tempfile.mkdtemp()
with db_session:
FolderManager.add("Folder", self.__dir)
FolderManager.add("Folder", self.__dir)
self._start()
def tearDown(self):
@ -101,7 +99,6 @@ class WatcherTestCase(WatcherTestBase):
class AudioWatcherTestCase(WatcherTestCase):
@db_session
def assertTrackCountEqual(self, expected):
self.assertEqual(Track.select().count(), expected)
@ -124,57 +121,49 @@ class AudioWatcherTestCase(WatcherTestCase):
self._addfile()
self.assertTrackCountEqual(0)
self._sleep()
with db_session:
self.assertEqual(Track.select().count(), 3)
self.assertEqual(Artist.select().count(), 1)
self.assertEqual(Track.select().count(), 3)
self.assertEqual(Artist.select().count(), 1)
def test_change(self):
path = self._addfile()
self._sleep()
trackid = None
with db_session:
self.assertEqual(Track.select().count(), 1)
self.assertEqual(
Artist.select(lambda a: a.name == "Some artist").count(), 1
)
trackid = Track.select().first().id
self.assertEqual(Track.select().count(), 1)
self.assertEqual(Artist.select().where(Artist.name == "Some artist").count(), 1)
trackid = Track.select().first().id
tags = mutagen.File(path, easy=True)
tags["artist"] = "Renamed"
tags.save()
self._sleep()
with db_session:
self.assertEqual(Track.select().count(), 1)
self.assertEqual(
Artist.select(lambda a: a.name == "Some artist").count(), 0
)
self.assertEqual(Artist.select(lambda a: a.name == "Renamed").count(), 1)
self.assertEqual(Track.select().first().id, trackid)
self.assertEqual(Track.select().count(), 1)
self.assertEqual(Artist.select().where(Artist.name == "Some artist").count(), 0)
self.assertEqual(Artist.select().where(Artist.name == "Renamed").count(), 1)
self.assertEqual(Track.select().first().id, trackid)
def test_rename(self):
path = self._addfile()
self._sleep()
trackid = None
with db_session:
self.assertEqual(Track.select().count(), 1)
trackid = Track.select().first().id
self.assertEqual(Track.select().count(), 1)
trackid = Track.select().first().id
newpath = self._temppath(".mp3")
shutil.move(path, newpath)
self._sleep()
with db_session:
track = Track.select().first()
self.assertIsNotNone(track)
self.assertNotEqual(track.path, path)
self.assertEqual(track.path, newpath)
self.assertEqual(
track._path_hash, memoryview(sha1(newpath.encode("utf-8")).digest())
)
self.assertEqual(track.id, trackid)
track = Track.select().first()
self.assertIsNotNone(track)
self.assertNotEqual(track.path, path)
self.assertEqual(track.path, newpath)
self.assertEqual(
track._path_hash, memoryview(sha1(newpath.encode("utf-8")).digest())
)
self.assertEqual(track.id, trackid)
def test_move_in(self):
filename = self._tempname() + ".mp3"
@ -255,16 +244,14 @@ class CoverWatcherTestCase(WatcherTestCase):
path = self._addcover()
self._sleep()
with db_session:
self.assertEqual(Folder.select().first().cover_art, os.path.basename(path))
self.assertEqual(Folder.select().first().cover_art, os.path.basename(path))
def test_add_cover_then_file(self):
path = self._addcover()
self._addfile()
self._sleep()
with db_session:
self.assertEqual(Folder.select().first().cover_art, os.path.basename(path))
self.assertEqual(Folder.select().first().cover_art, os.path.basename(path))
def test_remove_cover(self):
self._addfile()
@ -274,8 +261,7 @@ class CoverWatcherTestCase(WatcherTestCase):
os.unlink(path)
self._sleep()
with db_session:
self.assertIsNone(Folder.select().first().cover_art)
self.assertIsNone(Folder.select().first().cover_art)
def test_naming_add_good(self):
self._addcover()
@ -283,8 +269,7 @@ class CoverWatcherTestCase(WatcherTestCase):
good = os.path.basename(self._addcover("cover"))
self._sleep()
with db_session:
self.assertEqual(Folder.select().first().cover_art, good)
self.assertEqual(Folder.select().first().cover_art, good)
def test_naming_add_bad(self):
good = os.path.basename(self._addcover("cover"))
@ -292,8 +277,7 @@ class CoverWatcherTestCase(WatcherTestCase):
self._addcover()
self._sleep()
with db_session:
self.assertEqual(Folder.select().first().cover_art, good)
self.assertEqual(Folder.select().first().cover_art, good)
def test_naming_remove_good(self):
bad = self._addcover()
@ -302,8 +286,7 @@ class CoverWatcherTestCase(WatcherTestCase):
os.unlink(good)
self._sleep()
with db_session:
self.assertEqual(Folder.select().first().cover_art, os.path.basename(bad))
self.assertEqual(Folder.select().first().cover_art, os.path.basename(bad))
def test_naming_remove_bad(self):
bad = self._addcover()
@ -312,8 +295,7 @@ class CoverWatcherTestCase(WatcherTestCase):
os.unlink(bad)
self._sleep()
with db_session:
self.assertEqual(Folder.select().first().cover_art, os.path.basename(good))
self.assertEqual(Folder.select().first().cover_art, os.path.basename(good))
def test_rename(self):
path = self._addcover()
@ -322,17 +304,15 @@ class CoverWatcherTestCase(WatcherTestCase):
shutil.move(path, newpath)
self._sleep()
with db_session:
self.assertEqual(
Folder.select().first().cover_art, os.path.basename(newpath)
)
self.assertEqual(Folder.select().first().cover_art, os.path.basename(newpath))
def test_add_to_folder_without_track(self):
path = self._addcover(depth=1)
self._sleep()
with db_session:
self.assertFalse(Folder.exists(cover_art=os.path.basename(path)))
self.assertFalse(
Folder.select().where(Folder.cover_art == os.path.basename(path)).exists()
)
def test_remove_from_folder_without_track(self):
path = self._addcover(depth=1)