diff --git a/supysonic/watcher.py b/supysonic/watcher.py index 632023c..d394cd6 100644 --- a/supysonic/watcher.py +++ b/supysonic/watcher.py @@ -35,228 +35,228 @@ OP_MOVE = 4 FLAG_CREATE = 8 class SupysonicWatcherEventHandler(PatternMatchingEventHandler): - def __init__(self, queue, logger): - extensions = config.get('base', 'scanner_extensions') - patterns = map(lambda e: "*." + e.lower(), extensions.split()) if extensions else None - super(SupysonicWatcherEventHandler, self).__init__(patterns = patterns, ignore_directories = True) + def __init__(self, queue, logger): + extensions = config.get('base', 'scanner_extensions') + patterns = map(lambda e: "*." + e.lower(), extensions.split()) if extensions else None + super(SupysonicWatcherEventHandler, self).__init__(patterns = patterns, ignore_directories = True) - self.__queue = queue - self.__logger = logger + self.__queue = queue + self.__logger = logger - def dispatch(self, event): - try: - super(SupysonicWatcherEventHandler, self).dispatch(event) - except Exception, e: - self.__logger.critical(e) + def dispatch(self, event): + try: + super(SupysonicWatcherEventHandler, self).dispatch(event) + except Exception, e: + self.__logger.critical(e) - def on_created(self, event): - self.__logger.debug("File created: '%s'", event.src_path) - self.__queue.put(event.src_path, OP_SCAN | FLAG_CREATE) + def on_created(self, event): + self.__logger.debug("File created: '%s'", event.src_path) + self.__queue.put(event.src_path, OP_SCAN | FLAG_CREATE) - def on_deleted(self, event): - self.__logger.debug("File deleted: '%s'", event.src_path) - self.__queue.put(event.src_path, OP_REMOVE) + def on_deleted(self, event): + self.__logger.debug("File deleted: '%s'", event.src_path) + self.__queue.put(event.src_path, OP_REMOVE) - def on_modified(self, event): - self.__logger.debug("File modified: '%s'", event.src_path) - self.__queue.put(event.src_path, OP_SCAN) + def on_modified(self, event): + self.__logger.debug("File modified: '%s'", event.src_path) + self.__queue.put(event.src_path, OP_SCAN) - def on_moved(self, event): - self.__logger.debug("File moved: '%s' -> '%s'", event.src_path, event.dest_path) - self.__queue.put(event.dest_path, OP_MOVE, src_path = event.src_path) + def on_moved(self, event): + self.__logger.debug("File moved: '%s' -> '%s'", event.src_path, event.dest_path) + self.__queue.put(event.dest_path, OP_MOVE, src_path = event.src_path) class Event(object): - def __init__(self, path, operation, **kwargs): - if operation & (OP_SCAN | OP_REMOVE) == (OP_SCAN | OP_REMOVE): - raise Exception("Flags SCAN and REMOVE both set") + def __init__(self, path, operation, **kwargs): + if operation & (OP_SCAN | OP_REMOVE) == (OP_SCAN | OP_REMOVE): + raise Exception("Flags SCAN and REMOVE both set") - self.__path = path - self.__time = time.time() - self.__op = operation - self.__src = kwargs.get("src_path") + self.__path = path + self.__time = time.time() + self.__op = operation + self.__src = kwargs.get("src_path") - def set(self, operation, **kwargs): - if operation & (OP_SCAN | OP_REMOVE) == (OP_SCAN | OP_REMOVE): - raise Exception("Flags SCAN and REMOVE both set") + def set(self, operation, **kwargs): + if operation & (OP_SCAN | OP_REMOVE) == (OP_SCAN | OP_REMOVE): + raise Exception("Flags SCAN and REMOVE both set") - self.__time = time.time() - if operation & OP_SCAN: - self.__op &= ~OP_REMOVE - if operation & OP_REMOVE: - self.__op &= ~OP_SCAN - if operation & FLAG_CREATE: - self.__op &= ~OP_MOVE - self.__op |= operation + self.__time = time.time() + if operation & OP_SCAN: + self.__op &= ~OP_REMOVE + if operation & OP_REMOVE: + self.__op &= ~OP_SCAN + if operation & FLAG_CREATE: + self.__op &= ~OP_MOVE + self.__op |= operation - src_path = kwargs.get("src_path") - if src_path: - self.__src = src_path + src_path = kwargs.get("src_path") + if src_path: + self.__src = src_path - @property - def path(self): - return self.__path + @property + def path(self): + return self.__path - @property - def time(self): - return self.__time + @property + def time(self): + return self.__time - @property - def operation(self): - return self.__op + @property + def operation(self): + return self.__op - @property - def src_path(self): - return self.__src + @property + def src_path(self): + return self.__src class ScannerProcessingQueue(Thread): - def __init__(self, logger): - super(ScannerProcessingQueue, self).__init__() + def __init__(self, logger): + super(ScannerProcessingQueue, self).__init__() - self.__logger = logger - self.__cond = Condition() - self.__timer = None - self.__queue = {} - self.__running = True + self.__logger = logger + self.__cond = Condition() + self.__timer = None + self.__queue = {} + self.__running = True - def run(self): - try: - self.__run() - except Exception, e: - self.__logger.critical(e) + def run(self): + try: + self.__run() + except Exception, e: + self.__logger.critical(e) - def __run(self): - while self.__running: - time.sleep(0.1) + def __run(self): + while self.__running: + time.sleep(0.1) - with self.__cond: - self.__cond.wait() + with self.__cond: + self.__cond.wait() - if not self.__queue: - continue + if not self.__queue: + continue - self.__logger.debug("Instantiating scanner") - store = db.get_store(config.get('base', 'database_uri')) - scanner = Scanner(store) + self.__logger.debug("Instantiating scanner") + store = db.get_store(config.get('base', 'database_uri')) + scanner = Scanner(store) - item = self.__next_item() - while item: - if item.operation & OP_MOVE: - self.__logger.info("Moving: '%s' -> '%s'", item.src_path, item.path) - scanner.move_file(item.src_path, item.path) - if item.operation & OP_SCAN: - self.__logger.info("Scanning: '%s'", item.path) - scanner.scan_file(item.path) - if item.operation & OP_REMOVE: - self.__logger.info("Removing: '%s'", item.path) - scanner.remove_file(item.path) - item = self.__next_item() + item = self.__next_item() + while item: + if item.operation & OP_MOVE: + self.__logger.info("Moving: '%s' -> '%s'", item.src_path, item.path) + scanner.move_file(item.src_path, item.path) + if item.operation & OP_SCAN: + self.__logger.info("Scanning: '%s'", item.path) + scanner.scan_file(item.path) + if item.operation & OP_REMOVE: + self.__logger.info("Removing: '%s'", item.path) + scanner.remove_file(item.path) + item = self.__next_item() - scanner.finish() - store.commit() - store.close() - self.__logger.debug("Freeing scanner") - del scanner + scanner.finish() + store.commit() + store.close() + self.__logger.debug("Freeing scanner") + del scanner - def stop(self): - self.__running = False - with self.__cond: - self.__cond.notify() + def stop(self): + self.__running = False + with self.__cond: + self.__cond.notify() - def put(self, path, operation, **kwargs): - if not self.__running: - raise RuntimeError("Trying to put an item in a stopped queue") + def put(self, path, operation, **kwargs): + if not self.__running: + raise RuntimeError("Trying to put an item in a stopped queue") - with self.__cond: - if path in self.__queue: - event = self.__queue[path] - event.set(operation, **kwargs) - else: - event = Event(path, operation, **kwargs) - self.__queue[path] = event + with self.__cond: + if path in self.__queue: + event = self.__queue[path] + event.set(operation, **kwargs) + else: + event = Event(path, operation, **kwargs) + self.__queue[path] = event - if operation & OP_MOVE and kwargs["src_path"] in self.__queue: - previous = self.__queue[kwargs["src_path"]] - event.set(previous.operation, src_path = previous.src_path) - del self.__queue[kwargs["src_path"]] + if operation & OP_MOVE and kwargs["src_path"] in self.__queue: + previous = self.__queue[kwargs["src_path"]] + event.set(previous.operation, src_path = previous.src_path) + del self.__queue[kwargs["src_path"]] - if self.__timer: - self.__timer.cancel() - self.__timer = Timer(5, self.__wakeup) - self.__timer.start() + if self.__timer: + self.__timer.cancel() + self.__timer = Timer(5, self.__wakeup) + self.__timer.start() - def __wakeup(self): - with self.__cond: - self.__cond.notify() - self.__timer = None + def __wakeup(self): + with self.__cond: + self.__cond.notify() + self.__timer = None - def __next_item(self): - with self.__cond: - if not self.__queue: - return None + def __next_item(self): + with self.__cond: + if not self.__queue: + return None - next = min(self.__queue.iteritems(), key = lambda i: i[1].time) - if not self.__running or next[1].time + 5 <= time.time(): - del self.__queue[next[0]] - return next[1] + next = min(self.__queue.iteritems(), key = lambda i: i[1].time) + if not self.__running or next[1].time + 5 <= time.time(): + del self.__queue[next[0]] + return next[1] - return None + return None class SupysonicWatcher(object): - def run(self): - if not config.check(): - return + def run(self): + if not config.check(): + return - logger = logging.getLogger(__name__) - if config.get('daemon', 'log_file'): - log_handler = TimedRotatingFileHandler(config.get('daemon', 'log_file'), when = 'midnight') - else: - log_handler = logging.NullHandler() - log_handler.setFormatter(logging.Formatter("%(asctime)s [%(levelname)s] %(message)s")) - logger.addHandler(log_handler) - if config.get('daemon', 'log_level'): - mapping = { - 'DEBUG': logging.DEBUG, - 'INFO': logging.INFO, - 'WARNING': logging.WARNING, - 'ERROR': logging.ERROR, - 'CRTICAL': logging.CRITICAL - } - logger.setLevel(mapping.get(config.get('daemon', 'log_level').upper(), logging.NOTSET)) + logger = logging.getLogger(__name__) + if config.get('daemon', 'log_file'): + log_handler = TimedRotatingFileHandler(config.get('daemon', 'log_file'), when = 'midnight') + else: + log_handler = logging.NullHandler() + log_handler.setFormatter(logging.Formatter("%(asctime)s [%(levelname)s] %(message)s")) + logger.addHandler(log_handler) + if config.get('daemon', 'log_level'): + mapping = { + 'DEBUG': logging.DEBUG, + 'INFO': logging.INFO, + 'WARNING': logging.WARNING, + 'ERROR': logging.ERROR, + 'CRTICAL': logging.CRITICAL + } + logger.setLevel(mapping.get(config.get('daemon', 'log_level').upper(), logging.NOTSET)) - store = db.get_store(config.get('base', 'database_uri')) - folders = store.find(db.Folder, db.Folder.root == True) + store = db.get_store(config.get('base', 'database_uri')) + folders = store.find(db.Folder, db.Folder.root == True) - if not folders.count(): - logger.info("No folder set. Exiting.") - store.close() - return + if not folders.count(): + logger.info("No folder set. Exiting.") + store.close() + return - queue = ScannerProcessingQueue(logger) - handler = SupysonicWatcherEventHandler(queue, logger) - observer = Observer() + queue = ScannerProcessingQueue(logger) + handler = SupysonicWatcherEventHandler(queue, logger) + observer = Observer() - for folder in folders: - logger.info("Starting watcher for %s", folder.path) - observer.schedule(handler, folder.path, recursive = True) + for folder in folders: + logger.info("Starting watcher for %s", folder.path) + observer.schedule(handler, folder.path, recursive = True) - store.close() - signal(SIGTERM, self.__terminate) + store.close() + signal(SIGTERM, self.__terminate) - self.__running = True - queue.start() - observer.start() - while self.__running: - time.sleep(2) + self.__running = True + queue.start() + observer.start() + while self.__running: + time.sleep(2) - logger.info("Stopping watcher") - observer.stop() - observer.join() - queue.stop() - queue.join() + logger.info("Stopping watcher") + observer.stop() + observer.join() + queue.stop() + queue.join() - def stop(self): - self.__running = False + def stop(self): + self.__running = False - def __terminate(self, signum, frame): - self.stop() + def __terminate(self, signum, frame): + self.stop()