236 lines
22 KiB
Python
236 lines
22 KiB
Python
'''
Function:
    Implementation of SpotifyMusicClient: https://open.spotify.com/
Author:
    Zhenchao Jin
WeChat Official Account (微信公众号):
    Charles的皮卡丘
'''
|
|
from __future__ import annotations
|
|
|
|
import os
|
|
import copy
|
|
from bs4 import BeautifulSoup
|
|
from .base import BaseMusicClient
|
|
from pathvalidate import sanitize_filepath
|
|
from urllib.parse import urlparse, parse_qs
|
|
from ..utils.hosts import SPOTIFY_MUSIC_HOSTS
|
|
from ..utils.spotifyutils import SpotifyMusicClientPlaylistUtils, SpotifyMusicClientSearchUtils
|
|
from rich.progress import Progress, TextColumn, BarColumn, TimeRemainingColumn, MofNCompleteColumn
|
|
from ..utils import byte2mb, touchdir, legalizestring, resp2json, seconds2hms, usesearchheaderscookies, safeextractfromdict, naiveguessextfromaudiobytes, useparseheaderscookies, obtainhostname, hostmatchessuffix, extractdurationsecondsfromlrc, SongInfo, AudioLinkTester, LyricSearchClient
|
|
|
|
|
|
'''SpotifyMusicClient'''
|
|
class SpotifyMusicClient(BaseMusicClient):
|
|
source = 'SpotifyMusicClient'
|
|
def __init__(self, **kwargs):
    '''Initialise the client: parent setup, default header sets, then the session.

    Args:
        **kwargs: forwarded unchanged to the parent class initialiser.
    '''
    super(SpotifyMusicClient, self).__init__(**kwargs)
    # one desktop-browser user agent is shared by all three header sets
    browser_ua = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/145.0.0.0 Safari/537.36"
    api_headers = {
        "User-Agent": browser_ua,
        "Accept": "application/json",
        "Accept-Language": "en-US,en;q=0.9",
        "Referer": "https://open.spotify.com/",
        "Origin": "https://open.spotify.com/",
    }
    # search and parse get separate dict objects so editing one never leaks into the other
    self.default_search_headers = dict(api_headers)
    self.default_parse_headers = dict(api_headers)
    self.default_download_headers = {"User-Agent": browser_ua}
    # the active header set is the search one (same object, not a copy)
    self.default_headers = self.default_search_headers
    self._initsession()
|
|
'''_constructsearchurls'''
|
|
def _constructsearchurls(self, keyword: str, rule: dict = None, request_overrides: dict = None):
    '''Build one search task per result page for ``keyword``.

    Args:
        keyword: query string handed to the search API as ``query``.
        rule: optional extra search rule; a deep copy is stored in every task.
        request_overrides: optional request keyword arguments; the same dict is shared by every task.

    Returns:
        A list of ``{'api': callable, 'inputs': dict}`` tasks whose offsets step by
        ``self.search_size_per_page`` until ``self.search_size_per_source`` is covered.
        The list is empty when the page size is not positive.
    '''
    # init
    rule, request_overrides = rule or {}, request_overrides or {}
    search_urls, page_size, offset = [], self.search_size_per_page, 0
    # a non-positive page size would never advance the offset, so stop here instead of looping forever
    if not page_size or page_size <= 0: return search_urls
    # construct search urls based on search rules
    while offset < self.search_size_per_source:
        search_urls.append({
            'api': SpotifyMusicClientSearchUtils.searchbykeyword,
            # every task carries its own session copy and its own rule copy
            'inputs': {'session': copy.deepcopy(self.session), 'query': keyword, 'limit': page_size, 'offset': offset, 'rule': copy.deepcopy(rule), 'request_overrides': request_overrides},
        })
        offset += page_size
    # return
    return search_urls
