import time
import logging
from enum import Enum, auto
logger = logging.getLogger(“analysis”)
logger.setLevel(logging.INFO)
class Source(Enum):
CONTINUE_WATCHING = auto()
EPISODE_URL = auto()
EPISODE_DETAIL = auto()
SEASON_LIST = auto()
Config
RETRY_ATTEMPTS = 2
RETRY_BACKOFF_SECONDS = 0.5
def get_current_tab_url(edge_all_open_tabs):
for t in edge_all_open_tabs:
try:
if t.get(“isCurrent”):
return t.get(“pageUrl”)
except Exception:
continue
return None
— Stubs: bind these to your implementations —
def fetch_continue_watching(user_id, series_id):
""“Return CW object or None.”""
return None
def is_valid_episode_url(url):
if not url:
return False
url = url.lower()
return (“/episode/” in url) or (“episodeid=” in url) or (“/watch/” in url and “episode” in url)
def fetch_episode_detail_from_url(url):
""“Return episode metadata dict or None.”""
return None
def fetch_episode_detail_from_season(series_id, season_id):
""“Return best-match episode metadata or None.”""
return None
def invalidate_series_cache(series_id):
logger.info(f"Cache invalidated for series {series_id}")
def log_telemetry(user_id, series_id, chosen_source, details=None):
logger.info(f"Telemetry user={user_id} series={series_id} source={chosen_source.name} details={details}")
— Core selection logic —
def select_analysis_source(user_id, series_id, season_id=None, candidate_episode_url=None, edge_tabs=None, allow_retry=True):
if not candidate_episode_url and edge_tabs:
candidate_episode_url = get_current_tab_url(edge_tabs)
attempt = 0
while True:
try:
cw = fetch_continue_watching(user_id, series_id)
if cw:
episode = cw.get("episode") or cw
log_telemetry(user_id, series_id, Source.CONTINUE_WATCHING, {"cw": True})
return Source.CONTINUE_WATCHING, episode
if candidate_episode_url and is_valid_episode_url(candidate_episode_url):
ep = fetch_episode_detail_from_url(candidate_episode_url)
if ep:
log_telemetry(user_id, series_id, Source.EPISODE_URL, {"url": candidate_episode_url})
return Source.EPISODE_URL, ep
if season_id:
ep = fetch_episode_detail_from_season(series_id, season_id)
if ep:
log_telemetry(user_id, series_id, Source.EPISODE_DETAIL, {"season": season_id})
return Source.EPISODE_DETAIL, ep
log_telemetry(user_id, series_id, Source.SEASON_LIST, {"note": "fallback to season list"})
return Source.SEASON_LIST, None
except Exception:
logger.exception("Error selecting analysis source")
if not allow_retry or attempt >= RETRY_ATTEMPTS:
raise
backoff = RETRY_BACKOFF_SECONDS * (2 ** attempt)
time.sleep(backoff)
attempt += 1
def on_continue_watching_changed(user_id, series_id):
invalidate_series_cache(series_id)
log_telemetry(user_id, series_id, Source.CONTINUE_WATCHING, {“event”: “cw_changed”})
def prepare_and_run_analysis(user_id, series_id, season_id=None, candidate_episode_url=None, edge_tabs=None):
source, episode = select_analysis_source(user_id, series_id, season_id, candidate_episode_url, edge_tabs)
if source in (Source.CONTINUE_WATCHING, Source.EPISODE_URL, Source.EPISODE_DETAIL):
logger.info(f"Starting single-episode analysis using source {source.name}")
start_single_episode_analysis(episode)
else:
logger.info(“Starting season/series analysis (fallback)”)
start_season_analysis(series_id)
log_telemetry(user_id, series_id, source, {“episode_id”: episode.get(“id”) if isinstance(episode, dict) else None})