1
#include <ngx_config.h>
6
// Forward declarations for the file-local (static) helpers defined further down in this file.
// Channel lookup by id: get_channel calls find_channel first and allocates a new channel otherwise.
static ngx_http_push_channel_t * ngx_http_push_get_channel(ngx_str_t * id, ngx_log_t * log);
7
static ngx_http_push_channel_t * ngx_http_push_find_channel(ngx_str_t * id, ngx_log_t * log);
8
// Channel removal and message-queue cleanup.
static ngx_int_t ngx_http_push_delete_channel_locked(ngx_http_push_channel_t *trash);
10
static ngx_http_push_channel_t * ngx_http_push_clean_channel_locked(ngx_http_push_channel_t * channel);
12
// Red-black tree helpers: generic insert taking a tie-break comparator, the channel-specific
// insert wrapper, the channel id comparator, and node removal.
static void ngx_rbtree_generic_insert( ngx_rbtree_node_t *temp, ngx_rbtree_node_t *node, ngx_rbtree_node_t *sentinel, int (*compare)(const ngx_rbtree_node_t *left, const ngx_rbtree_node_t *right));
13
static void ngx_http_push_rbtree_insert(ngx_rbtree_node_t *temp, ngx_rbtree_node_t *node, ngx_rbtree_node_t *sentinel);
14
static int ngx_http_push_compare_rbtree_node(const ngx_rbtree_node_t *v_left, const ngx_rbtree_node_t *v_right);
15
static ngx_int_t ngx_http_push_delete_node_locked(ngx_rbtree_t *tree, ngx_rbtree_node_t *trash, ngx_slab_pool_t *shpool);
17
// Discards expired messages from the front of a channel's message queue.
// At the final return (reached once the queue is empty) the channel itself is returned
// when it has no subscribers, so the caller may delete it; otherwise NULL is returned.
// Presumably the caller must already hold the shared-memory lock, as the _locked suffix
// suggests -- TODO confirm against callers.
static ngx_http_push_channel_t * ngx_http_push_clean_channel_locked(ngx_http_push_channel_t * channel) {
18
// the queue link embedded in channel->message_queue serves as the list sentinel
ngx_queue_t *sentinel = &channel->message_queue->queue;
19
// current time, sampled once and reused for every expiry comparison in the loop
time_t now = ngx_time();
20
ngx_http_push_msg_t *msg=NULL;
21
while(!ngx_queue_empty(sentinel)){
22
// always inspect the message currently at the head of the queue
msg = ngx_queue_data(ngx_queue_head(sentinel), ngx_http_push_msg_t, queue);
23
// a message with expires == 0 is never considered expired by this test
if (msg!=NULL && msg->expires != 0 && now > msg->expires) {
24
ngx_http_push_delete_message_locked(channel, msg, ngx_http_push_shpool);
26
else { //definitely a message left to send
30
//at this point, the queue is empty
31
return channel->subscribers==0 ? channel : NULL; //if no waiting requests, return this channel to be deleted
34
// Removes a channel from the shared channel tree and updates the shared channel counter.
// The actual unlinking and freeing is delegated to ngx_http_push_delete_node_locked.
static ngx_int_t ngx_http_push_delete_channel_locked(ngx_http_push_channel_t *trash) {
36
// The channel pointer is passed as an rbtree node pointer; this presumably relies on the
// rbtree node being the first member of ngx_http_push_channel_t -- confirm against the struct.
res = ngx_http_push_delete_node_locked(&((ngx_http_push_shm_data_t *) ngx_http_push_shm_zone->data)->tree, (ngx_rbtree_node_t *)trash, ngx_http_push_shpool);
38
// keep the channel count stored in the shm zone data in step with the tree
// NOTE(review): confirm whether this decrement is meant to depend on res.
((ngx_http_push_shm_data_t *) ngx_http_push_shm_zone->data)->channels--;
45
// Unlinks a channel node from the given rbtree and releases its memory back to the slab pool:
// first every entry of the channel's workers_with_subscribers queue, then the node itself.
// A NULL trash pointer skips the deletion work.
static ngx_int_t ngx_http_push_delete_node_locked(ngx_rbtree_t *tree, ngx_rbtree_node_t *trash, ngx_slab_pool_t *shpool) {
46
//assume the shm zone is already locked
47
if(trash != NULL){ //take out the trash
48
ngx_rbtree_delete(tree, trash);
50
//delete the worker-subscriber queue
51
// The address of workers_with_subscribers is used directly as the queue sentinel; this
// presumably relies on its ngx_queue_t link being the first member -- confirm against the struct.
ngx_queue_t *sentinel = (ngx_queue_t *)(&((ngx_http_push_channel_t *)trash)->workers_with_subscribers);
52
ngx_queue_t *cur = ngx_queue_head(sentinel);
54
while(cur!=sentinel) {
55
// fetch the successor before cur is freed, since cur's links must not be read afterwards
next = ngx_queue_next(cur);
56
ngx_slab_free_locked(shpool, cur);
60
// finally release the channel node itself
ngx_slab_free_locked(shpool, trash);
66
// Looks up a channel by id in the shared channel tree.
// Nodes are keyed by the CRC32 of the id; nodes with equal keys are told apart by comparing
// the id bytes with ngx_memn2cmp. As a side effect, each visited node is passed through
// ngx_http_push_clean_channel_locked, and channels that call reports as deletable are
// collected (at most three per search) and deleted.
// The caller in this file treats a NULL result as "channel not found".
static ngx_http_push_channel_t * ngx_http_push_find_channel(ngx_str_t *id, ngx_log_t *log) {
67
ngx_rbtree_t *tree = &((ngx_http_push_shm_data_t *) ngx_http_push_shm_zone->data)->tree;
69
ngx_rbtree_node_t *node, *sentinel;
71
ngx_http_push_channel_t *up = NULL;
72
// deletable channels encountered during this search; capacity bounds the cleanup work per call
ngx_http_push_channel_t *trash[] = { NULL, NULL, NULL };
74
ngx_uint_t i, trashed=0;
79
// same hash function that ngx_http_push_get_channel uses for node keys
hash = ngx_crc32_short(id->data, id->len);
82
sentinel = tree->sentinel;
84
while (node != sentinel) {
86
//every search is responsible for deleting a couple of empty channels, if it comes across them
87
// only collect while there is still room in trash[]
if (trashed < (sizeof(trash) / sizeof(*trash))) {
88
// a non-NULL result means the visited channel has an empty queue and no subscribers
if((trash[trashed]=ngx_http_push_clean_channel_locked((ngx_http_push_channel_t *) node))!=NULL) {
93
if (hash < node->key) {
98
if (hash > node->key) {
103
/* hash == node->key */
106
up = (ngx_http_push_channel_t *) node;
108
// keys collide: fall back to comparing the id bytes and lengths
rc = ngx_memn2cmp(id->data, up->id.data, id->len, up->id.len);
112
for(i=0; i<trashed; i++) {
113
// never delete the channel this search is about to hand back
if(trash[i] != up){ //take out the trash
114
ngx_http_push_delete_channel_locked(trash[i]);
117
// expire stale messages on the matched channel; the return value is ignored here
ngx_http_push_clean_channel_locked(up);
121
// ids differ: continue among the nodes that share this key
node = (rc < 0) ? node->left : node->right;
123
} while (node != sentinel && hash == node->key);
128
// delete everything that was collected during the walk
for(i=0; i<trashed; i++) {
129
ngx_http_push_delete_channel_locked(trash[i]);
134
//find a channel by id. if channel not found, make one, insert it, and return that.
// A new channel is carved out of one slab allocation laid out as:
//   [ngx_http_push_channel_t][id bytes][ngx_http_push_msg_t used as the message queue head]
135
static ngx_http_push_channel_t * ngx_http_push_get_channel(ngx_str_t *id, ngx_log_t *log) {
137
ngx_http_push_channel_t *up=ngx_http_push_find_channel(id, log);
139
if(up != NULL) { //we found our channel
142
tree = &((ngx_http_push_shm_data_t *) ngx_http_push_shm_zone->data)->tree;
143
// one allocation for the channel struct, a copy of the id, and the queue-head message
if((up = ngx_http_push_slab_alloc_locked(sizeof(*up) + id->len + sizeof(ngx_http_push_msg_t)))==NULL) {
146
up->id.data = (u_char *) (up+1); //contiguous piggy
147
// NOTE(review): this address is id->len bytes past the struct, so it may not be suitably
// aligned for ngx_http_push_msg_t -- confirm this is acceptable on the target platforms.
up->message_queue = (ngx_http_push_msg_t *) (up->id.data + id->len);
149
// NOTE(review): the (u_char) cast truncates id->len to 8 bits, whereas the allocation size,
// the message_queue offset and the CRC below all use the full id->len -- confirm that ids
// are limited to 255 bytes elsewhere, otherwise the stored id is shortened.
up->id.len = (u_char) id->len;
150
ngx_memcpy(up->id.data, id->data, up->id.len);
151
// tree key is the CRC32 of the id, matching the hash computed in ngx_http_push_find_channel
up->node.key = ngx_crc32_short(id->data, id->len);
152
ngx_rbtree_insert(tree, (ngx_rbtree_node_t *) up);
155
// start with an empty message queue
ngx_queue_init(&up->message_queue->queue);
158
// start with an empty workers-with-subscribers queue
ngx_queue_init(&up->workers_with_subscribers.queue);
161
// account for the new channel in the shared counter
((ngx_http_push_shm_data_t *) ngx_http_push_shm_zone->data)->channels++;
167
// Insert helper for a red-black tree whose nodes may share the same key.
// Placement relative to temp is decided by key first; when keys are equal, the compare
// callback decides (a negative result selects the left side). The new node's children
// are set to the sentinel.
//   temp     - existing node the new node is compared against
//   node     - node being inserted
//   sentinel - the tree's sentinel node, used to detect empty child slots
//   compare  - tie-breaker for nodes with equal keys
static void ngx_rbtree_generic_insert(
168
ngx_rbtree_node_t *temp,
169
ngx_rbtree_node_t *node,
170
ngx_rbtree_node_t *sentinel,
171
int (*compare)(const ngx_rbtree_node_t *left, const ngx_rbtree_node_t *right))
174
// smaller key: the left side of temp
if (node->key < temp->key) {
176
// left child slot is empty
if (temp->left == sentinel) {
183
// larger key: the right side of temp
} else if (node->key > temp->key) {
185
// right child slot is empty
if (temp->right == sentinel) {
192
} else { /* node->key == temp->key */
193
// equal keys: let the caller-supplied comparator choose the side
if (compare(node, temp) < 0) {
195
if (temp->left == sentinel) {
204
if (temp->right == sentinel) {
215
// the newly inserted node has no children yet
node->left = sentinel;
216
node->right = sentinel;
220
// Convenience wrapper around ngx_http_push_rbtree_walker: walks the shared channel tree
// starting at its root, passing the shm zone's base address (shm.addr) as the slab pool
// and `apply` as the per-channel callback.
#define ngx_http_push_walk_rbtree(apply) \
221
ngx_http_push_rbtree_walker(&((ngx_http_push_shm_data_t *) ngx_http_push_shm_zone->data)->tree, (ngx_slab_pool_t *)ngx_http_push_shm_zone->shm.addr, apply, ((ngx_http_push_shm_data_t *) ngx_http_push_shm_zone->data)->tree.root)
223
// Recursively visits tree nodes starting at `node`: calls apply on the node (cast to a
// channel) and then recurses into its left and right children.
//   tree   - tree being walked (its sentinel is read below)
//   shpool - slab pool handed through to apply
//   apply  - callback invoked for each visited channel
//   node   - node to start from
static void ngx_http_push_rbtree_walker(ngx_rbtree_t *tree, ngx_slab_pool_t *shpool, ngx_int_t (*apply)(ngx_http_push_channel_t * channel, ngx_slab_pool_t * shpool), ngx_rbtree_node_t *node) {
224
ngx_rbtree_node_t *sentinel = tree->sentinel;
227
// the callback's ngx_int_t result is not used by this call
apply((ngx_http_push_channel_t *)node, shpool);
228
// NOTE(review): children are tested against NULL here, while the other tree code in this
// file compares child pointers with the tree sentinel -- confirm this is intended.
if(node->left!=NULL) {
229
ngx_http_push_rbtree_walker(tree, shpool, apply, node->left);
231
if(node->right!=NULL) {
232
ngx_http_push_rbtree_walker(tree, shpool, apply, node->right);
237
// Channel-tree insert routine: delegates to ngx_rbtree_generic_insert, using
// ngx_http_push_compare_rbtree_node to order nodes whose keys are equal.
static void ngx_http_push_rbtree_insert(ngx_rbtree_node_t *temp, ngx_rbtree_node_t *node, ngx_rbtree_node_t *sentinel)
239
ngx_rbtree_generic_insert(temp, node, sentinel, ngx_http_push_compare_rbtree_node);
242
// Comparator for two tree nodes that are channels: returns the ngx_memn2cmp result for
// their ids (data and length). Used as the equal-key tie-breaker for tree insertion.
static int ngx_http_push_compare_rbtree_node(const ngx_rbtree_node_t *v_left, const ngx_rbtree_node_t *v_right)
244
// the casts drop const; the channels are only read below
ngx_http_push_channel_t *left = (ngx_http_push_channel_t *) v_left, *right = (ngx_http_push_channel_t *) v_right;
245
return ngx_memn2cmp(left->id.data, right->id.data, left->id.len, right->id.len);