|
|
'''_parsewithspotisaverapi'''
|
|
def _parsewithspotisaverapi(self, search_result: dict, request_overrides: dict = None):
    '''Resolve one track through spotisaver.net and return it with the audio bytes attached.

    Two requests are made: a GET for the track metadata, then a POST that sends the
    first metadata entry back; the body of that second response is kept as the audio.

    Args:
        search_result: track record; only its 'id' entry is read here.
        request_overrides: extra keyword arguments forwarded to every HTTP call.

    Returns:
        SongInfo with ``downloaded_contents`` set to the body of the download response
        and ``download_url`` left as None.

    Raises:
        KeyError, IndexError: when 'id' is missing or the metadata reply holds no track.
        Exception: whatever ``raise_for_status()`` raises for a failed request.
    '''
    # init
    request_overrides, song_id = request_overrides or {}, str(search_result['id'])
    headers = {
        "referer": "https://spotisaver.net/en1", "sec-ch-ua": '"Not:A-Brand";v="99", "Google Chrome";v="145", "Chromium";v="145"', "sec-ch-ua-mobile": "?0", "sec-ch-ua-platform": '"Windows"', "sec-fetch-dest": "empty",
        "sec-fetch-mode": "cors", "sec-fetch-site": "same-origin", "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/145.0.0.0 Safari/537.36",
    }
    # parse
    (resp := self.get(f'https://spotisaver.net/api/get_playlist.php?id={song_id}&type=track&lang=en', headers=headers, **request_overrides)).raise_for_status()
    download_result = resp2json(resp=resp)
    payload = {"track": download_result["tracks"][0], "download_dir": "downloads", "filename_tag": "SPOTISAVER", "user_ip": "2601:1e23:dac0:b1d7:39a4:640e:4700:01c7", "is_premium": "true"}
    (resp := self.post('https://spotisaver.net/api/download_track.php', json=payload, headers=headers, **request_overrides)).raise_for_status()
    audio_bytes = resp.content
    # len() is the payload size; bytes.__sizeof__() would also count the object header
    num_bytes = len(audio_bytes)
    try: duration_in_secs = float(safeextractfromdict(download_result, ['tracks', 0, 'duration_ms'], 0)) / 1000
    except Exception: duration_in_secs = 0
    song_info = SongInfo(
        raw_data={'search': search_result, 'download': download_result, 'lyric': {}},
        source=self.source,
        song_name=legalizestring(safeextractfromdict(download_result, ['tracks', 0, 'name'], None)),
        singers=legalizestring(', '.join(safeextractfromdict(download_result, ['tracks', 0, 'artists'], []) or [])),
        album=legalizestring(safeextractfromdict(download_result, ['tracks', 0, 'album'], None)),
        ext=naiveguessextfromaudiobytes(audio_bytes),
        file_size_bytes=num_bytes,
        file_size=byte2mb(num_bytes),
        identifier=song_id,
        duration_s=duration_in_secs,
        duration=seconds2hms(duration_in_secs),
        lyric=None,
        cover_url=safeextractfromdict(download_result, ['tracks', 0, 'image', 'url'], None),
        download_url=None,
        downloaded_contents=audio_bytes,
        download_url_status={'ok': True},
    )
    # fall back to mp3 when the guessed extension is not a known audio extension
    if (song_info.ext not in AudioLinkTester.VALID_AUDIO_EXTS): song_info.ext = 'mp3'
    # return
    return song_info
