import asyncio import os import re import time import shutil import sqlite3 from pathlib import Path from collections import defaultdict from dataclasses import dataclass from urllib.parse import parse_qs, urlparse import json import warnings import aiohttp import pandas as pd from bs4 import BeautifulSoup, XMLParsedAsHTMLWarning from tqdm import tqdm # Suppress XML parsing warning (HTML parser works fine for our use case) warnings.filterwarnings("ignore", category=XMLParsedAsHTMLWarning) CONFIG = { # Firefox profile - will auto-detect if not specified 'firefox_profile_path': None, # Crawling Settings 'delay_per_domain': 0.5, 'max_concurrent': 100, 'max_retries': 3, 'timeout': 5, 'limit_per_host': 3, 'min_visit_count': 1, 'max_content_length': 15000, 'checkpoint_interval': 500, 'batch_size': 1000, 'user_agent': 'BrowserHistoryEnricherCrawler/2.0 (personal archival) github.com/jenkc THANK YOU!', 'exclude_domains': [ 'localhost', '127.0.0.1', ], 'skip_crawl_domains': [ 'google.com', 'www.google.com', 'mail.google.com', 'docs.google.com', 'drive.google.com', 'calendar.google.com', ] } # Output folder and files OUTPUT_DIR = "crawl_output" OUTPUT_FILES = { "pickle": f"{OUTPUT_DIR}/firefox_history.pkl", "csv": f"{OUTPUT_DIR}/firefox_history.csv", "clean_csv": f"{OUTPUT_DIR}/firefox_history_clean.csv", "errors_csv": f"{OUTPUT_DIR}/firefox_history_errors.csv", "checkpoint": f"{OUTPUT_DIR}/firefox_crawl_checkpoint.pkl", } # ============================================================================= # FIREFOX DATABASE FUNCTIONS # ============================================================================= def find_firefox_profile() -> str: """ Auto-detect the default profile directory on macOS. TODO: Add support for Windows and Linux. """ # Check if user provided a custom path (manual override) if CONFIG['firefox_profile_path']: return CONFIG['firefox_profile_path'] # Build the default Firefox profiles path for macOS firefox_dir = Path.home() / 'Library' / 'Application Support' / 'Firefox' / 'Profiles' # Verify the directory exists if not firefox_dir.exists(): raise FileNotFoundError(f"Firefox profiles directory not found: {firefox_dir}") # Look for the default-release profile (most common) # Fallback to any profile if default-release not found for pattern in ['*.default-release', '*']: profile = next(firefox_dir.glob(pattern), None) if profile: return str(profile) raise FileNotFoundError('No Firefox profiles found.') def load_firefox_history(profile_path: str = None) -> pd.DataFrame: """Load Firefox browsing history from the places.sqlite database.""" # Determine the profile path if profile_path is None: profile_path = find_firefox_profile() places_db = os.path.join(profile_path, 'places.sqlite') if not os.path.exists(places_db): raise FileNotFoundError(f'places.sqlite not found in profile: {places_db}') print(f"Reading Firefox history from:\n{places_db}") print() # Copy the database to a temp location to avoid locks (if Firefox is open) temp_db = os.path.join(OUTPUT_DIR, 'places_temp.sqlite') print("[DEBUG] Attempting to copy database...") try: # Use SQLite backup API for safe copy (handles active database better than file copy) print("[DEBUG] Connecting to source database...") source_conn = sqlite3.connect(places_db) print("[DEBUG] Connecting to destination database...") dest_conn = sqlite3.connect(temp_db) print("[DEBUG] Starting backup...") source_conn.backup(dest_conn) print("[DEBUG] Backup complete, closing connections...") source_conn.close() dest_conn.close() print("[DEBUG] Database copy successful") except (sqlite3.OperationalError, PermissionError) as e: print(f" Warning: SQLite backup failed ({e}), trying file copy...") try: shutil.copy2(places_db, temp_db) except PermissionError: raise PermissionError( f"Cannot access Firefox database. Please close Firefox and try again.\n" f"Database path: {places_db}" ) try: # Connect to the copied database print("[DEBUG] Connecting to temp database...") conn = sqlite3.connect(temp_db) print("[DEBUG] Running SQL query...") # Query to extract history data query = """ SELECT p.url, p.title, p.visit_count, p.last_visit_date, p.description, p.frecency, MIN(h.visit_date) as first_visit_date FROM moz_places p LEFT JOIN moz_historyvisits h ON p.id = h.place_id WHERE p.visit_count >= ? AND p.url LIKE 'http%' GROUP BY p.id ORDER BY p.last_visit_date DESC """ # Read results into a DataFrame df = pd.read_sql_query(query, conn, params=(CONFIG["min_visit_count"],)) print(f"[DEBUG] Query complete, got {len(df)} rows") # Close the connection conn.close() print("[DEBUG] Connection closed") # Convert Firefox timestamps (microseconds since epoch) to datetime print("[DEBUG] Converting timestamps...") df["last_visit"] = pd.to_datetime(df["last_visit_date"] / 1_000_000, unit='s', utc=True) df["last_visit"] = df["last_visit"].dt.tz_convert('America/Chicago') # convert to local time df["first_visit"] = pd.to_datetime(df["first_visit_date"] / 1_000_000, unit='s', utc=True) df["first_visit"] = df["first_visit"].dt.tz_convert('America/Chicago') # convert to local time # Drop the raw timestamp columns df = df.drop(columns=["last_visit_date", "first_visit_date"]) # Extract domain for filtering df["domain"] = df["url"].apply(lambda x: urlparse(x).netloc.lower()) print(f" Loaded {len(df)} URLs from Firefox history") # Filter out excluded domains exclude = CONFIG['exclude_domains'] before = len(df) df = df[~df['domain'].isin(exclude)].reset_index(drop=True) excluded = before - len(df) if excluded > 0: print(f" Excluded {excluded} URLs from excluded domains") # Combine duplicates by summing visit counts and keeping latest visit before = len(df) # Define aggregation rules agg_rules = { 'visit_count': 'sum', # Sum up all visits 'last_visit': 'max', # Keep most recent visit 'title': 'first', # Keep first non-null title } # For all other columns, keep the first value for col in df.columns: if col not in agg_rules and col != 'url': agg_rules[col] = 'first' df = df.sort_values('last_visit', ascending=False) # Sort so 'first' gets most recent data df = df.groupby('url', as_index=False).agg(agg_rules) dupes = before - len(df) if dupes > 0: print(f' Combined {dupes:,} duplicate URLs (visit counts summed)') print(f" Final URL count: {len(df)} unique URLS") return df finally: # Clean up temp database file if os.path.exists(temp_db): os.remove(temp_db) def print_history_stats(df: pd.DataFrame) -> None: """Print basic statistics about the browsing history DataFrame.""" total_urls = len(df) total_visits = df['visit_count'].sum() urls_with_titles = df['title'].notna().sum() percent_urls_with_titles = 100*df['title'].notna().mean() urls_with_descriptions = df['description'].notna().sum() percent_urls_with_descriptions = 100*df['description'].notna().mean() print() print("=" * 60) print("Firefox Browsing History Statistics") print("=" * 60) print(f"Total URLs: {total_urls}") print(f"Total Visits: {total_visits}") print(f"URLs with Titles: {urls_with_titles:,} ({percent_urls_with_titles:.2f}%)") print(f"URLs with Descriptions: {urls_with_descriptions:,} ({percent_urls_with_descriptions:.2f}%)") # Date range if not df['last_visit'].empty: print(f"\nDate range:") print(f" First Visit: {df['first_visit'].min()}") print(f" Last Visit: {df['last_visit'].max()}") # Top domains print(f"\nTop 20 Domains by URL Count:") top_domains = df["domain"].value_counts().head(20) for domain, count in top_domains.items(): print(f" {domain}: {count:,}") # Most visited pages print(f"\nTop 20 Most Visited Pages:") top_visited = df.nlargest(20, "visit_count")[["url", "title", "visit_count"]] for _, row in top_visited.iterrows(): title = row["title"][:40] + "..." if row['title'] and len(row['title']) > 40 else row['title'] print(f" [{row['visit_count']:,}x] {title or row['url'][:50]}") print("=" * 60) # ============================================================================= # URL PARSERS - Extract IDs and metadata from URLs without async data fetching # ============================================================================= # Define data structure for parsed social URLs @dataclass class ParsedSocialURL: platform: str content_type: str content_id: str | None author: str | None title_hint: str | None extra: dict def parse_google_url(_url: str, parsed: urlparse) -> ParsedSocialURL: '''Parse Google URL to extract search queries, image searches, maps, email, docs, etc.''' path = parsed.path query = parse_qs(parsed.query) domain = parsed.netloc.lower() # Gmail: mail.google.com if 'mail.google.com' in domain: return ParsedSocialURL( platform='google', content_type='email', content_id=None, author=None, title_hint=None, extra={} ) # Google Docs/Sheets/Slides/Forms if 'docs.google.com' in domain: doc_types = { '/document/': 'doc', '/spreadsheets/': 'sheet', '/presentation/': 'slides', '/forms/': 'form', } for pattern, doc_type in doc_types.items(): if pattern in path: doc_id = path.split('/d/')[1].split('/')[0] if '/d/' in path else None return ParsedSocialURL( platform='google', content_type=doc_type, content_id=doc_id, author=None, title_hint=None, extra={} ) # Google Drive if 'drive.google.com' in domain: return ParsedSocialURL( platform='google', content_type='drive', content_id=None, author=None, title_hint=None, extra={} ) # Google Calendar if 'calendar.google.com' in domain: return ParsedSocialURL( platform='google', content_type='calendar', content_id=None, author=None, title_hint=None, extra={} ) # Standard search: google.com/search?q=QUERY if '/search' in path and 'q' in query: search_query = query['q'][0] # Detect search type from tbm parameter tbm = query.get('tbm', [None])[0] search_types = { 'isch': 'image_search', 'vid': 'video_search', 'nws': 'news_search', 'shop': 'shopping_search', 'bks': 'books_search', } content_type = search_types.get(tbm, 'search') return ParsedSocialURL( platform='google', content_type=content_type, content_id=None, author=None, title_hint=search_query, extra={'tbm': tbm} if tbm else {} ) # Maps: google.com/maps?q=PLACE or /maps/place/PLACE if '/maps' in path: place = query.get('q', [None])[0] if not place and '/place/' in path: place = path.split('/place/')[1].split('/')[0].replace('+', ' ') return ParsedSocialURL( platform='google', content_type='maps', content_id=None, author=None, title_hint=place, extra={} ) # Fallback return ParsedSocialURL( platform='google', content_type='other', content_id=None, author=None, title_hint=None, extra={'path': path} ) def parse_youtube_url(_url: str, parsed: urlparse) -> ParsedSocialURL: '''Parse YouTube URLs to extract videos and shorts, video IDs, playlist IDs, channel IDs, and search queries.''' # Separate URL into query and path components query = parse_qs(parsed.query) path = parsed.path # Standard watch URL: youtube.com/watch?v=VIDEO_ID if 'v' in query: video_id = query['v'][0] playlist_id = query.get('list', [None])[0] return ParsedSocialURL( platform='youtube', content_type='video', content_id=video_id, author=None, title_hint=None, extra={'playlist_id': playlist_id, 'timestamp': query.get('t', [None])[0]} ) # Shortened URL: youtu.be/VIDEO_ID if parsed.netloc == 'youtu.be': video_id = path.lstrip('/') return ParsedSocialURL( platform='youtube', content_type='video', content_id=video_id, author=None, title_hint=None, extra={} ) # Channel URL: youtube.com/@channel or youtube.com/c/channel or youtube.com/channel/ID if path.startswith('/@'): return ParsedSocialURL( platform='youtube', content_type='channel', content_id=path[2:].split('/')[0], author=path[2:].split('/')[0], title_hint=None, extra={} ) if '/channel/' in path: channel_id = path.split('/channel/')[1].split('/')[0] return ParsedSocialURL( platform='youtube', content_type='channel', content_id=channel_id, author=None, title_hint=None, extra={} ) # Playlist URL: youtube.com/playlist?list=PLAYLIST_ID if 'list' in query and '/playlist' in path: return ParsedSocialURL( platform='youtube', content_type='playlist', content_id=query['list'][0], author=None, title_hint=None, extra={} ) # Search results URL: youtube.com/results?search_query=QUERY if '/results' in path and 'search_query' in query: return ParsedSocialURL( platform='youtube', content_type='search', content_id=None, author=None, title_hint=query['search_query'][0], extra={} ) # YouTube Shorts URL: youtube.com/shorts/VIDEO_ID if '/shorts/' in path: short_id = path.split('/shorts/')[1].split('/')[0] return ParsedSocialURL( platform='youtube', content_type='short', content_id=short_id, author=None, title_hint=None, extra={} ) # Fallback parser for any other YouTube URLs return ParsedSocialURL( platform='youtube', content_type='other', content_id=None, author=None, title_hint=None, extra={'path': path} ) def parse_netflix_url(_url: str, parsed: urlparse) -> ParsedSocialURL: '''Parse Netflix URL to extract content ID and type (watch, browse). TODO: add searches ''' path = parsed.path # Watch URL: netflix.com/watch/CONTENT_ID if '/watch/' in path: title_id = path.split('/watch/')[1].split('/')[0] return ParsedSocialURL( platform='netflix', content_type='watch', content_id=title_id, author=None, title_hint=None, extra={} ) # Title page (browse): netflix.com/title/80100172 if '/title/' in path: title_id = path.split('/title/')[1].split('/')[0] return ParsedSocialURL( platform='netflix', content_type='browse', content_id=title_id, author=None, title_hint=None, extra={} ) # Browse with jbv parameter (title being viewed) query = parse_qs(parsed.query) if 'jbv' in query: title_id = query['jbv'][0] return ParsedSocialURL( platform='netflix', content_type='browse', content_id=title_id, author=None, title_hint=None, extra={} ) # Fallback parser for any other Netflix URLs with returned path for context return ParsedSocialURL( platform='netflix', content_type='browse', content_id=None, author=None, title_hint=None, extra={'path': path} ) def parse_twitter_url(_url: str, parsed: urlparse) -> ParsedSocialURL: '''Parse Twitter/X URL to extract tweet ID and username or user profile''' path = parsed.path.strip('/') parts = path.split('/') if not parts or not parts[0]: return ParsedSocialURL( platform='twitter', content_type='home', content_id=None, author=None, title_hint=None, extra={} ) username = parts[0] # Skip non-user paths if username in ['search', 'explore', 'i', 'intent', 'notifications', 'messages', 'settings']: return ParsedSocialURL( platform='twitter', content_type='other', content_id=None, author=None, title_hint=None, extra={'path': path} ) # Tweet URL: twitter.com/username/status/TWEET_ID if len(parts) >= 3 and parts[1] == 'status': return ParsedSocialURL( platform='twitter', content_type='tweet', content_id=parts[2], author=username, title_hint=None, extra={} ) # Profile URL: twitter.com/username return ParsedSocialURL( platform='twitter', content_type='profile', content_id=username, author=username, title_hint=None, extra={} ) def parse_reddit_url(_url: str, parsed: urlparse) -> ParsedSocialURL: '''Parse Reddit URL to extract subreddit, post ID, and username.''' path = parsed.path.strip('/') parts = path.split('/') # Subreddit Home Page: reddit.com/r/subreddit/ if len(parts) >= 2 and parts[0] == 'r': subreddit = parts[1] # Post: reddit.com/r/subreddit/comments/POST_ID/title if len(parts) >= 4 and parts[2] == 'comments': post_id = parts[3] title_hint = parts[4].replace('_', ' ') if len(parts) >= 5 else None return ParsedSocialURL( platform='reddit', content_type='post', content_id=post_id, author=None, title_hint=title_hint, extra={'subreddit': subreddit} ) return ParsedSocialURL( platform='reddit', content_type='subreddit', content_id=subreddit, author=None, title_hint=None, extra={} ) # User Profile: reddit.com/user/username if len(parts) >= 2 and parts[0] in ['user', 'u']: return ParsedSocialURL( platform='reddit', content_type='profile', content_id=parts[1], author=parts[1], title_hint=None, extra={} ) return ParsedSocialURL( platform='reddit', content_type='other', content_id=None, author=None, title_hint=None, extra={'path': path} ) def parse_github_url(_url: str, parsed: urlparse) -> ParsedSocialURL: '''Parse GitHub URL to extract repo, user, and file info.''' path = parsed.path.strip('/') parts = path.split('/') if not parts or not parts[0]: return ParsedSocialURL( platform='github', content_type='home', content_id=None, author=None, title_hint=None, extra={} ) # Skip non-user paths if parts[0] in ['explore', 'topics', 'trending', 'search', 'settings', 'notifications', 'login', 'signup']: return ParsedSocialURL( platform='github', content_type=parts[0], content_id=None, author=None, title_hint=None, extra={'path': path} ) username = parts[0] # Repository URL: github.com/username/repo if len(parts) >= 2: repo = parts[1] # Issue/PR URL: github.com/username/repo/issues/NUMBER or /pull/NUMBER if len(parts) >= 4 and parts[2] in ['issues', 'pull']: return ParsedSocialURL( platform='github', content_type=parts[2][:-1] if parts[2] == 'issues' else 'pr', content_id=parts[3], author=username, title_hint=None, extra={'repo': repo} ) # File URL: github.com/username/repo/blob/branch/path/to/file if len(parts) >= 4 and parts[2] == 'blob': return ParsedSocialURL( platform='github', content_type='file', content_id='/'.join(parts[4:]) if len(parts) > 4 else None, author=username, title_hint=None, extra={'repo': repo, 'branch': parts[3]} ) return ParsedSocialURL( platform='github', content_type='repo', content_id=repo, author=username, title_hint=None, extra={} ) # User Profile: github.com/username return ParsedSocialURL( platform='github', content_type='profile', content_id=username, author=username, title_hint=None, extra={} ) def parse_instagram_url(_url: str, parsed: urlparse) -> ParsedSocialURL: path = parsed.path.strip('/') parts = path.split('/') # Instagram Home Page: instagram.com/ if not parts or not parts[0]: return ParsedSocialURL( platform='instagram', content_type='home', content_id=None, author=None, title_hint=None, extra={} ) # Post URL: instagram.com/p/POST_ID if parts[0] == 'p' and len(parts) >= 2: return ParsedSocialURL( platform='instagram', content_type='post', content_id=parts[1], author=None, title_hint=None, extra={} ) # Reels URL: instagram.com/reel/REEL_ID if parts[0] in ['reel', 'reels'] and len(parts) >= 2: return ParsedSocialURL( platform='instagram', content_type='reel', content_id=parts[1], author=None, title_hint=None, extra={} ) # Stories URL: instagram.com/stories/username/STORY_ID if parts[0] == 'stories' and len(parts) >= 3: return ParsedSocialURL( platform='instagram', content_type='story', content_id=parts[2], author=parts[1], title_hint=None, extra={} ) if parts[0] == 'stories' and len(parts) >= 2: return ParsedSocialURL( platform='instagram', content_type='story', content_id=None, author=parts[1], title_hint=None, extra={} ) # Skip non-user paths if parts[0] in ['explore', 'direct', 'accounts', 'about', 'developer', 'legal', 'privacy', 'terms']: return ParsedSocialURL( platform='instagram', content_type=parts[0], content_id=None, author=None, title_hint=None, extra={'path': path} ) # Profile URL: instagram.com/username return ParsedSocialURL( platform='instagram', content_type='profile', content_id=parts[0], author=parts[0], title_hint=None, extra={} ) def parse_tiktok_url(_url: str, parsed: urlparse) -> ParsedSocialURL: '''Parse TikTok URL to extract video ID and username.''' path = parsed.path.strip('/') parts = path.split('/') if not parts or not parts[0]: return ParsedSocialURL( platform='tiktok', content_type='home', content_id=None, author=None, title_hint=None, extra={} ) # Video URL: tiktok.com/@username/video/VIDEO_ID if parts[0].startswith('@') and len(parts) >= 3 and parts[1] == 'video': return ParsedSocialURL( platform='tiktok', content_type='video', content_id=parts[2], author=parts[0][1:], title_hint=None, extra={} ) # User Profile: tiktok.com/@username if parts[0].startswith('@'): return ParsedSocialURL( platform='tiktok', content_type='profile', content_id=parts[0][1:], author=parts[0][1:], title_hint=None, extra={} ) # Fallback parser for any other TikTok URLs return ParsedSocialURL( platform='tiktok', content_type='other', content_id=None, author=None, title_hint=None, extra={'path': path} ) def parse_linkedin_url(_url: str, parsed: urlparse) -> ParsedSocialURL: path = parsed.path.strip('/') parts = path.split('/') # LinkedIn Home Page: linkedin.com/ if not parts or not parts[0]: return ParsedSocialURL( platform='linkedin', content_type='home', content_id=None, author=None, title_hint=None, extra={} ) # Profile URL: linkedin.com/in/username if parts[0] == 'in' and len(parts) >= 2: return ParsedSocialURL( platform='linkedin', content_type='profile', content_id=parts[1], author=parts[1], title_hint=None, extra={} ) # Company URL: linkedin.com/company/companyname if parts[0] == 'company' and len(parts) >= 2: return ParsedSocialURL( platform='linkedin', content_type='company', content_id=parts[1], author=parts[1], title_hint=parts[1].replace('-', ' '), extra={} ) # Post URL: linkedin.com/feed/update/urn:li:activity:ACTIVITY_ID if parts[0] == 'posts': return ParsedSocialURL( platform='linkedin', content_type='post', content_id=None, author=None, title_hint=None, extra={'path': path} ) # Job URL: linkedin.com/jobs/view/JOB_ID if parts[0] == 'jobs': return ParsedSocialURL( platform='linkedin', content_type='job', content_id=parts[2] if len(parts) > 2 else None, author=None, title_hint=None, extra={'path': path} ) # Fallback parser for any other LinkedIn URLs return ParsedSocialURL( platform='linkedin', content_type='other', content_id=None, author=None, title_hint=None, extra={'path': path} ) def parse_twitch_url(_url: str, parsed: urlparse) -> ParsedSocialURL: '''Parse Twitch URL to extract channel, video, and clip info.''' path = parsed.path.strip('/') parts = path.split('/') if not parts or not parts[0]: return ParsedSocialURL( platform='twitch', content_type='home', content_id=None, author=None, title_hint=None, extra={} ) # Video URL: twitch.tv/videos/VIDEO_ID if parts[0] == 'videos' and len(parts) >= 2: return ParsedSocialURL( platform='twitch', content_type='video', content_id=parts[1], author=None, title_hint=None, extra={} ) # Clip URL: twitch.tv/CHANNEL/clip/CLIP_ID or clips.twitch.tv/CLIP_ID if len(parts) >= 3 and parts[1] == 'clip': return ParsedSocialURL( platform='twitch', content_type='clip', content_id=parts[2], author=parts[0], title_hint=None, extra={} ) # Skip non-channel paths if parts[0] in ['directory', 'downloads', 'jobs', 'p', 'settings']: return ParsedSocialURL( platform='twitch', content_type=parts[0], content_id=None, author=None, title_hint=None, extra={'path': path} ) # Channel URL: twitch.tv/CHANNEL return ParsedSocialURL( platform='twitch', content_type='channel', content_id=parts[0], author=parts[0], title_hint=None, extra={} ) def parse_vimeo_url(_url: str, parsed: urlparse) -> ParsedSocialURL: '''Parse Vimeo URL to extract video and user info.''' path = parsed.path.strip('/') parts = path.split('/') if not parts or not parts[0]: return ParsedSocialURL( platform='vimeo', content_type='home', content_id=None, author=None, title_hint=None, extra={} ) # Video URL: vimeo.com/VIDEO_ID (numeric) if parts[0].isdigit(): return ParsedSocialURL( platform='vimeo', content_type='video', content_id=parts[0], author=None, title_hint=None, extra={} ) # Channel/User video: vimeo.com/channels/CHANNEL/VIDEO_ID if parts[0] == 'channels' and len(parts) >= 2: return ParsedSocialURL( platform='vimeo', content_type='channel', content_id=parts[2] if len(parts) >= 3 else None, author=parts[1], title_hint=None, extra={'channel': parts[1]} ) # Skip non-user paths if parts[0] in ['watch', 'categories', 'search', 'upload', 'settings', 'features']: return ParsedSocialURL( platform='vimeo', content_type=parts[0], content_id=None, author=None, title_hint=None, extra={'path': path} ) # User profile: vimeo.com/USERNAME return ParsedSocialURL( platform='vimeo', content_type='profile', content_id=parts[0], author=parts[0], title_hint=None, extra={} ) def parse_spotify_url(_url: str, parsed: urlparse) -> ParsedSocialURL: '''Parse Spotify URL to extract track, album, artist, and playlist info.''' path = parsed.path.strip('/') parts = path.split('/') if not parts or not parts[0]: return ParsedSocialURL( platform='spotify', content_type='home', content_id=None, author=None, title_hint=None, extra={} ) # Track URL: open.spotify.com/track/TRACK_ID if parts[0] == 'track' and len(parts) >= 2: return ParsedSocialURL( platform='spotify', content_type='track', content_id=parts[1], author=None, title_hint=None, extra={} ) # Album URL: open.spotify.com/album/ALBUM_ID if parts[0] == 'album' and len(parts) >= 2: return ParsedSocialURL( platform='spotify', content_type='album', content_id=parts[1], author=None, title_hint=None, extra={} ) # Artist URL: open.spotify.com/artist/ARTIST_ID if parts[0] == 'artist' and len(parts) >= 2: return ParsedSocialURL( platform='spotify', content_type='artist', content_id=parts[1], author=parts[1], title_hint=None, extra={} ) # Playlist URL: open.spotify.com/playlist/PLAYLIST_ID if parts[0] == 'playlist' and len(parts) >= 2: return ParsedSocialURL( platform='spotify', content_type='playlist', content_id=parts[1], author=None, title_hint=None, extra={} ) # Episode/Show (podcasts): open.spotify.com/episode/ID or /show/ID if parts[0] in ['episode', 'show'] and len(parts) >= 2: return ParsedSocialURL( platform='spotify', content_type=parts[0], content_id=parts[1], author=None, title_hint=None, extra={} ) # User profile: open.spotify.com/user/USER_ID if parts[0] == 'user' and len(parts) >= 2: return ParsedSocialURL( platform='spotify', content_type='profile', content_id=parts[1], author=parts[1], title_hint=None, extra={} ) # Fallback return ParsedSocialURL( platform='spotify', content_type='other', content_id=None, author=None, title_hint=None, extra={'path': path} ) def parse_facebook_url(_url: str, parsed: urlparse) -> ParsedSocialURL: '''Parse Facebook URL to extract profile, page, group, and post info.''' path = parsed.path.strip('/') parts = path.split('/') query = parse_qs(parsed.query) if not parts or not parts[0]: return ParsedSocialURL( platform='facebook', content_type='home', content_id=None, author=None, title_hint=None, extra={} ) # Post/Photo/Video with ID in query: facebook.com/photo?fbid=123 if 'fbid' in query: return ParsedSocialURL( platform='facebook', content_type=parts[0] if parts[0] in ['photo', 'video'] else 'post', content_id=query['fbid'][0], author=None, title_hint=None, extra={} ) # Group URL: facebook.com/groups/GROUP_ID if parts[0] == 'groups' and len(parts) >= 2: return ParsedSocialURL( platform='facebook', content_type='group', content_id=parts[1], author=None, title_hint=None, extra={} ) # Watch (video): facebook.com/watch/?v=VIDEO_ID if parts[0] == 'watch' and 'v' in query: return ParsedSocialURL( platform='facebook', content_type='video', content_id=query['v'][0], author=None, title_hint=None, extra={} ) # Events: facebook.com/events/EVENT_ID if parts[0] == 'events' and len(parts) >= 2: return ParsedSocialURL( platform='facebook', content_type='event', content_id=parts[1], author=None, title_hint=None, extra={} ) # Page/Profile post: facebook.com/USERNAME/posts/POST_ID if len(parts) >= 3 and parts[1] == 'posts': return ParsedSocialURL( platform='facebook', content_type='post', content_id=parts[2], author=parts[0], title_hint=None, extra={} ) # Skip non-profile paths if parts[0] in ['marketplace', 'gaming', 'watch', 'search', 'settings', 'help', 'policies']: return ParsedSocialURL( platform='facebook', content_type=parts[0], content_id=None, author=None, title_hint=None, extra={'path': path} ) # Profile/Page URL: facebook.com/USERNAME or facebook.com/profile.php?id=123 if parts[0] == 'profile.php' and 'id' in query: return ParsedSocialURL( platform='facebook', content_type='profile', content_id=query['id'][0], author=None, title_hint=None, extra={} ) # Default: treat as profile/page return ParsedSocialURL( platform='facebook', content_type='profile', content_id=parts[0], author=parts[0], title_hint=None, extra={} ) # General parsing dispatch function def parse_social_url(url: str) -> ParsedSocialURL | None: ''' Parse a social media or video site URL to extract content IDs and metadata. Returns None if not a recognized social/video site. ''' try: # Break URL into components parsed = urlparse(url) domain = parsed.netloc.lower() # Remove www. and m. prefixes for easier matching clean_domain = domain.replace('www.', '').replace('m.', '').replace('old.', '').replace('mobile.', '') # Check domains and call appropriate parser if clean_domain in ['youtube.com', 'youtu.be']: return parse_youtube_url(url, parsed) elif clean_domain == 'netflix.com': return parse_netflix_url(url, parsed) elif clean_domain in ['twitter.com', 'x.com']: return parse_twitter_url(url, parsed) elif clean_domain == 'reddit.com': return parse_reddit_url(url, parsed) elif clean_domain == 'github.com': return parse_github_url(url, parsed) elif clean_domain == 'instagram.com': return parse_instagram_url(url, parsed) elif clean_domain == 'linkedin.com': return parse_linkedin_url(url, parsed) elif clean_domain == 'tiktok.com': return parse_tiktok_url(url, parsed) elif clean_domain == 'twitch.tv': return parse_twitch_url(url, parsed) elif clean_domain == 'vimeo.com': return parse_vimeo_url(url, parsed) elif clean_domain == 'spotify.com': return parse_spotify_url(url, parsed) elif clean_domain == 'facebook.com': return parse_facebook_url(url, parsed) # Fallback for any other recognized domains elif clean_domain in ['medium.com', 'quora.com', 'pinterest.com', 'tumblr.com', 'soundcloud.com']: path = parsed.path.strip('/') parts = path.split('/') if path else [] return ParsedSocialURL( platform=clean_domain.split('.')[0], content_type='page', content_id=parts[0] if parts else None, author=None, title_hint=None, extra={'path': path, 'domain': domain} ) # Return None if domain not recognized return None except Exception as e: print(f"Failed to parse {url}: {e}") return None # ============================================================================= # ASYNC WEB CRAWLER - Unified crawler for both regular and social/video URLs # ============================================================================= class AsyncHistoryCrawler: """Unified crawler that handles all URLs from Firefox history, including social media and video sites.""" def __init__( self, delay_per_domain: float = 1.0, max_concurrent: int = 50, timeout: int = 10, limit_per_host: int = 3, max_content_length: int = 10000, max_retries: int = 3, batch_size: int = 100, user_agent: str = None, ): self.delay_per_domain = delay_per_domain or CONFIG['delay_per_domain'] self.max_concurrent = max_concurrent or CONFIG['max_concurrent'] self.timeout = timeout or CONFIG['timeout'] self.limit_per_host = limit_per_host or CONFIG['limit_per_host'] self.max_content_length = max_content_length or CONFIG['max_content_length'] self.max_retries = max_retries or CONFIG['max_retries'] self.batch_size = batch_size or CONFIG['batch_size'] self.user_agent = user_agent or CONFIG['user_agent'] self.domain_last_request: dict[str, float] = defaultdict(float) self.domain_locks: dict[str, asyncio.Lock] = {} self.stats = { "success": 0, "errors": 0, "retries": 0, "social_urls": 0, "regular_urls": 0, "error_types": defaultdict(int), } async def wait_for_domain(self, domain: str) -> None: """Rate limiting per domain.""" if domain not in self.domain_locks: self.domain_locks[domain] = asyncio.Lock() async with self.domain_locks[domain]: elapsed = time.time() - self.domain_last_request[domain] if elapsed < self.delay_per_domain: await asyncio.sleep(self.delay_per_domain - elapsed) self.domain_last_request[domain] = time.time() async def fetch_url( self, session: aiohttp.ClientSession, url: str, idx: int ) -> tuple[int, dict]: """Fetch URL with retries, rate limiting, and error handling. Extract social metadata if applicable.""" # Get domain for rate limiting domain = urlparse(url).netloc.lower() # Check if this is a social URL and parse (no HTTP needed) parsed_social = parse_social_url(url) is_social = parsed_social is not None # Build base result - always include social fields result = { 'crawled': True, # Mark as processed regardless of success/failure 'is_social': is_social, 'platform': parsed_social.platform if parsed_social else None, 'content_type': parsed_social.content_type if parsed_social else None, 'content_id': parsed_social.content_id if parsed_social else None, 'parsed_author': parsed_social.author if parsed_social else None, 'title_hint': parsed_social.title_hint if parsed_social else None, 'extra': json.dumps(parsed_social.extra) if parsed_social and parsed_social.extra else None, 'og_title': None, 'og_description': None, 'og_image': None, 'page_title': None, 'fetched_author': None, 'duration': None, 'publish_date': None, 'crawled_description': None, 'content': None, 'word_count': 0, } if is_social: self.stats['social_urls'] += 1 else: self.stats['regular_urls'] += 1 # Check if we should skip fetching for specified URLs and return empty result skip_domains = CONFIG['skip_crawl_domains'] if any(skip_domain in domain for skip_domain in skip_domains): result['fetch_error'] = None self.stats['success'] += 1 return (idx, result) # Now fetch the page await self.wait_for_domain(domain) headers = { "User-Agent": self.user_agent, "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", "Accept-Language": "en-US,en;q=0.5", } # Error tracking and retries last_error = None for _attempt in range(self.max_retries): try: async with session.get( url, headers=headers, timeout=aiohttp.ClientTimeout(total=self.timeout), allow_redirects=True, ssl=False, ) as response: if response.status == 200: html = await response.text(errors='ignore') soup = BeautifulSoup(html, 'lxml') # NOTE: the following extractions must be done in this order to avoid breaking changes # Extract social metadata (OG tags, etc.) social_extracted = self.extract_social_metadata(soup, url) result.update(social_extracted) # Also extract regular content (description, word count, etc.) regular_extracted = self.extract_regular_content(soup) result.update(regular_extracted) result['fetch_error'] = None self.stats['success'] += 1 return (idx, result) # Handle rate limiting (429) and server overload (503) elif response.status in (429, 503): retry_after = response.headers.get('Retry-After') if retry_after: try: wait_time = int(retry_after) except ValueError: wait_time = 5 # Default if header is malformed else: wait_time = 5 # Default if no header wait_time = min(wait_time, 60) # Cap at 60 seconds await asyncio.sleep(wait_time) last_error = f'HTTP {response.status}' # Don't break - continue to next retry attempt else: last_error = f'HTTP {response.status}' except asyncio.TimeoutError: last_error = 'Timeout' except aiohttp.ClientConnectorError as e: last_error = f"Connection Error: {str(e)[:50]}" except aiohttp.ClientResponseError as e: last_error = f"Response Error: {e.status}" except Exception as e: last_error = f"{type(e).__name__}: {str(e)[:50]}" if _attempt < self.max_retries - 1: self.stats['retries'] += 1 # Report final error after retries failed self.stats['errors'] += 1 error_type = last_error.split(':')[0] if last_error else last_error self.stats['error_types'][error_type] += 1 result['fetch_error'] = last_error return (idx, result) # Function to extract content from regular web pages def extract_regular_content(self, soup: BeautifulSoup) -> dict: """Extract content from regular web pages (articles, blogs, etc.).""" # Meta description meta_description = None meta_tag = soup.find('meta', attrs={'name': 'description'}) if meta_tag and meta_tag.get('content'): meta_description = meta_tag['content'].strip() # Remove unwanted elements for element in soup.find_all(['script', 'style', 'nav', 'footer', 'header', 'aside', 'form', 'noscript', 'iframe']): element.decompose() # Find main content content = None for selector in [ soup.find("article"), soup.find("main"), soup.find("div", class_=re.compile(r'content|articl|post|body|entry', re.I)), soup.find("div", id=re.compile(r'content|articl|post|body|entry', re.I)), ]: if selector: content = selector.get_text(separator=' ', strip=True) break if not content and soup.body: content = soup.body.get_text(separator=' ', strip=True) if content: content = re.sub(r'\s+', ' ', content).strip() content = content[:self.max_content_length] return { 'crawled_description': meta_description, 'content': content, 'word_count': len(content.split()) if content else 0, } # Function to extract social metadata from web pages def extract_social_metadata(self, soup: BeautifulSoup, url: str) -> dict: '''Extract Open Graph metadata from social/video pages.''' data = { 'og_title': None, 'og_description': None, 'og_image': None, 'page_title': None, 'fetched_author': None, 'duration': None, 'publish_date': None, } # Page title if soup.title: data['page_title'] = soup.title.string.strip() if soup.title.string else None # Open Graph tags og_mappings = { 'og:title': 'og_title', 'og:description': 'og_description', 'og:image': 'og_image', } for og_prop, key in og_mappings.items(): tag = soup.find('meta', attrs={'property': og_prop}) if tag and tag.get('content'): data[key] = tag['content'].strip() # Twitter card fallback if not data['og_title']: tag = soup.find('meta', attrs={'name': 'twitter:title'}) if tag and tag.get('content'): data['og_title'] = tag['content'].strip() # Author author_tag = soup.find('meta', attrs={'name': 'author'}) if author_tag and author_tag.get('content'): data['fetched_author'] = author_tag['content'] else: author_tag = soup.find(attrs={'itemprop': 'author'}) if author_tag: data['fetched_author'] = author_tag.get('content') or author_tag.get_text(strip=True) # Publish date date_tag = soup.find('meta', attrs={'property': 'article:published_time'}) if date_tag and date_tag.get('content'): data['publish_date'] = date_tag['content'] else: date_tag = soup.find(attrs={'itemprop': 'datePublished'}) if date_tag: data['publish_date'] = date_tag.get('content') or date_tag.get('datetime') return data # Crawl and process a batch of URLs concurrently. async def crawl_batch( self, session: aiohttp.ClientSession, urls_with_indices: list[tuple[int, str]], pbar: tqdm, ) -> list[tuple[int, dict]]: # Create tasks for all URLs in the batch tasks = [ self.fetch_url(session, url, idx) for idx, url in urls_with_indices ] results = [] # Loop thorough tasks/coroutines and gather results for coro in asyncio.as_completed(tasks): idx, result = await coro results.append((idx, result)) pbar.update(1) return results # Main crawl function with checkpointing async def crawl( self, df: pd.DataFrame, url_column: str = "url", checkpoint_file: str = OUTPUT_FILES['checkpoint'], checkpoint_interval: int = CONFIG['checkpoint_interval'], batch_size: int = CONFIG['batch_size'] ) -> pd.DataFrame: """Crawl URLs from DataFrame and return updated DataFrame with crawl results.""" # Initialize all possible columns all_columns = [ 'crawled', 'is_social', 'platform', 'content_type', 'content_id', 'parsed_author', 'title_hint', 'extra', 'crawled_description', 'content', 'word_count', 'og_title', 'og_description', 'og_image', 'page_title', 'fetched_author', 'duration', 'publish_date', 'fetch_error', ] for col in all_columns: if col not in df.columns: df[col] = None if col != 'crawled' else False needs_processing = df[~df['crawled'].fillna(False)] urls_to_process = list(needs_processing.index) total_urls = len(df) already_done = total_urls - len(urls_to_process) print() print("=" * 60) print("ASYNC WEB CRAWLER") print("=" * 60) print(f"Total URLs: {total_urls:,}") print(f"Already completed: {already_done:,}") print(f"To process: {len(urls_to_process):,}") print(f"Max concurrent: {self.max_concurrent}") print(f"Rate limit: {self.delay_per_domain}s per domain") print(f"Max retries: {self.max_retries}") print("=" * 60) print() if not urls_to_process: print("All URLs have already been processed!") return df # Create connector with limited concurrency connector = aiohttp.TCPConnector( limit=self.max_concurrent, limit_per_host=self.limit_per_host, force_close=True, ) headers = { "User-Agent": self.user_agent, "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", "Accept-Language": "en-US,en;q=0.5", "Accept-Encoding": "gzip, deflate", "Connection": "keep-alive", } # Create context manager session to reuse connections async with aiohttp.ClientSession(connector=connector, headers=headers) as session: pbar = tqdm(total=len(urls_to_process), desc="Crawling URLs", unit="url", ncols=100, mininterval=0.5) processed = 0 last_checkpoint = 0 for i in range(0, len(urls_to_process), batch_size): batch_indices = urls_to_process[i:i+batch_size] urls_with_indices = [(idx, df.at[idx, url_column]) for idx in batch_indices] results = await self.crawl_batch(session, urls_with_indices, pbar) for idx, data in results: for key, value in data.items(): df.at[idx, key] = value # Update progress bar description with error count pbar.set_description(f"Crawling URLs (errors: {self.stats['errors']:,})") processed += len(results) if processed - last_checkpoint >= checkpoint_interval: df.to_pickle(checkpoint_file) timestamp = time.strftime("%H:%M:%S") pbar.write(f"[{timestamp}] Checkpoint saved: {processed:,} URLs/{len(urls_to_process):,} ({100*processed/len(urls_to_process):.1f}% complete)") last_checkpoint = processed pbar.close() # Print statistics print() print("=" * 60) print("Crawling complete!") print() print("Statistics:") total_processed = self.stats['success'] + self.stats['errors'] print(f" Total URLs processed: {total_processed:,}") print(f" Successful: {self.stats['success']:,}") print(f" Errors: {self.stats['errors']:,}") print(f" Retries used: {self.stats['retries']:,}") if self.stats['error_types']: print() print("Top error types:") sorted_errors = sorted(self.stats['error_types'].items(), key=lambda x: x[1], reverse=True) for error_type, count in sorted_errors[:5]: print(f" {error_type}: {count:,}") successful = df[df['fetch_error'].isna()] if len(successful) > 0 and successful['word_count'].notna().any(): avg_words = successful['word_count'].mean() median_words = successful['word_count'].median() print() print("Content statistics (successful crawls):") print(f" Average word count: {avg_words:.0f} words") print(f" Median word count: {median_words:.0f} words") print("=" * 60) return df # ============================================================================= # MAIN EXECUTION # ============================================================================= async def main(): """Main function to orchestrate the Firefox history crawling process.""" # Create output directory os.makedirs(OUTPUT_DIR, exist_ok=True) print("=" * 60) print("FIREFOX HISTORY EXTRACTION PIPELINE") print("=" * 60) print(f"Output folder: {OUTPUT_DIR}/") print() # Load history from Firefox database print("Step 1: Loading Firefox history...") df = load_firefox_history() # Use CONFIG defaults crawler = AsyncHistoryCrawler() # Check for existing checkpoint checkpoint_file = OUTPUT_FILES['checkpoint'] try: df_checkpoint = pd.read_pickle(checkpoint_file) checkpoint_urls = set(df_checkpoint['url']) current_urls = set(df['url']) if checkpoint_urls == current_urls: # Perfect match - resume from checkpoint df = df_checkpoint completed = df['crawled'].fillna(False).sum() print(f"Resuming from checkpoint: {completed:,} URLs already crawled") else: # URL mismatch - warn user added = len(current_urls - checkpoint_urls) removed = len(checkpoint_urls - current_urls) print(f" Warning: Checkpoint URL mismatch (added: {added}, removed: {removed})") print(f" Starting fresh crawl. Delete checkpoint to suppress this warning.") print(f" Checkpoint: {checkpoint_file}") except FileNotFoundError: pass # Crawl and enrich data df = await crawler.crawl( df, url_column="url", checkpoint_file=checkpoint_file, checkpoint_interval=CONFIG['checkpoint_interval'], batch_size=CONFIG['batch_size'] ) # Create final title/description columns # Create final merged columns df['final_title'] = ( df['title'] .fillna(df['og_title']) .fillna(df['page_title']) .fillna(df['title_hint']) ) df['final_author'] = df['parsed_author'].fillna(df['fetched_author']) # Print statistics print_history_stats(df) # Save final output to all formats df.to_pickle(OUTPUT_FILES['pickle']) print(f" Saved pickle: {OUTPUT_FILES['pickle']} (all data, for fast loading)") df.to_csv(OUTPUT_FILES['csv'], index=False) print(f" Saved CSV: {OUTPUT_FILES['csv']} (all data, human-readable)") # Clean CSV (removed error rows) df_clean = df[df['fetch_error'].isna()] df_clean.to_csv(OUTPUT_FILES['clean_csv'], index=False) print(f" Saved cleaned CSV: {OUTPUT_FILES['clean_csv']} ({len(df_clean)} successful pages)") # Errors-only CSV df_errors = df[df["fetch_error"].notna()] if len(df_errors) > 0: df_errors.to_csv(OUTPUT_FILES['errors_csv'], index=False) print(f" Saved {OUTPUT_FILES['errors_csv']} ({len(df_errors):,} failed URLs)") # Clean up checkpoint try: os.remove(checkpoint_file) print("Removed checkpoint file (crawl complete)") except FileNotFoundError: pass print() print("=" * 60) print("PIPELINE COMPLETE") print("=" * 60) # Preview results print() print("Preview of results:") preview_cols = ["title", "visit_count", "word_count"] print(df[preview_cols].head(10).to_string()) if __name__ == "__main__": asyncio.run(main())