This post was originally published on the YipitData Engineering Blog.


Photo by Cátia Matos via https://www.pexels.com/photo/people-in-line-1604200/


At YipitData we have built a queueing system that works well for web scraping. When we started developing this system back in 2014, we couldn’t find anything on the market that was a good fit. Most of the solutions we found worked as either a FIFO or a LIFO queue, but we needed more flexibility than that.

Challenges of Web Scraping

Web scraping sounds simple, but we’ve had to be creative to build a scalable and robust solution.

For example, imagine you need to retrieve all the links from a listing page, scrape each linked page, and save some information to a database. You could write code like this:

import requests

# LISTING_URL, get_links(), get_link_info(), and save() are illustrative
# helpers; requests is the only real dependency here.
def scrape_listing():
    for link in get_links(LISTING_URL):
        scrape_link(link)


def scrape_link(url):
    resp = requests.get(url)
    info = get_link_info(resp.content)
    save('link', info)

This code works at a small scale, and it’s how a lot of people write scrapers. However, when you’re running scrapers at scale, many things can go wrong, and a robust solution must accommodate problems such as:

  • the website changes
  • servers are terminated suddenly
  • the process uses too much memory and crashes
  • exceptions are thrown
  • a bug affects only a fraction of the dataset

If any of these problems happens with the previous code, you’ll have to start over. If you have a million listing URLs to scrape, it may be too costly and time-consuming to redo all the work when something goes wrong. That solution isn’t acceptable for our business.

Web Scraping at YipitData

Our scrapers are composed of multiple tasks. Each task is short-lived and can be executed in isolation, so if anything happens to a part of the scraper, that part can be re-executed independently. This is how we’d write the previous solution with our framework:

import requests

# @starting_task, @subtask, and schedule() come from our queueing framework.
# schedule('scrape_link', link, seconds=0) makes the task available immediately.
@starting_task
def scrape_listing():
    for link in get_links(LISTING_URL):
        schedule('scrape_link', link, seconds=0)


@subtask
def scrape_link(url):
    resp = requests.get(url)
    info = get_link_info(resp.content)
    save('link', info)

The biggest difference is in the scrape_listing() function, which now delays the call to scrape_link(). As written, this example behaves just like a regular FIFO queue; however, we have the ability to schedule tasks to run at any given time in the future (via the seconds parameter). We have a separate queue for each @subtask-decorated function.
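
For instance (a hypothetical call, not part of the example above), a failed link could be pushed one hour into the future instead of being made available right away:

# Illustrative only: reschedule the same link to become available one hour from now.
schedule('scrape_link', link, seconds=3600)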

It’s important for us not to perform extra scrapes when the system crashes. If anything goes wrong with scrape_listing(), we don’t want to add the same tasks to the queue again, which means that our queueing solution must have uniqueness built in. The following code only reschedules link in the scrape_link queue; it doesn’t add any duplicates:

for _ in range(10):
    schedule('scrape_link', link, seconds=0)

Our queueing solution isn’t FIFO or LIFO: it has uniqueness built in, and elements can move around. What we have is a flexible delayed unique priority queueing system.

Built on Redis

We took advantage of Redis’s rich data structures to create our queueing solution, mostly because of sorted sets and the serial properties of Redis (no concurrent reads/writes). Each of our queues is a sorted set in Redis: the score represents the scheduled time and the member is the task data. All of the queueing logic is defined on the client side but executed by Redis; because Redis processes commands serially, there’s no need for task coordination, which simplifies the solution. We have built a queueing framework in Python and may release it as open source in the future.

For example, the following code adds the element http://example.com to the scrape_link queue and makes it available in 0 seconds (i.e., immediately):

schedule('scrape_link', 'http://example.com', seconds=0)

The previous code is almost equivalent to running ZADD scrape_link 0 "http://example.com" in Redis (we do a few more things behind the scenes).
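
To make the mapping concrete, here is a minimal sketch of what a schedule helper could look like using redis-py, assuming the score is the Unix timestamp at which the element becomes available (the score convention and the helper itself are illustrative, not our production implementation):

import time

import redis

r = redis.Redis()


def schedule(queue, element, seconds=0):
    # Add `element` to the sorted set `queue`, scored by the time at which it
    # should become available. ZADD overwrites the score when the element
    # already exists, which is what gives the queue its uniqueness property.
    r.zadd(queue, {element: time.time() + seconds})


# Make the link available immediately, then push it an hour into the future.
# The second call moves the existing element; it never adds a duplicate.
schedule('scrape_link', 'http://example.com', seconds=0)
schedule('scrape_link', 'http://example.com', seconds=3600)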

Redis & Lua scripts

We implement at-least-once delivery, which means that an element can be processed more than once. This is necessary because if an element is retrieved from a queue but its task is never acknowledged as complete (it took too long to finish or the worker crashed), the element will become available again in the queue, in the future, for a retry. We take advantage of Lua scripts to create atomic custom commands, such as one that takes an element from the queue and reschedules it in the future (in case the task never finishes).
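
As a rough illustration (a sketch using redis-py with an assumed retry window, not our actual Lua code), such an atomic fetch-and-reschedule command could look like this:

import time

import redis

r = redis.Redis()

# Atomically fetch the earliest past-due element and push its score into the
# future, so it will be retried later if the task is never acknowledged.
FETCH_AND_RESCHEDULE = r.register_script("""
local items = redis.call('ZRANGEBYSCORE', KEYS[1], '-inf', ARGV[1], 'LIMIT', 0, 1)
if #items == 0 then
    return nil
end
redis.call('ZADD', KEYS[1], ARGV[2], items[1])
return items[1]
""")


def fetch_task(queue, retry_after=300):
    now = time.time()
    return FETCH_AND_RESCHEDULE(keys=[queue], args=[now, now + retry_after])


# A worker that finishes successfully would then acknowledge completion,
# for example by removing the element from the queue with ZREM.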

Feature Set Summary

The following are our system’s features:

  • Each queue is a task queue
  • Each queue has elements ordered by timestamp
  • When an element that already exists in a queue is added again, its timestamp is overwritten
  • Elements in a queue are unique (no duplication)
  • Queues can retain elements forever (unlimited retention period)
  • Dead Letter Queue support (not explained in this post)
  • Queues have an element count
  • Queues have a past due element count (both counts are illustrated in the sketch after this list)
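
The last two counts map directly onto sorted-set commands. Here is a hypothetical introspection sketch using redis-py (the helper names are ours, not part of the framework):

import time

import redis

r = redis.Redis()


def queue_size(queue):
    # Total number of elements in the queue (ZCARD on the sorted set).
    return r.zcard(queue)


def past_due_count(queue):
    # Elements whose scheduled time is already in the past
    # (ZCOUNT over the score range from -inf to "now").
    return r.zcount(queue, '-inf', time.time())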

Challenges with Redis

Redis is an in-memory database, which means that all the data lives in memory at any point in time. When a system has millions of items across a few queues, it can use multiple gigabytes of memory. Memory is finite and expensive, and we can’t change a server’s memory size on the fly (the server has to be replaced with a bigger one, and that’s a manual task).

We use Redis because of its rich data structures and flexibility, not because of its in-memory speed. If one day you need to add twice as many items to a queue, you may need twice the memory. This is annoying for us because sometimes we can’t predict these requirements and adjust in time; we have seen queues grow too quickly and Redis run out of memory. We will write a separate blog post explaining these and other challenges in more detail.

In summary, we need a solution that supports delayed unique priority queues, has near-infinite storage, and has low operating costs.

Readypipe

We’re building a web scraping platform called Readypipe. We are users of our own platform, but we also offer it to external clients as a service. The challenges mentioned previously were less of a concern when we were the only clients, because we knew what type of projects our queues supported. However, when dealing with external customers, we can’t predict how much queue capacity their systems will need, which makes an elastic solution with low operating costs a requirement, not an option.

Solutions that don’t seem to work for us

AWS SQS

AWS SQS has multiple types of queues and provides elastic storage, but none of them seemed to satisfy our requirements:

  • Standard Queues and FIFO Queues: No element uniqueness, lack of message delay, can’t reorder elements
  • Delay Queues: Maximum delay of 15 minutes, no element uniqueness, can’t reorder elements

We could create a solution on top of SQS, but it would require a few hacks, such as removing items from a queue and then re-adding them if they aren’t ready to execute. This would create some complexity, require task coordination, and make queue introspection hard (e.g., counting how many elements are past due).

RabbitMQ, ZeroMQ

Same problems as SQS.

Apache Kafka

Kafka is a fault-tolerant streaming platform. Although people use topics as queues, they are immutable commit logs. Kafka wouldn’t work for us because of its message immutability and lack of custom element ordering. We could create our queueing system on top of it with multiple topics, but delays wouldn’t be granular and it would require task coordination on the client side. For us, Kafka has the same conceptual problems as SQS.

Celery, RQ, Resque, and other task queueing software

All of these solutions are built on top of the software discussed above (e.g., Redis, SQS, RabbitMQ, Kafka), and they don’t implement the customizations we need.

Our latest experiment and future work

Redis has worked well for our solution, and migrating to any other system involves a lot of risk; we have hundreds of projects built on top of the current behavior, and changing any behavior of the current system may break our projects in unexpected ways (it’s hard to test a different solution on every single project). The main problems we have with Redis are its limited storage space and the server costs. It would be great for us if we could keep the Redis behavior and the same API but persist the data differently. We couldn’t find a project that met all of our requirements during our research in 2018, so we have started a project to fill that gap.

We have started DRedis, an open-source disk-based Redis clone. It implements the same network protocol and the same commands as Redis, but it stores the data on the filesystem. Using the filesystem comes with its own problems, and we’ll cover the details of our research and our implementation in a separate post. We’re using it on a few production projects and making improvements constantly. The project is available on GitHub under the MIT license.

Acknowledgments

Thanks to Mingwei Gu and Jim Shields for their thorough reviews.