An efficient way to iterate over sublists of a huge list in Python
I need to find an efficient way to iterate over the sublists of a large list in Python.
Given: an array of integers and a number (the length of the sublists)
Constraints: the array has up to 100 thousand elements, with elements in the range (1, 2**31)
Task: for each sublist, find the difference between its max and min. Print out the biggest difference.
Ex: [4,6,3,4,8,1,9], number = 3
As far as I understand I have to go through every sublist:
[4,6,3] max - min = 6 - 3 = 3
[6,3,4] 3
[3,4,8] 5
[4,8,1] 7
[8,1,9] 8
final max = 8
So my solution is:
import random
import time

def difference(arr, number):
    maxDiff = 0
    i = 0
    # slide a window of `number` elements across the list
    while i+number != len(arr)+1:
        diff = max(arr[i:i+number]) - min(arr[i:i+number])
        if diff > maxDiff:
            maxDiff = diff
        i += 1
    print maxDiff

length = 2**31
arr = random.sample(xrange(length),100000) #array wasn't given. My sample
t0 = time.clock()
difference(arr,3)
print 'It took :',time.clock() - t0
Answer:
2147101251
It took : 5.174262
I also tried the same thing with explicit for loops, which gives even worse timing:
def difference(arr,d):
    maxDiff = 0
    if len(arr) == 0:
        maxDiff = 0
    elif len(arr) == 1:
        # a single element has max - min == 0
        maxDiff = 0
    else:
        i = 0
        while i + d != len(arr)+1:
            # copy the current window element by element
            array = []
            for j in xrange(d):
                array.append(arr[i + j])
            diff = max(array) - min(array)
            if diff > maxDiff:
                maxDiff = diff
            i += 1
    print maxDiff

length = 2**31
arr = random.sample(xrange(length),100000) #array wasn't given. My sample
t0 = time.clock()
difference(arr,1000)
print 'It took :',time.clock() - t0
Answer:
2147331163
It took : 14.104639
My challenge was to cut the time down to 2 seconds.
What would be the most efficient way to do this?
Based on @rchang and @gknicker's answer and comments I was able to achieve an improvement. I am wondering if there is something else I can do?
def difference(arr,d):
    window = arr[:d]
    arrayLength = len(arr)
    maxArrayDiff = max(arr) - min(arr)
    maxDiff = 0
    while True:
        localMax = max(window)
        # elements are non-negative, so a window whose max is not above
        # maxDiff cannot produce a larger difference
        if localMax > maxDiff:
            diff = localMax - min(window)
            if diff == maxArrayDiff:
                # no window can beat the spread of the whole array
                return diff
            elif diff > maxDiff:
                maxDiff = diff
        if d >= arrayLength:
            break  # the last window has been checked
        window.pop(0)
        window.append(arr[d])
        d += 1
    return maxDiff

#arr = [3,4,6,15,7,2,14,8,1,6,1,2,3,10,1]
length = 2**31
arr = random.sample(xrange(length),100000)
t0 = time.clock()
print difference(arr,1000)
print 'It took :',time.clock() - t0
Answer:
2147274599
It took : 2.54171
Not bad. Any other suggestions?
Here is my attempt at solving this.
I experimented and measured quite a bit and came to the following conclusions:
- subset_length has a significant impact on performance.
- numpy min/max is much faster than the built-in functions, but only for large arrays; below, say, 50 elements, the built-ins are faster.
- This leads to the following, depending on subset_length:
- below 10, your latest version is the fastest
- between 10 and 50, the non-numpy version of my algorithm (not posted (yet)) is faster
- above 50, my algorithm is the fastest
- at 1000, this algorithm outperforms yours by a factor of 100.
Keep in mind that array must be a numpy.array() and subset_length must be 3 or more.
def difference_np(array, subset_length):
    assert subset_length > 2, "subset_length must be larger than 2"
    length = array.size
    total_diff = array.max()-array.min()
    current_min = array[:subset_length].min()
    current_max = array[:subset_length].max()
    max_diff = current_max - current_min
    max_diff_index = 0
    index = subset_length
    while index < length:
        i_new = index
        i_old = index-subset_length
        index += 1
        new = array[i_new]
        old = array[i_old]
        # the idea here is to avoid calculating the
        # min/max over the entire subset as much as possible,
        # so we treat every edge case separately.
        # the current window is array[i_old+1:i_new+1]
        if new < current_min:
            current_min = new
            if old == current_max:
                current_max = array[i_old+1:i_new+1].max()
        elif new > current_max:
            current_max = new
            if old == current_min:
                current_min = array[i_old+1:i_new+1].min()
        elif old == current_min:
            current_min = array[i_old+1:i_new+1].min()
        elif old == current_max:
            current_max = array[i_old+1:i_new+1].max()
        else:
            continue
        current_diff = current_max-current_min
        if current_diff > max_diff:
            max_diff = current_diff
            max_diff_index = i_old+1
        # shortcut-condition
        if max_diff == total_diff:
            print('shortcut at', 100.0*(index-1)/(length-subset_length), '%' )
            break
    return max_diff, max_diff_index
I'm not sure whether the shortcut condition is worthwhile, as it rarely applies and costs two full passes over the input array.
EDIT
Another margin for improvement exists if the algorithm uses list.pop(0). Since list is optimized for right-hand side operations, list.pop(0) is relatively expensive. With collections.deque there is an alternative that provides a fast left-hand side pop: deque.popleft(). This brings a small improvement in overall speed.
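To look at the two pop operations in isolation, here is a small sketch in Python 3. The helper names drain_list and drain_deque are made up for this illustration, and absolute timings depend on the machine, so none are claimed here.

```python
import collections
import timeit


def drain_list(n):
    """Empty a list from the left; each pop(0) shifts the remaining items."""
    items = list(range(n))
    popped = 0
    while items:
        items.pop(0)
        popped += 1
    return popped


def drain_deque(n):
    """Empty a deque from the left; popleft() does not shift anything."""
    items = collections.deque(range(n))
    popped = 0
    while items:
        items.popleft()
        popped += 1
    return popped


if __name__ == "__main__":
    n = 20000
    print("list.pop(0):    ", timeit.timeit(lambda: drain_list(n), number=5))
    print("deque.popleft():", timeit.timeit(lambda: drain_deque(n), number=5))
```

In the sliding-window code the container never holds more than subset_length items, so the cost of each list.pop(0) is bounded by the window size rather than by the length of the whole array.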
Here's the non-numpy version of my algorithm, using collections.deque:
import collections

def difference_deque(array, subset_length):
    assert subset_length > 1, "subset_length must be larger than 1"
    length = len(array)
    total_diff = max(array)-min(array)
    current_slice = collections.deque(array[:subset_length])
    current_min = min(current_slice)
    current_max = max(current_slice)
    max_diff = current_max - current_min
    max_diff_index = 0
    index = subset_length
    while index < length:
        i_new = index
        i_old = index-subset_length
        index += 1
        new = array[i_new]
        old = current_slice.popleft()
        if new < current_min:
            current_min = new
            if old == current_max:
                current_max = max(current_slice)
            current_slice.append(new)
        elif new > current_max:
            current_max = new
            if old == current_min:
                current_min = min(current_slice)
            current_slice.append(new)
        elif old == current_min:
            current_slice.append(new)
            current_min = min(current_slice)
        elif old == current_max:
            current_slice.append(new)
            current_max = max(current_slice)
        else:
            current_slice.append(new)
            continue
        current_diff = current_max-current_min
        if current_diff > max_diff:
            max_diff = current_diff
            max_diff_index = i_old+1
        # shortcut-condition
        if max_diff == total_diff:
            print('shortcut at', 100.0*(index-1)/(length-subset_length), '%' )
            break
    return max_diff, max_diff_index
This changes the runtime ranking slightly:
- below 10, your algorithm (with deque) is the fastest
- up to 100, my algorithm (with deque) is the fastest
- above 100, my algorithm (with numpy) is the fastest
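For comparison, here is a different technique from the ones measured above: the classic monotonic-deque sliding window. It keeps two deques of indices, and each index is appended and removed at most once per deque, so the total work is linear in the array length regardless of subset_length. This is a sketch in Python 3, not the code that was timed; the name max_window_diff is an assumption made for this example, and it expects 1 <= subset_length <= len(array).

```python
import collections


def max_window_diff(array, subset_length):
    """Return the largest (max - min) over all windows of subset_length items."""
    if not 1 <= subset_length <= len(array):
        raise ValueError("subset_length must be between 1 and len(array)")
    max_idx = collections.deque()  # indices; values decrease from left to right
    min_idx = collections.deque()  # indices; values increase from left to right
    best = 0
    for i, value in enumerate(array):
        # Drop candidates that the new value makes irrelevant.
        while max_idx and array[max_idx[-1]] <= value:
            max_idx.pop()
        while min_idx and array[min_idx[-1]] >= value:
            min_idx.pop()
        max_idx.append(i)
        min_idx.append(i)
        # Drop the index that just slid out of the window, if it is still there.
        start = i - subset_length + 1
        if max_idx[0] < start:
            max_idx.popleft()
        if min_idx[0] < start:
            min_idx.popleft()
        # Once the first full window is reached, the fronts hold max and min.
        if start >= 0:
            best = max(best, array[max_idx[0]] - array[min_idx[0]])
    return best


print(max_window_diff([4, 6, 3, 4, 8, 1, 9], 3))  # 8, as in the question's example
```

It works on a plain list, so no conversion to a numpy array is needed. Whether it beats the numpy variant for a given subset_length would have to be measured.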
I came up with this optimization that might shave some time off your first implementation. Instead of using slices to pick out the numbers to consider at each iteration, I use a slice once to initialize a "window". At each iteration, the "right-most" item is appended to the window, and the "left-most" item is removed.
import time
import random

def difference(arr, number):
    # Start with the first number-1 elements; the loop appends the next one
    thisSlice = arr[:number-1]
    arrSize = len(arr)
    maxDiff = -1000
    i = number - 1
    while i < arrSize:
        # Put the new element onto the window tail
        thisSlice.append(arr[i])
        thisDiff = max(thisSlice) - min(thisSlice)
        if thisDiff > maxDiff: maxDiff = thisDiff
        i += 1
        # Get rid of the "leftmost" element, we won't need it for next iteration
        thisSlice.pop(0)
    print maxDiff

if __name__ == '__main__':
    length = 2**31
    arr = random.sample(xrange(length),100000)
    t0 = time.clock()
    difference(arr, 1000)
    print 'It took :', time.clock() - t0
At least on my laptop it doesn't get below 2 seconds, but I did see some improvement over the first implementation you posted. On average, your first solution ran in 4.2 to 4.3 seconds on my laptop. This sliding-window version averaged 3.5 to 3.6 seconds.
Hope it helps.
I think you can use one of the various numpy rolling functions with as_strided magic, say the one I just stole from here:
import numpy as np

def rolling_window(a, window):
    # view of `a` with one row per window, built without copying the data
    shape = a.shape[:-1] + (a.shape[-1] - window + 1, window)
    strides = a.strides + (a.strides[-1],)
    return np.lib.stride_tricks.as_strided(a, shape=shape, strides=strides)
Using the original difference, but with return instead of print, and with arr a numpy array:
>>> w = 3
>>> %timeit old_d = difference(arr, w)
1 loops, best of 3: 718 ms per loop
>>> %timeit q = rolling_window(arr, w); ma=q.max(1);mi=q.min(1); new_d=(ma-mi).max()
100 loops, best of 3: 5.68 ms per loop
and
>>> w = 1000
>>> %timeit old_d = difference(arr, w)
1 loops, best of 3: 25.1 s per loop
>>> %timeit q = rolling_window(arr, w); ma=q.max(1);mi=q.min(1); new_d=(ma-mi).max()
1 loops, best of 3: 326 ms per loop