mirror of
https://github.com/spl0k/supysonic.git
synced 2024-11-14 14:12:17 +00:00
Scanner get its own thread, one instance might be used for several files
This commit is contained in:
parent
b83538f80b
commit
52891cbf4c
@ -21,6 +21,7 @@
|
|||||||
|
|
||||||
import time, sys
|
import time, sys
|
||||||
import logging
|
import logging
|
||||||
|
from threading import Thread, Lock
|
||||||
from logging.handlers import TimedRotatingFileHandler
|
from logging.handlers import TimedRotatingFileHandler
|
||||||
from watchdog.observers import Observer
|
from watchdog.observers import Observer
|
||||||
from watchdog.events import PatternMatchingEventHandler
|
from watchdog.events import PatternMatchingEventHandler
|
||||||
@ -29,36 +30,83 @@ from supysonic import config
|
|||||||
from supysonic.scanner import Scanner
|
from supysonic.scanner import Scanner
|
||||||
|
|
||||||
class SupysonicWatcherEventHandler(PatternMatchingEventHandler):
|
class SupysonicWatcherEventHandler(PatternMatchingEventHandler):
|
||||||
def __init__(self, logger):
|
def __init__(self, queue, logger):
|
||||||
extensions = config.get('base', 'scanner_extensions')
|
extensions = config.get('base', 'scanner_extensions')
|
||||||
patterns = map(lambda e: "*." + e.lower(), extensions.split()) if extensions else None
|
patterns = map(lambda e: "*." + e.lower(), extensions.split()) if extensions else None
|
||||||
super(SupysonicWatcherEventHandler, self).__init__(patterns = patterns, ignore_directories = True)
|
super(SupysonicWatcherEventHandler, self).__init__(patterns = patterns, ignore_directories = True)
|
||||||
|
|
||||||
|
self.__queue = queue
|
||||||
self.__logger = logger
|
self.__logger = logger
|
||||||
|
|
||||||
def on_created(self, event):
|
def on_created(self, event):
|
||||||
self.__logger.debug("Scanning created file %s", event.src_path)
|
self.__logger.debug("File created: '%s'", event.src_path)
|
||||||
Scanner(db.session).scan_file(event.src_path)
|
self.__queue.put(event.src_path)
|
||||||
db.session.commit()
|
|
||||||
|
|
||||||
def on_deleted(self, event):
|
def on_deleted(self, event):
|
||||||
self.__logger.debug("File %s deleted", event.src_path)
|
self.__logger.debug("File deleted: '%s'", event.src_path)
|
||||||
track = db.Track.query.filter(db.Track.path == event.src_path).first()
|
track = db.Track.query.filter(db.Track.path == event.src_path).first()
|
||||||
if track:
|
if track:
|
||||||
folder = track.root_folder
|
folder = track.root_folder
|
||||||
Scanner(db.session).prune(folder)
|
Scanner(db.session).prune(folder)
|
||||||
db.session.commit()
|
db.session.commit()
|
||||||
|
db.session.remove()
|
||||||
else:
|
else:
|
||||||
self.__logger.debug("Deleted file %s not in the database", event.src_path)
|
self.__logger.debug("Deleted file %s not in the database", event.src_path)
|
||||||
|
|
||||||
def on_modified(self, event):
|
def on_modified(self, event):
|
||||||
self.__logger.debug("Scanning modified file %s", event.src_path)
|
self.__logger.debug("File modified: '%s'", event.src_path)
|
||||||
Scanner(db.session).scan_file(event.src_path)
|
self.__queue.put(event.src_path)
|
||||||
db.session.commit()
|
|
||||||
|
|
||||||
def on_moved(self, event):
|
def on_moved(self, event):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
class ScannerProcessingQueue(Thread):
|
||||||
|
def __init__(self, logger):
|
||||||
|
super(ScannerProcessingQueue, self).__init__()
|
||||||
|
|
||||||
|
self.__logger = logger
|
||||||
|
self.__lock = Lock()
|
||||||
|
self.__queue = {}
|
||||||
|
self.__running = True
|
||||||
|
|
||||||
|
def run(self):
|
||||||
|
while self.__running:
|
||||||
|
time.sleep(5)
|
||||||
|
|
||||||
|
with self.__lock:
|
||||||
|
if not self.__queue:
|
||||||
|
continue
|
||||||
|
|
||||||
|
self.__logger.debug("Instantiating scanner")
|
||||||
|
scanner = Scanner(db.session)
|
||||||
|
|
||||||
|
self.__lock.acquire()
|
||||||
|
while self.__queue:
|
||||||
|
path = sorted(self.__queue.iteritems(), key = lambda i: i[1])[0][0]
|
||||||
|
self.__lock.release()
|
||||||
|
|
||||||
|
self.__logger.info("Scanning: '%s'", path)
|
||||||
|
scanner.scan_file(path)
|
||||||
|
|
||||||
|
self.__lock.acquire()
|
||||||
|
del self.__queue[path]
|
||||||
|
self.__lock.release()
|
||||||
|
|
||||||
|
db.session.commit()
|
||||||
|
db.session.remove()
|
||||||
|
self.__logger.debug("Freeing scanner")
|
||||||
|
del scanner
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
self.__running = False
|
||||||
|
|
||||||
|
def put(self, path):
|
||||||
|
if not self.__running:
|
||||||
|
raise RuntimeError("Trying to put an item in a stopped queue")
|
||||||
|
|
||||||
|
with self.__lock:
|
||||||
|
self.__queue[path] = time.time()
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
if not config.check():
|
if not config.check():
|
||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
@ -87,12 +135,15 @@ if __name__ == "__main__":
|
|||||||
logger.info("No folder set. Exiting.")
|
logger.info("No folder set. Exiting.")
|
||||||
sys.exit(0)
|
sys.exit(0)
|
||||||
|
|
||||||
handler = SupysonicWatcherEventHandler(logger)
|
queue = ScannerProcessingQueue(logger)
|
||||||
|
handler = SupysonicWatcherEventHandler(queue, logger)
|
||||||
observer = Observer()
|
observer = Observer()
|
||||||
|
|
||||||
for folder in db.Folder.query.filter(db.Folder.root == True):
|
for folder in db.Folder.query.filter(db.Folder.root == True):
|
||||||
logger.info("Starting watcher for %s", folder.path)
|
logger.info("Starting watcher for %s", folder.path)
|
||||||
observer.schedule(handler, folder.path, recursive = True)
|
observer.schedule(handler, folder.path, recursive = True)
|
||||||
|
|
||||||
|
queue.start()
|
||||||
observer.start()
|
observer.start()
|
||||||
try:
|
try:
|
||||||
while True:
|
while True:
|
||||||
@ -101,4 +152,6 @@ if __name__ == "__main__":
|
|||||||
logger.info("Stopping watcher")
|
logger.info("Stopping watcher")
|
||||||
observer.stop()
|
observer.stop()
|
||||||
observer.join()
|
observer.join()
|
||||||
|
queue.stop()
|
||||||
|
queue.join()
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user