Filter with tolerance for numpy 1d array

Let's say I have a 1d numpy array with some noisy rows of data.

I want to set a threshold to check when the values ​​are high and low. However, since the data is noisy, it makes no sense to just do

is_high = data > threshold

      

I tried to establish a tolerance for this threshold, as many control systems do (for example, most heating and air conditioning systems). The idea is that the signal state only changes from low to high when a threshold plus tolerance is passed. Likewise, the signal will only change from high to low if it is below the threshold minus the tolerance. In other words:

def tolerance_filter(data, threshold, tolerance):
    currently_high = False  # start low
    signal_state = np.empty_like(data, dtype=np.bool)
    for i in range(data.size):
        # if we were high and we are getting too low, become low
        if currently_high and data[i] < (threshold-tolerance):
            currently_high = False
        # if we were low and are getting too high, become high
        elif not currently_high and data[i] > (threshold+tolerance):
            currently_high = True
        signal_state[i] = currently_high
    return signal_state

      

This function gives the result that I expect. However, I'm wondering if there is a way to do this using speed numpy

or scipy

instead of a python loop for

.

Any ideas? :)


UPDATE:

Thanks to the comment by Joe Kington who pointed me to the term hysteresis , I found this other question . I'm afraid it is very similar (duplicate?) And there is one nice working solution there too, Bas Swinckels.

Anyway, I tried to implement the speedups suggested by Joe Kington (don't know if I did it right) and compared his solution to Fergal and Bas versus my naive approach. Here are the results (code below):

Proposed function in my original question
10 loops, best of 3: 22.6 ms per loop

Proposed function by Fergal
1000 loops, best of 3: 995 µs per loop

Proposed function by Bas Swinckels in the hysteresis question
1000 loops, best of 3: 1.05 ms per loop

Proposed function by Joe Kington using Cython
Approximate time cost of compiling: 2.195411
1000 loops, best of 3: 1.35 ms per loop

      

All approaches in the answers are similar (although it will take a few extra steps to get the Fergal boolean vector!). Any thought to add here? Also, I'm surprised that the approach is Cython

slower (albeit slightly). Anyway, I have to admit that this is the fastest code to code if you don't know all the functions numpy

by heart ...

Here is the code I used to compare various parameters. Audits and changes are more than welcome!: P (Cython code is in the middle to force SO to keep all code in the same scrollable snippet. Of course I used it in a different file)

# Naive approach from the original question
def tolerance_filter1(data, threshold, tolerance):
    currently_high = False  # start low
    signal_state = np.empty_like(data, dtype=np.bool)
    for i in range(data.size):
        # if we were high and we are getting too low, become low
        if currently_high and data[i] < (threshold-tolerance):
            currently_high = False
        # if we were low and are getting too high, become high
        elif not currently_high and data[i] > (threshold+tolerance):
            currently_high = True
        signal_state[i] = currently_high
    return signal_state

# Numpythonic approach suggested by Fergal
def tolerance_filter2(data, threshold, tolerance):
    a = np.zeros_like(data)
    a[ data < threshold-tolerance] = -1
    a[ data > threshold+tolerance] = +1
    wh = np.where(a != 0)[0]
    idx= np.diff( a[wh]) == 2
    #This variable indexes the values of data where data crosses
    #from below threshold-tol to above threshold+tol
    crossesAboveThreshold = wh[idx]
    return crossesAboveThreshold

# Approach suggested by Bas Swinckels and borrowed
# from the hysteresis question
def tolerance_filter3(data, threshold, tolerance, initial=False):
    hi = data >= threshold+tolerance
    lo_or_hi = (data <= threshold-tolerance) | hi
    ind = np.nonzero(lo_or_hi)[0]
    if not ind.size: # prevent index error if ind is empty
        return np.zeros_like(x, dtype=bool) | initial
    cnt = np.cumsum(lo_or_hi) # from 0 to len(x)
    return np.where(cnt, hi[ind[cnt-1]], initial)

#########################################################
## IN A DIFFERENT FILE (tolerance_filter_cython.pyx)
## So that StackOverflow shows a single scrollable code block :)

import numpy as np
import cython

@cython.boundscheck(False)
def tolerance_filter(data, float threshold, float tolerance):
    cdef bint currently_high = 0  # start low
    signal_state = np.empty_like(data, dtype=int)
    cdef double[:] data_view = data
    cdef long[:] signal_state_view = signal_state
    cdef int i = 0
    cdef int l = len(data)
    low = np.empty_like(data, dtype=bool)
    high = np.empty_like(data, dtype=bool)
    low = data < (threshold-tolerance)
    high = data > (threshold+tolerance)

    for i in range(l):
        # if we were high and we are getting too low, become low
        if currently_high and low[i]:
            currently_high = False
        # if we were low and are getting too high, become high
        elif not currently_high and high[i]:
            currently_high = True
        signal_state_view[i] = currently_high
    return signal_state

##################################################################
# BACK TO THE PYTHON FILE

import numpy as np
from time import clock
from datetime import datetime
from IPython import get_ipython
ipython = get_ipython()
time = np.arange(0,1000,0.01)
data = np.sin(time*3) + np.cos(time/7)*8 + np.random.normal(size=time.shape)*2
threshold, tolerance = 0, 4

print "Proposed function in my original question"
ipython.magic("timeit tolerance_filter1(data, threshold, tolerance)")

print "\nProposed function by Fergal"
ipython.magic("timeit tolerance_filter2(data, threshold, tolerance)")

print "\nProposed function by Bas Swinckels in the hysteresis question"
ipython.magic("timeit tolerance_filter3(data, threshold, tolerance)")

print "\nProposed function by Joe Kington using Cython"
start = datetime.now()
import pyximport; pyximport.install()
import tolerance_filter_cython
print "Approximate time cost of compiling: {}".format((datetime.now()-start).total_seconds())
tolerance_filter4 = tolerance_filter_cython.tolerance_filter
ipython.magic("timeit tolerance_filter4(data, threshold, tolerance)")

      

+3


source to share


2 answers


I find it sometimes surprising to see how easy and Python-like cython extensions are . Here is your code converted to cython. It can be called from Python, but should give you C ++ speeds.

def tolerance_filter(data, float threshold, float tolerance):
    cdef bint currently_high = 0  # start low
    signal_state = np.empty_like(data, dtype=int)
    cdef float[:] data_view = data
    cdef int[:] signal_state_view = signa_state
    cdef int i = 0
    cdef int l = len(data)
    for i in range(l):
        # if we were high and we are getting too low, become low
        if currently_high and data[i] < (threshold-tolerance):
            currently_high = False
        # if we were low and are getting too high, become high
        elif not currently_high and data[i] > (threshold+tolerance):
            currently_high = True
        signal_state_view[i] = currently_high

      



There are a few notes:

  • Note the use of mapped memory types at the beginning of the function

  • The function was intentionally kept as close to the original code as possible. However, it can be sped up by disabling range checking (see Cython docs) and calculating upper and lower effective thresholds outside of the loop.

+2


source


I'm not sure if this is better than your solution, but it is more numpythonic.



a = np.zeros_like(data)
a[ data < threshold-tol] = -1
a[ data > threshold+tol] = +1
wh = np.where(a != 0)
idx= np.diff( a[wh]) == 2
#This variable indexes the values of data where data crosses
#from below threshold-tol to above threshold+tol
crossesAboveThreshold = wh[idx]

      

0


source







All Articles