1
0
mirror of https://github.com/spl0k/supysonic.git synced 2024-12-22 08:56:17 +00:00

Some fixes for Windows support (especially for tests)

The main motive here isn't full Windows support per, but being
able to run tests on Windows, as this is my main platform.
Booting a VM just to run tests is cumbersome.
This commit is contained in:
Alban Féron 2020-11-08 15:39:09 +01:00
parent 7d1825151e
commit 5c46c96b53
No known key found for this signature in database
GPG Key ID: 8CE0313646D16165
14 changed files with 154 additions and 121 deletions

View File

@ -143,25 +143,26 @@ class Cache(object):
>>> with cache.set_fileobj(key) as fp:
... json.dump(some_data, fp)
"""
f = tempfile.NamedTemporaryFile(
dir=self._cache_dir, suffix=".part", delete=False
)
try:
with tempfile.NamedTemporaryFile(
dir=self._cache_dir, suffix=".part", delete=True
) as f:
yield f
yield f
# seek to end and get position to get filesize
f.seek(0, 2)
size = f.tell()
# seek to end and get position to get filesize
f.seek(0, 2)
size = f.tell()
f.close()
with self._lock:
if self._auto_prune:
self._make_space(size, key=key)
os.replace(f.name, self._filepath(key))
self._record_file(key, size)
except OSError as e:
# Ignore error from trying to delete the renamed temp file
if e.errno != errno.ENOENT:
raise
with self._lock:
if self._auto_prune:
self._make_space(size, key=key)
os.replace(f.name, self._filepath(key))
self._record_file(key, size)
except:
f.close()
os.remove(f.name)
raise
def set(self, key, value):
"""Set a literal value into the cache and return its path"""

View File

@ -3,12 +3,13 @@
# This file is part of Supysonic.
# Supysonic is a Python implementation of the Subsonic server API.
#
# Copyright (C) 2013-2019 Alban 'spl0k' Féron
# Copyright (C) 2013-2020 Alban 'spl0k' Féron
# 2017 Óscar García Amor
#
# Distributed under terms of the GNU AGPLv3 license.
import os
import sys
import tempfile
from configparser import RawConfigParser
@ -39,7 +40,9 @@ class DefaultConfig(object):
"mount_api": True,
}
DAEMON = {
"socket": os.path.join(tempdir, "supysonic.sock"),
"socket": r"\\.\pipe\supysonic"
if sys.platform == "win32"
else os.path.join(tempdir, "supysonic.sock"),
"run_watcher": True,
"wait_delay": 5,
"jukebox_command": None,

View File

@ -35,22 +35,25 @@ class RadioStationTestCase(ApiTestBase):
self._make_request(
"createInternetRadioStation",
{"u": "bob", "p": "B0b", "username": "alice"},
error=50
error=50,
)
# check params
self._make_request("createInternetRadioStation", error=10)
self._make_request("createInternetRadioStation", {"streamUrl": "missingName"}, error=10)
self._make_request("createInternetRadioStation", {"name": "missing stream"}, error=10)
self._make_request(
"createInternetRadioStation", {"streamUrl": "missingName"}, error=10
)
self._make_request(
"createInternetRadioStation", {"name": "missing stream"}, error=10
)
# create w/ required fields
stream_url = "http://example.com/radio/create"
name = "radio station"
self._make_request("createInternetRadioStation", {
"streamUrl": stream_url,
"name": name,
})
self._make_request(
"createInternetRadioStation", {"streamUrl": stream_url, "name": name}
)
# the correct value is 2 because _make_request uses GET then POST
self.assertRadioStationCountEqual(2)
@ -66,11 +69,10 @@ class RadioStationTestCase(ApiTestBase):
name = "radio station1"
homepage_url = "http://example.com/home"
self._make_request("createInternetRadioStation", {
"streamUrl": stream_url,
"name": name,
"homepageUrl": homepage_url,
})
self._make_request(
"createInternetRadioStation",
{"streamUrl": stream_url, "name": name, "homepageUrl": homepage_url},
)
# the correct value is 2 because _make_request uses GET then POST
self.assertRadioStationCountEqual(2)
@ -83,7 +85,7 @@ class RadioStationTestCase(ApiTestBase):
self._make_request(
"updateInternetRadioStation",
{"u": "bob", "p": "B0b", "username": "alice"},
error=50
error=50,
)
# test data
@ -107,67 +109,86 @@ class RadioStationTestCase(ApiTestBase):
)
# check params
self._make_request("updateInternetRadioStation", {
"id": station.id, "homepageUrl": "missing required params",
}, error=10)
self._make_request("updateInternetRadioStation", {
"id": station.id, "name": "missing streamUrl",
}, error=10)
self._make_request("updateInternetRadioStation", {
"id": station.id, "streamUrl": "missing name",
}, error=10)
self._make_request(
"updateInternetRadioStation",
{"id": station.id, "homepageUrl": "missing required params"},
error=10,
)
self._make_request(
"updateInternetRadioStation",
{"id": station.id, "name": "missing streamUrl"},
error=10,
)
self._make_request(
"updateInternetRadioStation",
{"id": station.id, "streamUrl": "missing name"},
error=10,
)
# update the record w/ required fields
self._make_request("updateInternetRadioStation", {
"id": station.id,
"streamUrl": update["stream_url"],
"name": update["name"],
})
self._make_request(
"updateInternetRadioStation",
{
"id": station.id,
"streamUrl": update["stream_url"],
"name": update["name"],
},
)
with db_session:
rs_update = RadioStation[station.id]
self.assertRadioStationEquals(rs_update, update["stream_url"], update["name"], test["homepage_url"])
self.assertRadioStationEquals(
rs_update, update["stream_url"], update["name"], test["homepage_url"]
)
# update the record w/ all fields
self._make_request("updateInternetRadioStation", {
"id": station.id,
"streamUrl": update["stream_url"],
"name": update["name"],
"homepageUrl": update["homepage_url"],
})
self._make_request(
"updateInternetRadioStation",
{
"id": station.id,
"streamUrl": update["stream_url"],
"name": update["name"],
"homepageUrl": update["homepage_url"],
},
)
with db_session:
rs_update = RadioStation[station.id]
self.assertRadioStationEquals(rs_update, update["stream_url"], update["name"], update["homepage_url"])
self.assertRadioStationEquals(
rs_update, update["stream_url"], update["name"], update["homepage_url"]
)
def test_delete_radio_station(self):
# test for non-admin access
self._make_request(
"deleteInternetRadioStation",
{"u": "bob", "p": "B0b", "username": "alice"},
error=50
error=50,
)
# check params
self._make_request("deleteInternetRadioStation", error=10)
self._make_request("deleteInternetRadioStation", {"id": 1}, error=0)
self._make_request("deleteInternetRadioStation", {"id": str(uuid.uuid4())}, error=70)
self._make_request(
"deleteInternetRadioStation", {"id": str(uuid.uuid4())}, error=70
)
# delete
with db_session:
station = RadioStation(
stream_url="http://example.com/radio/delete",
name="Radio Delete",
homepage_url="http://example.com/update"
homepage_url="http://example.com/update",
)
self._make_request("deleteInternetRadioStation", {"id": station.id}, skip_post=True)
self._make_request(
"deleteInternetRadioStation", {"id": station.id}, skip_post=True
)
self.assertRadioStationCountEqual(0)
def test_get_radio_stations(self):
test_range = 3
with db_session:
@ -180,7 +201,9 @@ class RadioStationTestCase(ApiTestBase):
# verify happy path is clean
self.assertRadioStationCountEqual(test_range)
rv, child = self._make_request("getInternetRadioStations", tag="internetRadioStations")
rv, child = self._make_request(
"getInternetRadioStations", tag="internetRadioStations"
)
self.assertEqual(len(child), test_range)
# This order is guaranteed to work because the api returns in order by name.
# Test data is sequential by design.
@ -190,7 +213,6 @@ class RadioStationTestCase(ApiTestBase):
self.assertTrue(station.get("name").endswith("Radio {}".format(x)))
self.assertTrue(station.get("homePageUrl").endswith("update-{}".format(x)))
# test for non-admin access
rv, child = self._make_request(
"getInternetRadioStations",
@ -199,4 +221,3 @@ class RadioStationTestCase(ApiTestBase):
)
self.assertEqual(len(child), test_range)

View File

@ -1,14 +1,14 @@
#!/usr/bin/env python
# coding: utf-8
#
# This file is part of Supysonic.
# Supysonic is a Python implementation of the Subsonic server API.
#
# Copyright (C) 2017-2018 Alban 'spl0k' Féron
# Copyright (C) 2017-2020 Alban 'spl0k' Féron
#
# Distributed under terms of the GNU AGPLv3 license.
import unittest
import sys
from pony.orm import db_session
@ -46,11 +46,19 @@ class TranscodingTestCase(ApiTestBase):
def test_no_transcoding_available(self):
self._make_request("stream", {"id": self.trackid, "format": "wat"}, error=0)
@unittest.skipIf(
sys.platform == "win32",
"Can't test transcoding on Windows because of a lack of simple commandline tools",
)
def test_direct_transcode(self):
rv = self._stream(maxBitRate=96, estimateContentLength="true")
self.assertIn("tests/assets/folder/silence.mp3", rv.data)
self.assertTrue(rv.data.endswith("96"))
@unittest.skipIf(
sys.platform == "win32",
"Can't test transcoding on Windows because of a lack of simple commandline tools",
)
def test_decode_encode(self):
rv = self._stream(format="cat")
self.assertEqual(rv.data, "Pushing out some mp3 data...")

View File

@ -1 +0,0 @@
../folder/silence.mp3

Binary file not shown.

View File

@ -1,20 +1,17 @@
#!/usr/bin/env python
# coding: utf-8
#
# This file is part of Supysonic.
# Supysonic is a Python implementation of the Subsonic server API.
#
# Copyright (C) 2017-2018 Alban 'spl0k' Féron
# Copyright (C) 2017-2020 Alban 'spl0k' Féron
#
# Distributed under terms of the GNU AGPLv3 license.
import io
import os
import shutil
import tempfile
import shlex
import unittest
from contextlib import contextmanager
from io import StringIO
from pony.orm import db_session
@ -29,8 +26,8 @@ class CLITestCase(unittest.TestCase):
def setUp(self):
conf = TestConfig(False, False)
self.__dbfile = tempfile.mkstemp()[1]
conf.BASE["database_uri"] = "sqlite:///" + self.__dbfile
self.__db = tempfile.mkstemp()
conf.BASE["database_uri"] = "sqlite:///" + self.__db[1]
init_database(conf.BASE["database_uri"])
self.__stdout = StringIO()
@ -41,19 +38,15 @@ class CLITestCase(unittest.TestCase):
self.__stdout.close()
self.__stderr.close()
release_database()
os.unlink(self.__dbfile)
os.close(self.__db[0])
os.remove(self.__db[1])
@contextmanager
def _tempdir(self):
d = tempfile.mkdtemp()
try:
yield d
finally:
shutil.rmtree(d)
def __add_folder(self, name, path):
self.__cli.onecmd("folder add {0} {1}".format(name, shlex.quote(path)))
def test_folder_add(self):
with self._tempdir() as d:
self.__cli.onecmd("folder add tmpfolder " + d)
with tempfile.TemporaryDirectory() as d:
self.__add_folder("tmpfolder", d)
with db_session:
f = Folder.select().first()
@ -61,19 +54,19 @@ class CLITestCase(unittest.TestCase):
self.assertEqual(f.path, d)
def test_folder_add_errors(self):
with self._tempdir() as d:
self.__cli.onecmd("folder add f1 " + d)
self.__cli.onecmd("folder add f2 " + d)
with self._tempdir() as d:
self.__cli.onecmd("folder add f1 " + d)
with tempfile.TemporaryDirectory() as d:
self.__add_folder("f1", d)
self.__add_folder("f2", d)
with tempfile.TemporaryDirectory() as d:
self.__add_folder("f1", d)
self.__cli.onecmd("folder add f3 /invalid/path")
with db_session:
self.assertEqual(Folder.select().count(), 1)
def test_folder_delete(self):
with self._tempdir() as d:
self.__cli.onecmd("folder add tmpfolder " + d)
with tempfile.TemporaryDirectory() as d:
self.__add_folder("tmpfolder", d)
self.__cli.onecmd("folder delete randomfolder")
self.__cli.onecmd("folder delete tmpfolder")
@ -81,15 +74,15 @@ class CLITestCase(unittest.TestCase):
self.assertEqual(Folder.select().count(), 0)
def test_folder_list(self):
with self._tempdir() as d:
self.__cli.onecmd("folder add tmpfolder " + d)
with tempfile.TemporaryDirectory() as d:
self.__add_folder("tmpfolder", d)
self.__cli.onecmd("folder list")
self.assertIn("tmpfolder", self.__stdout.getvalue())
self.assertIn(d, self.__stdout.getvalue())
def test_folder_scan(self):
with self._tempdir() as d:
self.__cli.onecmd("folder add tmpfolder " + d)
with tempfile.TemporaryDirectory() as d:
self.__add_folder("tmpfolder", d)
with tempfile.NamedTemporaryFile(dir=d):
self.__cli.onecmd("folder scan")
self.__cli.onecmd("folder scan tmpfolder nonexistent")

View File

@ -54,7 +54,7 @@ class DbTestCase(unittest.TestCase):
return (
db.Folder.get(name="Root folder"),
db.Folder.get(name="Child folder"),
db.Folder.get(name="Child Folder (No Art)")
db.Folder.get(name="Child Folder (No Art)"),
)
def create_some_tracks(self, artist=None, album=None):

View File

@ -1,15 +1,15 @@
#!/usr/bin/env python
# coding: utf-8
#
# This file is part of Supysonic.
# Supysonic is a Python implementation of the Subsonic server API.
#
# Copyright (C) 2017-2018 Alban 'spl0k' Féron
# Copyright (C) 2017-2020 Alban 'spl0k' Féron
#
# Distributed under terms of the GNU AGPLv3 license.
import io
import mutagen
import os
import os.path
import tempfile
import unittest
@ -39,10 +39,15 @@ class ScannerTestCase(unittest.TestCase):
@contextmanager
def __temporary_track_copy(self):
track = db.Track.select().first()
with tempfile.NamedTemporaryFile(dir=os.path.dirname(track.path)) as tf:
with tempfile.NamedTemporaryFile(
dir=os.path.dirname(track.path), delete=False
) as tf:
with io.open(track.path, "rb") as f:
tf.write(f.read())
yield tf
try:
yield tf.name
finally:
os.remove(tf.name)
def __scan(self, force=False):
self.scanner = Scanner(force=force)
@ -115,7 +120,7 @@ class ScannerTestCase(unittest.TestCase):
with self.__temporary_track_copy() as tf:
self.__scan()
self.assertEqual(db.Track.select().count(), 2)
self.scanner.move_file(tf.name, track.path)
self.scanner.move_file(tf, track.path)
commit()
self.assertEqual(db.Track.select().count(), 1)
@ -134,9 +139,10 @@ class ScannerTestCase(unittest.TestCase):
self.__scan()
self.assertEqual(db.Track.select().count(), 2)
tf.seek(0, 0)
tf.write(b"\x00" * 4096)
tf.truncate()
with open(tf, "wb") as f:
f.seek(0, 0)
f.write(b"\x00" * 4096)
f.truncate()
self.__scan(True)
self.assertEqual(db.Track.select().count(), 1)
@ -145,7 +151,7 @@ class ScannerTestCase(unittest.TestCase):
def test_rescan_removed_file(self):
track = db.Track.select().first()
with self.__temporary_track_copy() as tf:
with self.__temporary_track_copy():
self.__scan()
self.assertEqual(db.Track.select().count(), 2)
@ -158,7 +164,7 @@ class ScannerTestCase(unittest.TestCase):
with self.__temporary_track_copy() as tf:
self.__scan()
copy = db.Track.get(path=tf.name)
copy = db.Track.get(path=tf)
self.assertEqual(copy.artist.name, "Some artist")
self.assertEqual(copy.album.name, "Awesome album")

View File

@ -8,7 +8,6 @@
#
# Distributed under terms of the GNU AGPLv3 license.
import os
import unittest
import shutil
import tempfile
@ -21,10 +20,9 @@ from ..testbase import TestConfig
class SecretTestCase(unittest.TestCase):
def setUp(self):
self.__dbfile = tempfile.mkstemp()[1]
self.__dir = tempfile.mkdtemp()
self.config = TestConfig(False, False)
self.config.BASE["database_uri"] = "sqlite:///" + self.__dbfile
self.config.BASE["database_uri"] = "sqlite://"
self.config.WEBAPP["cache_dir"] = self.__dir
init_database(self.config.BASE["database_uri"])
@ -32,7 +30,6 @@ class SecretTestCase(unittest.TestCase):
def tearDown(self):
shutil.rmtree(self.__dir)
os.remove(self.__dbfile)
def test_key(self):
app1 = create_application(self.config)

View File

@ -1,14 +1,12 @@
#!/usr/bin/env python
# coding: utf-8
#
# This file is part of Supysonic.
# Supysonic is a Python implementation of the Subsonic server API.
#
# Copyright (C) 2017-2018 Alban 'spl0k' Féron
# Copyright (C) 2017-2020 Alban 'spl0k' Féron
#
# Distributed under terms of the GNU AGPLv3 license.
import io
import mutagen
import os
import shutil
@ -18,7 +16,6 @@ import unittest
from hashlib import sha1
from pony.orm import db_session
from threading import Thread
from supysonic.db import init_database, release_database, Track, Artist, Folder
from supysonic.managers.folder import FolderManager
@ -37,8 +34,8 @@ class WatcherTestConfig(TestConfig):
class WatcherTestBase(unittest.TestCase):
def setUp(self):
self.__dbfile = tempfile.mkstemp()[1]
dburi = "sqlite:///" + self.__dbfile
self.__db = tempfile.mkstemp()
dburi = "sqlite:///" + self.__db[1]
init_database(dburi)
conf = WatcherTestConfig(dburi)
@ -48,7 +45,8 @@ class WatcherTestBase(unittest.TestCase):
def tearDown(self):
release_database()
os.unlink(self.__dbfile)
os.close(self.__db[0])
os.remove(self.__db[1])
def _start(self):
self.__watcher.start()

View File

@ -1,14 +1,13 @@
# coding: utf-8
#
# This file is part of Supysonic.
# Supysonic is a Python implementation of the Subsonic server API.
#
# Copyright (C) 2019 Alban 'spl0k' Féron
# Copyright (C) 2019-2020 Alban 'spl0k' Féron
#
# Distributed under terms of the GNU AGPLv3 license.
import os.path
import shutil
import sys
import tempfile
import unittest
@ -20,6 +19,7 @@ from supysonic.managers.folder import FolderManager
from supysonic.scanner import Scanner
@unittest.skipIf(sys.platform == "win32", "Windows doesn't allow space-only filenames")
class Issue148TestCase(unittest.TestCase):
def setUp(self):
self.__dir = tempfile.mkdtemp()

View File

@ -8,6 +8,7 @@
import os
import os.path
import shutil
import sys
import tempfile
import unittest
@ -18,6 +19,9 @@ from supysonic.managers.folder import FolderManager
from supysonic.scanner import Scanner
@unittest.skipIf(
sys.platform == "win32", "Windows doesn't seem too allow badly encoded paths"
)
class Issue85TestCase(unittest.TestCase):
def setUp(self):
self.__dir = tempfile.mkdtemp()

View File

@ -1,17 +1,15 @@
# coding: utf-8
#
# This file is part of Supysonic.
# Supysonic is a Python implementation of the Subsonic server API.
#
# Copyright (C) 2017-2018 Alban 'spl0k' Féron
# Copyright (C) 2017-2020 Alban 'spl0k' Féron
#
# Distributed under terms of the GNU AGPLv3 license.
import inspect
import os
import shutil
import unittest
import tempfile
import unittest
from pony.orm import db_session
@ -82,9 +80,10 @@ class TestBase(unittest.TestCase):
__with_api__ = False
def setUp(self):
self.__db = tempfile.mkstemp()
self.__dir = tempfile.mkdtemp()
config = TestConfig(self.__with_webui__, self.__with_api__)
config.BASE["database_uri"] = "sqlite:"
config.BASE["database_uri"] = "sqlite:///" + self.__db[1]
config.WEBAPP["cache_dir"] = self.__dir
init_database(config.BASE["database_uri"])
@ -107,3 +106,5 @@ class TestBase(unittest.TestCase):
def tearDown(self):
release_database()
shutil.rmtree(self.__dir)
os.close(self.__db[0])
os.remove(self.__db[1])

View File

@ -12,6 +12,7 @@ import unittest
from .api.test_lyrics import LyricsTestCase
from .base.test_lastfm import LastFmTestCase
def suite():
suite = unittest.TestSuite()