Compare commits

..

4 Commits

Author SHA1 Message Date
matt
bde1ddf465 add shazm recog fallback. 2026-05-12 20:54:54 -07:00
matt
c36a6a7f62 replace prints with structlog. 2026-05-06 16:11:30 -07:00
matt
ce66005bb3 fix yt download: save to quarter dir, parse title for artist/track. 2026-05-06 16:08:08 -07:00
matt
7982c7700b migrate cli from click to pydantic-settings. 2026-05-06 15:57:08 -07:00
11 changed files with 1295 additions and 102 deletions

View File

@@ -6,15 +6,21 @@ readme = "README.md"
authors = [{ name = "publicmatt", email = "git@publicmatt.com" }] authors = [{ name = "publicmatt", email = "git@publicmatt.com" }]
requires-python = ">=3.12" requires-python = ">=3.12"
dependencies = [ dependencies = [
"audioop-lts>=0.2.2 ; python_full_version >= '3.13'",
"click>=8.1.8", "click>=8.1.8",
"mpv>=1.0.7", "mpv>=1.0.7",
"mutagen>=1.47.0", "mutagen>=1.47.0",
"pydantic>=2.0",
"pydantic-settings>=2.12.0",
"requests>=2.32.3", "requests>=2.32.3",
"shazamio>=0.8.1",
"structlog>=25.5.0",
"yt-dlp>=2025.1.15", "yt-dlp>=2025.1.15",
] ]
[project.scripts] [project.scripts]
music = "music.__main__:cli" music = "music.__main__:cli"
stream = "music.__main__:stream"
[build-system] [build-system]
requires = ["hatchling"] requires = ["hatchling"]

View File

@@ -1,53 +1,15 @@
from datetime import datetime, timezone from pydantic_settings import CliApp
from typing import Optional
import click from .cli import Cli
from .commands import Stream
from .save import yt
from .stream import Stream, local, play
@click.group()
def cli(): def cli():
pass CliApp.run(Cli)
@cli.command("stream") def stream():
@click.argument( CliApp.run(Stream)
"stream", type=click.Choice([e.value for e in Stream]), default=Stream.KEXP
)
@click.option(
"-t",
"--timestamp",
"t",
type=click.DateTime(formats=["%Y-%m-%d %H:%M"]),
required=False,
)
def _stream(stream, t: Optional[datetime]):
"""play streams."""
if t is not None:
time = t.astimezone(tz=timezone.utc)
else:
time = None
play(stream, time)
@cli.command("random")
def _random():
"""play randomly from local music."""
local()
@cli.command("yt")
@click.argument("url", required=True)
def _save(url):
"""
download a youtube song from URL to current quarter dir.
URL: youtube url to download
"""
yt(url)
if __name__ == "__main__": if __name__ == "__main__":

24
src/music/cli.py Normal file
View File

@@ -0,0 +1,24 @@
from pydantic import Field
from pydantic_settings import BaseSettings, CliApp, CliSubCommand, get_subcommand
from music.commands import Random, Stream, YtDownload
class Cli(BaseSettings):
model_config = {
"env_file": [".env"],
"cli_kebab_case": True,
"cli_use_class_docs_for_groups": True,
}
download: CliSubCommand[YtDownload] = Field(alias="yt")
stream: CliSubCommand[Stream] = Field(alias="stream")
random: CliSubCommand[Random] = Field(alias="random")
def cli_cmd(self) -> None:
if (cmd := get_subcommand(self, is_required=False)) is not None:
CliApp.run_subcommand(self, cli_cmd_method_name="run")
else:
CliApp.run(Cli, cli_args=["--help"])
pass

View File

@@ -0,0 +1,5 @@
from .download import YtDownload
from .random import Random
from .stream import Stream
__all__ = ["YtDownload", "Random", "Stream"]

View File

