Overview
In this article, I will explain how I downloaded 4,800 files totaling over 3 terabytes of data from Common Crawl, containing over 45 billion URLs. Using Kaggle and self-hosted MinIO to process and store the data, I parsed this data to find all domains and subdomains, then resolved those domains to IP addresses using Google's and Cloudflare's DNS-over-HTTPS services. To make the most of the available hardware, I used Python's aiohttp and multiprocessing libraries. Ultimately, I discovered over 465,000 Shopify domains. You can get the code and the domains from the links below.
Notebook Code: https://www.kaggle.com/code/alighafoori/shopify2024
Download Shopify's domains: https://docs.google.com/spreadsheets/d/1dykrF5EKpQliD4uPNywyYQMzbqMT_iwVOo_bUMbDHB0/edit?usp=sharing
Why I Use Object Storage Instead of a Database
I used Google Colab, Kaggle, and Saturn Cloud for scraping data, and I have a VPS on Hetzner with a substantial amount of free space. By using MinIO for object storage, I was able to streamline the process and reduce the amount of code needed. Due to network latency, it was more efficient to load all data into RAM for processing and then store it in object storage for further use.
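Throughout the code below I rely on a handful of small helper functions (get_list_objects, upload_dict, download_dict, check_file_existence) for talking to MinIO. They are not the focus of this article, but a minimal sketch of them, assuming the MinIO Python SDK and gzip-compressed pickles, with placeholder endpoint and credentials, looks roughly like this:
import gzip
import io
import pickle

from minio import Minio

# Placeholder endpoint and credentials for the self-hosted MinIO instance
client = Minio('minio.example.com:9000', access_key='ACCESS_KEY', secret_key='SECRET_KEY', secure=True)

def get_list_objects(bucket_name):
    # Return the names of all objects already stored in a bucket
    return [obj.object_name for obj in client.list_objects(bucket_name)]

def check_file_existence(bucket_name, object_name):
    # stat_object raises an error if the object does not exist
    try:
        client.stat_object(bucket_name, object_name)
        return True
    except Exception:
        return False

def upload_dict(data, bucket_name, object_name):
    # Serialize the dictionary, gzip it, and upload it as a single object
    payload = gzip.compress(pickle.dumps(data))
    client.put_object(bucket_name, object_name, io.BytesIO(payload), length=len(payload))

def download_dict(bucket_name, object_name, check_exist=True):
    # Download and decompress a dictionary; return an empty dict if it is missing
    if check_exist and not check_file_existence(bucket_name, object_name):
        return {}
    response = client.get_object(bucket_name, object_name)
    try:
        return pickle.loads(gzip.decompress(response.read()))
    finally:
        response.close()
        response.release_conn()
The JSON helpers used later (compress_and_upload_to_minio and download_json_dict) follow the same pattern, with json in place of pickle.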
Download All URL Index Files
I aimed to process all data from 2022 to 2024, which comprises 14 archives. Each archive of Common Crawl data contains 300 URL index files, and each file includes at least 10 million lines similar to the text below.
net,slideshare,es)/shellyzediyo 20240529233815 {"url": "https://es.slideshare.net/ShellyZediyo", "mime": "text/html", "mime-detected": "text/html", "status": "200", "digest": "PF673LETY7C5454OZXLPRJ6YSUTVN67O", "length": "20654", "offset": "202849866", "filename": "crawl-data/CC-MAIN-2024-22/segments/1715971059412.27/warc/CC-MAIN-20240529230852-20240530020852-00717.warc.gz", "charset": "UTF-8", "languages": "spa"}
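The host at the start of each line is in SURT form: reversed, comma-separated, and terminated by ')'. Recovering the normal hostname just means taking the text before the first space, cutting at the ')', and reversing the comma-separated parts. Here is a minimal sketch of that transformation, including the small get_before_space helper the code below relies on:
def get_before_space(text):
    # The SURT key is everything before the first space, e.g. "net,slideshare,es)/shellyzediyo"
    return text.split(' ', 1)[0]

def surt_to_host(line):
    # "net,slideshare,es)/shellyzediyo 20240529233815 {...}" -> "es.slideshare.net"
    surt_host = get_before_space(line.strip()).split(')')[0]
    parts = surt_host.split(',')
    parts.reverse()
    return '.'.join(parts)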
Within the `crawl_index_paths` function, I identify the URLs of all index files listed in the archive files, then leverage multiprocessing to process these index files efficiently in parallel.
def crawl_index_paths(start_urls, bucket_name):
    """
    Crawls a list of Common Crawl archives.

    Args:
        start_urls (list): A list of URLs pointing to Common Crawl archives.
        bucket_name (str): The name of the S3 bucket (or MinIO equivalent) containing previously processed files.
    """
    # Get a list of previously processed files to skip
    lst = get_list_objects(bucket_name)
    # Store start time to calculate total processing time
    start_time = time.time()
    # Determine the number of cores available on the system
    num_cores = multiprocessing.cpu_count()
    print(f"number of cores: {num_cores}")
    # List to store URLs of all index files found within archive files
    urls = []
    for url in start_urls:
        # Download index file
        response = requests.get(url)
        print(f'get index file {url} len {len(response.content)}')
        # Open gzip file for reading content line by line
        with gzip.GzipFile(fileobj=io.BytesIO(response.content), mode='rb') as gz_file:
            for line in gz_file:
                l = line.decode('utf-8').strip()
                # Add the file to the processing list if it ends with '.gz' and is not present in the skip list
                if l.endswith('.gz'):
                    name = get_object_name(l)
                    if not name in lst:
                        urls.append(l)
    print(f"count of path files: {len(urls)}")
    # Create a pool for parallel processing
    pool = multiprocessing.Pool(processes=num_cores)
    # Distribute the URL list for processing tasks
    pool.starmap(process_index_file, [(url, bucket_name) for url in urls])
    # Close the pool and wait for all tasks to finish
    pool.close()
    pool.join()
    # Record the end time and calculate the total execution time
    end_time = time.time()
    execution_time = end_time - start_time
    print(f"Total execution time: {execution_time}")
Within the `process_index_file` function:
1. Read the file line by line to extract host names.
2. For each unique host name encountered, add it to the dictionary.
3. Once processing is complete, serialize the dictionary to JSON format.
4. Compress the JSON data and upload the compressed JSON data to object storage.
def process_index_file(path, bucket_name):
    try:
        # Extract the object name from the path
        name = get_object_name(path)
        print(f"start processing {path} {bucket_name}\n")
        # Store start time to calculate total processing time
        start_time = time.time()
        # Define a counter variable for tracking the number of lines processed
        a = 0
        url = 'https://data.commoncrawl.org/' + path
        response = requests.get(url)
        print(f"download complete {path}")
        # Create a dictionary to efficiently store unique host names encountered
        dic = dict()
        # Open gzip file for reading content line by line
        with gzip.GzipFile(fileobj=io.BytesIO(response.content), mode='rb') as gz_file:
            for line in gz_file:
                a += 1
                try:
                    # Extract host name from text
                    sp_host = get_before_space(line.decode('utf-8').strip()).split(')')
                    # Add the host name to the dictionary if it's not already present
                    if not sp_host[0] in dic:
                        dic[sp_host[0]] = ''
                    # Print the number of lines processed every 1 million lines for progress tracking
                    if a % 1000000 == 0:
                        print(f'processed line {a} in {path}')
                except Exception as e:
                    print(a, 'error')
                    print(e)
                    return
        result = dict()
        # Split text, reverse the order of elements, and join them back to form the correct host name & add to result dict
        for key in dic.keys():
            host_array = key.split(',')
            host_array.reverse()
            host = '.'.join(host_array)
            result[host] = ''
        # Write the results dictionary to a JSON file
        file_name = f'{name}.txt'
        with open(file_name, 'w') as json_file:
            json.dump(result, json_file)
        # Compress and upload the JSON file to object storage
        compress_and_upload_to_minio(file_name, bucket_name, name)
        # Record the end time and calculate the total execution time
        end_time = time.time()
        execution_time = end_time - start_time
        print(f"Execution time {path}: {execution_time}")
        print(f"domains count: {len(result)}")
    except Exception as e:
        print(f'error in path {path} \n {e}')
Processing the 300 files from a single archive typically takes around 1.5 hours on Kaggle.
Merging and Splitting Dictionaries
Now I have thousands of files, each containing hundreds of thousands of unique hostnames, and some hostnames appear in multiple files. My goal was to combine everything into one giant list of unique hostnames. Putting everything together took up 17 gigabytes of memory on Kaggle, and I found over 140 million unique website hostnames!
def download_and_merge_objects(bucket_name):
    # Get a list of processed files containing hostnames
    processed_files = get_list_objects(bucket_name)
    # Get a dictionary of merged dictionaries
    merged_files = download_dict('domains', 'finish.gz')
    # Get a dictionary that contains all hostnames
    merged_data = download_dict('domains', 'list.gz')
    # Define a counter variable for tracking the number of merged dictionaries
    merge_counter = 0
    print(f'files: {len(processed_files)} finish: {len(merged_files)} domains: {len(merged_data)}')
    for key in processed_files:
        # If the dictionary is already merged, then skip
        if key in merged_files:
            print(f'skip {key}')
            continue
        merge_counter += 1
        # Download dictionary
        data = download_json_dict(bucket_name, key, False)
        # Merge data
        for item in data.keys():
            if item not in merged_data:
                merged_data[item] = data[item]
        # Free memory
        del data
        # Add current dictionary to the list of merged dictionaries
        merged_files[key] = ''
        print(f'domains count: {len(merged_data):,} counter: {merge_counter}')
        # Every 500 loops, upload data to object storage
        if merge_counter % 500 == 0:
            upload_dict(merged_data, 'domains', 'list.gz')
            upload_dict(merged_files, 'domains', 'finish.gz')
            # Collect garbage to free memory
            gc.collect()
    # Final step: upload data
    upload_dict(merged_data, 'domains', 'list.gz')
    upload_dict(merged_files, 'domains', 'finish.gz')
    return merged_data
Now it’s time to split the giant dictionary into chunks, each containing up to 1 million hostnames.
def split_dictionary():
    # Download the dictionary containing over 145 million domains
    domain_dict = download_dict('domains', 'list.gz')
    # Define a counter to track progress
    counter = 0
    # Define part ID and part dictionary for splitting
    part_id = 1
    part = {}
    for key in domain_dict:
        part[key] = ''
        counter += 1
        # If the counter hits one million records, upload the part dictionary to object storage
        if counter % 1000000 == 0:
            upload_dict(part, 'domains-parts', f'part-{part_id}.pkl.gz')
            part_id += 1
            part = {}
    # Upload the remaining records
    upload_dict(part, 'domains-parts', f'part-{part_id}.pkl.gz')
Resolve hostnames
Now I harness parallel processing to use the hardware's full capacity. First, I compile a list of the dictionary parts and shuffle it. Shuffling matters because, on later runs, parts that were already resolved get skipped quickly; without shuffling, the remaining unprocessed parts would cluster at the end of the list and a single worker would end up doing all the work, which I want to avoid.
def start_resolve():
    # Get the list of parts
    parts_list = get_list_objects('domains-parts')
    # Compile the list of parts and then shuffle it
    parts_to_process = [('domains-parts', key) for key in parts_list]
    random.shuffle(parts_to_process)
    # Determine the number of available CPU cores
    num_cores = multiprocessing.cpu_count()
    # Create a pool of worker processes and assign the list to it
    with multiprocessing.Pool(processes=num_cores) as pool:
        pool.starmap(resolve_task, parts_to_process)
        pool.close()
        pool.join()
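The resolve_task worker handed to starmap is not shown above; a minimal sketch, assuming each process simply creates its own event loop and runs resolve_dns to completion, might look like this:
def resolve_task(bucket_name, object_name):
    # Each worker process gets its own event loop and drives the async resolver
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        loop.run_until_complete(resolve_dns(bucket_name, object_name, loop))
    finally:
        loop.close()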
We use aiohttp for asynchronous HTTP requests. If the IP addresses for a part's domains have already been found, those domains are skipped. Otherwise, a session is created with a maximum of 200 connections, allowing up to 100 connections per host, which keeps the concurrency efficient and controlled. Whenever a domain is resolved, the result is saved. This approach maximizes the use of available connections while preventing any single host from being overloaded.
async def resolve_dns(bucket_name, object_name, loop):
    # A dictionary to store hostnames with their corresponding IPs
    domains = {}
    # Check if the part has already been DNS resolved; if so, download it for further checks
    if check_file_existence('domain-resolved', object_name):
        print(f'Another check resolve {object_name}')
        domains = download_dict('domain-resolved', object_name, check_exist=False)
    else:
        # If not resolved, download the part to resolve DNS
        print(f'Download dict from bucket {object_name}')
        domains = download_dict(bucket_name, object_name, check_exist=False)
    # Capture the start time for performance tracking
    start_time = time.time()
    # A dictionary to store futures
    futures = {}
    # HTTP request headers
    headers = {"accept": "application/dns-json"}
    # Counters for total resolved and skipped entries
    resolved = 0
    skip = 0
    # Define TCPConnector for aiohttp session with a maximum of 200 connections and 100 per host
    connector = aiohttp.TCPConnector(limit=200, limit_per_host=100, loop=loop)
    session = ClientSession(connector=connector)
    # A variable for tracking progress
    count = 0
    # Define semaphore with a maximum capacity of 200 for concurrent requests
    semaphore = asyncio.Semaphore(200)
    # Regex pattern for extracting IP addresses
    ip_pattern = r'^((25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.){3}(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)$'

    # Fetch function to send DNS over HTTPS request, release semaphore, and store hostname and IP
    async def fetch(url, id):
        nonlocal resolved
        try:
            async with session.get(url, headers=headers) as response:
                r = await response.json(content_type=None)
                semaphore.release()
                out = []
                if 'Answer' in r:
                    for ans in r['Answer']:
                        if re.match(ip_pattern, ans['data']):
                            out.append(ans['data'])
                del futures[id]
                domains[id] = out
                if len(out) > 0:
                    resolved += 1
        except Exception as e:
            print(f"Error fetching DNS for {id}: {e}")

    print(f'Domains count: {len(domains)} object_name: {object_name}')
    # Variable to alternate between Google and Cloudflare for DNS resolution
    use_cloudflare = True
    # Iterate over hostnames in the dictionary
    for key in domains.keys():
        count += 1
        # If the hostname already has an IP, then skip
        if domains[key] != '':
            skip += 1
            continue
        # Wait for semaphore availability
        await semaphore.acquire()
        url = ''
        # Set URL based on Cloudflare or Google
        if use_cloudflare:
            url = f'https://1.1.1.1/dns-query?name={key}'
        else:
            url = f'https://dns.google.com/resolve?type=1&name={key}'
        # Toggle DNS service
        use_cloudflare = not use_cloudflare
        # Create and store future for the eventual result
        future = asyncio.ensure_future(fetch(url, key))
        futures[key] = future
        # Log every 10000 steps
        if count % 10000 == 0:
            print(f'Processed {count} domains. Object name: {object_name} Resolved: {resolved} Skipped: {skip}')
    try:
        # Wait for all futures to complete and then close the session
        await asyncio.gather(*list(futures.values()))
        await session.close()
    except Exception as e:
        print(f'Error in final step: {e}')
    # Upload resolved domains to object storage if more than 100 entries are resolved
    if resolved > 100:
        upload_dict(domains, 'domain-resolved', object_name)
    else:
        print(f'Cancel upload due to lack of changes: {object_name}')
    # Calculate and log total execution time
    end_time = time.time()
    execution_time = end_time - start_time
    print(f"Total execution time: {execution_time} seconds. Object name: {object_name} Resolved: {resolved} Skipped: {skip}")
    return domains
Last Phase: Locating Shopify Domains
In this section, we examine each domain along with its IP addresses. If the IP address 23.227.38.65, which is Shopify's main IP, is found among them, we add the domain to our results.
def find_shopify_domains():
    # Get the list of all dictionaries containing hostnames and their IPs
    dictionaries_list = get_list_objects('domain-resolved')
    # A counter for tracking progress
    counter = 0
    # A dictionary to save Shopify domains
    shopify_domains = {}
    # Download dictionaries and check if the Shopify IP is in them, then add the domain to the result
    for object_name in dictionaries_list:
        domains = download_dict('domain-resolved', object_name, check_exist=False)
        for hostname, ips in domains.items():
            counter += 1
            for ip in ips:
                if ip == '23.227.38.65':
                    shopify_domains[hostname] = ''
            # Print progress every 10,000 domains
            if counter % 10000 == 0:
                print(f'Processed: {counter} domains, Found Shopify domains: {len(shopify_domains)}')
    # Print total number of Shopify domains found and upload the result to object storage
    print(f'Final count of Shopify domains: {len(shopify_domains)}')
    upload_dict(shopify_domains, 'shopify-list', 'list2.pkl.gz')
Appreciation for Your Attention
Thank you for taking the time to read through to the end. If you have any questions or feedback, feel free to reach out to me on LinkedIn.