Combining functools.lru_cache with multiprocessing.Pool

I have a rather complex recursive function with many parameters (the Obara-Saika scheme, in case anyone is wondering) that I would like to evaluate more efficiently. As a first step, I applied @functools.lru_cache. As a second step, I now want to use a multiprocessing.Pool to asynchronously evaluate a long list of inputs.

Adapting the second example from the Python functools docs and adding a worker pool, I have:

from multiprocessing import Pool
from functools import lru_cache

@lru_cache(maxsize=10)
def fibonacci(n):
    print('calculating fibonacci(%i)' %n)
    if n < 2:
        return n
    return fibonacci(n-1)+fibonacci(n-2)

if __name__ == '__main__':   # guard needed when the start method is 'spawn' (Windows/macOS)
    with Pool(processes=4) as pool:
        for i in range(10):
            res = pool.apply_async(fibonacci, (i,))
            print(res.get())

    print(fibonacci.cache_info())


Question 1

How do I access the cache from the different workers? Another question (How do I share the cache?) asks something similar, but I couldn't get it to work. Here are my two failed approaches.

Using multiprocessing.Pool:

from multiprocessing import Pool
from functools import lru_cache
import time

@lru_cache(maxsize=10)
def fibonacci(n):
    print('calculating fibonacci(%i)' %n)   # log whether the function gets called
    if n < 2:
        return n
    return fibonacci(n-1)+fibonacci(n-2)

if __name__ == '__main__':
    res = []
    with Pool(processes=4) as pool:

        # submit first task
        res.append(pool.apply_async(fibonacci, (5,)).get())

        # give fibonacci() some time to fill its cache
        time.sleep(1)

        # submit second task
        res.append(pool.apply_async(fibonacci, (3,)).get())

    print(res)


Using concurrent.futures:

import concurrent.futures
from functools import lru_cache

import time

@lru_cache(maxsize=10)
def fibonacci(n):
    print('calculating fibonacci(%i)' %n)   # log whether the function gets called
    if n < 2:
        return n
    return fibonacci(n-1)+fibonacci(n-2)

# fib_async is defined at module level (not inside the 'with' block)
# so that worker processes can unpickle it by reference
@lru_cache(maxsize=10)
def fib_async(n):
    print('calculating fib_async(%i)' %n)
    if n < 2:
        return n
    return fibonacci(n-1) + fibonacci(n-2)

if __name__ == '__main__':
    with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
        res = []

        # submit first task
        res.append(executor.submit(fib_async, 5))

        # give fib_async() some time to fill its cache
        time.sleep(1)

        # submit second task
        res.append(executor.submit(fib_async, 3))

    res = [e.result() for e in res]

    print(res)


Both produce basically the same output, showing that the second task recalculates fibonacci(2), even though the first task should already have calculated it. How do I share my cache?
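
For reference, the direction I have in mind looks roughly like this: a minimal sketch that replaces lru_cache with a dict from multiprocessing.Manager, shared by all workers through a proxy. The explicit cache parameter is my own construction, not something from the docs, and every lookup goes through the manager process, so it is much slower than a local lru_cache:

from multiprocessing import Pool, Manager

def fibonacci(n, cache):
    # 'cache' is a Manager dict proxy shared by all worker processes
    if n in cache:
        return cache[n]
    print('calculating fibonacci(%i)' %n)
    result = n if n < 2 else fibonacci(n-1, cache) + fibonacci(n-2, cache)
    cache[n] = result
    return result

if __name__ == '__main__':
    with Manager() as manager:
        cache = manager.dict()        # shared between workers via a proxy
        with Pool(processes=4) as pool:
            res = [pool.apply_async(fibonacci, (i, cache)) for i in range(10)]
            print([r.get() for r in res])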

This should speed things up a bit, but it still has a problem when the repeated calls are poorly synchronized: the call currently being evaluated by worker 1 is not yet cached, so worker 2 may start evaluating the same thing. Which brings me to:

Question 2

The calculation of Fibonacci numbers is fairly linear in its recursion, i.e. only one parameter decreases. My function is more complex, and I could use something that not only keeps track of which inputs have already been calculated, but also of which ones are currently being calculated.

To be clear: I want to make many concurrent calls to a recursive function, each of which will make many new calls to the recursive function.

I probably need to avoid assigning each recursive call directly to its own worker, as that can cause deadlocks when the recursion depth exceeds the number of workers.
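
Roughly, what I imagine is something like the sketch below: a shared dict in which a sentinel value marks inputs that are currently in flight, so that other workers wait instead of recomputing. The PENDING sentinel, the lock, and the polling loop are just my illustration; note that the recursive calls stay inside the worker that claimed the input, so recursion depth does not tie up additional workers:

import time
from multiprocessing import Pool, Manager

PENDING = 'PENDING'   # sentinel: this input is currently being calculated

def fibonacci(n, cache, lock):
    with lock:                     # atomically check the cache and claim n
        value = cache.get(n)
        if value is None:
            cache[n] = PENDING
    if value is None:              # we claimed n, so we compute it
        print('calculating fibonacci(%i)' %n)
        value = n if n < 2 else (fibonacci(n-1, cache, lock)
                                 + fibonacci(n-2, cache, lock))
        cache[n] = value
    else:
        while value == PENDING:    # someone else is computing n: wait for them
            time.sleep(0.01)
            value = cache[n]
    return value

if __name__ == '__main__':
    with Manager() as manager:
        cache, lock = manager.dict(), manager.Lock()
        with Pool(processes=4) as pool:
            res = [pool.apply_async(fibonacci, (i, cache, lock)) for i in range(10)]
            print([r.get() for r in res])

Polling with time.sleep() is obviously crude; I would rather block on something like an event until the claimed value arrives.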

Is there such a thing that I could use, or do I need to build something myself? I stumbled upon multiprocessing.managers and concurrent.futures.ProcessPoolExecutor, which might be helpful, but I could use some help getting started.
