Mask only when consecutive nans exceed x
I was answering a question about pandas interpolation. The OP wanted to interpolate only where the number of consecutive np.nan was equal to one. The limit=1 parameter for interpolate will interpolate the first np.nan and stop there. The OP wanted to be able to tell that there was actually more than one np.nan and not even bother with the first.
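To make the limit=1 behaviour concrete, here is a small sketch. The data is made up for illustration and is not from the original question; it assumes the default linear method and forward direction.

```python
import numpy as np
import pandas as pd

# Illustrative data: one run of three NaNs and one isolated NaN.
s = pd.Series([0.0, np.nan, np.nan, np.nan, 4.0, np.nan, 6.0])

# limit=1 fills at most one NaN per run, starting from the last valid value.
filled = s.interpolate(limit=1)
print(filled.tolist())  # [0.0, 1.0, nan, nan, 4.0, 5.0, 6.0]
```

The first NaN of the long run is filled even though the run as a whole is not, which is exactly what the OP wanted to avoid.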
My idea was to just run interpolate as is and mask the runs of consecutive np.nan after the fact.
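A minimal sketch of that "interpolate first, mask afterwards" idea, using plain pandas run-length bookkeeping rather than any of the functions from the answers. The data, the threshold x and all variable names are assumptions made for illustration.

```python
import numpy as np
import pandas as pd

s = pd.Series([0.0, np.nan, np.nan, np.nan, 4.0, np.nan, 6.0])
x = 2  # hypothetical threshold: hide runs of x or more NaNs

isna = s.isna()
# Give every run of identical isna values its own id, then measure each run.
run_id = isna.astype(int).diff().ne(0).cumsum()
run_len = isna.groupby(run_id).transform("size")

# True where the interpolated value may be kept.
keep = ~(isna & (run_len >= x))

# Interpolate everything, then put NaN back wherever the run was too long.
result = s.interpolate().where(keep)
print(result.tolist())  # [0.0, nan, nan, nan, 4.0, 5.0, 6.0]
```

Only the isolated NaN ends up filled; the run of three is left untouched.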
The question arises: what is a generic solution that takes a 1-dimensional array a and an integer x and creates a boolean mask that is False at every position belonging to a run of x or more consecutive np.nan?
Consider a 1-dimensional array a:
a = np.array([1, np.nan, np.nan, np.nan, 1, np.nan, 1, 1, np.nan, np.nan, 1, 1])
I expect the x = 2 mask to look like this:
# assume 1 for True and 0 for False
# a is [   1.  nan  nan  nan   1.  nan   1.   1.  nan  nan   1.   1.]
# mask [   1.   0.   0.   0.   1.   1.   1.   1.   0.   0.   1.   1.]
#                                   ^
#                                   |
# Notice that this is not masked because there is only one np.nan
I expect the x = 3 mask to look like this:
# assume 1 for True and 0 for False
# a is [   1.  nan  nan  nan   1.  nan   1.   1.  nan  nan   1.   1.]
# mask [   1.   0.   0.   0.   1.   1.   1.   1.   1.   1.   1.   1.]
#                                   ^              ^    ^
#                                   |              |    |
# Notice that these are not masked because each run has fewer than 3 np.nan's
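One way to pin the specification down is a slow but obvious reference built on itertools.groupby. This is only a sketch for checking expected outputs, not a proposed answer; the name reference_mask is hypothetical.

```python
from itertools import groupby

import numpy as np


def reference_mask(a, x):
    """True everywhere except inside runs of x or more consecutive NaNs."""
    isnan = np.isnan(np.asarray(a, dtype=float))
    mask = []
    # groupby yields one group per run of equal consecutive values.
    for is_nan, run in groupby(isnan):
        n = len(list(run))
        keep = not (is_nan and n >= x)
        mask.extend([keep] * n)
    return np.array(mask, dtype=bool)


a = np.array([1, np.nan, np.nan, np.nan, 1, np.nan, 1, 1, np.nan, np.nan, 1, 1])
print(reference_mask(a, 2).astype(int))  # [1 0 0 0 1 1 1 1 0 0 1 1]
print(reference_mask(a, 3).astype(int))  # [1 0 0 0 1 1 1 1 1 1 1 1]
```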
I look forward to learning from other ideas ;-)
I love numba for problems like this that are easy to understand but hard to "numpyfy"! Although the package may be too heavy a dependency for most libraries, it lets you write such plain "Python" functions without losing too much speed:
import numpy as np
import numba as nb
import math

@nb.njit
def mask_nan_if_consecutive(arr, limit):  # I'm not good at function names :(
    result = np.ones_like(arr)
    cnt = 0
    for idx in range(len(arr)):
        if math.isnan(arr[idx]):
            cnt += 1
            # If we just reached the limit we need to backtrack,
            # otherwise just mask current.
            if cnt == limit:
                for subidx in range(idx-limit+1, idx+1):
                    result[subidx] = 0
            elif cnt > limit:
                result[idx] = 0
        else:
            cnt = 0
    return result
If you've worked with pure Python, this should be pretty easy to follow, and it gives the expected results:
>>> a = np.array([1, np.nan, np.nan, np.nan, 1, np.nan, 1, 1, np.nan, np.nan, 1, 1])
>>> mask_nan_if_consecutive(a, 1)
array([ 1., 0., 0., 0., 1., 0., 1., 1., 0., 0., 1., 1.])
>>> mask_nan_if_consecutive(a, 2)
array([ 1., 0., 0., 0., 1., 1., 1., 1., 0., 0., 1., 1.])
>>> mask_nan_if_consecutive(a, 3)
array([ 1., 0., 0., 0., 1., 1., 1., 1., 1., 1., 1., 1.])
>>> mask_nan_if_consecutive(a, 4)
array([ 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1.])
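Note that the results above are float arrays, because np.ones_like(arr) inherits the dtype of the float input. If a true boolean mask is wanted, something like the following sketch could be used. It is the same loop without the @nb.njit decorator, so it runs without numba installed; the name mask_nan_if_consecutive_py is made up for this example.

```python
import math

import numpy as np


def mask_nan_if_consecutive_py(arr, limit):
    # Boolean result instead of np.ones_like(arr), which would be float here.
    result = np.ones(len(arr), dtype=np.bool_)
    cnt = 0
    for idx in range(len(arr)):
        if math.isnan(arr[idx]):
            cnt += 1
            if cnt == limit:
                # Just reached the limit: backtrack over the whole run so far.
                result[idx - limit + 1: idx + 1] = False
            elif cnt > limit:
                result[idx] = False
        else:
            cnt = 0
    return result


a = np.array([1, np.nan, np.nan, np.nan, 1, np.nan, 1, 1, np.nan, np.nan, 1, 1])
print(mask_nan_if_consecutive_py(a, 2))
```

Alternatively, calling .astype(bool) on the float result of the original function should give the same mask.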
But the really good thing about the @nb.njit decorator is that the function is fast. Here it is compared against mask_knans from the other answer:
a = np.array([1, np.nan, np.nan, np.nan, 1, np.nan, 1, 1, np.nan, np.nan, 1, 1])
i = 2
res1 = mask_nan_if_consecutive(a, i)
res2 = mask_knans(a, i)
np.testing.assert_array_equal(res1, res2)
%timeit mask_nan_if_consecutive(a, i)  # 100000 loops, best of 3: 6.03 µs per loop
%timeit mask_knans(a, i)               # 1000 loops, best of 3: 302 µs per loop
So for short arrays this is about 50 times faster. The gap narrows for longer arrays, but it is still roughly 7 times faster:
a = np.array([1, np.nan, np.nan, np.nan, 1, np.nan, 1, 1, np.nan, np.nan, 1, 1]*100000)
i = 2
%timeit mask_nan_if_consecutive(a, i) # 10 loops, best of 3: 20.9 ms per loop
%timeit mask_knans(a, i) # 10 loops, best of 3: 154 ms per loop
I created this generalized solution:
import pandas as pd
import numpy as np
from numpy.lib.stride_tricks import as_strided as strided

def mask_knans(a, x):
    a = np.asarray(a)
    k = a.shape[0]

    # I will stride n. I want to pad with 1 less False than
    # the required number of np.nan's
    n = np.append(np.isnan(a), [False] * (x - 1))

    # prepare the mask and fill it with True
    # (np.bool_ instead of np.bool8, which was removed in NumPy 2.0)
    m = np.empty(k, np.bool_)
    m.fill(True)

    # stride n into a number of columns equal to
    # the required number of np.nan to mask
    # this is essentially a rolling all operation on isnull
    # also reshape with `[:, None]` in preparation for broadcasting
    # np.where finds the indices where we successfully start
    # x consecutive np.nan's
    s = n.strides[0]
    i = np.where(strided(n, (k + 1 - x, x), (s, s)).all(1))[0][:, None]

    # since I prepped with `[:, None]` when I add `np.arange(x)`
    # I'm including the subsequent indices where the remaining
    # x - 1 np.nan are
    i = i + np.arange(x)

    # I use `pd.unique` because it doesn't sort and I don't need to sort
    i = pd.unique(i[i < k])

    m[i] = False

    return m
Without comments:
import pandas as pd
import numpy as np
from numpy.lib.stride_tricks import as_strided as strided

def mask_knans(a, x):
    a = np.asarray(a)
    k = a.shape[0]
    n = np.append(np.isnan(a), [False] * (x - 1))
    m = np.empty(k, np.bool_)
    m.fill(True)
    s = n.strides[0]
    i = np.where(strided(n, (k + 1 - x, x), (s, s)).all(1))[0][:, None]
    i = i + np.arange(x)
    i = pd.unique(i[i < k])
    m[i] = False
    return m
Demo:
mask_knans(a, 2)
[ True False False False True True True True False False True True]
mask_knans(a, 3)
[ True False False False True True True True True True True True]
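The "rolling all" step can also be sketched with numpy.lib.stride_tricks.sliding_window_view (available in NumPy 1.20 and later), which avoids computing shapes and strides by hand. This is an alternative illustration under that assumption, not the code from this answer; the name mask_knans_windowed is hypothetical, and x is assumed to be at least 1.

```python
import numpy as np
from numpy.lib.stride_tricks import sliding_window_view


def mask_knans_windowed(a, x):
    isnan = np.isnan(np.asarray(a, dtype=float))
    k = isnan.shape[0]
    mask = np.ones(k, dtype=bool)
    if x > k:
        # No window of length x fits, so nothing can be masked.
        return mask
    # Row j of the view is isnan[j:j + x]; .all(axis=1) is a rolling "all".
    starts = np.flatnonzero(sliding_window_view(isnan, x).all(axis=1))
    # Expand every window start to the x positions it covers.
    idx = (starts[:, None] + np.arange(x)).ravel()
    mask[idx] = False
    return mask


a = np.array([1, np.nan, np.nan, np.nan, 1, np.nan, 1, 1, np.nan, np.nan, 1, 1])
print(mask_knans_windowed(a, 2))
print(mask_knans_windowed(a, 3))
```

Duplicate indices in idx are harmless here because assigning False to the same position twice has no further effect, so no unique step is needed.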