Compare commits
8 Commits
404deff6e4
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
bde1ddf465 | ||
|
|
c36a6a7f62 | ||
|
|
ce66005bb3 | ||
|
|
7982c7700b | ||
|
|
cd7f93100b | ||
|
|
323d6c0499 | ||
|
|
bacf927d4b | ||
|
|
27453f505e |
175
.gitignore
vendored
Normal file
175
.gitignore
vendored
Normal file
@@ -0,0 +1,175 @@
|
|||||||
|
# Byte-compiled / optimized / DLL files
|
||||||
|
__pycache__/
|
||||||
|
*.py[cod]
|
||||||
|
*$py.class
|
||||||
|
|
||||||
|
# C extensions
|
||||||
|
*.so
|
||||||
|
|
||||||
|
# Distribution / packaging
|
||||||
|
.Python
|
||||||
|
build/
|
||||||
|
develop-eggs/
|
||||||
|
dist/
|
||||||
|
downloads/
|
||||||
|
eggs/
|
||||||
|
.eggs/
|
||||||
|
lib/
|
||||||
|
lib64/
|
||||||
|
parts/
|
||||||
|
sdist/
|
||||||
|
var/
|
||||||
|
wheels/
|
||||||
|
share/python-wheels/
|
||||||
|
*.egg-info/
|
||||||
|
.installed.cfg
|
||||||
|
*.egg
|
||||||
|
MANIFEST
|
||||||
|
|
||||||
|
# PyInstaller
|
||||||
|
# Usually these files are written by a python script from a template
|
||||||
|
# before PyInstaller builds the exe, so as to inject date/other infos into it.
|
||||||
|
*.manifest
|
||||||
|
*.spec
|
||||||
|
|
||||||
|
# Installer logs
|
||||||
|
pip-log.txt
|
||||||
|
pip-delete-this-directory.txt
|
||||||
|
|
||||||
|
# Unit test / coverage reports
|
||||||
|
htmlcov/
|
||||||
|
.tox/
|
||||||
|
.nox/
|
||||||
|
.coverage
|
||||||
|
.coverage.*
|
||||||
|
.cache
|
||||||
|
nosetests.xml
|
||||||
|
coverage.xml
|
||||||
|
*.cover
|
||||||
|
*.py,cover
|
||||||
|
.hypothesis/
|
||||||
|
.pytest_cache/
|
||||||
|
cover/
|
||||||
|
|
||||||
|
# Translations
|
||||||
|
*.mo
|
||||||
|
*.pot
|
||||||
|
|
||||||
|
# Django stuff:
|
||||||
|
*.log
|
||||||
|
local_settings.py
|
||||||
|
db.sqlite3
|
||||||
|
db.sqlite3-journal
|
||||||
|
|
||||||
|
# Flask stuff:
|
||||||
|
instance/
|
||||||
|
.webassets-cache
|
||||||
|
|
||||||
|
# Scrapy stuff:
|
||||||
|
.scrapy
|
||||||
|
|
||||||
|
# Sphinx documentation
|
||||||
|
docs/_build/
|
||||||
|
|
||||||
|
# PyBuilder
|
||||||
|
.pybuilder/
|
||||||
|
target/
|
||||||
|
|
||||||
|
# Jupyter Notebook
|
||||||
|
.ipynb_checkpoints
|
||||||
|
|
||||||
|
# IPython
|
||||||
|
profile_default/
|
||||||
|
ipython_config.py
|
||||||
|
|
||||||
|
# pyenv
|
||||||
|
# For a library or package, you might want to ignore these files since the code is
|
||||||
|
# intended to run in multiple environments; otherwise, check them in:
|
||||||
|
# .python-version
|
||||||
|
|
||||||
|
# pipenv
|
||||||
|
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
|
||||||
|
# However, in case of collaboration, if having platform-specific dependencies or dependencies
|
||||||
|
# having no cross-platform support, pipenv may install dependencies that don't work, or not
|
||||||
|
# install all needed dependencies.
|
||||||
|
#Pipfile.lock
|
||||||
|
|
||||||
|
# UV
|
||||||
|
# Similar to Pipfile.lock, it is generally recommended to include uv.lock in version control.
|
||||||
|
# This is especially recommended for binary packages to ensure reproducibility, and is more
|
||||||
|
# commonly ignored for libraries.
|
||||||
|
#uv.lock
|
||||||
|
|
||||||
|
# poetry
|
||||||
|
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
|
||||||
|
# This is especially recommended for binary packages to ensure reproducibility, and is more
|
||||||
|
# commonly ignored for libraries.
|
||||||
|
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
|
||||||
|
#poetry.lock
|
||||||
|
|
||||||
|
# pdm
|
||||||
|
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
|
||||||
|
#pdm.lock
|
||||||
|
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
|
||||||
|
# in version control.
|
||||||
|
# https://pdm.fming.dev/latest/usage/project/#working-with-version-control
|
||||||
|
.pdm.toml
|
||||||
|
.pdm-python
|
||||||
|
.pdm-build/
|
||||||
|
|
||||||
|
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
|
||||||
|
__pypackages__/
|
||||||
|
|
||||||
|
# Celery stuff
|
||||||
|
celerybeat-schedule
|
||||||
|
celerybeat.pid
|
||||||
|
|
||||||
|
# SageMath parsed files
|
||||||
|
*.sage.py
|
||||||
|
|
||||||
|
# Environments
|
||||||
|
.env
|
||||||
|
.venv
|
||||||
|
env/
|
||||||
|
venv/
|
||||||
|
ENV/
|
||||||
|
env.bak/
|
||||||
|
venv.bak/
|
||||||
|
|
||||||
|
# Spyder project settings
|
||||||
|
.spyderproject
|
||||||
|
.spyproject
|
||||||
|
|
||||||
|
# Rope project settings
|
||||||
|
.ropeproject
|
||||||
|
|
||||||
|
# mkdocs documentation
|
||||||
|
/site
|
||||||
|
|
||||||
|
# mypy
|
||||||
|
.mypy_cache/
|
||||||
|
.dmypy.json
|
||||||
|
dmypy.json
|
||||||
|
|
||||||
|
# Pyre type checker
|
||||||
|
.pyre/
|
||||||
|
|
||||||
|
# pytype static type analyzer
|
||||||
|
.pytype/
|
||||||
|
|
||||||
|
# Cython debug symbols
|
||||||
|
cython_debug/
|
||||||
|
|
||||||
|
# PyCharm
|
||||||
|
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
|
||||||
|
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
|
||||||
|
# and can be added to the global gitignore or merged into this file. For a more nuclear
|
||||||
|
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
|
||||||
|
#.idea/
|
||||||
|
|
||||||
|
# Ruff stuff:
|
||||||
|
.ruff_cache/
|
||||||
|
|
||||||
|
# PyPI configuration file
|
||||||
|
.pypirc
|
||||||
|
|
||||||
@@ -4,15 +4,23 @@ version = "0.1.0"
|
|||||||
description = "music utilies"
|
description = "music utilies"
|
||||||
readme = "README.md"
|
readme = "README.md"
|
||||||
authors = [{ name = "publicmatt", email = "git@publicmatt.com" }]
|
authors = [{ name = "publicmatt", email = "git@publicmatt.com" }]
|
||||||
requires-python = ">=3.13"
|
requires-python = ">=3.12"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
|
"audioop-lts>=0.2.2 ; python_full_version >= '3.13'",
|
||||||
"click>=8.1.8",
|
"click>=8.1.8",
|
||||||
"joblib>=1.4.2",
|
"mpv>=1.0.7",
|
||||||
"python-dotenv>=1.0.1",
|
"mutagen>=1.47.0",
|
||||||
|
"pydantic>=2.0",
|
||||||
|
"pydantic-settings>=2.12.0",
|
||||||
|
"requests>=2.32.3",
|
||||||
|
"shazamio>=0.8.1",
|
||||||
|
"structlog>=25.5.0",
|
||||||
|
"yt-dlp>=2025.1.15",
|
||||||
]
|
]
|
||||||
|
|
||||||
[project.scripts]
|
[project.scripts]
|
||||||
music = "music.__main__:cli"
|
music = "music.__main__:cli"
|
||||||
|
stream = "music.__main__:stream"
|
||||||
|
|
||||||
[build-system]
|
[build-system]
|
||||||
requires = ["hatchling"]
|
requires = ["hatchling"]
|
||||||
|
|||||||
@@ -1,7 +0,0 @@
|
|||||||
asyncio
|
|
||||||
shazamio
|
|
||||||
click
|
|
||||||
python_dotenv
|
|
||||||
tqdm
|
|
||||||
joblib
|
|
||||||
mutagen
|
|
||||||
@@ -1,2 +0,0 @@
|
|||||||
def main() -> None:
|
|
||||||
print("Hello from music!")
|
|
||||||
|
|||||||
@@ -1,53 +1,15 @@
|
|||||||
from enum import Enum
|
from pydantic_settings import CliApp
|
||||||
import click
|
|
||||||
import subprocess
|
|
||||||
# from .shazam import rename
|
|
||||||
|
|
||||||
from click.exceptions import ClickException
|
from .cli import Cli
|
||||||
|
from .commands import Stream
|
||||||
|
|
||||||
|
|
||||||
@click.group()
|
|
||||||
def cli():
|
def cli():
|
||||||
pass
|
CliApp.run(Cli)
|
||||||
|
|
||||||
|
|
||||||
class Stream(str, Enum):
|
def stream():
|
||||||
KEXP = "kexp"
|
CliApp.run(Stream)
|
||||||
KUGS = "kugs"
|
|
||||||
CLIS = "clis"
|
|
||||||
|
|
||||||
|
|
||||||
@cli.command("stream")
|
|
||||||
@click.option(
|
|
||||||
"-s", "--stream", "stream", type=click.Choice(Stream), default=Stream.KEXP
|
|
||||||
)
|
|
||||||
def play_stream(stream):
|
|
||||||
"""Play the KEXP stream using mpv."""
|
|
||||||
match stream:
|
|
||||||
case Stream.KEXP:
|
|
||||||
url = "https://kexp-mp3-128.streamguys1.com/kexp128.mp3"
|
|
||||||
case Stream.KUGS:
|
|
||||||
url = "https://peridot.streamguys1.com:7175/kugs-mp3""
|
|
||||||
case Stream.CLIS:
|
|
||||||
url = "https://stream2.statsradio.com:8012/stream?=&&___cb=759135934160766"
|
|
||||||
case _:
|
|
||||||
raise ClickException(f"unrecognized stream: {stream}")
|
|
||||||
subprocess.run(["mpv", url])
|
|
||||||
|
|
||||||
@cli.command("random")
|
|
||||||
def play_random():
|
|
||||||
"""todo."""
|
|
||||||
raise NotImplementedError("todo")
|
|
||||||
# fd -t file -e mp3 -p -a "fall|winter|spring|summer" $HOME/Music/ | mpv --playlist=- --shuffle --no-video
|
|
||||||
|
|
||||||
@cli.command("save")
|
|
||||||
@click.option( "-u", "--url", "url", required=True)
|
|
||||||
def save_song(url):
|
|
||||||
"""todo."""
|
|
||||||
raise NotImplementedError("todo")
|
|
||||||
|
|
||||||
# FILENAME=$(yt-dlp {{url}} -x --audio-format mp3 | tee /dev/tty | grep 'ExtractAudio' | cut -d ' ' -f 3-)
|
|
||||||
# $HOME/.local/lib/shazam/bin/shazam.py rename --song "$FILENAME"
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
|
|||||||
24
src/music/cli.py
Normal file
24
src/music/cli.py
Normal file
@@ -0,0 +1,24 @@
|
|||||||
|
from pydantic import Field
|
||||||
|
from pydantic_settings import BaseSettings, CliApp, CliSubCommand, get_subcommand
|
||||||
|
|
||||||
|
from music.commands import Random, Stream, YtDownload
|
||||||
|
|
||||||
|
|
||||||
|
class Cli(BaseSettings):
|
||||||
|
model_config = {
|
||||||
|
"env_file": [".env"],
|
||||||
|
"cli_kebab_case": True,
|
||||||
|
"cli_use_class_docs_for_groups": True,
|
||||||
|
}
|
||||||
|
|
||||||
|
download: CliSubCommand[YtDownload] = Field(alias="yt")
|
||||||
|
stream: CliSubCommand[Stream] = Field(alias="stream")
|
||||||
|
random: CliSubCommand[Random] = Field(alias="random")
|
||||||
|
|
||||||
|
def cli_cmd(self) -> None:
|
||||||
|
if (cmd := get_subcommand(self, is_required=False)) is not None:
|
||||||
|
CliApp.run_subcommand(self, cli_cmd_method_name="run")
|
||||||
|
else:
|
||||||
|
CliApp.run(Cli, cli_args=["--help"])
|
||||||
|
|
||||||
|
pass
|
||||||
5
src/music/commands/__init__.py
Normal file
5
src/music/commands/__init__.py
Normal file
@@ -0,0 +1,5 @@
|
|||||||
|
from .download import YtDownload
|
||||||
|
from .random import Random
|
||||||
|
from .stream import Stream
|
||||||
|
|
||||||
|
__all__ = ["YtDownload", "Random", "Stream"]
|
||||||
143
src/music/commands/download.py
Normal file
143
src/music/commands/download.py
Normal file
@@ -0,0 +1,143 @@
|
|||||||
|
import asyncio
|
||||||
|
import os
|
||||||
|
import re
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import ClassVar
|
||||||
|
|
||||||
|
from pydantic_settings import CliImplicitFlag, CliPositionalArg
|
||||||
|
import structlog
|
||||||
|
import yt_dlp
|
||||||
|
from mutagen.easyid3 import EasyID3
|
||||||
|
from pydantic import Field, computed_field
|
||||||
|
from shazamio import Shazam
|
||||||
|
|
||||||
|
from music.dates import DateParse
|
||||||
|
from music.models import Command
|
||||||
|
|
||||||
|
log = structlog.get_logger()
|
||||||
|
|
||||||
|
|
||||||
|
def _clean(value) -> str | None:
|
||||||
|
if value is None:
|
||||||
|
return None
|
||||||
|
if isinstance(value, list):
|
||||||
|
value = ", ".join(v for v in value if v)
|
||||||
|
value = str(value).strip()
|
||||||
|
return value or None
|
||||||
|
|
||||||
|
|
||||||
|
def _prompt(label: str, default: str | None = None) -> str | None:
|
||||||
|
suffix = f" [{default}]" if default else ""
|
||||||
|
try:
|
||||||
|
value = input(f"{label}{suffix}: ").strip()
|
||||||
|
except EOFError:
|
||||||
|
return default
|
||||||
|
return value or default
|
||||||
|
|
||||||
|
|
||||||
|
def _extract_artist_track(info: dict) -> tuple[str | None, str | None]:
|
||||||
|
artist = (
|
||||||
|
_clean(info.get("artists"))
|
||||||
|
or _clean(info.get("artist"))
|
||||||
|
or _clean(info.get("creators"))
|
||||||
|
or _clean(info.get("creator"))
|
||||||
|
)
|
||||||
|
track = _clean(info.get("track")) or _clean(info.get("alt_title"))
|
||||||
|
|
||||||
|
title = _clean(info.get("title"))
|
||||||
|
if (artist is None or track is None) and title and " - " in title:
|
||||||
|
parsed_artist, parsed_track = title.split(" - ", 1)
|
||||||
|
parsed_track = re.sub(r"\s*[\(\[][^\)\]]*[\)\]]\s*$", "", parsed_track).strip()
|
||||||
|
if artist is None:
|
||||||
|
artist = _clean(parsed_artist)
|
||||||
|
if track is None:
|
||||||
|
track = _clean(parsed_track)
|
||||||
|
return artist, track
|
||||||
|
|
||||||
|
|
||||||
|
def _shazam_lookup(filepath: str) -> tuple[str | None, str | None]:
|
||||||
|
async def recognize() -> dict:
|
||||||
|
return await Shazam().recognize(filepath)
|
||||||
|
|
||||||
|
try:
|
||||||
|
result = asyncio.run(recognize())
|
||||||
|
except Exception as e:
|
||||||
|
log.warning("shazam lookup failed", error=str(e))
|
||||||
|
return None, None
|
||||||
|
track_info = result.get("track") or {}
|
||||||
|
return _clean(track_info.get("subtitle")), _clean(track_info.get("title"))
|
||||||
|
|
||||||
|
|
||||||
|
def current_quarter() -> Path:
|
||||||
|
date = DateParse()
|
||||||
|
return Path().home() / "Music" / f"{date.quarter}_{date.year}"
|
||||||
|
|
||||||
|
|
||||||
|
class YtDownload(Command):
|
||||||
|
"""
|
||||||
|
download a youtube song from {URL} to current quarter dir.
|
||||||
|
"""
|
||||||
|
|
||||||
|
url: CliPositionalArg[str] = Field(description="youtube url to download")
|
||||||
|
ydl_opts: ClassVar = {
|
||||||
|
"format": "mp3/bestaudio/best",
|
||||||
|
"postprocessors": [
|
||||||
|
{
|
||||||
|
"key": "FFmpegExtractAudio",
|
||||||
|
"preferredcodec": "mp3",
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"remote_components": ["ejs:github"],
|
||||||
|
}
|
||||||
|
parents: CliImplicitFlag[bool] = Field(
|
||||||
|
default=True, description="create the parent dirs if not exists"
|
||||||
|
)
|
||||||
|
|
||||||
|
@computed_field()
|
||||||
|
@property
|
||||||
|
def quarter_dir(self) -> Path:
|
||||||
|
date = DateParse()
|
||||||
|
return Path().home() / "Music" / f"{date.quarter}_{date.year}"
|
||||||
|
|
||||||
|
def run(self) -> None:
|
||||||
|
os.chdir(Path.home() / "Downloads")
|
||||||
|
with yt_dlp.YoutubeDL(self.ydl_opts) as ydl:
|
||||||
|
info_dict = ydl.extract_info(self.url, download=True)
|
||||||
|
_ = ydl.prepare_filename(info_dict)
|
||||||
|
if info_dict is None:
|
||||||
|
raise ValueError("error downloading")
|
||||||
|
filename = info_dict["requested_downloads"][0]["filepath"]
|
||||||
|
|
||||||
|
artist, track = _extract_artist_track(info_dict)
|
||||||
|
|
||||||
|
if artist is None or track is None:
|
||||||
|
log.info("trying shazam", file=filename)
|
||||||
|
shazam_artist, shazam_track = _shazam_lookup(filename)
|
||||||
|
artist = artist or shazam_artist
|
||||||
|
track = track or shazam_track
|
||||||
|
if shazam_artist or shazam_track:
|
||||||
|
log.info("shazam matched", artist=shazam_artist, track=shazam_track)
|
||||||
|
|
||||||
|
if artist is None or track is None:
|
||||||
|
title = info_dict.get("title") or Path(filename).stem
|
||||||
|
uploader = info_dict.get("uploader") or info_dict.get("channel")
|
||||||
|
log.info("could not parse metadata", title=title, uploader=uploader)
|
||||||
|
artist = _prompt("artist", default=artist or uploader)
|
||||||
|
track = _prompt("track", default=track or title)
|
||||||
|
|
||||||
|
base = self.quarter_dir
|
||||||
|
if not base.exists():
|
||||||
|
base.mkdir(parents=self.parents)
|
||||||
|
if track and artist:
|
||||||
|
name = f"{artist} - {track}.mp3"
|
||||||
|
new_filepath = base / name
|
||||||
|
os.rename(filename, new_filepath)
|
||||||
|
tags = EasyID3(new_filepath)
|
||||||
|
tags["title"] = track
|
||||||
|
tags["artist"] = artist
|
||||||
|
tags.save()
|
||||||
|
else:
|
||||||
|
new_filepath = base / Path(filename).name
|
||||||
|
os.rename(filename, new_filepath)
|
||||||
|
|
||||||
|
log.info("downloaded", path=str(new_filepath))
|
||||||
35
src/music/commands/random.py
Normal file
35
src/music/commands/random.py
Normal file
@@ -0,0 +1,35 @@
|
|||||||
|
import subprocess
|
||||||
|
from music.models import Command
|
||||||
|
from music.paths import seasons
|
||||||
|
import random
|
||||||
|
|
||||||
|
# import mpv
|
||||||
|
import tempfile
|
||||||
|
import os
|
||||||
|
|
||||||
|
|
||||||
|
class Random(Command):
|
||||||
|
def run(self):
|
||||||
|
"""play random local songs."""
|
||||||
|
songs = []
|
||||||
|
for d in seasons():
|
||||||
|
for f in d.iterdir():
|
||||||
|
if not f.suffix == ".mp3":
|
||||||
|
continue
|
||||||
|
songs.append(f)
|
||||||
|
random.shuffle(songs)
|
||||||
|
with tempfile.NamedTemporaryFile(delete=False, suffix=".txt") as tmp:
|
||||||
|
for song in songs:
|
||||||
|
tmp.write(f"{song}\n".encode())
|
||||||
|
playlist = tmp.name
|
||||||
|
# fd -t file -e mp3 -p -a "fall|winter|spring|summer" $HOME/Music/ | mpv --playlist=- --shuffle --no-video
|
||||||
|
|
||||||
|
try:
|
||||||
|
subprocess.run(["mpv", f"--playlist={playlist}", "--shuffle", "--no-video"])
|
||||||
|
# process = subprocess.Popen(
|
||||||
|
# ["mpv", f"--playlist={playlist}", "--shuffle", "--no-video"],
|
||||||
|
# )
|
||||||
|
|
||||||
|
# process.wait()
|
||||||
|
finally:
|
||||||
|
os.remove(playlist)
|
||||||
73
src/music/commands/stream.py
Normal file
73
src/music/commands/stream.py
Normal file
@@ -0,0 +1,73 @@
|
|||||||
|
from datetime import datetime, timezone
|
||||||
|
from enum import Enum
|
||||||
|
from pathlib import Path
|
||||||
|
import subprocess
|
||||||
|
|
||||||
|
from pydantic import Field
|
||||||
|
import requests
|
||||||
|
import structlog
|
||||||
|
|
||||||
|
from music.dates import DateParse
|
||||||
|
from music.models import Command
|
||||||
|
|
||||||
|
log = structlog.get_logger()
|
||||||
|
|
||||||
|
|
||||||
|
class Source(str, Enum):
|
||||||
|
KEXP = "kexp"
|
||||||
|
KUGS = "kugs"
|
||||||
|
CLIS = "clis"
|
||||||
|
|
||||||
|
|
||||||
|
def current_quarter() -> Path:
|
||||||
|
date = DateParse()
|
||||||
|
return Path().home() / "Music" / f"{date.quarter}_{date.year}"
|
||||||
|
|
||||||
|
|
||||||
|
def url_for_time(t) -> str:
|
||||||
|
default = "https://kexp-mp3-128.streamguys1.com/kexp128.mp3"
|
||||||
|
tz = t.strftime("%Y-%m-%dT%H:%M:%SZ")
|
||||||
|
log.info("getting archive", time=t)
|
||||||
|
check_url = (
|
||||||
|
f"https://api.kexp.org/get_streaming_url/?bitrate=128×tamp={tz}&location=1"
|
||||||
|
)
|
||||||
|
response = requests.get(check_url)
|
||||||
|
if response.status_code == 200:
|
||||||
|
found = response.json().get("sg-url")
|
||||||
|
if found is None:
|
||||||
|
log.error("error getting archive", time=t)
|
||||||
|
return default
|
||||||
|
else:
|
||||||
|
log.error("error getting archive", time=t, status=response.status_code)
|
||||||
|
return default
|
||||||
|
|
||||||
|
return found
|
||||||
|
|
||||||
|
|
||||||
|
class Stream(Command):
|
||||||
|
"""play streams."""
|
||||||
|
|
||||||
|
source: Source = Field(default=Source.KEXP)
|
||||||
|
for_date: datetime | None = Field(default=None)
|
||||||
|
|
||||||
|
def run(self) -> None:
|
||||||
|
if self.for_date is not None:
|
||||||
|
time = self.for_date.astimezone(tz=timezone.utc)
|
||||||
|
else:
|
||||||
|
time = None
|
||||||
|
self.play(self.source, time)
|
||||||
|
|
||||||
|
def play(self, stream, t=None):
|
||||||
|
"""Play streams using mpv."""
|
||||||
|
match stream:
|
||||||
|
case Source.KEXP:
|
||||||
|
url = "https://kexp-mp3-128.streamguys1.com/kexp128.mp3"
|
||||||
|
if t:
|
||||||
|
url = url_for_time(t)
|
||||||
|
case Source.KUGS:
|
||||||
|
url = "https://peridot.streamguys1.com:7175/kugs-mp3"
|
||||||
|
case Source.CLIS:
|
||||||
|
url = "https://stream2.statsradio.com:8012/stream?=&&___cb=759135934160766"
|
||||||
|
case _:
|
||||||
|
raise ValueError(f"unrecognized stream: {stream}")
|
||||||
|
subprocess.run(["mpv", url])
|
||||||
44
src/music/dates.py
Normal file
44
src/music/dates.py
Normal file
@@ -0,0 +1,44 @@
|
|||||||
|
from datetime import datetime
|
||||||
|
from typing import Optional, Tuple
|
||||||
|
|
||||||
|
from pydantic import BaseModel, Field, computed_field
|
||||||
|
|
||||||
|
|
||||||
|
def quarter_year(for_date: Optional[datetime] = None) -> Tuple[str, int]:
|
||||||
|
if for_date is None:
|
||||||
|
for_date = datetime.now()
|
||||||
|
year = for_date.year
|
||||||
|
month = for_date.month
|
||||||
|
if month in [12, 1, 2]:
|
||||||
|
quarter = "winter"
|
||||||
|
elif month in [3, 4, 5]:
|
||||||
|
quarter = "spring"
|
||||||
|
elif month in [6, 7, 8]:
|
||||||
|
quarter = "summer"
|
||||||
|
else:
|
||||||
|
quarter = "fall"
|
||||||
|
return quarter, year
|
||||||
|
|
||||||
|
|
||||||
|
class DateParse(BaseModel):
|
||||||
|
date: datetime = Field(default_factory=datetime.now)
|
||||||
|
|
||||||
|
@computed_field()
|
||||||
|
@property
|
||||||
|
def quarter(self) -> str:
|
||||||
|
match self.date.month:
|
||||||
|
case 12 | 1 | 2:
|
||||||
|
return "winter"
|
||||||
|
case 3 | 4 | 5:
|
||||||
|
return "spring"
|
||||||
|
case 6 | 7 | 8:
|
||||||
|
return "summer"
|
||||||
|
case 9 | 10 | 11:
|
||||||
|
return "fall"
|
||||||
|
case _:
|
||||||
|
raise ValueError(self.date.month)
|
||||||
|
|
||||||
|
@computed_field()
|
||||||
|
@property
|
||||||
|
def year(self) -> int:
|
||||||
|
return self.date.year
|
||||||
9
src/music/models.py
Normal file
9
src/music/models.py
Normal file
@@ -0,0 +1,9 @@
|
|||||||
|
from abc import ABC, abstractmethod
|
||||||
|
|
||||||
|
from pydantic_settings import BaseSettings
|
||||||
|
|
||||||
|
|
||||||
|
class Command(BaseSettings, ABC):
|
||||||
|
@abstractmethod
|
||||||
|
def run(self) -> None:
|
||||||
|
pass
|
||||||
21
src/music/paths.py
Normal file
21
src/music/paths.py
Normal file
@@ -0,0 +1,21 @@
|
|||||||
|
from pathlib import Path
|
||||||
|
from typing import Generator
|
||||||
|
|
||||||
|
from music.dates import quarter_year
|
||||||
|
|
||||||
|
|
||||||
|
def seasons() -> Generator[Path, None, None]:
|
||||||
|
prefix = ("fall_", "winter_", "spring_", "summer_")
|
||||||
|
music_dir = Path().home() / "Music"
|
||||||
|
for d in music_dir.iterdir():
|
||||||
|
if not d.is_dir():
|
||||||
|
continue
|
||||||
|
if not d.name.startswith(prefix):
|
||||||
|
continue
|
||||||
|
yield d
|
||||||
|
yield from []
|
||||||
|
|
||||||
|
|
||||||
|
def current_quarter() -> Path:
|
||||||
|
quarter, year = quarter_year()
|
||||||
|
return Path().home() / "Music" / f"{quarter}_{year}"
|
||||||
48
src/music/save.py
Normal file
48
src/music/save.py
Normal file
@@ -0,0 +1,48 @@
|
|||||||
|
from mutagen.easyid3 import EasyID3
|
||||||
|
import yt_dlp
|
||||||
|
from .dates import quarter_year
|
||||||
|
from .paths import current_quarter
|
||||||
|
import os
|
||||||
|
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
|
||||||
|
def yt(url):
|
||||||
|
ydl_opts = {
|
||||||
|
"format": "mp3/bestaudio/best",
|
||||||
|
# ℹ️ See help(yt_dlp.postprocessor) for a list of available Postprocessors and their arguments
|
||||||
|
"postprocessors": [
|
||||||
|
{ # Extract audio using ffmpeg
|
||||||
|
"key": "FFmpegExtractAudio",
|
||||||
|
"preferredcodec": "mp3",
|
||||||
|
}
|
||||||
|
],
|
||||||
|
}
|
||||||
|
|
||||||
|
os.chdir(Path.home() / "Downloads")
|
||||||
|
# Download the audio and get the filename
|
||||||
|
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
|
||||||
|
info_dict = ydl.extract_info(url, download=True)
|
||||||
|
_ = ydl.prepare_filename(info_dict)
|
||||||
|
if info_dict is None:
|
||||||
|
raise ValueError("error downloading")
|
||||||
|
filename = info_dict["requested_downloads"][0]["filepath"]
|
||||||
|
|
||||||
|
track = info_dict.get("track")
|
||||||
|
artist = info_dict.get("artist")
|
||||||
|
|
||||||
|
parent = current_quarter()
|
||||||
|
if not parent.exists():
|
||||||
|
parent.mkdir(parents=True)
|
||||||
|
if track is not None and artist is not None:
|
||||||
|
name = f"{artist} - {track}.mp3"
|
||||||
|
new_filepath = parent / name
|
||||||
|
os.rename(filename, new_filepath)
|
||||||
|
tags = EasyID3(new_filepath)
|
||||||
|
tags["title"] = track
|
||||||
|
tags["artist"] = artist
|
||||||
|
tags.save()
|
||||||
|
|
||||||
|
else:
|
||||||
|
new_filepath = parent / filename
|
||||||
|
os.rename(filename, new_filepath)
|
||||||
@@ -1,137 +0,0 @@
|
|||||||
#! /home/user/.venv/bin/python
|
|
||||||
|
|
||||||
import asyncio
|
|
||||||
from shazamio import Shazam, Serialize
|
|
||||||
from pathlib import Path
|
|
||||||
import click
|
|
||||||
from dotenv import load_dotenv
|
|
||||||
import os
|
|
||||||
from tqdm import tqdm
|
|
||||||
from typing import Optional
|
|
||||||
import sys
|
|
||||||
from collections import namedtuple
|
|
||||||
from joblib import Memory
|
|
||||||
from pydantic import BaseModel, Field
|
|
||||||
|
|
||||||
base_path = Path(__file__).parent.parent
|
|
||||||
memory = Memory(base_path / "./.cache", verbose=0)
|
|
||||||
|
|
||||||
|
|
||||||
class Match(BaseModel):
|
|
||||||
old: Path
|
|
||||||
new: Optional[Path] = Field(default=None)
|
|
||||||
title: Optional[str] = Field(default=None)
|
|
||||||
artist: Optional[str] = Field(default=None)
|
|
||||||
|
|
||||||
|
|
||||||
@click.group()
|
|
||||||
def cli():
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
@cli.command()
|
|
||||||
@click.option(
|
|
||||||
"-s",
|
|
||||||
"--song",
|
|
||||||
required=True,
|
|
||||||
type=click.Path(exists=True, dir_okay=False, path_type=Path),
|
|
||||||
)
|
|
||||||
def rename(song: Path):
|
|
||||||
change = get_match(song)
|
|
||||||
if change and change.new is not None:
|
|
||||||
prompt_rename(change)
|
|
||||||
|
|
||||||
|
|
||||||
def songs(root: Path):
|
|
||||||
seasons = ["winter", "spring", "summer", "fall"]
|
|
||||||
pattern = "{}_*/*.mp3"
|
|
||||||
matches = []
|
|
||||||
for season in seasons:
|
|
||||||
for file_path in root.rglob(pattern.format(season)):
|
|
||||||
matches.append(file_path)
|
|
||||||
|
|
||||||
def small(f: Path, max_bytes: int = 10_000_000):
|
|
||||||
return f.stat().st_size < max_bytes
|
|
||||||
|
|
||||||
return filter(small, matches)
|
|
||||||
|
|
||||||
|
|
||||||
def needs_rename(song: Match):
|
|
||||||
if song.new is None:
|
|
||||||
print(f"\nno match: {song.old.name}\n")
|
|
||||||
return False
|
|
||||||
if song.old.name == song.new.name:
|
|
||||||
print(f"\nname correct: {song.old.name}\n")
|
|
||||||
return False
|
|
||||||
return True
|
|
||||||
|
|
||||||
|
|
||||||
@cli.command("main")
|
|
||||||
def main():
|
|
||||||
root = Path(os.getenv("MUSIC_PATH", "~/Music"))
|
|
||||||
if not root.exists():
|
|
||||||
raise ValueError(f"{str(root)} does not exist")
|
|
||||||
|
|
||||||
renames = []
|
|
||||||
for song in tqdm(songs(root), position=0, leave=True):
|
|
||||||
rename = get_match(song)
|
|
||||||
if rename and rename.new is not None:
|
|
||||||
renames.append(rename)
|
|
||||||
|
|
||||||
for song in filter(needs_rename, renames):
|
|
||||||
prompt_rename(song)
|
|
||||||
|
|
||||||
sys.exit(127)
|
|
||||||
|
|
||||||
|
|
||||||
def prompt_rename(song: Match):
|
|
||||||
old = song.old
|
|
||||||
answer = input(
|
|
||||||
f'\nrename: "{old.name}"\nto: "{song.new.name}"?\n[y[es]/n[o]/m[anual]] > '
|
|
||||||
)
|
|
||||||
if answer in ["n", "N"]:
|
|
||||||
print(f"\nskipping: {old.name}\n")
|
|
||||||
return
|
|
||||||
if answer in ["m"]:
|
|
||||||
manual = input("\nnew name: ")
|
|
||||||
new = old.parent / manual
|
|
||||||
assert (
|
|
||||||
new.suffix == old.suffix
|
|
||||||
), f"filetype changed: {old.suffix} -> {new.suffix}"
|
|
||||||
else:
|
|
||||||
new = song.new
|
|
||||||
try:
|
|
||||||
old.rename(new)
|
|
||||||
from mutagen.easyid3 import EasyID3
|
|
||||||
|
|
||||||
tags = EasyID3(new)
|
|
||||||
tags["title"] = song.title
|
|
||||||
tags["artist"] = song.artist
|
|
||||||
tags.save()
|
|
||||||
except FileNotFoundError as e:
|
|
||||||
print(f"error renaming: {e}")
|
|
||||||
print(f'old: "{old}"\nnew: "{new}"')
|
|
||||||
return
|
|
||||||
|
|
||||||
|
|
||||||
@memory.cache
|
|
||||||
def get_match(original: Path) -> Match:
|
|
||||||
loop = asyncio.get_event_loop()
|
|
||||||
shazam = Shazam()
|
|
||||||
try:
|
|
||||||
out = loop.run_until_complete(shazam.recognize(str(original)))
|
|
||||||
serialized = Serialize.track(out["track"])
|
|
||||||
rename = f"{serialized.title} - {serialized.subtitle}{original.suffix}"
|
|
||||||
new = Path(original.parent) / rename
|
|
||||||
return Match(
|
|
||||||
old=original, new=new, title=serialized.title, artist=serialized.subtitle
|
|
||||||
)
|
|
||||||
except:
|
|
||||||
return Match(old=original, new=None)
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
env = Path(__file__).parent.parent / ".env"
|
|
||||||
if not load_dotenv(env):
|
|
||||||
raise ValueError(".env not found: {env}")
|
|
||||||
cli()
|
|
||||||
80
src/music/stream.py
Normal file
80
src/music/stream.py
Normal file
@@ -0,0 +1,80 @@
|
|||||||
|
import subprocess
|
||||||
|
from enum import Enum
|
||||||
|
from .paths import seasons
|
||||||
|
import random
|
||||||
|
import requests
|
||||||
|
import structlog
|
||||||
|
|
||||||
|
# import mpv
|
||||||
|
import tempfile
|
||||||
|
import os
|
||||||
|
|
||||||
|
log = structlog.get_logger()
|
||||||
|
|
||||||
|
|
||||||
|
class Stream(str, Enum):
|
||||||
|
KEXP = "kexp"
|
||||||
|
KUGS = "kugs"
|
||||||
|
CLIS = "clis"
|
||||||
|
|
||||||
|
|
||||||
|
def url_for_time(t) -> str:
|
||||||
|
default = "https://kexp-mp3-128.streamguys1.com/kexp128.mp3"
|
||||||
|
tz = t.strftime("%Y-%m-%dT%H:%M:%SZ")
|
||||||
|
log.info("getting archive", time=t)
|
||||||
|
check_url = (
|
||||||
|
f"https://api.kexp.org/get_streaming_url/?bitrate=128×tamp={tz}&location=1"
|
||||||
|
)
|
||||||
|
response = requests.get(check_url)
|
||||||
|
if response.status_code == 200:
|
||||||
|
found = response.json().get("sg-url")
|
||||||
|
if found is None:
|
||||||
|
log.error("error getting archive", time=t)
|
||||||
|
return default
|
||||||
|
else:
|
||||||
|
log.error("error getting archive", time=t, status=response.status_code)
|
||||||
|
return default
|
||||||
|
|
||||||
|
return found
|
||||||
|
|
||||||
|
|
||||||
|
def play(stream, t=None):
|
||||||
|
"""Play streams using mpv."""
|
||||||
|
match stream:
|
||||||
|
case Stream.KEXP:
|
||||||
|
url = "https://kexp-mp3-128.streamguys1.com/kexp128.mp3"
|
||||||
|
if t:
|
||||||
|
url = url_for_time(t)
|
||||||
|
case Stream.KUGS:
|
||||||
|
url = "https://peridot.streamguys1.com:7175/kugs-mp3"
|
||||||
|
case Stream.CLIS:
|
||||||
|
url = "https://stream2.statsradio.com:8012/stream?=&&___cb=759135934160766"
|
||||||
|
case _:
|
||||||
|
raise ValueError(f"unrecognized stream: {stream}")
|
||||||
|
subprocess.run(["mpv", url])
|
||||||
|
|
||||||
|
|
||||||
|
def local():
|
||||||
|
"""play random local songs."""
|
||||||
|
songs = []
|
||||||
|
for d in seasons():
|
||||||
|
for f in d.iterdir():
|
||||||
|
if not f.suffix == ".mp3":
|
||||||
|
continue
|
||||||
|
songs.append(f)
|
||||||
|
random.shuffle(songs)
|
||||||
|
with tempfile.NamedTemporaryFile(delete=False, suffix=".txt") as tmp:
|
||||||
|
for song in songs:
|
||||||
|
tmp.write(f"{song}\n".encode())
|
||||||
|
playlist = tmp.name
|
||||||
|
# fd -t file -e mp3 -p -a "fall|winter|spring|summer" $HOME/Music/ | mpv --playlist=- --shuffle --no-video
|
||||||
|
|
||||||
|
try:
|
||||||
|
subprocess.run(["mpv", f"--playlist={playlist}", "--shuffle", "--no-video"])
|
||||||
|
# process = subprocess.Popen(
|
||||||
|
# ["mpv", f"--playlist={playlist}", "--shuffle", "--no-video"],
|
||||||
|
# )
|
||||||
|
|
||||||
|
# process.wait()
|
||||||
|
finally:
|
||||||
|
os.remove(playlist)
|
||||||
Reference in New Issue
Block a user