1
0
mirror of https://github.com/spl0k/supysonic.git synced 2024-11-09 11:42:16 +00:00

Improved folder deletion

This commit is contained in:
spl0k 2018-03-15 20:50:01 +01:00
parent 0fe96f05c3
commit b72daea109
4 changed files with 62 additions and 75 deletions

View File

@ -14,7 +14,7 @@ import os.path
from datetime import datetime
from pony.orm import Database, Required, Optional, Set, PrimaryKey, LongStr
from pony.orm import ObjectNotFound
from pony.orm import min, max, avg, sum
from pony.orm import min, max, avg, sum, exists
from uuid import UUID, uuid4
from .py23 import dict, strtype
@ -79,6 +79,17 @@ class Folder(db.Entity):
return info
@classmethod
def prune(cls):
query = cls.select(lambda self: not exists(t for t in Track if t.folder == self) and \
not exists(f for f in Folder if f.parent == self) and not self.root)
total = 0
while True:
count = query.delete(bulk = True)
total += count
if not count:
return total
class Artist(db.Entity):
_table_ = 'artist'
@ -104,6 +115,11 @@ class Artist(db.Entity):
return info
@classmethod
def prune(cls):
return cls.select(lambda self: not exists(a for a in Album if a.artist == self) and \
not exists(t for t in Track if t.artist == self)).delete(bulk = True)
class Album(db.Entity):
_table_ = 'album'
@ -140,6 +156,10 @@ class Album(db.Entity):
year = min(map(lambda t: t.year if t.year else 9999, self.tracks))
return '%i%s' % (year, self.name.lower())
@classmethod
def prune(cls):
return cls.select(lambda self: not exists(t for t in Track if t.album == self)).delete(bulk = True)
class Track(db.Entity):
_table_ = 'track'

View File

@ -13,9 +13,8 @@ import uuid
from pony.orm import select
from pony.orm import ObjectNotFound
from ..db import Folder, Track
from ..db import Folder, Track, Artist, Album
from ..py23 import strtype
from ..scanner import Scanner
class FolderManager:
@staticmethod
@ -52,10 +51,10 @@ class FolderManager:
if not folder.root:
raise ObjectNotFound(Folder)
scanner = Scanner()
for track in Track.select(lambda t: t.root_folder == folder):
scanner.remove_file(track.path)
scanner.finish()
Track.select(lambda t: t.root_folder == folder).delete(bulk = True)
Album.prune()
Artist.prune()
Folder.prune()
folder.delete()

View File

@ -41,14 +41,6 @@ class Scanner:
self.__stats = Stats()
self.__extensions = extensions
self.__folders_to_check = set()
self.__artists_to_check = set()
self.__albums_to_check = set()
def __del__(self):
if self.__folders_to_check or self.__artists_to_check or self.__albums_to_check:
raise Exception("There's still something to check. Did you run Scanner.finish()?")
def scan(self, folder, progress_callback = None):
if not isinstance(folder, Folder):
raise TypeError('Expecting Folder instance, got ' + str(type(folder)))
@ -99,31 +91,9 @@ class Scanner:
@db_session
def finish(self):
for album in Album.select(lambda a: a.id in self.__albums_to_check):
if not album.tracks.is_empty():
continue
self.__artists_to_check.add(album.artist.id)
self.__stats.deleted.albums += 1
album.delete()
self.__albums_to_check.clear()
for artist in Artist.select(lambda a: a.id in self.__artists_to_check):
if not artist.albums.is_empty() or not artist.tracks.is_empty():
continue
self.__stats.deleted.artists += 1
artist.delete()
self.__artists_to_check.clear()
while self.__folders_to_check:
folder = Folder[self.__folders_to_check.pop()]
if folder.root:
continue
if folder.tracks.is_empty() and folder.children.is_empty():
self.__folders_to_check.add(folder.parent.id)
folder.delete()
self.__stats.deleted.albums = Album.prune()
self.__stats.deleted.artists = Artist.prune()
Folder.prune()
def __is_valid_path(self, path):
if not os.path.exists(path):
@ -185,11 +155,9 @@ class Scanner:
self.__stats.added.tracks += 1
else:
if tr.album.id != tralbum.id:
self.__albums_to_check.add(tr.album.id)
trdict['album'] = tralbum
if tr.artist.id != trartist.id:
self.__artists_to_check.add(tr.artist.id)
trdict['artist'] = trartist
tr.set(**trdict)
@ -203,9 +171,6 @@ class Scanner:
if not tr:
return
self.__folders_to_check.add(tr.folder.id)
self.__albums_to_check.add(tr.album.id)
self.__artists_to_check.add(tr.artist.id)
self.__stats.deleted.tracks += 1
tr.delete()
@ -223,7 +188,6 @@ class Scanner:
if tr is None:
return
self.__folders_to_check.add(tr.folder.id)
tr_dst = Track.get(path = dst_path)
if tr_dst is not None:
root = tr_dst.root_folder

