4
* Copyright Ericsson AB 2011. All Rights Reserved.
6
* The contents of this file are subject to the Erlang Public License,
7
* Version 1.1, (the "License"); you may not use this file except in
8
* compliance with the License. You should have received a copy of the
9
* Erlang Public License along with this software. If not, it can be
10
* retrieved online at http://www.erlang.org/.
12
* Software distributed under the License is distributed on an "AS IS"
13
* basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
14
* the License for the specific language governing rights and limitations
21
* Description: Lock-free queue for communication between threads.
23
* Currently only a many-to-one version has been
24
* implemented, i.e., many threads can enqueue but
25
* only one thread can dequeue at a time. It doesn't
26
* have to be the same thread dequeuing every time, but
27
* synchronization so that only one thread dequeues
28
* at a time has to be provided by other means.
30
* When/If the need for a many-to-many queue arises,
31
* this implementation can relatively easily be extended
32
* to support that too.
34
* Usage instructions can be found in erl_thr_queue.c
36
* Author: Rickard Green
39
#ifndef ERL_THR_QUEUE_H__
40
#define ERL_THR_QUEUE_H__
43
#include "erl_threads.h"
44
#include "erl_alloc.h"
45
#include "erl_thr_progress.h"
48
ERTS_THR_Q_LIVE_UNDEF,
49
ERTS_THR_Q_LIVE_SHORT,
53
#define ERTS_THR_Q_INIT_DEFAULT \
56
ERTS_THR_Q_LIVE_UNDEF, \
57
ERTS_THR_Q_LIVE_SHORT \
64
/*
 * Queue type. Declared up front as a typedef of struct ErtsThrQ_t_ so it
 * can be referenced by pointer; most operations declared in this header
 * take an ErtsThrQ_t * as their first argument.
 */
typedef struct ErtsThrQ_t_ ErtsThrQ_t;
69
ErtsThrQLive_t objects;
72
void (*notify)(void *);
73
int auto_finalize_dequeue;
76
/*
 * Queue element type. Declared before struct ErtsThrQElement_t_ is
 * defined so that the structures in this header can hold
 * ErtsThrQElement_t pointers.
 */
typedef struct ErtsThrQElement_t_ ErtsThrQElement_t;
77
/*
 * Handle type returned by erts_thr_q_prepare_enqueue() and consumed by
 * erts_thr_q_enqueue_prepared().
 *
 * NOTE(review): the struct tag used here is 'ErtsThrQElement_t' (no
 * trailing underscore), which is not the tag 'ErtsThrQElement_t_' used
 * for the element type. As written, this names a separate struct type
 * that has no definition visible here, so ErtsThrQPrepEnQ_t can only be
 * used through pointers. Confirm whether that is a deliberate opaque
 * handle or a typo.
 */
typedef struct ErtsThrQElement_t ErtsThrQPrepEnQ_t;
81
ErtsThrQElement_t *ptr;
84
struct ErtsThrQElement_t_ {
93
ErtsThrQElement_t *start;
94
ErtsThrQElement_t *end;
99
ERTS_THR_Q_NEED_THR_PRGR,
101
} ErtsThrQCleanState_t;
106
ErtsThrQElement_t marker;
108
erts_atomic_t um_refc[2];
109
erts_atomic32_t um_refc_ix;
112
erts_atomic32_t thr_prgr_clean_scheduled;
115
void (*notify)(void *);
120
* This structure needs to be cache line aligned for best
124
/* Modified by threads enqueuing */
126
char align__[ERTS_ALC_CACHE_LINE_ALIGN_SIZE(sizeof(ErtsThrQTail_t))];
129
* Everything below this point is *only* accessed by the
135
ErtsThrQElement_t *first;
136
ErtsThrQElement_t *unref_end;
137
int clean_reached_head_count;
140
ErtsThrQElement_t *start;
141
ErtsThrQElement_t *end;
145
ErtsThrPrgrVal thr_progress;
146
int thr_progress_reached;
149
ErtsThrQElement_t *unref_end;
153
void (*notify)(void *);
162
#else /* !USE_THREADS */
166
ErtsThrQElement_t *first;
167
ErtsThrQElement_t *last;
175
/*
 * Initialization routine that takes no queue argument.
 * NOTE(review): presumably one-time, module-wide setup that has to run
 * before any queue is used -- confirm against the implementation.
 */
void erts_thr_q_init(void);
176
/*
 * Sets up the queue passed as the first argument according to the given
 * ErtsThrQInit_t.
 * NOTE(review): presumably meant for queues whose storage is owned by
 * the caller, as opposed to erts_thr_q_create() -- confirm.
 */
