Source code for sxm.client

import base64
import datetime
import json
import logging
import re
import time
import traceback
import urllib.parse
from typing import Any, Callable, Dict, List, Optional, Union

import httpx
from fake_useragent import UserAgent  # type: ignore
from make_it_sync import make_sync  # type: ignore
from tenacity import retry, stop_after_attempt, wait_fixed
from ua_parser import user_agent_parser  # type: ignore

from sxm.models import QualitySize, RegionChoice, XMChannel, XMLiveChannel

# Public names exported by ``from sxm.client import *``.
__all__ = [
    "HLS_AES_KEY",
    "SXMClient",
    "SXMClientAsync",
    "AuthenticationError",
    "SegmentRetrievalException",
]


# App version string sent to SXM as ``sxmAppVersion`` in the device info payload.
SXM_APP_VERSION = "5.36.514"
# Device model string sent to SXM as ``deviceModel`` in the device info payload.
SXM_DEVICE_MODEL = "EverestWebClient"
# Raw bytes decoded from a base64 literal; exported via ``__all__``.
# NOTE(review): presumably the AES key for decrypting HLS segments -- it is not
# used in this module, confirm against the consumers.
HLS_AES_KEY = base64.b64decode("0Nsco7MAgxowGvkUT8aYag==")
# User-Agent used when generating one through ``fake_useragent`` fails.
FALLBACK_UA = (
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:89.0) Gecko/20100101 Firefox/89.0"
)
# URL templates for the REST API; ``{}`` is replaced with the module path.
# The v2 template is the default for every request helper in this module.
REST_V2_FORMAT = "https://player.siriusxm.com/rest/v2/experience/modules/{}"
REST_V4_FORMAT = "https://player.siriusxm.com/rest/v4/experience/modules/{}"
# Maximum session age, compared against ``time.monotonic()`` deltas (seconds),
# after which authenticated requests close and recreate the HTTP session.
SESSION_MAX_LIFE = 14400

# When true, the channel list is fetched from the v4 endpoint (``get?type=2``)
# instead of the v2 ``get`` endpoint.
ENABLE_NEW_CHANNELS = True


class SXMError(Exception):
    """Root of the SXM exception hierarchy; every other SXM error derives from it."""


class ConfigurationError(SXMError):
    """Raised when the SXM configuration could not be retrieved.

    Renew the session and try again later.
    """


class AuthenticationError(SXMError):
    """Raised when SXM authentication fails and the session must be renewed."""
class SegmentRetrievalException(SXMError):
    """Raised when an HLS segment cannot be fetched and the session must be renewed."""
