395 lines
18 KiB
Python
395 lines
18 KiB
Python
'''
|
|
Function:
|
|
Implementation of Common Utils
|
|
Author:
|
|
Zhenchao Jin
|
|
WeChat Official Account (微信公众号):
|
|
Charles的皮卡丘
|
|
'''
|
|
from __future__ import annotations
|
|
|
|
import re
|
|
import os
|
|
import html
|
|
import copy
|
|
import emoji
|
|
import errno
|
|
import pickle
|
|
import shutil
|
|
import bleach
|
|
import hashlib
|
|
import requests
|
|
import functools
|
|
import json_repair
|
|
import unicodedata
|
|
from io import BytesIO
|
|
from pathlib import Path
|
|
from mutagen.mp3 import MP3
|
|
from mutagen.mp4 import MP4
|
|
from mutagen.asf import ASF
|
|
from mutagen.flac import FLAC
|
|
from mutagen.aiff import AIFF
|
|
from mutagen.wave import WAVE
|
|
from bs4 import BeautifulSoup
|
|
from http.cookies import SimpleCookie
|
|
from .importutils import optionalimport
|
|
from mutagen import File as MutagenFile
|
|
from mutagen.oggvorbis import OggVorbis
|
|
from pathvalidate import sanitize_filepath, sanitize_filename
|
|
|
|
|
|
def remove_suffix(value: str, suffix: str) -> str:
    '''Return value with one trailing occurrence of suffix removed.

    value is returned unchanged when suffix is empty or value does not end with it.
    '''
    if not suffix or not value.endswith(suffix):
        return value
    return value[: len(value) - len(suffix)]
|
|
|
|
|
|
'''estimatedurationwithfilesizebr'''
|
|
def estimatedurationwithfilesizebr(file_size_bytes: int, br_kbps: float, return_seconds: bool = False) -> str | int:
    '''Estimate a playback duration from a file size and a constant bitrate.

    Args:
        file_size_bytes: size of the audio file in bytes.
        br_kbps: bitrate in kilobits per second (multiplied by 1000 to get bits per second).
        return_seconds: when True, return the whole number of seconds instead of a formatted string.

    Returns:
        "HH:MM:SS" (or an int number of seconds when return_seconds is True).
        The placeholder "-:-:-" is returned for missing, zero or negative inputs,
        regardless of return_seconds.
    '''
    # negative sizes would otherwise be formatted into nonsense such as "-1:58:58"
    if not file_size_bytes or not br_kbps or file_size_bytes < 0 or br_kbps <= 0:
        return "-:-:-"
    duration_seconds = int(file_size_bytes * 8 / (br_kbps * 1000))
    if return_seconds:
        return duration_seconds
    minutes, seconds = divmod(duration_seconds, 60)
    hours, minutes = divmod(minutes, 60)
    return f"{hours:02d}:{minutes:02d}:{seconds:02d}"
|
|
|
|
|
|
'''estimatedurationwithfilelink'''
|
|
def estimatedurationwithfilelink(filelink: str = '', headers: dict = None, request_overrides: dict = None):
    '''Download filelink and return the audio length in whole seconds as reported by mutagen.

    Args:
        filelink: URL of the audio file; the whole body is downloaded into memory.
        headers: request headers; when empty, a "headers" entry in request_overrides is used instead.
        request_overrides: extra keyword arguments forwarded to requests.get
            (a "timeout" entry replaces the default of 10 seconds).

    Returns:
        The length in seconds as an int, or 0 when the download or the parsing fails
        (this helper is best-effort and does not raise for those failures).
    '''
    # merge instead of passing both explicitly: a duplicated keyword would raise TypeError
    request_kwargs = dict(request_overrides or {})
    request_kwargs.setdefault('timeout', 10)
    if headers or 'headers' not in request_kwargs:
        request_kwargs['headers'] = headers or {}
    try:
        (resp := requests.get(filelink, **request_kwargs)).raise_for_status()
        audio = MutagenFile(BytesIO(resp.content))
        # MutagenFile yields None for unrecognised data; treat it like a missing length
        length = getattr(getattr(audio, "info", None), "length", 0)
        return int(length or 0)
    except Exception:
        # best-effort estimate: network, HTTP and parsing errors all map to 0
        return 0