|
|
'''_parsewithspotubedlapi'''
|
|
def _parsewithspotubedlapi(self, search_result: dict, request_overrides: dict = None):
    '''Resolve one track through spotubedl.com and return it as a SongInfo with a download link.

    The metadata reply carries a 'youtube_url'; its ``v`` query parameter is then used
    to request an mp3 link from the site's download endpoint.

    Args:
        search_result: track record; only its 'id' entry is read here.
        request_overrides: extra keyword arguments forwarded to the HTTP calls and link checks.

    Returns:
        SongInfo whose ``download_url_status`` also holds the probe result under 'probe_status'.
    '''
    # init
    request_overrides = request_overrides or {}
    song_id = str(search_result['id'])
    headers = {
        "referer": "https://spotubedl.com/", "sec-ch-ua": '"Not:A-Brand";v="99", "Google Chrome";v="145", "Chromium";v="145"', "sec-ch-ua-mobile": "?0", "sec-ch-ua-platform": '"Windows"', "sec-fetch-dest": "empty",
        "sec-fetch-mode": "cors", "sec-fetch-site": "same-origin", "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/145.0.0.0 Safari/537.36"
    }
    # parse: metadata first, then the conversion endpoint keyed by the video id
    resp = self.get(f'https://spotubedl.com/api/metadata/{song_id}', headers=headers, **request_overrides)
    resp.raise_for_status()
    track_meta = resp2json(resp=resp)
    youtube_query = parse_qs(urlparse(str(track_meta['youtube_url'])).query, keep_blank_values=True)
    vid = youtube_query.get('v')[0]
    resp = self.get(f'https://spotubedl.com/api/download/{vid}?engine=v1&format=mp3&quality=320', headers=headers, **request_overrides)
    resp.raise_for_status()
    conversion = resp2json(resp=resp)
    download_url = conversion['url']
    track_meta['youtube_resp'] = conversion
    duration = track_meta.get('duration')
    song_info = SongInfo(
        raw_data={'search': search_result, 'download': track_meta, 'lyric': {}},
        source=self.source,
        song_name=legalizestring(track_meta.get('name')),
        singers=legalizestring(', '.join(track_meta.get('artists', []) or [])),
        album=legalizestring(track_meta.get('album_name', None)),
        ext='mp3',
        file_size_bytes=None,
        file_size=None,
        identifier=song_id,
        duration_s=duration,
        duration=seconds2hms(duration),
        lyric=None,
        cover_url=track_meta.get('cover_url'),
        download_url=download_url,
        download_url_status=self.audio_link_tester.test(download_url, request_overrides),
        default_download_headers=headers,
    )
    # probe the link for its size and extension
    song_info.download_url_status['probe_status'] = self.audio_link_tester.probe(song_info.download_url, request_overrides)
    probe_status = song_info.download_url_status['probe_status']
    song_info.file_size = probe_status['file_size']
    # only replace the extension when the current one is not a known audio extension
    if song_info.ext not in AudioLinkTester.VALID_AUDIO_EXTS:
        if probe_status['ext'] in AudioLinkTester.VALID_AUDIO_EXTS:
            song_info.ext = probe_status['ext']
        else:
            song_info.ext = 'mp3'
    # return
    return song_info
|
|
'''_parsewithspotidownloaderapi'''
|
|
def _parsewithspotidownloaderapi(self, search_result: dict, request_overrides: dict = None):
    '''Resolve one track through spotidownloader.com and return it as a SongInfo with a download link.

    A bearer token is fetched first, then the download endpoint is asked for the track.
    The FLAC link is tried before the regular link; the first one that yields a valid
    download URL wins.

    Args:
        search_result: track record; 'id' is required, and the duration is read from its
            'item' or 'itemV2' entry when present.
        request_overrides: extra keyword arguments forwarded to the HTTP calls and link checks.

    Returns:
        SongInfo for the first candidate with a valid download URL; otherwise the SongInfo
        of the last candidate, or an empty SongInfo when no usable link came back.
    '''
    # init
    request_overrides, song_id = request_overrides or {}, str(search_result['id'])
    # fetch token
    (resp := self.get('https://spdl.afkarxyz.fun/token', headers={"User-Agent": "CharlesPikachu-musicdl"}, **request_overrides)).raise_for_status()
    session_token = (download_result := resp2json(resp=resp))['token']
    headers = {"Authorization": f"Bearer {session_token}", "Content-Type": "application/json", "Origin": "https://spotidownloader.com", "Referer": "https://spotidownloader.com/"}
    # parse
    (resp := self.post('https://api.spotidownloader.com/download', headers=headers, json={"id": song_id}, **request_overrides)).raise_for_status()
    download_result.update(resp2json(resp=resp))
    # candidates in preference order: lossless first, then the regular link
    download_urls: list[str] = [u for u in [download_result.get('linkFlac'), download_result.get('link')] if u and str(u).startswith('http')]
    try: duration_in_secs = float(safeextractfromdict(search_result, ['item', 'data', 'duration', 'totalMilliseconds'], 0) or safeextractfromdict(search_result, ['itemV2', 'data', 'trackDuration', 'totalMilliseconds'], 0)) / 1000
    except Exception: duration_in_secs = 0
    # without this default an empty candidate list would reach `return` with no song_info bound
    song_info = SongInfo(source=self.source)
    for download_url in download_urls:
        song_info = SongInfo(
            raw_data={'search': search_result, 'download': download_result, 'lyric': {}},
            source=self.source,
            song_name=legalizestring(safeextractfromdict(download_result, ['metadata', 'title'], None)),
            singers=legalizestring(safeextractfromdict(download_result, ['metadata', 'artists'], None)),
            album=legalizestring(safeextractfromdict(download_result, ['metadata', 'album'], None)),
            # extension guessed from the URL path, ignoring the query string
            ext=download_url.split('?')[0].split('.')[-1],
            file_size_bytes=None,
            file_size=None,
            identifier=song_id,
            duration_s=duration_in_secs,
            duration=seconds2hms(duration_in_secs),
            lyric=None,
            cover_url=safeextractfromdict(download_result, ['metadata', 'cover'], None),
            download_url=download_url,
            download_url_status=self.audio_link_tester.test(download_url, request_overrides),
        )
        if song_info.ext in {'m4s', 'mp4'}: song_info.ext = 'm4a'
        song_info.download_url_status['probe_status'] = self.audio_link_tester.probe(song_info.download_url, request_overrides)
        song_info.file_size = song_info.download_url_status['probe_status']['file_size']
        if (song_info.ext not in AudioLinkTester.VALID_AUDIO_EXTS) and (song_info.download_url_status['probe_status']['ext'] in AudioLinkTester.VALID_AUDIO_EXTS): song_info.ext = song_info.download_url_status['probe_status']['ext']
        elif (song_info.ext not in AudioLinkTester.VALID_AUDIO_EXTS): song_info.ext = 'mp3'
        if song_info.with_valid_download_url: break
    # return
    return song_info