class SXMClientAsync:
    """Class to interface with SXM api and access HLS
    live streams of audio

    Parameters
    ----------
    username : :class:`str`
        SXM username
    password : :class:`str`
        SXM password
    region : :class:`str` ("US" or "CA")
        Sets your SXM account region
    quality : :class:`QualitySize`
        Stream quality applied to live channels when resolving playlists.
        Defaults to ``QualitySize.LARGE_256k``.
    user_agent : Optional[:class:`str`]
        User Agent string to use for making requests to SXM. If `None` is
        passed, it will attempt to generate one based on real browser usage
        data. Defaults to `None`.
    update_handler : Optional[Callable[[:class:`dict`], `None`]]
        Callback to be called whenever a playlist updates and new
        Live Channel data is retrieved. Defaults to `None`.

    Attributes
    ----------
    is_logged_in : :class:`bool`
        Returns if account is logged into SXM's servers
    is_session_authenticated : :class:`bool`
        Returns if session is valid and ready to use
    sxmak_token : :class:`str`
        Token parsed out of the ``SXMAKTOKEN`` session cookie, or `None`
    gup_id : :class:`str`
        ``gupId`` field of the JSON stored in the ``SXMDATA`` session
        cookie, or `None`
    channels : List[:class:`XMChannel`]
        Retrieves and returns a full list of all :class:`XMChannel`
        available to the logged in account
    favorite_channels : List[:class:`XMChannel`]
        Retrieves and returns a full list of all :class:`XMChannel`
        available to the logged in account that are marked
        as favorite
    """

    last_renew: Optional[float]
    password: str
    region: RegionChoice
    update_handler: Optional[Callable[[dict], None]]
    update_interval: int
    username: str
    stream_quality: QualitySize

    _channels: Optional[List[XMChannel]]
    _favorite_channels: Optional[List[XMChannel]]
    _playlists: Dict[str, str]
    _use_primary: bool
    _ua: Dict[str, Any]
    _session: httpx.AsyncClient
    _configuration: Optional[Dict] = None
    _urls: Optional[Dict[str, str]] = None

    def __init__(
        self,
        username: str,
        password: str,
        region: RegionChoice = RegionChoice.US,
        quality: QualitySize = QualitySize.LARGE_256k,
        user_agent: Optional[str] = None,
        update_handler: Optional[Callable[[dict], None]] = None,
    ):
        self._log = logging.getLogger(__file__)

        if user_agent is None:
            # Best effort: any failure while generating a user agent falls
            # back to the static one.
            try:
                ua = UserAgent(use_cache_server=False)
                ua.update()
                user_agent = ua.chrome
            except Exception:
                user_agent = FALLBACK_UA

        self._ua = user_agent_parser.Parse(user_agent)
        self.reset_session()

        self.username = username
        self.password = password
        self.region = region
        self.stream_quality = quality

        self._playlists = {}
        self._channels = None
        self._favorite_channels = None
        self._use_primary = True

        # vars to manage session cache
        self.last_renew = None
        self.update_interval = 30

        # hook function to call whenever the playlist updates
        self.update_handler = update_handler

    def __del__(self):
        make_sync(self.close_session)()

    @property
    def is_logged_in(self) -> bool:
        """Whether the ``SXMAUTHNEW`` cookie is present on the session."""
        return "SXMAUTHNEW" in self._session.cookies

    @property
    def is_session_authenticated(self) -> bool:
        """Whether both ``AWSALB`` and ``JSESSIONID`` cookies are present."""
        return (
            "AWSALB" in self._session.cookies and "JSESSIONID" in self._session.cookies
        )

    @property
    def sxmak_token(self) -> Union[str, None]:
        """Token extracted from the ``SXMAKTOKEN`` cookie, or `None`.

        The value is the text after the first ``=`` and before the first
        ``,`` of the cookie.
        """
        try:
            token = self._session.cookies["SXMAKTOKEN"]
            return token.split("=", 1)[1].split(",", 1)[0]
        except (KeyError, IndexError):
            return None

    @property
    def gup_id(self) -> Union[str, None]:
        """``gupId`` from the URL-quoted JSON in the ``SXMDATA`` cookie, or `None`."""
        try:
            data = self._session.cookies["SXMDATA"]
            return json.loads(urllib.parse.unquote(data))["gupId"]
        except (KeyError, ValueError):
            return None

    @property
    async def channels(self) -> List[XMChannel]:
        """Cached list of channels sorted by channel number.

        Returns an empty list (without caching it) when no channels could
        be downloaded.
        """
        # download channel list if necessary
        if self._channels is None:
            raw_channels = await self.get_channels()
            if not raw_channels:
                return []

            # Build and sort the list before caching it, so a parsing failure
            # cannot leave a partial, unsorted list behind in the cache.
            parsed = [XMChannel.from_dict(raw) for raw in raw_channels]
            parsed.sort(key=lambda chan: int(chan.channel_number))
            self._channels = parsed
        return self._channels

    @property
    async def favorite_channels(self) -> List[XMChannel]:
        """Cached subset of :attr:`channels` flagged as favorite."""
        if self._favorite_channels is None:
            self._favorite_channels = [c for c in await self.channels if c.is_favorite]
        return self._favorite_channels

    def _extract_configuration(self, data: dict):
        """Index the configuration components of a raw response by name."""
        components = data["moduleList"]["modules"][0]["moduleResponse"][
            "configuration"
        ]["components"]
        return {item["name"]: item for item in components}

    @property
    async def configuration(self) -> dict:
        """Cached configuration components, keyed by component name.

        Raises
        ------
        ConfigurationError
            If the configuration could not be retrieved
        """
        if self._configuration is None:
            data = await self.get_configuration()
            if data is None:
                raise ConfigurationError()
            self._configuration = self._extract_configuration(data)
        return self._configuration

    def _extract_urls(self, urls: dict):
        """Map relative URL names to URLs, skipping entries without a ``url``."""
        return {
            url["name"]: url["url"]
            for url in urls["settings"][0]["relativeUrls"]
            if "url" in url
        }

    @property
    async def urls(self) -> Dict[str, str]:
        """Cached name -> URL mapping from the ``relativeUrls`` component."""
        if self._urls is None:
            urls = (await self.configuration)["relativeUrls"]
            self._urls = self._extract_urls(urls)
        return self._urls

    @property
    def primary(self) -> bool:
        """Whether the primary HLS root is currently selected."""
        return self._use_primary

    async def get_primary_hls_root(self) -> str:
        """Returns the ``Live_Primary_HLS`` URL from the configuration"""
        urls = await self.urls
        return urls["Live_Primary_HLS"]

    async def get_secondary_hls_root(self) -> str:
        """Returns the ``Live_Secondary_HLS`` URL from the configuration"""
        urls = await self.urls
        return urls["Live_Secondary_HLS"]

    async def get_hls_root(self) -> str:
        """Returns the currently selected (primary or secondary) HLS root"""
        if self._use_primary:
            return await self.get_primary_hls_root()
        return await self.get_secondary_hls_root()

    def set_primary(self, value: bool):
        """Selects the primary/secondary HLS root and clears cached playlists"""
        self._use_primary = value
        self._playlists = {}

    async def login(self) -> bool:
        """Attempts to log into SXM with stored username/password"""

        self._log.debug(f"Logging in as {self.username}...")

        postdata = self._get_device_info()
        postdata.update(
            {
                "standardAuth": {
                    "username": self.username,
                    "password": self.password,
                }
            }
        )

        data = await self._post("modify/authentication", postdata, authenticate=False)
        if not data:
            return False

        try:
            return data["status"] == 1 and self.is_logged_in
        except KeyError:
            self._log.error("Error decoding json response for login")
            return False

    @retry(wait=wait_fixed(3), stop=stop_after_attempt(10))
    async def authenticate(self) -> bool:
        """Attempts to create a valid session for use with the client

        Raises
        ------
        AuthenticationError
            If login failed and session now needs to be reset

            NOTE(review): this coroutine is wrapped by tenacity's ``retry``
            without ``reraise``; callers may see tenacity's own error type
            once all attempts are used up -- confirm.
        """

        if not self.is_logged_in and not await self.login():
            self._log.error("Unable to authenticate because login failed")
            await self.close_session()
            self.reset_session()
            raise AuthenticationError("Reset session")

        data = await self._post(
            "resume?OAtrial=false", self._get_device_info(), authenticate=False
        )
        if not data:
            return False

        try:
            return data["status"] == 1 and self.is_session_authenticated
        except KeyError:
            self._log.error("Error parsing json response for authentication")
            self._log.error(traceback.format_exc())
            return False

    @retry(wait=wait_fixed(3), stop=stop_after_attempt(10))
    async def get_configuration(self) -> Optional[Dict[str, Any]]:
        """Gets the raw configuration response for the account region"""
        params = {
            "result-template": "html5",
            "app-region": self.region.value,
            "cacheBuster": str(int(time.time())),
        }
        return await self._get("get/configuration", params=params)

    @retry(stop=stop_after_attempt(25), wait=wait_fixed(1))
    async def get_playlist(
        self, channel_id: str, use_cache: bool = True
    ) -> Union[str, None]:
        """Gets playlist of HLS stream URLs for given channel ID

        Parameters
        ----------
        channel_id : :class:`str`
            ID of SXM channel to retrieve playlist for
        use_cache : :class:`bool`
            Use cached playlists or force new retrieval. Defaults to `True`
        """

        url = await self._get_playlist_url(channel_id, use_cache)
        if url is None:
            return None

        response = None
        try:
            response = await self._make_request("GET", url, self._token_params())

            if response.status_code == 403:
                self._log.info("Received status code 403 on playlist, renewing session")
                # Try again, bypassing the cached playlist URL.
                return await self.get_playlist(channel_id, False)

            if response.is_error:
                self._log.warning(
                    f"Received status code {response.status_code} on "
                    f"playlist variant"
                )
                response = None

        except httpx.RequestError as e:
            self._log.error(f"Error getting playlist: {e}")

        if response is None:
            return None

        # add base path to segments
        playlist_entries = []
        aac_path = re.findall("AAC_Data.*", url)[0]
        for line in response.text.split("\n"):
            line = line.strip()
            if line.endswith(".aac"):
                # Swap the ``*.m3u8`` file name in the path for the segment name.
                playlist_entries.append(re.sub(r"[^\/]\w+\.m3u8", line, aac_path))
            else:
                playlist_entries.append(line)

        return "\n".join(playlist_entries)

    @retry(wait=wait_fixed(1), stop=stop_after_attempt(5))
    async def get_segment(self, path: str) -> Union[bytes, None]:
        """Gets raw HLS segment for given path

        Parameters
        ----------
        path : :class:`str`
            SXM path

        Raises
        ------
        SegmentRetrievalException
            If segments are starting to come back forbidden and session
            needs reset
        """

        url = urllib.parse.urljoin(await self.get_hls_root(), path)
        res = await self._session.get(url, params=self._token_params())

        if res.status_code == 403:
            raise SegmentRetrievalException(
                "Received status code 403 on segment, renew session"
            )

        if res.is_error:
            self._log.warning(f"Received status code {res.status_code} on segment")
            return None

        return res.content

    async def get_channels(self) -> List[dict]:
        """Gets raw list of channel dictionaries from SXM. Each channel
        dict can be pass into the constructor of :class:`XMChannel` to turn it
        into an object"""

        channels: List[Dict[str, str]] = []

        postdata = {
            "consumeRequests": [],
            "resultTemplate": "responsive",
            "alerts": [],
            "profileInfos": [],
        }

        if ENABLE_NEW_CHANNELS:
            data = await self._post(
                "get?type=2",
                postdata,
                channel_list=True,
                url_format=REST_V4_FORMAT,
            )
        else:
            data = await self._post("get", postdata, channel_list=True)
        if not data:
            self._log.warning("Unable to get channel list")
            return channels

        try:
            channels = data["moduleList"]["modules"][0]["moduleResponse"][
                "contentData"
            ]["channelListing"]["channels"]
        except (KeyError, IndexError):
            self._log.error("Error parsing json response for channels")
            self._log.error(traceback.format_exc())
            return []
        return channels

    async def get_channel(self, name: str) -> Union[XMChannel, None]:
        """Retrieves a specific channel from `self.channels`

        Parameters
        ----------
        name : :class:`str`
            name, id, or channel number of SXM channel to get
        """

        name = name.lower()
        for x in await self.channels:
            if (
                x.name.lower() == name
                or x.id.lower() == name
                # ``name`` is always a string here; stringify the channel
                # number so lookups also work when it is stored as an int.
                or str(x.channel_number) == name
            ):
                return x
        return None

    async def get_now_playing(self, channel: XMChannel) -> Union[Dict[str, Any], None]:
        """Gets raw dictionary of response data for the live channel.

        `data['messages'][0]['code']`
            will have the status response code from SXM

        `data['moduleList']['modules'][0]['moduleResponse']['liveChannelData']`
            will have the raw data that can be passed into
            :class:`XMLiveChannel` constructor to create an object

        Parameters
        ----------
        channel : :class:`XMChannel`
            SXM channel to look up live channel data for
        """

        now = time.time()
        # Convert the epoch timestamp straight to UTC. Taking the local time
        # and relabelling it as UTC gives the wrong instant on any host whose
        # timezone is not UTC.
        now_dt = datetime.datetime.fromtimestamp(now, tz=datetime.timezone.utc)
        params = {
            "assetGUID": channel.guid,
            "ccRequestType": "AUDIO_VIDEO",
            "channelId": channel.id,
            "hls_output_mode": "custom",
            "marker_mode": "all_separate_cue_points",
            "result-template": "web",
            "time": str(int(round(now * 1000.0))),
            # Drop tzinfo before formatting so the value ends in a single
            # "Z" instead of "+00:00Z".
            "timestamp": now_dt.replace(tzinfo=None).isoformat("T") + "Z",
        }

        return await self._get("tune/now-playing-live", params)

    async def close_session(self):
        """Closes the underlying HTTP session, if there is one"""
        # ``__del__`` calls this even when ``__init__`` failed before the
        # session was created, so do not assume the attribute exists.
        session = getattr(self, "_session", None)
        if session is not None:
            await session.aclose()
            self._session = None

    def reset_session(self) -> None:
        """Resets session used by client"""

        self._session_start = time.monotonic()
        self._session = httpx.AsyncClient()
        self._session.headers.update({"User-Agent": self._ua["string"]})
        # Cached configuration is dropped along with the session.
        self._urls = None
        self._configuration = None

    def _token_params(self) -> Dict[str, Union[str, None]]:
        """Query parameters carrying the session token and gup ID"""
        return {
            "token": self.sxmak_token,
            "consumer": "k2",
            "gupId": self.gup_id,
        }

    def _get_device_info(self) -> dict:
        """Generates a dict of device info to pass to SXM"""

        agent = self._ua["user_agent"]
        # Build "major[.minor[.patch]]" from whichever parts were parsed.
        browser_version = agent["major"]
        if agent["minor"] is not None:
            browser_version = f'{browser_version}.{agent["minor"]}'
        if agent["patch"] is not None:
            browser_version = f'{browser_version}.{agent["patch"]}'

        return {
            "resultTemplate": "web",
            "deviceInfo": {
                "osVersion": self._ua["os"]["family"],
                "platform": "Web",
                "sxmAppVersion": SXM_APP_VERSION,
                "browser": agent["family"],
                "browserVersion": browser_version,
                "appRegion": self.region.value,
                "deviceModel": SXM_DEVICE_MODEL,
                "clientDeviceId": "null",
                "player": "html5",
                "clientDeviceType": "web",
            },
        }

    async def _make_request(
        self,
        method: str,
        path: str,
        params: Dict[str, Any],
        url_format: str = REST_V2_FORMAT,
    ) -> httpx.Response:
        """Performs a raw GET or POST request and returns the response

        ``path`` is used as-is when it starts with ``http``; otherwise it is
        substituted into ``url_format``. ``params`` is sent as the query
        string for GET and as the JSON body for POST.

        Raises
        ------
        httpx.RequestError
            If ``method`` is not GET or POST, or if the request itself fails
        """
        url = path if path.startswith("http") else url_format.format(path)

        # Reject unsupported methods before the try block: this error has no
        # request attached, so it must not reach the handler below.
        if method not in ("GET", "POST"):
            raise httpx.RequestError("only GET and POST")

        try:
            if method == "GET":
                response = await self._session.get(url, params=params)
            else:
                response = await self._session.post(url, json=params)
        except httpx.RequestError as e:
            self._log.error(
                f"An Exception occurred when trying to perform "
                f"the {method} request!"
            )
            self._log.error(f"Params: {params}")
            self._log.error(f"Method: {method}")
            # ``e.request`` raises RuntimeError when no request was attached.
            try:
                request = e.request
            except RuntimeError:
                request = None
            self._log.error(f"Request: {request}")
            raise

        return response

    async def _request(
        self,
        method: str,
        path: str,
        params: Dict[str, str],
        authenticate: bool = True,
        url_format: str = REST_V2_FORMAT,
    ) -> Union[Dict[str, Any], None]:
        """Makes a GET or POST request to SXM servers"""

        method = method.upper()

        if authenticate:
            # Recreate the session once it is older than SESSION_MAX_LIFE.
            now = time.monotonic()
            if (now - self._session_start) > SESSION_MAX_LIFE:
                self._log.info("Session exceed max time, reseting")
                await self.close_session()
                self.reset_session()

            if not self.is_session_authenticated and not await self.authenticate():
                self._log.error("Unable to authenticate")
                return None

        response = await self._make_request(method, path, params, url_format=url_format)

        if response.is_error:
            self._log.warning(
                f"Received status code {response.status_code} for " f"path '{path}'"
            )
            self._log.warning(f"Response: {response.text}")
            return None

        try:
            return response.json()["ModuleListResponse"]
        except (KeyError, ValueError):
            self._log.error(f"Error decoding json for path '{path}'")
            return None

    async def _get(
        self,
        path: str,
        params: Dict[str, str],
        authenticate: bool = True,
        url_format: str = REST_V2_FORMAT,
    ) -> Union[Dict[str, Any], None]:
        """Makes a GET request to SXM servers"""

        return await self._request(
            "GET", path, params, authenticate, url_format=url_format
        )

    async def _post(
        self,
        path: str,
        postdata: dict,
        channel_list: bool = False,
        authenticate: bool = True,
        url_format: str = REST_V2_FORMAT,
    ) -> Union[Dict[str, Any], None]:
        """Makes a POST request to SXM servers"""

        # Wrap the payload in the module list envelope.
        postdata = {"moduleList": {"modules": [{"moduleRequest": postdata}]}}

        if channel_list:
            # Channel listing requests replace the module request entirely.
            postdata["moduleList"]["modules"][0].update(
                {
                    "moduleArea": "Discovery",
                    "moduleType": "ChannelListing",
                    "moduleRequest": {"resultTemplate": "responsive"},
                }
            )

        return await self._request(
            "POST", path, postdata, authenticate, url_format=url_format
        )

    async def _get_playlist_url(
        self,
        channel_id: str,
        use_cache: bool = True,
        max_attempts: int = 5,
    ) -> Union[str, None]:
        """Returns HLS live stream URL for a given `XMChannel`

        Parameters
        ----------
        channel_id : :class:`str`
            name, id, or channel number of the channel
        use_cache : :class:`bool`
            Return the cached URL while it is younger than
            ``update_interval``. Defaults to `True`
        max_attempts : :class:`int`
            Remaining re-authentication attempts before giving up
        """

        channel = await self.get_channel(channel_id)
        if channel is None:
            self._log.info(f"No channel for {channel_id}")
            return None

        now = time.monotonic()
        if use_cache and channel.id in self._playlists:
            if (
                self.last_renew is None
                or (now - self.last_renew) > self.update_interval
            ):
                del self._playlists[channel.id]
            else:
                return self._playlists[channel.id]

        data = await self.get_now_playing(channel)
        if data is None:
            return None

        # parse response
        try:
            message = data["messages"][0]["message"]
            message_code = data["messages"][0]["code"]
        except (KeyError, IndexError):
            self._log.error("Error parsing json response for playlist")
            self._log.error(traceback.format_exc())
            return None

        # login if session expired
        if message_code == 201 or message_code == 208:
            if max_attempts <= 0:
                self._log.warning("Reached max attempts for playlist")
                return None

            self._log.info("Session expired, logging in and authenticating")
            if await self.authenticate():
                self._log.info("Successfully authenticated")
                return await self._get_playlist_url(
                    channel.id, use_cache, max_attempts - 1
                )
            self._log.error("Failed to authenticate")
            return None
        elif message_code == 204:
            # Apply the same attempt budget as above; without it a persistent
            # 204 would recurse without bound.
            if max_attempts <= 0:
                self._log.warning("Reached max attempts for playlist")
                return None

            self._log.warning("Multiple login error received, reseting session")
            await self.close_session()
            self.reset_session()
            if await self.authenticate():
                self._log.info("Successfully authenticated")
                return await self._get_playlist_url(
                    channel.id, use_cache, max_attempts - 1
                )
            self._log.error("Failed to authenticate")
            return None
        elif message_code != 100:
            self._log.warning(f"Received error {message_code} {message}")
            return None

        live_channel_raw = data["moduleList"]["modules"][0]
        live_channel = XMLiveChannel.from_dict(live_channel_raw)
        live_channel.set_stream_quality(self.stream_quality)
        live_channel.set_hls_roots(
            await self.get_primary_hls_root(), await self.get_secondary_hls_root()
        )

        self.update_interval = int(data["moduleList"]["modules"][0]["updateFrequency"])

        # get m3u8 url
        url = live_channel.primary_hls.url
        if not self._use_primary:
            url = live_channel.secondary_hls.url

        playlist = await self._get_playlist_variant_url(url)
        if playlist is not None:
            self._playlists[channel.id] = playlist
            self.last_renew = time.monotonic()

            if self.update_handler is not None:
                self.update_handler(live_channel_raw)
            return self._playlists[channel.id]
        return None

    async def _get_playlist_variant_url(self, url: str) -> Union[str, None]:
        """Returns the URL of the first ``.m3u8`` variant listed at ``url``"""
        res = await self._session.get(url, params=self._token_params())

        if res.is_error:
            self._log.warning(
                f"Received status code {res.status_code} on playlist "
                f"variant retrieval"
            )
            return None

        for x in res.text.split("\n"):
            if x.rstrip().endswith(".m3u8"):
                # first variant should be 256k one
                return "{}/{}".format(url.rsplit("/", 1)[0], x.rstrip())

        return None
