refactor: move main functions to app.py then add some features

* Rename functions
* Multithearding
* Display progress bar
This commit is contained in:
Puqns67 2024-10-24 00:01:53 +08:00
parent 0c4a81c9d3
commit 47c9487516
Signed by: Puqns67
GPG Key ID: 9669DF042554F536
7 changed files with 232 additions and 163 deletions

View File

@ -1,28 +1,9 @@
from pathlib import Path
from click import Path as clickPath
from click import argument, command, confirm, option
from rich.console import Console
from rich.theme import Theme
from click import argument, command, option
from .api import NCMApi
from .app import NCMLyricsApp
from .enum import LinkType
from .error import UnsupportedLinkError
from .lrc import Lrc
from .util import parseLink, pickOutput
NCMLyricsAppTheme = Theme(
{
"songTitle": "bold chartreuse1",
"songArrow": "chartreuse3",
"albumTitle": "bold orchid1",
"albumArrow": "orchid2",
"playListTitle": "bold aquamarine1",
"playListArrow": "aquamarine3",
"warning": "bold red1",
}
)
@command
@ -40,72 +21,8 @@ NCMLyricsAppTheme = Theme(
"links",
nargs=-1,
)
def main(outputs: list[Path], exist: bool, overwrite: bool, quiet: bool, links: list[str]) -> int:
console = Console(theme=NCMLyricsAppTheme, highlight=False)
if len(links) == 0:
console.print(
"请输入至少一个链接以解析曲目以获取其歌词!支持输入单曲,专辑与歌单的分享或网页链接。", style="warning"
)
return 1
api = NCMApi()
app = NCMLyricsApp(console=console, outputs=outputs, exist=exist, overwrite=overwrite, quiet=quiet, tracks=[])
for link in links:
try:
parsed = parseLink(link)
except UnsupportedLinkError:
continue
match parsed.type:
case LinkType.Track:
newTrack = api.getDetailsForTrack(parsed.id)
savePath = pickOutput(newTrack, outputs, exist)
if not quiet:
console.print("-- 单曲 -->", style="songTitle", end=" ")
console.print(f"{"/".join(newTrack.artists)} - {newTrack.name}", style=f"link {newTrack.link()}")
app.addWithPath(newTrack, savePath, "songArrow")
case LinkType.Album:
newAlbum = api.getDetailsForAlbum(parsed.id)
if not quiet:
console.print("== 专辑 ==>", style="albumTitle", end=" ")
console.print(newAlbum.name, style=f"link {newAlbum.link()}")
for newTrack in newAlbum.tracks:
app.add(newTrack, "albumArrow")
case LinkType.Playlist:
newPlaylist = api.getDetailsForPlaylist(parsed.id)
newPlaylist.fillDetailsOfTracks(api)
if not quiet:
console.print("== 歌单 ==>", style="playListTitle", end=" ")
console.print(newPlaylist.name, style=f"link {newPlaylist.link()}")
for newTrack in newPlaylist.tracks:
app.add(newTrack, "playListArrow")
if len(app.tracks) == 0:
console.print("无曲目的歌词可被获取,请检查上方的输出!", style="warning")
return 1
if not quiet:
confirm("继续操作?", default=True, abort=True)
for track, path in app.tracks:
ncmlyrics = api.getLyricsByTrack(track.id)
if ncmlyrics.isPureMusic:
console.print(f"曲目 {track.name} 为纯音乐, 跳过此曲目")
else:
Lrc.fromNCMLyrics(ncmlyrics).saveAs(path)
console.print(f"--> {str(path)}")
api.saveCookies()
def main(exist: bool, overwrite: bool, quiet: bool, outputs: list[Path], links: list[str]) -> None:
NCMLyricsApp(exist=exist, overwrite=overwrite, quiet=quiet, outputs=outputs, links=links).run()
if __name__ == "__main__":

View File

