mirror of
https://github.com/spl0k/supysonic.git
synced 2024-12-22 17:06:17 +00:00
Queue scans rather than rejecting them
This commit is contained in:
parent
d2ff37428f
commit
7bbbdac41c
@ -17,7 +17,7 @@ from pony.orm import db_session
|
|||||||
from pony.orm import ObjectNotFound
|
from pony.orm import ObjectNotFound
|
||||||
|
|
||||||
from .daemon.client import DaemonClient
|
from .daemon.client import DaemonClient
|
||||||
from .daemon.exceptions import DaemonUnavailableError, ScannerAlreadyRunningError
|
from .daemon.exceptions import DaemonUnavailableError
|
||||||
from .db import Folder, User
|
from .db import Folder, User
|
||||||
from .managers.folder import FolderManager
|
from .managers.folder import FolderManager
|
||||||
from .managers.user import UserManager
|
from .managers.user import UserManager
|
||||||
@ -180,10 +180,7 @@ class SupysonicCLI(cmd.Cmd):
|
|||||||
self.__folder_scan_foreground(folders, force)
|
self.__folder_scan_foreground(folders, force)
|
||||||
|
|
||||||
def __folder_scan_background(self, folders, force):
|
def __folder_scan_background(self, folders, force):
|
||||||
try:
|
self.__daemon.scan(folders, force)
|
||||||
self.__daemon.scan(folders, force)
|
|
||||||
except ScannerAlreadyRunningError:
|
|
||||||
self.write_error_line('The daemon is already scanning, please try again later')
|
|
||||||
|
|
||||||
def __folder_scan_foreground(self, folders, force):
|
def __folder_scan_foreground(self, folders, force):
|
||||||
try:
|
try:
|
||||||
|
@ -8,13 +8,16 @@
|
|||||||
# Distributed under terms of the GNU AGPLv3 license.
|
# Distributed under terms of the GNU AGPLv3 license.
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
|
try:
|
||||||
|
from queue import Queue, Empty
|
||||||
|
except ImportError:
|
||||||
|
from Queue import Queue, Empty
|
||||||
|
|
||||||
from multiprocessing.connection import Listener
|
from multiprocessing.connection import Listener
|
||||||
from pony.orm import db_session
|
from pony.orm import db_session, select
|
||||||
from threading import Thread
|
from threading import Thread
|
||||||
|
|
||||||
from .client import DaemonCommand
|
from .client import DaemonCommand
|
||||||
from .exceptions import ScannerAlreadyRunningError
|
|
||||||
from ..db import Folder
|
from ..db import Folder
|
||||||
from ..scanner import Scanner
|
from ..scanner import Scanner
|
||||||
from ..utils import get_secret_key
|
from ..utils import get_secret_key
|
||||||
@ -55,14 +58,23 @@ class Daemon(object):
|
|||||||
self.__handle_connection(conn)
|
self.__handle_connection(conn)
|
||||||
|
|
||||||
def start_scan(self, folders = [], force = False):
|
def start_scan(self, folders = [], force = False):
|
||||||
|
if not folders:
|
||||||
|
with db_session:
|
||||||
|
folders = select(f.name for f in Folder if f.root)[:]
|
||||||
|
|
||||||
if self.__scanner is not None and self.__scanner.is_alive():
|
if self.__scanner is not None and self.__scanner.is_alive():
|
||||||
raise ScannerAlreadyRunningError()
|
for f in folders:
|
||||||
|
self.__scanner.queue_folder(f)
|
||||||
|
return
|
||||||
|
|
||||||
extensions = self.__config.BASE['scanner_extensions']
|
extensions = self.__config.BASE['scanner_extensions']
|
||||||
if extensions:
|
if extensions:
|
||||||
extensions = extensions.split(' ')
|
extensions = extensions.split(' ')
|
||||||
|
|
||||||
self.__scanner = ScannerThread(self.__watcher, folders, kwargs = { 'force': force, 'extensions': extensions, 'notify_watcher': False })
|
self.__scanner = ScannerThread(self.__watcher, kwargs = { 'force': force, 'extensions': extensions, 'notify_watcher': False })
|
||||||
|
for f in folders:
|
||||||
|
self.__scanner.queue_folder(f)
|
||||||
|
|
||||||
self.__scanner.start()
|
self.__scanner.start()
|
||||||
|
|
||||||
def terminate(self):
|
def terminate(self):
|
||||||
@ -70,32 +82,50 @@ class Daemon(object):
|
|||||||
if self.__watcher is not None:
|
if self.__watcher is not None:
|
||||||
self.__watcher.stop()
|
self.__watcher.stop()
|
||||||
|
|
||||||
|
class ScanQueue(Queue):
|
||||||
|
def _init(self, maxsize):
|
||||||
|
self.queue = set()
|
||||||
|
self.__last_got = None
|
||||||
|
|
||||||
|
def _put(self, item):
|
||||||
|
if self.__last_got != item:
|
||||||
|
self.queue.add(item)
|
||||||
|
|
||||||
|
def _get(self):
|
||||||
|
self.__last_got = self.queue.pop()
|
||||||
|
return self.__last_got
|
||||||
|
|
||||||
class ScannerThread(Thread):
|
class ScannerThread(Thread):
|
||||||
def __init__(self, watcher, folders, *args, **kwargs):
|
def __init__(self, watcher, *args, **kwargs):
|
||||||
super(ScannerThread, self).__init__(*args, **kwargs)
|
super(ScannerThread, self).__init__(*args, **kwargs)
|
||||||
self.__watcher = watcher
|
self.__watcher = watcher
|
||||||
self.__folders = folders
|
|
||||||
self.__scanned = {}
|
self.__scanned = {}
|
||||||
|
self.__queue = ScanQueue()
|
||||||
|
|
||||||
|
def queue_folder(self, folder):
|
||||||
|
self.__queue.put(folder)
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
s = Scanner(*self._args, **self._kwargs)
|
s = Scanner(*self._args, **self._kwargs)
|
||||||
|
|
||||||
with db_session:
|
with db_session:
|
||||||
if self.__folders:
|
try:
|
||||||
folders = Folder.select(lambda f: f.root and f.name in self.__folders)
|
while True:
|
||||||
else:
|
name = self.__queue.get(False)
|
||||||
folders = Folder.select(lambda f: f.root)
|
folder = Folder.get(root = True, name = name)
|
||||||
|
if folder is None:
|
||||||
|
continue
|
||||||
|
|
||||||
for f in folders:
|
|
||||||
name = f.name
|
|
||||||
if self.__watcher is not None:
|
|
||||||
self.__watcher.remove_folder(f)
|
|
||||||
try:
|
|
||||||
logger.info('Scanning %s', name)
|
|
||||||
s.scan(f, lambda x: self.__scanned.update({ name: x }))
|
|
||||||
finally:
|
|
||||||
if self.__watcher is not None:
|
if self.__watcher is not None:
|
||||||
self.__watcher.add_folder(f)
|
self.__watcher.remove_folder(folder)
|
||||||
|
try:
|
||||||
|
logger.info('Scanning %s', name)
|
||||||
|
s.scan(folder, lambda x: self.__scanned.update({ name: x }))
|
||||||
|
finally:
|
||||||
|
if self.__watcher is not None:
|
||||||
|
self.__watcher.add_folder(folder)
|
||||||
|
except Empty:
|
||||||
|
pass
|
||||||
|
|
||||||
s.finish()
|
s.finish()
|
||||||
|
|
||||||
|
@ -9,7 +9,7 @@
|
|||||||
|
|
||||||
from multiprocessing.connection import Client
|
from multiprocessing.connection import Client
|
||||||
|
|
||||||
from .exceptions import DaemonUnavailableError, ScannerAlreadyRunningError
|
from .exceptions import DaemonUnavailableError
|
||||||
from ..config import get_current_config
|
from ..config import get_current_config
|
||||||
from ..py23 import strtype
|
from ..py23 import strtype
|
||||||
from ..utils import get_secret_key
|
from ..utils import get_secret_key
|
||||||
@ -49,11 +49,7 @@ class ScannerStartCommand(ScannerCommand):
|
|||||||
self.__force = force
|
self.__force = force
|
||||||
|
|
||||||
def apply(self, connection, daemon):
|
def apply(self, connection, daemon):
|
||||||
try:
|
daemon.start_scan(self.__folders, self.__force)
|
||||||
daemon.start_scan(self.__folders, self.__force)
|
|
||||||
connection.send(ScannerStartResult(None))
|
|
||||||
except ScannerAlreadyRunningError as e:
|
|
||||||
connection.send(ScannerStartResult(e))
|
|
||||||
|
|
||||||
class DaemonCommandResult(object):
|
class DaemonCommandResult(object):
|
||||||
pass
|
pass
|
||||||
@ -64,12 +60,6 @@ class ScannerProgressResult(DaemonCommandResult):
|
|||||||
|
|
||||||
scanned = property(lambda self: self.__scanned)
|
scanned = property(lambda self: self.__scanned)
|
||||||
|
|
||||||
class ScannerStartResult(DaemonCommandResult):
|
|
||||||
def __init__(self, exception):
|
|
||||||
self.__exception = exception
|
|
||||||
|
|
||||||
exception = property(lambda self: self.__exception)
|
|
||||||
|
|
||||||
class DaemonClient(object):
|
class DaemonClient(object):
|
||||||
def __init__(self, address = None):
|
def __init__(self, address = None):
|
||||||
self.__address = address or get_current_config().DAEMON['socket']
|
self.__address = address or get_current_config().DAEMON['socket']
|
||||||
@ -105,6 +95,3 @@ class DaemonClient(object):
|
|||||||
raise TypeError('Expecting list, got ' + str(type(folders)))
|
raise TypeError('Expecting list, got ' + str(type(folders)))
|
||||||
with self.__get_connection() as c:
|
with self.__get_connection() as c:
|
||||||
c.send(ScannerStartCommand(folders, force))
|
c.send(ScannerStartCommand(folders, force))
|
||||||
rv = c.recv()
|
|
||||||
if rv.exception is not None:
|
|
||||||
raise rv.exception
|
|
||||||
|
@ -9,6 +9,3 @@
|
|||||||
|
|
||||||
class DaemonUnavailableError(Exception):
|
class DaemonUnavailableError(Exception):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
class ScannerAlreadyRunningError(Exception):
|
|
||||||
pass
|
|
||||||
|
Loading…
Reference in New Issue
Block a user