|
|
|
|
|
|
'''cookies2dict'''
|
|
def cookies2dict(cookies: str | dict = None):
    '''Normalise cookies given as a header string or a dict into a dict.

    Falsy input gives an empty dict, a dict is returned as-is, and a string is
    parsed with http.cookies.SimpleCookie. Any other type raises TypeError.
    '''
    cookies = cookies or {}
    if isinstance(cookies, dict):
        return cookies
    if not isinstance(cookies, str):
        raise TypeError(f'cookies type is "{type(cookies)}", expect cookies to "str" or "dict" or "None".')
    jar = SimpleCookie()
    jar.load(cookies)
    return {name: morsel.value for name, morsel in jar.items()}
|
|
|
|
|
|
'''cookies2string'''
|
|
def cookies2string(cookies: str | dict = None):
    '''Normalise cookies given as a dict or a header string into a "k=v; k2=v2" string.

    Falsy input gives an empty string and a string is returned as-is. For a dict,
    values are stringified (None becomes "") and encoded through
    http.cookies.SimpleCookie. Any other type raises TypeError.
    '''
    cookies = cookies or ""
    if isinstance(cookies, str):
        return cookies
    if not isinstance(cookies, dict):
        raise TypeError(f'cookies type is "{type(cookies)}", expect cookies to "str" or "dict" or "None".')
    jar = SimpleCookie()
    for name, value in cookies.items():
        jar[name] = "" if value is None else str(value)
    return "; ".join(morsel.OutputString() for morsel in jar.values())
|
|
|
|
|
|
'''touchdir'''
|
|
def touchdir(directory, exist_ok=True, mode=511, auto_sanitize=True):
    '''Create directory (and missing parents) via os.makedirs.

    When auto_sanitize is set, the path is first passed through sanitize_filepath.
    The default mode 511 is 0o777. Returns whatever os.makedirs returns (None).
    '''
    target = sanitize_filepath(directory) if auto_sanitize else directory
    return os.makedirs(target, exist_ok=exist_ok, mode=mode)
|
|
|
|
|
|
'''replacefile'''
|
|
def replacefile(src: str, dest: str):
    '''Move src over dest, replacing an existing destination file.

    os.replace is tried first. When it fails with EXDEV, an existing destination
    file is removed and shutil.move performs the move instead.

    Raises:
        OSError: the original error from os.replace for any failure other than EXDEV.
        IsADirectoryError: on the EXDEV fallback, when dest is an existing directory.
    '''
    try:
        os.replace(src, dest)
        return
    except OSError as exc:
        # re-raise the real error (errno and filename intact) instead of a bare Exception
        if exc.errno != errno.EXDEV:
            raise
    # EXDEV fallback
    if os.path.isdir(dest):
        raise IsADirectoryError(errno.EISDIR, "destination is a directory", dest)
    if os.path.exists(dest):
        os.remove(dest)
    shutil.move(src, dest)
