"""Example showing how to merge multiple remote data streams.
"""
# Slightly modified version of:
# http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/511509
import heapq

from IPython.parallel.error import RemoteError


def mergesort(list_of_lists, key=None):
    """Perform an N-way merge operation on sorted lists.

    @param list_of_lists: (really iterable of iterable) of sorted elements
    (either naturally or by C{key})
    @param key: specify sort key function (like C{sort()}, C{sorted()})

    Yields the merged items one at a time.  Every input is wrapped with
    C{iter()}, so it may be a list or an iterator you pre-generated; inputs
    are consumed lazily, one item at a time.

    This is a stable merge: items that compare equal (or have equal keys) are
    yielded in the order of the inputs they came from.  A heap holding one
    entry per non-exhausted input is used, so merging N items spread over K
    inputs costs O(N lg K).

    >>> list(mergesort([[1,2,3,4],
    ...                 [2,3.25,3.75,4.5,6,7],
    ...                 [2.625,3.625,6.625,9]]))
    [1, 2, 2, 2.625, 3, 3.25, 3.625, 3.75, 4, 4.5, 6, 6.625, 7, 9]

    Note the stability when several items share a key:

    >>> list(mergesort([[1,2,3,4],
    ...                 [2,3.25,3.75,4.5,6,7],
    ...                 [2.625,3.625,6.625,9]],
    ...                key=int))
    [1, 2, 2, 2.625, 3, 3.25, 3.75, 3.625, 4, 4.5, 6, 6.625, 7, 9]

    >>> list(mergesort([[4, 3, 2, 1],
    ...                 [7, 6, 4.5, 3.75, 3.25, 2],
    ...                 [9, 6.625, 3.625, 2.625]],
    ...                key=lambda x: -x))
    [9, 7, 6.625, 6, 4.5, 4, 3.75, 3.625, 3.25, 3, 2.625, 2, 2, 1]
    """
    # Seed the heap with the first item of every non-empty input.  The input
    # index ``i`` is unique per entry, so it breaks ties between equal
    # items/keys (giving stability) and guarantees the iterators themselves
    # are never compared.
    heap = []
    for i, itr in enumerate(iter(pl) for pl in list_of_lists):
        try:
            item = next(itr)
        except StopIteration:
            # Empty input: it contributes nothing to the merge.
            continue
        if key:
            toadd = (key(item), i, item, itr)
        else:
            toadd = (item, i, itr)
        heap.append(toadd)
    heapq.heapify(heap)

    if key:
        while heap:
            _, idx, item, itr = heap[0]
            yield item
            try:
                item = next(itr)
            except StopIteration:
                # This input is exhausted; drop its entry from the heap.
                heapq.heappop(heap)
            else:
                heapq.heapreplace(heap, (key(item), idx, item, itr))
    else:
        while heap:
            item, idx, itr = heap[0]
            yield item
            try:
                item = next(itr)
            except StopIteration:
                heapq.heappop(heap)
            else:
                heapq.heapreplace(heap, (item, idx, itr))


def remote_iterator(view, name):
    """Return an iterator on an object living on a remote engine.

    An iterator named C{'it' + name} is first created in the engine by
    executing C{it<name> = iter(<name>)} through C{view}.  Each step of the
    returned generator then makes one blocking C{apply_sync} call that
    advances that remote iterator and yields the value it produced.

    @param view: object exposing C{execute} and C{apply_sync}
    (presumably an IPython.parallel view on a single engine -- confirm
    against callers)
    @param name: name of the iterable in the engine's namespace

    A C{RemoteError} whose C{ename} is C{'StopIteration'} ends the iteration;
    any other C{RemoteError} is re-raised unchanged.
    """
    # Imported here rather than relying on the ``__main__`` section, which is
    # the only other place this file imports ``Reference``; otherwise calling
    # this function from an importing module fails with NameError.
    from IPython.parallel import Reference

    view.execute('it%s=iter(%s)' % (name, name), block=True)
    while True:
        try:
            result = view.apply_sync(lambda x: next(x), Reference('it' + name))
        # Exhausting the remote iterator surfaces here as a RemoteError
        # wrapping the engine-side StopIteration.
        except RemoteError as e:
            if e.ename == 'StopIteration':
                # End the generator with ``return``: raising StopIteration
                # inside a generator becomes a RuntimeError under PEP 479.
                return
            raise
        yield result


# Main, interactive testing
if __name__ == '__main__':
    # ``Reference`` is kept in this import because remote_iterator() may look
    # it up as a module-level name.
    from IPython.parallel import Client, Reference

    # NOTE(review): the line creating ``rc`` was missing from the reviewed
    # copy; a default-configured Client is assumed -- confirm.
    rc = Client()
    print('Engine IDs: %s' % (rc.ids,))

    # Make a set of 'sorted datasets'
    # NOTE(review): the original definitions of a0/a1/a2 were missing from the
    # reviewed copy; these overlapping sorted ranges keep the example
    # runnable -- confirm the intended values.
    a0 = list(range(5, 20))
    a1 = list(range(10))
    a2 = list(range(15, 25))

    # Now, imagine these had been created in the remote engines by some long
    # computation.  In this simple example, we just send them over into the
    # remote engines.  They will all be called 'a' in each engine.
    # NOTE(review): the push statements were missing from the reviewed copy;
    # dictionary-style assignment on a single-engine view is assumed.
    rc[0]['a'] = a0
    rc[1]['a'] = a1
    rc[2]['a'] = a2

    # And we now make a local object which represents the remote iterator
    aa0 = remote_iterator(rc[0], 'a')
    aa1 = remote_iterator(rc[1], 'a')
    aa2 = remote_iterator(rc[2], 'a')

    # Let's merge them, both locally and remotely:
    print('Merge the local datasets:')
    print(list(mergesort([a0, a1, a2])))

    print('Locally merge the remote sets:')
    print(list(mergesort([aa0, aa1, aa2])))