Combining functools.lru_cache with multiprocessing.Pool
I have a rather complex recursive function with many parameters (the Obara-Saika scheme, in case anyone is wondering) that I would like to evaluate more efficiently. As a first step, I applied @functools.lru_cache. As a second step, I now want to use a multiprocessing.Pool to asynchronously evaluate a long list of input parameters.
Adapting the second example from the Python functools docs and adding a worker pool, I have:
from multiprocessing import Pool
from functools import lru_cache

@lru_cache(maxsize=10)
def fibonacci(n):
    print('calculating fibonacci(%i)' % n)
    if n < 2:
        return n
    return fibonacci(n-1) + fibonacci(n-2)

with Pool(processes=4) as pool:
    for i in range(10):
        res = pool.apply_async(fibonacci, (i,))
        print(res.get())

print(fibonacci.cache_info())
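As far as I understand it, each worker process ends up with its own copy of the lru_cache, so the parent's cache_info() reports nothing even after the pool has done all the work. Here is a quick sketch I used to convince myself of that (worker_cache_info is just my own helper name, not anything from the docs):

```python
from multiprocessing import Pool
from functools import lru_cache

@lru_cache(maxsize=10)
def fibonacci(n):
    if n < 2:
        return n
    return fibonacci(n-1) + fibonacci(n-2)

def worker_cache_info(n):
    fibonacci(n)                   # fill the cache in this worker process
    return fibonacci.cache_info()  # CacheInfo of this process's own copy

if __name__ == '__main__':
    with Pool(processes=2) as pool:
        # the worker's cache has entries ...
        print(pool.apply(worker_cache_info, (8,)))
    # ... while the parent's copy was never touched: currsize == 0
    print(fibonacci.cache_info())
```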
Question 1
How can the different workers access the same cache? Another question (How do I share the cache?) asks a similar thing, but I couldn't get it to work. Here are my two failed approaches.
Using multiprocessing.Pool:
from multiprocessing import Pool
from functools import lru_cache
import time

@lru_cache(maxsize=10)
def fibonacci(n):
    print('calculating fibonacci(%i)' % n)  # log whether the function gets called
    if n < 2:
        return n
    return fibonacci(n-1) + fibonacci(n-2)

res = []
with Pool(processes=4) as pool:
    # submit first task
    res.append(pool.apply_async(fibonacci, (5,)).get())
    # give fibonacci() some time to fill its cache
    time.sleep(1)
    # submit second task
    res.append(pool.apply_async(fibonacci, (3,)).get())
print(res)
Using concurrent.futures:
import concurrent.futures
from functools import lru_cache
import time

@lru_cache(maxsize=10)
def fibonacci(n):
    print('calculating fibonacci(%i)' % n)  # log whether the function gets called
    if n < 2:
        return n
    return fibonacci(n-1) + fibonacci(n-2)

with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
    @lru_cache(maxsize=10)
    def fib_async(n):
        print('calculating fib_async(%i)' % n)
        if n < 2:
            return n
        return fibonacci(n-1) + fibonacci(n-2)

    res = []
    # submit first task
    res.append(executor.submit(fib_async, 5))
    # give fib_async() some time to fill its cache
    time.sleep(1)
    # submit second task
    res.append(executor.submit(fib_async, 3))
    res = [e.result() for e in res]
print(res)
Both produce essentially the same output, showing that the second task recalculates fibonacci(2), although the first task should already have calculated it. How do I share my cache?
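If I understand the linked question correctly, the idea there is to pass an explicitly shared mapping into the function instead of relying on lru_cache. A minimal sketch of what I think that looks like (my own attempt; the Manager().dict() proxy is shared between processes, but note that every lookup is an IPC round-trip, so this trades recomputation for communication overhead):

```python
from multiprocessing import Pool, Manager

def fibonacci(n, cache):
    # check the shared cache first; each access goes through the manager
    if n in cache:
        return cache[n]
    result = n if n < 2 else fibonacci(n - 1, cache) + fibonacci(n - 2, cache)
    cache[n] = result  # now visible to all workers via the manager process
    return result

if __name__ == '__main__':
    with Manager() as manager:
        shared = manager.dict()  # dict proxy shared between processes
        with Pool(processes=4) as pool:
            res = [pool.apply_async(fibonacci, (i, shared)).get()
                   for i in range(10)]
        print(res)  # [0, 1, 1, 2, 3, 5, 8, 13, 21, 34]
```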
Sharing the cache should speed things up a bit, but there is still a problem when repeated calls are poorly synchronized: the call currently being evaluated by worker 1 is not yet cached, so worker 2 might start evaluating the same thing. Which brings me to:
Question 2
The calculation of Fibonacci numbers is fairly linear in its recursion, i.e. only one parameter decreases. My function is more complex, and I could use something that keeps track not only of which inputs have already been calculated, but also of which ones are currently being calculated.
To be clear: I want to make many concurrent calls to a recursive function, each of which will make many new calls to the recursive function.
It might be tricky to avoid assigning each call directly to a worker, as this would cause deadlocks once the recursion depth exceeds the number of workers.
Is there something like this that I could use, or do I need to build it myself? I stumbled across multiprocessing.managers and concurrent.futures.ProcessPoolExecutor, which might be helpful, but I could use some help getting started.
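For what it's worth, one shape such bookkeeping could take (an illustrative, thread-based sketch of my own; a process-based version would need a managed or otherwise shared structure instead of a plain dict) is a cache of Future objects keyed by the call's arguments. The first caller for a given key becomes the owner and computes the value; concurrent callers for the same key block on the same Future instead of recomputing:

```python
import threading
from concurrent.futures import Future

class FutureCache:
    """Memoise results and track in-flight calls: the first caller for a
    given key computes the value, concurrent callers for the same key
    block on the same Future instead of recomputing."""

    def __init__(self, func):
        self.func = func
        self.lock = threading.Lock()
        self.futures = {}  # args -> Future, pending or completed

    def __call__(self, *args):
        with self.lock:
            fut = self.futures.get(args)
            owner = fut is None
            if owner:                    # neither cached nor in flight
                fut = Future()
                self.futures[args] = fut
        if owner:
            try:
                fut.set_result(self.func(*args))
            except BaseException as exc:
                fut.set_exception(exc)   # unblock waiters on failure too
                raise
        return fut.result()              # blocks while another caller computes

@FutureCache
def fib(n):
    return n if n < 2 else fib(n - 1) + fib(n - 2)
```

Because the owner evaluates the recursion synchronously in its own thread rather than submitting sub-calls to a pool, the deadlock scenario above (recursion depth exceeding the number of workers) doesn't arise in this sketch.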