|
|
'''_parsewithspotmateapi'''
|
|
def _parsewithspotmateapi(self, search_result: dict, request_overrides: dict = None):
    '''Resolve one track through spotmate.online and return it as a SongInfo with a download link.

    Works on a deep copy of the client session: the landing page is loaded to collect
    cookies and the CSRF token, then the track-data and convert endpoints are called.

    Args:
        search_result: track record; 'id' is required, and the album name is read from its
            'itemV2' or 'item' entry when present.
        request_overrides: extra keyword arguments forwarded to the HTTP calls and link checks.

    Returns:
        SongInfo whose ``download_url_status`` also holds the probe result under 'probe_status'.
    '''
    # init
    request_overrides = request_overrides or {}
    song_id = str(search_result['id'])
    session = copy.deepcopy(self.session)
    mobile_ua = 'Mozilla/5.0 (Linux; Android 10; K) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/132.0.0.0 Mobile Safari/537.36'
    session.headers = {'user-agent': mobile_ua}
    landing = session.get('https://spotmate.online/en', **request_overrides)
    landing.raise_for_status()
    # cookies set by the landing page are replayed as an explicit header on the API calls
    cookies = "; ".join([f"{cookie.name}={cookie.value}" for cookie in session.cookies])
    csrf_token = BeautifulSoup(landing.text, 'lxml').find('meta', attrs={'name': 'csrf-token'}).get('content')
    headers = {
        'authority': 'spotmate.online', 'accept': '*/*', 'accept-language': 'id-ID,id;q=0.9,en-US;q=0.8,en;q=0.7', 'origin': 'https://spotmate.online', 'referer': 'https://spotmate.online/en', 'x-csrf-token': csrf_token,
        'sec-ch-ua': '"Not A(Brand";v="8", "Chromium";v="132"', 'sec-ch-ua-mobile': '?1', 'sec-ch-ua-platform': '"Android"', 'sec-fetch-dest': 'empty', 'sec-fetch-site': 'same-origin', 'content-type': 'application/json',
        'user-agent': mobile_ua, 'cookie': cookies, 'sec-fetch-mode': 'cors',
    }
    # parse
    track_url = f'https://open.spotify.com/track/{song_id}'
    resp = session.post('https://spotmate.online/getTrackData', json={'spotify_url': track_url}, headers=headers, **request_overrides)
    resp.raise_for_status()
    download_result = resp2json(resp=resp)
    resp = session.post('https://spotmate.online/convert', json={'urls': track_url}, headers=headers, **request_overrides)
    resp.raise_for_status()
    download_result['convert'] = resp2json(resp=resp)
    download_url = download_result['convert']['url']
    try:
        duration_in_secs = float(safeextractfromdict(download_result, ['duration_ms'], 0)) / 1000
    except Exception:
        duration_in_secs = 0
    # the link's `format` query parameter names the extension; default to mp3 when absent
    try:
        ext = parse_qs(urlparse(download_url).query, keep_blank_values=True).get('format')[0]
    except Exception:
        ext = 'mp3'
    artist_names = []
    for singer in (download_result.get('artists', []) or []):
        if isinstance(singer, dict) and singer.get('name'):
            artist_names.append(singer.get('name'))
    song_info = SongInfo(
        raw_data={'search': search_result, 'download': download_result, 'lyric': {}},
        source=self.source,
        song_name=legalizestring(download_result.get('name', '')),
        singers=legalizestring(', '.join(artist_names)),
        album=legalizestring(safeextractfromdict(search_result, ['itemV2', 'data', 'albumOfTrack', 'name'], None) or safeextractfromdict(search_result, ['item', 'data', 'albumOfTrack', 'name'], None)),
        ext=ext,
        file_size=None,
        identifier=song_id,
        duration_s=duration_in_secs,
        duration=seconds2hms(duration_in_secs),
        lyric='NULL',
        cover_url=safeextractfromdict(download_result, ['album', 'images', 0, 'url'], None),
        download_url=download_url,
        download_url_status=self.audio_link_tester.test(download_url, request_overrides),
        default_download_headers=headers,
    )
    # probe the link for its size and extension
    song_info.download_url_status['probe_status'] = self.audio_link_tester.probe(song_info.download_url, request_overrides)
    probe_status = song_info.download_url_status['probe_status']
    song_info.file_size = probe_status['file_size']
    # only replace the extension when the current one is not a known audio extension
    if song_info.ext not in AudioLinkTester.VALID_AUDIO_EXTS:
        if probe_status['ext'] in AudioLinkTester.VALID_AUDIO_EXTS:
            song_info.ext = probe_status['ext']
        else:
            song_info.ext = 'mp3'
    # return
    return song_info