class SXMClient:
    """Sync wrapper class around SXMClientAsync

    Parameters
    ----------
    username : :class:`str`
        SXM username
    password : :class:`str`
        SXM password
    region : :class:`str` ("US" or "CA")
        Sets your SXM account region
    quality : :class:`QualitySize`
        Stream quality forwarded to the wrapped async client.
        Defaults to ``QualitySize.LARGE_256k``.
    user_agent : Optional[:class:`str`]
        User Agent string to use for making requests to SXM. If `None` is
        passed, it will attempt to generate one based on real browser usage
        data. Defaults to `None`.
    update_handler : Optional[Callable[[:class:`dict`], `None`]]
        Callback to be called whenever a playlist updates and new
        Live Channel data is retrieved. Defaults to `None`.

    Attributes
    ----------
    async_client : :class:`SXMClientAsync`
        Wrapped async client that holds all state
    is_logged_in : :class:`bool`
        Returns if account is logged into SXM's servers
    is_session_authenticated : :class:`bool`
        Returns if session is valid and ready to use
    sxmak_token : :class:`str`
        Forwarded from the wrapped async client
    gup_id : :class:`str`
        Forwarded from the wrapped async client
    channels : List[:class:`XMChannel`]
        Retrieves and returns a full list of all :class:`XMChannel`
        available to the logged in account
    favorite_channels : List[:class:`XMChannel`]
        Retrieves and returns a full list of all :class:`XMChannel`
        available to the logged in account that are marked
        as favorite
    """

    def __init__(
        self,
        username: str,
        password: str,
        region: RegionChoice = RegionChoice.US,
        quality: QualitySize = QualitySize.LARGE_256k,
        user_agent: Optional[str] = None,
        update_handler: Optional[Callable[[dict], None]] = None,
    ):
        self.async_client = SXMClientAsync(
            username=username,
            password=password,
            region=region,
            quality=quality,
            user_agent=user_agent,
            update_handler=update_handler,
        )

    # --- read-only views onto the wrapped client's state ---

    @property
    def last_renew(self) -> Optional[float]:
        return self.async_client.last_renew

    @property
    def password(self) -> str:
        return self.async_client.password

    @property
    def region(self) -> RegionChoice:
        return self.async_client.region

    @property
    def update_handler(self) -> Optional[Callable[[dict], None]]:
        return self.async_client.update_handler

    @property
    def update_interval(self) -> int:
        return self.async_client.update_interval

    @property
    def username(self) -> str:
        return self.async_client.username

    @property
    def stream_quality(self) -> QualitySize:
        return self.async_client.stream_quality

    @property
    def is_logged_in(self) -> bool:
        return self.async_client.is_logged_in

    @property
    def is_session_authenticated(self) -> bool:
        return self.async_client.is_session_authenticated

    @property
    def sxmak_token(self) -> Union[str, None]:
        return self.async_client.sxmak_token

    @property
    def gup_id(self) -> Union[str, None]:
        return self.async_client.gup_id

    # --- cached lookups, stored on the wrapped client so both share them ---

    @property
    def channels(self) -> List[XMChannel]:
        """Cached list of channels sorted by channel number.

        Returns an empty list (without caching it) when no channels could
        be downloaded.
        """
        # download channel list if necessary
        if self.async_client._channels is None:
            raw_channels = self.get_channels()
            if not raw_channels:
                return []

            # Build and sort the list before caching it, so a parsing failure
            # cannot leave a partial, unsorted list behind in the cache.
            parsed = [XMChannel.from_dict(raw) for raw in raw_channels]
            parsed.sort(key=lambda chan: int(chan.channel_number))
            self.async_client._channels = parsed
        return self.async_client._channels

    @property
    def favorite_channels(self) -> List[XMChannel]:
        """Cached subset of :attr:`channels` flagged as favorite."""
        if self.async_client._favorite_channels is None:
            self.async_client._favorite_channels = [
                c for c in self.channels if c.is_favorite
            ]
        return self.async_client._favorite_channels

    @property
    def configuration(self) -> dict:
        """Cached configuration components, keyed by component name.

        Raises
        ------
        ConfigurationError
            If the configuration could not be retrieved
        """
        if self.async_client._configuration is None:
            data = self.get_configuration()
            if data is None:
                raise ConfigurationError()
            self.async_client._configuration = self.async_client._extract_configuration(
                data
            )
        return self.async_client._configuration

    @property
    def urls(self) -> Dict[str, str]:
        """Cached name -> URL mapping from the ``relativeUrls`` component."""
        if self.async_client._urls is None:
            urls = self.configuration["relativeUrls"]
            self.async_client._urls = self.async_client._extract_urls(urls)
        return self.async_client._urls

    @property
    def primary(self) -> bool:
        """Whether the primary HLS root is currently selected."""
        return self.async_client._use_primary

    # --- blocking wrappers around the async client's methods ---

    def get_primary_hls_root(self) -> str:
        return make_sync(self.async_client.get_primary_hls_root)()

    def get_secondary_hls_root(self) -> str:
        return make_sync(self.async_client.get_secondary_hls_root)()

    def get_hls_root(self) -> str:
        return make_sync(self.async_client.get_hls_root)()

    def set_primary(self, value: bool):
        self.async_client.set_primary(value)

    def login(self) -> bool:
        return make_sync(self.async_client.login)()

    def authenticate(self) -> bool:
        return make_sync(self.async_client.authenticate)()

    def get_configuration(self) -> Optional[Dict[str, Any]]:
        return make_sync(self.async_client.get_configuration)()

    def get_playlist(self, channel_id: str, use_cache: bool = True) -> Union[str, None]:
        return make_sync(self.async_client.get_playlist)(
            channel_id=channel_id, use_cache=use_cache
        )

    def get_segment(self, path: str) -> Union[bytes, None]:
        return make_sync(self.async_client.get_segment)(path=path)

    def get_channels(self) -> List[dict]:
        return make_sync(self.async_client.get_channels)()

    def get_channel(self, name: str) -> Union[XMChannel, None]:
        return make_sync(self.async_client.get_channel)(name)

    def get_now_playing(self, channel: XMChannel) -> Union[Dict[str, Any], None]:
        return make_sync(self.async_client.get_now_playing)(channel)

    def close_session(self):
        return make_sync(self.async_client.close_session)()

    def reset_session(self):
        return self.async_client.reset_session()