Compare commits
8 Commits
404deff6e4
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
bde1ddf465 | ||
|
|
c36a6a7f62 | ||
|
|
ce66005bb3 | ||
|
|
7982c7700b | ||
|
|
cd7f93100b | ||
|
|
323d6c0499 | ||
|
|
bacf927d4b | ||
|
|
27453f505e |
175
.gitignore
vendored
Normal file
175
.gitignore
vendored
Normal file
@@ -0,0 +1,175 @@
|
||||
# Byte-compiled / optimized / DLL files
|
||||
__pycache__/
|
||||
*.py[cod]
|
||||
*$py.class
|
||||
|
||||
# C extensions
|
||||
*.so
|
||||
|
||||
# Distribution / packaging
|
||||
.Python
|
||||
build/
|
||||
develop-eggs/
|
||||
dist/
|
||||
downloads/
|
||||
eggs/
|
||||
.eggs/
|
||||
lib/
|
||||
lib64/
|
||||
parts/
|
||||
sdist/
|
||||
var/
|
||||
wheels/
|
||||
share/python-wheels/
|
||||
*.egg-info/
|
||||
.installed.cfg
|
||||
*.egg
|
||||
MANIFEST
|
||||
|
||||
# PyInstaller
|
||||
# Usually these files are written by a python script from a template
|
||||
# before PyInstaller builds the exe, so as to inject date/other infos into it.
|
||||
*.manifest
|
||||
*.spec
|
||||
|
||||
# Installer logs
|
||||
pip-log.txt
|
||||
pip-delete-this-directory.txt
|
||||
|
||||
# Unit test / coverage reports
|
||||
htmlcov/
|
||||
.tox/
|
||||
.nox/
|
||||
.coverage
|
||||
.coverage.*
|
||||
.cache
|
||||
nosetests.xml
|
||||
coverage.xml
|
||||
*.cover
|
||||
*.py,cover
|
||||
.hypothesis/
|
||||
.pytest_cache/
|
||||
cover/
|
||||
|
||||
# Translations
|
||||
*.mo
|
||||
*.pot
|
||||
|
||||
# Django stuff:
|
||||
*.log
|
||||
local_settings.py
|
||||
db.sqlite3
|
||||
db.sqlite3-journal
|
||||
|
||||
# Flask stuff:
|
||||
instance/
|
||||
.webassets-cache
|
||||
|
||||
# Scrapy stuff:
|
||||
.scrapy
|
||||
|
||||
# Sphinx documentation
|
||||
docs/_build/
|
||||
|
||||
# PyBuilder
|
||||
.pybuilder/
|
||||
target/
|
||||
|
||||
# Jupyter Notebook
|
||||
.ipynb_checkpoints
|
||||
|
||||
# IPython
|
||||
profile_default/
|
||||
ipython_config.py
|
||||
|
||||
# pyenv
|
||||
# For a library or package, you might want to ignore these files since the code is
|
||||
# intended to run in multiple environments; otherwise, check them in:
|
||||
# .python-version
|
||||
|
||||
# pipenv
|
||||
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
|
||||
# However, in case of collaboration, if having platform-specific dependencies or dependencies
|
||||
# having no cross-platform support, pipenv may install dependencies that don't work, or not
|
||||
# install all needed dependencies.
|
||||
#Pipfile.lock
|
||||
|
||||
# UV
|
||||
# Similar to Pipfile.lock, it is generally recommended to include uv.lock in version control.
|
||||
# This is especially recommended for binary packages to ensure reproducibility, and is more
|
||||
# commonly ignored for libraries.
|
||||
#uv.lock
|
||||
|
||||
# poetry
|
||||
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
|
||||
# This is especially recommended for binary packages to ensure reproducibility, and is more
|
||||
# commonly ignored for libraries.
|
||||
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
|
||||
#poetry.lock
|
||||
|
||||
# pdm
|
||||
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
|
||||
#pdm.lock
|
||||
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
|
||||
# in version control.
|
||||
# https://pdm.fming.dev/latest/usage/project/#working-with-version-control
|
||||
.pdm.toml
|
||||
.pdm-python
|
||||
.pdm-build/
|
||||
|
||||
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
|
||||
__pypackages__/
|
||||
|
||||
# Celery stuff
|
||||
celerybeat-schedule
|
||||
celerybeat.pid
|
||||
|
||||
# SageMath parsed files
|
||||
*.sage.py
|
||||
|
||||
# Environments
|
||||
.env
|
||||
.venv
|
||||
env/
|
||||
venv/
|
||||
ENV/
|
||||
env.bak/
|
||||
venv.bak/
|
||||
|
||||
# Spyder project settings
|
||||
.spyderproject
|
||||
.spyproject
|
||||
|
||||
# Rope project settings
|
||||
.ropeproject
|
||||
|
||||
# mkdocs documentation
|
||||
/site
|
||||
|
||||
# mypy
|
||||
.mypy_cache/
|
||||
.dmypy.json
|
||||
dmypy.json
|
||||
|
||||
# Pyre type checker
|
||||
.pyre/
|
||||
|
||||
# pytype static type analyzer
|
||||
.pytype/
|
||||
|
||||
# Cython debug symbols
|
||||
cython_debug/
|
||||
|
||||
# PyCharm
|
||||
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
|
||||
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
|
||||
# and can be added to the global gitignore or merged into this file. For a more nuclear
|
||||
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
|
||||
#.idea/
|
||||
|
||||
# Ruff stuff:
|
||||
.ruff_cache/
|
||||
|
||||
# PyPI configuration file
|
||||
.pypirc
|
||||
|
||||
@@ -4,15 +4,23 @@ version = "0.1.0"
|
||||
description = "music utilies"
|
||||
readme = "README.md"
|
||||
authors = [{ name = "publicmatt", email = "git@publicmatt.com" }]
|
||||
requires-python = ">=3.13"
|
||||
requires-python = ">=3.12"
|
||||
dependencies = [
|
||||
"audioop-lts>=0.2.2 ; python_full_version >= '3.13'",
|
||||
"click>=8.1.8",
|
||||
"joblib>=1.4.2",
|
||||
"python-dotenv>=1.0.1",
|
||||
"mpv>=1.0.7",
|
||||
"mutagen>=1.47.0",
|
||||
"pydantic>=2.0",
|
||||
"pydantic-settings>=2.12.0",
|
||||
"requests>=2.32.3",
|
||||
"shazamio>=0.8.1",
|
||||
"structlog>=25.5.0",
|
||||
"yt-dlp>=2025.1.15",
|
||||
]
|
||||
|
||||
[project.scripts]
|
||||
music = "music.__main__:cli"
|
||||
stream = "music.__main__:stream"
|
||||
|
||||
[build-system]
|
||||
requires = ["hatchling"]
|
||||
|
||||
@@ -1,7 +0,0 @@
|
||||
asyncio
|
||||
shazamio
|
||||
click
|
||||
python_dotenv
|
||||
tqdm
|
||||
joblib
|
||||
mutagen
|
||||
@@ -1,2 +0,0 @@
|
||||
def main() -> None:
|
||||
print("Hello from music!")
|
||||
|
||||
@@ -1,53 +1,15 @@
|
||||
from enum import Enum
|
||||
import click
|
||||
import subprocess
|
||||
# from .shazam import rename
|
||||
from pydantic_settings import CliApp
|
||||
|
||||
from click.exceptions import ClickException
|
||||
from .cli import Cli
|
||||
from .commands import Stream
|
||||
|
||||
|
||||
@click.group()
|
||||
def cli():
|
||||
pass
|
||||
CliApp.run(Cli)
|
||||
|
||||
|
||||
class Stream(str, Enum):
|
||||
KEXP = "kexp"
|
||||
KUGS = "kugs"
|
||||
CLIS = "clis"
|
||||
|
||||
|
||||
@cli.command("stream")
|
||||
@click.option(
|
||||
"-s", "--stream", "stream", type=click.Choice(Stream), default=Stream.KEXP
|
||||
)
|
||||
def play_stream(stream):
|
||||
"""Play the KEXP stream using mpv."""
|
||||
match stream:
|
||||
case Stream.KEXP:
|
||||
url = "https://kexp-mp3-128.streamguys1.com/kexp128.mp3"
|
||||
case Stream.KUGS:
|
||||
url = "https://peridot.streamguys1.com:7175/kugs-mp3""
|
||||
case Stream.CLIS:
|
||||
url = "https://stream2.statsradio.com:8012/stream?=&&___cb=759135934160766"
|
||||
case _:
|
||||
raise ClickException(f"unrecognized stream: {stream}")
|
||||
subprocess.run(["mpv", url])
|
||||
|
||||
@cli.command("random")
|
||||
def play_random():
|
||||
"""todo."""
|
||||
raise NotImplementedError("todo")
|
||||
# fd -t file -e mp3 -p -a "fall|winter|spring|summer" $HOME/Music/ | mpv --playlist=- --shuffle --no-video
|
||||
|
||||
@cli.command("save")
|
||||
@click.option( "-u", "--url", "url", required=True)
|
||||
def save_song(url):
|
||||
"""todo."""
|
||||
raise NotImplementedError("todo")
|
||||
|
||||
# FILENAME=$(yt-dlp {{url}} -x --audio-format mp3 | tee /dev/tty | grep 'ExtractAudio' | cut -d ' ' -f 3-)
|
||||
# $HOME/.local/lib/shazam/bin/shazam.py rename --song "$FILENAME"
|
||||
def stream():
|
||||
CliApp.run(Stream)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
24
src/music/cli.py
Normal file
24
src/music/cli.py
Normal file
@@ -0,0 +1,24 @@
|
||||
from pydantic import Field
|
||||
from pydantic_settings import BaseSettings, CliApp, CliSubCommand, get_subcommand
|
||||
|
||||
from music.commands import Random, Stream, YtDownload
|
||||
|
||||
|
||||
class Cli(BaseSettings):
|
||||
model_config = {
|
||||
"env_file": [".env"],
|
||||
"cli_kebab_case": True,
|
||||
"cli_use_class_docs_for_groups": True,
|
||||
}
|
||||
|
||||
download: CliSubCommand[YtDownload] = Field(alias="yt")
|
||||
stream: CliSubCommand[Stream] = Field(alias="stream")
|
||||
random: CliSubCommand[Random] = Field(alias="random")
|
||||
|
||||
def cli_cmd(self) -> None:
|
||||
if (cmd := get_subcommand(self, is_required=False)) is not None:
|
||||
CliApp.run_subcommand(self, cli_cmd_method_name="run")
|
||||
else:
|
||||
CliApp.run(Cli, cli_args=["--help"])
|
||||
|
||||
pass
|
||||
5
src/music/commands/__init__.py
Normal file
5
src/music/commands/__init__.py
Normal file
@@ -0,0 +1,5 @@
|
||||
from .download import YtDownload
|
||||
from .random import Random
|
||||
from .stream import Stream
|
||||
|
||||
__all__ = ["YtDownload", "Random", "Stream"]
|
||||
143
src/music/commands/download.py
Normal file
143
src/music/commands/download.py
Normal file
@@ -0,0 +1,143 @@
|
||||
import asyncio
|
||||
import os
|
||||
import re
|
||||
from pathlib import Path
|
||||
from typing import ClassVar
|
||||
|
||||
from pydantic_settings import CliImplicitFlag, CliPositionalArg
|
||||
import structlog
|
||||
import yt_dlp
|
||||
from mutagen.easyid3 import EasyID3
|
||||
from pydantic import Field, computed_field
|
||||
from shazamio import Shazam
|
||||
|
||||
from music.dates import DateParse
|
||||
from music.models import Command
|
||||
|
||||
log = structlog.get_logger()
|
||||
|
||||
|
||||
def _clean(value) -> str | None:
|
||||
if value is None:
|
||||
return None
|
||||
if isinstance(value, list):
|
||||
value = ", ".join(v for v in value if v)
|
||||
value = str(value).strip()
|
||||
return value or None
|
||||
|
||||
|
||||
def _prompt(label: str, default: str | None = None) -> str | None:
|
||||
suffix = f" [{default}]" if default else ""
|
||||
try:
|
||||
value = input(f"{label}{suffix}: ").strip()
|
||||
except EOFError:
|
||||
return default
|
||||
return value or default
|
||||
|
||||
|
||||
def _extract_artist_track(info: dict) -> tuple[str | None, str | None]:
|
||||
artist = (
|
||||
_clean(info.get("artists"))
|
||||
or _clean(info.get("artist"))
|
||||
or _clean(info.get("creators"))
|
||||
or _clean(info.get("creator"))
|
||||
)
|
||||
track = _clean(info.get("track")) or _clean(info.get("alt_title"))
|
||||
|
||||
title = _clean(info.get("title"))
|
||||
if (artist is None or track is None) and title and " - " in title:
|
||||
parsed_artist, parsed_track = title.split(" - ", 1)
|
||||
parsed_track = re.sub(r"\s*[\(\[][^\)\]]*[\)\]]\s*$", "", parsed_track).strip()
|
||||
if artist is None:
|
||||
artist = _clean(parsed_artist)
|
||||
if track is None:
|
||||
track = _clean(parsed_track)
|
||||
return artist, track
|
||||
|
||||
|
||||
def _shazam_lookup(filepath: str) -> tuple[str | None, str | None]:
|
||||
async def recognize() -> dict:
|
||||
return await Shazam().recognize(filepath)
|
||||
|
||||
try:
|
||||
result = asyncio.run(recognize())
|
||||
except Exception as e:
|
||||
log.warning("shazam lookup failed", error=str(e))
|
||||
return None, None
|
||||
track_info = result.get("track") or {}
|
||||
return _clean(track_info.get("subtitle")), _clean(track_info.get("title"))
|
||||
|
||||
|
||||
def current_quarter() -> Path:
|
||||
date = DateParse()
|
||||
return Path().home() / "Music" / f"{date.quarter}_{date.year}"
|
||||
|
||||
|
||||
class YtDownload(Command):
|
||||
"""
|
||||
download a youtube song from {URL} to current quarter dir.
|
||||
"""
|
||||
|
||||
url: CliPositionalArg[str] = Field(description="youtube url to download")
|
||||
ydl_opts: ClassVar = {
|
||||
"format": "mp3/bestaudio/best",
|
||||
"postprocessors": [
|
||||
{
|
||||
"key": "FFmpegExtractAudio",
|
||||
"preferredcodec": "mp3",
|
||||
}
|
||||
],
|
||||
"remote_components": ["ejs:github"],
|
||||
}
|
||||
parents: CliImplicitFlag[bool] = Field(
|
||||
default=True, description="create the parent dirs if not exists"
|
||||
)
|
||||
|
||||
@computed_field()
|
||||
@property
|
||||
def quarter_dir(self) -> Path:
|
||||
date = DateParse()
|
||||
return Path().home() / "Music" / f"{date.quarter}_{date.year}"
|
||||
|
||||
def run(self) -> None:
|
||||
os.chdir(Path.home() / "Downloads")
|
||||
with yt_dlp.YoutubeDL(self.ydl_opts) as ydl:
|
||||
info_dict = ydl.extract_info(self.url, download=True)
|
||||
_ = ydl.prepare_filename(info_dict)
|
||||
if info_dict is None:
|
||||
raise ValueError("error downloading")
|
||||
filename = info_dict["requested_downloads"][0]["filepath"]
|
||||
|
||||
artist, track = _extract_artist_track(info_dict)
|
||||
|
||||
if artist is None or track is None:
|
||||
log.info("trying shazam", file=filename)
|
||||
shazam_artist, shazam_track = _shazam_lookup(filename)
|
||||
artist = artist or shazam_artist
|
||||
track = track or shazam_track
|
||||
if shazam_artist or shazam_track:
|
||||
log.info("shazam matched", artist=shazam_artist, track=shazam_track)
|
||||
|
||||
if artist is None or track is None:
|
||||
title = info_dict.get("title") or Path(filename).stem
|
||||
uploader = info_dict.get("uploader") or info_dict.get("channel")
|
||||
log.info("could not parse metadata", title=title, uploader=uploader)
|
||||
artist = _prompt("artist", default=artist or uploader)
|
||||
track = _prompt("track", default=track or title)
|
||||
|
||||
base = self.quarter_dir
|
||||
if not base.exists():
|
||||
base.mkdir(parents=self.parents)
|
||||
if track and artist:
|
||||
name = f"{artist} - {track}.mp3"
|
||||
new_filepath = base / name
|
||||
os.rename(filename, new_filepath)
|
||||
tags = EasyID3(new_filepath)
|
||||
tags["title"] = track
|
||||
tags["artist"] = artist
|
||||
tags.save()
|
||||
else:
|
||||
new_filepath = base / Path(filename).name
|
||||
os.rename(filename, new_filepath)
|
||||
|
||||
log.info("downloaded", path=str(new_filepath))
|
||||
35
src/music/commands/random.py
Normal file
35
src/music/commands/random.py
Normal file
@@ -0,0 +1,35 @@
|
||||
import subprocess
|
||||
from music.models import Command
|
||||
from music.paths import seasons
|
||||
import random
|
||||
|
||||
# import mpv
|
||||
import tempfile
|
||||
import os
|
||||
|
||||
|
||||
class Random(Command):
|
||||
def run(self):
|
||||
"""play random local songs."""
|
||||
songs = []
|
||||
for d in seasons():
|
||||
for f in d.iterdir():
|
||||
if not f.suffix == ".mp3":
|
||||
continue
|
||||
songs.append(f)
|
||||
random.shuffle(songs)
|
||||
with tempfile.NamedTemporaryFile(delete=False, suffix=".txt") as tmp:
|
||||
for song in songs:
|
||||
tmp.write(f"{song}\n".encode())
|
||||
playlist = tmp.name
|
||||
# fd -t file -e mp3 -p -a "fall|winter|spring|summer" $HOME/Music/ | mpv --playlist=- --shuffle --no-video
|
||||
|
||||
try:
|
||||
subprocess.run(["mpv", f"--playlist={playlist}", "--shuffle", "--no-video"])
|
||||
# process = subprocess.Popen(
|
||||
# ["mpv", f"--playlist={playlist}", "--shuffle", "--no-video"],
|
||||
# )
|
||||
|
||||
# process.wait()
|
||||
finally:
|
||||
os.remove(playlist)
|
||||
73
src/music/commands/stream.py
Normal file
73
src/music/commands/stream.py
Normal file
@@ -0,0 +1,73 @@
|
||||
from datetime import datetime, timezone
|
||||
from enum import Enum
|
||||
from pathlib import Path
|
||||
import subprocess
|
||||
|
||||
from pydantic import Field
|
||||
import requests
|
||||
import structlog
|
||||
|
||||
from music.dates import DateParse
|
||||
from music.models import Command
|
||||
|
||||
log = structlog.get_logger()
|
||||
|
||||
|
||||
class Source(str, Enum):
|
||||
KEXP = "kexp"
|
||||
KUGS = "kugs"
|
||||
CLIS = "clis"
|
||||
|
||||
|
||||
def current_quarter() -> Path:
|
||||
date = DateParse()
|
||||
return Path().home() / "Music" / f"{date.quarter}_{date.year}"
|
||||
|
||||
|
||||
def url_for_time(t) -> str:
|
||||
default = "https://kexp-mp3-128.streamguys1.com/kexp128.mp3"
|
||||
tz = t.strftime("%Y-%m-%dT%H:%M:%SZ")
|
||||
log.info("getting archive", time=t)
|
||||
check_url = (
|
||||
f"https://api.kexp.org/get_streaming_url/?bitrate=128×tamp={tz}&location=1"
|
||||
)
|
||||
response = requests.get(check_url)
|
||||
if response.status_code == 200:
|
||||
found = response.json().get("sg-url")
|
||||
if found is None:
|
||||
log.error("error getting archive", time=t)
|
||||
return default
|
||||
else:
|
||||
log.error("error getting archive", time=t, status=response.status_code)
|
||||
return default
|
||||
|
||||
return found
|
||||
|
||||
|
||||
class Stream(Command):
|
||||
"""play streams."""
|
||||
|
||||
source: Source = Field(default=Source.KEXP)
|
||||
for_date: datetime | None = Field(default=None)
|
||||
|
||||
def run(self) -> None:
|
||||
if self.for_date is not None:
|
||||
time = self.for_date.astimezone(tz=timezone.utc)
|
||||
else:
|
||||
time = None
|
||||
self.play(self.source, time)
|
||||
|
||||
def play(self, stream, t=None):
|
||||
"""Play streams using mpv."""
|
||||
match stream:
|
||||
case Source.KEXP:
|
||||
url = "https://kexp-mp3-128.streamguys1.com/kexp128.mp3"
|
||||
if t:
|
||||
url = url_for_time(t)
|
||||
case Source.KUGS:
|
||||
url = "https://peridot.streamguys1.com:7175/kugs-mp3"
|
||||
case Source.CLIS:
|
||||
url = "https://stream2.statsradio.com:8012/stream?=&&___cb=759135934160766"
|
||||
case _:
|
||||
raise ValueError(f"unrecognized stream: {stream}")
|
||||
subprocess.run(["mpv", url])
|
||||
44
src/music/dates.py
Normal file
44
src/music/dates.py
Normal file
@@ -0,0 +1,44 @@
|
||||
from datetime import datetime
|
||||
from typing import Optional, Tuple
|
||||
|
||||
from pydantic import BaseModel, Field, computed_field
|
||||
|
||||
|
||||
def quarter_year(for_date: Optional[datetime] = None) -> Tuple[str, int]:
|
||||
if for_date is None:
|
||||
for_date = datetime.now()
|
||||
year = for_date.year
|
||||
month = for_date.month
|
||||
if month in [12, 1, 2]:
|
||||
quarter = "winter"
|
||||
elif month in [3, 4, 5]:
|
||||
quarter = "spring"
|
||||
elif month in [6, 7, 8]:
|
||||
quarter = "summer"
|
||||
else:
|
||||
quarter = "fall"
|
||||
return quarter, year
|
||||
|
||||
|
||||
class DateParse(BaseModel):
|
||||
date: datetime = Field(default_factory=datetime.now)
|
||||
|
||||
@computed_field()
|
||||
@property
|
||||
def quarter(self) -> str:
|
||||
match self.date.month:
|
||||
case 12 | 1 | 2:
|
||||
return "winter"
|
||||
case 3 | 4 | 5:
|
||||
return "spring"
|
||||
case 6 | 7 | 8:
|
||||
return "summer"
|
||||
case 9 | 10 | 11:
|
||||
return "fall"
|
||||
case _:
|
||||
raise ValueError(self.date.month)
|
||||
|
||||
@computed_field()
|
||||
@property
|
||||
def year(self) -> int:
|
||||
return self.date.year
|
||||
9
src/music/models.py
Normal file
9
src/music/models.py
Normal file
@@ -0,0 +1,9 @@
|
||||
from abc import ABC, abstractmethod
|
||||
|
||||
from pydantic_settings import BaseSettings
|
||||
|
||||
|
||||
class Command(BaseSettings, ABC):
|
||||
@abstractmethod
|
||||
def run(self) -> None:
|
||||
pass
|
||||
21
src/music/paths.py
Normal file
21
src/music/paths.py
Normal file
@@ -0,0 +1,21 @@
|
||||
from pathlib import Path
|
||||
from typing import Generator
|
||||
|
||||
from music.dates import quarter_year
|
||||
|
||||
|
||||
def seasons() -> Generator[Path, None, None]:
|
||||
prefix = ("fall_", "winter_", "spring_", "summer_")
|
||||
music_dir = Path().home() / "Music"
|
||||
for d in music_dir.iterdir():
|
||||
if not d.is_dir():
|
||||
continue
|
||||
if not d.name.startswith(prefix):
|
||||
continue
|
||||
yield d
|
||||
yield from []
|
||||
|
||||
|
||||
def current_quarter() -> Path:
|
||||
quarter, year = quarter_year()
|
||||
return Path().home() / "Music" / f"{quarter}_{year}"
|
||||
48
src/music/save.py
Normal file
48
src/music/save.py
Normal file
@@ -0,0 +1,48 @@
|
||||
from mutagen.easyid3 import EasyID3
|
||||
import yt_dlp
|
||||
from .dates import quarter_year
|
||||
from .paths import current_quarter
|
||||
import os
|
||||
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
def yt(url):
|
||||
ydl_opts = {
|
||||
"format": "mp3/bestaudio/best",
|
||||
# ℹ️ See help(yt_dlp.postprocessor) for a list of available Postprocessors and their arguments
|
||||
"postprocessors": [
|
||||
{ # Extract audio using ffmpeg
|
||||
"key": "FFmpegExtractAudio",
|
||||
"preferredcodec": "mp3",
|
||||
}
|
||||
],
|
||||
}
|
||||
|
||||
os.chdir(Path.home() / "Downloads")
|
||||
# Download the audio and get the filename
|
||||
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
|
||||
info_dict = ydl.extract_info(url, download=True)
|
||||
_ = ydl.prepare_filename(info_dict)
|
||||
if info_dict is None:
|
||||
raise ValueError("error downloading")
|
||||
filename = info_dict["requested_downloads"][0]["filepath"]
|
||||
|
||||
track = info_dict.get("track")
|
||||
artist = info_dict.get("artist")
|
||||
|
||||
parent = current_quarter()
|
||||
if not parent.exists():
|
||||
parent.mkdir(parents=True)
|
||||
if track is not None and artist is not None:
|
||||
name = f"{artist} - {track}.mp3"
|
||||
new_filepath = parent / name
|
||||
os.rename(filename, new_filepath)
|
||||
tags = EasyID3(new_filepath)
|
||||
tags["title"] = track
|
||||
tags["artist"] = artist
|
||||
tags.save()
|
||||
|
||||
else:
|
||||
new_filepath = parent / filename
|
||||
os.rename(filename, new_filepath)
|
||||
@@ -1,137 +0,0 @@
|
||||
#! /home/user/.venv/bin/python
|
||||
|
||||
import asyncio
|
||||
from shazamio import Shazam, Serialize
|
||||
from pathlib import Path
|
||||
import click
|
||||
from dotenv import load_dotenv
|
||||
import os
|
||||
from tqdm import tqdm
|
||||
from typing import Optional
|
||||
import sys
|
||||
from collections import namedtuple
|
||||
from joblib import Memory
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
base_path = Path(__file__).parent.parent
|
||||
memory = Memory(base_path / "./.cache", verbose=0)
|
||||
|
||||
|
||||
class Match(BaseModel):
|
||||
old: Path
|
||||
new: Optional[Path] = Field(default=None)
|
||||
title: Optional[str] = Field(default=None)
|
||||
artist: Optional[str] = Field(default=None)
|
||||
|
||||
|
||||
@click.group()
|
||||
def cli():
|
||||
pass
|
||||
|
||||
|
||||
@cli.command()
|
||||
@click.option(
|
||||
"-s",
|
||||
"--song",
|
||||
required=True,
|
||||
type=click.Path(exists=True, dir_okay=False, path_type=Path),
|
||||
)
|
||||
def rename(song: Path):
|
||||
change = get_match(song)
|
||||
if change and change.new is not None:
|
||||
prompt_rename(change)
|
||||
|
||||
|
||||
def songs(root: Path):
|
||||
seasons = ["winter", "spring", "summer", "fall"]
|
||||
pattern = "{}_*/*.mp3"
|
||||
matches = []
|
||||
for season in seasons:
|
||||
for file_path in root.rglob(pattern.format(season)):
|
||||
matches.append(file_path)
|
||||
|
||||
def small(f: Path, max_bytes: int = 10_000_000):
|
||||
return f.stat().st_size < max_bytes
|
||||
|
||||
return filter(small, matches)
|
||||
|
||||
|
||||
def needs_rename(song: Match):
|
||||
if song.new is None:
|
||||
print(f"\nno match: {song.old.name}\n")
|
||||
return False
|
||||
if song.old.name == song.new.name:
|
||||
print(f"\nname correct: {song.old.name}\n")
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
@cli.command("main")
|
||||
def main():
|
||||
root = Path(os.getenv("MUSIC_PATH", "~/Music"))
|
||||
if not root.exists():
|
||||
raise ValueError(f"{str(root)} does not exist")
|
||||
|
||||
renames = []
|
||||
for song in tqdm(songs(root), position=0, leave=True):
|
||||
rename = get_match(song)
|
||||
if rename and rename.new is not None:
|
||||
renames.append(rename)
|
||||
|
||||
for song in filter(needs_rename, renames):
|
||||
prompt_rename(song)
|
||||
|
||||
sys.exit(127)
|
||||
|
||||
|
||||
def prompt_rename(song: Match):
|
||||
old = song.old
|
||||
answer = input(
|
||||
f'\nrename: "{old.name}"\nto: "{song.new.name}"?\n[y[es]/n[o]/m[anual]] > '
|
||||
)
|
||||
if answer in ["n", "N"]:
|
||||
print(f"\nskipping: {old.name}\n")
|
||||
return
|
||||
if answer in ["m"]:
|
||||
manual = input("\nnew name: ")
|
||||
new = old.parent / manual
|
||||
assert (
|
||||
new.suffix == old.suffix
|
||||
), f"filetype changed: {old.suffix} -> {new.suffix}"
|
||||
else:
|
||||
new = song.new
|
||||
try:
|
||||
old.rename(new)
|
||||
from mutagen.easyid3 import EasyID3
|
||||
|
||||
tags = EasyID3(new)
|
||||
tags["title"] = song.title
|
||||
tags["artist"] = song.artist
|
||||
tags.save()
|
||||
except FileNotFoundError as e:
|
||||
print(f"error renaming: {e}")
|
||||
print(f'old: "{old}"\nnew: "{new}"')
|
||||
return
|
||||
|
||||
|
||||
@memory.cache
|
||||
def get_match(original: Path) -> Match:
|
||||
loop = asyncio.get_event_loop()
|
||||
shazam = Shazam()
|
||||
try:
|
||||
out = loop.run_until_complete(shazam.recognize(str(original)))
|
||||
serialized = Serialize.track(out["track"])
|
||||
rename = f"{serialized.title} - {serialized.subtitle}{original.suffix}"
|
||||
new = Path(original.parent) / rename
|
||||
return Match(
|
||||
old=original, new=new, title=serialized.title, artist=serialized.subtitle
|
||||
)
|
||||
except:
|
||||
return Match(old=original, new=None)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
env = Path(__file__).parent.parent / ".env"
|
||||
if not load_dotenv(env):
|
||||
raise ValueError(".env not found: {env}")
|
||||
cli()
|
||||
80
src/music/stream.py
Normal file
80
src/music/stream.py
Normal file
@@ -0,0 +1,80 @@
|
||||
import subprocess
|
||||
from enum import Enum
|
||||
from .paths import seasons
|
||||
import random
|
||||
import requests
|
||||
import structlog
|
||||
|
||||
# import mpv
|
||||
import tempfile
|
||||
import os
|
||||
|
||||
log = structlog.get_logger()
|
||||
|
||||
|
||||
class Stream(str, Enum):
|
||||
KEXP = "kexp"
|
||||
KUGS = "kugs"
|
||||
CLIS = "clis"
|
||||
|
||||
|
||||
def url_for_time(t) -> str:
|
||||
default = "https://kexp-mp3-128.streamguys1.com/kexp128.mp3"
|
||||
tz = t.strftime("%Y-%m-%dT%H:%M:%SZ")
|
||||
log.info("getting archive", time=t)
|
||||
check_url = (
|
||||
f"https://api.kexp.org/get_streaming_url/?bitrate=128×tamp={tz}&location=1"
|
||||
)
|
||||
response = requests.get(check_url)
|
||||
if response.status_code == 200:
|
||||
found = response.json().get("sg-url")
|
||||
if found is None:
|
||||
log.error("error getting archive", time=t)
|
||||
return default
|
||||
else:
|
||||
log.error("error getting archive", time=t, status=response.status_code)
|
||||
return default
|
||||
|
||||
return found
|
||||
|
||||
|
||||
def play(stream, t=None):
|
||||
"""Play streams using mpv."""
|
||||
match stream:
|
||||
case Stream.KEXP:
|
||||
url = "https://kexp-mp3-128.streamguys1.com/kexp128.mp3"
|
||||
if t:
|
||||
url = url_for_time(t)
|
||||
case Stream.KUGS:
|
||||
url = "https://peridot.streamguys1.com:7175/kugs-mp3"
|
||||
case Stream.CLIS:
|
||||
url = "https://stream2.statsradio.com:8012/stream?=&&___cb=759135934160766"
|
||||
case _:
|
||||
raise ValueError(f"unrecognized stream: {stream}")
|
||||
subprocess.run(["mpv", url])
|
||||
|
||||
|
||||
def local():
|
||||
"""play random local songs."""
|
||||
songs = []
|
||||
for d in seasons():
|
||||
for f in d.iterdir():
|
||||
if not f.suffix == ".mp3":
|
||||
continue
|
||||
songs.append(f)
|
||||
random.shuffle(songs)
|
||||
with tempfile.NamedTemporaryFile(delete=False, suffix=".txt") as tmp:
|
||||
for song in songs:
|
||||
tmp.write(f"{song}\n".encode())
|
||||
playlist = tmp.name
|
||||
# fd -t file -e mp3 -p -a "fall|winter|spring|summer" $HOME/Music/ | mpv --playlist=- --shuffle --no-video
|
||||
|
||||
try:
|
||||
subprocess.run(["mpv", f"--playlist={playlist}", "--shuffle", "--no-video"])
|
||||
# process = subprocess.Popen(
|
||||
# ["mpv", f"--playlist={playlist}", "--shuffle", "--no-video"],
|
||||
# )
|
||||
|
||||
# process.wait()
|
||||
finally:
|
||||
os.remove(playlist)
|
||||
Reference in New Issue
Block a user