~ubuntu-branches/debian/experimental/ipython/experimental

« back to all changes in this revision

Viewing changes to docs/examples/newparallel/nwmerge.py

  • Committer: Package Import Robot
  • Author(s): Julian Taylor
  • Date: 2013-02-02 11:14:27 UTC
  • mfrom: (1.4.1) (10.1.7 sid)
  • Revision ID: package-import@ubuntu.com-20130202111427-kehypljxret7idvu
Tags: 0.13.2~rc2-1
* New upstream release candidate (LP: #1161818, #1162112)
* pass -a to xvfb-run
* drop DM-Upload-Allowed, not needed anymore
* don't link documentation of ipython-doc so ipython3 does not depend on
  ipython (Closes: #695554)
  Requires ipython-doc.preinst to not lose copyright on upgrade
* add ipython3 and ipython3-qtconsole desktop files (Closes: #693612)
* fix detection of cython modules for multiarch python (Closes: #697704)
* don't install tests for notebook and qtconsole
* bump standard to 3.9.4, no changes required
* add autopkgtests running testsuite and testing tools, cython magics
  and incomplete install message
* fix crash on tracebacks without line numbers (Closes: #701597)
* add tkinter package to debianize-error-messages.patch (Closes: #701707)
* use canonical vcs fields in control

Show diffs side-by-side

added added

removed removed

Lines of Context:
1
 
"""Example showing how to merge multiple remote data streams.
2
 
"""
3
 
# Slightly modified version of:
4
 
# http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/511509
5
 
 
6
 
import heapq
7
 
from IPython.parallel.error import RemoteError
8
 
 
9
 
def mergesort(list_of_lists, key=None):
10
 
    """ Perform an N-way merge operation on sorted lists.
11
 
 
12
 
    @param list_of_lists: (really iterable of iterable) of sorted elements
13
 
    (either by naturally or by C{key})
14
 
    @param key: specify sort key function (like C{sort()}, C{sorted()})
15
 
 
16
 
    Yields tuples of the form C{(item, iterator)}, where the iterator is the
17
 
    built-in list iterator or something you pass in, if you pre-generate the
18
 
    iterators.
19
 
 
20
 
    This is a stable merge; complexity O(N lg N)
21
 
 
22
 
    Examples::
23
 
 
24
 
    >>> print list(mergesort([[1,2,3,4],
25
 
    ...                      [2,3.25,3.75,4.5,6,7],
26
 
    ...                      [2.625,3.625,6.625,9]]))
27
 
    [1, 2, 2, 2.625, 3, 3.25, 3.625, 3.75, 4, 4.5, 6, 6.625, 7, 9]
28
 
 
29
 
    # note stability
30
 
    >>> print list(mergesort([[1,2,3,4],
31
 
    ...                      [2,3.25,3.75,4.5,6,7],
32
 
    ...                      [2.625,3.625,6.625,9]],
33
 
    ...                      key=int))
34
 
    [1, 2, 2, 2.625, 3, 3.25, 3.75, 3.625, 4, 4.5, 6, 6.625, 7, 9]
35
 
 
36
 
 
37
 
    >>> print list(mergesort([[4, 3, 2, 1],
38
 
    ...                      [7, 6, 4.5, 3.75, 3.25, 2],
39
 
    ...                      [9, 6.625, 3.625, 2.625]],
40
 
    ...                      key=lambda x: -x))
41
 
    [9, 7, 6.625, 6, 4.5, 4, 3.75, 3.625, 3.25, 3, 2.625, 2, 2, 1]
42
 
    """
43
 
 
44
 
    heap = []
45
 
    for i, itr in enumerate(iter(pl) for pl in list_of_lists):
46
 
        try:
47
 
            item = itr.next()
48
 
            if key:
49
 
                toadd = (key(item), i, item, itr)
50
 
            else:
51
 
                toadd = (item, i, itr)
52
 
            heap.append(toadd)
53
 
        except StopIteration:
54
 
            pass
55
 
    heapq.heapify(heap)
56
 
 
57
 
    if key:
58
 
        while heap:
59
 
            _, idx, item, itr = heap[0]
60
 
            yield item
61
 
            try:
62
 
                item = itr.next()
63
 
                heapq.heapreplace(heap, (key(item), idx, item, itr) )
64
 
            except StopIteration:
65
 
                heapq.heappop(heap)
66
 
 
67
 
    else:
68
 
        while heap:
69
 
            item, idx, itr = heap[0]
70
 
            yield item
71
 
            try:
72
 
                heapq.heapreplace(heap, (itr.next(), idx, itr))
73
 
            except StopIteration:
74
 
                heapq.heappop(heap)
75
 
 
76
 
 
77
 
def remote_iterator(view,name):
78
 
    """Return an iterator on an object living on a remote engine.
79
 
    """
80
 
    view.execute('it%s=iter(%s)'%(name,name), block=True)
81
 
    while True:
82
 
        try:
83
 
            result = view.apply_sync(lambda x: x.next(), Reference('it'+name))
84
 
        # This causes the StopIteration exception to be raised.
85
 
        except RemoteError, e:
86
 
            if e.ename == 'StopIteration':
87
 
                raise StopIteration
88
 
            else:
89
 
                raise e
90
 
        else:
91
 
            yield result
92
 
 
93
 
# Main, interactive testing
94
 
if __name__ == '__main__':
95
 
 
96
 
    from IPython.parallel import Client, Reference
97
 
    rc = Client()
98
 
    view = rc[:]
99
 
    print 'Engine IDs:', rc.ids
100
 
 
101
 
    # Make a set of 'sorted datasets'
102
 
    a0 = range(5,20)
103
 
    a1 = range(10)
104
 
    a2 = range(15,25)
105
 
 
106
 
    # Now, imagine these had been created in the remote engines by some long
107
 
    # computation.  In this simple example, we just send them over into the
108
 
    # remote engines.  They will all be called 'a' in each engine.
109
 
    rc[0]['a'] = a0
110
 
    rc[1]['a'] = a1
111
 
    rc[2]['a'] = a2
112
 
 
113
 
    # And we now make a local object which represents the remote iterator
114
 
    aa0 = remote_iterator(rc[0],'a')
115
 
    aa1 = remote_iterator(rc[1],'a')
116
 
    aa2 = remote_iterator(rc[2],'a')
117
 
 
118
 
    # Let's merge them, both locally and remotely:
119
 
    print 'Merge the local datasets:'
120
 
    print list(mergesort([a0,a1,a2]))
121
 
    
122
 
    print 'Locally merge the remote sets:'
123
 
    print list(mergesort([aa0,aa1,aa2]))