From 47c9487516d19b0c737f2b4dd1b735fac1346484 Mon Sep 17 00:00:00 2001 From: Puqns67 Date: Thu, 24 Oct 2024 00:01:53 +0800 Subject: [PATCH] refactor: move main functions to app.py then add some features * Rename functions * Multithearding * Display progress bar --- ncmlyrics/__main__.py | 89 +---------------- ncmlyrics/api.py | 2 +- ncmlyrics/app.py | 222 +++++++++++++++++++++++++++++++++++++----- ncmlyrics/enum.py | 2 +- ncmlyrics/lrc.py | 14 +-- ncmlyrics/object.py | 9 +- ncmlyrics/util.py | 57 +++-------- 7 files changed, 232 insertions(+), 163 deletions(-) diff --git a/ncmlyrics/__main__.py b/ncmlyrics/__main__.py index e791137..8ffb19a 100644 --- a/ncmlyrics/__main__.py +++ b/ncmlyrics/__main__.py @@ -1,28 +1,9 @@ from pathlib import Path from click import Path as clickPath -from click import argument, command, confirm, option -from rich.console import Console -from rich.theme import Theme +from click import argument, command, option -from .api import NCMApi from .app import NCMLyricsApp -from .enum import LinkType -from .error import UnsupportedLinkError -from .lrc import Lrc -from .util import parseLink, pickOutput - -NCMLyricsAppTheme = Theme( - { - "songTitle": "bold chartreuse1", - "songArrow": "chartreuse3", - "albumTitle": "bold orchid1", - "albumArrow": "orchid2", - "playListTitle": "bold aquamarine1", - "playListArrow": "aquamarine3", - "warning": "bold red1", - } -) @command @@ -40,72 +21,8 @@ NCMLyricsAppTheme = Theme( "links", nargs=-1, ) -def main(outputs: list[Path], exist: bool, overwrite: bool, quiet: bool, links: list[str]) -> int: - console = Console(theme=NCMLyricsAppTheme, highlight=False) - - if len(links) == 0: - console.print( - "请输入至少一个链接以解析曲目以获取其歌词!支持输入单曲,专辑与歌单的分享或网页链接。", style="warning" - ) - return 1 - - api = NCMApi() - app = NCMLyricsApp(console=console, outputs=outputs, exist=exist, overwrite=overwrite, quiet=quiet, tracks=[]) - - for link in links: - try: - parsed = parseLink(link) - except UnsupportedLinkError: - continue - - match parsed.type: - case LinkType.Track: - newTrack = api.getDetailsForTrack(parsed.id) - savePath = pickOutput(newTrack, outputs, exist) - - if not quiet: - console.print("-- 单曲 -->", style="songTitle", end=" ") - console.print(f"{"/".join(newTrack.artists)} - {newTrack.name}", style=f"link {newTrack.link()}") - - app.addWithPath(newTrack, savePath, "songArrow") - - case LinkType.Album: - newAlbum = api.getDetailsForAlbum(parsed.id) - - if not quiet: - console.print("== 专辑 ==>", style="albumTitle", end=" ") - console.print(newAlbum.name, style=f"link {newAlbum.link()}") - - for newTrack in newAlbum.tracks: - app.add(newTrack, "albumArrow") - - case LinkType.Playlist: - newPlaylist = api.getDetailsForPlaylist(parsed.id) - newPlaylist.fillDetailsOfTracks(api) - - if not quiet: - console.print("== 歌单 ==>", style="playListTitle", end=" ") - console.print(newPlaylist.name, style=f"link {newPlaylist.link()}") - - for newTrack in newPlaylist.tracks: - app.add(newTrack, "playListArrow") - - if len(app.tracks) == 0: - console.print("无曲目的歌词可被获取,请检查上方的输出!", style="warning") - return 1 - - if not quiet: - confirm("继续操作?", default=True, abort=True) - - for track, path in app.tracks: - ncmlyrics = api.getLyricsByTrack(track.id) - if ncmlyrics.isPureMusic: - console.print(f"曲目 {track.name} 为纯音乐, 跳过此曲目") - else: - Lrc.fromNCMLyrics(ncmlyrics).saveAs(path) - console.print(f"--> {str(path)}") - - api.saveCookies() +def main(exist: bool, overwrite: bool, quiet: bool, outputs: list[Path], links: list[str]) -> None: + NCMLyricsApp(exist=exist, overwrite=overwrite, quiet=quiet, outputs=outputs, links=links).run() if __name__ == "__main__": diff --git a/ncmlyrics/api.py b/ncmlyrics/api.py index a673dfc..95ce7c1 100644 --- a/ncmlyrics/api.py +++ b/ncmlyrics/api.py @@ -58,7 +58,7 @@ class NCMApi: ) def _fetch(self, request: HttpXRequest, retry: int | None = 4) -> HttpXResponse: - if retry: # None => Disable retry + if retry is not None: # None => Disable retry if retry < 0: retry = 0 diff --git a/ncmlyrics/app.py b/ncmlyrics/app.py index 3888660..ed69a40 100644 --- a/ncmlyrics/app.py +++ b/ncmlyrics/app.py @@ -1,40 +1,210 @@ -from dataclasses import dataclass +from concurrent.futures import ThreadPoolExecutor +from concurrent.futures import wait as waitFuture from pathlib import Path +from re import Pattern +from re import compile as compileRegex +from re import escape as escapeRegex +from typing import Any, Generator, Iterable +from click import confirm from rich.console import Console +from rich.progress import Progress, TaskID +from rich.theme import Theme -from .object import NCMTrack -from .util import pickOutput +from .api import NCMApi +from .enum import LinkType +from .error import ParseLinkError, UnsupportedLinkError +from .lrc import Lrc +from .object import NCMAlbum, NCMPlaylist, NCMTrack +from .util import parseLink, safeFileName + +__all__ = ["NCMLyricsApp"] + +NCMLyricsAppTheme = Theme( + { + "tracktitle": "bold chartreuse1", + "trackarrow": "chartreuse3", + "albumtitle": "bold orchid1", + "albumarrow": "orchid2", + "playlisttitle": "bold aquamarine1", + "playlistarrow": "aquamarine3", + "info": "", + "warning": "orange1", + "error": "bold red1", + } +) -@dataclass class NCMLyricsApp: - console: Console + def __init__(self, exist: bool, overwrite: bool, quiet: bool, outputs: list[Path], links: list[str]) -> None: + self.console = Console(theme=NCMLyricsAppTheme, highlight=False) + self.progress = Progress(console=self.console) + self.pool = ThreadPoolExecutor(max_workers=4) - outputs: list[Path] - exist: bool - overwrite: bool - quiet: bool + self.api = NCMApi() - tracks: list[tuple[NCMTrack, Path]] - - def addWithPath(self, track: NCMTrack, savePath: Path | None, arrowStyle: str) -> None: - if savePath is None: - if not self.quiet: - self.console.print("--->", style=arrowStyle, end=" ") - self.console.print("未能找到源文件,将跳过此曲目。", style="warning") - elif not self.overwrite and savePath.exists(): - if not self.quiet: - self.console.print("--->", style=arrowStyle, end=" ") - self.console.print("歌词文件已存在,将跳过此曲目。", style="warning") + self.exist = exist + self.overwrite = overwrite + self.quiet = quiet + if len(outputs) == 0: + self.outputs = [Path()] else: - self.tracks.append((track, savePath)) + self.outputs = outputs - def add(self, track: NCMTrack, arrowStyle: str) -> None: - savePath = pickOutput(track, self.outputs, self.exist) + self.links = links + + self.tasks: list[NCMTrack | NCMAlbum | NCMPlaylist] = [] + self.tracks: list[NCMTrack] = [] + self.trackPairs: list[tuple[NCMTrack, Path | None]] = [] + + self.existedTargets: list[Path] = [] + + def run(self) -> None: + if len(self.links) == 0: + self.console.print( + "请给出至少一个链接以解析曲目以获取其歌词!支持输入单曲,专辑与歌单的分享或网页链接。", style="error" + ) + return + + progressId = self.progress.add_task("解析链接", total=len(self.links)) + self.progress.start() + + for task in self.pool.map(self.resolveLink, self._repeat(progressId), self.links): + self.tasks.append(task) + self.tracks.extend(task.tracks) if not self.quiet: - self.console.print("-->", style=arrowStyle, end=" ") - self.console.print(f"{"/".join(track.artists)} - {track.name}", style=f"link {track.link()}") + self.progress.stop() + self.printTasks() + if not confirm("继续操作?", default=True): + self.console.print("任务已取消。", style="info") + return - self.addWithPath(track, savePath, arrowStyle) + self.progress.reset(progressId, description="获取已存在的歌曲列表", total=1) + + if not self.quiet: + self.progress.start() + self.walkOutputs() + self.progress.advance(progressId) + + self.progress.reset(progressId, description="解析保存路径", total=len(self.tracks)) + + for trackPair in self.pool.map(self.resolvePath, self._repeat(progressId), self.tracks): + self.trackPairs.append(trackPair) + + self.progress.reset(progressId, description="输出 Lrc 文件", total=len(self.trackPairs)) + + waitFuture((self.pool.submit(self.exportLrc, progressId, *trackPair) for trackPair in self.trackPairs)) + + self.progress.stop() + self.api.saveCookies() + + def printTasks(self): + def printTracks(tracks: Iterable[NCMTrack], arrowStyle: str | None = None) -> None: + for track in tracks: + self.console.print( + f"[{arrowStyle}]-->[/{arrowStyle}] [link={track.link()}]{track.prettyString()}[/link]" + ) + + for task in self.tasks: + match task: + case NCMTrack(): + self.console.print( + f"[tracktitle]-- 单曲 -->[/tracktitle] [link={task.link()}]{task.prettyString()}[/link]" + ) + case NCMAlbum(): + self.console.print(f"[albumtitle]== 专辑 ==>[/albumtitle] [link={task.link()}]{task.name}[/link]") + printTracks(task.tracks, "albumarrow") + case NCMPlaylist(): + self.console.print( + f"[playlisttitle]== 歌单 ==>[/playlisttitle] [link={task.link()}]{task.name}[/link]" + ) + printTracks(task.tracks, "playlistarrow") + + def walkOutputs(self): + for output in self.outputs: + output = output.absolute() + if not output.exists() or not output.is_dir(): + continue + for content in output.iterdir(): + if not content.is_file(): + continue + if content.suffix in (".ncm", ".mp3", ".flac"): + self.existedTargets.append(content) + + def resolveLink(self, progress: TaskID, link: str) -> NCMTrack | NCMAlbum | NCMPlaylist: + try: + parsed = parseLink(link) + except UnsupportedLinkError: + self.console.print(f"不支持的链接:{link}", style="error") + return + except ParseLinkError: + self.console.print_exception() + self.console.print(f"解析链接时出现错误:{link}", style="error") + return + + match parsed.type: + case LinkType.Track: + result = self.api.getDetailsForTrack(parsed.id) + case LinkType.Album: + result = self.api.getDetailsForAlbum(parsed.id) + case LinkType.Playlist: + result = self.api.getDetailsForPlaylist(parsed.id) + result.fillDetailsOfTracks(self.api) + + self.progress.advance(progress) + return result + + def resolvePath(self, progress: TaskID, track: NCMTrack) -> tuple[NCMTrack, Path | None]: + regex: Pattern[str] | None = None + targetLrc: Path | None = None + + for target in self.existedTargets: + if regex is None: + # "(?:," + ")?(?:,".join((escapeRegex(artist) for artist in track.artists[3:])) + ")?" + regex = compileRegex( + rf"^{escapeRegex(",".join(track.artists[:3])).replace(",", "(?:,| )")} - {escapeRegex(track.name.rstrip("."))}.+(ncm|mp3|flac)$" + ) + matched = regex.match(target.name) + if matched is not None: + targetLrc = target.with_suffix(".lrc") + break + + self.progress.advance(progress) + + if targetLrc is None: + if self.exist: + return (track, None) + else: + targetLrc = self.outputs[-1] / safeFileName(f"{",".join(track.artists)} - {track.name}.lrc") + + if not self.overwrite and targetLrc.exists(): + return (track, None) + return (track, targetLrc) + + def exportLrc(self, progress: TaskID, track: NCMTrack, path: Path | None) -> None: + if path is None: + self.console.print( + f"[trackarrow]-->[/trackarrow] {track.prettyString()} [dark_turquoise]==>[dark_turquoise] [warning]对应的歌词文件已存在, 跳过此曲目。[/warning]" + ) + self.progress.advance(progress) + return + + ncmlyrics = self.api.getLyricsByTrack(track.id) + if ncmlyrics.isPureMusic: + self.console.print( + f"[trackarrow]-->[/trackarrow] {track.prettyString()} [dark_turquoise]==>[/dark_turquoise] [warning]为纯音乐, 跳过此曲目。[/warning]" + ) + else: + if not self.quiet: + self.console.print( + f"[trackarrow]-->[/trackarrow] {track.prettyString()} [dark_turquoise]==>[/dark_turquoise] [info]{str(path)}[/info]" + ) + Lrc.fromNCMLyrics(ncmlyrics).saveAs(path) + + self.progress.advance(progress) + + @staticmethod + def _repeat(content: Any) -> Generator[Any, None, None]: + while True: + yield content diff --git a/ncmlyrics/enum.py b/ncmlyrics/enum.py index 3cca4f4..105d018 100644 --- a/ncmlyrics/enum.py +++ b/ncmlyrics/enum.py @@ -8,7 +8,7 @@ class LrcType(Enum): Translation = auto() Romaji = auto() - def pretty(self) -> str: + def prettyString(self) -> str: match self: case LrcType.Origin: return "源" diff --git a/ncmlyrics/lrc.py b/ncmlyrics/lrc.py index 76585a8..5c8def9 100644 --- a/ncmlyrics/lrc.py +++ b/ncmlyrics/lrc.py @@ -2,7 +2,7 @@ from json import JSONDecodeError from json import loads as loadJson from pathlib import Path from re import Match -from re import compile as reCompile +from re import compile as compileRegex from typing import Generator, Iterable, Self from .constant import CONFIG_LRC_AUTO_MERGE, CONFIG_LRC_AUTO_MERGE_OFFSET @@ -12,11 +12,11 @@ from .object import NCMLyrics __all__ = ["Lrc"] -LRC_RE_COMMIT = reCompile(r"^\s*#") -LRC_RE_META = reCompile(r"^\s*\[(?Pti|ar|al|au|length|by|offset):\s*(?P.+?)\s*\]\s*$") -LRC_RE_META_NCM_SPECIAL = reCompile(r"^\s*\{.*\}\s*$") -LRC_RE_LYRIC = reCompile(r"^\s*(?P(?:\s*\[\d{1,2}:\d{1,2}(?:\.\d{1,3})?\])+)\s*(?P.+?)\s*$") -LRC_RE_LYRIC_TIMELABEL = reCompile(r"\[(?P\d{1,2}):(?P\d{1,2}(?:\.\d{1,3})?)\]") +LRC_RE_COMMIT = compileRegex(r"^\s*#") +LRC_RE_META = compileRegex(r"^\s*\[(?Pti|ar|al|au|length|by|offset):\s*(?P.+?)\s*\]\s*$") +LRC_RE_META_NCM_SPECIAL = compileRegex(r"^\s*\{.*\}\s*$") +LRC_RE_LYRIC = compileRegex(r"^\s*(?P(?:\s*\[\d{1,2}:\d{1,2}(?:\.\d{1,3})?\])+)\s*(?P.+?)\s*$") +LRC_RE_LYRIC_TIMELABEL = compileRegex(r"\[(?P\d{1,2}):(?P\d{1,2}(?:\.\d{1,3})?)\]") class Lrc: @@ -147,7 +147,7 @@ class Lrc: for metaType in LrcMetaType: if metaType in self.metadata: for lrcType in self.metadata[metaType].keys(): - yield f"[{metaType.value}:{lrcType.pretty()}/{self.metadata[metaType][lrcType]}]" + yield f"[{metaType.value}:{lrcType.prettyString()}/{self.metadata[metaType][lrcType]}]" for metaType, content in self.specials["metadata"]: yield f"[{metaType.value}:{content}]" diff --git a/ncmlyrics/object.py b/ncmlyrics/object.py index 85df614..d1a9c0d 100644 --- a/ncmlyrics/object.py +++ b/ncmlyrics/object.py @@ -28,7 +28,7 @@ class NCMTrack: data = data.get("songs") if data is None: - raise ObjectParseError("不存在单曲对应的结构") + raise ObjectParseError("不存在单曲对应的结构", data) result = [] @@ -48,9 +48,16 @@ class NCMTrack: except KeyError as e: raise ObjectParseError(f"需要的键不存在: {e}") + @property + def tracks(self) -> list[Self]: + return [self] + def link(self) -> str: return f"https://music.163.com/song?id={self.id}" + def prettyString(self) -> str: + return f"{"/".join(self.artists)} - {self.name}" + @dataclass class NCMAlbum: diff --git a/ncmlyrics/util.py b/ncmlyrics/util.py index 94de07c..bf2252f 100644 --- a/ncmlyrics/util.py +++ b/ncmlyrics/util.py @@ -1,21 +1,25 @@ from dataclasses import dataclass -from pathlib import Path -from re import compile as reCompile +from platform import system +from re import compile as compileRegex from urllib.parse import parse_qs as parseQuery from urllib.parse import urlparse as parseUrl -from httpx import get as httpGet +from httpx import get as getHttp from .enum import LinkType from .error import ParseLinkError, UnsupportedLinkError -from .object import NCMTrack -__all__ = ["Link", "parseLink", "testExistTrackSource", "pickOutput"] +__all__ = ["Link", "parseLink", "safeFileName"] -RE_SHARE_LINK_ID_BY_PATH = reCompile(r"^/?(?P\d+)$") -RE_SHARE_LINK_ANDROID_ALBUM_PATH = reCompile(r"^/album/(?P\d+)/?$") -RE_SAFE_FILENAME = reCompile(r"\*{2,}") -TRANSLATER_SAFE_FILENAME = str.maketrans({i: 0x2A for i in ("<", ">", ":", '"', "/", "\\", "|", "?")}) +RE_SHARE_LINK_ID_BY_PATH = compileRegex(r"^/?(?P\d+)$") +RE_SHARE_LINK_ANDROID_ALBUM_PATH = compileRegex(r"^/album/(?P\d+)/?$") + +if system() == "Windows": + TRANSLATER_SAFE_FILENAME = str.maketrans( + {i: 0x5F for i in (0x2F, 0x5C, 0x3A, 0x2A, 0x3F, 0x22, 0x3C, 0x3E, 0x7C)} + ) # /, \, :, *, ?, ", <, >, | => _ +else: + TRANSLATER_SAFE_FILENAME = str.maketrans({0x2F: 0x5F}) # / => _ @dataclass @@ -57,7 +61,7 @@ def parseLink(url: str) -> Link: case _: raise UnsupportedLinkError(parsedUrl) case "163cn.tv": - response = httpGet(url) + response = getHttp(url) if response.status_code != 302: raise ParseLinkError(f"未知的 Api 响应: {response.status_code}") newUrl = response.headers.get("Location") @@ -107,34 +111,5 @@ def parseLink(url: str) -> Link: return Link(contentType, contentId) -def testExistTrackSource(track: NCMTrack, path: Path) -> Path | None: - safeFileName = RE_SAFE_FILENAME.sub( - "*", f"{"?".join(track.artists[:3])} - {track.name.rstrip(".")}.*".translate(TRANSLATER_SAFE_FILENAME) - ) - - globing = path.glob(safeFileName, case_sensitive=False) - - try: - return next(globing) - except StopIteration: - return None - - -def pickOutput(track: NCMTrack, outputs: list[Path], forceSourceExists: bool = False) -> Path | None: - match len(outputs): - case 0: - result = testExistTrackSource(track, Path()) - if result is not None: - return result.with_suffix(".lrc") - return None if forceSourceExists else Path(f"{",".join(track.artists)} - {track.name}.lrc") - case 1: - result = testExistTrackSource(track, outputs[0]) - if result is not None: - return result.with_suffix(".lrc") - return None if forceSourceExists else outputs[0] / f"{",".join(track.artists)} - {track.name}.lrc" - case _: - for output in outputs: - result = testExistTrackSource(track, output) - if result is not None: - return result.with_suffix(".lrc") - return None if forceSourceExists else outputs[-1] / f"{",".join(track.artists)} - {track.name}.lrc" +def safeFileName(filename: str) -> str: + return filename.translate(TRANSLATER_SAFE_FILENAME)