1
0
mirror of https://github.com/spl0k/supysonic.git synced 2025-01-13 03:36:17 +00:00
supysonic/tests/base/test_scanner.py

202 lines
6.5 KiB
Python
Raw Normal View History

2017-11-25 21:21:58 +00:00
#!/usr/bin/env python
2018-03-04 20:49:56 +00:00
# coding: utf-8
2017-11-25 21:21:58 +00:00
#
# This file is part of Supysonic.
# Supysonic is a Python implementation of the Subsonic server API.
#
# Copyright (C) 2017-2018 Alban 'spl0k' Féron
2017-11-25 21:21:58 +00:00
#
# Distributed under terms of the GNU AGPLv3 license.
import io
import mutagen
import os.path
import tempfile
import unittest
from contextlib import contextmanager
from pony.orm import db_session, commit
2017-11-25 21:21:58 +00:00
from supysonic import db
from supysonic.managers.folder import FolderManager
from supysonic.scanner import Scanner
class ScannerTestCase(unittest.TestCase):
def setUp(self):
db.init_database('sqlite:')
2017-11-25 21:21:58 +00:00
2017-12-17 22:25:34 +00:00
with db_session:
2018-10-09 01:19:22 +00:00
folder = FolderManager.add('folder', os.path.abspath('tests/assets/folder'))
2017-12-17 22:25:34 +00:00
self.assertIsNotNone(folder)
self.folderid = folder.id
2017-11-25 21:21:58 +00:00
2017-12-17 22:25:34 +00:00
self.scanner = Scanner()
self.scanner.scan(folder)
2017-11-25 21:21:58 +00:00
def tearDown(self):
self.scanner.finish()
2017-12-19 22:16:55 +00:00
db.release_database()
2017-11-25 21:21:58 +00:00
@contextmanager
def __temporary_track_copy(self):
2017-12-17 22:25:34 +00:00
track = db.Track.select().first()
2017-11-25 21:21:58 +00:00
with tempfile.NamedTemporaryFile(dir = os.path.dirname(track.path)) as tf:
with io.open(track.path, 'rb') as f:
tf.write(f.read())
yield tf
2017-12-17 22:25:34 +00:00
@db_session
2017-11-25 21:21:58 +00:00
def test_scan(self):
2017-12-17 22:25:34 +00:00
self.assertEqual(db.Track.select().count(), 1)
2017-11-25 21:21:58 +00:00
self.assertRaises(TypeError, self.scanner.scan, None)
self.assertRaises(TypeError, self.scanner.scan, 'string')
2017-12-17 22:25:34 +00:00
@db_session
2017-11-25 21:21:58 +00:00
def test_progress(self):
def progress(processed):
2017-11-25 21:21:58 +00:00
self.assertIsInstance(processed, int)
2017-12-17 22:25:34 +00:00
self.scanner.scan(db.Folder[self.folderid], progress)
2017-11-25 21:21:58 +00:00
2017-12-17 22:25:34 +00:00
@db_session
2017-11-25 21:21:58 +00:00
def test_rescan(self):
2017-12-17 22:25:34 +00:00
self.scanner.scan(db.Folder[self.folderid])
commit()
self.assertEqual(db.Track.select().count(), 1)
2017-11-25 21:21:58 +00:00
2017-12-17 22:25:34 +00:00
@db_session
2017-11-25 21:21:58 +00:00
def test_force_rescan(self):
2017-12-17 22:25:34 +00:00
self.scanner = Scanner(True)
self.scanner.scan(db.Folder[self.folderid])
commit()
self.assertEqual(db.Track.select().count(), 1)
2017-11-25 21:21:58 +00:00
2017-12-17 22:25:34 +00:00
@db_session
2017-11-25 21:21:58 +00:00
def test_scan_file(self):
2017-12-17 22:25:34 +00:00
track = db.Track.select().first()
2017-11-25 21:21:58 +00:00
self.assertRaises(TypeError, self.scanner.scan_file, None)
self.assertRaises(TypeError, self.scanner.scan_file, track)
self.scanner.scan_file('/some/inexistent/path')
2017-12-17 22:25:34 +00:00
commit()
self.assertEqual(db.Track.select().count(), 1)
2017-11-25 21:21:58 +00:00
2017-12-17 22:25:34 +00:00
@db_session
2017-11-25 21:21:58 +00:00
def test_remove_file(self):
2017-12-17 22:25:34 +00:00
track = db.Track.select().first()
2017-11-25 21:21:58 +00:00
self.assertRaises(TypeError, self.scanner.remove_file, None)
self.assertRaises(TypeError, self.scanner.remove_file, track)
self.scanner.remove_file('/some/inexistent/path')
2017-12-17 22:25:34 +00:00
commit()
self.assertEqual(db.Track.select().count(), 1)
2017-11-25 21:21:58 +00:00
self.scanner.remove_file(track.path)
self.scanner.finish()
2017-12-17 22:25:34 +00:00
commit()
self.assertEqual(db.Track.select().count(), 0)
self.assertEqual(db.Album.select().count(), 0)
self.assertEqual(db.Artist.select().count(), 0)
2017-11-25 21:21:58 +00:00
2017-12-17 22:25:34 +00:00
@db_session
2017-11-25 21:21:58 +00:00
def test_move_file(self):
2017-12-17 22:25:34 +00:00
track = db.Track.select().first()
2017-11-25 21:21:58 +00:00
self.assertRaises(TypeError, self.scanner.move_file, None, 'string')
self.assertRaises(TypeError, self.scanner.move_file, track, 'string')
self.assertRaises(TypeError, self.scanner.move_file, 'string', None)
self.assertRaises(TypeError, self.scanner.move_file, 'string', track)
self.scanner.move_file('/some/inexistent/path', track.path)
2017-12-17 22:25:34 +00:00
commit()
self.assertEqual(db.Track.select().count(), 1)
2017-11-25 21:21:58 +00:00
self.scanner.move_file(track.path, track.path)
2017-12-17 22:25:34 +00:00
commit()
self.assertEqual(db.Track.select().count(), 1)
2017-11-25 21:21:58 +00:00
self.assertRaises(Exception, self.scanner.move_file, track.path, '/some/inexistent/path')
with self.__temporary_track_copy() as tf:
2017-12-17 22:25:34 +00:00
self.scanner.scan(db.Folder[self.folderid])
commit()
self.assertEqual(db.Track.select().count(), 2)
2017-11-25 21:21:58 +00:00
self.scanner.move_file(tf.name, track.path)
2017-12-17 22:25:34 +00:00
commit()
self.assertEqual(db.Track.select().count(), 1)
2017-11-25 21:21:58 +00:00
2017-12-17 22:25:34 +00:00
track = db.Track.select().first()
2018-10-09 01:19:22 +00:00
new_path = track.path.replace('silence','silence_moved')
2017-11-25 21:21:58 +00:00
self.scanner.move_file(track.path, new_path)
2017-12-17 22:25:34 +00:00
commit()
self.assertEqual(db.Track.select().count(), 1)
2017-11-25 21:21:58 +00:00
self.assertEqual(track.path, new_path)
2017-12-17 22:25:34 +00:00
@db_session
2017-11-25 21:21:58 +00:00
def test_rescan_corrupt_file(self):
2017-12-17 22:25:34 +00:00
track = db.Track.select().first()
self.scanner = Scanner(True)
2017-11-25 21:21:58 +00:00
with self.__temporary_track_copy() as tf:
2017-12-17 22:25:34 +00:00
self.scanner.scan(db.Folder[self.folderid])
commit()
self.assertEqual(db.Track.select().count(), 2)
2017-11-25 21:21:58 +00:00
tf.seek(0, 0)
tf.write(b'\x00' * 4096)
2017-11-25 21:21:58 +00:00
tf.truncate()
2017-12-17 22:25:34 +00:00
self.scanner.scan(db.Folder[self.folderid])
commit()
self.assertEqual(db.Track.select().count(), 1)
2017-11-25 21:21:58 +00:00
2017-12-17 22:25:34 +00:00
@db_session
2017-11-25 21:21:58 +00:00
def test_rescan_removed_file(self):
2017-12-17 22:25:34 +00:00
track = db.Track.select().first()
2017-11-25 21:21:58 +00:00
with self.__temporary_track_copy() as tf:
2017-12-17 22:25:34 +00:00
self.scanner.scan(db.Folder[self.folderid])
commit()
self.assertEqual(db.Track.select().count(), 2)
2017-11-25 21:21:58 +00:00
2017-12-17 22:25:34 +00:00
self.scanner.scan(db.Folder[self.folderid])
commit()
self.assertEqual(db.Track.select().count(), 1)
2017-11-25 21:21:58 +00:00
2017-12-17 22:25:34 +00:00
@db_session
2017-11-25 21:21:58 +00:00
def test_scan_tag_change(self):
2017-12-17 22:25:34 +00:00
self.scanner = Scanner(True)
folder = db.Folder[self.folderid]
2017-11-25 21:21:58 +00:00
with self.__temporary_track_copy() as tf:
2017-12-17 22:25:34 +00:00
self.scanner.scan(folder)
commit()
copy = db.Track.get(path = tf.name)
2017-11-25 21:21:58 +00:00
self.assertEqual(copy.artist.name, 'Some artist')
self.assertEqual(copy.album.name, 'Awesome album')
tags = mutagen.File(copy.path, easy = True)
tags['artist'] = 'Renamed artist'
tags['album'] = 'Crappy album'
tags.save()
2017-12-17 22:25:34 +00:00
self.scanner.scan(folder)
2017-11-25 21:21:58 +00:00
self.scanner.finish()
self.assertEqual(copy.artist.name, 'Renamed artist')
self.assertEqual(copy.album.name, 'Crappy album')
2017-12-17 22:25:34 +00:00
self.assertIsNotNone(db.Artist.get(name = 'Some artist'))
self.assertIsNotNone(db.Album.get(name = 'Awesome album'))
2017-11-25 21:21:58 +00:00
def test_stats(self):
stats = self.scanner.stats()
self.assertEqual(stats.added.artists, 1)
self.assertEqual(stats.added.albums, 1)
self.assertEqual(stats.added.tracks, 1)
self.assertEqual(stats.deleted.artists, 0)
self.assertEqual(stats.deleted.albums, 0)
self.assertEqual(stats.deleted.tracks, 0)
2017-11-25 21:21:58 +00:00
if __name__ == '__main__':
unittest.main()