~ubuntu-branches/ubuntu/saucy/zeromq3/saucy

« back to all changes in this revision

Viewing changes to src/yqueue.hpp

  • Committer: Package Import Robot
  • Author(s): Alessandro Ghedini
  • Date: 2012-06-04 21:21:09 UTC
  • Revision ID: package-import@ubuntu.com-20120604212109-b7b3m0rn21o8oo2q
Tags: upstream-3.1.0~beta+dfsg
ImportĀ upstreamĀ versionĀ 3.1.0~beta+dfsg

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
/*
 
2
    Copyright (c) 2009-2011 250bpm s.r.o.
 
3
    Copyright (c) 2007-2009 iMatix Corporation
 
4
    Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
 
5
 
 
6
    This file is part of 0MQ.
 
7
 
 
8
    0MQ is free software; you can redistribute it and/or modify it under
 
9
    the terms of the GNU Lesser General Public License as published by
 
10
    the Free Software Foundation; either version 3 of the License, or
 
11
    (at your option) any later version.
 
12
 
 
13
    0MQ is distributed in the hope that it will be useful,
 
14
    but WITHOUT ANY WARRANTY; without even the implied warranty of
 
15
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
16
    GNU Lesser General Public License for more details.
 
17
 
 
18
    You should have received a copy of the GNU Lesser General Public License
 
19
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
20
*/
 
21
 
 
22
#ifndef __ZMQ_YQUEUE_HPP_INCLUDED__
 
23
#define __ZMQ_YQUEUE_HPP_INCLUDED__
 
24
 
 
25
#include <stdlib.h>
 
26
#include <stddef.h>
 
27
 
 
28
#include "err.hpp"
 
29
#include "atomic_ptr.hpp"
 
30
 
 
31
namespace zmq
 
