Compare commits
4 Commits
cd7f93100b
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
bde1ddf465 | ||
|
|
c36a6a7f62 | ||
|
|
ce66005bb3 | ||
|
|
7982c7700b |
@@ -6,15 +6,21 @@ readme = "README.md"
|
|||||||
authors = [{ name = "publicmatt", email = "git@publicmatt.com" }]
|
authors = [{ name = "publicmatt", email = "git@publicmatt.com" }]
|
||||||
requires-python = ">=3.12"
|
requires-python = ">=3.12"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
|
"audioop-lts>=0.2.2 ; python_full_version >= '3.13'",
|
||||||
"click>=8.1.8",
|
"click>=8.1.8",
|
||||||
"mpv>=1.0.7",
|
"mpv>=1.0.7",
|
||||||
"mutagen>=1.47.0",
|
"mutagen>=1.47.0",
|
||||||
|
"pydantic>=2.0",
|
||||||
|
"pydantic-settings>=2.12.0",
|
||||||
"requests>=2.32.3",
|
"requests>=2.32.3",
|
||||||
|
"shazamio>=0.8.1",
|
||||||
|
"structlog>=25.5.0",
|
||||||
"yt-dlp>=2025.1.15",
|
"yt-dlp>=2025.1.15",
|
||||||
]
|
]
|
||||||
|
|
||||||
[project.scripts]
|
[project.scripts]
|
||||||
music = "music.__main__:cli"
|
music = "music.__main__:cli"
|
||||||
|
stream = "music.__main__:stream"
|
||||||
|
|
||||||
[build-system]
|
[build-system]
|
||||||
requires = ["hatchling"]
|
requires = ["hatchling"]
|
||||||
|
|||||||
@@ -1,53 +1,15 @@
|
|||||||
from datetime import datetime, timezone
|
from pydantic_settings import CliApp
|
||||||
from typing import Optional
|
|
||||||
|
|
||||||
import click
|
from .cli import Cli
|
||||||
|
from .commands import Stream
|
||||||
from .save import yt
|
|
||||||
from .stream import Stream, local, play
|
|
||||||
|
|
||||||
|
|
||||||
@click.group()
|
|
||||||
def cli():
|
def cli():
|
||||||
pass
|
CliApp.run(Cli)
|
||||||
|
|
||||||
|
|
||||||
@cli.command("stream")
|
def stream():
|
||||||
@click.argument(
|
CliApp.run(Stream)
|
||||||
"stream", type=click.Choice([e.value for e in Stream]), default=Stream.KEXP
|
|
||||||
)
|
|
||||||
@click.option(
|
|
||||||
"-t",
|
|
||||||
"--timestamp",
|
|
||||||
"t",
|
|
||||||
type=click.DateTime(formats=["%Y-%m-%d %H:%M"]),
|
|
||||||
required=False,
|
|
||||||
)
|
|
||||||
def _stream(stream, t: Optional[datetime]):
|
|
||||||
"""play streams."""
|
|
||||||
|
|
||||||
if t is not None:
|
|
||||||
time = t.astimezone(tz=timezone.utc)
|
|
||||||
else:
|
|
||||||
time = None
|
|
||||||
play(stream, time)
|
|
||||||
|
|
||||||
|
|
||||||
@cli.command("random")
|
|
||||||
def _random():
|
|
||||||
"""play randomly from local music."""
|
|
||||||
local()
|
|
||||||
|
|
||||||
|
|
||||||
@cli.command("yt")
|
|
||||||
@click.argument("url", required=True)
|
|
||||||
def _save(url):
|
|
||||||
"""
|
|
||||||
download a youtube song from URL to current quarter dir.
|
|
||||||
|
|
||||||
URL: youtube url to download
|
|
||||||
"""
|
|
||||||
yt(url)
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
|
|||||||
24
src/music/cli.py
Normal file
24
src/music/cli.py
Normal file
@@ -0,0 +1,24 @@
|
|||||||
|
from pydantic import Field
|
||||||
|
from pydantic_settings import BaseSettings, CliApp, CliSubCommand, get_subcommand
|
||||||
|
|
||||||
|
from music.commands import Random, Stream, YtDownload
|
||||||
|
|
||||||
|
|
||||||
|
class Cli(BaseSettings):
|
||||||
|
model_config = {
|
||||||
|
"env_file": [".env"],
|
||||||
|
"cli_kebab_case": True,
|
||||||
|
"cli_use_class_docs_for_groups": True,
|
||||||
|
}
|
||||||
|
|
||||||
|
download: CliSubCommand[YtDownload] = Field(alias="yt")
|
||||||
|
stream: CliSubCommand[Stream] = Field(alias="stream")
|
||||||
|
random: CliSubCommand[Random] = Field(alias="random")
|
||||||
|
|
||||||
|
def cli_cmd(self) -> None:
|
||||||
|
if (cmd := get_subcommand(self, is_required=False)) is not None:
|
||||||
|
CliApp.run_subcommand(self, cli_cmd_method_name="run")
|
||||||
|
else:
|
||||||
|
CliApp.run(Cli, cli_args=["--help"])
|
||||||
|
|
||||||
|
pass
|
||||||
5
src/music/commands/__init__.py
Normal file
5
src/music/commands/__init__.py
Normal file
@@ -0,0 +1,5 @@
|
|||||||
|
from .download import YtDownload
|
||||||
|
from .random import Random
|
||||||
|
from .stream import Stream
|
||||||
|
|
||||||
|
__all__ = ["YtDownload", "Random", "Stream"]
|
||||||
143
src/music/commands/download.py
Normal file
143
src/music/commands/download.py
Normal file
@@ -0,0 +1,143 @@
|
|||||||
|
import asyncio
|
||||||
|
import os
|
||||||
|
import re
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import ClassVar
|
||||||
|
|
||||||
|
from pydantic_settings import CliImplicitFlag, CliPositionalArg
|
||||||
|
import structlog
|
||||||
|
import yt_dlp
|
||||||
|
from mutagen.easyid3 import EasyID3
|
||||||
|
from pydantic import Field, computed_field
|
||||||
|
from shazamio import Shazam
|
||||||
|
|
||||||
|
from music.dates import DateParse
|
||||||
|
from music.models import Command
|
||||||
|
|
||||||
|
log = structlog.get_logger()
|
||||||
|
|
||||||
|
|
||||||
|
def _clean(value) -> str | None:
|
||||||
|
if value is None:
|
||||||
|
return None
|
||||||
|
if isinstance(value, list):
|
||||||
|
value = ", ".join(v for v in value if v)
|
||||||
|
value = str(value).strip()
|
||||||
|
return value or None
|
||||||
|
|
||||||
|
|
||||||
|
def _prompt(label: str, default: str | None = None) -> str | None:
|
||||||
|
suffix = f" [{default}]" if default else ""
|
||||||
|
try:
|
||||||
|
value = input(f"{label}{suffix}: ").strip()
|
||||||
|
except EOFError:
|
||||||
|
return default
|
||||||
|
return value or default
|
||||||
|
|
||||||
|
|
||||||
|
def _extract_artist_track(info: dict) -> tuple[str | None, str | None]:
|
||||||
|
artist = (
|
||||||
|
_clean(info.get("artists"))
|
||||||
|
or _clean(info.get("artist"))
|
||||||
|
or _clean(info.get("creators"))
|
||||||
|
or _clean(info.get("creator"))
|
||||||
|
)
|
||||||
|
track = _clean(info.get("track")) or _clean(info.get("alt_title"))
|
||||||
|
|
||||||
|
title = _clean(info.get("title"))
|
||||||
|
if (artist is None or track is None) and title and " - " in title:
|
||||||
|
parsed_artist, parsed_track = title.split(" - ", 1)
|
||||||
|
parsed_track = re.sub(r"\s*[\(\[][^\)\]]*[\)\]]\s*$", "", parsed_track).strip()
|
||||||
|
if artist is None:
|
||||||
|
artist = _clean(parsed_artist)
|
||||||
|
if track is None:
|
||||||
|
track = _clean(parsed_track)
|
||||||
|
return artist, track
|
||||||
|
|
||||||
|
|
||||||
|
def _shazam_lookup(filepath: str) -> tuple[str | None, str | None]:
|
||||||
|
async def recognize() -> dict:
|
||||||
|
return await Shazam().recognize(filepath)
|
||||||
|
|
||||||
|
try:
|
||||||
|
result = asyncio.run(recognize())
|
||||||
|
except Exception as e:
|
||||||
|
log.warning("shazam lookup failed", error=str(e))
|
||||||
|
return None, None
|
||||||
|
track_info = result.get("track") or {}
|
||||||
|
return _clean(track_info.get("subtitle")), _clean(track_info.get("title"))
|
||||||
|
|
||||||
|
|
||||||
|
def current_quarter() -> Path:
|
||||||
|
date = DateParse()
|
||||||
|
return Path().home() / "Music" / f"{date.quarter}_{date.year}"
|
||||||
|
|
||||||
|
|
||||||
|
class YtDownload(Command):
|
||||||
|
"""
|
||||||
|
download a youtube song from {URL} to current quarter dir.
|
||||||
|
"""
|
||||||
|
|
||||||
|
url: CliPositionalArg[str] = Field(description="youtube url to download")
|
||||||
|
ydl_opts: ClassVar = {
|
||||||
|
"format": "mp3/bestaudio/best",
|
||||||
|
"postprocessors": [
|
||||||
|
{
|
||||||
|
"key": "FFmpegExtractAudio",
|
||||||
|
"preferredcodec": "mp3",
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"remote_components": ["ejs:github"],
|
||||||
|
}
|
||||||
|
parents: CliImplicitFlag[bool] = Field(
|
||||||
|
default=True, description="create the parent dirs if not exists"
|
||||||
|
)
|
||||||
|
|
||||||
|
@computed_field()
|
||||||
|
@property
|
||||||
|
def quarter_dir(self) -> Path:
|
||||||
|
date = DateParse()
|
||||||
|
return Path().home() / "Music" / f"{date.quarter}_{date.year}"
|
||||||
|
|
||||||
|
def run(self) -> None:
|
||||||
|
os.chdir(Path.home() / "Downloads")
|
||||||
|
with yt_dlp.YoutubeDL(self.ydl_opts) as ydl:
|
||||||
|
info_dict = ydl.extract_info(self.url, download=True)
|
||||||
|
_ = ydl.prepare_filename(info_dict)
|
||||||
|
if info_dict is None:
|
||||||
|
raise ValueError("error downloading")
|
||||||
|
filename = info_dict["requested_downloads"][0]["filepath"]
|
||||||
|
|
||||||
|
artist, track = _extract_artist_track(info_dict)
|
||||||
|
|
||||||
|
if artist is None or track is None:
|
||||||
|
log.info("trying shazam", file=filename)
|
||||||
|
shazam_artist, shazam_track = _shazam_lookup(filename)
|
||||||
|
artist = artist or shazam_artist
|
||||||
|
track = track or shazam_track
|
||||||
|
if shazam_artist or shazam_track:
|
||||||
|
log.info("shazam matched", artist=shazam_artist, track=shazam_track)
|
||||||
|
|
||||||
|
if artist is None or track is None:
|
||||||
|
title = info_dict.get("title") or Path(filename).stem
|
||||||
|
uploader = info_dict.get("uploader") or info_dict.get("channel")
|
||||||
|
log.info("could not parse metadata", title=title, uploader=uploader)
|
||||||
|
artist = _prompt("artist", default=artist or uploader)
|
||||||
|
track = _prompt("track", default=track or title)
|
||||||
|
|
||||||
|
base = self.quarter_dir
|
||||||
|
if not base.exists():
|
||||||
|
base.mkdir(parents=self.parents)
|
||||||
|
if track and artist:
|
||||||
|
name = f"{artist} - {track}.mp3"
|
||||||
|
new_filepath = base / name
|
||||||
|
os.rename(filename, new_filepath)
|
||||||
|
tags = EasyID3(new_filepath)
|
||||||
|
tags["title"] = track
|
||||||
|
tags["artist"] = artist
|
||||||
|
tags.save()
|
||||||
|
else:
|
||||||
|
new_filepath = base / Path(filename).name
|
||||||
|
os.rename(filename, new_filepath)
|
||||||
|
|
||||||
|
log.info("downloaded", path=str(new_filepath))
|
||||||
35
src/music/commands/random.py
Normal file
35
src/music/commands/random.py
Normal file
@@ -0,0 +1,35 @@
|
|||||||
|
import subprocess
|
||||||
|
from music.models import Command
|
||||||
|
from music.paths import seasons
|
||||||
|
import random
|
||||||
|
|
||||||
|
# import mpv
|
||||||
|
import tempfile
|
||||||
|
import os
|
||||||
|
|
||||||
|
|
||||||
|
class Random(Command):
|
||||||
|
def run(self):
|
||||||
|
"""play random local songs."""
|
||||||
|
songs = []
|
||||||
|
for d in seasons():
|
||||||
|
for f in d.iterdir():
|
||||||
|
if not f.suffix == ".mp3":
|
||||||
|
continue
|
||||||
|
songs.append(f)
|
||||||
|
random.shuffle(songs)
|
||||||
|
with tempfile.NamedTemporaryFile(delete=False, suffix=".txt") as tmp:
|
||||||
|
for song in songs:
|
||||||
|
tmp.write(f"{song}\n".encode())
|
||||||
|
playlist = tmp.name
|
||||||
|
# fd -t file -e mp3 -p -a "fall|winter|spring|summer" $HOME/Music/ | mpv --playlist=- --shuffle --no-video
|
||||||
|
|
||||||
|
try:
|
||||||
|
subprocess.run(["mpv", f"--playlist={playlist}", "--shuffle", "--no-video"])
|
||||||
|
# process = subprocess.Popen(
|
||||||
|
# ["mpv", f"--playlist={playlist}", "--shuffle", "--no-video"],
|
||||||
|
# )
|
||||||
|
|
||||||
|
# process.wait()
|
||||||
|
finally:
|
||||||
|
os.remove(playlist)
|
||||||
73
src/music/commands/stream.py
Normal file
73
src/music/commands/stream.py
Normal file
@@ -0,0 +1,73 @@
|
|||||||
|
from datetime import datetime, timezone
|
||||||
|
from enum import Enum
|
||||||
|
from pathlib import Path
|
||||||
|
import subprocess
|
||||||
|
|
||||||
|
from pydantic import Field
|
||||||
|
import requests
|
||||||
|
import structlog
|
||||||
|
|
||||||
|
from music.dates import DateParse
|
||||||
|
from music.models import Command
|
||||||
|
|
||||||
|
log = structlog.get_logger()
|
||||||
|
|
||||||
|
|
||||||
|
class Source(str, Enum):
|
||||||
|
KEXP = "kexp"
|
||||||
|
KUGS = "kugs"
|
||||||
|
CLIS = "clis"
|
||||||
|
|
||||||
|
|
||||||
|
def current_quarter() -> Path:
|
||||||
|
date = DateParse()
|
||||||
|
return Path().home() / "Music" / f"{date.quarter}_{date.year}"
|
||||||
|
|
||||||
|
|
||||||
|
def url_for_time(t) -> str:
|
||||||
|
default = "https://kexp-mp3-128.streamguys1.com/kexp128.mp3"
|
||||||
|
tz = t.strftime("%Y-%m-%dT%H:%M:%SZ")
|
||||||
|
log.info("getting archive", time=t)
|
||||||
|
check_url = (
|
||||||
|
f"https://api.kexp.org/get_streaming_url/?bitrate=128×tamp={tz}&location=1"
|
||||||
|
)
|
||||||
|
response = requests.get(check_url)
|
||||||
|
if response.status_code == 200:
|
||||||
|
found = response.json().get("sg-url")
|
||||||
|
if found is None:
|
||||||
|
log.error("error getting archive", time=t)
|
||||||
|
return default
|
||||||
|
else:
|
||||||
|
log.error("error getting archive", time=t, status=response.status_code)
|
||||||
|
return default
|
||||||
|
|
||||||
|
return found
|
||||||
|
|
||||||
|
|
||||||
|
class Stream(Command):
|
||||||
|
"""play streams."""
|
||||||
|
|
||||||
|
source: Source = Field(default=Source.KEXP)
|
||||||
|
for_date: datetime | None = Field(default=None)
|
||||||
|
|
||||||
|
def run(self) -> None:
|
||||||
|
if self.for_date is not None:
|
||||||
|
time = self.for_date.astimezone(tz=timezone.utc)
|
||||||
|
else:
|
||||||
|
time = None
|
||||||
|
self.play(self.source, time)
|
||||||
|
|
||||||
|
def play(self, stream, t=None):
|
||||||
|
"""Play streams using mpv."""
|
||||||
|
match stream:
|
||||||
|
case Source.KEXP:
|
||||||
|
url = "https://kexp-mp3-128.streamguys1.com/kexp128.mp3"
|
||||||
|
if t:
|
||||||
|
url = url_for_time(t)
|
||||||
|
case Source.KUGS:
|
||||||
|
url = "https://peridot.streamguys1.com:7175/kugs-mp3"
|
||||||
|
case Source.CLIS:
|
||||||
|
url = "https://stream2.statsradio.com:8012/stream?=&&___cb=759135934160766"
|
||||||
|
case _:
|
||||||
|
raise ValueError(f"unrecognized stream: {stream}")
|
||||||
|
subprocess.run(["mpv", url])
|
||||||
@@ -1,6 +1,8 @@
|
|||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
from typing import Optional, Tuple
|
from typing import Optional, Tuple
|
||||||
|
|
||||||
|
from pydantic import BaseModel, Field, computed_field
|
||||||
|
|
||||||
|
|
||||||
def quarter_year(for_date: Optional[datetime] = None) -> Tuple[str, int]:
|
def quarter_year(for_date: Optional[datetime] = None) -> Tuple[str, int]:
|
||||||
if for_date is None:
|
if for_date is None:
|
||||||
@@ -16,3 +18,27 @@ def quarter_year(for_date: Optional[datetime] = None) -> Tuple[str, int]:
|
|||||||
else:
|
else:
|
||||||
quarter = "fall"
|
quarter = "fall"
|
||||||
return quarter, year
|
return quarter, year
|
||||||
|
|
||||||
|
|
||||||
|
class DateParse(BaseModel):
|
||||||
|
date: datetime = Field(default_factory=datetime.now)
|
||||||
|
|
||||||
|
@computed_field()
|
||||||
|
@property
|
||||||
|
def quarter(self) -> str:
|
||||||
|
match self.date.month:
|
||||||
|
case 12 | 1 | 2:
|
||||||
|
return "winter"
|
||||||
|
case 3 | 4 | 5:
|
||||||
|
return "spring"
|
||||||
|
case 6 | 7 | 8:
|
||||||
|
return "summer"
|
||||||
|
case 9 | 10 | 11:
|
||||||
|
return "fall"
|
||||||
|
case _:
|
||||||
|
raise ValueError(self.date.month)
|
||||||
|
|
||||||
|
@computed_field()
|
||||||
|
@property
|
||||||
|
def year(self) -> int:
|
||||||
|
return self.date.year
|
||||||
|
|||||||
9
src/music/models.py
Normal file
9
src/music/models.py
Normal file
@@ -0,0 +1,9 @@
|
|||||||
|
from abc import ABC, abstractmethod
|
||||||
|
|
||||||
|
from pydantic_settings import BaseSettings
|
||||||
|
|
||||||
|
|
||||||
|
class Command(BaseSettings, ABC):
|
||||||
|
@abstractmethod
|
||||||
|
def run(self) -> None:
|
||||||
|
pass
|
||||||
@@ -3,11 +3,14 @@ from enum import Enum
|
|||||||
from .paths import seasons
|
from .paths import seasons
|
||||||
import random
|
import random
|
||||||
import requests
|
import requests
|
||||||
|
import structlog
|
||||||
|
|
||||||
# import mpv
|
# import mpv
|
||||||
import tempfile
|
import tempfile
|
||||||
import os
|
import os
|
||||||
|
|
||||||
|
log = structlog.get_logger()
|
||||||
|
|
||||||
|
|
||||||
class Stream(str, Enum):
|
class Stream(str, Enum):
|
||||||
KEXP = "kexp"
|
KEXP = "kexp"
|
||||||
@@ -18,7 +21,7 @@ class Stream(str, Enum):
|
|||||||
def url_for_time(t) -> str:
|
def url_for_time(t) -> str:
|
||||||
default = "https://kexp-mp3-128.streamguys1.com/kexp128.mp3"
|
default = "https://kexp-mp3-128.streamguys1.com/kexp128.mp3"
|
||||||
tz = t.strftime("%Y-%m-%dT%H:%M:%SZ")
|
tz = t.strftime("%Y-%m-%dT%H:%M:%SZ")
|
||||||
print(f"getting archive: {t}")
|
log.info("getting archive", time=t)
|
||||||
check_url = (
|
check_url = (
|
||||||
f"https://api.kexp.org/get_streaming_url/?bitrate=128×tamp={tz}&location=1"
|
f"https://api.kexp.org/get_streaming_url/?bitrate=128×tamp={tz}&location=1"
|
||||||
)
|
)
|
||||||
@@ -26,10 +29,10 @@ def url_for_time(t) -> str:
|
|||||||
if response.status_code == 200:
|
if response.status_code == 200:
|
||||||
found = response.json().get("sg-url")
|
found = response.json().get("sg-url")
|
||||||
if found is None:
|
if found is None:
|
||||||
print(f"error getting archive: {t}")
|
log.error("error getting archive", time=t)
|
||||||
return default
|
return default
|
||||||
else:
|
else:
|
||||||
print(f"error getting archive: {t}")
|
log.error("error getting archive", time=t, status=response.status_code)
|
||||||
return default
|
return default
|
||||||
|
|
||||||
return found
|
return found
|
||||||
|
|||||||
Reference in New Issue
Block a user