1
0
mirror of https://github.com/spl0k/supysonic.git synced 2025-01-21 22:47:24 +00:00

pyupgrade

This commit is contained in:
Alban Féron 2023-01-08 16:16:28 +01:00
parent 14dc63631f
commit 82187fd4c4
No known key found for this signature in database
GPG Key ID: 8CE0313646D16165
25 changed files with 65 additions and 77 deletions

View File

@ -24,7 +24,7 @@ api = Blueprint("api", __name__)
def api_routing(endpoint):
def decorator(func):
viewendpoint = "{}.view".format(endpoint)
viewendpoint = f"{endpoint}.view"
api.add_url_rule(endpoint, view_func=func, methods=["GET", "POST"])
api.add_url_rule(viewendpoint, view_func=func, methods=["GET", "POST"])
return func

View File

@ -29,11 +29,11 @@ def star_single(cls, starcls, eid):
try:
e = cls[eid]
except cls.DoesNotExist:
raise NotFound("{} {}".format(cls.__name__, eid))
raise NotFound(f"{cls.__name__} {eid}")
try:
starcls[request.user, eid]
raise GenericError("{} {} already starred".format(cls.__name__, eid))
raise GenericError(f"{cls.__name__} {eid} already starred")
except starcls.DoesNotExist:
pass

View File

@ -21,7 +21,7 @@ class SubsonicAPIException(HTTPException):
def __str__(self):
code = self.api_code if self.api_code is not None else "??"
return "{}: {}".format(code, self.message)
return f"{code}: {self.message}"
class GenericError(SubsonicAPIException):
@ -38,7 +38,7 @@ class ServerError(GenericError):
class UnsupportedParameter(GenericError):
def __init__(self, parameter, *args, **kwargs):
message = "Unsupported parameter '{}'".format(parameter)
message = f"Unsupported parameter '{parameter}'"
super().__init__(message, *args, **kwargs)
@ -89,7 +89,7 @@ class NotFound(SubsonicAPIException):
def __init__(self, entity, *args, **kwargs):
super().__init__(*args, **kwargs)
self.message = "{} not found".format(entity)
self.message = f"{entity} not found"
class AggregateException(SubsonicAPIException):

View File

@ -82,7 +82,7 @@ class JSONPFormatter(JSONBaseFormatter):
)
rv = self._subsonicify(elem, data)
rv = "{}({})".format(self.__callback, json.dumps(rv))
rv = f"{self.__callback}({json.dumps(rv)})"
rv = make_response(rv)
rv.mimetype = "application/javascript"
return rv

View File