|
|
|
|
|
|
'''legalizestring'''
|
|
def legalizestring(string: str, fit_gbk: bool = True, max_len: int = 255, fit_utf8: bool = True, replace_null_string: str = 'NULL'):
    '''Clean an arbitrary string so that it can be used as a file name.

    The steps run in a fixed order: undo backslash escapes, unescape HTML entities,
    strip markup, NFC-normalise, drop emoji and non-printable/control characters,
    sanitize_filename (truncating to max_len), optional GBK / UTF-8 round trips that
    drop unencodable characters, then collapse whitespace.

    Args:
        string: input value; it is converted with str() when truthy.
        fit_gbk: drop characters that cannot be encoded as GBK.
        max_len: maximum length passed to sanitize_filename.
        fit_utf8: drop characters that cannot be encoded as UTF-8.
        replace_null_string: value returned when the input or the cleaned result is empty.

    Returns:
        The cleaned string, or replace_null_string when nothing is left.
    '''
    if not string:
        return replace_null_string
    string = str(string)
    # undo backslash escaping of quotes, closing tags and \uXXXX sequences
    string = string.replace(r'\"', '"')
    string = re.sub(r"<\\/", "</", string)
    string = re.sub(r"\\/>", "/>", string)
    string = re.sub(r"\\u([0-9a-fA-F]{4})", lambda m: chr(int(m.group(1), 16)), string)
    # html.unescape, applied at most twice so double-escaped entities are resolved
    for _ in range(2):
        new_string = html.unescape(string)
        if new_string == string:
            break
        string = new_string
    # strip markup: BeautifulSoup first, bleach.clean as the fallback when parsing fails
    try:
        string = BeautifulSoup(string, "lxml").get_text(separator="")
    except Exception:
        string = bleach.clean(string, tags=[], attributes={}, strip=True)
    # unicodedata.normalize
    string = unicodedata.normalize("NFC", string)
    # emoji.replace_emoji
    string = emoji.replace_emoji(string, replace="")
    # keep printable characters outside the Unicode "C" (control/format/...) categories
    string = "".join([ch for ch in string if ch.isprintable() and not unicodedata.category(ch).startswith("C")])
    # sanitize_filename
    string = sanitize_filename(string, max_len=max_len)
    # fix encoding
    if fit_gbk:
        string = string.encode("gbk", errors="ignore").decode("gbk", errors="ignore")
    if fit_utf8:
        string = string.encode("utf-8", errors="ignore").decode("utf-8", errors="ignore")
    # collapse whitespace runs and fall back to the placeholder when nothing is left
    string = re.sub(r"\s+", " ", string).strip()
    return string or replace_null_string
|
|
|
|
|
|
'''shortenpathsinsonginfos'''
|
|
def shortenpathsinsonginfos(song_infos: list, max_path: int = 240, keep_ext: bool = True, with_hash_suffix: bool = False):
    '''Shorten the save path of every entry so that the full path fits within max_path.

    For each entry whose save_path is non-empty and not "NULL", the parent directory is
    resolved and created, the file stem is truncated to the remaining length budget,
    and the result is written to the entry's _save_path attribute.

    Args:
        song_infos: objects exposing save_path (read) and _save_path (written).
        max_path: maximum length of the resulting path string.
        keep_ext: keep the original file extension when True.
        with_hash_suffix: append "-<md5 prefix>" of the original path to the stem.

    Returns:
        The same song_infos list, with entries updated in place.
    '''
    taken = set()
    for song_info in song_infos:
        save_path = (song_info.save_path or "").strip()
        if not save_path or save_path.upper() == "NULL":
            continue
        source = Path(save_path)
        parent_dir = source.parent.resolve()
        parent_dir.mkdir(parents=True, exist_ok=True)
        extension = source.suffix if keep_ext else ""
        path_digest = hashlib.md5(str(source).encode("utf-8")).hexdigest()
        # characters that are always spent: directory, path separator and extension
        reserved = len(str(parent_dir)) + 1 + len(extension)
        candidate = None
        # a longer digest prefix is tried when the first candidate is already taken
        # NOTE(review): without with_hash_suffix both attempts produce the same path,
        # so a collision is not resolved here; confirm whether that is intended
        for digest_len in (8, 10):
            suffix = f"-{path_digest[:digest_len]}" if with_hash_suffix else ""
            budget = max(1, max_path - (reserved + len(suffix)))
            stem = source.stem[:budget].rstrip(" .") or "NULL"
            candidate = str(parent_dir / f"{stem}{suffix}{extension}")
            if candidate.lower() not in taken:
                break
        taken.add(candidate.lower())
        song_info._save_path = candidate
    return song_infos