void erts_thr_q_initialize(ErtsThrQ_t *, ErtsThrQInit_t *);
177
/*
 * Finalizes a queue and reports an ErtsThrQCleanState_t.
 * NOTE(review): judging by the names this is the counterpart of
 * erts_thr_q_initialize(), and a result such as
 * ERTS_THR_Q_NEED_THR_PRGR presumably means cleanup has not completed
 * yet -- confirm against the implementation.
 */
ErtsThrQCleanState_t erts_thr_q_finalize(ErtsThrQ_t *);
178
/*
 * Returns a pointer to a queue set up from the given init parameters.
 * NOTE(review): presumably allocates the queue, which is then released
 * with erts_thr_q_destroy() -- confirm.
 */
ErtsThrQ_t *erts_thr_q_create(ErtsThrQInit_t *);
179
/*
 * Disposes of a queue and reports an ErtsThrQCleanState_t.
 * NOTE(review): presumably pairs with erts_thr_q_create() -- confirm.
 */
ErtsThrQCleanState_t erts_thr_q_destroy(ErtsThrQ_t *);
180
/*
 * Runs a cleaning pass on the queue and reports the resulting
 * ErtsThrQCleanState_t.
 * NOTE(review): presumably a dequeue-side operation, i.e. subject to the
 * one-dequeuer-at-a-time rule stated in the file header -- confirm.
 */
ErtsThrQCleanState_t erts_thr_q_clean(ErtsThrQ_t *);
181
/*
 * Returns an ErtsThrQCleanState_t for the queue.
 * NOTE(review): the meaning of the int argument is not visible from this
 * header -- see the implementation.
 */
ErtsThrQCleanState_t erts_thr_q_inspect(ErtsThrQ_t *, int);
182
/*
 * Returns a handle (ErtsThrQPrepEnQ_t *) for the given queue; the handle
 * is the type expected as the last argument of
 * erts_thr_q_enqueue_prepared().
 */
ErtsThrQPrepEnQ_t *erts_thr_q_prepare_enqueue(ErtsThrQ_t *);
183
/*
 * Enqueues the void * datum on the queue using a handle of the type
 * returned by erts_thr_q_prepare_enqueue(). Per the file header, many
 * threads may enqueue on the same queue.
 */
void erts_thr_q_enqueue_prepared(ErtsThrQ_t *, void *, ErtsThrQPrepEnQ_t *);
184
/*
 * Enqueues the void * datum on the queue. Per the file header, many
 * threads may enqueue on the same queue.
 */
void erts_thr_q_enqueue(ErtsThrQ_t *, void *);
185
/*
 * Dequeues from the queue and returns the result as a void *. Per the
 * file header, only one thread may dequeue at a time, and callers must
 * provide that synchronization by other means.
 * NOTE(review): presumably returns NULL when nothing is available --
 * confirm against the implementation.
 */
void * erts_thr_q_dequeue(ErtsThrQ_t *);
186
int erts_thr_q_get_finalize_dequeue_data(ErtsThrQ_t *,
188
void erts_thr_q_append_finalize_dequeue_data(ErtsThrQFinDeQ_t *,
190
/*
 * Operates on finalize-dequeue state (ErtsThrQFinDeQ_t) only; it takes
 * no queue argument.
 * NOTE(review): the meaning of the int result is not visible from this
 * header -- see the implementation.
 */
int erts_thr_q_finalize_dequeue(ErtsThrQFinDeQ_t *);
191
/*
 * Initializes the ErtsThrQFinDeQ_t passed in, judging by the name.
 * NOTE(review): presumably required before the state is handed to the
 * other *_finalize_dequeue* functions -- confirm.
 */
void erts_thr_q_finalize_dequeue_state_init(ErtsThrQFinDeQ_t *);
194
/*
 * Declaration of the inline accessor whose definition follows under
 * ERTS_GLB_INLINE_INCL_FUNC_DEF; it returns q->head.next.thr_progress.
 */
ERTS_GLB_INLINE ErtsThrPrgrVal erts_thr_q_need_thr_progress(ErtsThrQ_t *q);
197
#if ERTS_GLB_INLINE_INCL_FUNC_DEF
200
ERTS_GLB_INLINE ErtsThrPrgrVal
201
erts_thr_q_need_thr_progress(ErtsThrQ_t *q)
203
return q->head.next.thr_progress;
207
#endif /* ERTS_GLB_INLINE_INCL_FUNC_DEF */
209
#endif /* ERL_THR_QUEUE_H__ */