@ -115,14 +115,14 @@ def stream_media():
if dst_suffix != src_suffix or dst_bitrate != res.bitrate:
# Requires transcoding
cache = current_app.transcode_cache
cache_key = "{}-{}.{}".format(res.id, dst_bitrate, dst_suffix)
cache_key = f"{res.id}-{dst_bitrate}.{dst_suffix}"
try:
response = send_file(
cache.get(cache_key), mimetype=dst_mimetype, conditional=True
)
except CacheMiss:
transcoder = config.get("transcoder_{}_{}".format(src_suffix, dst_suffix))
transcoder = config.get(f"transcoder_{src_suffix}_{dst_suffix}")
decoder = config.get("decoder_" + src_suffix) or config.get("decoder")
encoder = config.get("encoder_" + dst_suffix) or config.get("encoder")
if not transcoder and (not decoder or not encoder):
@ -134,10 +134,10 @@ def stream_media():
logger.info(message)
raise GenericError(message)
transcoder, decoder, encoder = [
transcoder, decoder, encoder = (
prepare_transcoding_cmdline(x, res, src_suffix, dst_suffix, dst_bitrate)
for x in (transcoder, decoder, encoder)
]
)
try:
if transcoder:
dec_proc = None
@ -280,7 +280,7 @@ def download_media():
raise GenericError("Nothing to download")
resp = Response(z, mimetype="application/zip")
resp.headers["Content-Disposition"] = "attachment; filename={}.zip".format(rv.name)
resp.headers["Content-Disposition"] = f"attachment; filename={rv.name}.zip"
resp.headers["Content-Length"] = len(z)
return resp
@ -291,7 +291,7 @@ def _cover_from_track(obj):
Returns None if no cover art is available.
"""
cache = current_app.cache
cache_key = "{}-cover".format(obj.id)
cache_key = f"{obj.id}-cover"
try:
return cache.get(cache_key)
except CacheMiss:
@ -383,15 +383,15 @@ def cover_art():
mimetype = None
if os.path.splitext(cover_path)[1].lower() not in EXTENSIONS:
with Image.open(cover_path) as im:
mimetype = "image/{}".format(im.format.lower())
mimetype = f"image/{im.format.lower()}"
return send_file(cover_path, mimetype=mimetype)
with Image.open(cover_path) as im:
mimetype = "image/{}".format(im.format.lower())
mimetype = f"image/{im.format.lower()}"
if size > im.width and size > im.height:
return send_file(cover_path, mimetype=mimetype)
cache_key = "{}-cover-{}".format(eid, size)
cache_key = f"{eid}-cover-{size}"
try:
return send_file(cache.get(cache_key), mimetype=mimetype)
except CacheMiss:
@ -447,7 +447,7 @@ def lyrics():
unique = hashlib.md5(
json.dumps([x.lower() for x in (artist, title)]).encode("utf-8")
).hexdigest()
cache_key = "lyrics-{}".format(unique)
cache_key = f"lyrics-{unique}"
lyrics = {}
try:

View File

@ -24,9 +24,5 @@ def unsupported():
for m in methods:
api.add_url_rule(
"/{}".format(m), "unsupported", unsupported, methods=["GET", "POST"]
)
api.add_url_rule(
"/{}.view".format(m), "unsupported", unsupported, methods=["GET", "POST"]
)
api.add_url_rule(f"/{m}", "unsupported", unsupported, methods=["GET", "POST"])
api.add_url_rule(f"/{m}.view", "unsupported", unsupported, methods=["GET", "POST"])

View File

@ -28,7 +28,7 @@ class TimedProgressDisplay:
def __call__(self, name, scanned):
if time.time() - self.__last_display > self.__interval:
progress = "Scanning '{}': {} files scanned".format(name, scanned)
progress = f"Scanning '{name}': {scanned} files scanned"
self.__stdout.write("\b" * self.__last_len)
self.__stdout.write(progress)
self.__stdout.flush()
@ -55,7 +55,7 @@ def folder_list():
click.echo("Name\t\tPath\n----\t\t----")
for f in Folder.select().where(Folder.root):
click.echo("{: <16}{}".format(f.name, f.path))
click.echo(f"{f.name: <16}{f.path}")
@folder.command("add")
@ -76,7 +76,7 @@ def folder_add(name, path):
try:
FolderManager.add(name, path)
click.echo("Folder '{}' added".format(name))
click.echo(f"Folder '{name}' added")
except ValueError as e:
raise ClickException(str(e)) from e
@ -91,9 +91,9 @@ def folder_delete(name):
try:
FolderManager.delete_by_name(name)
click.echo("Deleted folder '{}'".format(name))
click.echo(f"Deleted folder '{name}'")
except Folder.DoesNotExist as e:
raise ClickException("Folder '{}' does not exist.".format(name)) from e
raise ClickException(f"Folder '{name}' does not exist.") from e
@folder.command("scan")
@ -272,9 +272,9 @@ def user_delete(name):
try:
UserManager.delete_by_name(name)
click.echo("Deleted user '{}'".format(name))
click.echo(f"Deleted user '{name}'")
except User.DoesNotExist as e:
raise ClickException("User '{}' does not exist.".format(name)) from e
raise ClickException(f"User '{name}' does not exist.") from e
def _echo_role_change(username, name, value):
@ -325,9 +325,9 @@ def user_changepass(name, password):
try:
UserManager.change_password2(name, password)
click.echo("Successfully changed '{}' password".format(name))
click.echo(f"Successfully changed '{name}' password")
except User.DoesNotExist as e:
raise ClickException("User '{}' does not exist.".format(name)) from e
raise ClickException(f"User '{name}' does not exist.") from e
@user.command("rename")
@ -358,7 +358,7 @@ def user_rename(name, newname):
user.name = newname
user.save()
click.echo("User '{}' renamed to '{}'".format(name, newname))
click.echo(f"User '{name}' renamed to '{newname}'")
def main():

View File

@ -138,7 +138,7 @@ class DaemonClient:
return Client(address=self.__address, authkey=self.__key)
except OSError:
raise DaemonUnavailableError(
"Couldn't connect to daemon at {}".format(self.__address)
f"Couldn't connect to daemon at {self.__address}"
)
def add_watched_folder(self, folder):

View File

@ -368,9 +368,9 @@ class Track(PathMixin, _Model):
return mimetypes.guess_type(self.path, False)[0] or "application/octet-stream"
def duration_str(self):
ret = "{:02}:{:02}".format((self.duration % 3600) / 60, self.duration % 60)
ret = f"{(self.duration % 3600) / 60:02}:{self.duration % 60:02}"
if self.duration >= 3600:
ret = "{:02}:{}".format(self.duration / 3600, ret)
ret = f"{self.duration / 3600:02}:{ret}"
return ret
def suffix(self):
@ -492,7 +492,7 @@ class Playlist(_Model):
"id": str(self.id),
"name": self.name
if self.user.id == user.id
else "[{}] {}".format(self.user.name, self.name),
else f"[{self.user.name}] {self.name}",
"owner": self.user.name,
"public": self.public,
"songCount": len(tracks),
@ -536,7 +536,7 @@ class Playlist(_Model):
tid = UUID(track)
if self.tracks and len(self.tracks) > 0:
self.tracks = "{},{}".format(self.tracks, tid)
self.tracks = f"{self.tracks},{tid}"
else:
self.tracks = str(tid)

View File

@ -65,7 +65,7 @@ def scan_status():
current_app.config["DAEMON"]["socket"]
).get_scanning_progress()
if scanned is not None:
flash("Scanning in progress, {} files scanned.".format(scanned))
flash(f"Scanning in progress, {scanned} files scanned.")
except DaemonUnavailableError:
pass

View File

@ -56,9 +56,7 @@ def playlist_export(uid, playlist):
return Response(
render_template("playlist_export.m3u", playlist=playlist),
mimetype="audio/mpegurl",
headers={
"Content-disposition": "attachment; filename={}.m3u".format(playlist.name)
},
headers={"Content-disposition": f"attachment; filename={playlist.name}.m3u"},
)

View File

@ -26,7 +26,7 @@ class FolderManager:
def add(name, path):
try:
Folder.get(name=name, root=True)
raise ValueError("Folder '{}' exists".format(name))
raise ValueError(f"Folder '{name}' exists")
except Folder.DoesNotExist:
pass

View File

@ -29,7 +29,7 @@ class UserManager:
@staticmethod
def add(name, password, **kwargs):
if User.select().where(User.name == name).exists():
raise ValueError("User '{}' exists".format(name))
raise ValueError(f"User '{name}' exists")
crypt, salt = UserManager.__encrypt_password(password)
return User.create(name=name, password=crypt, salt=salt, **kwargs)

View File

@ -109,15 +109,13 @@ def main(server, host, port, socket, processes, threads):
server = find_first_available_server()
if server is None:
raise ClickException(
"Couldn't load any server, please install one of {}".format(_servers)
f"Couldn't load any server, please install one of {_servers}"
)
else:
try:
server = get_server(server)
except ImportError:
raise ClickException(
"Couldn't load {}, please install it first".format(server)
)
raise ClickException(f"Couldn't load {server}, please install it first")
if socket is not None:
host = None

View File

@ -28,9 +28,9 @@ class GunicornApp(BaseApplication):
threads = self.__config["threads"]
if socket is not None:
self.cfg.set("bind", "unix:{}".format(socket))
self.cfg.set("bind", f"unix:{socket}")
else:
self.cfg.set("bind", "{}:{}".format(host, port))
self.cfg.set("bind", f"{host}:{port}")
if processes is not None:
self.cfg.set("workers", processes)

View File

@ -23,9 +23,7 @@ class ApiTestBase(TestBase):
def setUp(self, apiVersion="1.12.0"):
super().setUp()
self.apiVersion = apiVersion
xsd = etree.parse(
"tests/assets/subsonic-rest-api-{}.xsd".format(self.apiVersion)
)
xsd = etree.parse(f"tests/assets/subsonic-rest-api-{self.apiVersion}.xsd")
self.schema = etree.XMLSchema(xsd)
def _find(self, xml, path):
@ -33,7 +31,7 @@ class ApiTestBase(TestBase):
Helper method that insert the namespace in ElementPath 'path'
"""
path = path_replace_regexp.sub(r"/{{{}}}\1".format(NS), path)
path = path_replace_regexp.sub(rf"/{{{NS}}}\1", path)
return xml.find(path)
def _xpath(self, elem, path):
@ -70,7 +68,7 @@ class ApiTestBase(TestBase):
if "u" not in args:
args.update({"u": "alice", "p": "Alic3"})
uri = "/rest/{}.view".format(endpoint)
uri = f"/rest/{endpoint}.view"
rg = self.client.get(uri, query_string=args)
if not skip_post:
rp = self.client.post(uri, data=args)
@ -82,13 +80,13 @@ class ApiTestBase(TestBase):
if xml.get("status") == "ok":
self.assertIsNone(error)
if tag:
self.assertEqual(xml[0].tag, "{{{}}}{}".format(NS, tag))
self.assertEqual(xml[0].tag, f"{{{NS}}}{tag}")
return rg, xml[0]
else:
self.assertEqual(len(xml), 0)
return rg, None
else:
self.assertIsNone(tag)
self.assertEqual(xml[0].tag, "{{{}}}error".format(NS))
self.assertEqual(xml[0].tag, f"{{{NS}}}error")
self.assertEqual(xml[0].get("code"), str(error))
return rg