|
|
'''_parsewiththirdpartapis'''
|
|
def _parsewiththirdpartapis(self, search_result: dict, request_overrides: dict = None):
    '''Try each third-party backend in turn and return the first valid result.

    Args:
        search_result: track record handed unchanged to every backend.
        request_overrides: optional request keyword arguments handed unchanged to every backend.

    Returns:
        The first SongInfo whose ``with_valid_download_url`` is truthy, or an empty
        SongInfo for this source when every backend fails or returns an invalid result.
    '''
    backends = (self._parsewithspotisaverapi, self._parsewithspotidownloaderapi, self._parsewithspotmateapi, self._parsewithspotubedlapi)
    for imp_func in backends:
        # best effort: a failing backend just means the next one gets a turn.
        # `except Exception` (not a bare except) keeps Ctrl-C / SystemExit working, and the
        # explicit validity check replaces an `assert` that would vanish under `python -O`.
        try:
            song_info_flac = imp_func(search_result, request_overrides)
            if song_info_flac.with_valid_download_url: return song_info_flac
        except Exception:
            continue
    return SongInfo(source=self.source)
|
|
'''_parsewithofficialapiv1'''
|
|
def _parsewithofficialapiv1(self, search_result: dict, song_info_flac: SongInfo = None, lossless_quality_is_sufficient: bool = True, lossless_quality_definitions: set | list | tuple = {'flac'}, request_overrides: dict = None) -> "SongInfo":
    '''Pick the final SongInfo for a track and attach lyrics to it.

    No official download path is implemented yet (see the TODO), so the result is
    the third-party ``song_info_flac`` whenever that one has a valid download URL.

    Args:
        search_result: track record; must be a dict with a truthy 'id'.
        song_info_flac: result of the third-party backends; an empty SongInfo is used when falsy.
        lossless_quality_is_sufficient: whether a lossless third-party result may be taken as is.
        lossless_quality_definitions: extensions that count as lossless.
        request_overrides: optional request keyword arguments passed to the lyric search.

    Returns:
        SongInfo; empty when ``search_result`` is unusable, and returned without
        lyrics when it has no valid download URL.
    '''
    # init
    song_info = SongInfo(source=self.source)
    request_overrides = request_overrides or {}
    song_info_flac = song_info_flac or SongInfo(source=self.source)
    if not isinstance(search_result, dict): return song_info
    if not search_result.get('id'): return song_info
    # obtain basic song_info
    lossless_is_enough = lossless_quality_is_sufficient and song_info_flac.with_valid_download_url and (song_info_flac.ext in lossless_quality_definitions)
    if lossless_is_enough:
        song_info = song_info_flac
    # TODO: Solve DRM Issues in Spotify (nothing is fetched from the official API yet)
    if not song_info.with_valid_download_url:
        song_info = song_info_flac
    if not song_info.with_valid_download_url:
        return song_info
    # supplement lyric results
    lyric_result, lyric = LyricSearchClient().search(artist_name=song_info.singers, track_name=song_info.song_name, request_overrides=request_overrides)
    song_info.raw_data['lyric'] = lyric_result or song_info.raw_data['lyric']
    # 'NULL' is treated as "no lyric found" and does not overwrite an existing value
    if lyric and lyric != 'NULL':
        song_info.lyric = lyric
    # when the duration is missing or a placeholder, derive it from the lyric
    if not song_info.duration or song_info.duration == '-:-:-':
        song_info.duration = seconds2hms(extractdurationsecondsfromlrc(song_info.lyric))
    # return
    return song_info
