~ubuntu-branches/ubuntu/quantal/nginx/quantal-updates

« back to all changes in this revision

Viewing changes to debian/modules/nginx-http-push/src/ngx_http_push_rbtree_util.c

  • Committer: Bazaar Package Importer
  • Author(s): Kartik Mistry
  • Date: 2011-04-16 13:47:58 UTC
  • mfrom: (4.2.31 sid)
  • Revision ID: james.westby@ubuntu.com-20110416134758-yqca2qp5crh2hw2f
Tags: 1.0.0-2
* debian/rules:
  + Removed --with-file-aio support. Fixed FTBFS on kFreeBSD-* arch
    (Closes: #621882)

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
#include <ngx_config.h>
 
2
#include <ngx_core.h>
 
3
#include <ngx_http.h>
 
4
 
 
5
 
 
6
static ngx_http_push_channel_t * ngx_http_push_get_channel(ngx_str_t * id, ngx_log_t * log);
 
7
static ngx_http_push_channel_t * ngx_http_push_find_channel(ngx_str_t * id, ngx_log_t * log);
 
8
static ngx_int_t ngx_http_push_delete_channel_locked(ngx_http_push_channel_t *trash);
 
9
 
 
10
static ngx_http_push_channel_t *        ngx_http_push_clean_channel_locked(ngx_http_push_channel_t * channel);
 
11
 
 
12
static void     ngx_rbtree_generic_insert(      ngx_rbtree_node_t *temp, ngx_rbtree_node_t *node, ngx_rbtree_node_t *sentinel, int (*compare)(const ngx_rbtree_node_t *left, const ngx_rbtree_node_t *right));
 
13
static void     ngx_http_push_rbtree_insert(ngx_rbtree_node_t *temp, ngx_rbtree_node_t *node, ngx_rbtree_node_t *sentinel);
 
14
static int      ngx_http_push_compare_rbtree_node(const ngx_rbtree_node_t *v_left, const ngx_rbtree_node_t *v_right);
 
15
static ngx_int_t ngx_http_push_delete_node_locked(ngx_rbtree_t *tree, ngx_rbtree_node_t *trash, ngx_slab_pool_t *shpool);
 
16
 
 
17
// Drop expired messages from the front of a channel's message queue.
//
// Messages are examined from the head of the queue. Each one whose expiry
// time is non-zero and strictly earlier than the current time is deleted.
// The scan stops at the first message that is not expired.
//
// Returns the channel itself when the queue ended up empty AND the channel
// has no subscribers (the caller may then delete it); returns NULL otherwise.
static ngx_http_push_channel_t *ngx_http_push_clean_channel_locked(ngx_http_push_channel_t * channel) {
        ngx_queue_t                 *head = &channel->message_queue->queue;
        time_t                       current_time = ngx_time();
        ngx_http_push_msg_t         *oldest = NULL;

        for ( ;; ) {
                if (ngx_queue_empty(head)) {
                        break; // nothing left in the queue
                }

                oldest = ngx_queue_data(ngx_queue_head(head), ngx_http_push_msg_t, queue);

                // a zero expiry, or an expiry that has not passed yet, means
                // there is still a message left to send: keep the channel
                if (oldest == NULL || oldest->expires == 0 || current_time <= oldest->expires) {
                        return NULL;
                }

                ngx_http_push_delete_message_locked(channel, oldest, ngx_http_push_shpool);
        }

        // the queue is empty here; only a channel without waiting requests
        // is handed back for deletion
        if (channel->subscribers != 0) {
                return NULL;
        }
        return channel;
}
 
33
 
 
34
// Remove a channel from the shared channel tree and release its memory.
//
// Delegates to ngx_http_push_delete_node_locked() using the module's shared
// tree and slab pool. On NGX_OK the shared channel counter is decremented.
// Any other result is passed back to the caller unchanged.
static ngx_int_t ngx_http_push_delete_channel_locked(ngx_http_push_channel_t *trash) {
        ngx_http_push_shm_data_t      *shm_data = (ngx_http_push_shm_data_t *) ngx_http_push_shm_zone->data;
        ngx_int_t                      status;

        status = ngx_http_push_delete_node_locked(&shm_data->tree, (ngx_rbtree_node_t *) trash, ngx_http_push_shpool);
        if (status != NGX_OK) {
                return status;
        }

        shm_data->channels--; // one fewer channel in the tree
        return NGX_OK;
}
 
44
 
 
45
// Unlink a channel node from the tree and free it together with every entry
// of its workers_with_subscribers queue.
//
// The shm zone is assumed to be locked by the caller already.
//
// Returns NGX_DECLINED when trash is NULL, NGX_OK otherwise.
static ngx_int_t ngx_http_push_delete_node_locked(ngx_rbtree_t *tree, ngx_rbtree_node_t *trash, ngx_slab_pool_t *shpool) {
        ngx_queue_t                *head;
        ngx_queue_t                *entry;
        ngx_queue_t                *following;

        if (trash == NULL) {
                return NGX_DECLINED; // nothing to take out
        }

        ngx_rbtree_delete(tree, trash);

        // release the worker-subscriber queue; the sentinel lives inside the
        // channel itself, so it is not freed here
        // NOTE(review): each entry is freed through its queue link pointer,
        // which presumes the link is the first member of the entry -- confirm
        head = (ngx_queue_t *)(&((ngx_http_push_channel_t *)trash)->workers_with_subscribers);
        for (entry = ngx_queue_head(head); entry != head; entry = following) {
                following = ngx_queue_next(entry); // read the link before the entry is freed
                ngx_slab_free_locked(shpool, entry);
        }

        ngx_slab_free_locked(shpool, trash);
        return NGX_OK;
}
 
65
 
 
66
// Look a channel up by id in the shared channel tree.
//
// The tree is keyed by the CRC32 of the id; nodes sharing a key are ordered
// by ngx_memn2cmp() on their ids. While descending by key, every visited
// node is cleaned with ngx_http_push_clean_channel_locked(), and up to three
// channels that it reports as deletable are remembered. Before returning,
// those channels are deleted -- except the channel being returned.
//
// Returns the matching channel, or NULL when no channel has this id.
// The log parameter is not used.
static ngx_http_push_channel_t * ngx_http_push_find_channel(ngx_str_t *id, ngx_log_t *log) {
        ngx_rbtree_t                   *tree = &((ngx_http_push_shm_data_t *) ngx_http_push_shm_zone->data)->tree;
        ngx_http_push_channel_t        *doomed[] = { NULL, NULL, NULL };
        ngx_http_push_channel_t        *candidate = NULL;
        ngx_rbtree_node_t              *cursor, *nil;
        ngx_uint_t                      k, n_doomed = 0;
        ngx_int_t                       cmp;
        uint32_t                        hash;

        if (tree == NULL) {
                return NULL;
        }

        hash = ngx_crc32_short(id->data, id->len);

        cursor = tree->root;
        nil = tree->sentinel;

        // phase 1: descend by key until a node with the same hash (or a leaf)
        while (cursor != nil) {

                // every search is responsible for deleting a couple of empty
                // channels, if it comes across them
                if (n_doomed < (sizeof(doomed) / sizeof(*doomed))) {
                        doomed[n_doomed] = ngx_http_push_clean_channel_locked((ngx_http_push_channel_t *) cursor);
                        if (doomed[n_doomed] != NULL) {
                                n_doomed++;
                        }
                }

                if (hash == cursor->key) {
                        break;
                }

                cursor = (hash < cursor->key) ? cursor->left : cursor->right;
        }

        // phase 2: walk the nodes sharing this hash, comparing the ids
        while (cursor != nil && hash == cursor->key) {
                candidate = (ngx_http_push_channel_t *) cursor;

                cmp = ngx_memn2cmp(id->data, candidate->id.data, id->len, candidate->id.len);

                if (cmp == 0) {
                        // found: take out the trash, sparing the match itself
                        for (k = 0; k < n_doomed; k++) {
                                if (doomed[k] != candidate) {
                                        ngx_http_push_delete_channel_locked(doomed[k]);
                                }
                        }
                        ngx_http_push_clean_channel_locked(candidate);
                        return candidate;
                }

                cursor = (cmp < 0) ? cursor->left : cursor->right;
        }

        // not found: everything collected on the way down can go
        for (k = 0; k < n_doomed; k++) {
                ngx_http_push_delete_channel_locked(doomed[k]);
        }
        return NULL;
}
 
133
 
 
134
//find a channel by id. if channel not found, make one, insert it, and return that.
 
135
 static ngx_http_push_channel_t *       ngx_http_push_get_channel(ngx_str_t *id, ngx_log_t *log) {
 
136
        ngx_rbtree_t                   *tree;
 
137
        ngx_http_push_channel_t        *up=ngx_http_push_find_channel(id, log);
 
138
        
 
139
        if(up != NULL) { //we found our channel
 
140
                return up;
 
141
        }
 
142
        tree = &((ngx_http_push_shm_data_t *) ngx_http_push_shm_zone->data)->tree;
 
143
        if((up = ngx_http_push_slab_alloc_locked(sizeof(*up) + id->len + sizeof(ngx_http_push_msg_t)))==NULL) {
 
144
                return NULL;
 
145
        }
 
146
        up->id.data = (u_char *) (up+1); //contiguous piggy
 
147
        up->message_queue = (ngx_http_push_msg_t *) (up->id.data + id->len);
 
148
        
 
149
        up->id.len = (u_char) id->len;
 
150
        ngx_memcpy(up->id.data, id->data, up->id.len);
 
151
        up->node.key = ngx_crc32_short(id->data, id->len);
 
152
        ngx_rbtree_insert(tree, (ngx_rbtree_node_t *) up);
 
153
 
 
154
        //initialize queues
 
155
        ngx_queue_init(&up->message_queue->queue);
 
156
        up->messages=0;
 
157
 
 
158
        ngx_queue_init(&up->workers_with_subscribers.queue);
 
159
        up->subscribers=0;
 
160
        
 
161
        ((ngx_http_push_shm_data_t *) ngx_http_push_shm_zone->data)->channels++;
 
162
        
 
163
        return up;
 
164
}
 
165
 
 
166
 
 
167
static void     ngx_rbtree_generic_insert(
 
168
                                ngx_rbtree_node_t *temp, 
 
169
                                ngx_rbtree_node_t *node, 
 
170
                                ngx_rbtree_node_t *sentinel, 
 
171
                                int (*compare)(const ngx_rbtree_node_t *left, const ngx_rbtree_node_t *right))
 
172
{
 
173
        for ( ;; ) {
 
174
                if (node->key < temp->key) {
 
175
 
 
176
                        if (temp->left == sentinel) {
 
177
                                temp->left = node;
 
178
                                break;
 
179
                        }
 
180
 
 
181
                        temp = temp->left;
 
182
 
 
183
                } else if (node->key > temp->key) {
 
184
 
 
185
                        if (temp->right == sentinel) {
 
186
                                temp->right = node;
 
187
                                break;
 
188
                        }
 
189
 
 
190
                        temp = temp->right;
 
191
 
 
192
                } else { /* node->key == temp->key */
 
193
                        if (compare(node, temp) < 0) {
 
194
 
 
195
                                if (temp->left == sentinel) {
 
196
                                        temp->left = node;
 
197
                                        break;
 
198
                                }
 
199
 
 
200
                                temp = temp->left;
 
201
 
 
202
                        } else {
 
203
 
 
204
                                if (temp->right == sentinel) {
 
205
                                        temp->right = node;
 
206
                                        break;
 
207
                                }
 
208
 
 
209
                                temp = temp->right;
 
210
                        }
 
211
                }
 
212
        }
 
213
 
 
214
        node->parent = temp;
 
215
        node->left = sentinel;
 
216
        node->right = sentinel;
 
217
        ngx_rbt_red(node);
 
218
}
 
219
 
 
220
// Run ngx_http_push_rbtree_walker() over the shared channel tree, starting at
// its root, passing the shm zone's shm.addr as the slab pool and `apply` as
// the per-channel callback.
#define ngx_http_push_walk_rbtree(apply)                                            \
        ngx_http_push_rbtree_walker(                                                \
                &((ngx_http_push_shm_data_t *) ngx_http_push_shm_zone->data)->tree, \
                (ngx_slab_pool_t *)ngx_http_push_shm_zone->shm.addr,                \
                apply,                                                              \
                ((ngx_http_push_shm_data_t *) ngx_http_push_shm_zone->data)->tree.root)
 
222
 
 
223
// Visit the subtree rooted at node, calling apply(channel, shpool) on each
// node before its children (left subtree first, then right).
//
// The tree's sentinel ends the recursion. The value returned by apply is
// ignored.
static void ngx_http_push_rbtree_walker(ngx_rbtree_t *tree, ngx_slab_pool_t *shpool, ngx_int_t (*apply)(ngx_http_push_channel_t * channel, ngx_slab_pool_t * shpool), ngx_rbtree_node_t *node) {
        if (node == tree->sentinel) {
                return; // reached a leaf
        }

        apply((ngx_http_push_channel_t *) node, shpool);

        // child links are read only after apply() has run on this node
        if (node->left != NULL) {
                ngx_http_push_rbtree_walker(tree, shpool, apply, node->left);
        }
        if (node->right != NULL) {
                ngx_http_push_rbtree_walker(tree, shpool, apply, node->right);
        }
}
 
236
 
 
237
// Insert callback for the channel tree: a generic insert that breaks key
// ties with ngx_http_push_compare_rbtree_node().
static void
ngx_http_push_rbtree_insert(ngx_rbtree_node_t *temp, ngx_rbtree_node_t *node, ngx_rbtree_node_t *sentinel)
{
        ngx_rbtree_generic_insert(temp,
                                  node,
                                  sentinel,
                                  ngx_http_push_compare_rbtree_node);
}
 
241
 
 
242
static int ngx_http_push_compare_rbtree_node(const ngx_rbtree_node_t *v_left, const ngx_rbtree_node_t *v_right)
 
243
{
 
244
        ngx_http_push_channel_t *left = (ngx_http_push_channel_t *) v_left, *right = (ngx_http_push_channel_t *) v_right;
 
245
        return ngx_memn2cmp(left->id.data, right->id.data, left->id.len, right->id.len);
 
246
}