Determining intervals with specific values
I'm new to Python and am stuck trying to identify "intervals" where y-value = <70. I have an ordered dictionary entry:
d = {0: '92', 11: '70', 43: '77', 44: '76', 61: '77', 64: '69',
68: '67', 84: '68', 93: '87', 108: '81', 141: '74'}
I want to write a function that allows me to identify "intervals" (a, b) based on keys (x-values) d based on y-values = <N. My endpoints (a, b) should be where the values start descend and ascend to and from this N value, so the actual end point values will be "above" N, but the entries in between should be lower.
(a,b): {above, below, below, below, above}
For example, I'm interested in intervals as a dictionary, here N = 70:
{(0,43):{92,70,77}, (61,93): {77, 69, 67, 68, 87}} <-- includes the values at endpoints
But, can ignore those other "intervals" where the values are never lower than 70, so in this case we do not need: (43,51), (93,180)
Is there an easy way to do this? Until now, I could identify the points where the transition from "above" to "below" 70 or vice versa occurs, but I'm not sure how to proceed when creating spans and values (for example, in a dictionary). I seem to have looked at this for too long.
source to share
The following code should give you the result you requested:
oninterval = False
dd = {}
keys = d.keys()
keys.sort()
start_key, first_val = keys[0], d[keys[0]]
for k in keys:
v = float(d[k])
if oninterval:
cur_list.append(v)
if not int(v) <= 70: # interval ends
oninterval = False
dd[(start_key,k)] = cur_list
else:
if int(v) <= 70:
cur_list = [first_val, v]
oninterval = True
else:
start_key, first_val = k, v
if oninterval: dd[(start_key, keys[-1])] = cur_list
Edit:
Extended code share to accept the first or last element to have a y-value <= 70 and treat y-values as floats
source to share
For reasons that I cannot fully explain, this problem mesmerized me. But I think I finally got it from my system. First, a basic, clean and simple solution:
intervals = [[]]
prev = None
sorted_items = sorted(d.iteritems())
for k, v in sorted_items:
if v <= 70:
ext = (k,) if (intervals[-1] or prev is None) else (prev, k)
intervals[-1].extend(ext)
elif intervals[-1]:
intervals[-1].append(k)
intervals.append([])
prev = k
if not intervals[-1]:
intervals.pop()
print dict(((iv[0], iv[-1]), [d[k] for k in iv]) for iv in intervals)
It's easy enough to abstract the above to create an iterator:
def iter_intervals(vals, filter_f, _nil=object()):
prev = _nil
interval = []
for x in vals:
if filter_f(x):
ext = (x,) if (interval or prev is _nil) else (prev, x)
interval.extend(ext)
elif interval:
interval.append(x)
yield interval
interval = []
prev = x
if interval:
yield interval
intervals = iter_intervals(d.iteritems(), lambda x: x[1] <= 70)
print dict(((iv[0][0], iv[-1][0]), [v for k, v in iv]) for iv in intervals)
But it has to store a lot of state. I wonder if there is a way to do less ...
def iter_intervals(vals, filter_f, _nil=object()):
iters = itertools.tee(itertools.chain((_nil,), vals, (_nil,)), 3)
next(iters[1]); next(iters[2]); next(iters[2])
triplets = itertools.izip(*iters)
interval = set()
for p, curr, n in triplets:
if filter_f(curr):
interval.update((p, curr, n))
elif interval:
interval.discard(_nil)
yield sorted(interval)
interval = set()
if interval:
interval.discard(_nil)
yield sorted(interval)
intervals = iter_intervals(d.iteritems(), lambda x: x[1] <= 70)
print dict(((iv[0][0], iv[-1][0]), [v for k, v in iv]) for iv in intervals)
Having done this, it is now more obvious how to adapt ninjagecko's solution to avoid the lookahead / lookbehind issue that caused it to keep the list:
def framed_intervals(points, filter_f, _nil=object()):
iters = itertools.tee(itertools.chain((_nil,), points, (_nil,)), 3)
next(iters[1]); next(iters[2]); next(iters[2])
triplets = itertools.izip(*iters)
for below, group in itertools.groupby(triplets, lambda x: filter_f(x[1])):
if below:
interval = set(itertools.chain.from_iterable(group))
interval.discard(_nil) # or continue if None in interval to
yield sorted(interval) # drop incomplete intervals
intervals = framed_intervals(d.iteritems(), lambda x: x[1] <= 70)
print dict(((iv[0][0], iv[-1][0]), [v for k, v in iv]) for iv in intervals)
source to share
d = {0: '92', 11: '70', 43: '77', 44: '76', 61: '77', 64: '69',
68: '67', 84: '68', 93: '87', 108: '81', 141: '74'}
r = []
k = None
v = None
for i in sorted(d.keys()):
if not k is None:
v.append(d[i])
if int(d[i]) > 70:
if k is None:
k = [i]
v = [d[i]];
else:
k.append(i)
r.append((tuple(k), v))
k = None
v = None
print r
source to share
Here's a somewhat detailed solution:
import collections
values = [(0, '92'), (11, '70'), (43, '77'), (44, '76'), (61, '77'), (64, '69'),
(68, '67'), (84, '68'), (93, '87'), (108, '81'), (141, '74')]
d = collections.OrderedDict(values)
def intervals(d, n):
result = collections.OrderedDict()
interval = list()
lastk, lastv, startk = None, None, None
for k, v in d.iteritems():
if int(v) > n:
if startk is not None:
interval.append(int(d[k]))
result[(startk, k)] = interval
interval = list()
startk = None
else:
if lastv:
interval.append(int(d[lastk]))
startk = lastk
interval.append(int(d[k]))
lastk, lastv = k, int(v) > n
return result
if __name__ == '__main__':
print intervals(d, 70)
When I run this, it prints:
OrderedDict([((0, 43), [92, 70, 77]), ((61, 93), [77, 69, 67, 68, 87])])
which is the desired result.
source to share
sidenote: your dictionary has string values, not int values. And you could mean <= and not <, in your example.
So, to formulate your problem more clearly, you:
- Has an ordered list of points
(x,y)
- Enter the threshold value T
- We want to find all consecutive runs of points p i, ..., p jso that the end points are> T but the other points are not; those. all areas where the points "go down and out" of the threshold line. ( Note that such runs can overlap, for example
[71,70,{71],70,71}
)
The algorithm will look like this:
from itertools import *
def dippingIntervals(points, threshold=70):
yBelowThreshold = lambda i: points[i][1]<=threshold
for below,g in groupby(range(len(points)), yBelowThreshold):
if below:
interval = list(g)
start,end = interval[0],interval[-1]
if start>0 and end<len(points)-2: #modify if "open" intervals also desired
yield points[start-1 : end+2]
Demo:
>>> d = [(0, 92), (11, 70), (43, 77), (44, 76), (61, 77), (64, 69), (68, 67), (84, 68), (93, 87), (108, 81), (141, 74)]
>>> pprint(list( dippingIntervals(d) ))
[((0, 92), (11, 70), (43, 77)),
((61, 77), (64, 69), (68, 67), (84, 68), (93, 87))]
You can post-process the data without too much trouble, for example to get it in the format you want, by changing the above function as follows:
... yield (start,end), {xy[1] for xy in points[start-1 : end+2]}
The disadvantage of this method is that it does not work on iterators; the following will work on iterators and a more "classic" way of doing it:
def getY(point):
return point[1]
def dippingIntervals(points, threshold=70, key=getY):
"""
Returns runs of points whose y-values dip below intervals
>>> list( dippingIntervals([71,70,74,64,64,70,71], key=lambda x:x) )
[(71, [70], 74),
(74, [64, 64, 70], 71)]
"""
def match(point):
return key(point)<=threshold
lastP = None
for p in points:
if lastP==None:
lastP = p
continue
if not match(lastP) and match(p):
start = lastP
R = [p]
elif match(lastP) and match(p):
R += [p]
elif match(lastP) and not match(p):
end = p
yield start,R,end
lastP = p
source to share