|
|
'''_search'''
|
|
@usesearchheaderscookies
def _search(self, keyword: str = '', search_url: dict = '', request_overrides: dict = None, song_infos: list = None, progress: Progress = None, progress_id: int = 0):
    '''Run one search task and append every downloadable track to ``song_infos``.

    Args:
        keyword: search keyword (not read in this method; the query travels inside ``search_url``).
        search_url: task dict with 'api' (callable) and 'inputs' (its keyword arguments).
        request_overrides: optional request keyword arguments forwarded to the parsers.
        song_infos: list the results are appended to; a fresh list is created when omitted
            so results never leak between calls through a shared default.
        progress: optional rich Progress to report the outcome on; skipped when None.
        progress_id: id of the task to update on ``progress``.

    Returns:
        ``song_infos`` with the new results appended. Errors are reported on the
        progress task rather than raised.
    '''
    # init
    request_overrides = request_overrides or {}
    song_infos = [] if song_infos is None else song_infos
    search_api, search_api_inputs = search_url['api'], search_url['inputs']
    # successful
    try:
        # --with cookies configured, a lossless third-party result is not taken as sufficient on its own
        lossless_quality_is_sufficient = False if self.default_cookies or request_overrides.get('cookies') else True
        # --search results (prefer the 'tracksV2' listing, fall back to 'tracks')
        search_resp = search_api(**search_api_inputs)
        search_results = safeextractfromdict(search_resp, ['data', 'searchV2', 'tracksV2', 'items'], []) or safeextractfromdict(search_resp, ['data', 'searchV2', 'tracks', 'items'], []) or []
        for search_result in search_results:
            search_result['id'] = safeextractfromdict(search_result, ['item', 'data', 'id'], None)
            # no explicit id: derive it from the track uri
            if not search_result['id']: search_result['id'] = str(safeextractfromdict(search_result, ['item', 'data', 'uri'], '')).removeprefix('spotify:track:')
            # --parse with third part apis
            song_info_flac = self._parsewiththirdpartapis(search_result=search_result, request_overrides=request_overrides)
            # --parse with official apis
            try: song_info = self._parsewithofficialapiv1(search_result=search_result, song_info_flac=song_info_flac, lossless_quality_is_sufficient=lossless_quality_is_sufficient, request_overrides=request_overrides)
            except Exception: song_info = SongInfo(source=self.source)
            # --append to song_infos
            if not song_info.with_valid_download_url: song_info = song_info_flac
            if not song_info.with_valid_download_url: continue
            song_infos.append(song_info)
            # --judgement for search_size
            if self.strict_limit_search_size_per_page and len(song_infos) >= self.search_size_per_page: break
        # --update progress
        if progress is not None: progress.update(progress_id, description=f"{self.source}.search >>> {search_url} (Success)")
    # failure
    except Exception as err:
        if progress is not None: progress.update(progress_id, description=f"{self.source}.search >>> {search_url} (Error: {err})")
    # return
    return song_infos
