#!/usr/bin/env python3
"""
find_broken_urls.py

Reads an old sitemap, maps each URL to the new host, checks availability,
and writes ONLY broken results (4xx client errors, request errors, and
optionally 5xx) to CSV.

Usage:
    python3 find_broken_urls.py --old old-sitemap.xml --old-base https://old.example \
        --new-base https://new.example --output broken.csv --workers 20

Options:
    --old          path or URL to old sitemap (required)
    --old-base     old site base to join path-only locs (optional but recommended)
    --new-base     new site base to check against (required)
    --output       output CSV file (default: broken.csv)
    --workers      concurrency (default: 20)
    --delay-ms     per-request delay in milliseconds (default: 50)
    --timeout      request timeout in seconds (default: 20)
    --insecure     do not verify TLS certs (useful for staging)
    --header       repeatable header(s) to add to requests, e.g. --header "Authorization: Bearer TOKEN"
    --include-5xx  include 5xx server errors in the output (off by default)
"""
from __future__ import annotations

import argparse
import csv
import gzip
import sys
import time
import xml.etree.ElementTree as ET
from concurrent.futures import ThreadPoolExecutor, as_completed
from urllib.parse import urlparse, urljoin, urldefrag, urlunparse, ParseResult

import requests

def read_bytes(path_or_url: str) -> bytes:
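    """Return raw bytes from a local file path or an http(s) URL."""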
    if path_or_url.startswith(("http://", "https://")):
        r = requests.get(path_or_url, timeout=20)
        r.raise_for_status()
        return r.content
    with open(path_or_url, "rb") as f:
        return f.read()

def parse_xml_bytes(b: bytes) -> ET.Element:
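    """Parse XML bytes, transparently decompressing gzipped (.xml.gz) input."""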
    try:
        # gzip streams start with the magic bytes 0x1f 0x8b
        if b[:2] == b"\x1f\x8b":
            b = gzip.decompress(b)
    except Exception:
        pass
    return ET.fromstring(b)

def extract_locs(path_or_url: str) -> list[str]:
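    """Collect <loc> URLs from a sitemap, following any child sitemaps it references."""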
    try:
        b = read_bytes(path_or_url)
    except Exception as e:
        print(f"Error reading {path_or_url}: {e}", file=sys.stderr)
        return []
    try:
        root = parse_xml_bytes(b)
    except Exception as e:
        print(f"Error parsing {path_or_url}: {e}", file=sys.stderr)
        return []
    # collect <loc> values, noting any that look like child sitemaps
    locs = []
    candidates = []
    for elem in root.iter():
        tag = getattr(elem, "tag", "")
        if isinstance(tag, str) and tag.lower().endswith("loc") and elem.text:
            t = elem.text.strip()
            locs.append(t)
            if t.endswith((".xml", ".xml.gz")) or "sitemap" in t.lower():
                candidates.append(t)
    # try to fetch referenced child sitemaps if present (sitemap index files)
    for child in candidates:
        try:
            cb = read_bytes(child)
            croot = parse_xml_bytes(cb)
            for elem in croot.iter():
                tag = getattr(elem, "tag", "")
                if isinstance(tag, str) and tag.lower().endswith("loc") and elem.text:
                    locs.append(elem.text.strip())
        except Exception:
            continue
    # dedupe preserving order
    seen = set()
    out = []
    for u in locs:
        if u not in seen:
            seen.add(u)
            out.append(u)
    return out

def make_absolute(u: str, base: str | None) -> str:
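    """Join a path-only loc onto base; absolute URLs are returned unchanged."""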
    p = urlparse(u)
    if p.netloc:
        return u
    if base:
        base_pref = base if base.endswith("/") else base + "/"
        return urljoin(base_pref, u.lstrip("/"))
    return u

def map_to_new(u_abs: str, old_base: str | None, new_base: str | None) -> str:
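    """Rewrite u_abs onto new_base, keeping path, params, and query (fragment dropped).

    Illustrative example, assuming new_base is https://new.example:
    https://old.example/blog/post?x=1 -> https://new.example/blog/post?x=1
    """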
    pu = urlparse(u_abs)
    if not new_base:
        return u_abs
    pnew = urlparse(new_base)
    if old_base:
        pold = urlparse(old_base)
        # compare hosts with any leading "www." removed; str.removeprefix (Python 3.9+)
        # strips the prefix, whereas str.lstrip would strip individual characters
        if pu.netloc == pold.netloc or pu.netloc.removeprefix("www.") == pold.netloc.removeprefix("www."):
            return urlunparse(ParseResult(pnew.scheme or pu.scheme, pnew.netloc, pu.path, pu.params, pu.query, ""))
    if not pu.netloc:
        base_pref = new_base if new_base.endswith("/") else new_base + "/"
        return urljoin(base_pref, u_abs.lstrip("/"))
    return urlunparse(ParseResult(pnew.scheme or pu.scheme, pnew.netloc, pu.path, pu.params, pu.query, ""))

def http_check(url: str, session: requests.Session, timeout: int, verify: bool, delay_ms: int):
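    """GET a URL and return status, final URL, redirect chain, and timing (or the error)."""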
    # slight per-request delay so the target server is not hammered
    time.sleep(delay_ms / 1000.0)
    out = {"status": "", "final_url": "", "redirect_chain": "", "elapsed_ms": "", "error": ""}
    try:
        start = time.time()
        resp = session.get(url, timeout=timeout, allow_redirects=True, verify=verify)
        elapsed = (time.time() - start) * 1000.0
        out["status"] = str(resp.status_code)
        out["final_url"] = resp.url
        chain = [r.url for r in resp.history] + [resp.url]
        out["redirect_chain"] = " -> ".join(chain)
        out["elapsed_ms"] = str(int(elapsed))
    except Exception as e:
        out["status"] = "error"
        out["error"] = repr(e)
    return out

def main():
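    """Parse arguments, gather URLs, check them concurrently, and write broken rows."""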
    ap = argparse.ArgumentParser()
    ap.add_argument("--old", required=True, help="path or URL to old sitemap")
    ap.add_argument("--new-base", required=True, help="new site base to check (e.g. https://new.example)")
    ap.add_argument("--old-base", help="old site base to join path-only locs (e.g. https://old.example)")
    ap.add_argument("--output", default="broken.csv", help="output CSV with only broken rows")
    ap.add_argument("--workers", type=int, default=20)
    ap.add_argument("--delay-ms", type=int, default=50)
    ap.add_argument("--timeout", type=int, default=20)
    ap.add_argument("--insecure", action="store_true", help="do not verify TLS certs")
    ap.add_argument("--header", action="append", default=[], help='Header, e.g. --header "Authorization: Bearer TOKEN"')
    ap.add_argument("--include-5xx", action="store_true", help="also include 5xx server errors in output")
    args = ap.parse_args()

    locs = extract_locs(args.old)
    if not locs:
        print("No locs found in old sitemap; exiting.", file=sys.stderr)
        sys.exit(2)
    print(f"Found {len(locs)} locs in old sitemap")

    # build distinct requested URLs from locs
    triples = []
    seen = set()
    for raw in locs:
        old_abs = make_absolute(raw, args.old_base)
        old_abs = urldefrag(old_abs)[0]
        requested = map_to_new(old_abs, args.old_base, args.new_base)
        if requested in seen:
            continue
        seen.add(requested)
        triples.append((raw, old_abs, requested))

    print(f"Checking {len(triples)} distinct URLs on {args.new_base} with {args.workers} workers")

    session = requests.Session()
    for h in args.header:
        if ":" in h:
            k, v = h.split(":", 1)
            session.headers[k.strip()] = v.strip()

    results = []
    with ThreadPoolExecutor(max_workers=args.workers) as ex:
        futures = {
            ex.submit(http_check, req, session, args.timeout, not args.insecure, args.delay_ms): (raw, old_abs, req)
            for raw, old_abs, req in triples
        }
        for fut in as_completed(futures):
            raw, old_abs, req = futures[fut]
            res = fut.result()
            status = res.get("status", "")
            is_4xx = status.startswith("4")
            is_5xx = status.startswith("5")
            is_error = status == "error"
            include = is_error or is_4xx or (args.include_5xx and is_5xx)
            if include:
                row = {
                    "old_raw": raw,
                    "old_abs": old_abs,
                    "requested_url": req,
                    "status": status,
                    "final_url": res.get("final_url", ""),
                    "redirect_chain": res.get("redirect_chain", ""),
                    "elapsed_ms": res.get("elapsed_ms", ""),
                    "error": res.get("error", ""),
                }
                results.append(row)

    # write only broken rows
    fieldnames = ["old_raw", "old_abs", "requested_url", "status", "final_url", "redirect_chain", "elapsed_ms", "error"]
    with open(args.output, "w", newline="", encoding="utf-8") as f:
        w = csv.DictWriter(f, fieldnames=fieldnames)
        w.writeheader()
        for r in results:
            w.writerow(r)

    counts = {}
    for r in results:
        counts[r["status"]] = counts.get(r["status"], 0) + 1
    print("Broken counts:")
    for k in sorted(counts.keys()):
        print(f"  {k}: {counts[k]}")
    print(f"Wrote {len(results)} broken rows to {args.output}")
    # exit non-zero if any broken URLs were found (useful for CI)
    sys.exit(1 if len(results) else 0)

if __name__ == "__main__":
    main()