/*
    Copyright (c) 2009-2011 250bpm s.r.o.
    Copyright (c) 2007-2009 iMatix Corporation
    Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file

    This file is part of 0MQ.

    0MQ is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License as published by
    the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    0MQ is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
    GNU Lesser General Public License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_YQUEUE_HPP_INCLUDED__
#define __ZMQ_YQUEUE_HPP_INCLUDED__

#include <stdlib.h>
#include <stddef.h>

#include "err.hpp"
#include "atomic_ptr.hpp"

namespace zmq
{
    //  yqueue is an efficient queue implementation. The main goal is
    //  to minimise the number of allocations/deallocations needed. Thus yqueue
    //  allocates/deallocates elements in batches of N.
    //
    //  yqueue allows one thread to use the push/back functions and another one
    //  to use the pop/front functions. However, the user must ensure that there's
    //  no pop on an empty queue and that both threads don't access the same
    //  element in an unsynchronised manner.
    //
    //  T is the type of the object in the queue.
    //  N is the granularity of the queue (how many pushes have to be done till
    //  an actual memory allocation is required).
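
    //  Illustrative usage sketch (not part of the original file; the element
    //  type and granularity below are arbitrary). The two halves would normally
    //  run in two different threads, with the "element is ready" signalling
    //  done by the layer above yqueue, not by yqueue itself:
    //
    //      yqueue_t <int, 256> q;
    //
    //      //  Writer side: add a slot at the back, then fill it in.
    //      q.push ();
    //      q.back () = 42;
    //
    //      //  Reader side, once it knows the element has been written:
    //      int value = q.front ();
    //      q.pop ();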

    template <typename T, int N> class yqueue_t
    {
    public:

        //  Create the queue.
        inline yqueue_t ()
        {
            begin_chunk = (chunk_t*) malloc (sizeof (chunk_t));
            alloc_assert (begin_chunk);
            begin_pos = 0;
            back_chunk = NULL;
            back_pos = 0;
            end_chunk = begin_chunk;
            end_pos = 0;
        }

        //  Destroy the queue.
        inline ~yqueue_t ()
        {
            while (true) {
                if (begin_chunk == end_chunk) {
                    free (begin_chunk);
                    break;
                }
                chunk_t *o = begin_chunk;
                begin_chunk = begin_chunk->next;
                free (o);
            }

            chunk_t *sc = spare_chunk.xchg (NULL);
            if (sc)
                free (sc);
        }

        //  Returns reference to the front element of the queue.
        //  If the queue is empty, behaviour is undefined.
        inline T &front ()
        {
            return begin_chunk->values [begin_pos];
        }

        //  Returns reference to the back element of the queue.
        //  If the queue is empty, behaviour is undefined.
        inline T &back ()
        {
            return back_chunk->values [back_pos];
        }

        //  Adds an element to the back end of the queue.
        inline void push ()
        {
            back_chunk = end_chunk;
            back_pos = end_pos;

            if (++end_pos != N)
                return;

            chunk_t *sc = spare_chunk.xchg (NULL);
            if (sc) {
                end_chunk->next = sc;
                sc->prev = end_chunk;
            } else {
                end_chunk->next = (chunk_t*) malloc (sizeof (chunk_t));
                alloc_assert (end_chunk->next);
                end_chunk->next->prev = end_chunk;
            }
            end_chunk = end_chunk->next;
            end_pos = 0;
        }

        //  Removes an element from the back end of the queue. In other words,
        //  it rolls back the last push to the queue. Take care: the caller is
        //  responsible for destroying the object being unpushed.
        //  The caller must also guarantee that the queue isn't empty when
        //  unpush is called. It cannot be done automatically, as the read
        //  side of the queue can be managed by a different, completely
        //  unsynchronised thread.
        inline void unpush ()
        {
            //  First, move 'back' one position backwards.
            if (back_pos)
                back_pos--;
            else {
                back_pos = N - 1;
                back_chunk = back_chunk->prev;
            }

            //  Now, move the 'end' position backwards. Note that the obsolete
            //  end chunk is not used as a spare chunk. The analysis shows that
            //  doing so would require a free and an atomic operation per chunk
            //  deallocated, instead of a simple free.
            if (end_pos)
                end_pos--;
            else {
                end_pos = N - 1;
                end_chunk = end_chunk->prev;
                free (end_chunk->next);
                end_chunk->next = NULL;
            }
        }
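
        //  Illustrative sketch (not part of the original file): unpush lets the
        //  writer retract an element it has pushed but that the reader has not
        //  yet been allowed to see. For example:
        //
        //      q.push ();
        //      q.back () = 41;     //  element the reader will eventually see
        //      q.push ();
        //      q.back () = 42;     //  speculatively added element
        //      q.unpush ();        //  writer changes its mind; 42 is never
        //                          //  exposed to the reader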

        //  Removes an element from the front end of the queue.
        inline void pop ()
        {
            if (++ begin_pos == N) {
                chunk_t *o = begin_chunk;
                begin_chunk = begin_chunk->next;
                begin_chunk->prev = NULL;
                begin_pos = 0;

                //  'o' has been more recently used than spare_chunk,
                //  so for cache reasons we'll get rid of the spare and
                //  use 'o' as the spare.
                chunk_t *cs = spare_chunk.xchg (o);
                if (cs)
                    free (cs);
            }
        }

    private:

        //  Individual memory chunk to hold N elements.
        struct chunk_t
        {
            T values [N];
            chunk_t *prev;
            chunk_t *next;
        };

        //  Back position may point to invalid memory if the queue is empty,
        //  while begin & end positions are always valid. Begin position is
        //  accessed exclusively by the queue reader (front/pop), while back and
        //  end positions are accessed exclusively by the queue writer (back/push).
        chunk_t *begin_chunk;
        int begin_pos;
        chunk_t *back_chunk;
        int back_pos;
        chunk_t *end_chunk;
        int end_pos;

        //  People are likely to produce and consume at similar rates. In
        //  this scenario, holding onto the most recently freed chunk saves
        //  us from having to call malloc/free.
        atomic_ptr_t<chunk_t> spare_chunk;
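
        //  Illustrative sketch (not part of the original file; N = 256 and the
        //  int payload are arbitrary): after the first chunk boundary has been
        //  crossed, a producer and consumer running at similar rates keep
        //  recycling the chunk cached in spare_chunk, so the loop below performs
        //  only a couple of allocations in total rather than one per boundary:
        //
        //      yqueue_t <int, 256> q;
        //      for (int i = 0; i != 1000000; i++) {
        //          q.push ();
        //          q.back () = i;
        //          q.front ();
        //          q.pop ();      //  an emptied chunk becomes the new spare
        //      }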

        //  Disable copying of yqueue.
        yqueue_t (const yqueue_t&);
        const yqueue_t &operator = (const yqueue_t&);
    };

}

#endif