|
|
|
|
|
|
'''seconds2hms'''
|
|
def seconds2hms(seconds: int | float | str):
    '''Format a number of seconds as "HH:MM:SS".

    Args:
        seconds: a number, or a string holding a number; fractions are truncated.

    Returns:
        The formatted duration, or the placeholder "-:-:-" when the value is zero,
        negative or cannot be converted to a number.
    '''
    placeholder = '-:-:-'
    try:
        total = int(float(seconds))
    except (TypeError, ValueError, OverflowError):
        # None / non-numeric text / NaN / infinity
        return placeholder
    # negative values would otherwise be rendered as e.g. "-1:59:55"
    if total <= 0:
        return placeholder
    minutes, secs = divmod(total, 60)
    hours, minutes = divmod(minutes, 60)
    return '%02d:%02d:%02d' % (hours, minutes, secs)
|
|
|
|
|
|
'''byte2mb'''
|
|
def byte2mb(size: int | float | str):
    '''Format a size in bytes as "<x> MB", rounded to two decimals.

    Args:
        size: a number of bytes, or a string holding one.

    Returns:
        e.g. "1.5 MB"; the placeholder "NULL" when the value is missing, not numeric,
        zero or negative, or rounds to 0.0 MB.
    '''
    try:
        size_in_bytes = int(float(size))
    except (TypeError, ValueError, OverflowError):
        return 'NULL'
    # a negative byte count is meaningless and used to be rendered as e.g. "-1.0 MB"
    if size_in_bytes <= 0:
        return 'NULL'
    size_in_mb = round(size_in_bytes / 1024 / 1024, 2)
    if size_in_mb == 0.0:
        return 'NULL'
    return f'{size_in_mb} MB'
|
|
|
|
|
|
'''_valid_response_types'''
|
|
def _valid_response_types():
    '''Return a tuple of the response classes accepted by the response helpers.

    requests.Response is always included; curl_cffi.requests.Response is added
    when optionalimport('curl_cffi') yields a module exposing it.
    '''
    accepted = [requests.Response]
    # walk curl_cffi -> .requests -> .Response, giving up as soon as a step is missing
    candidate = optionalimport('curl_cffi')
    for attribute in ('requests', 'Response'):
        candidate = getattr(candidate, attribute, None) if candidate else None
    if candidate is not None:
        accepted.append(candidate)
    return tuple(accepted)
|
|
|
|
|
|
'''resp2json'''
|
|
def resp2json(resp: requests.Response):
    '''Decode the JSON body of a response.

    Args:
        resp: a response object of one of the types returned by _valid_response_types().

    Returns:
        The decoded JSON. When resp.json() fails, resp.text is parsed with
        json_repair.loads instead. An empty dict is returned for any other kind of
        object and for empty/falsy results.
    '''
    if not isinstance(resp, _valid_response_types()):
        return {}
    try:
        result = resp.json()
    except Exception:
        # malformed JSON: fall back to the lenient parser
        result = json_repair.loads(resp.text)
    return result or dict()
|
|
|
|
|
|
'''isvalidresp'''
|
|
def isvalidresp(resp: requests.Response, valid_status_codes: list | tuple | set | frozenset = frozenset({200, 206})):
    '''Tell whether resp is a supported response object with an accepted status code.

    Args:
        resp: object to check; anything that is not one of the types returned by
            _valid_response_types() (including None) is rejected.
        valid_status_codes: container of accepted HTTP status codes.

    Returns:
        True when resp is a supported response whose status_code is accepted.
    '''
    # the isinstance check already rejects None, so no separate None test is needed
    if not isinstance(resp, _valid_response_types()):
        return False
    return resp.status_code in valid_status_codes
|
|
|
|
|
|
'''safeextractfromdict'''
|
|
def safeextractfromdict(data, progressive_keys, default_value = None):
    '''Follow a sequence of keys/indices into nested containers.

    Args:
        data: the outermost container (dict, list, ...).
        progressive_keys: keys or indices applied one after another with data[key].
        default_value: value returned when any lookup step fails.

    Returns:
        The value reached after applying every key, or default_value on failure.
    '''
    result = data
    try:
        for key in progressive_keys:
            result = result[key]
    except Exception:
        # best-effort lookup: missing key, bad index, wrong type, non-iterable keys ...
        return default_value
    return result