@@ -0,0 +1,143 @@
import asyncio
import os
import re
from pathlib import Path
from typing import ClassVar
from pydantic_settings import CliImplicitFlag, CliPositionalArg
import structlog
import yt_dlp
from mutagen.easyid3 import EasyID3
from pydantic import Field, computed_field
from shazamio import Shazam
from music.dates import DateParse
from music.models import Command
log = structlog.get_logger()
def _clean(value) -> str | None:
if value is None:
return None
if isinstance(value, list):
value = ", ".join(v for v in value if v)
value = str(value).strip()
return value or None
def _prompt(label: str, default: str | None = None) -> str | None:
suffix = f" [{default}]" if default else ""
try:
value = input(f"{label}{suffix}: ").strip()
except EOFError:
return default
return value or default
def _extract_artist_track(info: dict) -> tuple[str | None, str | None]:
artist = (
_clean(info.get("artists"))
or _clean(info.get("artist"))
or _clean(info.get("creators"))
or _clean(info.get("creator"))
)
track = _clean(info.get("track")) or _clean(info.get("alt_title"))
title = _clean(info.get("title"))
if (artist is None or track is None) and title and " - " in title:
parsed_artist, parsed_track = title.split(" - ", 1)
parsed_track = re.sub(r"\s*[\(\[][^\)\]]*[\)\]]\s*$", "", parsed_track).strip()
if artist is None:
artist = _clean(parsed_artist)
if track is None:
track = _clean(parsed_track)
return artist, track
def _shazam_lookup(filepath: str) -> tuple[str | None, str | None]:
async def recognize() -> dict:
return await Shazam().recognize(filepath)
try:
result = asyncio.run(recognize())
except Exception as e:
log.warning("shazam lookup failed", error=str(e))
return None, None
track_info = result.get("track") or {}
return _clean(track_info.get("subtitle")), _clean(track_info.get("title"))
def current_quarter() -> Path:
date = DateParse()
return Path().home() / "Music" / f"{date.quarter}_{date.year}"
class YtDownload(Command):
"""
download a youtube song from {URL} to current quarter dir.
"""
url: CliPositionalArg[str] = Field(description="youtube url to download")
ydl_opts: ClassVar = {
"format": "mp3/bestaudio/best",
"postprocessors": [
{
"key": "FFmpegExtractAudio",
"preferredcodec": "mp3",
}
],
"remote_components": ["ejs:github"],
}
parents: CliImplicitFlag[bool] = Field(
default=True, description="create the parent dirs if not exists"
)
@computed_field()
@property
def quarter_dir(self) -> Path:
date = DateParse()
return Path().home() / "Music" / f"{date.quarter}_{date.year}"
def run(self) -> None:
os.chdir(Path.home() / "Downloads")
with yt_dlp.YoutubeDL(self.ydl_opts) as ydl:
info_dict = ydl.extract_info(self.url, download=True)
_ = ydl.prepare_filename(info_dict)
if info_dict is None:
raise ValueError("error downloading")
filename = info_dict["requested_downloads"][0]["filepath"]
artist, track = _extract_artist_track(info_dict)
if artist is None or track is None:
log.info("trying shazam", file=filename)
shazam_artist, shazam_track = _shazam_lookup(filename)
artist = artist or shazam_artist
track = track or shazam_track
if shazam_artist or shazam_track:
log.info("shazam matched", artist=shazam_artist, track=shazam_track)
if artist is None or track is None:
title = info_dict.get("title") or Path(filename).stem
uploader = info_dict.get("uploader") or info_dict.get("channel")
log.info("could not parse metadata", title=title, uploader=uploader)
artist = _prompt("artist", default=artist or uploader)
track = _prompt("track", default=track or title)
base = self.quarter_dir
if not base.exists():
base.mkdir(parents=self.parents)
if track and artist:
name = f"{artist} - {track}.mp3"
new_filepath = base / name
os.rename(filename, new_filepath)
tags = EasyID3(new_filepath)
tags["title"] = track
tags["artist"] = artist
tags.save()
else:
new_filepath = base / Path(filename).name
os.rename(filename, new_filepath)
log.info("downloaded", path=str(new_filepath))

View File

@@ -0,0 +1,35 @@
import subprocess
from music.models import Command
from music.paths import seasons
import random
# import mpv
import tempfile
import os
class Random(Command):
def run(self):
"""play random local songs."""
songs = []
for d in seasons():
for f in d.iterdir():
if not f.suffix == ".mp3":
continue
songs.append(f)
random.shuffle(songs)
with tempfile.NamedTemporaryFile(delete=False, suffix=".txt") as tmp:
for song in songs:
tmp.write(f"{song}\n".encode())
playlist = tmp.name
# fd -t file -e mp3 -p -a "fall|winter|spring|summer" $HOME/Music/ | mpv --playlist=- --shuffle --no-video
try:
subprocess.run(["mpv", f"--playlist={playlist}", "--shuffle", "--no-video"])
# process = subprocess.Popen(
# ["mpv", f"--playlist={playlist}", "--shuffle", "--no-video"],
# )
# process.wait()
finally:
os.remove(playlist)

View File

