
Cookbook

Memory-efficient file processing

If you have a large file, you can process it line by line without loading the whole file into memory.
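
At its core, the pattern is just Observable.from_file streaming lines into a file sink. Here's a minimal sketch of that idea (input.txt and output.txt are just placeholder names) that copies the non-empty lines of one file into another:

import asyncio
from pathlib import Path

from grugstream import Observable


async def copy_non_empty_lines():
    await (
        # Stream the input file line by line
        Observable.from_file(Path("input.txt"))
        # Drop blank lines
        .filter(lambda line: line.strip() != "")
        # Append each remaining line to the output file as it arrives
        .to_file_appending(Path("output.txt"))
    )


if __name__ == "__main__":
    asyncio.run(copy_non_empty_lines())

The fuller example below adds a fake Google search step run in parallel, a tqdm progress bar, and a cap of 1000 results: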

import asyncio
import random
import string
from pathlib import Path

from tqdm import tqdm

from grugstream import Observable


async def fake_google_search(query: str) -> list[dict[str, str]]:
    """
    Fake Google search that returns a list of results.
    """
    await asyncio.sleep(1)
    return [{"result": f"Result {i} for {query}", "query": query} for i in range(10)]


def generate_random_string() -> str:
    # All ASCII letters, lowercase and uppercase
    letters = string.ascii_letters
    # Build a random 10-character string from those letters
    random_string = ''.join(random.choice(letters) for _ in range(10))
    return random_string


def create_big_file(file_name: str) -> None:
    with open(file_name, "w") as f:
        for _ in range(1_000_000):
            to_write = generate_random_string()
            f.write(f"{to_write}\n")


async def main():
    file_name = "big_file.txt"
    # Dump a big file, just for example purposes
    create_big_file(file_name)

    observable = (
        # Read the file line by line
        Observable.from_file(Path(file_name))
        # Run the fake Google search in parallel, at most 10 at a time
        .map_async_par(fake_google_search, max_par=10)
        # Show a tqdm progress bar
        .tqdm(tqdm_bar=tqdm(desc="Searching Google", unit_scale=1))
        # Since we get a list of results, we want to flatten it
        .flatten_list()
        # We want to stop after 1000 results
        .take(1000)
    )
    # Output the results to a file line by line
    await observable.to_file_appending(Path("results.txt"))


if __name__ == "__main__":
    asyncio.run(main())
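
Because everything is streamed, results are appended to results.txt line by line, take(1000) caps the output, and at no point does the whole input file or the full result set need to sit in memory.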

Web crawler - Adding items back to the start

Some streams are more complicated than others. Usually a stream looks like

start -> apply some functions -> end

But sometimes you need to do something more involved, where processed items are fed back to the start of the stream, as sketched below.

start -> apply some functions -> add more things to the start -> repeat
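
Here's a minimal sketch of that feedback loop. The "start" is an anyio memory object stream that the pipeline both reads from and writes back to; next_item is a hypothetical stand-in for whatever work your stream actually does, and take is what eventually stops the loop:

import asyncio

from anyio import create_memory_object_stream, create_task_group

from grugstream import Observable


async def next_item(n: int) -> int:
    # Hypothetical work step: here it just produces the next number
    await asyncio.sleep(0.01)
    return n + 1


async def main():
    send_stream, receive_stream = create_memory_object_stream[int](max_buffer_size=100)

    async def run_loop() -> None:
        pipeline = (
            Observable
            # Items arrive from the receive_stream
            .from_receive_stream(receive_stream)
            .map_async_par(next_item)
            # Push each result back to the start of the stream
            .for_each_to_stream(send_stream)
            # Without a limit the loop would run forever
            .take(10)
        )
        await pipeline.run_to_completion()

    async with create_task_group() as tg:
        tg.start_soon(run_loop)
        # Seed the loop with the first item
        await send_stream.send(0)


if __name__ == "__main__":
    asyncio.run(main())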

A web crawler is a good example of this. You can write a concurrent web crawler with grugstream fairly easily. Here's an example that recursively crawls a single website until it has collected 1000 links:

from pathlib import Path
from typing import List, Optional
import asyncio

from aiohttp import ClientSession
from anyio import create_memory_object_stream, create_task_group
from anyio.streams.memory import MemoryObjectSendStream, MemoryObjectReceiveStream
from bs4 import BeautifulSoup

from grugstream import Observable

headers = {"User-Agent": "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124"}


async def fetch_page_content(url: str) -> Optional[str]:
    """
    Fetches the content of the specified URL using aiohttp.
    """
    async with ClientSession() as session:
        async with session.get(url, headers=headers) as response:
            try:
                text = await response.text()
                return text
            # Sometimes we get a UnicodeDecodeError, let's just ignore it
            except UnicodeDecodeError:
                return None


def extract_hyperlinks_from_content(content: str) -> List[str]:
    """
    Extracts all hyperlinks from the given content using BeautifulSoup.
    """
    soup = BeautifulSoup(content, 'html.parser')
    links = [a['href'] for a in soup.find_all('a', href=True)]
    return links


async def get_all_hyperlinks(url: str) -> List[str]:
    """
    Fetches content of the specified URL and extracts all hyperlinks.
    """
    content = await fetch_page_content(url)
    return extract_hyperlinks_from_content(content) if content else []


def is_valid_url(url: str) -> bool:
    return url.startswith("http")


async def main():
    url_to_crawl = "https://webscraper.io/test-sites/e-commerce/allinone"

    async def run_crawler(
        receive_stream: MemoryObjectReceiveStream[str], send_stream: MemoryObjectSendStream[str]
    ) -> None:
        already_seen = set()
        pipeline: Observable[str] = (
            Observable
            # Create an Observable from the receive_stream
            .from_receive_stream(receive_stream)
            .print(prefix="Starting to crawl: ")
            .map_async_par(get_all_hyperlinks)
            # flatten the list of lists into a single list
            .flatten_list()
            # only keep valid URLs
            .filter(is_valid_url)
            # only pass on links we haven't seen before
            .filter(lambda link: link not in already_seen)
            # track it so we don't crawl it again
            .for_each(lambda link: already_seen.add(link))
            .print(prefix="Sending new link to crawl ")
            # send it back to the send_stream for processing
            .for_each_to_stream(send_stream)
            # We only want to crawl 1000 links
            .take(1000)
            # output it to a file to save
            .for_each_to_file_appending(file_path=Path("results.txt"))
        )
        await pipeline.run_to_completion()

    send_stream, receive_stream = create_memory_object_stream[str](max_buffer_size=100)
    async with create_task_group() as tg:
        tg.start_soon(run_crawler, receive_stream, send_stream)
        # don't close the send_stream, we want to keep sending items to it
        await send_stream.send(url_to_crawl)


if __name__ == "__main__":
    asyncio.run(main())
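
Note that the send_stream is deliberately left open: closing it would end the stream before newly discovered links could be fed back in. Instead, the take(1000) operator is what eventually completes the pipeline, letting run_to_completion and the surrounding task group return.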