|
|
|
|
|
|
'''cachecookies'''
|
|
def cachecookies(client_name: str = '', cache_cookie_path: str = '', client_cookies: dict = None):
    '''Store client_cookies under client_name in the pickle cache at cache_cookie_path.

    An existing cache file is loaded, the entry for client_name is set or replaced,
    and the whole mapping is written back. A cache file that cannot be unpickled, or
    that does not contain a dict, is treated as empty instead of raising.

    Args:
        client_name: key under which the cookies are stored.
        cache_cookie_path: path of the pickle file.
        client_cookies: cookies to store.
    '''
    cookies = dict()
    if os.path.exists(cache_cookie_path):
        # NOTE: pickle.load can execute arbitrary code; the cache file must be trusted
        try:
            with open(cache_cookie_path, 'rb') as fp:
                cookies = pickle.load(fp)
        except (pickle.UnpicklingError, EOFError, AttributeError, ImportError, IndexError, ValueError, TypeError):
            cookies = dict()
        if not isinstance(cookies, dict):
            cookies = dict()
    cookies[client_name] = client_cookies
    # serialise before opening for writing so a pickling failure cannot truncate the cache
    payload = pickle.dumps(cookies)
    with open(cache_cookie_path, 'wb') as fp:
        fp.write(payload)
|
|
|
|
|
|
'''usedownloadheaderscookies'''
|
|
def usedownloadheaderscookies(func):
    '''Decorator for methods: switch the instance to its download settings before the call.

    default_download_headers is copied to default_headers unconditionally; the
    download cookies and curl_cffi flag are copied only when the instance defines
    them, and _initsession() is invoked when present.
    '''
    optional_fields = (
        ('default_cookies', 'default_download_cookies'),
        ('enable_curl_cffi', 'enable_download_curl_cffi'),
    )

    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        self.default_headers = self.default_download_headers
        for target, source in optional_fields:
            if hasattr(self, source):
                setattr(self, target, getattr(self, source))
        if hasattr(self, '_initsession'):
            self._initsession()
        return func(self, *args, **kwargs)

    return wrapper
|
|
|
|
|
|
'''useparseheaderscookies'''
|
|
def useparseheaderscookies(func):
    '''Decorator for methods: switch the instance to its parse settings before the call.

    default_parse_headers is copied to default_headers unconditionally; the parse
    cookies and curl_cffi flag are copied only when the instance defines them, and
    _initsession() is invoked when present.
    '''
    optional_fields = (
        ('default_cookies', 'default_parse_cookies'),
        ('enable_curl_cffi', 'enable_parse_curl_cffi'),
    )

    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        self.default_headers = self.default_parse_headers
        for target, source in optional_fields:
            if hasattr(self, source):
                setattr(self, target, getattr(self, source))
        if hasattr(self, '_initsession'):
            self._initsession()
        return func(self, *args, **kwargs)

    return wrapper
|
|
|
|
|
|
'''usesearchheaderscookies'''
|
|
def usesearchheaderscookies(func):
    '''Decorator for methods: switch the instance to its search settings before the call.

    default_search_headers is copied to default_headers unconditionally; the search
    cookies and curl_cffi flag are copied only when the instance defines them, and
    _initsession() is invoked when present.
    '''
    optional_fields = (
        ('default_cookies', 'default_search_cookies'),
        ('enable_curl_cffi', 'enable_search_curl_cffi'),
    )

    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        self.default_headers = self.default_search_headers
        for target, source in optional_fields:
            if hasattr(self, source):
                setattr(self, target, getattr(self, source))
        if hasattr(self, '_initsession'):
            self._initsession()
        return func(self, *args, **kwargs)

    return wrapper
|
|
|
|
|
|
'''searchdictbykey'''
|
|
def searchdictbykey(obj, target_key: str):
    '''Collect every value stored under target_key anywhere inside nested dicts and lists.

    Values are returned in depth-first order; a matching value is reported before
    its own contents are searched. Only dict and list containers are traversed.
    '''
    matches = []

    def _collect(node):
        if isinstance(node, dict):
            for key, value in node.items():
                if key == target_key:
                    matches.append(value)
                _collect(value)
        elif isinstance(node, list):
            for element in node:
                _collect(element)

    _collect(obj)
    return matches