View File

@ -24,7 +24,7 @@ class ApiSetupTestCase(TestBase):
self._patch_client()
def __basic_auth_get(self, username, password):
hashed = base64.b64encode("{}:{}".format(username, password).encode("utf-8"))
hashed = base64.b64encode(f"{username}:{password}".encode("utf-8"))
headers = {"Authorization": "Basic " + hashed.decode("utf-8")}
return self.client.get(
"/rest/ping.view", headers=headers, query_string={"c": "tests"}

View File

@ -24,7 +24,7 @@ class BrowseTestCase(ApiTestBase):
for letter in "ABC":
folder = Folder.create(
name=letter + "rtist",
path="tests/assets/{}rtist".format(letter),
path=f"tests/assets/{letter}rtist",
root=False,
parent=self.root,
)

View File

@ -57,9 +57,7 @@ class MediaTestCase(ApiTestBase):
disc=1,
artist=artist,
album=album,
path=os.path.abspath(
"tests/assets/formats/silence.{}".format(self.formats[i])
),
path=os.path.abspath(f"tests/assets/formats/silence.{self.formats[i]}"),
root_folder=folder,
folder=folder,
duration=2,

View File

@ -178,9 +178,9 @@ class RadioStationTestCase(ApiTestBase):
test_range = 3
for x in range(test_range):
RadioStation.create(
stream_url="http://example.com/radio-{}".format(x),
name="Radio {}".format(x),
homepage_url="http://example.com/update-{}".format(x),
stream_url=f"http://example.com/radio-{x}",
name=f"Radio {x}",
homepage_url=f"http://example.com/update-{x}",
)
# verify happy path is clean
@ -193,9 +193,9 @@ class RadioStationTestCase(ApiTestBase):
# Test data is sequential by design.
for x in range(test_range):
station = child[x]
self.assertTrue(station.get("streamUrl").endswith("radio-{}".format(x)))
self.assertTrue(station.get("name").endswith("Radio {}".format(x)))
self.assertTrue(station.get("homePageUrl").endswith("update-{}".format(x)))
self.assertTrue(station.get("streamUrl").endswith(f"radio-{x}"))
self.assertTrue(station.get("name").endswith(f"Radio {x}"))
self.assertTrue(station.get("homePageUrl").endswith(f"update-{x}"))
# test for non-admin access
rv, child = self._make_request(

View File

@ -23,7 +23,7 @@ class SearchTestCase(ApiTestBase):
for letter in "ABC":
folder = Folder.create(
name=letter + "rtist",
path="tests/assets/{}rtist".format(letter),
path=f"tests/assets/{letter}rtist",
root=False,
parent=root,
)

View File

@ -81,7 +81,7 @@ class TranscodingTestCase(ApiTestBase):
rv.response.close()
rv.close()
key = "{}-96.rnd".format(self.trackid)
key = f"{self.trackid}-96.rnd"
with self.app_context():
self.assertTrue(current_app.transcode_cache.has(key))
self.assertEqual(current_app.transcode_cache.size, 52000)
@ -98,7 +98,7 @@ class TranscodingTestCase(ApiTestBase):
rv.response.close()
rv.close()
key = "{}-96.rnd".format(self.trackid)
key = f"{self.trackid}-96.rnd"
with self.app_context():
self.assertFalse(current_app.transcode_cache.has(key))
self.assertEqual(current_app.transcode_cache.size, 0)
@ -118,7 +118,7 @@ class TranscodingTestCase(ApiTestBase):
rv.response.close()
rv.close()
key = "{}-96.rnd".format(self.trackid)
key = f"{self.trackid}-96.rnd"
with self.app_context():
self.assertTrue(current_app.transcode_cache.has(key))
self.assertEqual(current_app.transcode_cache.size, 52000)

View File

@ -41,7 +41,7 @@ class CLITestCase(unittest.TestCase):
return rv
def __add_folder(self, name, path, expect_fail=False):
self.__invoke("folder add {} {}".format(name, shlex.quote(path)), expect_fail)
self.__invoke(f"folder add {name} {shlex.quote(path)}", expect_fail)
def test_folder_add(self):
with tempfile.TemporaryDirectory() as d:

View File

@ -86,7 +86,7 @@ class UserTestCase(FrontendTestBase):
rv = self.client.get("/user/whatever/changeusername", follow_redirects=True)
self.assertIn("badly formed", rv.data)
rv = self.client.get(
"/user/{}/changeusername".format(uuid.uuid4()), follow_redirects=True
f"/user/{uuid.uuid4()}/changeusername", follow_redirects=True
)
self.assertIn("No such user", rv.data)
self.client.get("/user/{}/changeusername".format(self.users["bob"]))
@ -96,7 +96,7 @@ class UserTestCase(FrontendTestBase):
rv = self.client.post("/user/whatever/changeusername", follow_redirects=True)
self.assertIn("badly formed", rv.data)
rv = self.client.post(
"/user/{}/changeusername".format(uuid.uuid4()), follow_redirects=True
f"/user/{uuid.uuid4()}/changeusername", follow_redirects=True
)
self.assertIn("No such user", rv.data)

View File

@ -19,7 +19,7 @@ class Issue221TestCase(unittest.TestCase):
for i in range(3):
db.Track.create(
title="Track {}".format(i),
title=f"Track {i}",
album=album,
artist=artist,
disc=1,
@ -27,7 +27,7 @@ class Issue221TestCase(unittest.TestCase):
duration=3,
has_art=False,
bitrate=64,
path="tests/track{}".format(i),
path=f"tests/track{i}",
last_modification=2,
root_folder=root,
folder=root,