Dask bag with multiple numpy arrays

I am getting a rather uninformative FutureWarning from dask/numpy when executing foldby on a dask.bag that contains numpy arrays.

import numpy as np
import dask.bag as db

def binop(a, b):
    print('binop')
    return a + b[1]

def combine(a, b):
    print('combine')
    return a + b[1]

# 50 (key, array) pairs; foldby groups on the first element of each pair
seq = ((np.random.randint(0, 5, size=1)[0], np.ones(5,)) for _ in range(50))
db.from_sequence(seq, partition_size=10)\
    .foldby(0, binop=binop, initial=np.zeros(5,), combine=combine)\
    .compute()

The goal is simply to add up a bunch of numpy arrays. This gives the correct results, but it also produces multiple FutureWarnings (apparently one per partition) from numpy, although they seem to come from dask:

dask/async.py:247: FutureWarning: elementwise comparison failed; returning scalar instead, but in the future will perform elementwise comparison
  return func(*args2)

Just adding two numpy arrays without dask does not produce the warning, so the parallelism involved in .foldby clearly plays a role. It also looks like the warnings are issued before any computation is done.
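A quick way to confirm this is to escalate FutureWarning to an error; plain addition then passes cleanly (a minimal sketch):

import warnings
import numpy as np

with warnings.catch_warnings():
    # Turn any FutureWarning into an exception for the duration of the block
    warnings.simplefilter('error', FutureWarning)
    np.zeros(5) + np.ones(5)  # completes without raising: no warning here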

  • How do I determine whether this warning is something I should be worried about?
  • How can I avoid the warning if it is nothing to worry about?

I am using Python 3.6, dask 0.14.1, and numpy 1.12.1.



UPDATE

Thanks to @MRocklin's answer, I started looking into this a little more. The offending code in dask/async.py is this:

def _execute_task(arg, cache, dsk=None):
    ...
    if isinstance(arg, list):
        return [_execute_task(a, cache) for a in arg]
    elif istask(arg):
        func, args = arg[0], arg[1:]
        args2 = [_execute_task(a, cache) for a in args]
        return func(*args2)

Is it possible that dask is actually trying to iterate over the numpy array in args2 = [_execute_task(a, cache) for a in args]? I don't know enough about the internals (or about dask in general, really) to judge what those variables contain.


2 answers


It has something to do with how cytoolz.itertoolz.reduceby handles the init value. Changing init from init=np.zeros((5,)) to init=lambda: np.zeros((5,)) at least gets rid of the warning: a plain function compares to the sentinel string without involving numpy at all.
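Applied to the pipeline from the question, the fix is a one-line change (a sketch; only the initial argument differs from the original code):

import numpy as np
import dask.bag as db

def binop(a, b):
    return a + b[1]

def combine(a, b):
    return a + b[1]

seq = ((np.random.randint(0, 5, size=1)[0], np.ones(5,)) for _ in range(50))
# A callable initial is called to produce a fresh starting value,
# instead of being compared to the sentinel string
db.from_sequence(seq, partition_size=10)\
    .foldby(0, binop=binop, initial=lambda: np.zeros(5,), combine=combine)\
    .compute()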

The warning is generated by this line:

cpdef dict reduceby(object key, object binop, object seq, object init='__no__default__'):
    ...
    cdef bint skip_init = init == no_default
which compares the passed init value (np.zeros((5,))) to the string "__no__default__", making numpy attempt an elementwise comparison between an ndarray and a str.
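The warning is easy to reproduce in isolation; under numpy 1.12 (the version in question) the comparison below emits exactly the FutureWarning quoted above (a minimal sketch):

import numpy as np

# An ndarray cannot be compared elementwise to a str, so numpy falls back
# to returning a scalar False and emits the FutureWarning
np.zeros((5,)) == '__no__default__'

# A function object, by contrast, compares to the string silently,
# which is why the callable init avoids the warning
(lambda: np.zeros((5,))) == '__no__default__'  # False, no warning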

So, to answer my own questions:

  • No, you don't need to worry about this warning in itself: the results are correct, but the comparison behaviour it warns about may change (and break this code path) in a future numpy release.
  • The warning can be avoided altogether by using a callable init value, or it can be filtered explicitly (see the sketch after this list).
  • The callable init does not appear to have any serious downsides, but keep in mind that it will be called once per executing process.
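If changing init is not an option, the warning can also be silenced with the standard warnings machinery (a generic sketch, not specific to dask; with the default process pool the filter may need to be installed inside the worker processes as well):

import warnings

# Ignore only this specific FutureWarning; 'message' is matched as a
# regular expression against the start of the warning text
warnings.filterwarnings(
    'ignore',
    message='elementwise comparison failed',
    category=FutureWarning,
)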


This warning is indeed from numpy. A quick search through the numpy codebase turns up these lines:

    if (!strcmp(ufunc_name, "equal") ||
            !strcmp(ufunc_name, "not_equal")) {
        /* Warn on non-scalar, return NotImplemented regardless */
        assert(nin == 2);
        if (PyArray_NDIM(out_op[0]) != 0 ||
                PyArray_NDIM(out_op[1]) != 0) {
            if (DEPRECATE_FUTUREWARNING(
                    "elementwise comparison failed; returning scalar "
                    "instead, but in the future will perform elementwise "
                    "comparison") < 0) {
                return -1;
            }
        }

Dask can exacerbate this a bit, because you will get the warning once per process (dask.bag uses a process pool by default).



Also, if your computations are mostly numpy-related, you might consider switching from the multiprocessing scheduler to the threaded scheduler:

mybag.compute(get=dask.threaded.get)

See http://dask.pydata.org/en/latest/scheduler-choice.html