32
{
 
33
 
 
34
    //  yqueue is an efficient queue implementation. The main goal is
 
35
    //  to minimise number of allocations/deallocations needed. Thus yqueue
 
36
    //  allocates/deallocates elements in batches of N.
 
37
    //
 
38
    //  yqueue allows one thread to use push/back function and another one 
 
39
    //  to use pop/front functions. However, user must ensure that there's no
 
40
    //  pop on the empty queue and that both threads don't access the same
 
41
    //  element in unsynchronised manner.
 
42
    //
 
43
    //  T is the type of the object in the queue.
 
44
    //  N is granularity of the queue (how many pushes have to be done till
 
45
    //  actual memory allocation is required).
 
46
 
 
47
    template <typename T, int N> class yqueue_t
 
48
    {
 
49
    public:
 
50
 
 
51
        //  Create the queue.
 
52
        inline yqueue_t ()
 
53
        {
 
54
             begin_chunk = (chunk_t*) malloc (sizeof (chunk_t));
 
55
             alloc_assert (begin_chunk);
 
56
             begin_pos = 0;
 
57
             back_chunk = NULL;
 
58
             back_pos = 0;
 
59
             end_chunk = begin_chunk;
 
60
             end_pos = 0;
 
61
        }
 
62
 
 
63
        //  Destroy the queue.
 
64
        inline ~yqueue_t ()
 
65
        {
 
66
            while (true) {
 
67
                if (begin_chunk == end_chunk) {
 
68
                    free (begin_chunk);
 
69
                    break;
 
70
                } 
 
71
                chunk_t *o = begin_chunk;
 
72
                begin_chunk = begin_chunk->next;
 
73
                free (o);
 
74
            }
 
75
 
 
76
            chunk_t *sc = spare_chunk.xchg (NULL);
 
77
            if (sc)
 
78
                free (sc);
 
79
        }
 
80
 
 
81
        //  Returns reference to the front element of the queue.
 
82
        //  If the queue is empty, behaviour is undefined.
 
83
        inline T &front ()
 
84
        {
 
85
             return begin_chunk->values [begin_pos];
 
86
        }
 
87
 
 
88
        //  Returns reference to the back element of the queue.
 
89
        //  If the queue is empty, behaviour is undefined.
 
90
        inline T &back ()
 
91
        {
 
92
            return back_chunk->values [back_pos];
 
93
        }
 
94
 
 
95
        //  Adds an element to the back end of the queue.
 
96
        inline void push ()
 
97
        {
 
98
            back_chunk = end_chunk;
 
99
            back_pos = end_pos;
 
100
 
 
101
            if (++end_pos != N)
 
102
                return;
 
103
 
 
104
            chunk_t *sc = spare_chunk.xchg (NULL);
 
105
            if (sc) {
 
106
                end_chunk->next = sc;
 
107
                sc->prev = end_chunk;
 
108
            } else {
 
109
                end_chunk->next = (chunk_t*) malloc (sizeof (chunk_t));
 
110
                alloc_assert (end_chunk->next);
 
111
                end_chunk->next->prev = end_chunk;
 
112
            }
 
113
            end_chunk = end_chunk->next;
 
114
            end_pos = 0;
 
115
        }
 
116
 
 
117
        //  Removes element from the back end of the queue. In other words
 
118
        //  it rollbacks last push to the queue. Take care: Caller is
 
119
        //  responsible for destroying the object being unpushed.
 
120
        //  The caller must also guarantee that the queue isn't empty when
 
121
        //  unpush is called. It cannot be done automatically as the read
 
122
        //  side of the queue can be managed by different, completely
 
123
        //  unsynchronised thread.
 
124
        inline void unpush ()
 
125
        {
 
126
            //  First, move 'back' one position backwards.
 
127
            if (back_pos)
 
128
                --back_pos;
 
129
            else {
 
130
                back_pos = N - 1;
 
131
                back_chunk = back_chunk->prev;
 
132
            }
 
133
 
 
134
            //  Now, move 'end' position backwards. Note that obsolete end chunk
 
135
            //  is not used as a spare chunk. The analysis shows that doing so
 
136
            //  would require free and atomic operation per chunk deallocated
 
137
            //  instead of a simple free.
 
138
            if (end_pos)
 
139
                --end_pos;
 
140
            else {
 
141
                end_pos = N - 1;
 
142
                end_chunk = end_chunk->prev;
 
143
                free (end_chunk->next);
 
144
                end_chunk->next = NULL;
 
145
            }
 
146
        }
 
147
 
 
148
        //  Removes an element from the front end of the queue.
 
149
        inline void pop ()
 
150
        {
 
151
            if (++ begin_pos == N) {
 
152
                chunk_t *o = begin_chunk;
 
153
                begin_chunk = begin_chunk->next;
 
154
                begin_chunk->prev = NULL;
 
155
                begin_pos = 0;
 
156
 
 
157
                //  'o' has been more recently used than spare_chunk,
 
158
                //  so for cache reasons we'll get rid of the spare and
 
159
                //  use 'o' as the spare.
 
160
                chunk_t *cs = spare_chunk.xchg (o);
 
161
                if (cs)
 
162
                    free (cs);
 
163
            }
 
164
        }
 
165
 
 
166
    private:
 
167
 
 
168
        //  Individual memory chunk to hold N elements.
 
169
        struct chunk_t
 
170
        {
 
171
             T values [N];
 
172
             chunk_t *prev;
 
173
             chunk_t *next;
 
174
        };
 
175
 
 
176
        //  Back position may point to invalid memory if the queue is empty,
 
177
        //  while begin & end positions are always valid. Begin position is
 
178
        //  accessed exclusively be queue reader (front/pop), while back and
 
179
        //  end positions are accessed exclusively by queue writer (back/push).
 
180
        chunk_t *begin_chunk;
 
181
        int begin_pos;
 
182
        chunk_t *back_chunk;
 
183
        int back_pos;
 
184
        chunk_t *end_chunk;
 
185
        int end_pos;
 
186
 
 
187
        //  People are likely to produce and consume at similar rates.  In
 
188
        //  this scenario holding onto the most recently freed chunk saves
 
189
        //  us from having to call malloc/free.
 
190
        atomic_ptr_t<chunk_t> spare_chunk;
 
191
 
 
192
        //  Disable copying of yqueue.
 
193
        yqueue_t (const yqueue_t&);
 
194
        const yqueue_t &operator = (const yqueue_t&);
 
195
    };
 
196
 
 
197
}
 
198
 
 
199
#endif