mirror of
https://github.com/spl0k/supysonic.git
synced 2024-12-22 17:06:17 +00:00
The scanner is now a stoppable thread
This commit is contained in:
parent
e210f25bb3
commit
7bd4c54e98
@ -3,7 +3,7 @@
|
||||
# This file is part of Supysonic.
|
||||
# Supysonic is a Python implementation of the Subsonic server API.
|
||||
#
|
||||
# Copyright (C) 2013-2018 Alban 'spl0k' Féron
|
||||
# Copyright (C) 2013-2019 Alban 'spl0k' Féron
|
||||
#
|
||||
# Distributed under terms of the GNU AGPLv3 license.
|
||||
|
||||
@ -13,7 +13,7 @@ import getpass
|
||||
import sys
|
||||
import time
|
||||
|
||||
from pony.orm import db_session
|
||||
from pony.orm import db_session, select
|
||||
from pony.orm import ObjectNotFound
|
||||
|
||||
from .daemon.client import DaemonClient
|
||||
@ -24,19 +24,15 @@ from .managers.user import UserManager
|
||||
from .scanner import Scanner
|
||||
|
||||
class TimedProgressDisplay:
|
||||
def __init__(self, name, stdout, interval = 5):
|
||||
self.__name = name
|
||||
def __init__(self, stdout, interval = 5):
|
||||
self.__stdout = stdout
|
||||
self.__interval = interval
|
||||
self.__last_display = 0
|
||||
self.__last_len = 0
|
||||
|
||||
def __call__(self, scanned):
|
||||
def __call__(self, name, scanned):
|
||||
if time.time() - self.__last_display > self.__interval:
|
||||
if not self.__last_len:
|
||||
self.__stdout.write("Scanning '{0}': ".format(self.__name))
|
||||
|
||||
progress = '{0} files scanned'.format(scanned)
|
||||
progress = "Scanning '{0}': {1} files scanned".format(name, scanned)
|
||||
self.__stdout.write('\b' * self.__last_len)
|
||||
self.__stdout.write(progress)
|
||||
self.__stdout.flush()
|
||||
@ -195,23 +191,21 @@ class SupysonicCLI(cmd.Cmd):
|
||||
if extensions:
|
||||
extensions = extensions.split(' ')
|
||||
|
||||
scanner = Scanner(force = force, extensions = extensions)
|
||||
scanner = Scanner(force = force, extensions = extensions, progress = TimedProgressDisplay(self.stdout))
|
||||
|
||||
if folders:
|
||||
fstrs = folders
|
||||
folders = Folder.select(lambda f: f.root and f.name in fstrs)[:]
|
||||
notfound = set(fstrs) - set(map(lambda f: f.name, folders))
|
||||
folders = select(f.name for f in Folder if f.root and f.name in fstrs)[:]
|
||||
notfound = set(fstrs) - set(folders)
|
||||
if notfound:
|
||||
self.write_line("No such folder(s): " + ' '.join(notfound))
|
||||
for folder in folders:
|
||||
scanner.scan(folder, TimedProgressDisplay(folder.name, self.stdout))
|
||||
self.write_line()
|
||||
scanner.queue_folder(folder)
|
||||
else:
|
||||
for folder in Folder.select(lambda f: f.root):
|
||||
scanner.scan(folder, TimedProgressDisplay(folder.name, self.stdout))
|
||||
self.write_line()
|
||||
for folder in select(f.name for f in Folder if f.root):
|
||||
scanner.queue_folder(folder)
|
||||
|
||||
scanner.finish()
|
||||
scanner.run()
|
||||
stats = scanner.stats()
|
||||
|
||||
self.write_line('Scanning done')
|
||||
|
@ -8,10 +8,6 @@
|
||||
# Distributed under terms of the GNU AGPLv3 license.
|
||||
|
||||
import logging
|
||||
try:
|
||||
from queue import Queue, Empty
|
||||
except ImportError:
|
||||
from Queue import Queue, Empty
|
||||
|
||||
from multiprocessing.connection import Listener
|
||||
from pony.orm import db_session, select
|
||||
@ -71,65 +67,24 @@ class Daemon(object):
|
||||
if extensions:
|
||||
extensions = extensions.split(' ')
|
||||
|
||||
self.__scanner = ScannerThread(self.__watcher, kwargs = { 'force': force, 'extensions': extensions, 'notify_watcher': False })
|
||||
self.__scanner = Scanner(force = force, extensions = extensions, on_folder_start = self.__unwatch, on_folder_end = self.__watch)
|
||||
for f in folders:
|
||||
self.__scanner.queue_folder(f)
|
||||
|
||||
self.__scanner.start()
|
||||
|
||||
def __watch(self, folder):
|
||||
if self.__watcher is not None:
|
||||
self.__watcher.add_folder(folder.path)
|
||||
|
||||
def __unwatch(self, folder):
|
||||
if self.__watcher is not None:
|
||||
self.__watcher.remove_folder(folder.path)
|
||||
|
||||
def terminate(self):
|
||||
self.__listener.close()
|
||||
if self.__scanner is not None:
|
||||
self.__scanner.stop()
|
||||
self.__scanner.join()
|
||||
if self.__watcher is not None:
|
||||
self.__watcher.stop()
|
||||
|
||||
class ScanQueue(Queue):
|
||||
def _init(self, maxsize):
|
||||
self.queue = set()
|
||||
self.__last_got = None
|
||||
|
||||
def _put(self, item):
|
||||
if self.__last_got != item:
|
||||
self.queue.add(item)
|
||||
|
||||
def _get(self):
|
||||
self.__last_got = self.queue.pop()
|
||||
return self.__last_got
|
||||
|
||||
class ScannerThread(Thread):
|
||||
def __init__(self, watcher, *args, **kwargs):
|
||||
super(ScannerThread, self).__init__(*args, **kwargs)
|
||||
self.__watcher = watcher
|
||||
self.__scanned = {}
|
||||
self.__queue = ScanQueue()
|
||||
|
||||
def queue_folder(self, folder):
|
||||
self.__queue.put(folder)
|
||||
|
||||
def run(self):
|
||||
s = Scanner(*self._args, **self._kwargs)
|
||||
|
||||
with db_session:
|
||||
try:
|
||||
while True:
|
||||
name = self.__queue.get(False)
|
||||
folder = Folder.get(root = True, name = name)
|
||||
if folder is None:
|
||||
continue
|
||||
|
||||
if self.__watcher is not None:
|
||||
self.__watcher.remove_folder(folder)
|
||||
try:
|
||||
logger.info('Scanning %s', name)
|
||||
s.scan(folder, lambda x: self.__scanned.update({ name: x }))
|
||||
finally:
|
||||
if self.__watcher is not None:
|
||||
self.__watcher.add_folder(folder)
|
||||
except Empty:
|
||||
pass
|
||||
|
||||
s.finish()
|
||||
|
||||
@property
|
||||
def scanned(self):
|
||||
# This isn't quite thread-safe but locking each time a file is scanned could affect performance
|
||||
return sum(self.__scanned.values())
|
||||
|
@ -3,7 +3,7 @@
|
||||
# This file is part of Supysonic.
|
||||
# Supysonic is a Python implementation of the Subsonic server API.
|
||||
#
|
||||
# Copyright (C) 2018 Alban 'spl0k' Féron
|
||||
# Copyright (C) 2018-2019 Alban 'spl0k' Féron
|
||||
# 2018-2019 Carey 'pR0Ps' Metcalfe
|
||||
#
|
||||
# Distributed under terms of the GNU AGPLv3 license.
|
||||
@ -33,6 +33,11 @@ except ImportError:
|
||||
pass
|
||||
os.rename(src, dst)
|
||||
|
||||
try:
|
||||
from queue import Queue, Empty as QueueEmpty
|
||||
except ImportError:
|
||||
from Queue import Queue, Empty as QueueEmpty
|
||||
|
||||
try:
|
||||
# Python 2
|
||||
strtype = basestring
|
||||
|
@ -3,7 +3,7 @@
|
||||
# This file is part of Supysonic.
|
||||
# Supysonic is a Python implementation of the Subsonic server API.
|
||||
#
|
||||
# Copyright (C) 2013-2018 Alban 'spl0k' Féron
|
||||
# Copyright (C) 2013-2019 Alban 'spl0k' Féron
|
||||
#
|
||||
# Distributed under terms of the GNU AGPLv3 license.
|
||||
|
||||
@ -14,6 +14,7 @@ import time
|
||||
|
||||
from datetime import datetime
|
||||
from pony.orm import db_session
|
||||
from threading import Thread, Event
|
||||
|
||||
from .covers import find_cover_in_folder, CoverFile
|
||||
from .daemon.exceptions import DaemonUnavailableError
|
||||
@ -21,7 +22,7 @@ from .daemon.client import DaemonClient
|
||||
from .db import Folder, Artist, Album, Track, User
|
||||
from .db import StarredFolder, StarredArtist, StarredAlbum, StarredTrack
|
||||
from .db import RatingFolder, RatingTrack
|
||||
from .py23 import strtype
|
||||
from .py23 import strtype, Queue, QueueEmpty
|
||||
|
||||
class StatsDetails(object):
|
||||
def __init__(self):
|
||||
@ -31,34 +32,98 @@ class StatsDetails(object):
|
||||
|
||||
class Stats(object):
|
||||
def __init__(self):
|
||||
self.scanned = 0
|
||||
self.added = StatsDetails()
|
||||
self.deleted = StatsDetails()
|
||||
self.errors = []
|
||||
|
||||
class Scanner:
|
||||
def __init__(self, force = False, extensions = None, notify_watcher = True):
|
||||
class ScanQueue(Queue):
|
||||
def _init(self, maxsize):
|
||||
self.queue = set()
|
||||
self.__last_got = None
|
||||
|
||||
def _put(self, item):
|
||||
if self.__last_got != item:
|
||||
self.queue.add(item)
|
||||
|
||||
def _get(self):
|
||||
self.__last_got = self.queue.pop()
|
||||
return self.__last_got
|
||||
|
||||
def _unwatch_folder(folder):
|
||||
daemon = DaemonClient()
|
||||
try: daemon.remove_watched_folder(folder.path)
|
||||
except DaemonUnavailableError: pass
|
||||
|
||||
def _watch_folder(folder):
|
||||
daemon = DaemonClient()
|
||||
try: daemon.add_watched_folder(folder.path)
|
||||
except DaemonUnavailableError: pass
|
||||
|
||||
class Scanner(Thread):
|
||||
def __init__(self, force = False, extensions = None, progress = None,
|
||||
on_folder_start = _unwatch_folder, on_folder_end = _watch_folder, on_done = None):
|
||||
super(Scanner, self).__init__()
|
||||
|
||||
if extensions is not None and not isinstance(extensions, list):
|
||||
raise TypeError('Invalid extensions type')
|
||||
|
||||
self.__force = force
|
||||
self.__notify = notify_watcher
|
||||
|
||||
self.__stats = Stats()
|
||||
self.__extensions = extensions
|
||||
|
||||
def scan(self, folder, progress_callback = None):
|
||||
if not isinstance(folder, Folder):
|
||||
raise TypeError('Expecting Folder instance, got ' + str(type(folder)))
|
||||
self.__progress = progress
|
||||
self.__on_folder_start = on_folder_start
|
||||
self.__on_folder_end = on_folder_end
|
||||
self.__on_done = on_done
|
||||
|
||||
if self.__notify:
|
||||
daemon = DaemonClient()
|
||||
try: daemon.remove_watched_folder(folder.path)
|
||||
except DaemonUnavailableError: pass
|
||||
self.__stopped = Event()
|
||||
self.__queue = ScanQueue()
|
||||
self.__stats = Stats()
|
||||
|
||||
scanned = property(lambda self: self.__stats.scanned)
|
||||
|
||||
def __report_progress(self, folder_name, scanned):
|
||||
if self.__progress is None:
|
||||
return
|
||||
|
||||
self.__progress(folder_name, scanned)
|
||||
|
||||
def queue_folder(self, folder_name):
|
||||
if not isinstance(folder_name, strtype):
|
||||
raise TypeError('Expecting string, got ' + str(type(folder_name)))
|
||||
|
||||
self.__queue.put(folder_name)
|
||||
|
||||
@db_session
|
||||
def run(self):
|
||||
while not self.__stopped.is_set():
|
||||
try:
|
||||
folder_name = self.__queue.get(False)
|
||||
except QueueEmpty:
|
||||
break
|
||||
|
||||
folder = Folder.get(name = folder_name, root = True)
|
||||
if folder is None:
|
||||
continue
|
||||
|
||||
self.__scan_folder(folder)
|
||||
|
||||
self.prune()
|
||||
|
||||
if self.__on_done is not None:
|
||||
self.__on_done()
|
||||
|
||||
def stop(self):
|
||||
self.__stopped.set()
|
||||
|
||||
def __scan_folder(self, folder):
|
||||
if self.__on_folder_start is not None:
|
||||
self.__on_folder_start(folder)
|
||||
|
||||
# Scan new/updated files
|
||||
to_scan = [ folder.path ]
|
||||
scanned = 0
|
||||
while to_scan:
|
||||
while not self.__stopped.is_set() and to_scan:
|
||||
path = to_scan.pop()
|
||||
|
||||
try:
|
||||
@ -80,19 +145,20 @@ class Scanner:
|
||||
to_scan.append(full_path)
|
||||
elif os.path.isfile(full_path) and self.__is_valid_path(full_path):
|
||||
self.scan_file(full_path)
|
||||
self.__stats.scanned += 1
|
||||
scanned += 1
|
||||
|
||||
if progress_callback:
|
||||
progress_callback(scanned)
|
||||
self.__report_progress(folder.name, scanned)
|
||||
|
||||
# Remove files that have been deleted
|
||||
for track in Track.select(lambda t: t.root_folder == folder):
|
||||
if not self.__is_valid_path(track.path):
|
||||
self.remove_file(track.path)
|
||||
if not self.__stopped.is_set():
|
||||
for track in Track.select(lambda t: t.root_folder == folder):
|
||||
if not self.__is_valid_path(track.path):
|
||||
self.remove_file(track.path)
|
||||
|
||||
# Remove deleted/moved folders and update cover art info
|
||||
folders = [ folder ]
|
||||
while folders:
|
||||
while not self.__stopped.is_set() and folders:
|
||||
f = folders.pop()
|
||||
|
||||
if not f.root and not os.path.isdir(f.path):
|
||||
@ -102,14 +168,17 @@ class Scanner:
|
||||
self.find_cover(f.path)
|
||||
folders += f.children
|
||||
|
||||
folder.last_scan = int(time.time())
|
||||
if not self.__stopped.is_set():
|
||||
folder.last_scan = int(time.time())
|
||||
|
||||
if self.__notify:
|
||||
try: daemon.add_watched_folder(folder.path)
|
||||
except DaemonUnavailableError: pass
|
||||
if self.__on_folder_end is not None:
|
||||
self.__on_folder_end(folder)
|
||||
|
||||
@db_session
|
||||
def finish(self):
|
||||
def prune(self):
|
||||
if self.__stopped.is_set():
|
||||
return
|
||||
|
||||
self.__stats.deleted.albums = Album.prune()
|
||||
self.__stats.deleted.artists = Artist.prune()
|
||||
Folder.prune()
|
||||
|
@ -162,7 +162,7 @@ class ScannerProcessingQueue(Thread):
|
||||
|
||||
item = self.__next_item()
|
||||
|
||||
scanner.finish()
|
||||
scanner.prune()
|
||||
logger.debug("Freeing scanner")
|
||||
del scanner
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user