@@ -0,0 +1,73 @@
from datetime import datetime, timezone
from enum import Enum
from pathlib import Path
import subprocess
from pydantic import Field
import requests
import structlog
from music.dates import DateParse
from music.models import Command
log = structlog.get_logger()
class Source(str, Enum):
KEXP = "kexp"
KUGS = "kugs"
CLIS = "clis"
def current_quarter() -> Path:
date = DateParse()
return Path().home() / "Music" / f"{date.quarter}_{date.year}"
def url_for_time(t) -> str:
default = "https://kexp-mp3-128.streamguys1.com/kexp128.mp3"
tz = t.strftime("%Y-%m-%dT%H:%M:%SZ")
log.info("getting archive", time=t)
check_url = (
f"https://api.kexp.org/get_streaming_url/?bitrate=128&timestamp={tz}&location=1"
)
response = requests.get(check_url)
if response.status_code == 200:
found = response.json().get("sg-url")
if found is None:
log.error("error getting archive", time=t)
return default
else:
log.error("error getting archive", time=t, status=response.status_code)
return default
return found
class Stream(Command):
"""play streams."""
source: Source = Field(default=Source.KEXP)
for_date: datetime | None = Field(default=None)
def run(self) -> None:
if self.for_date is not None:
time = self.for_date.astimezone(tz=timezone.utc)
else:
time = None
self.play(self.source, time)
def play(self, stream, t=None):
"""Play streams using mpv."""
match stream:
case Source.KEXP:
url = "https://kexp-mp3-128.streamguys1.com/kexp128.mp3"
if t:
url = url_for_time(t)
case Source.KUGS:
url = "https://peridot.streamguys1.com:7175/kugs-mp3"
case Source.CLIS:
url = "https://stream2.statsradio.com:8012/stream?=&&___cb=759135934160766"
case _:
raise ValueError(f"unrecognized stream: {stream}")
subprocess.run(["mpv", url])

View File

@@ -1,6 +1,8 @@
from datetime import datetime from datetime import datetime
from typing import Optional, Tuple from typing import Optional, Tuple
from pydantic import BaseModel, Field, computed_field
def quarter_year(for_date: Optional[datetime] = None) -> Tuple[str, int]: def quarter_year(for_date: Optional[datetime] = None) -> Tuple[str, int]:
if for_date is None: if for_date is None:
@@ -16,3 +18,27 @@ def quarter_year(for_date: Optional[datetime] = None) -> Tuple[str, int]:
else: else:
quarter = "fall" quarter = "fall"
return quarter, year return quarter, year
class DateParse(BaseModel):
date: datetime = Field(default_factory=datetime.now)
@computed_field()
@property
def quarter(self) -> str:
match self.date.month:
case 12 | 1 | 2:
return "winter"
case 3 | 4 | 5:
return "spring"
case 6 | 7 | 8:
return "summer"
case 9 | 10 | 11:
return "fall"
case _:
raise ValueError(self.date.month)
@computed_field()
@property
def year(self) -> int:
return self.date.year

9
src/music/models.py Normal file
View File

@@ -0,0 +1,9 @@
from abc import ABC, abstractmethod
from pydantic_settings import BaseSettings
class Command(BaseSettings, ABC):
@abstractmethod
def run(self) -> None:
pass

View File

@@ -3,11 +3,14 @@ from enum import Enum
from .paths import seasons from .paths import seasons
import random import random
import requests import requests
import structlog
# import mpv # import mpv
import tempfile import tempfile
import os import os
log = structlog.get_logger()
class Stream(str, Enum): class Stream(str, Enum):
KEXP = "kexp" KEXP = "kexp"
@@ -18,7 +21,7 @@ class Stream(str, Enum):
def url_for_time(t) -> str: def url_for_time(t) -> str:
default = "https://kexp-mp3-128.streamguys1.com/kexp128.mp3" default = "https://kexp-mp3-128.streamguys1.com/kexp128.mp3"
tz = t.strftime("%Y-%m-%dT%H:%M:%SZ") tz = t.strftime("%Y-%m-%dT%H:%M:%SZ")
print(f"getting archive: {t}") log.info("getting archive", time=t)
check_url = ( check_url = (
f"https://api.kexp.org/get_streaming_url/?bitrate=128&timestamp={tz}&location=1" f"https://api.kexp.org/get_streaming_url/?bitrate=128&timestamp={tz}&location=1"
) )
@@ -26,10 +29,10 @@ def url_for_time(t) -> str:
if response.status_code == 200: if response.status_code == 200:
found = response.json().get("sg-url") found = response.json().get("sg-url")
if found is None: if found is None:
print(f"error getting archive: {t}") log.error("error getting archive", time=t)
return default return default
else: else:
print(f"error getting archive: {t}") log.error("error getting archive", time=t, status=response.status_code)
return default return default
return found return found

1017
uv.lock generated

File diff suppressed because it is too large Load Diff