building a multiprocessing process runner
June 30, 2019
10 min read

DISCLAIMER: The code here is written in a highly simplified way and does not address important corner cases of multiprocess worker execution, so please do not try to use this as a basis for something important. It was brain-dumped as an example to illustrate a high-level approach to a very specific problem.

motivation

I’m a big fan of sh/bash, but sometimes my lack of knowledge of those languages can make it difficult for me to solve a problem.

Recently, I’ve had the need to invoke a python program with a huge set of different inputs and do something with the output of each invocation of that program, in order of execution. I hit a huge limitation with my shell scripts because I just couldn’t really find a good (or readable) way to manage multiple processes in a shell and easily deal with their output.

This post is just about the approach I took to solving this problem using Python’s multiprocessing standard library module.

Also, the example uses both requests and click to provide a contrived “thing to do” and a simple CLI for the script, respectively.

the example

This isn’t actually the reason I needed to build a multiprocessing-parallelizer-thingy, but the underlying problem is more or less the same: you have a python program that does a thing based on input, the thing takes some time, and you actually care about the output from that program. In addition, you want to run that program lots and lots of times with varied input while doing something with the output on a per-run basis.

As an example, let’s say we wrote a script that does a thing, and that thing is to accept a website as input and make an HTTP request to it. Once the response comes back, the program then prints the status code to stdout.

"""fetch_website.py"""
import click
import requests


def fetch(website):
    response = requests.get(website)
    print(response.status_code)


@click.command()
@click.argument("website", metavar="WEBSITE")
def main(website):
    fetch(website)


if __name__ == "__main__":
    main()

And testing it out:

$ python fetch_website.py https://blog.amilstead.com/
200

So that’s simple but not super useful yet. Let’s update the script to do something more important.

Let’s say we want to test how often a given status code is seen for lots of calls to our site rather than just one. Once captured, we can then print a whole report of status codes to the terminal.

To implement that let’s add a new option and tweak fetch to request the website in a loop:

...
def fetch(website, num_times):
    # Note: this version also needs `import collections` added to the imports above.
    fetches_remaining = num_times or 1
    status_codes = collections.defaultdict(int)
    while fetches_remaining > 0:
        response = requests.get(website)
        status_codes[response.status_code] += 1
        fetches_remaining -= 1

    print(f"report for: {website}")
    for status_code in sorted(status_codes):
        print(f"saw {status_code}: {status_codes[status_code]} times")


@click.command()
@click.option(
    "-n", "--num-times",
    type=click.INT,
    help="number of times to fetch WEBSITE"
)
@click.argument("website", metavar="WEBSITE")
def main(website, num_times):
    fetch(website, num_times)

Now to give it a spin:

$ python fetch_website.py -n 5 https://blog.amilstead.com/
report for: https://blog.amilstead.com/
saw 200: 5 times

a “scale problem”

So now that our script can give us some sampling, we can extend our use case: capture these reports periodically over time and use the raw data to establish a signal for the health of a collection of sites. For a better health signal we need to allow for a high throughput of data, so it will be important to parallelize execution and interpret the results.

We could do this directly in bash, but managing forked processes in shell languages has always seemed like a bit of a black art to me, and so far I have yet to stumble upon an uncomplicated way to fork something and manage its behavior (e.g. output, exit status, etc.).

So why not use Python? The script is already written in it, and the standard library has lots of different ways to manage parallel execution. Let’s use one!

a multiprocessing approach

Python’s multiprocessing module has some great options for helping scale up the usage of our python code. In particular, process pools have a great high-level API, making it super easy to distribute work, and multiprocessing managers give us a way to share information across processes using shared objects.

Arming our trusty fetch_website.py script with the standard library, we can scale up our execution to new heights! Let’s start with parallelizing the fetch.

process pools

Python’s multiprocessing.Pool effectively manages a group of workers waiting to receive work from their parent. You submit work to the pool through a handful of high-level APIs, and each worker will grab work out of a work queue until the pool is told to shut down.
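
Before we touch our script, here is a minimal, standalone sketch of the Pool API; the square function and the pool size are just placeholders for illustration:

"""pool_sketch.py -- a standalone illustration, not part of our script"""
import multiprocessing


def square(x):
    return x * x


if __name__ == "__main__":
    # Four workers pull items off the pool's internal work queue.
    with multiprocessing.Pool(processes=4) as pool:
        results = pool.map(square, range(10))
    print(results)  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]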

Since our script is actually just a python program, we can take advantage of process pools by submitting calls to the fetch function directly to the pool’s work queue.

For example:

"""multiproc_fetch.py"""
import collections
import functools
import multiprocessing
import os

import click
import requests


def fetch(website, num_times):
    pid = os.getpid()
    print(f"starting fetch from {pid}")
    ...


def fetch_parallel(num_times, websites):

    pool = multiprocessing.Pool()
    for website in websites:
        # queue to call fetch
        call_fetch = functools.partial(fetch, website, num_times)
        pool.apply_async(call_fetch)

    # No more work can be submitted.
    pool.close()

    # Wait for the work to complete.
    pool.join()


@click.command()
@click.option(
    "-n", "--num-times",
    type=click.INT,
    help="Number of times to fetch WEBSITE"
)
@click.argument("website", metavar="WEBSITE", nargs=-1)
def main(num_times, website):
    fetch_parallel(num_times, website)


if __name__ == "__main__":
    main()

So now running it against a couple of sites will do work in parallel:

$ python multiproc_fetch.py -n 5 https://blog.amilstead.com https://blog.amilstead.com
starting fetch from 550
starting fetch from 551
report for: https://blog.amilstead.com
saw 200: 5 times
report for: https://blog.amilstead.com
saw 200: 5 times

This is great because we can now parallelize the heck out of our script. But we also have a new problem: anything digesting our report output is going to get messed up, as stdout will be a battleground of interleaved strings from now on. You may have noticed that even in the two-website fetch test above, the starting fetch from ... strings print at the same time rather than in order of submission.

We could certainly solve this problem with the subprocess module (e.g. using pipes and buffering output), but working with subprocess has some downsides: we would need to think about our parallelism in terms of processes on a host instead of functions in a program, and subprocess doesn’t have a built-in pool, so we’d have to make our own.
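
For contrast, a rough sketch of what the subprocess route might look like (a hedged illustration only, assuming the earlier fetch_website.py is on disk; we would still need to add our own pooling and concurrency limits, which is exactly the part that gets messy):

"""subprocess_sketch.py -- an illustration of the subprocess alternative"""
import subprocess

websites = ["https://blog.amilstead.com/", "https://blog.amilstead.com/"]

# Start one child per website and buffer each child's output through a pipe.
procs = [
    subprocess.Popen(
        ["python", "fetch_website.py", "-n", "5", site],
        stdout=subprocess.PIPE,
        text=True,
    )
    for site in websites
]

# Reading the pipes in submission order keeps the reports from interleaving.
for proc in procs:
    stdout, _ = proc.communicate()
    print(stdout, end="")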

Fortunately, multiprocessing has a way to share objects (like strings) across processes by using a Manager and shared objects.

managers

One way to handle digesting our report in the order it was submitted is to put the report itself into some kind of queue (or list) instead of a standard output stream, on a per-report basis. After a worker process has completed, we can treat each element in its queue as part of our report. It turns out that Managers are more or less built explicitly for sharing data structures across multiple processes, so that seems like a good fit.
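
As a standalone sketch of what a Manager buys us, before wiring it into our script (the work function and the message here are made up for demonstration):

"""manager_sketch.py -- a standalone illustration of a shared Manager queue"""
import multiprocessing


def work(queue):
    # Runs in a pool worker; the queue proxy was pickled and sent over.
    queue.put("hello from a worker")


if __name__ == "__main__":
    manager = multiprocessing.Manager()
    queue = manager.Queue()
    with multiprocessing.Pool(processes=1) as pool:
        pool.apply(work, (queue,))
    print(queue.get())  # hello from a worker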

The only rub is that we use print, and maybe for some crazy reason we want to keep using print. One way to still capture the report is to patch sys.stdout and sys.stderr, so let’s go with that.

patching standard output streams

First, we need a way for a child process to patch its standard output streams to use a shared data structure instead of writing directly to a stream. For this, we can use a simple QueueStream implementation:

class QueueStream:

    @classmethod
    def patch(cls, stream_name, queue):
        """Patch a standard output stream with a QueueStream"""
        setattr(sys, stream_name, cls(stream_name, queue))

    def __init__(self, stream_name, queue):
        self.stream_name = stream_name
        self.queue = queue

    def write(self, *args, **kwargs):
        self.queue.put((self.stream_name, args, kwargs))

    def flush(self, *args, **kwargs):
        """For file-like object API compatibility"""
        pass
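
As a quick sanity check of how QueueStream behaves in a single process (a throwaway demonstration, assuming the class above is in scope; a plain queue.Queue stands in for the shared queue we’ll use later):

import queue
import sys

q = queue.Queue()
QueueStream.patch("stdout", q)
print("hello")               # each write() call lands in the queue instead of the terminal
sys.stdout = sys.__stdout__  # restore the real stream
print(q.get())               # ('stdout', ('hello',), {})
print(q.get())               # ('stdout', ('\n',), {})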

sharing objects across processes

Now that we have a way of redirecting standard output, we need to update our fetch method to actually perform the patch before it prints anything. It’s important we invoke the patch function inside the fetch function because that’s the specific code our pool workers execute. Patching anywhere else will affect streams in the parent process rather than a worker.

So let’s update fetch:

...
def fetch(website, num_times, output_queue=None):
    pid = os.getpid()
    ppid = os.getppid()
    # If we're a child process, replace standard output streams
    if pid != ppid and output_queue:
        QueueStream.patch("stdout", output_queue)
        QueueStream.patch("stderr", output_queue)

    print(f"[{pid}] starting fetch from {pid}")

    ...

    print(f"[{pid}] report for: {website}")
    for status_code in sorted(status_codes):
        print(f"[{pid}] saw {status_code}: {status_codes[status_code]} times")

Next, we need to update the parallel fetcher function to create a Manager and some shared Queue objects to pass into fetch when a worker goes to execute it:

def fetch_parallel(num_times, websites):

    pool = multiprocessing.Pool()
    manager = multiprocessing.Manager()
    async_results = []
    for website in websites:
        # Create an output queue for this workload
        output_queue = manager.Queue()
        # queue to call fetch
        call_fetch = functools.partial(
            fetch, website, num_times, output_queue=output_queue
        )
        # save queues for later use
        async_results.append((output_queue, pool.apply_async(call_fetch)))
    ...

Great! Now the logic in fetch for initializing child streams will fire since we’re passing a shared output_queue into its execution.

Our final task is to actually print the output from our queue in order of process submission. We can start by adding a drain function to our QueueStream to drain the output of the queue:

class QueueStream:
    @staticmethod
    def drain(queue):
        """Drain a queue and print its messages to standard output streams"""
        while not queue.empty():
            stream, args, kwargs = queue.get()
            output_stream = getattr(sys, stream)
            output_stream.write(*args, **kwargs)
    ...

Then, last but not least, we can iterate through our list of queues and drain each one while its worker is still running:

...
def fetch_parallel(num_times, websites):

    ...
    # No more work can be submitted
    pool.close()

    # Now print output in the order it was submitted
    for output_queue, async_result in async_results:
        while True:
            try:
                result = async_result.get(1)
            except multiprocessing.TimeoutError:
                # this raises while the process is still running.
                QueueStream.drain(output_queue)
                continue
            else:
                # if the process completed, drain and move to the next one.
                QueueStream.drain(output_queue)
                break

    # Wait for the work to complete.
    pool.join()

That should be it for our modifications. Let’s give it a spin:

$ python multiproc_fetch.py -n 5 https://blog.amilstead.com https://blog.amilstead.com
[3093] starting fetch from 3093
[3093] report for: https://blog.amilstead.com
[3093] saw 200: 5 times
[3094] starting fetch from 3094
[3094] report for: https://blog.amilstead.com
[3094] saw 200: 5 times

Excellent! Now our output is in order of submission and we’re still highly parallel.

recap

Okay, so let’s recap the situation and what we did.

Our scenario:

  • A long-running script needs to be executed in parallel.
  • That script is written in python.
  • Order is important in standard output streams written to by the script.

Our solution:

  • Use multiprocessing.Pool to parallelize our script behavior.
  • Patch standard output streams to put strings into a queue object (via QueueStream).
  • Use multiprocessing.Manager shared objects to allow parent/child processes to use the same queue object when writing and reading.
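
For reference, here is one way the pieces above might be assembled into a single runnable file. This is my own consolidation of the snippets in this post, with a hypothetical filename; treat it as a sketch under the same simplifying assumptions, not a definitive implementation:

"""runner_sketch.py -- a consolidation of the snippets in this post (illustrative only)"""
import collections
import functools
import multiprocessing
import os
import sys

import click
import requests


class QueueStream:

    @classmethod
    def patch(cls, stream_name, queue):
        """Patch a standard output stream with a QueueStream."""
        setattr(sys, stream_name, cls(stream_name, queue))

    @staticmethod
    def drain(queue):
        """Drain a queue and print its messages to the real output streams."""
        while not queue.empty():
            stream, args, kwargs = queue.get()
            getattr(sys, stream).write(*args, **kwargs)

    def __init__(self, stream_name, queue):
        self.stream_name = stream_name
        self.queue = queue

    def write(self, *args, **kwargs):
        self.queue.put((self.stream_name, args, kwargs))

    def flush(self, *args, **kwargs):
        """For file-like object API compatibility."""
        pass


def fetch(website, num_times, output_queue=None):
    pid = os.getpid()
    ppid = os.getppid()
    # If we're a pool worker, redirect standard output streams into the shared queue.
    if pid != ppid and output_queue:
        QueueStream.patch("stdout", output_queue)
        QueueStream.patch("stderr", output_queue)

    print(f"[{pid}] starting fetch from {pid}")
    fetches_remaining = num_times or 1
    status_codes = collections.defaultdict(int)
    while fetches_remaining > 0:
        response = requests.get(website)
        status_codes[response.status_code] += 1
        fetches_remaining -= 1

    print(f"[{pid}] report for: {website}")
    for status_code in sorted(status_codes):
        print(f"[{pid}] saw {status_code}: {status_codes[status_code]} times")


def fetch_parallel(num_times, websites):
    pool = multiprocessing.Pool()
    manager = multiprocessing.Manager()
    async_results = []
    for website in websites:
        # One shared output queue per workload, so reports never interleave.
        output_queue = manager.Queue()
        call_fetch = functools.partial(fetch, website, num_times, output_queue=output_queue)
        async_results.append((output_queue, pool.apply_async(call_fetch)))

    # No more work can be submitted.
    pool.close()

    # Print output in the order the work was submitted.
    for output_queue, async_result in async_results:
        while True:
            try:
                async_result.get(1)
            except multiprocessing.TimeoutError:
                # The worker is still running; drain what it has produced so far.
                QueueStream.drain(output_queue)
                continue
            else:
                # The worker finished; drain the rest and move on.
                QueueStream.drain(output_queue)
                break

    # Wait for the workers to finish.
    pool.join()


@click.command()
@click.option("-n", "--num-times", type=click.INT, help="Number of times to fetch WEBSITE")
@click.argument("website", metavar="WEBSITE", nargs=-1)
def main(num_times, website):
    fetch_parallel(num_times, website)


if __name__ == "__main__":
    main()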

conclusion

This is a pretty specific use case, and in general there are several different ways to parallelize process execution and do something with the output. The approach here leverages the high-level APIs for multiprocessing pools and managers and provides a relatively simple way to manage ordered output.

The standard library has a lot to offer (though not all of it is great) and sometimes what it provides fits your use case exceedingly well, thus paving the way to a readable, easy to understand solution to your problem.