Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
155 changes: 92 additions & 63 deletions beetsplug/smartplaylist.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,16 @@

"""Generates smart playlists based on beets queries."""

from __future__ import annotations

import os
from typing import Any, TypeAlias
from urllib.parse import quote
from urllib.request import pathname2url

from beets import ui
from beets.dbcore import OrQuery
from beets.dbcore.query import MultipleSort, ParsingError
from beets.library import Album, Item, parse_query_string
from beets.dbcore.query import ParsingError, Query, Sort
from beets.library import Album, Item, Library, parse_query_string
from beets.plugins import BeetsPlugin
from beets.plugins import send as send_event
from beets.util import (
Expand All @@ -34,9 +36,17 @@
syspath,
)

QueryAndSort = tuple[Query, Sort]
PlaylistQuery = Query | tuple[QueryAndSort, ...] | None
PlaylistMatch: TypeAlias = tuple[
str,
tuple[PlaylistQuery, Sort | None],
tuple[PlaylistQuery, Sort | None],
]


class SmartPlaylistPlugin(BeetsPlugin):
def __init__(self):
def __init__(self) -> None:
super().__init__()
self.config.add(
{
Expand All @@ -55,13 +65,13 @@ def __init__(self):
)

self.config["prefix"].redact = True # May contain username/password.
self._matched_playlists = None
self._unmatched_playlists = None
self._matched_playlists: set[PlaylistMatch] = set()
self._unmatched_playlists: set[PlaylistMatch] = set()

if self.config["auto"]:
self.register_listener("database_change", self.db_change)

def commands(self):
def commands(self) -> list[ui.Subcommand]:
spl_update = ui.Subcommand(
"splupdate",
help="update the smart playlists. Playlist names may be "
Expand Down Expand Up @@ -124,18 +134,18 @@ def commands(self):
spl_update.func = self.update_cmd
return [spl_update]

def update_cmd(self, lib, opts, args):
def update_cmd(self, lib: Library, opts: Any, args: list[str]) -> None:
self.build_queries()
if args:
args = set(args)
for a in list(args):
args_set = set(args)
for a in list(args_set):
if not a.endswith(".m3u"):
args.add(f"{a}.m3u")
args_set.add(f"{a}.m3u")

playlists = {
(name, q, a_q)
for name, q, a_q in self._unmatched_playlists
if name in args
if name in args_set
}
if not playlists:
unmatched = [name for name, _, _ in self._unmatched_playlists]
Expand All @@ -151,16 +161,32 @@ def update_cmd(self, lib, opts, args):
self.__apply_opts_to_config(opts)
self.update_playlists(lib, opts.pretend)

def __apply_opts_to_config(self, opts):
def __apply_opts_to_config(self, opts: Any) -> None:
for k, v in opts.__dict__.items():
if v is not None and k in self.config:
self.config[k] = v

def build_queries(self):
def _parse_one_query(
self, playlist: dict[str, Any], key: str, model_cls: type
) -> tuple[PlaylistQuery, Sort | None]:
qs = playlist.get(key)
if qs is None:
return None, None
if isinstance(qs, str):
return parse_query_string(qs, model_cls)
if len(qs) == 1:
return parse_query_string(qs[0], model_cls)

queries_and_sorts: tuple[QueryAndSort, ...] = tuple(
parse_query_string(q, model_cls) for q in qs
)
return queries_and_sorts, None

def build_queries(self) -> None:
"""
Instantiate queries for the playlists.

Each playlist has 2 queries: one or items one for albums, each with a
Each playlist has 2 queries: one for items, one for albums, each with a
sort. We must also remember its name. _unmatched_playlists is a set of
tuples (name, (q, q_sort), (album_q, album_q_sort)).

Expand All @@ -169,7 +195,7 @@ def build_queries(self):
More precisely
- it will be NullSort when a playlist query ('query' or 'album_query')
is a single item or a list with 1 element
- it will be None when there are multiple items i a query
- it will be None when there are multiple items in a query
"""
self._unmatched_playlists = set()
self._matched_playlists = set()
Expand All @@ -179,55 +205,37 @@ def build_queries(self):
self._log.warning("playlist configuration is missing name")
continue

playlist_data = (playlist["name"],)
try:
for key, model_cls in (("query", Item), ("album_query", Album)):
qs = playlist.get(key)
if qs is None:
query_and_sort = None, None
elif isinstance(qs, str):
query_and_sort = parse_query_string(qs, model_cls)
elif len(qs) == 1:
query_and_sort = parse_query_string(qs[0], model_cls)
else:
# multiple queries and sorts
queries, sorts = zip(
*(parse_query_string(q, model_cls) for q in qs)
)
query = OrQuery(queries)
final_sorts = []
for s in sorts:
if s:
if isinstance(s, MultipleSort):
final_sorts += s.sorts
else:
final_sorts.append(s)
if not final_sorts:
sort = None
elif len(final_sorts) == 1:
(sort,) = final_sorts
else:
sort = MultipleSort(final_sorts)
query_and_sort = query, sort

playlist_data += (query_and_sort,)

q_match = self._parse_one_query(playlist, "query", Item)
a_match = self._parse_one_query(playlist, "album_query", Album)
except ParsingError as exc:
self._log.warning(
"invalid query in playlist {}: {}", playlist["name"], exc
)
continue

self._unmatched_playlists.add(playlist_data)

def matches(self, model, query, album_query):
if album_query and isinstance(model, Album):
return album_query.match(model)
if query and isinstance(model, Item):
return query.match(model)
self._unmatched_playlists.add((playlist["name"], q_match, a_match))

def _matches_query(self, model: Item | Album, query: PlaylistQuery) -> bool:
if not query:
return False
if isinstance(query, (list, tuple)):
return any(q.match(model) for q, _ in query)
return query.match(model)

def matches(
self,
model: Item | Album,
query: PlaylistQuery,
album_query: PlaylistQuery,
) -> bool:
if isinstance(model, Album):
return self._matches_query(model, album_query)
if isinstance(model, Item):
return self._matches_query(model, query)
return False

def db_change(self, lib, model):
def db_change(self, lib: Library, model: Item | Album) -> None:
if self._unmatched_playlists is None:
self.build_queries()

Expand All @@ -240,7 +248,7 @@ def db_change(self, lib, model):

self._unmatched_playlists -= self._matched_playlists

def update_playlists(self, lib, pretend=False):
def update_playlists(self, lib: Library, pretend: bool = False) -> None:
if pretend:
self._log.info(
"Showing query results for {} smart playlists...",
Expand All @@ -260,7 +268,7 @@ def update_playlists(self, lib, pretend=False):
relative_to = normpath(relative_to)

# Maps playlist filenames to lists of track filenames.
m3us = {}
m3us: dict[str, list[PlaylistItem]] = {}

for playlist in self._matched_playlists:
name, (query, q_sort), (album_query, a_q_sort) = playlist
Expand All @@ -270,9 +278,28 @@ def update_playlists(self, lib, pretend=False):
self._log.info("Creating playlist {}", name)
items = []

if query:
# Handle tuple/list of queries (preserves order)
# Track seen items to avoid duplicates when an item matches
# multiple queries
seen_ids = set()

if isinstance(query, (list, tuple)):
for q, sort in query:
for item in lib.items(q, sort):
if item.id not in seen_ids:
items.append(item)
seen_ids.add(item.id)
elif query:
items.extend(lib.items(query, q_sort))
if album_query:

if isinstance(album_query, (list, tuple)):
for q, sort in album_query:
for album in lib.albums(q, sort):
for item in album.items():
if item.id not in seen_ids:
items.append(item)
seen_ids.add(item.id)
elif album_query:
for album in lib.albums(album_query, a_q_sort):
items.extend(album.items())

Expand All @@ -292,7 +319,9 @@ def update_playlists(self, lib, pretend=False):
if self.config["forward_slash"].get():
item_uri = path_as_posix(item_uri)
if self.config["urlencode"]:
item_uri = bytestring_path(pathname2url(item_uri))
item_uri = bytestring_path(
pathname2url(os.fsdecode(item_uri))
)
item_uri = prefix + item_uri

if item_uri not in m3us[m3u_name]:
Expand Down Expand Up @@ -336,7 +365,7 @@ def update_playlists(self, lib, pretend=False):
)
f.write(comment.encode("utf-8") + entry.uri + b"\n")
# Send an event when playlists were updated.
send_event("smartplaylist_update")
send_event("smartplaylist_update") # type: ignore

if pretend:
self._log.info(
Expand All @@ -348,6 +377,6 @@ def update_playlists(self, lib, pretend=False):


class PlaylistItem:
def __init__(self, item, uri):
def __init__(self, item: Item, uri: bytes) -> None:
self.item = item
self.uri = uri
4 changes: 4 additions & 0 deletions docs/changelog.rst
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ New features:

Bug fixes:

- :doc:`/plugins/smartplaylist`: Fixed an issue where multiple queries in a
playlist configuration were not preserving their order, causing items to
appear in database order rather than the order specified in the config.
:bug:`6183`
- :doc:`plugins/inline`: Fix recursion error when an inline field definition
shadows a built-in item field (e.g., redefining ``track_no``). Inline
expressions now skip self-references during evaluation to avoid infinite
Expand Down
Loading