mirror of
https://github.com/spl0k/supysonic.git
synced 2024-12-22 17:06:17 +00:00
Refactored FolderManager to raise exceptions
rather than returning status codes
This commit is contained in:
parent
ef9e7af026
commit
4cbc8765e0
@ -156,18 +156,18 @@ class SupysonicCLI(cmd.Cmd):
|
||||
self.write_line('\n'.join('{0: <16}{1}'.format(f.name, f.path) for f in Folder.select(lambda f: f.root)))
|
||||
|
||||
def folder_add(self, name, path):
|
||||
ret = FolderManager.add(name, path)
|
||||
if ret != FolderManager.SUCCESS:
|
||||
self.write_error_line(FolderManager.error_str(ret))
|
||||
else:
|
||||
try:
|
||||
FolderManager.add(name, path)
|
||||
self.write_line("Folder '{}' added".format(name))
|
||||
except ValueError as e:
|
||||
self.write_error_line(str(e))
|
||||
|
||||
def folder_delete(self, name):
|
||||
ret = FolderManager.delete_by_name(name)
|
||||
if ret != FolderManager.SUCCESS:
|
||||
self.write_error_line(FolderManager.error_str(ret))
|
||||
else:
|
||||
try:
|
||||
FolderManager.delete_by_name(name)
|
||||
self.write_line("Deleted folder '{}'".format(name))
|
||||
except ObjectNotFound as e:
|
||||
self.write_error_line(str(e))
|
||||
|
||||
def folder_scan(self, folders, force):
|
||||
extensions = self.__config.BASE['scanner_extensions']
|
||||
|
@ -22,6 +22,7 @@ import os.path
|
||||
import uuid
|
||||
|
||||
from flask import current_app, flash, redirect, render_template, request, url_for
|
||||
from pony.orm import ObjectNotFound
|
||||
|
||||
from ..db import Folder
|
||||
from ..managers.folder import FolderManager
|
||||
@ -53,29 +54,25 @@ def add_folder_post():
|
||||
if error:
|
||||
return render_template('addfolder.html')
|
||||
|
||||
ret = FolderManager.add(name, path)
|
||||
if ret != FolderManager.SUCCESS:
|
||||
flash(FolderManager.error_str(ret))
|
||||
try:
|
||||
FolderManager.add(name, path)
|
||||
except ValueError as e:
|
||||
flash(str(e), 'error')
|
||||
return render_template('addfolder.html')
|
||||
|
||||
flash("Folder '%s' created. You should now run a scan" % name)
|
||||
|
||||
return redirect(url_for('frontend.folder_index'))
|
||||
|
||||
@frontend.route('/folder/del/<id>')
|
||||
@admin_only
|
||||
def del_folder(id):
|
||||
try:
|
||||
idid = uuid.UUID(id)
|
||||
except ValueError:
|
||||
flash('Invalid folder id')
|
||||
return redirect(url_for('frontend.folder_index'))
|
||||
|
||||
ret = FolderManager.delete(idid)
|
||||
if ret != FolderManager.SUCCESS:
|
||||
flash(FolderManager.error_str(ret))
|
||||
else:
|
||||
FolderManager.delete(id)
|
||||
flash('Deleted folder')
|
||||
except ValueError as e:
|
||||
flash(str(e), 'error')
|
||||
except ObjectNotFound:
|
||||
flash('No such folder', 'error')
|
||||
|
||||
return redirect(url_for('frontend.folder_index'))
|
||||
|
||||
@ -93,10 +90,15 @@ def scan_folder(id = None):
|
||||
for folder in Folder.select(lambda f: f.root):
|
||||
scanner.scan(folder)
|
||||
else:
|
||||
status, folder = FolderManager.get(id)
|
||||
if status != FolderManager.SUCCESS:
|
||||
flash(FolderManager.error_str(status))
|
||||
try:
|
||||
folder = FolderManager.get(id)
|
||||
except ValueError as e:
|
||||
flash(str(e), 'error')
|
||||
return redirect(url_for('frontend.folder_index'))
|
||||
except ObjectNotFound:
|
||||
flash('No such folder', 'error')
|
||||
return redirect(url_for('frontend.folder_index'))
|
||||
|
||||
scanner.scan(folder)
|
||||
|
||||
scanner.finish()
|
||||
|
@ -21,69 +21,47 @@
|
||||
import os.path
|
||||
import uuid
|
||||
|
||||
from pony.orm import db_session, select
|
||||
from pony.orm import select
|
||||
from pony.orm import ObjectNotFound
|
||||
|
||||
from ..db import Folder, Artist, Album, Track, StarredFolder, RatingFolder
|
||||
from ..db import Folder, Track
|
||||
from ..py23 import strtype
|
||||
from ..scanner import Scanner
|
||||
|
||||
class FolderManager:
|
||||
SUCCESS = 0
|
||||
INVALID_ID = 1
|
||||
NAME_EXISTS = 2
|
||||
INVALID_PATH = 3
|
||||
PATH_EXISTS = 4
|
||||
NO_SUCH_FOLDER = 5
|
||||
SUBPATH_EXISTS = 6
|
||||
|
||||
@staticmethod
|
||||
@db_session
|
||||
def get(uid):
|
||||
if isinstance(uid, strtype):
|
||||
try:
|
||||
uid = uuid.UUID(uid)
|
||||
except ValueError:
|
||||
return FolderManager.INVALID_ID, None
|
||||
elif isinstance(uid, uuid.UUID):
|
||||
pass
|
||||
else:
|
||||
return FolderManager.INVALID_ID, None
|
||||
raise ValueError('Invalid folder id')
|
||||
|
||||
try:
|
||||
folder = Folder[uid]
|
||||
return FolderManager.SUCCESS, folder
|
||||
except ObjectNotFound:
|
||||
return FolderManager.NO_SUCH_FOLDER, None
|
||||
return Folder[uid]
|
||||
|
||||
@staticmethod
|
||||
@db_session
|
||||
def add(name, path):
|
||||
if Folder.get(name = name, root = True) is not None:
|
||||
return FolderManager.NAME_EXISTS
|
||||
raise ValueError("Folder '{}' exists".format(name))
|
||||
|
||||
path = os.path.abspath(path)
|
||||
if not os.path.isdir(path):
|
||||
return FolderManager.INVALID_PATH
|
||||
raise ValueError("The path doesn't exits or is'nt a directory")
|
||||
if Folder.get(path = path) is not None:
|
||||
return FolderManager.PATH_EXISTS
|
||||
if any(path.startswith(p) for p in select(f.path for f in Folder)):
|
||||
return FolderManager.PATH_EXISTS
|
||||
raise ValueError('This path is already registered')
|
||||
if any(path.startswith(p) for p in select(f.path for f in Folder if f.root)):
|
||||
raise ValueError('This path is already registered')
|
||||
if Folder.exists(lambda f: f.path.startswith(path)):
|
||||
return FolderManager.SUBPATH_EXISTS
|
||||
raise ValueError('This path contains a folder that is already registered')
|
||||
|
||||
folder = Folder(root = True, name = name, path = path)
|
||||
return FolderManager.SUCCESS
|
||||
return Folder(root = True, name = name, path = path)
|
||||
|
||||
@staticmethod
|
||||
@db_session
|
||||
def delete(uid):
|
||||
status, folder = FolderManager.get(uid)
|
||||
if status != FolderManager.SUCCESS:
|
||||
return status
|
||||
|
||||
folder = FolderManager.get(uid)
|
||||
if not folder.root:
|
||||
return FolderManager.NO_SUCH_FOLDER
|
||||
raise ObjectNotFound(Folder)
|
||||
|
||||
scanner = Scanner()
|
||||
for track in Track.select(lambda t: t.root_folder == folder):
|
||||
@ -91,31 +69,11 @@ class FolderManager:
|
||||
scanner.finish()
|
||||
|
||||
folder.delete()
|
||||
return FolderManager.SUCCESS
|
||||
|
||||
@staticmethod
|
||||
@db_session
|
||||
def delete_by_name(name):
|
||||
folder = Folder.get(name = name, root = True)
|
||||
if not folder:
|
||||
return FolderManager.NO_SUCH_FOLDER
|
||||
return FolderManager.delete(folder.id)
|
||||
|
||||
@staticmethod
|
||||
def error_str(err):
|
||||
if err == FolderManager.SUCCESS:
|
||||
return 'No error'
|
||||
elif err == FolderManager.INVALID_ID:
|
||||
return 'Invalid folder id'
|
||||
elif err == FolderManager.NAME_EXISTS:
|
||||
return 'There is already a folder with that name. Please pick another one.'
|
||||
elif err == FolderManager.INVALID_PATH:
|
||||
return "The path doesn't exists or isn't a directory"
|
||||
elif err == FolderManager.PATH_EXISTS:
|
||||
return 'This path is already registered'
|
||||
elif err == FolderManager.NO_SUCH_FOLDER:
|
||||
return 'No such folder'
|
||||
elif err == FolderManager.SUBPATH_EXISTS:
|
||||
return 'This path contains a folder that is already registered'
|
||||
return 'Unknown error'
|
||||
raise ObjectNotFound(Folder)
|
||||
FolderManager.delete(folder.id)
|
||||
|
||||
|
@ -24,10 +24,10 @@ class TranscodingTestCase(ApiTestBase):
|
||||
super(TranscodingTestCase, self).setUp()
|
||||
self._patch_client()
|
||||
|
||||
FolderManager.add('Folder', 'tests/assets/folder')
|
||||
scanner = Scanner()
|
||||
with db_session:
|
||||
scanner.scan(Folder.get())
|
||||
folder = FolderManager.add('Folder', 'tests/assets/folder')
|
||||
scanner = Scanner()
|
||||
scanner.scan(folder)
|
||||
scanner.finish()
|
||||
|
||||
self.trackid = Track.get().id
|
||||
|
@ -26,9 +26,8 @@ class ScannerTestCase(unittest.TestCase):
|
||||
def setUp(self):
|
||||
db.init_database('sqlite:', True)
|
||||
|
||||
FolderManager.add('folder', os.path.abspath('tests/assets'))
|
||||
with db_session:
|
||||
folder = db.Folder.select().first()
|
||||
folder = FolderManager.add('folder', os.path.abspath('tests/assets'))
|
||||
self.assertIsNotNone(folder)
|
||||
self.folderid = folder.id
|
||||
|
||||
|
@ -85,6 +85,7 @@ class WatcherTestCase(WatcherTestBase):
|
||||
def setUp(self):
|
||||
super(WatcherTestCase, self).setUp()
|
||||
self.__dir = tempfile.mkdtemp()
|
||||
with db_session:
|
||||
FolderManager.add('Folder', self.__dir)
|
||||
self._start()
|
||||
|
||||
|
@ -72,7 +72,7 @@ class FolderTestCase(FrontendTestBase):
|
||||
|
||||
self._login('alice', 'Alic3')
|
||||
rv = self.client.get('/folder/del/string', follow_redirects = True)
|
||||
self.assertIn('Invalid', rv.data)
|
||||
self.assertIn('badly formed', rv.data)
|
||||
rv = self.client.get('/folder/del/' + str(uuid.uuid4()), follow_redirects = True)
|
||||
self.assertIn('No such folder', rv.data)
|
||||
rv = self.client.get('/folder/del/' + str(folder.id), follow_redirects = True)
|
||||
@ -91,7 +91,7 @@ class FolderTestCase(FrontendTestBase):
|
||||
self._login('alice', 'Alic3')
|
||||
|
||||
rv = self.client.get('/folder/scan/string', follow_redirects = True)
|
||||
self.assertIn('Invalid', rv.data)
|
||||
self.assertIn('badly formed', rv.data)
|
||||
rv = self.client.get('/folder/scan/' + str(uuid.uuid4()), follow_redirects = True)
|
||||
self.assertIn('No such folder', rv.data)
|
||||
rv = self.client.get('/folder/scan/' + str(folder.id), follow_redirects = True)
|
||||
|
@ -40,8 +40,8 @@ class FolderManagerTestCase(unittest.TestCase):
|
||||
@db_session
|
||||
def create_folders(self):
|
||||
# Add test folders
|
||||
self.assertEqual(FolderManager.add('media', self.media_dir), FolderManager.SUCCESS)
|
||||
self.assertEqual(FolderManager.add('music', self.music_dir), FolderManager.SUCCESS)
|
||||
self.assertIsNotNone(FolderManager.add('media', self.media_dir))
|
||||
self.assertIsNotNone(FolderManager.add('music', self.music_dir))
|
||||
|
||||
folder = db.Folder(
|
||||
root = False,
|
||||
@ -75,14 +75,14 @@ class FolderManagerTestCase(unittest.TestCase):
|
||||
# Get existing folders
|
||||
for name in ['media', 'music']:
|
||||
folder = db.Folder.get(name = name, root = True)
|
||||
self.assertEqual(FolderManager.get(folder.id), (FolderManager.SUCCESS, folder))
|
||||
self.assertEqual(FolderManager.get(folder.id), folder)
|
||||
|
||||
# Get with invalid UUID
|
||||
self.assertEqual(FolderManager.get('invalid-uuid'), (FolderManager.INVALID_ID, None))
|
||||
self.assertEqual(FolderManager.get(0xdeadbeef), (FolderManager.INVALID_ID, None))
|
||||
self.assertRaises(ValueError, FolderManager.get, 'invalid-uuid')
|
||||
self.assertRaises(ValueError, FolderManager.get, 0xdeadbeef)
|
||||
|
||||
# Non-existent folder
|
||||
self.assertEqual(FolderManager.get(uuid.uuid4()), (FolderManager.NO_SUCH_FOLDER, None))
|
||||
self.assertRaises(ObjectNotFound, FolderManager.get, uuid.uuid4())
|
||||
|
||||
@db_session
|
||||
def test_add_folder(self):
|
||||
@ -90,27 +90,27 @@ class FolderManagerTestCase(unittest.TestCase):
|
||||
self.assertEqual(db.Folder.select().count(), 3)
|
||||
|
||||
# Create duplicate
|
||||
self.assertEqual(FolderManager.add('media', self.media_dir), FolderManager.NAME_EXISTS)
|
||||
self.assertRaises(ValueError, FolderManager.add, 'media', self.media_dir)
|
||||
self.assertEqual(db.Folder.select(lambda f: f.name == 'media').count(), 1)
|
||||
|
||||
# Duplicate path
|
||||
self.assertEqual(FolderManager.add('new-folder', self.media_dir), FolderManager.PATH_EXISTS)
|
||||
self.assertRaises(ValueError, FolderManager.add, 'new-folder', self.media_dir)
|
||||
self.assertEqual(db.Folder.select(lambda f: f.path == self.media_dir).count(), 1)
|
||||
|
||||
# Invalid path
|
||||
path = os.path.abspath('/this/not/is/valid')
|
||||
self.assertEqual(FolderManager.add('invalid-path', path), FolderManager.INVALID_PATH)
|
||||
self.assertRaises(ValueError, FolderManager.add, 'invalid-path', path)
|
||||
self.assertFalse(db.Folder.exists(path = path))
|
||||
|
||||
# Subfolder of already added path
|
||||
path = os.path.join(self.media_dir, 'subfolder')
|
||||
os.mkdir(path)
|
||||
self.assertEqual(FolderManager.add('subfolder', path), FolderManager.PATH_EXISTS)
|
||||
self.assertRaises(ValueError, FolderManager.add, 'subfolder', path)
|
||||
self.assertEqual(db.Folder.select().count(), 3)
|
||||
|
||||
# Parent folder of an already added path
|
||||
path = os.path.join(self.media_dir, '..')
|
||||
self.assertEqual(FolderManager.add('parent', path), FolderManager.SUBPATH_EXISTS)
|
||||
self.assertRaises(ValueError, FolderManager.add, 'parent', path)
|
||||
self.assertEqual(db.Folder.select().count(), 3)
|
||||
|
||||
@db_session
|
||||
@ -120,20 +120,20 @@ class FolderManagerTestCase(unittest.TestCase):
|
||||
# Delete existing folders
|
||||
for name in ['media', 'music']:
|
||||
folder = db.Folder.get(name = name, root = True)
|
||||
self.assertEqual(FolderManager.delete(folder.id), FolderManager.SUCCESS)
|
||||
FolderManager.delete(folder.id)
|
||||
self.assertRaises(ObjectNotFound, db.Folder.__getitem__, folder.id)
|
||||
|
||||
# Delete invalid UUID
|
||||
self.assertEqual(FolderManager.delete('invalid-uuid'), FolderManager.INVALID_ID)
|
||||
self.assertRaises(ValueError, FolderManager.delete, 'invalid-uuid')
|
||||
self.assertEqual(db.Folder.select().count(), 1) # 'non-root' remaining
|
||||
|
||||
# Delete non-existent folder
|
||||
self.assertEqual(FolderManager.delete(uuid.uuid4()), FolderManager.NO_SUCH_FOLDER)
|
||||
self.assertRaises(ObjectNotFound, FolderManager.delete, uuid.uuid4())
|
||||
self.assertEqual(db.Folder.select().count(), 1) # 'non-root' remaining
|
||||
|
||||
# Delete non-root folder
|
||||
folder = db.Folder.get(name = 'non-root')
|
||||
self.assertEqual(FolderManager.delete(folder.id), FolderManager.NO_SUCH_FOLDER)
|
||||
self.assertRaises(ObjectNotFound, FolderManager.delete, folder.id)
|
||||
self.assertEqual(db.Folder.select().count(), 1) # 'non-root' remaining
|
||||
|
||||
@db_session
|
||||
@ -142,20 +142,13 @@ class FolderManagerTestCase(unittest.TestCase):
|
||||
|
||||
# Delete existing folders
|
||||
for name in ['media', 'music']:
|
||||
self.assertEqual(FolderManager.delete_by_name(name), FolderManager.SUCCESS)
|
||||
FolderManager.delete_by_name(name)
|
||||
self.assertFalse(db.Folder.exists(name = name))
|
||||
|
||||
# Delete non-existent folder
|
||||
self.assertEqual(FolderManager.delete_by_name('null'), FolderManager.NO_SUCH_FOLDER)
|
||||
self.assertRaises(ObjectNotFound, FolderManager.delete_by_name, 'null')
|
||||
self.assertEqual(db.Folder.select().count(), 1) # 'non-root' remaining
|
||||
|
||||
def test_human_readable_error(self):
|
||||
values = [ FolderManager.SUCCESS, FolderManager.INVALID_ID, FolderManager.NAME_EXISTS,
|
||||
FolderManager.INVALID_PATH, FolderManager.PATH_EXISTS, FolderManager.NO_SUCH_FOLDER,
|
||||
FolderManager.SUBPATH_EXISTS, 1594826, 'string', uuid.uuid4() ]
|
||||
for value in values:
|
||||
self.assertIsInstance(FolderManager.error_str(value), strtype)
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user