Dask bag with multiple numpy arrays

I am getting a rather uninformative FutureWarning from dask/numpy when executing foldby on a dask.bag that contains numpy arrays.
import numpy as np
import dask.bag as db

def binop(a, b):
    print('binop')
    return a + b[1]

def combine(a, b):
    print('combine')
    return a + b[1]

seq = ((np.random.randint(0, 5, size=1)[0], np.ones(5)) for _ in range(50))

db.from_sequence(seq, partition_size=10)\
  .foldby(0, binop=binop, initial=np.zeros(5), combine=combine)\
  .compute()
The goal is simply to sum up a bunch of numpy arrays. This gives the correct result, but it also produces multiple FutureWarnings (apparently one per partition) from numpy, although they seem to be coming from dask:
dask/async.py:247: FutureWarning: elementwise comparison failed; returning scalar instead, but in the future will perform elementwise comparison
  return func(*args2)
Just adding two numpy arrays without dask does not produce the warning, so .foldby (or dask's parallelism) is clearly involved. The warnings also appear to be issued before any computation is done.
- How do I determine whether this warning is something I should worry about?
- How can I suppress the warning if I decide it is not a concern?
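For the second question, Python's standard warnings module can silence just this message rather than all warnings. A minimal sketch, where noisy() is a hypothetical stand-in for the dask/numpy code path that emits the warning:

```python
import warnings

# Silence only this numpy FutureWarning, matched by the start of its message.
warnings.filterwarnings(
    "ignore",
    message="elementwise comparison failed",
    category=FutureWarning,
)

def noisy():
    # Hypothetical stand-in for the code path emitting the warning.
    warnings.warn(
        "elementwise comparison failed; returning scalar instead",
        FutureWarning,
    )
    return 42

result = noisy()  # runs without printing the warning
```

Note that the `message` argument is a regular expression matched against the start of the warning text, so it leaves other FutureWarnings visible.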
I am using Python 3.6, dask 0.14.1, and numpy 1.12.1.
UPDATE

Thanks to @MRocklin's answer, I started looking into this a little more. The offending code in dask/async.py is this:
def _execute_task(arg, cache, dsk=None):
    ...
    if isinstance(arg, list):
        return [_execute_task(a, cache) for a in arg]
    elif istask(arg):
        func, args = arg[0], arg[1:]
        args2 = [_execute_task(a, cache) for a in args]
        return func(*args2)
Is it possible that dask is actually trying to iterate over the numpy array in args2 = [_execute_task(a, cache) for a in args]? I don't know enough about dask's internals (or, frankly, in general) to judge what those variables contain.
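To check that suspicion, here is a stripped-down sketch of the task-execution convention (my own simplification, not dask's actual code): dask recurses into lists and into tuples whose first element is callable, but a numpy array is neither, so it is passed through as data rather than iterated.

```python
from operator import add

import numpy as np

def istask(x):
    # dask's convention: a task is a tuple whose first element is callable
    return isinstance(x, tuple) and bool(x) and callable(x[0])

def execute_task(arg):
    # simplified sketch of _execute_task: no cache, no key lookup
    if isinstance(arg, list):
        return [execute_task(a) for a in arg]
    if istask(arg):
        func, args = arg[0], arg[1:]
        args2 = [execute_task(a) for a in args]
        return func(*args2)
    return arg  # plain data (including numpy arrays) falls through untouched

task = (add, np.ones(5), (add, np.ones(5), np.ones(5)))
out = execute_task(task)  # array([3., 3., 3., 3., 3.])
```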
It has something to do with cytoolz.itertoolz.reduceby and the init value. Changing init from init=np.zeros((5,)) to init=lambda: np.zeros((5,)) at least gets rid of the warning.
The warning is generated by this line:

cpdef dict reduceby(object key, object binop, object seq, object init='__no__default__'):
    ...
    cdef bint skip_init = init == no_default

which compares the passed init value (np.zeros((5,))) to the string "__no__default__", prompting numpy to warn about an elementwise comparison between an ndarray and a str.
So, to answer my own questions:

- No, you don't need to worry about this warning, although the comparison behavior it warns about may change in a future numpy release.
- You can avoid the warning altogether by passing a callable as init. This doesn't appear to have any serious downsides, but keep in mind that the callable init will be called once per executing process.
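A pure-Python sketch of reduceby (my simplification of the (cy)toolz behavior, not its actual implementation) shows what accepting a callable init looks like; in this sketch the callable runs once per distinct key, so it may execute several times:

```python
import numpy as np

calls = {"n": 0}

def make_init():
    # count invocations so we can see how often a callable init runs
    calls["n"] += 1
    return np.zeros(5)

def reduceby(key, binop, seq, init):
    # minimal sketch of reduceby: group by key(item), fold with binop
    out = {}
    for item in seq:
        k = key(item)
        if k not in out:
            out[k] = init() if callable(init) else init
        out[k] = binop(out[k], item)
    return out

data = [(i % 2, np.ones(5)) for i in range(6)]
result = reduceby(lambda kv: kv[0], lambda acc, kv: acc + kv[1], data, make_init)
# make_init ran once per distinct key (two keys here), not once per element
```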
This warning is indeed from numpy. A quick search through the numpy codebase turns up these lines:
if (!strcmp(ufunc_name, "equal") ||
        !strcmp(ufunc_name, "not_equal")) {
    /* Warn on non-scalar, return NotImplemented regardless */
    assert(nin == 2);
    if (PyArray_NDIM(out_op[0]) != 0 ||
            PyArray_NDIM(out_op[1]) != 0) {
        if (DEPRECATE_FUTUREWARNING(
                "elementwise comparison failed; returning scalar "
                "instead, but in the future will perform elementwise "
                "comparison") < 0) {
            return -1;
        }
    }
}
Dask can make this a little worse, because you get the warning once per process (dask.bag uses a process pool by default). Also, if your computations are numpy-heavy, you may want to switch to the threaded scheduler rather than the multiprocessing one:

mybag.compute(get=dask.threaded.get)
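For what it's worth, newer dask versions select the threaded scheduler with the scheduler keyword rather than get=. A sketch (requires dask to be installed):

```python
import dask.bag as db

# newer dask API: pick the scheduler per-call with `scheduler=`;
# `get=dask.threaded.get` was the dask 0.x spelling of the same thing
bag = db.from_sequence(range(8), npartitions=2)
total = bag.map(lambda x: x * 2).sum().compute(scheduler="threads")
# total == 56
```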