Cookbook
Memory-efficient file processing
If you have a large file, you can process it line by line without loading the whole file into memory.
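At its core the pattern is just a streaming source (`Observable.from_file`) wired to a streaming sink (`to_file_appending`). A minimal sketch, using only operators that appear in the examples in this cookbook (the file names and the filter predicate here are placeholders):

```python
from pathlib import Path

from grugstream import Observable


async def copy_non_empty_lines() -> None:
    # Each line streams through one at a time, so the whole file
    # is never held in memory.
    await (
        Observable.from_file(Path("big_file.txt"))
        .filter(lambda line: line.strip() != "")
        .to_file_appending(Path("non_empty_lines.txt"))
    )
```

The full example below builds on this skeleton with a concurrent (fake) Google search, a progress bar, and a limit on the number of results.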
```python
import asyncio
import random
import string
from pathlib import Path

from tqdm import tqdm

from grugstream import Observable


async def fake_google_search(query: str) -> list[dict[str, str]]:
    """
    Fake Google search that returns a list of results.
    """
    await asyncio.sleep(1)
    return [{"result": f"Result {i} for {query}", "query": query} for i in range(10)]


def generate_random_string() -> str:
    # Get all the ASCII letters in lowercase and uppercase
    letters = string.ascii_letters
    # Build a random string of length 10 from those letters
    random_string = ''.join(random.choice(letters) for _ in range(10))
    return random_string


def create_big_file(file_name: str) -> None:
    with open(file_name, "w") as f:
        for _ in range(1000000):
            to_write = generate_random_string()
            f.write(f"{to_write}\n")


async def main():
    file_name = "big_file.txt"
    # Dump a big file, just for example purposes
    create_big_file(file_name)
    observable = (
        # Read the file line by line
        Observable.from_file(Path(file_name))
        # Search google in parallel, 10 searches at a time
        .map_async_par(fake_google_search, max_par=10)
        # Show a tqdm progress bar
        .tqdm(tqdm_bar=tqdm(desc="Searching Google", unit_scale=1))
        # Since each search returns a list of results, flatten it
        .flatten_list()
        # We want to stop after 1000 results
        .take(1000)
    )
    # Output the results to a file line by line
    await observable.to_file_appending(Path("results.txt"))


if __name__ == "__main__":
    asyncio.run(main())
```
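In a real pipeline you would swap `fake_google_search` for a genuine async call. A hedged sketch of what that could look like with aiohttp (the endpoint, query parameter, and response handling are made up for illustration):

```python
from aiohttp import ClientSession


async def real_search(query: str) -> list[dict[str, str]]:
    # Hypothetical replacement for fake_google_search: call some HTTP endpoint
    # and wrap the response in the same list-of-dicts shape the pipeline expects.
    async with ClientSession() as session:
        async with session.get("https://example.com/search", params={"q": query}) as response:
            text = await response.text()
            return [{"result": text[:100], "query": query}]
```

Because `map_async_par(..., max_par=10)` limits how many searches run concurrently, swapping in a real HTTP call won't fire off a request for every line of the file at once.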
Web crawler - Adding items back to the start
Some streams are more complicated than others. Usually a stream flows in one direction: from the source, through the operators, to the sink. But sometimes you need something more involved, where processed items get added back to the start of the stream.
A web crawler is a good example of this: every page you crawl yields new links that themselves need crawling. You can easily write a concurrent web crawler with grugstream. Here's an example that starts from a single website and recursively crawls until it has collected 1000 links:
```python
from pathlib import Path
from typing import List, Optional
import asyncio

from aiohttp import ClientSession
from anyio import create_memory_object_stream, create_task_group
from anyio.streams.memory import MemoryObjectSendStream, MemoryObjectReceiveStream
from bs4 import BeautifulSoup

from grugstream import Observable

headers = {"User-Agent": "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124"}


async def fetch_page_content(url: str) -> Optional[str]:
    """
    Fetches the content of the specified URL using aiohttp.
    """
    async with ClientSession() as session:
        async with session.get(url, headers=headers) as response:
            try:
                text = await response.text()
                return text
            # Sometimes we get a UnicodeDecodeError, let's just ignore it
            except UnicodeDecodeError:
                return None


def extract_hyperlinks_from_content(content: str) -> List[str]:
    """
    Extracts all hyperlinks from the given content using BeautifulSoup.
    """
    soup = BeautifulSoup(content, 'html.parser')
    links = [a['href'] for a in soup.find_all('a', href=True)]
    return links


async def get_all_hyperlinks(url: str) -> List[str]:
    """
    Fetches the content of the specified URL and extracts all hyperlinks.
    """
    content = await fetch_page_content(url)
    return extract_hyperlinks_from_content(content) if content else []


def is_valid_url(url: str) -> bool:
    return url.startswith("http")


async def main():
    url_to_crawl = "https://webscraper.io/test-sites/e-commerce/allinone"

    async def run_crawler(
        receive_stream: MemoryObjectReceiveStream[str], send_stream: MemoryObjectSendStream[str]
    ) -> None:
        already_seen = set()
        pipeline: Observable[str] = (
            Observable
            # Create an Observable from the receive_stream
            .from_receive_stream(receive_stream)
            .print(prefix="Starting to crawl: ")
            .map_async_par(get_all_hyperlinks)
            # Flatten the list of lists into a single stream of links
            .flatten_list()
            # Only keep valid urls
            .filter(is_valid_url)
            # Only carry on links we haven't seen before
            .filter(lambda link: link not in already_seen)
            # Track each link so we don't crawl it again
            .for_each(lambda link: already_seen.add(link))
            .print(prefix="Sending new link to crawl ")
            # Send it back to the send_stream for further crawling
            .for_each_to_stream(send_stream)
            # We only want to crawl 1000 links
            .take(1000)
            # Output each link to a file as it arrives
            .for_each_to_file_appending(file_path=Path("results.txt"))
        )
        await pipeline.run_to_completion()

    send_stream, receive_stream = create_memory_object_stream[str](max_buffer_size=100)
    async with create_task_group() as tg:
        tg.start_soon(run_crawler, receive_stream, send_stream)
        # Don't close the send_stream, we want to keep sending items to it
        await send_stream.send(url_to_crawl)


if __name__ == "__main__":
    asyncio.run(main())
```
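Two details make the loop terminate cleanly: `take(1000)` completes the observable once 1000 links have been written, at which point `run_to_completion()` returns, and the `send_stream` is deliberately left open because the crawler itself keeps feeding newly discovered links back into it while it runs. The same feedback-loop shape works for any workload that generates more work as it goes. A stripped-down sketch, assuming a hypothetical `expand` step that turns one number into two new ones (the numbers and the limit are arbitrary):

```python
import asyncio

from anyio import create_memory_object_stream, create_task_group
from anyio.streams.memory import MemoryObjectSendStream, MemoryObjectReceiveStream

from grugstream import Observable


async def expand(n: int) -> list[int]:
    # Hypothetical work step: each item produces two new items to process
    return [2 * n, 2 * n + 1]


async def main() -> None:
    async def run(
        receive_stream: MemoryObjectReceiveStream[int], send_stream: MemoryObjectSendStream[int]
    ) -> None:
        pipeline = (
            Observable.from_receive_stream(receive_stream)
            .map_async_par(expand)
            .flatten_list()
            # Feed every new item back to the start of the stream
            .for_each_to_stream(send_stream)
            .print(prefix="Processed: ")
            # Stop after 20 items so the loop terminates
            .take(20)
        )
        await pipeline.run_to_completion()

    send_stream, receive_stream = create_memory_object_stream[int](max_buffer_size=100)
    async with create_task_group() as tg:
        tg.start_soon(run, receive_stream, send_stream)
        # Seed the loop with the first item and keep the stream open
        await send_stream.send(1)


if __name__ == "__main__":
    asyncio.run(main())
```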