1
0
mirror of https://github.com/spl0k/supysonic.git synced 2024-12-22 17:06:17 +00:00

Untabify watcher

This commit is contained in:
spl0k 2017-10-08 12:40:49 +02:00
parent 68aaf145ad
commit 81609be7ce

View File

@ -35,228 +35,228 @@ OP_MOVE = 4
FLAG_CREATE = 8 FLAG_CREATE = 8
class SupysonicWatcherEventHandler(PatternMatchingEventHandler): class SupysonicWatcherEventHandler(PatternMatchingEventHandler):
def __init__(self, queue, logger): def __init__(self, queue, logger):
extensions = config.get('base', 'scanner_extensions') extensions = config.get('base', 'scanner_extensions')
patterns = map(lambda e: "*." + e.lower(), extensions.split()) if extensions else None patterns = map(lambda e: "*." + e.lower(), extensions.split()) if extensions else None
super(SupysonicWatcherEventHandler, self).__init__(patterns = patterns, ignore_directories = True) super(SupysonicWatcherEventHandler, self).__init__(patterns = patterns, ignore_directories = True)
self.__queue = queue self.__queue = queue
self.__logger = logger self.__logger = logger
def dispatch(self, event): def dispatch(self, event):
try: try:
super(SupysonicWatcherEventHandler, self).dispatch(event) super(SupysonicWatcherEventHandler, self).dispatch(event)
except Exception, e: except Exception, e:
self.__logger.critical(e) self.__logger.critical(e)
def on_created(self, event): def on_created(self, event):
self.__logger.debug("File created: '%s'", event.src_path) self.__logger.debug("File created: '%s'", event.src_path)
self.__queue.put(event.src_path, OP_SCAN | FLAG_CREATE) self.__queue.put(event.src_path, OP_SCAN | FLAG_CREATE)
def on_deleted(self, event): def on_deleted(self, event):
self.__logger.debug("File deleted: '%s'", event.src_path) self.__logger.debug("File deleted: '%s'", event.src_path)
self.__queue.put(event.src_path, OP_REMOVE) self.__queue.put(event.src_path, OP_REMOVE)
def on_modified(self, event): def on_modified(self, event):
self.__logger.debug("File modified: '%s'", event.src_path) self.__logger.debug("File modified: '%s'", event.src_path)
self.__queue.put(event.src_path, OP_SCAN) self.__queue.put(event.src_path, OP_SCAN)
def on_moved(self, event): def on_moved(self, event):
self.__logger.debug("File moved: '%s' -> '%s'", event.src_path, event.dest_path) self.__logger.debug("File moved: '%s' -> '%s'", event.src_path, event.dest_path)
self.__queue.put(event.dest_path, OP_MOVE, src_path = event.src_path) self.__queue.put(event.dest_path, OP_MOVE, src_path = event.src_path)
class Event(object): class Event(object):
def __init__(self, path, operation, **kwargs): def __init__(self, path, operation, **kwargs):
if operation & (OP_SCAN | OP_REMOVE) == (OP_SCAN | OP_REMOVE): if operation & (OP_SCAN | OP_REMOVE) == (OP_SCAN | OP_REMOVE):
raise Exception("Flags SCAN and REMOVE both set") raise Exception("Flags SCAN and REMOVE both set")
self.__path = path self.__path = path
self.__time = time.time() self.__time = time.time()
self.__op = operation self.__op = operation
self.__src = kwargs.get("src_path") self.__src = kwargs.get("src_path")
def set(self, operation, **kwargs): def set(self, operation, **kwargs):
if operation & (OP_SCAN | OP_REMOVE) == (OP_SCAN | OP_REMOVE): if operation & (OP_SCAN | OP_REMOVE) == (OP_SCAN | OP_REMOVE):
raise Exception("Flags SCAN and REMOVE both set") raise Exception("Flags SCAN and REMOVE both set")
self.__time = time.time() self.__time = time.time()
if operation & OP_SCAN: if operation & OP_SCAN:
self.__op &= ~OP_REMOVE self.__op &= ~OP_REMOVE
if operation & OP_REMOVE: if operation & OP_REMOVE:
self.__op &= ~OP_SCAN self.__op &= ~OP_SCAN
if operation & FLAG_CREATE: if operation & FLAG_CREATE:
self.__op &= ~OP_MOVE self.__op &= ~OP_MOVE
self.__op |= operation self.__op |= operation
src_path = kwargs.get("src_path") src_path = kwargs.get("src_path")
if src_path: if src_path:
self.__src = src_path self.__src = src_path
@property @property
def path(self): def path(self):
return self.__path return self.__path
@property @property
def time(self): def time(self):
return self.__time return self.__time
@property @property
def operation(self): def operation(self):
return self.__op return self.__op
@property @property
def src_path(self): def src_path(self):
return self.__src return self.__src
class ScannerProcessingQueue(Thread): class ScannerProcessingQueue(Thread):
def __init__(self, logger): def __init__(self, logger):
super(ScannerProcessingQueue, self).__init__() super(ScannerProcessingQueue, self).__init__()
self.__logger = logger self.__logger = logger
self.__cond = Condition() self.__cond = Condition()
self.__timer = None self.__timer = None
self.__queue = {} self.__queue = {}
self.__running = True self.__running = True
def run(self): def run(self):
try: try:
self.__run() self.__run()
except Exception, e: except Exception, e:
self.__logger.critical(e) self.__logger.critical(e)
def __run(self): def __run(self):
while self.__running: while self.__running:
time.sleep(0.1) time.sleep(0.1)
with self.__cond: with self.__cond:
self.__cond.wait() self.__cond.wait()
if not self.__queue: if not self.__queue:
continue continue
self.__logger.debug("Instantiating scanner") self.__logger.debug("Instantiating scanner")
store = db.get_store(config.get('base', 'database_uri')) store = db.get_store(config.get('base', 'database_uri'))
scanner = Scanner(store) scanner = Scanner(store)
item = self.__next_item() item = self.__next_item()
while item: while item:
if item.operation & OP_MOVE: if item.operation & OP_MOVE:
self.__logger.info("Moving: '%s' -> '%s'", item.src_path, item.path) self.__logger.info("Moving: '%s' -> '%s'", item.src_path, item.path)
scanner.move_file(item.src_path, item.path) scanner.move_file(item.src_path, item.path)
if item.operation & OP_SCAN: if item.operation & OP_SCAN:
self.__logger.info("Scanning: '%s'", item.path) self.__logger.info("Scanning: '%s'", item.path)
scanner.scan_file(item.path) scanner.scan_file(item.path)
if item.operation & OP_REMOVE: if item.operation & OP_REMOVE:
self.__logger.info("Removing: '%s'", item.path) self.__logger.info("Removing: '%s'", item.path)
scanner.remove_file(item.path) scanner.remove_file(item.path)
item = self.__next_item() item = self.__next_item()
scanner.finish() scanner.finish()
store.commit() store.commit()
store.close() store.close()
self.__logger.debug("Freeing scanner") self.__logger.debug("Freeing scanner")
del scanner del scanner
def stop(self): def stop(self):
self.__running = False self.__running = False
with self.__cond: with self.__cond:
self.__cond.notify() self.__cond.notify()
def put(self, path, operation, **kwargs): def put(self, path, operation, **kwargs):
if not self.__running: if not self.__running:
raise RuntimeError("Trying to put an item in a stopped queue") raise RuntimeError("Trying to put an item in a stopped queue")
with self.__cond: with self.__cond:
if path in self.__queue: if path in self.__queue:
event = self.__queue[path] event = self.__queue[path]
event.set(operation, **kwargs) event.set(operation, **kwargs)
else: else:
event = Event(path, operation, **kwargs) event = Event(path, operation, **kwargs)
self.__queue[path] = event self.__queue[path] = event
if operation & OP_MOVE and kwargs["src_path"] in self.__queue: if operation & OP_MOVE and kwargs["src_path"] in self.__queue:
previous = self.__queue[kwargs["src_path"]] previous = self.__queue[kwargs["src_path"]]
event.set(previous.operation, src_path = previous.src_path) event.set(previous.operation, src_path = previous.src_path)
del self.__queue[kwargs["src_path"]] del self.__queue[kwargs["src_path"]]
if self.__timer: if self.__timer:
self.__timer.cancel() self.__timer.cancel()
self.__timer = Timer(5, self.__wakeup) self.__timer = Timer(5, self.__wakeup)
self.__timer.start() self.__timer.start()
def __wakeup(self): def __wakeup(self):
with self.__cond: with self.__cond:
self.__cond.notify() self.__cond.notify()
self.__timer = None self.__timer = None
def __next_item(self): def __next_item(self):
with self.__cond: with self.__cond:
if not self.__queue: if not self.__queue:
return None return None
next = min(self.__queue.iteritems(), key = lambda i: i[1].time) next = min(self.__queue.iteritems(), key = lambda i: i[1].time)
if not self.__running or next[1].time + 5 <= time.time(): if not self.__running or next[1].time + 5 <= time.time():
del self.__queue[next[0]] del self.__queue[next[0]]
return next[1] return next[1]
return None return None
class SupysonicWatcher(object): class SupysonicWatcher(object):
def run(self): def run(self):
if not config.check(): if not config.check():
return return
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
if config.get('daemon', 'log_file'): if config.get('daemon', 'log_file'):
log_handler = TimedRotatingFileHandler(config.get('daemon', 'log_file'), when = 'midnight') log_handler = TimedRotatingFileHandler(config.get('daemon', 'log_file'), when = 'midnight')
else: else:
log_handler = logging.NullHandler() log_handler = logging.NullHandler()
log_handler.setFormatter(logging.Formatter("%(asctime)s [%(levelname)s] %(message)s")) log_handler.setFormatter(logging.Formatter("%(asctime)s [%(levelname)s] %(message)s"))
logger.addHandler(log_handler) logger.addHandler(log_handler)
if config.get('daemon', 'log_level'): if config.get('daemon', 'log_level'):
mapping = { mapping = {
'DEBUG': logging.DEBUG, 'DEBUG': logging.DEBUG,
'INFO': logging.INFO, 'INFO': logging.INFO,
'WARNING': logging.WARNING, 'WARNING': logging.WARNING,
'ERROR': logging.ERROR, 'ERROR': logging.ERROR,
'CRTICAL': logging.CRITICAL 'CRTICAL': logging.CRITICAL
} }
logger.setLevel(mapping.get(config.get('daemon', 'log_level').upper(), logging.NOTSET)) logger.setLevel(mapping.get(config.get('daemon', 'log_level').upper(), logging.NOTSET))
store = db.get_store(config.get('base', 'database_uri')) store = db.get_store(config.get('base', 'database_uri'))
folders = store.find(db.Folder, db.Folder.root == True) folders = store.find(db.Folder, db.Folder.root == True)
if not folders.count(): if not folders.count():
logger.info("No folder set. Exiting.") logger.info("No folder set. Exiting.")
store.close() store.close()
return return
queue = ScannerProcessingQueue(logger) queue = ScannerProcessingQueue(logger)
handler = SupysonicWatcherEventHandler(queue, logger) handler = SupysonicWatcherEventHandler(queue, logger)
observer = Observer() observer = Observer()
for folder in folders: for folder in folders:
logger.info("Starting watcher for %s", folder.path) logger.info("Starting watcher for %s", folder.path)
observer.schedule(handler, folder.path, recursive = True) observer.schedule(handler, folder.path, recursive = True)
store.close() store.close()
signal(SIGTERM, self.__terminate) signal(SIGTERM, self.__terminate)
self.__running = True self.__running = True
queue.start() queue.start()
observer.start() observer.start()
while self.__running: while self.__running:
time.sleep(2) time.sleep(2)
logger.info("Stopping watcher") logger.info("Stopping watcher")
observer.stop() observer.stop()
observer.join() observer.join()
queue.stop() queue.stop()
queue.join() queue.join()
def stop(self): def stop(self):
self.__running = False self.__running = False
def __terminate(self, signum, frame): def __terminate(self, signum, frame):
self.stop() self.stop()