@ -58,7 +58,7 @@ class NCMApi:
)
def _fetch(self, request: HttpXRequest, retry: int | None = 4) -> HttpXResponse:
if retry: # None => Disable retry
if retry is not None: # None => Disable retry
if retry < 0:
retry = 0

View File

@ -1,40 +1,210 @@
from dataclasses import dataclass
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import wait as waitFuture
from pathlib import Path
from re import Pattern
from re import compile as compileRegex
from re import escape as escapeRegex
from typing import Any, Generator, Iterable
from click import confirm
from rich.console import Console
from rich.progress import Progress, TaskID
from rich.theme import Theme
from .object import NCMTrack
from .util import pickOutput
from .api import NCMApi
from .enum import LinkType
from .error import ParseLinkError, UnsupportedLinkError
from .lrc import Lrc
from .object import NCMAlbum, NCMPlaylist, NCMTrack
from .util import parseLink, safeFileName
__all__ = ["NCMLyricsApp"]
NCMLyricsAppTheme = Theme(
{
"tracktitle": "bold chartreuse1",
"trackarrow": "chartreuse3",
"albumtitle": "bold orchid1",
"albumarrow": "orchid2",
"playlisttitle": "bold aquamarine1",
"playlistarrow": "aquamarine3",
"info": "",
"warning": "orange1",
"error": "bold red1",
}
)
@dataclass
class NCMLyricsApp:
console: Console
def __init__(self, exist: bool, overwrite: bool, quiet: bool, outputs: list[Path], links: list[str]) -> None:
self.console = Console(theme=NCMLyricsAppTheme, highlight=False)
self.progress = Progress(console=self.console)
self.pool = ThreadPoolExecutor(max_workers=4)
outputs: list[Path]
exist: bool
overwrite: bool
quiet: bool
self.api = NCMApi()
tracks: list[tuple[NCMTrack, Path]]
def addWithPath(self, track: NCMTrack, savePath: Path | None, arrowStyle: str) -> None:
if savePath is None:
if not self.quiet:
self.console.print("--->", style=arrowStyle, end=" ")
self.console.print("未能找到源文件,将跳过此曲目。", style="warning")
elif not self.overwrite and savePath.exists():
if not self.quiet:
self.console.print("--->", style=arrowStyle, end=" ")
self.console.print("歌词文件已存在,将跳过此曲目。", style="warning")
self.exist = exist
self.overwrite = overwrite
self.quiet = quiet
if len(outputs) == 0:
self.outputs = [Path()]
else:
self.tracks.append((track, savePath))
self.outputs = outputs
def add(self, track: NCMTrack, arrowStyle: str) -> None:
savePath = pickOutput(track, self.outputs, self.exist)
self.links = links
self.tasks: list[NCMTrack | NCMAlbum | NCMPlaylist] = []
self.tracks: list[NCMTrack] = []
self.trackPairs: list[tuple[NCMTrack, Path | None]] = []
self.existedTargets: list[Path] = []
def run(self) -> None:
if len(self.links) == 0:
self.console.print(
"请给出至少一个链接以解析曲目以获取其歌词!支持输入单曲,专辑与歌单的分享或网页链接。", style="error"
)
return
progressId = self.progress.add_task("解析链接", total=len(self.links))
self.progress.start()
for task in self.pool.map(self.resolveLink, self._repeat(progressId), self.links):
self.tasks.append(task)
self.tracks.extend(task.tracks)
if not self.quiet:
self.console.print("-->", style=arrowStyle, end=" ")
self.console.print(f"{"/".join(track.artists)} - {track.name}", style=f"link {track.link()}")
self.progress.stop()
self.printTasks()
if not confirm("继续操作?", default=True):
self.console.print("任务已取消。", style="info")
return
self.addWithPath(track, savePath, arrowStyle)
self.progress.reset(progressId, description="获取已存在的歌曲列表", total=1)
if not self.quiet:
self.progress.start()
self.walkOutputs()
self.progress.advance(progressId)
self.progress.reset(progressId, description="解析保存路径", total=len(self.tracks))
for trackPair in self.pool.map(self.resolvePath, self._repeat(progressId), self.tracks):
self.trackPairs.append(trackPair)
self.progress.reset(progressId, description="输出 Lrc 文件", total=len(self.trackPairs))
waitFuture((self.pool.submit(self.exportLrc, progressId, *trackPair) for trackPair in self.trackPairs))
self.progress.stop()
self.api.saveCookies()
def printTasks(self):
def printTracks(tracks: Iterable[NCMTrack], arrowStyle: str | None = None) -> None:
for track in tracks:
self.console.print(
f"[{arrowStyle}]-->[/{arrowStyle}] [link={track.link()}]{track.prettyString()}[/link]"
)
for task in self.tasks:
match task:
case NCMTrack():
self.console.print(
f"[tracktitle]-- 单曲 -->[/tracktitle] [link={task.link()}]{task.prettyString()}[/link]"
)
case NCMAlbum():
self.console.print(f"[albumtitle]== 专辑 ==>[/albumtitle] [link={task.link()}]{task.name}[/link]")
printTracks(task.tracks, "albumarrow")
case NCMPlaylist():
self.console.print(
f"[playlisttitle]== 歌单 ==>[/playlisttitle] [link={task.link()}]{task.name}[/link]"
)
printTracks(task.tracks, "playlistarrow")
def walkOutputs(self):
for output in self.outputs:
output = output.absolute()
if not output.exists() or not output.is_dir():
continue
for content in output.iterdir():
if not content.is_file():
continue
if content.suffix in (".ncm", ".mp3", ".flac"):
self.existedTargets.append(content)
def resolveLink(self, progress: TaskID, link: str) -> NCMTrack | NCMAlbum | NCMPlaylist:
try:
parsed = parseLink(link)
except UnsupportedLinkError:
self.console.print(f"不支持的链接:{link}", style="error")
return
except ParseLinkError:
self.console.print_exception()
self.console.print(f"解析链接时出现错误:{link}", style="error")
return
match parsed.type:
case LinkType.Track:
result = self.api.getDetailsForTrack(parsed.id)
case LinkType.Album:
result = self.api.getDetailsForAlbum(parsed.id)
case LinkType.Playlist:
result = self.api.getDetailsForPlaylist(parsed.id)
result.fillDetailsOfTracks(self.api)
self.progress.advance(progress)
return result
def resolvePath(self, progress: TaskID, track: NCMTrack) -> tuple[NCMTrack, Path | None]:
regex: Pattern[str] | None = None
targetLrc: Path | None = None
for target in self.existedTargets:
if regex is None:
# "(?:," + ")?(?:,".join((escapeRegex(artist) for artist in track.artists[3:])) + ")?"
regex = compileRegex(
rf"^{escapeRegex(",".join(track.artists[:3])).replace(",", "(?:,| )")} - {escapeRegex(track.name.rstrip("."))}.+(ncm|mp3|flac)$"
)
matched = regex.match(target.name)
if matched is not None:
targetLrc = target.with_suffix(".lrc")
break
self.progress.advance(progress)
if targetLrc is None:
if self.exist:
return (track, None)
else:
targetLrc = self.outputs[-1] / safeFileName(f"{",".join(track.artists)} - {track.name}.lrc")
if not self.overwrite and targetLrc.exists():
return (track, None)
return (track, targetLrc)
def exportLrc(self, progress: TaskID, track: NCMTrack, path: Path | None) -> None:
if path is None:
self.console.print(
f"[trackarrow]-->[/trackarrow] {track.prettyString()} [dark_turquoise]==>[dark_turquoise] [warning]对应的歌词文件已存在, 跳过此曲目。[/warning]"
)
self.progress.advance(progress)
return
ncmlyrics = self.api.getLyricsByTrack(track.id)
if ncmlyrics.isPureMusic:
self.console.print(
f"[trackarrow]-->[/trackarrow] {track.prettyString()} [dark_turquoise]==>[/dark_turquoise] [warning]为纯音乐, 跳过此曲目。[/warning]"
)
else:
if not self.quiet:
self.console.print(
f"[trackarrow]-->[/trackarrow] {track.prettyString()} [dark_turquoise]==>[/dark_turquoise] [info]{str(path)}[/info]"
)
Lrc.fromNCMLyrics(ncmlyrics).saveAs(path)
self.progress.advance(progress)
@staticmethod
def _repeat(content: Any) -> Generator[Any, None, None]:
while True:
yield content

