mirror of
https://github.com/spl0k/supysonic.git
synced 2024-12-22 08:56:17 +00:00
Port supysonic.managers.folder.FolderManager
This commit is contained in:
parent
64cf272887
commit
ccdd73f8a0
@ -1,7 +1,7 @@
|
|||||||
# This file is part of Supysonic.
|
# This file is part of Supysonic.
|
||||||
# Supysonic is a Python implementation of the Subsonic server API.
|
# Supysonic is a Python implementation of the Subsonic server API.
|
||||||
#
|
#
|
||||||
# Copyright (C) 2019 Alban 'spl0k' Féron
|
# Copyright (C) 2019-2022 Alban 'spl0k' Féron
|
||||||
#
|
#
|
||||||
# Distributed under terms of the GNU AGPLv3 license.
|
# Distributed under terms of the GNU AGPLv3 license.
|
||||||
|
|
||||||
@ -9,7 +9,6 @@ import logging
|
|||||||
import time
|
import time
|
||||||
|
|
||||||
from multiprocessing.connection import Listener, Client
|
from multiprocessing.connection import Listener, Client
|
||||||
from pony.orm import db_session, select
|
|
||||||
from threading import Thread, Event
|
from threading import Thread, Event
|
||||||
|
|
||||||
from .client import DaemonCommand
|
from .client import DaemonCommand
|
||||||
@ -73,8 +72,7 @@ class Daemon:
|
|||||||
|
|
||||||
def start_scan(self, folders=[], force=False):
|
def start_scan(self, folders=[], force=False):
|
||||||
if not folders:
|
if not folders:
|
||||||
with db_session:
|
folders = Folder.select().where(Folder.root)[:]
|
||||||
folders = select(f.name for f in Folder if f.root)[:]
|
|
||||||
|
|
||||||
if self.__scanner is not None and self.__scanner.is_alive():
|
if self.__scanner is not None and self.__scanner.is_alive():
|
||||||
for f in folders:
|
for f in folders:
|
||||||
|
@ -191,10 +191,10 @@ class Artist(db.Model):
|
|||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def prune(cls):
|
def prune(cls):
|
||||||
return cls.select(
|
cls.delete().where(
|
||||||
lambda self: not exists(a for a in Album if a.artist == self)
|
cls.id.not_in(Album.select(Album.artist)),
|
||||||
and not exists(t for t in Track if t.artist == self)
|
cls.id.not_in(Track.select(Track.artist)),
|
||||||
).delete()
|
).execute()
|
||||||
|
|
||||||
|
|
||||||
class Album(db.Model):
|
class Album(db.Model):
|
||||||
@ -254,9 +254,7 @@ class Album(db.Model):
|
|||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def prune(cls):
|
def prune(cls):
|
||||||
return cls.select(
|
cls.delete().where(cls.id.not_in(Track.select(Track.album))).execute()
|
||||||
lambda self: not exists(t for t in Track if t.album == self)
|
|
||||||
).delete()
|
|
||||||
|
|
||||||
|
|
||||||
class Track(PathMixin, db.Model):
|
class Track(PathMixin, db.Model):
|
||||||
|
@ -1,7 +1,7 @@
|
|||||||
# This file is part of Supysonic.
|
# This file is part of Supysonic.
|
||||||
# Supysonic is a Python implementation of the Subsonic server API.
|
# Supysonic is a Python implementation of the Subsonic server API.
|
||||||
#
|
#
|
||||||
# Copyright (C) 2019 Alban 'spl0k' Féron
|
# Copyright (C) 2019-2022 Alban 'spl0k' Féron
|
||||||
#
|
#
|
||||||
# Distributed under terms of the GNU AGPLv3 license.
|
# Distributed under terms of the GNU AGPLv3 license.
|
||||||
|
|
||||||
@ -10,7 +10,6 @@ import shlex
|
|||||||
import time
|
import time
|
||||||
|
|
||||||
from datetime import datetime, timedelta
|
from datetime import datetime, timedelta
|
||||||
from pony.orm import db_session, ObjectNotFound
|
|
||||||
from random import shuffle
|
from random import shuffle
|
||||||
from subprocess import Popen, DEVNULL
|
from subprocess import Popen, DEVNULL
|
||||||
from threading import Thread, Event, RLock
|
from threading import Thread, Event, RLock
|
||||||
@ -81,12 +80,11 @@ class Jukebox:
|
|||||||
|
|
||||||
def add(self, *tracks):
|
def add(self, *tracks):
|
||||||
with self.__lock:
|
with self.__lock:
|
||||||
with db_session:
|
for t in tracks:
|
||||||
for t in tracks:
|
try:
|
||||||
try:
|
self.__playlist.append(Track[t].path)
|
||||||
self.__playlist.append(Track[t].path)
|
except Track.DoesNotExist:
|
||||||
except ObjectNotFound:
|
pass
|
||||||
pass
|
|
||||||
|
|
||||||
def clear(self):
|
def clear(self):
|
||||||
with self.__lock:
|
with self.__lock:
|
||||||
|
@ -1,15 +1,12 @@
|
|||||||
# This file is part of Supysonic.
|
# This file is part of Supysonic.
|
||||||
# Supysonic is a Python implementation of the Subsonic server API.
|
# Supysonic is a Python implementation of the Subsonic server API.
|
||||||
#
|
#
|
||||||
# Copyright (C) 2013-2019 Alban 'spl0k' Féron
|
# Copyright (C) 2013-2022 Alban 'spl0k' Féron
|
||||||
#
|
#
|
||||||
# Distributed under terms of the GNU AGPLv3 license.
|
# Distributed under terms of the GNU AGPLv3 license.
|
||||||
|
|
||||||
import os.path
|
import os.path
|
||||||
|
|
||||||
from pony.orm import select
|
|
||||||
from pony.orm import ObjectNotFound
|
|
||||||
|
|
||||||
from ..daemon.client import DaemonClient
|
from ..daemon.client import DaemonClient
|
||||||
from ..daemon.exceptions import DaemonUnavailableError
|
from ..daemon.exceptions import DaemonUnavailableError
|
||||||
from ..db import Folder, Track, Artist, Album, User, RatingTrack, StarredTrack
|
from ..db import Folder, Track, Artist, Album, User, RatingTrack, StarredTrack
|
||||||
@ -27,20 +24,31 @@ class FolderManager:
|
|||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def add(name, path):
|
def add(name, path):
|
||||||
if Folder.get(name=name, root=True) is not None:
|
try:
|
||||||
|
Folder.get(name=name, root=True)
|
||||||
raise ValueError("Folder '{}' exists".format(name))
|
raise ValueError("Folder '{}' exists".format(name))
|
||||||
|
except Folder.DoesNotExist:
|
||||||
|
pass
|
||||||
|
|
||||||
path = os.path.abspath(os.path.expanduser(path))
|
path = os.path.abspath(os.path.expanduser(path))
|
||||||
if not os.path.isdir(path):
|
if not os.path.isdir(path):
|
||||||
raise ValueError("The path doesn't exits or isn't a directory")
|
raise ValueError("The path doesn't exits or isn't a directory")
|
||||||
if Folder.get(path=path) is not None:
|
|
||||||
|
try:
|
||||||
|
Folder.get(path=path)
|
||||||
raise ValueError("This path is already registered")
|
raise ValueError("This path is already registered")
|
||||||
if any(path.startswith(p) for p in select(f.path for f in Folder if f.root)):
|
except Folder.DoesNotExist:
|
||||||
|
pass
|
||||||
|
|
||||||
|
if any(
|
||||||
|
path.startswith(p)
|
||||||
|
for (p,) in Folder.select(Folder.path).where(Folder.root).tuples()
|
||||||
|
):
|
||||||
raise ValueError("This path is already registered")
|
raise ValueError("This path is already registered")
|
||||||
if Folder.exists(lambda f: f.path.startswith(path)):
|
if Folder.select().where(Folder.path.startswith(path)).exists():
|
||||||
raise ValueError("This path contains a folder that is already registered")
|
raise ValueError("This path contains a folder that is already registered")
|
||||||
|
|
||||||
folder = Folder(root=True, name=name, path=path)
|
folder = Folder.create(root=True, name=name, path=path)
|
||||||
try:
|
try:
|
||||||
DaemonClient().add_watched_folder(path)
|
DaemonClient().add_watched_folder(path)
|
||||||
except DaemonUnavailableError:
|
except DaemonUnavailableError:
|
||||||
@ -52,30 +60,30 @@ class FolderManager:
|
|||||||
def delete(id):
|
def delete(id):
|
||||||
folder = FolderManager.get(id)
|
folder = FolderManager.get(id)
|
||||||
if not folder.root:
|
if not folder.root:
|
||||||
raise ObjectNotFound(Folder)
|
raise Folder.DoesNotExist(id)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
DaemonClient().remove_watched_folder(folder.path)
|
DaemonClient().remove_watched_folder(folder.path)
|
||||||
except DaemonUnavailableError:
|
except DaemonUnavailableError:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
for user in User.select(lambda u: u.last_play.root_folder == folder):
|
users = User.select(User.id).join(Track).where(Track.root_folder == folder)
|
||||||
user.last_play = None
|
User.update(last_play=None).where(User.id.in_(users)).execute()
|
||||||
RatingTrack.select(lambda r: r.rated.root_folder == folder).delete(bulk=True)
|
|
||||||
StarredTrack.select(lambda s: s.starred.root_folder == folder).delete(bulk=True)
|
|
||||||
|
|
||||||
Track.select(lambda t: t.root_folder == folder).delete(bulk=True)
|
deleted_tracks_query = Track.select(Track.id).where(Track.root_folder == folder)
|
||||||
|
RatingTrack.delete().where(
|
||||||
|
RatingTrack.rated.in_(deleted_tracks_query)
|
||||||
|
).execute()
|
||||||
|
StarredTrack.delete().where(
|
||||||
|
StarredTrack.starred.in_(deleted_tracks_query)
|
||||||
|
).execute()
|
||||||
|
|
||||||
|
Track.delete().where(Track.root_folder == folder).execute()
|
||||||
Album.prune()
|
Album.prune()
|
||||||
Artist.prune()
|
Artist.prune()
|
||||||
Folder.select(lambda f: not f.root and f.path.startswith(folder.path)).delete(
|
Folder.delete().where(Folder.path.startswith(folder.path)).execute()
|
||||||
bulk=True
|
|
||||||
)
|
|
||||||
|
|
||||||
folder.delete()
|
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def delete_by_name(name):
|
def delete_by_name(name):
|
||||||
folder = Folder.get(name=name, root=True)
|
folder = Folder.get(name=name, root=True)
|
||||||
if not folder:
|
|
||||||
raise ObjectNotFound(Folder)
|
|
||||||
FolderManager.delete(folder.id)
|
FolderManager.delete(folder.id)
|
||||||
|
@ -12,7 +12,6 @@ import mediafile
|
|||||||
import time
|
import time
|
||||||
|
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
from pony.orm import db_session
|
|
||||||
from queue import Queue, Empty as QueueEmpty
|
from queue import Queue, Empty as QueueEmpty
|
||||||
from threading import Thread, Event
|
from threading import Thread, Event
|
||||||
|
|
||||||
@ -189,7 +188,6 @@ class Scanner(Thread):
|
|||||||
return True
|
return True
|
||||||
return os.path.splitext(path)[1][1:].lower() in self.__extensions
|
return os.path.splitext(path)[1][1:].lower() in self.__extensions
|
||||||
|
|
||||||
@db_session
|
|
||||||
def scan_file(self, path_or_direntry):
|
def scan_file(self, path_or_direntry):
|
||||||
if isinstance(path_or_direntry, str):
|
if isinstance(path_or_direntry, str):
|
||||||
path = path_or_direntry
|
path = path_or_direntry
|
||||||
@ -273,7 +271,6 @@ class Scanner(Thread):
|
|||||||
# Field validation error
|
# Field validation error
|
||||||
self.__stats.errors.append(path)
|
self.__stats.errors.append(path)
|
||||||
|
|
||||||
@db_session
|
|
||||||
def remove_file(self, path):
|
def remove_file(self, path):
|
||||||
if not isinstance(path, str):
|
if not isinstance(path, str):
|
||||||
raise TypeError("Expecting string, got " + str(type(path)))
|
raise TypeError("Expecting string, got " + str(type(path)))
|
||||||
@ -285,7 +282,6 @@ class Scanner(Thread):
|
|||||||
self.__stats.deleted.tracks += 1
|
self.__stats.deleted.tracks += 1
|
||||||
tr.delete()
|
tr.delete()
|
||||||
|
|
||||||
@db_session
|
|
||||||
def move_file(self, src_path, dst_path):
|
def move_file(self, src_path, dst_path):
|
||||||
if not isinstance(src_path, str):
|
if not isinstance(src_path, str):
|
||||||
raise TypeError("Expecting string, got " + str(type(src_path)))
|
raise TypeError("Expecting string, got " + str(type(src_path)))
|
||||||
@ -313,7 +309,6 @@ class Scanner(Thread):
|
|||||||
tr.folder = folder
|
tr.folder = folder
|
||||||
tr.path = dst_path
|
tr.path = dst_path
|
||||||
|
|
||||||
@db_session
|
|
||||||
def find_cover(self, dirpath):
|
def find_cover(self, dirpath):
|
||||||
if not isinstance(dirpath, str): # pragma: nocover
|
if not isinstance(dirpath, str): # pragma: nocover
|
||||||
raise TypeError("Expecting string, got " + str(type(dirpath)))
|
raise TypeError("Expecting string, got " + str(type(dirpath)))
|
||||||
@ -333,7 +328,6 @@ class Scanner(Thread):
|
|||||||
cover = find_cover_in_folder(folder.path, album_name)
|
cover = find_cover_in_folder(folder.path, album_name)
|
||||||
folder.cover_art = cover.name if cover is not None else None
|
folder.cover_art = cover.name if cover is not None else None
|
||||||
|
|
||||||
@db_session
|
|
||||||
def add_cover(self, path):
|
def add_cover(self, path):
|
||||||
if not isinstance(path, str): # pragma: nocover
|
if not isinstance(path, str): # pragma: nocover
|
||||||
raise TypeError("Expecting string, got " + str(type(path)))
|
raise TypeError("Expecting string, got " + str(type(path)))
|
||||||
|
@ -9,7 +9,6 @@ import logging
|
|||||||
import os.path
|
import os.path
|
||||||
import time
|
import time
|
||||||
|
|
||||||
from pony.orm import db_session
|
|
||||||
from threading import Thread, Condition, Timer
|
from threading import Thread, Condition, Timer
|
||||||
from watchdog.observers import Observer
|
from watchdog.observers import Observer
|
||||||
from watchdog.events import PatternMatchingEventHandler
|
from watchdog.events import PatternMatchingEventHandler
|
||||||
|
@ -1,12 +1,12 @@
|
|||||||
# This file is part of Supysonic.
|
# This file is part of Supysonic.
|
||||||
# Supysonic is a Python implementation of the Subsonic server API.
|
# Supysonic is a Python implementation of the Subsonic server API.
|
||||||
#
|
#
|
||||||
# Copyright (C) 2017-2018 Alban 'spl0k' Féron
|
# Copyright (C) 2017-2022 Alban 'spl0k' Féron
|
||||||
# 2017 Óscar García Amor
|
# 2017 Óscar García Amor
|
||||||
#
|
#
|
||||||
# Distributed under terms of the GNU AGPLv3 license.
|
# Distributed under terms of the GNU AGPLv3 license.
|
||||||
|
|
||||||
from supysonic import db
|
from supysonic.db import Folder, Album, Artist, Track, init_database, release_database
|
||||||
from supysonic.managers.folder import FolderManager
|
from supysonic.managers.folder import FolderManager
|
||||||
|
|
||||||
import os
|
import os
|
||||||
@ -14,20 +14,18 @@ import shutil
|
|||||||
import tempfile
|
import tempfile
|
||||||
import unittest
|
import unittest
|
||||||
|
|
||||||
from pony.orm import db_session, ObjectNotFound
|
|
||||||
|
|
||||||
|
|
||||||
class FolderManagerTestCase(unittest.TestCase):
|
class FolderManagerTestCase(unittest.TestCase):
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
# Create an empty sqlite database in memory
|
# Create an empty sqlite database in memory
|
||||||
db.init_database("sqlite:")
|
init_database("sqlite:")
|
||||||
|
|
||||||
# Create some temporary directories
|
# Create some temporary directories
|
||||||
self.media_dir = tempfile.mkdtemp()
|
self.media_dir = tempfile.mkdtemp()
|
||||||
self.music_dir = tempfile.mkdtemp()
|
self.music_dir = tempfile.mkdtemp()
|
||||||
|
|
||||||
def tearDown(self):
|
def tearDown(self):
|
||||||
db.release_database()
|
release_database()
|
||||||
shutil.rmtree(self.media_dir)
|
shutil.rmtree(self.media_dir)
|
||||||
shutil.rmtree(self.music_dir)
|
shutil.rmtree(self.music_dir)
|
||||||
|
|
||||||
@ -36,15 +34,15 @@ class FolderManagerTestCase(unittest.TestCase):
|
|||||||
self.assertIsNotNone(FolderManager.add("media", self.media_dir))
|
self.assertIsNotNone(FolderManager.add("media", self.media_dir))
|
||||||
self.assertIsNotNone(FolderManager.add("music", self.music_dir))
|
self.assertIsNotNone(FolderManager.add("music", self.music_dir))
|
||||||
|
|
||||||
db.Folder(
|
Folder.create(
|
||||||
root=False, name="non-root", path=os.path.join(self.music_dir, "subfolder")
|
root=False, name="non-root", path=os.path.join(self.music_dir, "subfolder")
|
||||||
)
|
)
|
||||||
|
|
||||||
artist = db.Artist(name="Artist")
|
artist = Artist.create(name="Artist")
|
||||||
album = db.Album(name="Album", artist=artist)
|
album = Album.create(name="Album", artist=artist)
|
||||||
|
|
||||||
root = db.Folder.get(name="media")
|
root = Folder.get(name="media")
|
||||||
db.Track(
|
Track(
|
||||||
title="Track",
|
title="Track",
|
||||||
artist=artist,
|
artist=artist,
|
||||||
album=album,
|
album=album,
|
||||||
@ -58,95 +56,86 @@ class FolderManagerTestCase(unittest.TestCase):
|
|||||||
last_modification=0,
|
last_modification=0,
|
||||||
)
|
)
|
||||||
|
|
||||||
@db_session
|
|
||||||
def test_get_folder(self):
|
def test_get_folder(self):
|
||||||
self.create_folders()
|
self.create_folders()
|
||||||
|
|
||||||
# Get existing folders
|
# Get existing folders
|
||||||
for name in ["media", "music"]:
|
for name in ["media", "music"]:
|
||||||
folder = db.Folder.get(name=name, root=True)
|
folder = Folder.get(name=name, root=True)
|
||||||
self.assertEqual(FolderManager.get(folder.id), folder)
|
self.assertEqual(FolderManager.get(folder.id), folder)
|
||||||
|
|
||||||
# Get with invalid UUID
|
# Get with invalid id
|
||||||
self.assertRaises(ValueError, FolderManager.get, "invalid-uuid")
|
self.assertRaises(ValueError, FolderManager.get, "invalid-uuid")
|
||||||
self.assertRaises(ValueError, FolderManager.get, 0xDEADBEEF)
|
|
||||||
|
|
||||||
# Non-existent folder
|
# Non-existent folder
|
||||||
self.assertRaises(ObjectNotFound, FolderManager.get, 1234567890)
|
self.assertRaises(Folder.DoesNotExist, FolderManager.get, 1234567890)
|
||||||
|
|
||||||
@db_session
|
|
||||||
def test_add_folder(self):
|
def test_add_folder(self):
|
||||||
self.create_folders()
|
self.create_folders()
|
||||||
self.assertEqual(db.Folder.select().count(), 3)
|
self.assertEqual(Folder.select().count(), 3)
|
||||||
|
|
||||||
# Create duplicate
|
# Create duplicate
|
||||||
self.assertRaises(ValueError, FolderManager.add, "media", self.media_dir)
|
self.assertRaises(ValueError, FolderManager.add, "media", self.media_dir)
|
||||||
self.assertEqual(db.Folder.select(lambda f: f.name == "media").count(), 1)
|
self.assertEqual(Folder.select().where(Folder.name == "media").count(), 1)
|
||||||
|
|
||||||
# Duplicate path
|
# Duplicate path
|
||||||
self.assertRaises(ValueError, FolderManager.add, "new-folder", self.media_dir)
|
self.assertRaises(ValueError, FolderManager.add, "new-folder", self.media_dir)
|
||||||
self.assertEqual(
|
self.assertEqual(
|
||||||
db.Folder.select(lambda f: f.path == self.media_dir).count(), 1
|
Folder.select().where(Folder.path == self.media_dir).count(), 1
|
||||||
)
|
)
|
||||||
|
|
||||||
# Invalid path
|
# Invalid path
|
||||||
path = os.path.abspath("/this/not/is/valid")
|
path = os.path.abspath("/this/not/is/valid")
|
||||||
self.assertRaises(ValueError, FolderManager.add, "invalid-path", path)
|
self.assertRaises(ValueError, FolderManager.add, "invalid-path", path)
|
||||||
self.assertFalse(db.Folder.exists(path=path))
|
self.assertFalse(Folder.select().where(Folder.path == path).exists())
|
||||||
|
|
||||||
# Subfolder of already added path
|
# Subfolder of already added path
|
||||||
path = os.path.join(self.media_dir, "subfolder")
|
path = os.path.join(self.media_dir, "subfolder")
|
||||||
os.mkdir(path)
|
os.mkdir(path)
|
||||||
self.assertRaises(ValueError, FolderManager.add, "subfolder", path)
|
self.assertRaises(ValueError, FolderManager.add, "subfolder", path)
|
||||||
self.assertEqual(db.Folder.select().count(), 3)
|
self.assertEqual(Folder.select().count(), 3)
|
||||||
|
|
||||||
# Parent folder of an already added path
|
# Parent folder of an already added path
|
||||||
path = os.path.join(self.media_dir, "..")
|
path = os.path.join(self.media_dir, "..")
|
||||||
self.assertRaises(ValueError, FolderManager.add, "parent", path)
|
self.assertRaises(ValueError, FolderManager.add, "parent", path)
|
||||||
self.assertEqual(db.Folder.select().count(), 3)
|
self.assertEqual(Folder.select().count(), 3)
|
||||||
|
|
||||||
def test_delete_folder(self):
|
def test_delete_folder(self):
|
||||||
with db_session:
|
self.create_folders()
|
||||||
self.create_folders()
|
|
||||||
|
|
||||||
with db_session:
|
# Delete invalid Folder ID
|
||||||
# Delete invalid Folder ID
|
self.assertRaises(ValueError, FolderManager.delete, "invalid-uuid")
|
||||||
self.assertRaises(ValueError, FolderManager.delete, "invalid-uuid")
|
self.assertEqual(Folder.select().count(), 3)
|
||||||
self.assertEqual(db.Folder.select().count(), 3)
|
|
||||||
|
|
||||||
# Delete non-existent folder
|
# Delete non-existent folder
|
||||||
self.assertRaises(ObjectNotFound, FolderManager.delete, 1234567890)
|
self.assertRaises(Folder.DoesNotExist, FolderManager.delete, 1234567890)
|
||||||
self.assertEqual(db.Folder.select().count(), 3)
|
self.assertEqual(Folder.select().count(), 3)
|
||||||
|
|
||||||
# Delete non-root folder
|
# Delete non-root folder
|
||||||
folder = db.Folder.get(name="non-root")
|
folder = Folder.get(name="non-root")
|
||||||
self.assertRaises(ObjectNotFound, FolderManager.delete, folder.id)
|
self.assertRaises(Folder.DoesNotExist, FolderManager.delete, folder.id)
|
||||||
self.assertEqual(db.Folder.select().count(), 3)
|
self.assertEqual(Folder.select().count(), 3)
|
||||||
|
|
||||||
with db_session:
|
# Delete existing folders
|
||||||
# Delete existing folders
|
for name in ["media", "music"]:
|
||||||
for name in ["media", "music"]:
|
folder = Folder.get(name=name, root=True)
|
||||||
folder = db.Folder.get(name=name, root=True)
|
FolderManager.delete(folder.id)
|
||||||
FolderManager.delete(folder.id)
|
self.assertRaises(Folder.DoesNotExist, Folder.__getitem__, folder.id)
|
||||||
self.assertRaises(ObjectNotFound, db.Folder.__getitem__, folder.id)
|
|
||||||
|
|
||||||
# Even if we have only 2 root folders, non-root should never exist and be cleaned anyway
|
# Even if we have only 2 root folders, non-root should never exist and be cleaned anyway
|
||||||
self.assertEqual(db.Folder.select().count(), 0)
|
self.assertEqual(Folder.select().count(), 0)
|
||||||
|
|
||||||
def test_delete_by_name(self):
|
def test_delete_by_name(self):
|
||||||
with db_session:
|
self.create_folders()
|
||||||
self.create_folders()
|
|
||||||
|
|
||||||
with db_session:
|
# Delete non-existent folder
|
||||||
# Delete non-existent folder
|
self.assertRaises(Folder.DoesNotExist, FolderManager.delete_by_name, "null")
|
||||||
self.assertRaises(ObjectNotFound, FolderManager.delete_by_name, "null")
|
self.assertEqual(Folder.select().count(), 3)
|
||||||
self.assertEqual(db.Folder.select().count(), 3)
|
|
||||||
|
|
||||||
with db_session:
|
# Delete existing folders
|
||||||
# Delete existing folders
|
for name in ["media", "music"]:
|
||||||
for name in ["media", "music"]:
|
FolderManager.delete_by_name(name)
|
||||||
FolderManager.delete_by_name(name)
|
self.assertFalse(Folder.select().where(Folder.name == name).exists())
|
||||||
self.assertFalse(db.Folder.exists(name=name))
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
|
Loading…
Reference in New Issue
Block a user