Compare commits
5 Commits
323d6c0499
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
bde1ddf465 | ||
|
|
c36a6a7f62 | ||
|
|
ce66005bb3 | ||
|
|
7982c7700b | ||
|
|
cd7f93100b |
@@ -6,15 +6,21 @@ readme = "README.md"
|
||||
authors = [{ name = "publicmatt", email = "git@publicmatt.com" }]
|
||||
requires-python = ">=3.12"
|
||||
dependencies = [
|
||||
"audioop-lts>=0.2.2 ; python_full_version >= '3.13'",
|
||||
"click>=8.1.8",
|
||||
"mpv>=1.0.7",
|
||||
"mutagen>=1.47.0",
|
||||
"pydantic>=2.0",
|
||||
"pydantic-settings>=2.12.0",
|
||||
"requests>=2.32.3",
|
||||
"shazamio>=0.8.1",
|
||||
"structlog>=25.5.0",
|
||||
"yt-dlp>=2025.1.15",
|
||||
]
|
||||
|
||||
[project.scripts]
|
||||
music = "music.__main__:cli"
|
||||
stream = "music.__main__:stream"
|
||||
|
||||
[build-system]
|
||||
requires = ["hatchling"]
|
||||
|
||||
@@ -1,51 +1,15 @@
|
||||
from datetime import datetime, timezone
|
||||
from typing import Optional
|
||||
from pydantic_settings import CliApp
|
||||
|
||||
import click
|
||||
|
||||
from .save import yt
|
||||
from .stream import Stream, local, play
|
||||
from .cli import Cli
|
||||
from .commands import Stream
|
||||
|
||||
|
||||
@click.group()
|
||||
def cli():
|
||||
pass
|
||||
CliApp.run(Cli)
|
||||
|
||||
|
||||
@cli.command("stream")
|
||||
@click.argument(
|
||||
"stream", type=click.Choice([e.value for e in Stream]), default=Stream.KEXP
|
||||
)
|
||||
@click.option(
|
||||
"-t",
|
||||
"--timestamp",
|
||||
"t",
|
||||
type=click.DateTime(formats=["%Y-%m-%d %H:%M"]),
|
||||
required=False,
|
||||
)
|
||||
def _stream(stream, t: Optional[datetime]):
|
||||
"""play streams."""
|
||||
|
||||
if t is not None:
|
||||
time = t.astimezone(tz=timezone.utc)
|
||||
play(stream, time)
|
||||
|
||||
|
||||
@cli.command("random")
|
||||
def _random():
|
||||
"""play randomly from local music."""
|
||||
local()
|
||||
|
||||
|
||||
@cli.command("yt")
|
||||
@click.argument("url", required=True)
|
||||
def _save(url):
|
||||
"""
|
||||
download a youtube song from URL to current quarter dir.
|
||||
|
||||
URL: youtube url to download
|
||||
"""
|
||||
yt(url)
|
||||
def stream():
|
||||
CliApp.run(Stream)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
24
src/music/cli.py
Normal file
24
src/music/cli.py
Normal file
@@ -0,0 +1,24 @@
|
||||
from pydantic import Field
|
||||
from pydantic_settings import BaseSettings, CliApp, CliSubCommand, get_subcommand
|
||||
|
||||
from music.commands import Random, Stream, YtDownload
|
||||
|
||||
|
||||
class Cli(BaseSettings):
|
||||
model_config = {
|
||||
"env_file": [".env"],
|
||||
"cli_kebab_case": True,
|
||||
"cli_use_class_docs_for_groups": True,
|
||||
}
|
||||
|
||||
download: CliSubCommand[YtDownload] = Field(alias="yt")
|
||||
stream: CliSubCommand[Stream] = Field(alias="stream")
|
||||
random: CliSubCommand[Random] = Field(alias="random")
|
||||
|
||||
def cli_cmd(self) -> None:
|
||||
if (cmd := get_subcommand(self, is_required=False)) is not None:
|
||||
CliApp.run_subcommand(self, cli_cmd_method_name="run")
|
||||
else:
|
||||
CliApp.run(Cli, cli_args=["--help"])
|
||||
|
||||
pass
|
||||
5
src/music/commands/__init__.py
Normal file
5
src/music/commands/__init__.py
Normal file
@@ -0,0 +1,5 @@
|
||||
from .download import YtDownload
|
||||
from .random import Random
|
||||
from .stream import Stream
|
||||
|
||||
__all__ = ["YtDownload", "Random", "Stream"]
|
||||
143
src/music/commands/download.py
Normal file
143
src/music/commands/download.py
Normal file
@@ -0,0 +1,143 @@
|
||||
import asyncio
|
||||
import os
|
||||
import re
|
||||
from pathlib import Path
|
||||
from typing import ClassVar
|
||||
|
||||
from pydantic_settings import CliImplicitFlag, CliPositionalArg
|
||||
import structlog
|
||||
import yt_dlp
|
||||
from mutagen.easyid3 import EasyID3
|
||||
from pydantic import Field, computed_field
|
||||
from shazamio import Shazam
|
||||
|
||||
from music.dates import DateParse
|
||||
from music.models import Command
|
||||
|
||||
log = structlog.get_logger()
|
||||
|
||||
|
||||
def _clean(value) -> str | None:
|
||||
if value is None:
|
||||
return None
|
||||
if isinstance(value, list):
|
||||
value = ", ".join(v for v in value if v)
|
||||
value = str(value).strip()
|
||||
return value or None
|
||||
|
||||
|
||||
def _prompt(label: str, default: str | None = None) -> str | None:
|
||||
suffix = f" [{default}]" if default else ""
|
||||
try:
|
||||
value = input(f"{label}{suffix}: ").strip()
|
||||
except EOFError:
|
||||
return default
|
||||
return value or default
|
||||
|
||||
|
||||
def _extract_artist_track(info: dict) -> tuple[str | None, str | None]:
|
||||
artist = (
|
||||
_clean(info.get("artists"))
|
||||
or _clean(info.get("artist"))
|
||||
or _clean(info.get("creators"))
|
||||
or _clean(info.get("creator"))
|
||||
)
|
||||
track = _clean(info.get("track")) or _clean(info.get("alt_title"))
|
||||
|
||||
title = _clean(info.get("title"))
|
||||
if (artist is None or track is None) and title and " - " in title:
|
||||
parsed_artist, parsed_track = title.split(" - ", 1)
|
||||
parsed_track = re.sub(r"\s*[\(\[][^\)\]]*[\)\]]\s*$", "", parsed_track).strip()
|
||||
if artist is None:
|
||||
artist = _clean(parsed_artist)
|
||||
if track is None:
|
||||
track = _clean(parsed_track)
|
||||
return artist, track
|
||||
|
||||
|
||||
def _shazam_lookup(filepath: str) -> tuple[str | None, str | None]:
|
||||
async def recognize() -> dict:
|
||||
return await Shazam().recognize(filepath)
|
||||
|
||||
try:
|
||||
result = asyncio.run(recognize())
|
||||
except Exception as e:
|
||||
log.warning("shazam lookup failed", error=str(e))
|
||||
return None, None
|
||||
track_info = result.get("track") or {}
|
||||
return _clean(track_info.get("subtitle")), _clean(track_info.get("title"))
|
||||
|
||||
|
||||
def current_quarter() -> Path:
|
||||
date = DateParse()
|
||||
return Path().home() / "Music" / f"{date.quarter}_{date.year}"
|
||||
|
||||
|
||||
class YtDownload(Command):
|
||||
"""
|
||||
download a youtube song from {URL} to current quarter dir.
|
||||
"""
|
||||
|
||||
url: CliPositionalArg[str] = Field(description="youtube url to download")
|
||||
ydl_opts: ClassVar = {
|
||||
"format": "mp3/bestaudio/best",
|
||||
"postprocessors": [
|
||||
{
|
||||
"key": "FFmpegExtractAudio",
|
||||
"preferredcodec": "mp3",
|
||||
}
|
||||
],
|
||||
"remote_components": ["ejs:github"],
|
||||
}
|
||||
parents: CliImplicitFlag[bool] = Field(
|
||||
default=True, description="create the parent dirs if not exists"
|
||||
)
|
||||
|
||||
@computed_field()
|
||||
@property
|
||||
def quarter_dir(self) -> Path:
|
||||
date = DateParse()
|
||||
return Path().home() / "Music" / f"{date.quarter}_{date.year}"
|
||||
|
||||
def run(self) -> None:
|
||||
os.chdir(Path.home() / "Downloads")
|
||||
with yt_dlp.YoutubeDL(self.ydl_opts) as ydl:
|
||||
info_dict = ydl.extract_info(self.url, download=True)
|
||||
_ = ydl.prepare_filename(info_dict)
|
||||
if info_dict is None:
|
||||
raise ValueError("error downloading")
|
||||
filename = info_dict["requested_downloads"][0]["filepath"]
|
||||
|
||||
artist, track = _extract_artist_track(info_dict)
|
||||
|
||||
if artist is None or track is None:
|
||||
log.info("trying shazam", file=filename)
|
||||
shazam_artist, shazam_track = _shazam_lookup(filename)
|
||||
artist = artist or shazam_artist
|
||||
track = track or shazam_track
|
||||
if shazam_artist or shazam_track:
|
||||
log.info("shazam matched", artist=shazam_artist, track=shazam_track)
|
||||
|
||||
if artist is None or track is None:
|
||||
title = info_dict.get("title") or Path(filename).stem
|
||||
uploader = info_dict.get("uploader") or info_dict.get("channel")
|
||||
log.info("could not parse metadata", title=title, uploader=uploader)
|
||||
artist = _prompt("artist", default=artist or uploader)
|
||||
track = _prompt("track", default=track or title)
|
||||
|
||||
base = self.quarter_dir
|
||||
if not base.exists():
|
||||
base.mkdir(parents=self.parents)
|
||||
if track and artist:
|
||||
name = f"{artist} - {track}.mp3"
|
||||
new_filepath = base / name
|
||||
os.rename(filename, new_filepath)
|
||||
tags = EasyID3(new_filepath)
|
||||
tags["title"] = track
|
||||
tags["artist"] = artist
|
||||
tags.save()
|
||||
else:
|
||||
new_filepath = base / Path(filename).name
|
||||
os.rename(filename, new_filepath)
|
||||
|
||||
log.info("downloaded", path=str(new_filepath))
|
||||
35
src/music/commands/random.py
Normal file
35
src/music/commands/random.py
Normal file
@@ -0,0 +1,35 @@
|
||||
import subprocess
|
||||
from music.models import Command
|
||||
from music.paths import seasons
|
||||
import random
|
||||
|
||||
# import mpv
|
||||
import tempfile
|
||||
import os
|
||||
|
||||
|
||||
class Random(Command):
|
||||
def run(self):
|
||||
"""play random local songs."""
|
||||
songs = []
|
||||
for d in seasons():
|
||||
for f in d.iterdir():
|
||||
if not f.suffix == ".mp3":
|
||||
continue
|
||||
songs.append(f)
|
||||
random.shuffle(songs)
|
||||
with tempfile.NamedTemporaryFile(delete=False, suffix=".txt") as tmp:
|
||||
for song in songs:
|
||||
tmp.write(f"{song}\n".encode())
|
||||
playlist = tmp.name
|
||||
# fd -t file -e mp3 -p -a "fall|winter|spring|summer" $HOME/Music/ | mpv --playlist=- --shuffle --no-video
|
||||
|
||||
try:
|
||||
subprocess.run(["mpv", f"--playlist={playlist}", "--shuffle", "--no-video"])
|
||||
# process = subprocess.Popen(
|
||||
# ["mpv", f"--playlist={playlist}", "--shuffle", "--no-video"],
|
||||
# )
|
||||
|
||||
# process.wait()
|
||||
finally:
|
||||
os.remove(playlist)
|
||||
73
src/music/commands/stream.py
Normal file
73
src/music/commands/stream.py
Normal file
@@ -0,0 +1,73 @@
|
||||
from datetime import datetime, timezone
|
||||
from enum import Enum
|
||||
from pathlib import Path
|
||||
import subprocess
|
||||
|
||||
from pydantic import Field
|
||||
import requests
|
||||
import structlog
|
||||
|
||||
from music.dates import DateParse
|
||||
from music.models import Command
|
||||
|
||||
log = structlog.get_logger()
|
||||
|
||||
|
||||
class Source(str, Enum):
|
||||
KEXP = "kexp"
|
||||
KUGS = "kugs"
|
||||
CLIS = "clis"
|
||||
|
||||
|
||||
def current_quarter() -> Path:
|
||||
date = DateParse()
|
||||
return Path().home() / "Music" / f"{date.quarter}_{date.year}"
|
||||
|
||||
|
||||
def url_for_time(t) -> str:
|
||||
default = "https://kexp-mp3-128.streamguys1.com/kexp128.mp3"
|
||||
tz = t.strftime("%Y-%m-%dT%H:%M:%SZ")
|
||||
log.info("getting archive", time=t)
|
||||
check_url = (
|
||||
f"https://api.kexp.org/get_streaming_url/?bitrate=128×tamp={tz}&location=1"
|
||||
)
|
||||
response = requests.get(check_url)
|
||||
if response.status_code == 200:
|
||||
found = response.json().get("sg-url")
|
||||
if found is None:
|
||||
log.error("error getting archive", time=t)
|
||||
return default
|
||||
else:
|
||||
log.error("error getting archive", time=t, status=response.status_code)
|
||||
return default
|
||||
|
||||
return found
|
||||
|
||||
|
||||
class Stream(Command):
|
||||
"""play streams."""
|
||||
|
||||
source: Source = Field(default=Source.KEXP)
|
||||
for_date: datetime | None = Field(default=None)
|
||||
|
||||
def run(self) -> None:
|
||||
if self.for_date is not None:
|
||||
time = self.for_date.astimezone(tz=timezone.utc)
|
||||
else:
|
||||
time = None
|
||||
self.play(self.source, time)
|
||||
|
||||
def play(self, stream, t=None):
|
||||
"""Play streams using mpv."""
|
||||
match stream:
|
||||
case Source.KEXP:
|
||||
url = "https://kexp-mp3-128.streamguys1.com/kexp128.mp3"
|
||||
if t:
|
||||
url = url_for_time(t)
|
||||
case Source.KUGS:
|
||||
url = "https://peridot.streamguys1.com:7175/kugs-mp3"
|
||||
case Source.CLIS:
|
||||
url = "https://stream2.statsradio.com:8012/stream?=&&___cb=759135934160766"
|
||||
case _:
|
||||
raise ValueError(f"unrecognized stream: {stream}")
|
||||
subprocess.run(["mpv", url])
|
||||
@@ -1,6 +1,8 @@
|
||||
from datetime import datetime
|
||||
from typing import Optional, Tuple
|
||||
|
||||
from pydantic import BaseModel, Field, computed_field
|
||||
|
||||
|
||||
def quarter_year(for_date: Optional[datetime] = None) -> Tuple[str, int]:
|
||||
if for_date is None:
|
||||
@@ -16,3 +18,27 @@ def quarter_year(for_date: Optional[datetime] = None) -> Tuple[str, int]:
|
||||
else:
|
||||
quarter = "fall"
|
||||
return quarter, year
|
||||
|
||||
|
||||
class DateParse(BaseModel):
|
||||
date: datetime = Field(default_factory=datetime.now)
|
||||
|
||||
@computed_field()
|
||||
@property
|
||||
def quarter(self) -> str:
|
||||
match self.date.month:
|
||||
case 12 | 1 | 2:
|
||||
return "winter"
|
||||
case 3 | 4 | 5:
|
||||
return "spring"
|
||||
case 6 | 7 | 8:
|
||||
return "summer"
|
||||
case 9 | 10 | 11:
|
||||
return "fall"
|
||||
case _:
|
||||
raise ValueError(self.date.month)
|
||||
|
||||
@computed_field()
|
||||
@property
|
||||
def year(self) -> int:
|
||||
return self.date.year
|
||||
|
||||
9
src/music/models.py
Normal file
9
src/music/models.py
Normal file
@@ -0,0 +1,9 @@
|
||||
from abc import ABC, abstractmethod
|
||||
|
||||
from pydantic_settings import BaseSettings
|
||||
|
||||
|
||||
class Command(BaseSettings, ABC):
|
||||
@abstractmethod
|
||||
def run(self) -> None:
|
||||
pass
|
||||
@@ -3,11 +3,14 @@ from enum import Enum
|
||||
from .paths import seasons
|
||||
import random
|
||||
import requests
|
||||
import structlog
|
||||
|
||||
# import mpv
|
||||
import tempfile
|
||||
import os
|
||||
|
||||
log = structlog.get_logger()
|
||||
|
||||
|
||||
class Stream(str, Enum):
|
||||
KEXP = "kexp"
|
||||
@@ -15,23 +18,33 @@ class Stream(str, Enum):
|
||||
CLIS = "clis"
|
||||
|
||||
|
||||
def url_for_time(t) -> str:
|
||||
default = "https://kexp-mp3-128.streamguys1.com/kexp128.mp3"
|
||||
tz = t.strftime("%Y-%m-%dT%H:%M:%SZ")
|
||||
log.info("getting archive", time=t)
|
||||
check_url = (
|
||||
f"https://api.kexp.org/get_streaming_url/?bitrate=128×tamp={tz}&location=1"
|
||||
)
|
||||
response = requests.get(check_url)
|
||||
if response.status_code == 200:
|
||||
found = response.json().get("sg-url")
|
||||
if found is None:
|
||||
log.error("error getting archive", time=t)
|
||||
return default
|
||||
else:
|
||||
log.error("error getting archive", time=t, status=response.status_code)
|
||||
return default
|
||||
|
||||
return found
|
||||
|
||||
|
||||
def play(stream, t=None):
|
||||
"""Play streams using mpv."""
|
||||
match stream:
|
||||
case Stream.KEXP:
|
||||
default = "https://kexp-mp3-128.streamguys1.com/kexp128.mp3"
|
||||
url = "https://kexp-mp3-128.streamguys1.com/kexp128.mp3"
|
||||
if t:
|
||||
tz = t.strftime("%Y-%m-%dT%H:%M:%SZ")
|
||||
print(f"getting archive: {t}")
|
||||
check_url = f"https://api.kexp.org/get_streaming_url/?bitrate=128×tamp={tz}&location=1"
|
||||
response = requests.get(check_url)
|
||||
if response.status_code == 200:
|
||||
url = response.json().get("sg-url")
|
||||
if url is None:
|
||||
print(f"error getting archive: {t}")
|
||||
url = default
|
||||
else:
|
||||
url = default
|
||||
url = url_for_time(t)
|
||||
case Stream.KUGS:
|
||||
url = "https://peridot.streamguys1.com:7175/kugs-mp3"
|
||||
case Stream.CLIS:
|
||||
|
||||
Reference in New Issue
Block a user