View File

@ -8,7 +8,7 @@ class LrcType(Enum):
Translation = auto()
Romaji = auto()
def pretty(self) -> str:
def prettyString(self) -> str:
match self:
case LrcType.Origin:
return ""

View File

@ -2,7 +2,7 @@ from json import JSONDecodeError
from json import loads as loadJson
from pathlib import Path
from re import Match
from re import compile as reCompile
from re import compile as compileRegex
from typing import Generator, Iterable, Self
from .constant import CONFIG_LRC_AUTO_MERGE, CONFIG_LRC_AUTO_MERGE_OFFSET
@ -12,11 +12,11 @@ from .object import NCMLyrics
__all__ = ["Lrc"]
LRC_RE_COMMIT = reCompile(r"^\s*#")
LRC_RE_META = reCompile(r"^\s*\[(?P<type>ti|ar|al|au|length|by|offset):\s*(?P<content>.+?)\s*\]\s*$")
LRC_RE_META_NCM_SPECIAL = reCompile(r"^\s*\{.*\}\s*$")
LRC_RE_LYRIC = reCompile(r"^\s*(?P<timeLabels>(?:\s*\[\d{1,2}:\d{1,2}(?:\.\d{1,3})?\])+)\s*(?P<lyric>.+?)\s*$")
LRC_RE_LYRIC_TIMELABEL = reCompile(r"\[(?P<minutes>\d{1,2}):(?P<seconds>\d{1,2}(?:\.\d{1,3})?)\]")
LRC_RE_COMMIT = compileRegex(r"^\s*#")
LRC_RE_META = compileRegex(r"^\s*\[(?P<type>ti|ar|al|au|length|by|offset):\s*(?P<content>.+?)\s*\]\s*$")
LRC_RE_META_NCM_SPECIAL = compileRegex(r"^\s*\{.*\}\s*$")
LRC_RE_LYRIC = compileRegex(r"^\s*(?P<timeLabels>(?:\s*\[\d{1,2}:\d{1,2}(?:\.\d{1,3})?\])+)\s*(?P<lyric>.+?)\s*$")
LRC_RE_LYRIC_TIMELABEL = compileRegex(r"\[(?P<minutes>\d{1,2}):(?P<seconds>\d{1,2}(?:\.\d{1,3})?)\]")
class Lrc:
@ -147,7 +147,7 @@ class Lrc:
for metaType in LrcMetaType:
if metaType in self.metadata:
for lrcType in self.metadata[metaType].keys():
yield f"[{metaType.value}:{lrcType.pretty()}/{self.metadata[metaType][lrcType]}]"
yield f"[{metaType.value}:{lrcType.prettyString()}/{self.metadata[metaType][lrcType]}]"
for metaType, content in self.specials["metadata"]:
yield f"[{metaType.value}:{content}]"