View File

@ -11,10 +11,8 @@
from supysonic import db
from supysonic.managers.folder import FolderManager
from supysonic.py23 import strtype
import os
import io
import shutil
import tempfile
import unittest
@ -36,7 +34,6 @@ class FolderManagerTestCase(unittest.TestCase):
shutil.rmtree(self.media_dir)
shutil.rmtree(self.music_dir)
@db_session
def create_folders(self):
# Add test folders
self.assertIsNotNone(FolderManager.add('media', self.media_dir))
@ -112,41 +109,48 @@ class FolderManagerTestCase(unittest.TestCase):
self.assertRaises(ValueError, FolderManager.add, 'parent', path)
self.assertEqual(db.Folder.select().count(), 3)
@db_session
def test_delete_folder(self):
self.create_folders()
with db_session:
self.create_folders()
# Delete existing folders
for name in ['media', 'music']:
folder = db.Folder.get(name = name, root = True)
FolderManager.delete(folder.id)
self.assertRaises(ObjectNotFound, db.Folder.__getitem__, folder.id)
with db_session:
# Delete invalid UUID
self.assertRaises(ValueError, FolderManager.delete, 'invalid-uuid')
self.assertEqual(db.Folder.select().count(), 3)
# Delete invalid UUID
self.assertRaises(ValueError, FolderManager.delete, 'invalid-uuid')
self.assertEqual(db.Folder.select().count(), 1) # 'non-root' remaining
# Delete non-existent folder
self.assertRaises(ObjectNotFound, FolderManager.delete, uuid.uuid4())
self.assertEqual(db.Folder.select().count(), 3)
# Delete non-existent folder
self.assertRaises(ObjectNotFound, FolderManager.delete, uuid.uuid4())
self.assertEqual(db.Folder.select().count(), 1) # 'non-root' remaining
# Delete non-root folder
folder = db.Folder.get(name = 'non-root')
self.assertRaises(ObjectNotFound, FolderManager.delete, folder.id)
self.assertEqual(db.Folder.select().count(), 3)
# Delete non-root folder
folder = db.Folder.get(name = 'non-root')
self.assertRaises(ObjectNotFound, FolderManager.delete, folder.id)
self.assertEqual(db.Folder.select().count(), 1) # 'non-root' remaining
with db_session:
# Delete existing folders
for name in ['media', 'music']:
folder = db.Folder.get(name = name, root = True)
FolderManager.delete(folder.id)
self.assertRaises(ObjectNotFound, db.Folder.__getitem__, folder.id)
# Even if we have only 2 root folders, non-root should never exist and be cleaned anyway
self.assertEqual(db.Folder.select().count(), 0)
@db_session
def test_delete_by_name(self):
self.create_folders()
with db_session:
self.create_folders()
# Delete existing folders
for name in ['media', 'music']:
FolderManager.delete_by_name(name)
self.assertFalse(db.Folder.exists(name = name))
with db_session:
# Delete non-existent folder
self.assertRaises(ObjectNotFound, FolderManager.delete_by_name, 'null')
self.assertEqual(db.Folder.select().count(), 3)
# Delete non-existent folder
self.assertRaises(ObjectNotFound, FolderManager.delete_by_name, 'null')
self.assertEqual(db.Folder.select().count(), 1) # 'non-root' remaining
with db_session:
# Delete existing folders
for name in ['media', 'music']:
FolderManager.delete_by_name(name)
self.assertFalse(db.Folder.exists(name = name))
if __name__ == '__main__':
unittest.main()