|
|
|
|
|
|
'''naiveguessextfromaudiobytes'''
|
|
def naiveguessextfromaudiobytes(content: bytes):
    '''Guess a file extension from raw audio bytes using mutagen's type detection.

    Returns one of "mp3", "flac", "m4a", "ogg", "wav", "aiff", "wma", or None when
    mutagen does not recognise the data or recognises a type not listed here.
    '''
    audio = MutagenFile(BytesIO(content))
    if audio is None:
        return None
    # checked in this order; the first matching mutagen class wins
    ext_by_type = (
        (MP3, "mp3"), (FLAC, "flac"), (MP4, "m4a"), (OggVorbis, "ogg"),
        (WAVE, "wav"), (AIFF, "aiff"), (ASF, "wma"),
    )
    return next((ext for audio_type, ext in ext_by_type if isinstance(audio, audio_type)), None)
|
|
|
|
|
|
'''AudioLinkTester'''
|
|
class AudioLinkTester(object):
    '''Probe remote URLs to decide whether they look like downloadable audio.

    probe() summarises size / content type / extension from the response headers;
    test() decides whether a link is audio-like from the headers and the first bytes.
    Both issue a HEAD request first and fall back to a streamed GET.
    '''
    # file extensions regarded as audio
    VALID_AUDIO_EXTS = {
        "aac", "aax", "aaxc", "ac3", "adts", "aif", "aifc", "aiff", "alac", "amr", "ape", "au", "avr", "awb", "caf", "cda", "dff", "dfsf", "dsf", "dss", "dts", "dtshd", "ec3", "f32",
        "f64", "flac", "gsm", "hca", "htk", "iff", "ima", "ircam", "kar", "kss", "la", "l16", "m15", "m3u8", "m4a", "m4b", "m4p", "m4r", "mat4", "mat5", "med", "midi", "mid", "mlp",
        "mod", "mo3", "mp1", "mp2", "mp3", "mpa", "mpc", "mp+", "mpp", "mptm", "msv", "mt2", "mtm", "mxmf", "nist", "nsf", "oga", "ogg", "okt", "oma", "ofr", "ofs", "opus", "paf",
        "pcm", "ptm", "pvf", "ra", "ram", "rf64", "rmi", "rmj", "rmm", "rmx", "roq", "raw", "s3m", "sap", "sds", "sd2", "sd2f", "sf", "shn", "sid", "snd", "spc", "spx", "stm", "tak",
        "tta", "thd", "ul", "ult", "umx", "voc", "vgm", "vgz", "wav", "wave", "wax", "w64", "wma", "wve", "wv", "wvx", "xi", "xm", "8svx", "16svx", "669", "amf", "dmf", "far", "gbs",
        "gym", "hes", "it", "mdl", "mpc2k", "nsa", "psf", "psf1", "psf2", "ssf", "miniusf", "usf", "2sf", "gsf", "qsf", "spu", "at3", "aa3", "at9", "3ga", "m4s"
    }
    # content types accepted as audio: anything starting with the prefix, plus the extras
    AUDIO_CT_PREFIX = "audio/"
    AUDIO_CT_EXTRA = {"application/octet-stream", "application/x-flac", "application/flac", "application/x-mpegurl", "video/mp4"}
    # leading byte signatures and the format label reported for them
    MAGIC = [(b"ID3", "mp3"), (b"\xFF\xFB", "mp3"), (b"fLaC", "flac"), (b"RIFF", "wav"), (b"OggS", "ogg"), (b"MThd", "midi"), (b"\x00\x00\x00\x18ftyp", "mp4/m4a")]
    # content type -> file extension ("audio/x-aac" used to be mapped to "ogg" by mistake)
    CTYPE_TO_EXT = {"audio/mpeg": "mp3", "audio/mp3": "mp3", "audio/mp4": "m4a", "audio/x-m4a": "m4a", "audio/aac": "aac", "audio/wav": "wav", "video/mp4": "mp4", "audio/x-wav": "wav", "audio/flac": "flac", "audio/x-flac": "flac", "audio/ogg": "ogg", "audio/opus": "opus", "audio/x-aac": "aac", "audio/x-ogg": "ogg", "audio/x-m4p": "m4a"}
    def __init__(self, timeout=(5, 15), headers: dict = None, cookies: dict = None):
        '''Create the requests session and the default request settings.

        Args:
            timeout: default timeout passed to requests.
            headers: extra headers merged over the built-in Accept / User-Agent defaults.
            cookies: default cookies sent with every request.
        '''
        self.session = requests.Session()
        self.timeout = timeout
        self.headers = {'Accept': '*/*', 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/134.0.0.0 Safari/537.36'}
        self.headers.update(headers or {})
        self.cookies = cookies or {}
    '''isaudioct'''
    @staticmethod
    def isaudioct(ct: str):
        '''Return True when the content type (parameters after ";" ignored) denotes audio.'''
        if not ct: return False
        ct = ct.lower().split(";", 1)[0].strip()
        return ct.startswith(AudioLinkTester.AUDIO_CT_PREFIX) or ct in AudioLinkTester.AUDIO_CT_EXTRA
    '''sniffmagic'''
    @staticmethod
    def sniffmagic(b: bytes):
        '''Return a format label for the leading bytes of a file, or None when unknown.'''
        if not b: return None
        for sig, fmt in AudioLinkTester.MAGIC:
            if b.startswith(sig): return fmt
        # ISO base media files start with a size field followed by "ftyp"; the size is
        # not always 0x18, so match the box type at offset 4 whatever the size is
        if b[4:8] == b"ftyp": return "mp4/m4a"
        # 12 set sync bits
        if len(b) >= 2 and b[0] == 0xFF and (b[1] & 0xF0) == 0xF0: return "aac/adts"
        return None
    '''_preparerequest'''
    def _preparerequest(self, url: str, request_overrides: dict = None):
        '''Return (request kwargs with the instance defaults filled in, extension guessed from url).'''
        request_overrides = copy.deepcopy(request_overrides or {})
        request_overrides.setdefault('headers', self.headers)
        request_overrides.setdefault('timeout', self.timeout)
        request_overrides.setdefault('cookies', self.cookies)
        # text after the last "." of the URL with its query string removed
        naive_guess_ext = url.split('?')[0].split('.')[-1]
        return request_overrides, naive_guess_ext
    '''_probeonce'''
    def _probeonce(self, method: str, url: str, naive_guess_ext: str, request_overrides: dict, **extra_kwargs):
        '''Issue one request and summarise its headers; every field is "NULL" on any failure.'''
        try:
            resp = getattr(self.session, method)(url, allow_redirects=True, **extra_kwargs, **request_overrides)
            try:
                resp.raise_for_status()
                resp_headers, final_url = resp.headers, resp.url
            finally:
                # release the connection even when the status check raises
                resp.close()
            file_size = byte2mb(resp_headers.get('content-length'))
            raw_ctype = resp_headers.get('content-type')
            # a missing header is reported with the "NULL" placeholder rather than str(None)
            ctype = remove_suffix(str(raw_ctype), '; charset=UTF-8') if raw_ctype else 'NULL'
            # overrides kept from the original implementation
            if ctype == 'image/jpg': ctype = 'audio/mpeg'
            if ctype == 'text/plain' and naive_guess_ext == 'm4s': ctype = 'audio/mp4'
            ext = self.CTYPE_TO_EXT.get(ctype, 'NULL')
            return dict(file_size=file_size, ctype=ctype, ext=ext, download_url=url, final_url=final_url)
        except Exception:
            # best-effort probe: network errors, bad status codes and bad kwargs all map to NULLs
            return dict(file_size='NULL', ctype='NULL', ext='NULL', download_url=url, final_url='NULL')
    '''probe'''
    def probe(self, url: str, request_overrides: dict = None):
        '''Collect file size, content type and extension for url.

        Args:
            url: link to probe.
            request_overrides: keyword arguments for the request; headers, timeout and
                cookies default to the instance settings.

        Returns:
            dict with file_size, ctype, ext, download_url and final_url. The result of
            the HEAD request is returned when it yields a file size; otherwise the
            result of a streamed GET is returned. Unknown values are "NULL".
        '''
        request_overrides, naive_guess_ext = self._preparerequest(url, request_overrides)
        # HEAD probe
        outputs = self._probeonce('head', url, naive_guess_ext, request_overrides)
        if outputs['file_size'] and outputs['file_size'] not in ('NULL',): return outputs
        # GETSTREAM probe
        return self._probeonce('get', url, naive_guess_ext, request_overrides, stream=True)
    '''test'''
    def test(self, url: str, request_overrides: dict = None):
        '''Check whether url looks like a reachable audio resource.

        Args:
            url: link to test.
            request_overrides: keyword arguments for the request; headers, timeout and
                cookies default to the instance settings.

        Returns:
            dict with ok, status, method, final_url, ctype, clen, range, fmt and reason.
            ok is True when the HEAD response is 2xx, audio-like and reports a length or
            range support, or when the streamed GET is 200/206 and the content type,
            the leading bytes or an "m4s" extension indicate audio.
        '''
        request_overrides, naive_guess_ext = self._preparerequest(url, request_overrides)
        is_m4s = naive_guess_ext in ('m4s',)
        outputs = dict(ok=False, status=0, method="", final_url=None, ctype=None, clen=None, range=None, fmt=None, reason="")
        # HEAD test
        try:
            resp = self.session.head(url, allow_redirects=True, **request_overrides)
            try:
                clen = resp.headers.get("Content-Length")
                clen = int(clen) if clen and clen.isdigit() else None
                outputs.update(dict(status=resp.status_code, method="HEAD", final_url=str(resp.url), ctype=resp.headers.get("Content-Type"), clen=clen, range=(resp.headers.get("Accept-Ranges") or "").lower() == "bytes"))
            finally:
                resp.close()
            if outputs["ctype"] == 'text/plain' and naive_guess_ext == 'm4s': outputs["ctype"] = 'audio/mp4'
            if 200 <= outputs["status"] < 300 and (self.isaudioct(outputs["ctype"]) or is_m4s) and (outputs["clen"] or outputs["range"]):
                outputs.update(dict(ok=True, reason="HEAD success")); return outputs
        except Exception as err:
            outputs["reason"] = f"HEAD error: {err}"
        # RANGEGET test
        # NOTE: despite the name no Range header is sent; the body is streamed and only
        # the first chunk is read
        try:
            resp = self.session.get(url, stream=True, allow_redirects=True, **request_overrides)
            try:
                outputs.update(dict(status=resp.status_code, method="RANGEGET", final_url=str(resp.url)))
                if resp.status_code not in (200, 206): outputs["reason"] = f"RANGEGET error: response status {resp.status_code}"; return outputs
                chunk = next(iter(resp.iter_content(chunk_size=16)), b"")
            finally:
                # always release the streamed connection, including on early return
                resp.close()
            # values found by the HEAD request take precedence over the GET headers
            outputs["ctype"] = outputs["ctype"] or resp.headers.get("Content-Type")
            if outputs["ctype"] == 'text/plain' and naive_guess_ext == 'm4s': outputs["ctype"] = 'audio/mp4'
            outputs["range"] = outputs["range"] or (resp.status_code == 206) or (resp.headers.get("Content-Range") is not None)
            clen = resp.headers.get("Content-Length") or (resp.headers.get("Content-Range") or "").split("/")[-1]
            if clen and clen.isdigit(): outputs["clen"] = int(clen)
            outputs["fmt"] = self.sniffmagic(chunk)
            if self.isaudioct(outputs["ctype"]) or outputs["fmt"] or is_m4s: outputs.update(dict(ok=True, reason="RANGEGET success"))
            else: outputs.update(dict(ok=False, reason="RANGEGET error: Not audio-like (CT/magic)"))
        except Exception as err:
            outputs["reason"] = f"RANGEGET error: {err}"
        # return
        return outputs
|