View File

@ -28,7 +28,7 @@ class NCMTrack:
data = data.get("songs")
if data is None:
raise ObjectParseError("不存在单曲对应的结构")
raise ObjectParseError("不存在单曲对应的结构", data)
result = []
@ -48,9 +48,16 @@ class NCMTrack:
except KeyError as e:
raise ObjectParseError(f"需要的键不存在: {e}")
@property
def tracks(self) -> list[Self]:
return [self]
def link(self) -> str:
return f"https://music.163.com/song?id={self.id}"
def prettyString(self) -> str:
return f"{"/".join(self.artists)} - {self.name}"
@dataclass
class NCMAlbum:

View File

@ -1,21 +1,25 @@
from dataclasses import dataclass
from pathlib import Path
from re import compile as reCompile
from platform import system
from re import compile as compileRegex
from urllib.parse import parse_qs as parseQuery
from urllib.parse import urlparse as parseUrl
from httpx import get as httpGet
from httpx import get as getHttp
from .enum import LinkType
from .error import ParseLinkError, UnsupportedLinkError
from .object import NCMTrack
__all__ = ["Link", "parseLink", "testExistTrackSource", "pickOutput"]
__all__ = ["Link", "parseLink", "safeFileName"]
RE_SHARE_LINK_ID_BY_PATH = reCompile(r"^/?(?P<id>\d+)$")
RE_SHARE_LINK_ANDROID_ALBUM_PATH = reCompile(r"^/album/(?P<id>\d+)/?$")
RE_SAFE_FILENAME = reCompile(r"\*{2,}")
TRANSLATER_SAFE_FILENAME = str.maketrans({i: 0x2A for i in ("<", ">", ":", '"', "/", "\\", "|", "?")})
RE_SHARE_LINK_ID_BY_PATH = compileRegex(r"^/?(?P<id>\d+)$")
RE_SHARE_LINK_ANDROID_ALBUM_PATH = compileRegex(r"^/album/(?P<id>\d+)/?$")
if system() == "Windows":
TRANSLATER_SAFE_FILENAME = str.maketrans(
{i: 0x5F for i in (0x2F, 0x5C, 0x3A, 0x2A, 0x3F, 0x22, 0x3C, 0x3E, 0x7C)}
) # /, \, :, *, ?, ", <, >, | => _
else:
TRANSLATER_SAFE_FILENAME = str.maketrans({0x2F: 0x5F}) # / => _
@dataclass
@ -57,7 +61,7 @@ def parseLink(url: str) -> Link:
case _:
raise UnsupportedLinkError(parsedUrl)
case "163cn.tv":
response = httpGet(url)
response = getHttp(url)
if response.status_code != 302:
raise ParseLinkError(f"未知的 Api 响应: {response.status_code}")
newUrl = response.headers.get("Location")
@ -107,34 +111,5 @@ def parseLink(url: str) -> Link:
return Link(contentType, contentId)
def testExistTrackSource(track: NCMTrack, path: Path) -> Path | None:
safeFileName = RE_SAFE_FILENAME.sub(
"*", f"{"?".join(track.artists[:3])} - {track.name.rstrip(".")}.*".translate(TRANSLATER_SAFE_FILENAME)
)
globing = path.glob(safeFileName, case_sensitive=False)
try:
return next(globing)
except StopIteration:
return None
def pickOutput(track: NCMTrack, outputs: list[Path], forceSourceExists: bool = False) -> Path | None:
match len(outputs):
case 0:
result = testExistTrackSource(track, Path())
if result is not None:
return result.with_suffix(".lrc")
return None if forceSourceExists else Path(f"{",".join(track.artists)} - {track.name}.lrc")
case 1:
result = testExistTrackSource(track, outputs[0])
if result is not None:
return result.with_suffix(".lrc")
return None if forceSourceExists else outputs[0] / f"{",".join(track.artists)} - {track.name}.lrc"
case _:
for output in outputs:
result = testExistTrackSource(track, output)
if result is not None:
return result.with_suffix(".lrc")
return None if forceSourceExists else outputs[-1] / f"{",".join(track.artists)} - {track.name}.lrc"
def safeFileName(filename: str) -> str:
return filename.translate(TRANSLATER_SAFE_FILENAME)