
Multiprocessing in Python to speed up log file processing

I have a love-hate relationship with code optimisation: it can be very rewarding or utterly soul-destroying.

One area of log file processing that can be improved is the speed at which you process the lines in the log file. First, a quick recap of how Python typically executes your functions.

Python speed limitations

By default, Python is limited by the global interpreter lock (GIL), which means only one thread executes Python bytecode at a time, so a standard script effectively runs on a single core. Threading is another way of speeding up scripts (particularly I/O-bound ones), but it is different from multiprocessing, which sidesteps the GIL by running separate processes on separate cores.
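
Here's a quick illustrative sketch of that difference (slow_square is just a made-up CPU-bound function and the run sizes are arbitrary):

import time
from multiprocessing import Pool

def slow_square(n):
    # Deliberately CPU-bound: the GIL stops threads running this in
    # parallel, but separate processes each get their own interpreter.
    return sum(i * i for i in range(n))

if __name__ == '__main__':
    numbers = [2_000_000] * 8

    start = time.time()
    sequential = [slow_square(n) for n in numbers]      # one process, one core
    print('sequential:', round(time.time() - start, 2))

    start = time.time()
    with Pool() as p:                                   # one worker per core by default
        parallel = p.map(slow_square, numbers)
    print('pool:', round(time.time() - start, 2))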

Brief theory over. Parsing log data is CPU intensive, so my idea was that distributing the load across many processors would make it much faster. Caveat: this was run on a 20-core machine, so the improvement on a 4-core machine will vary.

Log files vary in size, so here are the results of the test using two log files typical of a medium-sized website (or a large website pre-filtered to Googlebot).

[Timing chart: 101MB file]

[Timing chart: 500MB file]

Optimisation: 118% faster

These initial results included the read time of the CSV, which is done on a single process, so I ran it again and counted only the time it took to parse the files.

[Timing chart: 100MB file]

[Timing chart: 500MB file]

Optimisation: 106% faster

So, essentially, the speed of processing these files is doubled.

Base Problem Structure

I have a long list of strings; each one needs to be split, and rules and values then identified within the resulting fields.

The simplest way for me to handle this is to break the list into as many chunks as there are cores on the computer. The multiprocessing module has a handy way to get the number of cores on a machine.

How to get the number of processors on a computer

multiprocessing.cpu_count()

This returns an integer giving the number of logical processors available on your machine.
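
For example (the value depends entirely on the machine you run it on):

import multiprocessing

print(multiprocessing.cpu_count())   # e.g. 20 on my test machine, 4 on a typical laptop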

Now that I have the number of processors on my machine I can turn my list of strings into chunks using a generator.

def chunks(self, l, n):
    for i in range(0, len(l), n):
        yield l[i:i + n]

Breaking this down, l is the full list and n is the length of each sub-list created from it. For example, if I pass in l = [1,2,3,4,5,6,7,8,9,10] and n = 2, the generator will yield [1,2], then [3,4], and so on.
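
A quick standalone demonstration, written as a plain function rather than a class method:

def chunks(l, n):
    # Yield successive n-sized slices of l.
    for i in range(0, len(l), n):
        yield l[i:i + n]

print(list(chunks([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], 2)))
# [[1, 2], [3, 4], [5, 6], [7, 8], [9, 10]]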

Because I want to distribute the load across the available cores on my machine, I want my n value to equal the total length of my list divided by the number of worker processes I plan to use. So this is what I pass into the chunks function.

chunks(listify, int(len(listify) / (multiprocessing.cpu_count() - 2)))
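As a rough worked example with made-up numbers (a million pre-split log lines on the 20-core machine mentioned earlier); note that the int() rounding can leave you with one extra, smaller chunk:

import multiprocessing

def chunks(l, n):
    for i in range(0, len(l), n):
        yield l[i:i + n]

listify = [['127.0.0.1', '-', '-']] * 1_000_000     # stand-in for split log lines
workers = max(multiprocessing.cpu_count() - 2, 1)   # max() guard added here for small machines
n = int(len(listify) / workers)                     # e.g. 1_000_000 / 18 = 55_555
data = list(chunks(listify, n))
print(workers, n, len(data))                        # e.g. 18 55555 19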

Now that my list is broken up into roughly one chunk per worker process, I invoke the processing pool.

p = Pool(processes=multiprocessing.cpu_count() - 2)

You may be wondering why I'm using cpu_count() - 2: it's just so the computer I run this on doesn't grind to a halt while it's parsing.

Now that I have my pool, I need to tell it to run my parsing function on each list in my list of lists.

results = [p.apply_async(self.parse_chunk, args=(list(x),)) for x in data]

This may seem complicated, but it's actually quite simple. The main thing to focus on is the apply_async function.

The below is copied straight from the Python documentation; note the part explaining why you shouldn't use .apply() for parallel work.

apply(func[, args[, kwds]])

Call func with arguments args and keyword arguments kwds. It blocks until the result is ready. Given this blocks, apply_async() is better suited for performing work in parallel. Additionally, func is only executed in one of the workers of the pool.

apply_async(func[, args[, kwds[, callback[, error_callback]]]])

A variant of the apply() method which returns a result object.

If callback is specified then it should be a callable which accepts a single argument. When the result becomes ready callback is applied to it, that is unless the call failed, in which case the error_callback is applied instead.

If error_callback is specified then it should be a callable which accepts a single argument. If the target function fails, then the error_callback is called with the exception instance.

Callbacks should complete immediately since otherwise the thread which handles the results will get blocked.
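
The callback arguments aren't used in my log parser, but here's a quick illustrative sketch of them (work, on_done and on_error are just toy names):

from multiprocessing import Pool

def work(x):
    return x * x

def on_done(result):
    # Runs in the parent process as soon as the result is ready.
    print('got', result)

def on_error(exc):
    # Runs instead of on_done if the worker raised an exception.
    print('failed:', exc)

if __name__ == '__main__':
    with Pool(processes=2) as p:
        r = p.apply_async(work, args=(3,), callback=on_done, error_callback=on_error)
        r.wait()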


Everything in the square brackets is optional. The first thing passed in is the function that will process the data, but you pass just the function name, without calling it or including any arguments. The data for that function goes into the args tuple, so here the chunk sits at index [0] of that tuple.

So this line submits an asynchronous task for each list in our list of lists.

Each call immediately returns a result object; the parsed data is retrieved from these result objects later.

Get the result from an apply_async call

results = [item.get() for item in results]
self.results = sum(results, [])

Each worker returns a list, and to access it we need to call the .get() method on the result object (which blocks until that worker has finished). Because the result objects are themselves in a list, calling get in a list comprehension gives me a list of lists… again.

Finally, to merge the list of lists into one flat list, sum can be used with an empty list as the starting value.
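
For example:

flat = sum([[1, 2], [3, 4], [5]], [])   # [1, 2, 3, 4, 5]

(For very long lists of lists, itertools.chain.from_iterable does the same job more efficiently, but sum is perfectly fine at this scale.)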

With this process I was able to double the speed of processing a long list of values.

Watch this space for further optimisations.

Finally a shoutout to Ed who originally wrote a lot of the base code this is based on.

Final script – https://github.com/saiyancode/LogIt

    # Excerpt from the parser class in the repo above. It assumes
    # `import multiprocessing` and `from multiprocessing import Pool`
    # at module level, plus the class's own open_file, verify and
    # __parse_date_time helpers, which are omitted here.

    def chunks(self, l, n):
        # Yield successive n-sized slices of l.
        for i in range(0, len(l), n):
            yield l[i:i + n]

    def parse_chunk(self, data):
        # Runs inside a worker process: turn each pre-split log line
        # into a dict of the fields we care about.
        results = []
        for line in data:
            try:
                parsed = dict()
                parsed['ip'] = line[0]
                parsed['date'], parsed['time'] = self.__parse_date_time(line[3].lstrip('['))
                parsed['url'] = line[6]
                parsed['status'] = line[8]
                parsed['request_size'] = line[9]
                parsed['user_agent'] = ' '.join(line[11:])
                parsed['verify'] = str(self.verify(parsed['user_agent'], parsed['ip']))
                results.append(parsed)
            except Exception as e:
                print(e)
        return results

    def run_process(self, file):
        # Read the whole log, split each line on whitespace, then fan the
        # chunks out across a pool of worker processes.
        file = self.open_file(file).readlines()
        listify = [line.split() for line in file]

        data = self.chunks(listify, int(len(listify) / (multiprocessing.cpu_count() - 2)))
        p = Pool(processes=multiprocessing.cpu_count() - 2)
        results = [p.apply_async(self.parse_chunk, args=(list(x),)) for x in data]

        # Wait for the workers and flatten their per-chunk lists.
        results = [item.get() for item in results]
        self.results = sum(results, [])
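
If you want to try the pattern without the rest of the class, here's a stripped-down, self-contained sketch of the same idea, with a toy parse_chunk and a made-up access.log path rather than the repo's real code:

import multiprocessing
from multiprocessing import Pool

def chunks(l, n):
    for i in range(0, len(l), n):
        yield l[i:i + n]

def parse_chunk(data):
    # Toy stand-in for the real parser: pull out IP, URL and status only.
    results = []
    for line in data:
        try:
            results.append({'ip': line[0], 'url': line[6], 'status': line[8]})
        except IndexError:
            pass
    return results

if __name__ == '__main__':
    with open('access.log') as f:                     # hypothetical log path
        listify = [line.split() for line in f]

    workers = max(multiprocessing.cpu_count() - 2, 1)
    p = Pool(processes=workers)
    data = chunks(listify, int(len(listify) / workers) or 1)
    async_results = [p.apply_async(parse_chunk, args=(list(x),)) for x in data]

    parsed = sum((r.get() for r in async_results), [])
    p.close()
    p.join()
    print(len(parsed), 'lines parsed')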


Will Cecil: Digital Marketer, Python Tinkerer & Tech Enthusiast. Follow me: Website / Twitter / Github