|
|
'''parseplaylist'''
|
|
@useparseheaderscookies
def parseplaylist(self, playlist_url: str, request_overrides: dict = None):
    '''Resolve every track of a Spotify playlist into a downloadable SongInfo.

    The URL is first followed through redirects with a HEAD request, then checked
    against ``SPOTIFY_MUSIC_HOSTS``. Tracks are de-duplicated by 'id' and resolved
    one by one while a progress bar is shown.

    Args:
        playlist_url: playlist link; the last path segment of the resolved URL is used as the playlist id.
        request_overrides: optional request keyword arguments forwarded to every request.

    Returns:
        List of SongInfo objects with a valid download URL, each with ``work_dir`` set.
        Empty when the resolved URL is not on a Spotify host.
    '''
    # init
    request_overrides = request_overrides or {}
    playlist_url = self.session.head(playlist_url, allow_redirects=True, **request_overrides).url
    playlist_id, song_infos = urlparse(playlist_url).path.strip('/').split('/')[-1].removesuffix('.html').removesuffix('.htm'), []
    if (not (hostname := obtainhostname(url=playlist_url))) or (not hostmatchessuffix(hostname, SPOTIFY_MUSIC_HOSTS)): return song_infos
    # get tracks in playlist
    playlist_result_first, tracks_in_playlist = SpotifyMusicClientPlaylistUtils.parse(copy.deepcopy(self.session), playlist_id=playlist_id, request_overrides=request_overrides)
    # drop repeated tracks; for a repeated id the last record wins, at the position of its first occurrence
    tracks_in_playlist = list({d["id"]: d for d in tracks_in_playlist}.values())
    num_tracks = len(tracks_in_playlist)
    # with cookies configured, a lossless third-party result is not taken as sufficient on its own
    lossless_quality_is_sufficient = False if self.default_cookies or request_overrides.get('cookies') else True
    # parse track by track in playlist
    with Progress(TextColumn("{task.description}"), BarColumn(bar_width=None), MofNCompleteColumn(), TimeRemainingColumn(), refresh_per_second=10) as main_process_context:
        main_progress_id = main_process_context.add_task(f"{num_tracks} songs found in playlist {playlist_id} >>> completed (0/{num_tracks})", total=num_tracks)
        for num_done, track_info in enumerate(tracks_in_playlist, start=1):
            song_info_flac = self._parsewiththirdpartapis(search_result=track_info, request_overrides=request_overrides)
            try: song_info = self._parsewithofficialapiv1(search_result=track_info, song_info_flac=song_info_flac, lossless_quality_is_sufficient=lossless_quality_is_sufficient, request_overrides=request_overrides)
            except Exception: song_info = song_info_flac
            if not song_info.with_valid_download_url: song_info = song_info_flac
            if song_info.with_valid_download_url: song_infos.append(song_info)
            # advance inside the loop so an empty playlist never touches an unbound loop index
            main_process_context.advance(main_progress_id, 1)
            main_process_context.update(main_progress_id, description=f"{num_tracks} songs found in playlist {playlist_id} >>> completed ({num_done}/{num_tracks})")
    # post processing
    playlist_name = safeextractfromdict(playlist_result_first, ['data', 'playlistV2', 'name'], None)
    song_infos = self._removeduplicates(song_infos=song_infos)
    # fall back to the playlist id when the playlist name is unavailable
    work_dir = self._constructuniqueworkdir(keyword=legalizestring(playlist_name or f"playlist-{playlist_id}"))
    for song_info in song_infos:
        song_info.work_dir = work_dir
        episodes = song_info.episodes if isinstance(song_info.episodes, list) else []
        for eps_info in episodes:
            eps_info.work_dir = sanitize_filepath(os.path.join(work_dir, song_info.song_name))
            # NOTE(review): this touches the playlist work_dir, not eps_info.work_dir - confirm that is intended
            touchdir(work_dir)
    # return results
    return song_infos
|